In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

In [17]:
# Create a SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Read the events CSV file and create a new column 'event_date'
events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
print(events.columns)

['user_id', 'device_id', 'referrer', 'host', 'url', 'event_time', 'event_date']


In [18]:
# Read the devices CSV file
devices = spark.read.option("header", "true").csv("/home/iceberg/data/devices.csv") #devices.csv
devices.columns

['device_id', 'browser_type', 'os_type', 'device_type']

In [19]:
# Join the events and devices DataFrames
joined_df = events.join(devices, on="device_id", how="left")
joined_df.columns

['device_id',
 'user_id',
 'referrer',
 'host',
 'url',
 'event_time',
 'event_date',
 'browser_type',
 'os_type',
 'device_type']

In [20]:
# Rename columns using select and cast 'event_time' correctly
df = joined_df.select(
    "url",
    "referrer",
    col("browser_type").alias("browser_family"),
    col("os_type").alias("os_family"),
    col("device_type").alias("device_family"),
    "host",
    "event_time",
    "event_date"
)

# Show the result
df.show()

+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|       url|referrer|browser_family|os_family|device_family|                host|          event_time|         event_date|
+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-03-08 17:27:...|2021-03-08 00:00:00|
|         /|    NULL|         Other|    Other|        Other|    www.eczachly.com|2021-05-10 11:26:...|2021-05-10 00:00:00|
|         /|    NULL|         Other|    Other|        Other|admin.zachwilson....|2021-02-17 16:19:...|2021-02-17 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-24 15:53:...|2021-09-24 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-26 16:03:...|2021-09-26 00:00:00|
|         /|    

In [23]:
#repartitioning the data into 10 part by  'event_date" column where sorting data in each partition before joining with "event_date",'host 
sorted = df.repartition(10, col("event_date")) \
         .sortWithinPartitions(col("event_date"), col("host"), col("browser_family")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

sorted.show()
sorted.explain()

[Stage 44:>                                                         (0 + 2) / 2]

+--------------------+--------------------+--------------+---------+------------------+--------------------+--------------------+-------------------+
|                 url|            referrer|browser_family|os_family|     device_family|                host|          event_time|         event_date|
+--------------------+--------------------+--------------+---------+------------------+--------------------+--------------------+-------------------+
|                   /|                NULL|         Other|    Other|             Other|admin.zachwilson....|2021-01-07 09:21:...|2021-01-07 00:00:00|
|                   /|                NULL|         Other|    Other|             Other|    www.eczachly.com|2021-01-07 18:45:...|2021-01-07 00:00:00|
|                   /|                NULL|         Other|    Other|             Other|    www.eczachly.com|2021-01-07 21:57:...|2021-01-07 00:00:00|
|                   /|                NULL|      PetalBot|  Android|Generic Smartphone|    www.eczac

                                                                                

In [5]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [6]:
%%sql

DROP TABLE IF EXISTS bootcamp.eventsdevices

In [7]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.eventsdevices (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));


In [8]:
# DataFrame with data to be inserted
sorted.write.format("iceberg").mode("append").saveAsTable("bootcamp.eventsdevices")

                                                                                

In [9]:
%%sql
select * from bootcamp.eventsdevices limit 5;

url,referrer,browser_family,os_family,device_family,host,event_time,event_date
/,,Android,Android,Samsung SM-N900T,admin.zachwilson.tech,2023-01-01 17:14:00.071000,2023-01-01
/,,Android,Android,ZTE BA520,admin.zachwilson.tech,2023-01-01 17:16:47.060000,2023-01-01
/,,Android,Android,ZTE BA520,admin.zachwilson.tech,2023-01-01 17:16:52.225000,2023-01-01
/,,Android,Android,ZTE BA520,admin.zachwilson.tech,2023-01-01 17:38:40.362000,2023-01-01
/,http://admin.zachwilson.tech,Chrome,Mac OS X,Other,admin.zachwilson.tech,2023-01-01 00:22:11.594000,2023-01-01


