# PySpark Transformation Examples

This notebook collects compact, runnable examples of the PySpark transformations you've used across the `lib/` files. Each code cell includes explanatory comments describing the purpose, function arguments and common pitfalls.

Run cells in order. These are examples intended for learning and reference; adapt paths and Kafka settings to your environment.

In [None]:
# 1) Setup: create a Spark session and import frequently used helpers
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, regexp_replace, regexp_extract, when, trim, split, explode, from_json, lit, desc)
from pyspark.sql.functions import sum as spark_sum, count as spark_count
from pyspark.sql.types import MapType, StringType, LongType

spark = SparkSession.builder.appName("PySpark Transform Examples").getOrCreate()
# Reduce verbosity when running interactively
spark.sparkContext.setLogLevel("WARN")

# Notes:
# - SparkSession: the entry point for DataFrame operations.
# - Common functions: col wraps a column name into a Column object; regexp_replace/regexp_extract are for text cleaning; when constructs conditional expressions.

## Read CSV robustly
Example: CSV files often contain quoted fields, embedded commas or newlines. Use these reader options to handle them.

In [None]:
path = "../spotify-2023.csv"  # adjust relative path as needed
df = (
    spark.read
         .option("header", "true")   # use the first row as column names
         .option("sep", ",")          # field separator (comma by default)
         .option("quote", '"')         # character used for quoting fields
         .option("escape", '"')        # escape character for quotes inside fields
         .option("multiLine", "true") # parse fields spanning multiple lines
         .csv(path)
)

# Quick inspection to verify parsing
print("Columns:", df.columns)
df.printSchema()
df.show(3, truncate=False)

# Notes:
# - header=True tells Spark to treat the first CSV row as column names.
# - multiLine=True is required if some fields contain embedded newlines; it's slower but safer for messy CSVs.

## Cleaning text-based numeric fields (streams example)
Many datasets use formatted numbers like `1,234` or `1.2M`. The example below removes separators, extracts the numeric part, and applies K/M/B suffix multipliers.

In [None]:
from pyspark.sql.functions import regexp_replace, regexp_extract, when

# create an intermediate column streams_str that removes commas/spaces and keeps letters/digits/dot
df2 = df.withColumn("streams_str", regexp_replace(col("streams"), ",", ""))\
         .withColumn("streams_str", regexp_replace(col("streams_str"), " ", ""))\
         .withColumn("streams_str", regexp_replace(col("streams_str"), r"[^0-9A-Za-z.]", ""))

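# extract the leading numeric part and an optional trailing K/M/B suffix;
# anchoring with ^ and $ stops malformed values from matching stray digits or letters mid-string
df2 = df2.withColumn("num_part", regexp_extract(col("streams_str"), r"^([0-9]*\.?[0-9]+)", 1))\
         .withColumn("suffix", regexp_extract(col("streams_str"), r"([KMB])$", 1))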

# convert to a numeric value (double) and handle suffix multipliers
df2 = df2.withColumn("streams_num",
    when(col("suffix")=="K", col("num_part").cast("double")*1_000)
    .when(col("suffix")=="M", col("num_part").cast("double")*1_000_000)
    .when(col("suffix")=="B", col("num_part").cast("double")*1_000_000_000)
    .otherwise(col("num_part").cast("double"))
)

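# keep only rows where parsing succeeded; drop the original string column before renaming,
# otherwise the DataFrame ends up with two columns named "streams" and later selects are ambiguous
df_clean = df2.filter(col("streams_num").isNotNull())\
               .drop("streams", "streams_str", "num_part", "suffix")\
               .withColumnRenamed("streams_num", "streams")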

df_clean.select("track_name", "streams").show(5, truncate=False)

# Notes:
# - regexp_replace(column, pattern, replacement) replaces every regex match; an empty replacement removes the matched characters.
# - regexp_extract extracts capture groups; the final arg is the group index.
# - when / otherwise build conditional expressions (similar to SQL CASE).
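
The parsing rule can also be checked without a Spark session. The sketch below is a pure-Python reference (using `re`, not Spark's Java regex engine) for a strict variant of the same idea: strip separators, read a leading number, apply an optional trailing K/M/B multiplier. The function name `parse_streams` is hypothetical, and anchoring the pattern to the whole string is an assumption about how strict the parsing should be.

```python
import re
from typing import Optional

_MULTIPLIERS = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
# A number (optional integer part, optional decimal point) followed by an optional suffix.
_PATTERN = re.compile(r"^([0-9]*\.?[0-9]+)([KMB]?)$")


def parse_streams(raw: Optional[str]) -> Optional[float]:
    """Return the numeric value of strings like '1,234' or '1.2M', or None."""
    if raw is None:
        return None
    # Drop thousands separators and whitespace before matching.
    cleaned = re.sub(r"[,\s]", "", raw)
    match = _PATTERN.match(cleaned)
    if match is None:
        return None  # malformed value: reject it instead of guessing
    number, suffix = match.groups()
    return float(number) * _MULTIPLIERS.get(suffix, 1)


for sample in ["1,234", "1.2M", "3K", "BPM110KeyA", None]:
    print(repr(sample), "->", parse_streams(sample))
```

A reference function like this is handy for unit-testing edge cases quickly; the Spark expressions remain the right tool for the actual DataFrame.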

## Aggregations: groupBy + agg
Group rows and compute aggregate metrics (sum/count). Common pattern: groupBy(...).agg(sum(col).alias('name')).

In [None]:
# Example: top 5 tracks by total streams (group by track and artist if you want to disambiguate)
top5 = (
    df_clean.groupBy("track_name", "artist(s)_name")
           .agg(spark_sum("streams").alias("total_streams"))
           .orderBy(desc("total_streams"))
           .limit(5)
)
top5.show(truncate=False)

# Notes:
# - groupBy accepts one or multiple column names (strings) or Column objects.
# - agg takes expressions (e.g., sum, count). alias gives the resulting column a name.
# - orderBy(desc('col')) sorts descending; use asc or omit for ascending.

## Explode / split: handle multi-valued columns (artists example)
If a column contains comma-separated artists, `split` followed by `explode` will create one row per artist.

In [None]:
from pyspark.sql.functions import split, explode, trim

# keep tracks whose in_spotify_charts value is between 1 and 1000, then create one row per artist and count appearances
df_artists = (
    df.withColumn("in_spotify_charts_i", df['in_spotify_charts'].cast("int"))
      .filter((col("in_spotify_charts_i") >= 1) & (col("in_spotify_charts_i") <= 1000))
      .withColumn("artist", explode(split(col("artist(s)_name"), ",")))
      .withColumn("artist", trim(col("artist")))
)
artist_counts = df_artists.groupBy("artist").agg(spark_count('*').alias("chart_appearances")).orderBy(desc("chart_appearances"))
artist_counts.show(20, truncate=False)

# Notes:
# - split(column, pattern) returns an Array column; explode(arrayCol) converts each array element into its own row.
# - trim removes leading/trailing whitespace which is common after splitting on commas.

## UDFs and from_json (Kafka message parsing)
Use `from_json` with a MapType to parse arbitrary JSON payloads produced to Kafka. A small UDF can extract keys from the parsed map.

In [None]:
from pyspark.sql.functions import from_json, udf
from pyspark.sql.types import MapType, StringType

# Example UDF: safely get a value from a map (the declared return type tells Spark the type of the result column)
def getKey(d, k):
    if d is None:
        return None
    return d.get(k)

udfGetKey = udf(getKey, StringType())

# Suppose `kafka_df` is a streaming DataFrame read from Kafka with a binary/string `value` column
# kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "orders").load()
# Demonstration (non-streaming): the CSV DataFrame `df` has no `value` column, so build a tiny
# stand-in DataFrame (the payload below is illustrative), then parse `value` as a Map and extract named fields
demo_src = spark.createDataFrame([('{"order_id": "o-1", "amount": "42"}',)], ["value"])
demo = demo_src.selectExpr("CAST(value AS STRING) as value_str")
demo = demo.withColumn("value_map", from_json(col("value_str"), MapType(StringType(), StringType())))
demo = demo.withColumn("order_id", udfGetKey(col("value_map"), lit("order_id")))
demo = demo.withColumn("amount", udfGetKey(col("value_map"), lit("amount")))
demo.show(5, truncate=False)

# Notes:
# - from_json(col, schema) parses a JSON string column into the provided schema (MapType is flexible).
# - udf() defaults to a StringType return type when none is given; declare it explicitly so the result column has the type you expect.

## Streaming basics (readStream / writeStream)
A minimal example showing how to read from Kafka, write to an in-memory table and query it from the driver. Use memory sink only for development/testing.

In [None]:
# Read streaming data from Kafka (adjust options for your cluster)
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "orders",
    "startingOffsets": "latest"
}

kdf = spark.readStream.format("kafka").options(**kafka_options).load()

# Parse JSON value and write to an in-memory table named 'orders_mem'
parsed = kdf.selectExpr("CAST(value AS STRING) as value_str")
parsed = parsed.withColumn("value_map", from_json(col("value_str"), MapType(StringType(), StringType())))
parsed = parsed.withColumn("order_id", udfGetKey(col("value_map"), lit("order_id")))
parsed = parsed.withColumn("amount", udfGetKey(col("value_map"), lit("amount"))).withColumn("amount", col("amount").cast("long"))

query = parsed.writeStream.format("memory").queryName("orders_mem").outputMode("append").start()

# Notes:
# - writeStream.start() returns a StreamingQuery object.
# - awaitTermination() blocks the driver; skip it in examples where later cells need to run, and call query.stop() when you are done.
# - memory sink is for testing only (not for production).

## Practical tips and pitfalls
- Always inspect `df.printSchema()` after reading external data; mismatched schemas are a common source of NULLs.
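- Cast to a wide enough type before aggregating large numbers: IntegerType tops out near 2.1 billion, so prefer LongType, DoubleType or DecimalType for totals such as stream counts.
- Prefer built-in functions over Python UDFs when possible; Catalyst cannot optimize through a Python UDF, and every row has to be serialized between the JVM and a Python worker.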
- For large CSVs, specify an explicit schema to avoid expensive inferSchema on big files.