# PySpark Data Wrangling Demo

This notebook demonstrates common data wrangling techniques using PySpark with Delta tables.

In [None]:
# Setup
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Reuse the active Spark session if one exists; otherwise start a new one
# under this application name.
spark = (
    SparkSession.builder
    .appName("hds_sparkhive_demo")
    .getOrCreate()
)

## Create Sample DataFrame

In [24]:
# Column names for the sample rows. Only names are supplied, so Spark infers
# each column's type from the values below.
columns = ["id", "name", "date", "age"]

# One tuple per person, in the same order as `columns`.
# Dates are plain strings here; they are parsed in the date-formatting section.
data = [
    (1, "Alice", "2023-01-01", 23),
    (2, "Bob", "2023-01-02", 30),
    (3, "Cathy", "2023-01-02", 45),
    (4, "David", "2023-01-03", 35),
    (5, "Eve", "2023-01-03", 29),
    (6, "Frank", "2023-01-01", 23),
]

df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---+-----+----------+---+
| id| name|      date|age|
+---+-----+----------+---+
|  1|Alice|2023-01-01| 23|
|  2|  Bob|2023-01-02| 30|
|  3|Cathy|2023-01-02| 45|
|  4|David|2023-01-03| 35|
|  5|  Eve|2023-01-03| 29|
|  6|Frank|2023-01-01| 23|
+---+-----+----------+---+



## Save data to Delta table `demo_db.people`

In [None]:
# Make sure the target database exists before writing into it.
# NOTE: environments that restrict DDL may reject this statement; in that
# case use a database that has already been provisioned.
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")

# Persist the sample rows as the Delta table demo_db.people. The table is
# overwritten on every run, and its schema may be replaced as well.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("demo_db.people")
)

## Load the table by its `database.table` name

In [26]:
# Read the saved table back by its qualified name.
# Row order is not guaranteed without an explicit orderBy, so the rows may
# come back in a different order than they were written.
people_df = spark.read.table("demo_db.people")
people_df.show()

+---+-----+----------+---+
| id| name|      date|age|
+---+-----+----------+---+
|  6|Frank|2023-01-01| 23|
|  1|Alice|2023-01-01| 23|
|  3|Cathy|2023-01-02| 45|
|  4|David|2023-01-03| 35|
|  5|  Eve|2023-01-03| 29|
|  2|  Bob|2023-01-02| 30|
+---+-----+----------+---+



## Select columns

In [27]:
# Keep only the identifier and the name of each person.
people_df.select(["id", "name"]).show()

+---+-----+
| id| name|
+---+-----+
|  6|Frank|
|  1|Alice|
|  3|Cathy|
|  4|David|
|  5|  Eve|
|  2|  Bob|
+---+-----+



## Add new columns

In [28]:
# Derive a new column from an existing one. `people_df` is rebound here, so
# the later cells that use `age_plus_10` require this cell to have run.
people_df = people_df.withColumn(
    "age_plus_10",
    F.col("age") + F.lit(10),
)
people_df.show()

+---+-----+----------+---+-----------+
| id| name|      date|age|age_plus_10|
+---+-----+----------+---+-----------+
|  6|Frank|2023-01-01| 23|         33|
|  1|Alice|2023-01-01| 23|         33|
|  3|Cathy|2023-01-02| 45|         55|
|  4|David|2023-01-03| 35|         45|
|  5|  Eve|2023-01-03| 29|         39|
|  2|  Bob|2023-01-02| 30|         40|
+---+-----+----------+---+-----------+



## Filtering rows

In [29]:
# Filter with a Column expression built through the DataFrame API:
# keep people strictly older than 30.
people_df.filter(people_df["age"] > 30).show()

+---+-----+----------+---+-----------+
| id| name|      date|age|age_plus_10|
+---+-----+----------+---+-----------+
|  3|Cathy|2023-01-02| 45|         55|
|  4|David|2023-01-03| 35|         45|
+---+-----+----------+---+-----------+



In [30]:
# Same filter, written as a SQL expression string that Spark parses.
people_df.filter(condition="age > 30").show()