In [10]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg;

In [11]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg;

In [12]:

start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    

first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))

sorted = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

                                                                                

In [13]:
%%sql
SELECT *
FROM demo.bootcamp.events_sorted.files

content,file_path,file_format,spec_id,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,s3://warehouse/bootcamp/events_sorted/data/00000-34-7adc2411-023d-42f0-bf54-0e24a44e5fb0-0-00001.parquet,PARQUET,0,89391,643178,"{1: 107448, 2: 61005, 3: 11437, 4: 12908, 5: 10692, 6: 7365, 7: 426434, 8: 2274}","{1: 89391, 2: 89391, 3: 89391, 4: 89391, 5: 89391, 6: 89391, 7: 89391, 8: 89391}","{1: 0, 2: 46359, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'52.20.78.240'), 3: bytearray(b'%E3%82%A6%E3%82%'), 4: bytearray(b'Android'), 5: bytearray(b'17MB150WB'), 6: bytearray(b'aashish.techcrea'), 7: bytearray(b' \xba\xe7\xb8\xa8\xb8\x05\x00'), 8: bytearray(b'\x00\xa0&\xb4\xa8\xb8\x05\x00')}","{1: bytearray(b'/zzageqnf.php?Fp'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 5: bytearray(b'vivo $2'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xe8\xb0\x1b\x8ec\x03\x06\x00'), 8: bytearray(b'\x00\xe0dqO\x03\x06\x00')}",,[4],,0,"Row(browser_family=Row(column_size=11437, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='%E3%82%A6%E3%82%', upper_bound='webprosbot'), device_family=Row(column_size=10692, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='17MB150WB', upper_bound='vivo $2'), event_date=Row(column_size=2274, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 0), upper_bound=datetime.datetime(2023, 8, 20, 0, 0)), event_time=Row(column_size=426434, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 1, 19, 764000), upper_bound=datetime.datetime(2023, 8, 20, 23, 59, 41, 89000)), host=Row(column_size=7365, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='aashish.techcrea', upper_bound='zachwilson.techd'), os_family=Row(column_size=12908, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=61005, value_count=89391, null_value_count=46359, nan_value_count=None, lower_bound='52.20.78.240', upper_bound='zachwilson.tech'), url=Row(column_size=107448, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zzageqnf.php?Fp'))"
0,s3://warehouse/bootcamp/events_sorted/data/00001-35-7adc2411-023d-42f0-bf54-0e24a44e5fb0-0-00001.parquet,PARQUET,0,99232,740373,"{1: 142161, 2: 67344, 3: 11896, 4: 16525, 5: 11505, 6: 9100, 7: 475847, 8: 2355}","{1: 99232, 2: 99232, 3: 99232, 4: 99232, 5: 99232, 6: 99232, 7: 99232, 8: 99232}","{1: 0, 2: 49299, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 5: bytearray(b'13 Pro Max'), 6: bytearray(b'abhishekanand.te'), 7: bytearray(b'(\x83\xb2EX\xb8\x05\x00'), 8: bytearray(b'\x00 \xc9<X\xb8\x05\x00')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 5: bytearray(b'vivo $2'), 6: bytearray(b'zsavi524.techcrf'), 7: bytearray(b'\x88\xb8\x07P;\x03\x06\x00'), 8: bytearray(b""\x00 \xb65\'\x03\x06\x00"")}",,[4],,0,"Row(browser_family=Row(column_size=11896, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_family=Row(column_size=11505, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='13 Pro Max', upper_bound='vivo $2'), event_date=Row(column_size=2355, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 0), upper_bound=datetime.datetime(2023, 8, 18, 0, 0)), event_time=Row(column_size=475847, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 2, 29, 513000), upper_bound=datetime.datetime(2023, 8, 18, 23, 59, 0, 901000)), host=Row(column_size=9100, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='abhishekanand.te', upper_bound='zsavi524.techcrf'), os_family=Row(column_size=16525, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=67344, value_count=99232, null_value_count=49299, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=142161, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz.php'))"
0,s3://warehouse/bootcamp/events_sorted/data/00002-36-7adc2411-023d-42f0-bf54-0e24a44e5fb0-0-00001.parquet,PARQUET,0,93956,929896,"{1: 345957, 2: 86593, 3: 10948, 4: 12876, 5: 12120, 6: 8573, 7: 447228, 8: 2019}","{1: 93956, 2: 93956, 3: 93956, 4: 93956, 5: 93956, 6: 93956, 7: 93956, 8: 93956}","{1: 0, 2: 48227, 3: 1, 4: 1, 5: 1, 6: 0, 7: 0, 8: 0}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 5: bytearray(b'ALP-AL00'), 6: bytearray(b'ablumhardt.techc'), 7: bytearray(b'\x18\xe8_\xb2\xf3\xb7\x05\x00'), 8: bytearray(b'\x00@\x94\xa7\xf3\xb7\x05\x00')}","{1: bytearray(b'/zz/address.php@'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'webOS'), 5: bytearray(b'vivo $2'), 6: bytearray(b'zzz.techcreator/'), 7: bytearray(b'HE\xdbM\xb3\x03\x06\x00'), 8: bytearray(b'\x00`\xc2\xe8\x9f\x03\x06\x00')}",,[4],,0,"Row(browser_family=Row(column_size=10948, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_family=Row(column_size=12120, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='ALP-AL00', upper_bound='vivo $2'), event_date=Row(column_size=2019, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 0), upper_bound=datetime.datetime(2023, 8, 24, 0, 0)), event_time=Row(column_size=447228, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 3, 1, 119000), upper_bound=datetime.datetime(2023, 8, 24, 23, 8, 20, 509000)), host=Row(column_size=8573, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='ablumhardt.techc', upper_bound='zzz.techcreator/'), os_family=Row(column_size=12876, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='Android', upper_bound='webOS'), referrer=Row(column_size=86593, value_count=93956, null_value_count=48227, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=345957, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz/address.php@'))"
0,s3://warehouse/bootcamp/events_sorted/data/00003-37-7adc2411-023d-42f0-bf54-0e24a44e5fb0-0-00001.parquet,PARQUET,0,122235,987801,"{1: 284335, 2: 87432, 3: 11501, 4: 16872, 5: 13540, 6: 9324, 7: 558656, 8: 2154}","{1: 122235, 2: 122235, 3: 122235, 4: 122235, 5: 122235, 6: 122235, 7: 122235, 8: 122235}","{1: 0, 2: 53009, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'3.220.57.224'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 5: bytearray(b'$2'), 6: bytearray(b'accc.techcreator'), 7: bytearray(b'@n.\xbd\xdf\xb7\x05\x00'), 8: bytearray(b'\x00\xe0\xbc\x89\xdf\xb7\x05\x00')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 5: bytearray(b'vivo $2'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xd8\xaf\x9a\xe8\x9f\x03\x06\x00'), 8: bytearray(b'\x00\x00\xeb\xca\x8b\x03\x06\x00')}",,[4],,0,"Row(browser_family=Row(column_size=11501, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_family=Row(column_size=13540, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='$2', upper_bound='vivo $2'), event_date=Row(column_size=2154, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 0), upper_bound=datetime.datetime(2023, 8, 23, 0, 0)), event_time=Row(column_size=558656, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 14, 23, 80000), upper_bound=datetime.datetime(2023, 8, 23, 23, 59, 57, 399000)), host=Row(column_size=9324, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='accc.techcreator', upper_bound='zachwilson.techd'), os_family=Row(column_size=16872, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=87432, value_count=122235, null_value_count=53009, nan_value_count=None, lower_bound='3.220.57.224', upper_bound='zachwilson.tech'), url=Row(column_size=284335, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zz.php'))"


In [14]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.events_unsorted.files


size,num_files,sorted
3301248,4,sorted
3589113,4,unsorted


In [15]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.eventsdevices.files;

size,num_files
3269744,3
