- Creates a Spark session with the app name "Jupyter"
- Reads a CSV file from a specified path
- Uses DATE_TRUNC to create a new column event_date that truncates the event_time to the start of the day

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# Build a Spark session and give it an app name
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

#spark

df = (
    spark.read.option("header", "true")
    .csv("/home/iceberg/data/updated_events_lab_1.csv")
    .withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
)

df.show()
# Avoid df.collect() on large data: it pulls every row to the driver and can cause an out-of-memory (OOM) error.
# Prefer something like df.take(5) or df.show(), and always limit how much data is returned to the driver.

+-----------+--------+-----------------+---------+-------------+--------------------+--------------------+-------------------+
|        url|referrer|   browser_family|os_family|device_family|                host|          event_time|         event_date|
+-----------+--------+-----------------+---------+-------------+--------------------+--------------------+-------------------+
|          /|    NULL|             curl|    Other|        Other| www.zachwilson.tech|2023-01-09 11:55:...|2023-01-09 00:00:00|
|          /|    NULL|             curl|    Other|        Other| www.zachwilson.tech|2023-01-09 11:55:...|2023-01-09 00:00:00|
|          /|    NULL|             curl|    Other|        Other| www.zachwilson.tech|2023-01-09 20:10:...|2023-01-09 00:00:00|
|          /|    NULL|             curl|    Other|        Other| www.zachwilson.tech|2023-01-09 20:10:...|2023-01-09 00:00:00|
|          /|    NULL|            Other|    Other|        Other|admin.zachwilson....|2023-01-10 04:43:...|2023-

Some key points for reading the .explain() output:
- Start with the most indented line and read upward; the most indented operator runs first
- Project corresponds to a SQL "SELECT" (column selection)
- Exchange indicates a shuffle
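
The reading order above can be sketched with a small helper that takes plan text and lists the operators from the most indented line upward. This is only an illustration: read_plan and the NOTES mapping are hypothetical names, the plan text is a shortened copy of a physical plan with the attribute lists trimmed, and the helper assumes a simple linear plan.

```python
import re

# Shortened physical plan text (attribute lists trimmed for readability).
PLAN = """\
AdaptiveSparkPlan isFinalPlan=false
+- Project [url, host, cast(event_time as timestamp) AS event_time, event_date]
   +- Sort [event_date ASC NULLS FIRST, host ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date, 10), REPARTITION_BY_NUM
         +- Project [url, host, event_time, date_trunc(day, event_time) AS event_date]
            +- FileScan csv [url, host, event_time]
"""

# Only the two translations given in the notes; other operators get no note.
NOTES = {"Project": "like SELECT", "Exchange": "shuffle"}

def read_plan(plan_text):
    """Return (depth, operator, note) tuples, most indented operator first."""
    steps = []
    for line in plan_text.splitlines():
        # The prefix is made of spaces and the tree characters ':', '+', '-'.
        match = re.match(r"^([\s:+\-]*)(\w+)", line)
        if not match:
            continue
        depth = len(match.group(1))
        operator = match.group(2)
        steps.append((depth, operator, NOTES.get(operator, "")))
    # Deepest line first; list.sort keeps the original order for equal depths.
    steps.sort(key=lambda step: step[0], reverse=True)
    return steps

for depth, operator, note in read_plan(PLAN):
    print(f"{operator:<18} {note}")
```

For plans with several branches (joins, unions), indentation alone does not give a single execution order, so treat the output as a reading aid only.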

In [18]:
# Named sortedOne rather than sorted, so Python's built-in sorted() is not shadowed
sortedOne = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date"), col("host")) \
        .withColumn("event_time", col("event_time").cast("timestamp"))

sortedTwo = df.repartition(10, col("event_date")) \
        .sort(col("event_date"), col("host")) \
        .withColumn("event_time", col("event_time").cast("timestamp"))

#sortedOne.show()
#sortedTwo.show()
sortedOne.explain()
sortedTwo.explain()
"""
sortWithinPartitions Vs sort
- sortWithinPartitions: Sorts the rows inside each partition only, with no extra shuffle. Prefer this over .sort at scale
- sort: Does a global sort across all partitions, which needs an additional shuffle. This can be very slow if the dataset is huge
"""

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [url#432, referrer#433, browser_family#434, os_family#435, device_family#436, host#437, cast(event_time#438 as timestamp) AS event_time#498, event_date#448]
   +- Sort [event_date#448 ASC NULLS FIRST, host#437 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#448, 10), REPARTITION_BY_NUM, [plan_id=508]
         +- Project [url#432, referrer#433, browser_family#434, os_family#435, device_family#436, host#437, event_time#438, date_trunc(day, cast(event_time#438 as timestamp), Some(Etc/UTC)) AS event_date#448]
            +- FileScan csv [url#432,referrer#433,browser_family#434,os_family#435,device_family#436,host#437,event_time#438] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/updated_events_lab_1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<url:string,referrer:string,browser_family:string,os_family:string,device_fami

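
The difference between the two sorts can be modeled in plain Python without a cluster. This is a rough sketch, not Spark's implementation: the rows are hypothetical, partition_id is a stand-in for Spark's hash partitioning (it uses day of month modulo the partition count), and global_sort ignores how Spark redistributes rows before a global sort.

```python
# Hypothetical (event_date, host) rows; not taken from the CSV file.
rows = [
    ("2023-01-10", "www.example.com"),
    ("2023-01-09", "www.example.com"),
    ("2023-01-10", "admin.example.com"),
    ("2023-01-09", "admin.example.com"),
    ("2023-01-11", "www.example.com"),
]

def partition_id(event_date, num_partitions):
    # Stand-in for hash partitioning: day of month modulo the partition count.
    return int(event_date[-2:]) % num_partitions

def repartition(rows, num_partitions):
    """Send every row with the same event_date to the same partition."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_id(row[0], num_partitions)].append(row)
    return parts

def sort_within_partitions(parts):
    """Sort each partition on its own; rows never move between partitions."""
    result = []
    for part in parts:
        local = list(part)
        local.sort()
        result.append(local)
    return result

def global_sort(parts):
    """Order all rows across partitions; in Spark this costs another shuffle."""
    everything = [row for part in parts for row in part]
    everything.sort()
    return everything

parts = repartition(rows, 2)
local = sort_within_partitions(parts)
flat_local = [row for part in local for row in part]

print(local)               # each inner list is ordered
print(flat_local)          # but the concatenation starts with 2023-01-10
print(global_sort(parts))  # fully ordered, starts with 2023-01-09
```

Each partition comes out ordered, yet the partitions taken together are not, which is all sortWithinPartitions promises. That is usually enough when the goal is well-organized files rather than a totally ordered result.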

Database and Table Creation
The notebook then runs SQL cells that:

- Create a database named bootcamp
- Create three tables:

1. events: Main events table
2. events_sorted: Sorted events table
3. events_unsorted: Unsorted events table

All three tables use the Iceberg format and are partitioned on event_date: events by year, via years(event_date), and the other two by the date value itself.
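
The table definitions use two partition specs: years(event_date) for events and plain event_date for events_sorted and events_unsorted. The sketch below shows how differently they group rows. The function names are hypothetical, the first two dates come from the sample output and the last one is made up, and the "years since 1970" value follows the Iceberg spec's definition of the year transform.

```python
from datetime import date

def identity_partition(event_date):
    """PARTITIONED BY (event_date): one partition per distinct date."""
    return event_date

def years_partition(event_date):
    """PARTITIONED BY (years(event_date)): value is years since 1970."""
    return event_date.year - 1970

# 2023-01-09 and 2023-01-10 appear in the sample output; 2023-08-20 is made up.
dates = [date(2023, 1, 9), date(2023, 1, 10), date(2023, 1, 10), date(2023, 8, 20)]

by_day = {identity_partition(d) for d in dates}
by_year = {years_partition(d) for d in dates}

print(len(by_day))   # 3 daily partitions
print(by_year)       # {53}: a single yearly partition
```

Daily partitions give finer pruning for date filters but more, smaller files; yearly partitions give the opposite trade-off.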

Iceberg Format
- Open table format for large-scale data processing first developed by Netflix

Key Pros
1. You can add, drop, or change columns without rewriting the table's data files
2. You can change table's partitioning strategy without rewriting the table
3. Allows querying historical versions of the table

How It Works
- Uses metadata files to track table structure and data files
- Maintains a snapshot history of table changes
- Supports ACID (Atomicity, Consistency, Isolation, Durability) transactions
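
The snapshot idea can be illustrated with a toy model. This is a simplified sketch only: ToyTable, Snapshot, commit, and files_as_of are hypothetical names, the file names are invented, and Iceberg's real metadata layout has more layers than this.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: tuple  # names of the data files visible in this snapshot

@dataclass
class ToyTable:
    snapshots: list = field(default_factory=list)

    def commit(self, added=(), removed=()):
        """Record a new snapshot; earlier snapshots are never modified."""
        current = self.snapshots[-1].data_files if self.snapshots else ()
        files = tuple(f for f in current if f not in removed) + tuple(added)
        snapshot = Snapshot(len(self.snapshots) + 1, files)
        self.snapshots.append(snapshot)
        return snapshot.snapshot_id

    def files_as_of(self, snapshot_id=None):
        """Read the latest snapshot, or an older one by id (time travel)."""
        if snapshot_id is None:
            return self.snapshots[-1].data_files
        return next(s.data_files for s in self.snapshots
                    if s.snapshot_id == snapshot_id)

table = ToyTable()
first = table.commit(added=["part-0.parquet", "part-1.parquet"])
second = table.commit(added=["part-2.parquet"], removed=["part-0.parquet"])

print(table.files_as_of())       # ('part-1.parquet', 'part-2.parquet')
print(table.files_as_of(first))  # ('part-0.parquet', 'part-1.parquet')
```

Because a commit only adds a new snapshot that points at a list of files, readers of an older snapshot keep seeing a consistent view, which is the basis for querying historical versions.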


Comparison to Other Formats
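- Unlike traditional Hive tables, which track data by directory, Iceberg tracks individual data files in its metadata
- Works at a different layer than Parquet or ORC: Iceberg is a table format that manages data files, which are themselves typically stored as Parquet, ORC, or Avro
- File-level metadata lets query engines skip irrelevant files, which helps performance and table management at large scale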

In [19]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [20]:
%%sql

DROP TABLE IF EXISTS bootcamp.events

In [21]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));


In [22]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [23]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [31]:

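# Hash-partition by event_date into 4 partitions and cast event_time to a timestamp
start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp"))

# Same rows, additionally sorted inside each partition
first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))

# Write both versions so their file sizes can be compared
start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")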

In [32]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' as table_name
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' as table_name
FROM demo.bootcamp.events_unsorted.files

size,num_files,table_name
122268,4,sorted
128866,4,unsorted
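
The sorted table is about 5% smaller (122268 vs 128866 bytes) with the same number of files. One likely contributor, assuming the data files are Parquet (Iceberg's default), is that sorting places equal values next to each other, which run-length style encodings compress well. The sketch below uses a hypothetical browser_family column and a plain run-length encoder; it is not Parquet's actual encoder.

```python
from itertools import groupby

def run_length_encode(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]

# Hypothetical browser_family values for one partition, in arrival order.
unsorted_column = ["curl", "Chrome", "curl", "Other", "Chrome", "curl", "Other", "Chrome"]

sorted_column = list(unsorted_column)
sorted_column.sort()

print(len(run_length_encode(unsorted_column)))  # 8 runs: nothing collapses
print(run_length_encode(sorted_column))         # [('Chrome', 3), ('Other', 2), ('curl', 3)]
```

The same eight values need eight runs unsorted but only three once sorted. Low-cardinality columns such as browser_family benefit most from this, which fits the choice to sort on it before host.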


In [26]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.events.files;

size,num_files
,0