+---+-----+----------+---+-----------+
| id| name|      date|age|age_plus_10|
+---+-----+----------+---+-----------+
|  3|Cathy|2023-01-02| 45|         55|
|  4|David|2023-01-03| 35|         45|
+---+-----+----------+---+-----------+



## Joins

Create a second DataFrame to join with.

In [31]:
# Lookup of person id -> state. Ids 4, 5 and 6 are deliberately missing and
# id 7 has no matching person, so each join type below gives a different result.
columns2 = ["id", "state"]
data2 = [
    (1, "NY"),
    (2, "CA"),
    (3, "TX"),
    (7, "WA"),
]

df2 = spark.createDataFrame(data=data2, schema=columns2)
df2.show()

+---+-----+
| id|state|
+---+-----+
|  1|   NY|
|  2|   CA|
|  3|   TX|
|  7|   WA|
+---+-----+



### Inner join

In [32]:
# Inner join: only ids present in both DataFrames survive.
people_df.join(
    other=df2,
    on="id",
    how="inner",
).show()

+---+-----+----------+---+-----------+-----+
| id| name|      date|age|age_plus_10|state|
+---+-----+----------+---+-----------+-----+
|  1|Alice|2023-01-01| 23|         33|   NY|
|  2|  Bob|2023-01-02| 30|         40|   CA|
|  3|Cathy|2023-01-02| 45|         55|   TX|
+---+-----+----------+---+-----------+-----+



### Left join

In [33]:
# Left join: every person is kept; `state` is NULL where df2 has no match.
people_df.join(
    other=df2,
    on="id",
    how="left",
).show()

+---+-----+----------+---+-----------+-----+
| id| name|      date|age|age_plus_10|state|
+---+-----+----------+---+-----------+-----+
|  6|Frank|2023-01-01| 23|         33| NULL|
|  1|Alice|2023-01-01| 23|         33|   NY|
|  3|Cathy|2023-01-02| 45|         55|   TX|
|  4|David|2023-01-03| 35|         45| NULL|
|  5|  Eve|2023-01-03| 29|         39| NULL|
|  2|  Bob|2023-01-02| 30|         40|   CA|
+---+-----+----------+---+-----------+-----+



### Full outer join

In [34]:
# Full outer join: ids from either side are kept; columns from the side
# without a match are NULL.
people_df.join(
    other=df2,
    on="id",
    how="outer",
).show()

+---+-----+----------+----+-----------+-----+
| id| name|      date| age|age_plus_10|state|
+---+-----+----------+----+-----------+-----+
|  1|Alice|2023-01-01|  23|         33|   NY|
|  2|  Bob|2023-01-02|  30|         40|   CA|
|  3|Cathy|2023-01-02|  45|         55|   TX|
|  4|David|2023-01-03|  35|         45| NULL|
|  5|  Eve|2023-01-03|  29|         39| NULL|
|  6|Frank|2023-01-01|  23|         33| NULL|
|  7| NULL|      NULL|NULL|       NULL|   WA|
+---+-----+----------+----+-----------+-----+



## Pivot wider using `.pivot()`

In [35]:
# Count people for every (date, age) combination: one row per date and one
# column per distinct age. Combinations with no people show up as NULL, not 0.
pivoted_wide = (
    people_df
    .groupBy("date")
    .pivot("age")
    .count()
)
pivoted_wide.show()

+----------+----+----+----+----+----+
|      date|  23|  29|  30|  35|  45|
+----------+----+----+----+----+----+
|2023-01-03|NULL|   1|NULL|   1|NULL|
|2023-01-01|   2|NULL|NULL|NULL|NULL|
|2023-01-02|NULL|NULL|   1|NULL|   1|
+----------+----+----+----+----+----+



## Pivot longer using `.unpivot()`

In [36]:
# Turn the two age columns into (age_column, value) rows, keeping id, name and
# date as identifiers. The columns being unpivoted must share a common data
# type, because they all end up in the single `value` column.
unpivot_df = people_df.select(
    "id", "name", "date", "age", "age_plus_10"
).unpivot(
    ["id", "name", "date"],
    ["age", "age_plus_10"],
    "age_column",
    "value",
)

unpivot_df.show()

