In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))

events.show()

+-----------+----------+--------+--------------------+----------+--------------------+-------------------+
|    user_id| device_id|referrer|                host|       url|          event_time|         event_date|
+-----------+----------+--------+--------------------+----------+--------------------+-------------------+
| 1037710827| 532630305|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|
|  925588856| 532630305|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|
|-1180485268| 532630305|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|
|-1044833855| 532630305|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|
|  747494706| 532630305|    NULL| www.zachwilson.tech|         /|2021-09-26 16:03:...|2021-09-26 00:00:00|
|  747494706| 532630305|    NULL|admin.zachwilson....|         /|2021-02-21 16:08:...|2021-02-21 00:00:00|
| -824540328| 532630305|    NULL|admi

In [12]:
# Read the devices CSV file
devices = spark.read.option("header", "true").csv("/home/iceberg/data/devices.csv")

# Join the events and devices DataFrames
joined_df = events.join(devices, on="device_id", how="left")

# Rename columns using select and cast 'event_time' correctly
df = joined_df.select(
    "url",
    "referrer",
    col("browser_type").alias("browser_family"),
    col("os_type").alias("os_family"),
    col("device_type").alias("device_family"),
    "host",
    "event_time",
    "event_date"
)

# Show the result
df.show()

+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|       url|referrer|browser_family|os_family|device_family|                host|          event_time|         event_date|
+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-03-08 17:27:...|2021-03-08 00:00:00|
|         /|    NULL|         Other|    Other|        Other|    www.eczachly.com|2021-05-10 11:26:...|2021-05-10 00:00:00|
|         /|    NULL|         Other|    Other|        Other|admin.zachwilson....|2021-02-17 16:19:...|2021-02-17 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-24 15:53:...|2021-09-24 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-26 16:03:...|2021-09-26 00:00:00|
|         /|    

In [13]:
sorted = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date"), col("host"), col("browser_family")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

# sorted = df.repartition(10, col("event_date")) \
#         .sortWithinPartitions(col("event_date"), col("host")) \
#         .withColumn("event_time", col("event_time").cast("timestamp")) \

sorted.show()



+--------------------+--------------------+--------------+---------+------------------+--------------------+--------------------+-------------------+
|                 url|            referrer|browser_family|os_family|     device_family|                host|          event_time|         event_date|
+--------------------+--------------------+--------------+---------+------------------+--------------------+--------------------+-------------------+
|                   /|                NULL|         Other|    Other|             Other|admin.zachwilson....|2021-01-07 09:21:...|2021-01-07 00:00:00|
|                   /|                NULL|         Other|    Other|             Other|    www.eczachly.com|2021-01-07 18:45:...|2021-01-07 00:00:00|
|                   /|                NULL|         Other|    Other|             Other|    www.eczachly.com|2021-01-07 21:57:...|2021-01-07 00:00:00|
|                   /|                NULL|      PetalBot|  Android|Generic Smartphone|    www.eczac

                                                                                

In [5]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [6]:
%%sql

DROP TABLE IF EXISTS bootcamp.events

In [7]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);


In [42]:
%%sql

-- DROP TABLE IF EXISTS bootcamp.events_sorted;

CREATE TABLE IF NOT EXISTS bootcamp.events_sorted_v3 (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [43]:
%%sql

-- DROP TABLE IF EXISTS bootcamp.events_unsorted;

CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted_v3 (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [39]:

df.groupBy('event_date').count().show()



+-------------------+-----+
|         event_date|count|
+-------------------+-----+
|2021-07-08 00:00:00|   89|
|2021-07-20 00:00:00|  134|
|2021-08-27 00:00:00|  191|
|2022-01-03 00:00:00|  179|
|2022-04-07 00:00:00|  119|
|2022-01-02 00:00:00|  105|
|2021-07-06 00:00:00|   89|
|2022-05-01 00:00:00|  155|
|2022-05-18 00:00:00|  199|
|2022-02-25 00:00:00|   84|
|2022-08-18 00:00:00|  216|
|2021-05-07 00:00:00|  124|
|2021-12-28 00:00:00|  197|
|2021-11-16 00:00:00|  128|
|2022-05-07 00:00:00|  416|
|2022-04-30 00:00:00|  142|
|2022-05-04 00:00:00|  127|
|2022-10-19 00:00:00|   76|
|2021-03-16 00:00:00|   82|
|2021-09-07 00:00:00|  122|
+-------------------+-----+
only showing top 20 rows



                                                                                

In [56]:

start_df = df.repartition(15, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    

# first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))
first_sort_df = start_df.sortWithinPartitions(col("event_date"))

start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted_v3")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted_v3")

                                                                                

In [57]:
%%sql

SELECT 'events_sorted_v3', SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.events_sorted_v3.files

UNION ALL
SELECT 'events_unsorted_v3', SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.events_unsorted_v3.files





events_sorted_v3,size,num_files,sorted
events_sorted_v3,3717905,15,sorted
events_unsorted_v3,3731407,15,unsorted


In [58]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.events_sorted_v3.files;

size,num_files
3717905,15


In [3]:
%%sql 
SELECT COUNT(1) FROM bootcamp.match_details_bucketed.files
-- we have switched here from bucket-joins notebook to select all what we did


count(1)
0


In [4]:
%%sql 
SELECT COUNT(1) FROM bootcamp.matches_bucketed.files
--# we have switched here from bucket-joins notebook to select all what we did


count(1)
0
