# Spark DataFrames API


### Dataframe

- DataFrames are ditributed collections of records, all with pre-defined structure(schema - structure and data types of all columns)
-  DataFrames are built on Spark's core concepts but with structure, optimization and SQL-like operations for data manipulation.
- DataFrames track their schema and provide native support for many common SQL functions and relational operators
- DataFrames are evaluated as DAGs, using lazy evaluation and providing lineage and fault tolerance.
- DataFrames are immutable

### SparkContext vs SparkSession

- SparkSession is Spark application entry point. 
- Introduced in spark 2.0 as a unified entry point for all contexts (formerly instantiated individually as SparkContext, SQLContext, HiveContext, StreamingContext)

<i>Note: In databricks it is automatically created for you as spark</i>

### DataFrame API Optimizations

- **Adaptive Query Execution:** Dynamic plan adjustments during runtime based on actual data characteristics and execution patterns.
- **In-Memory Columnar Storage(Tungsten):** In-Memory coloumnar format for all the DataFrames enabling efficient analytical query performance and reduced memory footprint.
- **Built-in Statistics** - Automatic statistics collection when saving to optimized formats (Parqurt, Delta in databricks) enables smarter query planning and execution.
- **Catalyst Optimizer:** Query optimization engine that coverts DataFrame operations into an optimized execution plan


<i>**Note** Databricks comes with a native vectorized query engine that accelerates query execution using photon engine</i>

**DataFrame Query Planning:** 

- When a DataFrame is evaluated, the driver creates an optimized execution plan through a series of transformations 
- Converts the logical plan into phycal execution that minimizes resource usage and execution time. (Unresolved LP -> analysed LP -> optimized LP -> Physical Plan)