+---+-----+----------+-----------+-----+
| id| name|      date| age_column|value|
+---+-----+----------+-----------+-----+
|  6|Frank|2023-01-01|        age|   23|
|  6|Frank|2023-01-01|age_plus_10|   33|
|  1|Alice|2023-01-01|        age|   23|
|  1|Alice|2023-01-01|age_plus_10|   33|
|  3|Cathy|2023-01-02|        age|   45|
|  3|Cathy|2023-01-02|age_plus_10|   55|
|  4|David|2023-01-03|        age|   35|
|  4|David|2023-01-03|age_plus_10|   45|
|  5|  Eve|2023-01-03|        age|   29|
|  5|  Eve|2023-01-03|age_plus_10|   39|
|  2|  Bob|2023-01-02|        age|   30|
|  2|  Bob|2023-01-02|age_plus_10|   40|
+---+-----+----------+-----------+-----+



## Formatting dates and timestamps

In [37]:

# Parse the string `date` column into a proper date and a timestamp, then
# render the parsed date back to text in day/month/year order.
df_dates = people_df.withColumn(
    "date_dt", F.to_date(F.col("date"), "yyyy-MM-dd")
).withColumn(
    "date_ts", F.to_timestamp(F.col("date"), "yyyy-MM-dd")
).withColumn(
    "formatted_date", F.date_format("date_dt", "dd/MM/yyyy")
)

# Show the original string next to the three derived columns.
df_dates.select(["date", "date_dt", "date_ts", "formatted_date"]).show()

+----------+----------+-------------------+--------------+
|      date|   date_dt|            date_ts|formatted_date|
+----------+----------+-------------------+--------------+
|2023-01-01|2023-01-01|2023-01-01 00:00:00|    01/01/2023|
|2023-01-01|2023-01-01|2023-01-01 00:00:00|    01/01/2023|
|2023-01-02|2023-01-02|2023-01-02 00:00:00|    02/01/2023|
|2023-01-03|2023-01-03|2023-01-03 00:00:00|    03/01/2023|
|2023-01-03|2023-01-03|2023-01-03 00:00:00|    03/01/2023|
|2023-01-02|2023-01-02|2023-01-02 00:00:00|    02/01/2023|
+----------+----------+-------------------+--------------+



## Mapping values from dictionary

In [38]:
# Lookup from person name to gender code.
mapping_dict = {"Alice": "F", "Bob": "M", "Cathy": "F", "David": "M", "Eve": "F", "Frank": "M"}

# create_map expects alternating key/value columns (key1, value1, key2, ...),
# so flatten the dict's (key, value) pairs into one list of literals.
# A nested comprehension does this in a single linear pass, whereas
# sum(items, ()) rebuilds a growing tuple for every pair.
mapping_expr = F.create_map(
    [F.lit(item) for pair in mapping_dict.items() for item in pair]
)

# Look up each row's name in the map column to derive its gender.
df_mapped = people_df.withColumn("gender", mapping_expr[F.col("name")])
df_mapped.show()

+---+-----+----------+---+-----------+------+
| id| name|      date|age|age_plus_10|gender|
+---+-----+----------+---+-----------+------+
|  6|Frank|2023-01-01| 23|         33|     M|
|  1|Alice|2023-01-01| 23|         33|     F|
|  3|Cathy|2023-01-02| 45|         55|     F|
|  4|David|2023-01-03| 35|         45|     M|
|  5|  Eve|2023-01-03| 29|         39|     F|
|  2|  Bob|2023-01-02| 30|         40|     M|
+---+-----+----------+---+-----------+------+



## Aggregations with `.groupBy()`

- Count rows per date
- Count distinct ages per date

In [39]:
# For each date: how many rows have a non-null id, and how many different
# ages occur among them.
agg_df = people_df.groupBy("date").agg(
    F.count("id").alias("row_count"),
    F.count_distinct("age").alias("distinct_ages"),
)
agg_df.show()

+----------+---------+-------------+
|      date|row_count|distinct_ages|
+----------+---------+-------------+
|2023-01-03|        2|            2|
|2023-01-01|        2|            1|
|2023-01-02|        2|            2|
+----------+---------+-------------+

