# Setup and Spark Basics

Ing. Jeison Robles Arias

**Goal**: learn transformations vs. actions, schemas, explain plans, partitions, caching, and basic I/O.

In [0]:
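# No imports are needed here: the cells below use a `spark` session that the
# notebook environment is expected to provide (the lines are left disabled).
#import spark
#import pyspark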

In [0]:
# Sanity check: evaluating `spark.version` confirms that a Spark session is
# available in this notebook and shows which Spark version it is running.

spark.version

## Import Helpers

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

## Create a Small DataFrame with an Explicit Schema

In [0]:
# Explicit schema, described as (column name, Spark type, nullable) triples.
# Only customer_id is declared non-nullable.
schema = T.StructType([
    T.StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ("customer_id", T.IntegerType(), False),
        ("name", T.StringType(), True),
        ("country", T.StringType(), True),
        ("amount", T.DoubleType(), True),
    )
])

# Sample rows; each tuple follows the column order of the schema above.
data = [
    (1, "alice", "CR", 120.50),
    (2, "bob", "CR", 75.00),
    (3, "carol", "PA", 210.10),
    (4, "dave", "CR", 10.00),
    (5, "erin", "PA", 99.99),
]

# The schema is passed explicitly instead of being inferred from the tuples.
df = spark.createDataFrame(data, schema=schema)
df  # last expression of the cell, so the notebook echoes the DataFrame


## Transformations vs Actions 

In [0]:
# Transformations only: where() (an alias of filter()) and select() are lazy,
# so this cell just describes the query; nothing is computed yet.
df_cr = (
    df
    .where(F.col("country") == "CR")
    .select("customer_id", "amount")
)
df_cr

## Trigger an action (runs a Spark Job)

In [0]:
# count() is an action: it executes the plan built for df_cr and returns the
# number of rows that passed the country filter.
df_cr.count()

## Common DataFrame Operations

`withColumn` + `select` + `orderBy`

In [0]:
# Project the columns of interest and derive two new ones in a single select:
#   amount_usd - `amount` rounded to 2 decimals (no currency conversion is applied)
#   is_high    - boolean flag, true when `amount` is at least 100.0
# The result is sorted by amount_usd, largest first.
df2 = (
    df
    .select(
        "customer_id",
        "country",
        "amount",
        F.round(F.col("amount"), 2).alias("amount_usd"),
        (F.col("amount") >= F.lit(100.0)).alias("is_high"),
    )
    .orderBy(F.col("amount_usd").desc())
)
display(df2)

In [0]:
# Per-country summary statistics. alias() only names the resulting columns;
# the countries are listed from the highest total amount to the lowest.
agg = (
    df.groupBy("country")
    .agg(
        F.count("*").alias("n_rows"),
        F.countDistinct("customer_id").alias("n_customers"),
        F.round(F.sum("amount"), 2).alias("total_amount"),
        F.round(F.avg("amount"), 2).alias("avg_amount"),
        F.max("amount").alias("max_amount"),
    )
    .sort(F.col("total_amount").desc())
)

display(agg)

___
## Inspect the plan

In [0]:
# Print the plan for `agg` using the "formatted" explain mode.
agg.explain(mode="formatted")

In [0]:
# Number of partitions backing the small DataFrame `df`.
num_partitions = df.rdd.getNumPartitions()
print("Partitions:", num_partitions)

In [0]:
# Cache demo: run the same two aggregations before and after caching.
# df_big has 500,000 rows: `id` from spark.range plus `x` = id % 100 cast to int.
df_big = spark.range(0, 500_000).withColumn("x", (F.col("id") % 100).cast("int"))

# Without cache: each action below starts from the df_big definition again.
# The results are assigned to `_` because only the triggered work matters here.
_ = df_big.groupBy("x").count().count()
_ = df_big.groupBy("x").agg(F.avg("id")).count()

# With cache: compute once, reuse.
# cache() only marks the DataFrame for caching; the count() right after it is
# the action that actually populates the cache.
df_big_cached = df_big.cache()
_ = df_big_cached.count()  # materialize cache

# Same two aggregations, now run against the cached DataFrame.
_ = df_big_cached.groupBy("x").count().count()
_ = df_big_cached.groupBy("x").agg(F.avg("id")).count()

# Release the cached data now that the demo is finished.
df_big_cached.unpersist()

In [0]:
# Basic I/O round trip: write df2 as Parquet, then read it back.
tmp_path = "dbfs:/tmp/spark_lakehouse_labs/lab00_parquet_df"

# mode="overwrite" replaces whatever a previous run left at tmp_path.
df2.write.parquet(tmp_path, mode="overwrite")

df_read = spark.read.parquet(tmp_path)
display(df_read)