reference: [spark df and sql docs](https://spark.apache.org/docs/latest/sql-programming-guide.html)



In [1]:
! pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demoDFCustomers").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/06 03:47:24 WARN Utils: Your hostname, codespaces-c6070e, resolves to a loopback address: 127.0.0.1; using 10.0.1.244 instead (on interface eth0)
25/08/06 03:47:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/06 03:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [5]:
# Creating DataFrames - DataFrameReader
# supports multiple formats such as CSV, JSON, Parquet, Text or Binary and even RDDs (existing)

df_customers = spark.read.csv("file:///workspaces/trng-2286/datasets/raw_customer_data.csv", header= True, inferSchema=True, escape='"')

In [7]:
df_customers.show(4, truncate=False)

+------------------------------------+--------------+---------------------------+----+------+-------------+-----------+-------------------+---------+-----------+----------------------------------------------------------------+
|customer_id                         |name          |email                      |age |gender|country      |signup_date|last_login         |is_active|total_spent|preferences                                                     |
+------------------------------------+--------------+---------------------------+----+------+-------------+-----------+-------------------+---------+-----------+----------------------------------------------------------------+
|0e99a07c-c7a5-43df-b5f3-79e2a9f18fc6|Thomas Lamb   |robinjackson@wright.com    |NULL|Female|France       |2023-03-01 |2025-05-29 22:36:25|true     |1438.4     |{"newsletter": true, "notifications": "push", "language": "en"} |
|3a69ac3e-6726-431c-82a7-5241ef568188|Kimberly Blake|susan51@johnson-garrett.com|20.0|Male  


### DataFrame Data Types

#### Primitive

**`pyspark.sql.types.DataType`**

- `ByteType`
- `ShortType`
- `IntegerType`
- `LongType`
- `FloatType`
- `DoubleType`
- `BooleanType`
- `StringType`
- `BinaryType`
- `TimestampType`
- `DateType`

#### complex data types

- `ArrayType`
- `MapType`
- `StructType`

### Common DataFrame API methods

#### Transformations

##### Narrow Transformations

- narrow transformations process data within each partition independetly, without needing to combine data from other partitions.
- faster and more efficient because they avoid data shuffling between partitions. 

1. `select()` : selecting specific rows
2. `filter()`: Applying a filter condition to rows. 
3. `map()`: Applying a function to each row. 
4. `union()`: Combining two DataFrames with identical schemas. 
5. `withColumn()`: Adding a new column based on existing ones. 
6. `drop()`: Removing a column. 

##### Wide Transformations

- Wide transformations require data to be redistributed across partitions, often involving shuffling data based on keys.

1. `groupBy()`: Grouping data based on a column, which often requires shuffling to aggregate data from different partitions. 
2. `join()`: Joining two DataFrames, which requires shuffling data to combine rows based on a join key. 
3. `distinct()`: Removing duplicate rows, which might require shuffling to compare rows across partitions. 

#### Actions

1. `count()`: returns number of rows in a Dataframe
2. `show()`: display DataFrame content
3. `take(n)`: return first n rows from a DataFrame
4. `first()`: return first row from a DataFrame
5. `write()`: save DataFrame to storage

In [11]:
# DataFrame Schema

df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- preferences: string (nullable = true)



In [15]:
# custom schema

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType


custom_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", DoubleType(), True ),
    StructField("gender", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", DateType(), True),
    StructField("last_login", TimestampType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("preferences", StringType(), True)

])

df_customers = spark.read.csv("file:///workspaces/trng-2286/datasets/raw_customer_data.csv", header= True, schema=custom_schema, escape='"')

df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- preferences: string (nullable = true)



In [17]:
# ddl schema 

ddl_schema = """
customer_id STRING,
name STRING,
email STRING,
age DOUBLE,
gender STRING,
country STRING,
signup_date DATE,
last_login TIMESTAMP,
is_active BOOLEAN,
total_spent DOUBLE,
preferences STRING
"""

df_customers = spark.read.csv("file:///workspaces/trng-2286/datasets/raw_customer_data.csv", header= True, schema=ddl_schema, escape='"')

df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- preferences: string (nullable = true)



#### Handiling missing values

**common functions**

- `isNull()`/`isNotNull()` - checks if values are null
- `count(col)` - counts non null values in a specific column
- `df.fillna()`/`df.na.fill()` - replace nulls with values
- `df.dropna()`/`df.na.drop()` - remove rows with nulls

#### referencing columns

- direct - `df.select("col_name")` - basic column selection

- by attribute - `df.select(df.attribute)` - column names that are valid python idenfiers, can be referenced across DataFrames(e.g., join)

- column expression - `df.select(df["col_name"])` - any column names, can be referenced across DataFrames

- column object - `df.select(col("name"))` - required when building complex expressions or using column specific operations like `cast()`, `alias()`, `asc()`or `desc()`


#### common column object methods

- `alias()` - rename column
- `cast()` - chnage data type
- `isNull()` or `isNotNull()` - check for nulls
- `contains()` - string matching
- `asc()`/`desc()` - sort direction - `df.sort(col("c1").asc())`


[pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.html)

#### common built-in functions

- `concat(col1, col2)` - concatenate strings
- `date_format(col, fmt)` - fromat date strings
- `round(col, scale)` - round number to scale
- `regexp_replace(col, pattern, replace)` - replace using regex
- `coalesce(col1, col2)` - first non null value


[pyspark built in functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

#### user defined functions

- allows to use python functions on dataframe columns
- helpful to create custom reusable functions
- can impact performance as they can not be optimized by the Catalyst optimizer and has serialiation overhead.

```py
@udf("data_type")
def function_name(name):
    return value

df.select(function_name("name"))
```

<i><b>note:</b> pandas udfs are efficient becuase they operate on batches of rows instead of single rows, they leverage Apache Arrow for more efficient python-JVM serialization </i>

### Aggregate functions

can be applied in `agg()` method after `groupBy()` operation or directly within `select()`

- `sum()`
- `avg()`
- `min()`
- `max()`
- `count()`
- `first()`
- `last()`

### Spark SQL

- Spark SQL is a module in spark that allows you to run SQL queries on structured and semi-structured data.
- It provides a SQL-like interface on top of Spark's powerful DataFrame API, enabling both SQL and programmatic access to data, all with the same optimized execution engine.

- `createOrReplaceTempView()` : created a temp in-memory view that you can query with SQL.
- `createGlobalTempView()` : Creates a global temporary view accessible across sessions (under global_temp database).

In [20]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

#### Data Cleaning

In [18]:
df_customers.show()

+--------------------+----------------+--------------------+----+------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|            name|               email| age|gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+----------------+--------------------+----+------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|     Thomas Lamb|robinjackson@wrig...|NULL|Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|  Kimberly Blake|susan51@johnson-g...|20.0|  Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|  William Taylor|leahwilliams@gmai...|NULL|Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsle

In [22]:
# drop records with missing email

df_clean = df_customers.na.drop(subset="email")

df_clean.filter(col("email").isNull()==True).show()

+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+
|customer_id|name|email|age|gender|country|signup_date|last_login|is_active|total_spent|preferences|
+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+
+-----------+----+-----+---+------+-------+-----------+----------+---------+-----------+-----------+



In [23]:
# add a placeholder for missing gender

df_clean = df_clean.na.fill({"gender" : "Unknown"})

df_clean.show()

+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|NULL| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|NULL| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

In [None]:
# avg age

avg_age = df_clean.select(avg(col("age")))

avg_age.show()

+--------+
|avg(age)|
+--------+
|   46.85|
+--------+



In [32]:
# replace missing age with median

median_age = df_clean.approxQuantile("age", [0.5], 0.01)[0]

df_clean = df_clean.na.fill({"age": median_age})

df_clean.show()

+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|50.0| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

In [33]:
# remove zero total spent

df_clean = df_clean.filter((col("total_spent").isNotNull())  & (col("total_spent") > 0))

df_clean.show()


+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|         customer_id|          name|               email| age| gender|             country|signup_date|         last_login|is_active|total_spent|         preferences|
+--------------------+--------------+--------------------+----+-------+--------------------+-----------+-------------------+---------+-----------+--------------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|robinjackson@wrig...|50.0| Female|              France| 2023-03-01|2025-05-29 22:36:25|     true|     1438.4|{"newsletter": tr...|
|3a69ac3e-6726-431...|Kimberly Blake|susan51@johnson-g...|20.0|   Male|       Guinea-Bissau| 2020-12-14|2025-03-21 23:52:55|     true|    2364.98|{"newsletter": tr...|
|c63cab5f-dc06-484...|William Taylor|leahwilliams@gmai...|50.0| Female|               Kenya| 2023-11-16|2024-09-05 04:59:24|    false|    5913.19|{"newsletter":

#### Enrichment and handiling JSON data

In [36]:
df_enriched = df_clean.withColumn("age_group", 
                                  when(col("age")< 30, "Young")
                                  .when((col("age")>=30) & (col("age")<60), "Adult")
                                  .otherwise("Senior")
                                  )

df_enriched.take(4)

[Row(customer_id='0e99a07c-c7a5-43df-b5f3-79e2a9f18fc6', name='Thomas Lamb', email='robinjackson@wright.com', age=50.0, gender='Female', country='France', signup_date=datetime.date(2023, 3, 1), last_login=datetime.datetime(2025, 5, 29, 22, 36, 25), is_active=True, total_spent=1438.4, preferences='{"newsletter": true, "notifications": "push", "language": "en"}', age_group='Adult'),
 Row(customer_id='3a69ac3e-6726-431c-82a7-5241ef568188', name='Kimberly Blake', email='susan51@johnson-garrett.com', age=20.0, gender='Male', country='Guinea-Bissau', signup_date=datetime.date(2020, 12, 14), last_login=datetime.datetime(2025, 3, 21, 23, 52, 55), is_active=True, total_spent=2364.98, preferences='{"newsletter": true, "notifications": "push", "language": "fr"}', age_group='Young'),
 Row(customer_id='c63cab5f-dc06-4842-926a-118f85a2e7ae', name='William Taylor', email='leahwilliams@gmail.com', age=50.0, gender='Female', country='Kenya', signup_date=datetime.date(2023, 11, 16), last_login=datetime.

In [41]:
df_enriched.filter(col("age_group") == "Senior").show()

+--------------------+------------------+--------------------+----+-------+-----------------+-----------+-------------------+---------+-----------+--------------------+---------+
|         customer_id|              name|               email| age| gender|          country|signup_date|         last_login|is_active|total_spent|         preferences|age_group|
+--------------------+------------------+--------------------+----+-------+-----------------+-----------+-------------------+---------+-----------+--------------------+---------+
|4657a2b1-abae-49a...|      Molly Watson|danieldiaz@hendri...|69.0|Unknown|           Taiwan| 2022-11-05|2025-06-14 04:15:42|     true|     9732.5|{"newsletter": fa...|   Senior|
|a97da932-82a3-4ec...|     Carrie Hughes|youngjacob@yahoo.com|66.0|   Male|        Mauritius| 2025-06-24|2024-08-02 21:59:11|    false|    8424.61|{"newsletter": tr...|   Senior|
|20136bac-081b-47b...|   Gabriel Vincent| wsalazar@torres.biz|66.0| Female|          Albania| 2022-02-15|

In [42]:
# handle json data 
df_enriched.select("preferences").show(truncate=False)


+-----------------------------------------------------------------+
|preferences                                                      |
+-----------------------------------------------------------------+
|{"newsletter": true, "notifications": "push", "language": "en"}  |
|{"newsletter": true, "notifications": "push", "language": "fr"}  |
|{"newsletter": true, "notifications": "email", "language": "de"} |
|{"newsletter": true, "notifications": "sms", "language": "es"}   |
|{"newsletter": false, "notifications": "email", "language": "es"}|
|{"newsletter": true, "notifications": "sms", "language": "en"}   |
|{"newsletter": false, "notifications": "email", "language": "es"}|
|{"newsletter": true, "notifications": "email", "language": "de"} |
|{"newsletter": true, "notifications": "sms", "language": "de"}   |
|{"newsletter": false, "notifications": "email", "language": "es"}|
|{"newsletter": true, "notifications": "push", "language": "de"}  |
|{"newsletter": true, "notifications": "push", "

In [44]:
# parse json
preferences_schema = StructType([
    StructField("newsletter", BooleanType(), True),
    StructField("notifications", StringType(), True),
    StructField("language", StringType(), True)
])

df_parsed = df_enriched.withColumn("preferences_struct", from_json(col("preferences"), preferences_schema))

df_parsed.select("preferences_struct").take(1)

[Row(preferences_struct=Row(newsletter=True, notifications='push', language='en'))]

In [45]:
df_final = df_parsed.withColumn("pref_newsletter", col("preferences_struct.newsletter")) \
                .withColumn("pref_notifications", col("preferences_struct.notifications")) \
                .withColumn("pref_language", col("preferences_struct.language")) \
                .drop("preferences_struct", "preferences")

df_final.select("customer_id", "name", "pref_newsletter","pref_notifications","pref_language").show()

+--------------------+--------------+---------------+------------------+-------------+
|         customer_id|          name|pref_newsletter|pref_notifications|pref_language|
+--------------------+--------------+---------------+------------------+-------------+
|0e99a07c-c7a5-43d...|   Thomas Lamb|           true|              push|           en|
|3a69ac3e-6726-431...|Kimberly Blake|           true|              push|           fr|
|c63cab5f-dc06-484...|William Taylor|           true|             email|           de|
|50b165d0-6486-4d5...| Amanda Wright|           true|               sms|           es|
|4657a2b1-abae-49a...|  Molly Watson|          false|             email|           es|
|0ffe272a-f261-450...|    Kelly Boyd|           true|               sms|           en|
|ca9191a8-f736-46c...|   Jeremy Rios|          false|             email|           es|
|808301e6-260a-47a...|   Jason Hicks|           true|             email|           de|
|997d2ea4-5957-43d...| Rickey Snyder|      

#### Few Advanced Transformations

In [None]:
# python udfs
# split first_name and last_name from name

return_type = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

@udf(returnType=return_type)
def split_full_name(name:str) -> dict:
    if name is None:
        return None
    parts = name.split(" ")
    first_name = parts[0]
    if(len(parts)>0):
        last_name = parts[1]
    else:
        last_name= None
    return {"first_name": first_name, "last_name": last_name}

df_final_with_slit_name = df_final.withColumn("name_parts", split_full_name(col("name")))

df_final_with_slit_name.select("name", "name_parts").show()



[Stage 25:>                                                         (0 + 1) / 1]

+--------------+-----------------+
|          name|       name_parts|
+--------------+-----------------+
|   Thomas Lamb|   {Thomas, Lamb}|
|Kimberly Blake|{Kimberly, Blake}|
|William Taylor|{William, Taylor}|
| Amanda Wright| {Amanda, Wright}|
|  Molly Watson|  {Molly, Watson}|
|    Kelly Boyd|    {Kelly, Boyd}|
|   Jeremy Rios|   {Jeremy, Rios}|
|   Jason Hicks|   {Jason, Hicks}|
| Rickey Snyder| {Rickey, Snyder}|
|Stefanie Moore|{Stefanie, Moore}|
|  David Wilson|  {David, Wilson}|
|Kristin Barker|{Kristin, Barker}|
|  Anthony Hart|  {Anthony, Hart}|
| Carrie Hughes| {Carrie, Hughes}|
|Shelley Morrow|{Shelley, Morrow}|
|Andrew Stanley|{Andrew, Stanley}|
|Tyler Crawford|{Tyler, Crawford}|
|   Ryan Gibson|   {Ryan, Gibson}|
|  Jacob Dodson|  {Jacob, Dodson}|
|   Lisa Pierce|   {Lisa, Pierce}|
+--------------+-----------------+
only showing top 20 rows


                                                                                

In [47]:
df_final = df_final_with_slit_name.withColumn("first_name", col("name_parts.first_name")) \
                                .withColumn("last_name", col("name_parts.last_name")) \
                                .drop("name", "name_parts")

df_final.select("customer_id", "first_name", "last_name").show()

+--------------------+----------+---------+
|         customer_id|first_name|last_name|
+--------------------+----------+---------+
|0e99a07c-c7a5-43d...|    Thomas|     Lamb|
|3a69ac3e-6726-431...|  Kimberly|    Blake|
|c63cab5f-dc06-484...|   William|   Taylor|
|50b165d0-6486-4d5...|    Amanda|   Wright|
|4657a2b1-abae-49a...|     Molly|   Watson|
|0ffe272a-f261-450...|     Kelly|     Boyd|
|ca9191a8-f736-46c...|    Jeremy|     Rios|
|808301e6-260a-47a...|     Jason|    Hicks|
|997d2ea4-5957-43d...|    Rickey|   Snyder|
|f1d943c5-9fba-4c9...|  Stefanie|    Moore|
|438f6971-c3b4-4fe...|     David|   Wilson|
|3457b0f6-1777-449...|   Kristin|   Barker|
|adbc75ff-9d33-446...|   Anthony|     Hart|
|a97da932-82a3-4ec...|    Carrie|   Hughes|
|a5767eda-bd1d-482...|   Shelley|   Morrow|
|cc7f129e-fb05-49d...|    Andrew|  Stanley|
|9977d5e4-6b2d-418...|     Tyler| Crawford|
|9b330788-2219-400...|      Ryan|   Gibson|
|c0d08fef-2f97-410...|     Jacob|   Dodson|
|1c462dc3-44aa-413...|      Lisa

#### Aggregate functions - Analysis

In [52]:
# total and avg revenue by country

df_final.groupBy("country").agg(
    count("*").alias("customers"),
    sum("total_spent").alias("total_revenue"),
    round(avg("total_spent"), 2).alias("avg_revenue")
).orderBy(col("total_revenue").desc()).show()

+--------------------+---------+------------------+-----------+
|             country|customers|     total_revenue|avg_revenue|
+--------------------+---------+------------------+-----------+
|      United Kingdom|        3|          14057.16|    4685.72|
|             Jamaica|        2|          14054.52|    7027.26|
|Saint Vincent and...|        2|          13525.15|    6762.58|
|           Lithuania|        2|           12224.8|     6112.4|
|             Tunisia|        3|          11639.62|    3879.87|
|              Guinea|        2|          11605.49|    5802.75|
|          Bangladesh|        2|           11274.3|    5637.15|
| Trinidad and Tobago|        2|          10533.08|    5266.54|
|            Anguilla|        2|10513.830000000002|    5256.92|
|              Zambia|        1|           9995.75|    9995.75|
|            Mongolia|        1|           9763.25|    9763.25|
|              Taiwan|        1|            9732.5|     9732.5|
|        Sierra Leone|        1|        

In [53]:
# DataFrameWriter
# save to parquet

df_final.write.mode("overwrite").parquet("./datasets/final_customer_data.parquet")



                                                                                

#### Spark SQL and DataFrame API Comparision

In [54]:
df_final.createOrReplaceTempView("customers")

In [56]:
spark.sql("""SELECT country, COUNT(*) as total, ROUND(AVG(total_spent), 2) as avg_spent
          FROM customers
          GROUP BY country
""").show()


+--------------------+-----+---------+
|             country|total|avg_spent|
+--------------------+-----+---------+
|            Anguilla|    2|  5256.92|
|               Macao|    1|  7789.03|
|Heard Island and ...|    1|  7823.27|
|             Senegal|    1|  1352.24|
|             Tokelau|    1|  9292.69|
|            Kiribati|    1|  6889.48|
|             Eritrea|    1|  6174.37|
|               Tonga|    1|  8230.05|
|           Singapore|    1|  5761.94|
|            Cambodia|    1|  2997.65|
|              France|    2|  2732.48|
|Turks and Caicos ...|    1|  8776.53|
|           Sri Lanka|    1|  2329.03|
|              Taiwan|    1|   9732.5|
|           Argentina|    1|  2475.12|
|             Albania|    1|  4882.13|
|          Madagascar|    1|  1855.82|
|             Finland|    1|   2560.1|
|   Brunei Darussalam|    2|  4098.51|
|        Sierra Leone|    1|  9473.25|
+--------------------+-----+---------+
only showing top 20 rows


In [57]:
sql_plan = spark.sql("""SELECT country, COUNT(*) as total, ROUND(AVG(total_spent), 2) as avg_spent
          FROM customers
          GROUP BY country
""").explain(True)

sql_plan

== Parsed Logical Plan ==
'Aggregate ['country], ['country, 'COUNT(1) AS total#742, 'ROUND('AVG('total_spent), 2) AS avg_spent#743]
+- 'UnresolvedRelation [customers], [], false

== Analyzed Logical Plan ==
country: string, total: bigint, avg_spent: double
Aggregate [country#135], [country#135, count(1) AS total#742L, round(avg(total_spent#139), 2) AS avg_spent#743]
+- SubqueryAlias customers
   +- View (`customers`, [customer_id#130, email#132, age#385, gender#277, country#135, signup_date#136, last_login#137, is_active#138, total_spent#139, age_group#476, pref_newsletter#555, pref_notifications#557, pref_language#559, first_name#595, last_name#597])
      +- Project [customer_id#130, email#132, age#385, gender#277, country#135, signup_date#136, last_login#137, is_active#138, total_spent#139, age_group#476, pref_newsletter#555, pref_notifications#557, pref_language#559, first_name#595, last_name#597]
         +- Project [customer_id#130, name#131, email#132, age#385, gender#277, count

In [58]:
df_plan = df_final.groupBy("country").agg(
    count("*").alias("total"),
    round(avg("total_spent"), 2).alias("avg_spent")
).explain(True)

df_plan

== Parsed Logical Plan ==
'Aggregate ['country], ['country, 'count(*) AS total#752, 'round('avg('total_spent), 2) AS avg_spent#753]
+- Project [customer_id#130, email#132, age#385, gender#277, country#135, signup_date#136, last_login#137, is_active#138, total_spent#139, age_group#476, pref_newsletter#555, pref_notifications#557, pref_language#559, first_name#595, last_name#597]
   +- Project [customer_id#130, name#131, email#132, age#385, gender#277, country#135, signup_date#136, last_login#137, is_active#138, total_spent#139, age_group#476, pref_newsletter#555, pref_notifications#557, pref_language#559, name_parts#583, first_name#595, name_parts#583.last_name AS last_name#597]
      +- Project [customer_id#130, name#131, email#132, age#385, gender#277, country#135, signup_date#136, last_login#137, is_active#138, total_spent#139, age_group#476, pref_newsletter#555, pref_notifications#557, pref_language#559, name_parts#583, name_parts#583.first_name AS first_name#595]
         +- Projec

In [59]:
sql_plan==df_plan

True

#### handiling json, arrays and maps

In [51]:
users_df = spark.read.json("file:///workspaces/trng-2286/datasets/users.json", multiLine=True)

users_df.printSchema()

root
 |-- activities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- metrics: struct (nullable = true)
 |    |    |    |-- clicks: long (nullable = true)
 |    |    |    |-- time_spent: long (nullable = true)
 |    |    |-- pages: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)



In [62]:
users_df.show(truncate=False)

+--------------------------------------------------------------------------------------------+-----+-------+
|activities                                                                                  |name |user_id|
+--------------------------------------------------------------------------------------------+-----+-------+
|[{2025-08-01, {5, 120}, [home, profile, settings]}, {2025-08-02, {3, 60}, [home, products]}]|Alice|u1     |
|[{2025-08-01, {2, 30}, [home]}]                                                             |Bob  |u2     |
+--------------------------------------------------------------------------------------------+-----+-------+



In [None]:
# explode is used to flatten the arrays

df_activities = users_df.withColumn("activity", explode("activities")).drop("activities")

df_activities.show(truncate=False)

+-----+-------+-------------------------------------------------+
|name |user_id|activity                                         |
+-----+-------+-------------------------------------------------+
|Alice|u1     |{2025-08-01, {5, 120}, [home, profile, settings]}|
|Alice|u1     |{2025-08-02, {3, 60}, [home, products]}          |
|Bob  |u2     |{2025-08-01, {2, 30}, [home]}                    |
+-----+-------+-------------------------------------------------+



In [65]:
df_pages = df_activities.withColumn("page", explode("activity.pages"))

df_pages.show(truncate=False)

+-----+-------+-------------------------------------------------+--------+
|name |user_id|activity                                         |page    |
+-----+-------+-------------------------------------------------+--------+
|Alice|u1     |{2025-08-01, {5, 120}, [home, profile, settings]}|home    |
|Alice|u1     |{2025-08-01, {5, 120}, [home, profile, settings]}|profile |
|Alice|u1     |{2025-08-01, {5, 120}, [home, profile, settings]}|settings|
|Alice|u1     |{2025-08-02, {3, 60}, [home, products]}          |home    |
|Alice|u1     |{2025-08-02, {3, 60}, [home, products]}          |products|
|Bob  |u2     |{2025-08-01, {2, 30}, [home]}                    |home    |
+-----+-------+-------------------------------------------------+--------+



In [68]:
df_date_metrics = df_pages.withColumn("date", col("activity.date")) \
                            .withColumn("clicks",col("activity.metrics.clicks") ) \
                            .withColumn("time_spent",col("activity.metrics.time_spent") \
                                        )
df_date_metrics.show()

+-----+-------+--------------------+--------+----------+------+----------+
| name|user_id|            activity|    page|      date|clicks|time_spent|
+-----+-------+--------------------+--------+----------+------+----------+
|Alice|     u1|{2025-08-01, {5, ...|    home|2025-08-01|     5|       120|
|Alice|     u1|{2025-08-01, {5, ...| profile|2025-08-01|     5|       120|
|Alice|     u1|{2025-08-01, {5, ...|settings|2025-08-01|     5|       120|
|Alice|     u1|{2025-08-02, {3, ...|    home|2025-08-02|     3|        60|
|Alice|     u1|{2025-08-02, {3, ...|products|2025-08-02|     3|        60|
|  Bob|     u2|{2025-08-01, {2, ...|    home|2025-08-01|     2|        30|
+-----+-------+--------------------+--------+----------+------+----------+



In [69]:
df_users_final = df_date_metrics.drop("activity")

df_users_final.show()

+-----+-------+--------+----------+------+----------+
| name|user_id|    page|      date|clicks|time_spent|
+-----+-------+--------+----------+------+----------+
|Alice|     u1|    home|2025-08-01|     5|       120|
|Alice|     u1| profile|2025-08-01|     5|       120|
|Alice|     u1|settings|2025-08-01|     5|       120|
|Alice|     u1|    home|2025-08-02|     3|        60|
|Alice|     u1|products|2025-08-02|     3|        60|
|  Bob|     u2|    home|2025-08-01|     2|        30|
+-----+-------+--------+----------+------+----------+

