In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

# Get the active Spark session, or create one registered under the app name "Jupyter".
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Load the raw events CSV (first row is the header) and derive event_date by
# truncating event_time to the day. DATE_TRUNC yields a timestamp at midnight,
# which is why event_date prints as "YYYY-MM-DD 00:00:00" below.
df_events = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/events.csv")
    .withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
)

df_events.show()

+-----------+----------+--------+--------------------+----------+--------------------+-------------------+
|    user_id| device_id|referrer|                host|       url|          event_time|         event_date|
+-----------+----------+--------+--------------------+----------+--------------------+-------------------+
| 1037710827| 532630305|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|
|  925588856| 532630305|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|
|-1180485268| 532630305|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|
|-1044833855| 532630305|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|
|  747494706| 532630305|    NULL| www.zachwilson.tech|         /|2021-09-26 16:03:...|2021-09-26 00:00:00|
|  747494706| 532630305|    NULL|admin.zachwilson....|         /|2021-02-21 16:08:...|2021-02-21 00:00:00|
| -824540328| 532630305|    NULL|admi

In [9]:
# Load the devices CSV (first row is the header); one row per device_id with its
# browser_type, os_type and device_type.
df_devices = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/devices.csv")
)
df_devices.show()

+-----------+--------------------+-------+------------------+
|  device_id|        browser_type|os_type|       device_type|
+-----------+--------------------+-------+------------------+
|-2147042689|             Firefox| Ubuntu|             Other|
|-2146219609|            WhatsApp|  Other|            Spider|
|-2145574618|       Chrome Mobile|Android|Generic Smartphone|
|-2144707350|Chrome Mobile Web...|Android|  Samsung SM-G988B|
|-2143813999|Mobile Safari UI/...|    iOS|            iPhone|
|-2142634982|   Chrome Mobile iOS|    iOS|            iPhone|
|-2142350383|   Chrome Mobile iOS|    iOS|            iPhone|
|-2141256237|   Chrome Mobile iOS|    iOS|            iPhone|
|-2138977887|             Firefox|OpenBSD|             Other|
|-2136667425|              Chrome|Windows|             Other|
|-2136444196|   Chrome Mobile iOS|    iOS|            iPhone|
|-2136415223|Chrome Mobile Web...|Android| Samsung SM-A305GT|
|-2136251094|             Firefox| Ubuntu|             Other|
|-213605

In [20]:
# Attach device attributes to every event. A left join on device_id keeps events
# whose device has no match in df_devices (their device columns come back NULL).
df_events_devices = df_events.join(df_devices, "device_id", "left")
df_events_devices.show()

+----------+-----------+--------+--------------------+----------+--------------------+-------------------+------------+-------+-----------+
| device_id|    user_id|referrer|                host|       url|          event_time|         event_date|browser_type|os_type|device_type|
+----------+-----------+--------+--------------------+----------+--------------------+-------------------+------------+-------+-----------+
| 532630305| 1037710827|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|       Other|  Other|      Other|
| 532630305|  925588856|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|       Other|  Other|      Other|
| 532630305|-1180485268|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|       Other|  Other|      Other|
| 532630305|-1044833855|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|       Other|  Other|      Other|
| 532630305|  747494

In [22]:
# Two ways to order the joined events after hash-repartitioning them on event_date:
#   * sortWithinPartitions() orders rows inside each partition only;
#   * sort() is a global sort across the whole DataFrame, which is very slow.
# In the physical plans, "Exchange" is synonymous with a shuffle.
#
# NOTE(review): `sorted` shadows Python's builtin of the same name for the rest of
# the kernel session; the name is kept because the explain() cell refers to it.
sorted = (
    df_events_devices
    .repartition(10, col("event_date"))
    .sortWithinPartitions(col("event_date"), col("host"), col("browser_type"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

sorted_two = (
    df_events_devices
    .repartition(10, col("event_date"))
    .sort(col("event_date"), col("host"), col("browser_type"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

sorted.show()
sorted_two.show()

                                                                                

+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+-------+------------------+
|  device_id|    user_id|            referrer|                host|                 url|          event_time|         event_date|browser_type|os_type|       device_type|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+-------+------------------+
|  532630305| 1129583063|                NULL|admin.zachwilson....|                   /|2021-01-07 09:21:...|2021-01-07 00:00:00|       Other|  Other|             Other|
|  532630305|-1180485268|                NULL|    www.eczachly.com|                   /|2021-01-07 18:45:...|2021-01-07 00:00:00|       Other|  Other|             Other|
|  532630305| 1129583063|                NULL|    www.eczachly.com|                   /|2021-01-07 21:57:...|2021-01-07 00:00:00|       Other|  Other|

In [23]:
# Print the physical plan of the partition-local sort first, then of the global
# sort, so the Sort and Exchange operators of the two can be compared.
for plan_df in (sorted, sorted_two):
    plan_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_id#884, user_id#883, referrer#885, host#886, url#887, cast(event_time#888 as timestamp) AS event_time#1248, event_date#895, browser_type#335, os_type#336, device_type#337]
   +- Sort [event_date#895 ASC NULLS FIRST, host#886 ASC NULLS FIRST, browser_type#335 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#895, 10), REPARTITION_BY_NUM, [plan_id=2097]
         +- Project [device_id#884, user_id#883, referrer#885, host#886, url#887, event_time#888, event_date#895, browser_type#335, os_type#336, device_type#337]
            +- BroadcastHashJoin [device_id#884], [device_id#334], LeftOuter, BuildRight, false
               :- Project [user_id#883, device_id#884, referrer#885, host#886, url#887, event_time#888, date_trunc(day, cast(event_time#888 as timestamp), Some(Etc/UTC)) AS event_date#895]
               :  +- FileScan csv [user_id#883,device_id#884,referrer#885,host#886,url#887,event_time

In [31]:
%%sql

-- Create the bootcamp database used by the tables below; IF NOT EXISTS makes
-- re-running this cell a no-op.
CREATE DATABASE IF NOT EXISTS bootcamp;

In [32]:
%%sql

-- Remove bootcamp.events if it is present. Only this table is dropped here;
-- bootcamp.events_sorted and bootcamp.events_unsorted are left untouched.
DROP TABLE IF EXISTS bootcamp.events;

In [33]:
%%sql

-- Declare bootcamp.events as an Iceberg table partitioned by event_date.
-- IF NOT EXISTS keeps the cell safe to re-run.
-- NOTE(review): no cell visible in this notebook writes to this table, and its
-- *_family column names differ from the *_type columns of the joined DataFrame
-- -- confirm whether it is still needed.
CREATE TABLE IF NOT EXISTS bootcamp.events (
    url             STRING,
    referrer        STRING,
    browser_family  STRING,
    os_family       STRING,
    device_family   STRING,
    host            STRING,
    event_time      TIMESTAMP,
    event_date      DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [35]:
%%sql

-- Declare bootcamp.events_sorted as an Iceberg table partitioned by event_date.
-- IF NOT EXISTS keeps the cell safe to re-run.
-- NOTE(review): a later cell writes to this table with
-- .write.mode("overwrite").saveAsTable(...), and the file metadata printed
-- afterwards lists columns (device_id, user_id, browser_type, ...) that are not
-- declared here and shows partition Row(event_date=None). The write appears to
-- replace this definition rather than conform to it -- confirm.
CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url             STRING,
    referrer        STRING,
    browser_family  STRING,
    os_family       STRING,
    device_family   STRING,
    host            STRING,
    event_time      TIMESTAMP,
    event_date      DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [37]:
%%sql

-- Declare bootcamp.events_unsorted as an Iceberg table partitioned by event_date.
-- IF NOT EXISTS keeps the cell safe to re-run.
-- NOTE(review): a later cell writes to this table with
-- .write.mode("overwrite").saveAsTable(...) from a DataFrame whose columns
-- (device_id, user_id, browser_type, ...) are not declared here. The write
-- presumably replaces this definition rather than conform to it -- confirm.
CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url             STRING,
    referrer        STRING,
    browser_family  STRING,
    os_family       STRING,
    device_family   STRING,
    host            STRING,
    event_time      TIMESTAMP,
    event_date      DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [41]:
# Baseline: hash-repartition the joined events into 4 partitions by event_date and
# cast event_time from string to timestamp. No ordering is applied.
start_df = (
    df_events_devices
    .repartition(4, col("event_date"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Same data, but ordered inside each partition by event_date, browser_type, host.
first_sort_df = start_df.sortWithinPartitions(
    col("event_date"), col("browser_type"), col("host")
)

# Persist both variants so their data-file sizes can be compared afterwards.
# NOTE(review): the DataFrame columns differ from the columns declared in the
# CREATE TABLE cells for these tables; saveAsTable with mode "overwrite" presumably
# replaces those definitions instead of inserting into them -- confirm.
start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

                                                                                

In [43]:
%%sql

-- Compare the storage footprint of the two tables via Iceberg's `files` metadata
-- table: total bytes and number of files listed for each table.
-- The label column is aliased explicitly. Left unaliased, the result column is
-- auto-named after the first branch's literal, so the 'unsorted' row was displayed
-- under a column header called "sorted".
SELECT 
    SUM(file_size_in_bytes) AS size,
    COUNT(*) AS num_files,
    'sorted' AS variant
FROM demo.bootcamp.events_sorted.files
UNION ALL
SELECT 
    SUM(file_size_in_bytes) AS size,
    COUNT(*) AS num_files,
    'unsorted' AS variant
FROM demo.bootcamp.events_unsorted.files


size,num_files,sorted
5091412,4,sorted
5552970,4,unsorted


In [44]:
%%sql

-- List every column of the `files` metadata table for bootcamp.events_sorted:
-- one row per file, including record_count, file_size_in_bytes, per-column sizes
-- and lower/upper bounds. The output is very wide.
SELECT *
FROM demo.bootcamp.events_sorted.files

content,file_path,file_format,spec_id,partition,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,s3://warehouse/bootcamp/events_sorted/data/00000-141-537288d5-5451-4fbc-9e53-83941bf7cd59-0-00001.parquet,PARQUET,1,Row(event_date=None),89391,1031388,"{1: 107448, 2: 61005, 6: 7365, 7: 426434, 8: 2274, 9: 77406, 10: 310046, 11: 11437, 12: 12908, 13: 10692}","{1: 89391, 2: 89391, 6: 89391, 7: 89391, 8: 89391, 9: 89391, 10: 89391, 11: 89391, 12: 89391, 13: 89391}","{1: 0, 2: 46359, 6: 0, 7: 0, 8: 0, 9: 0, 10: 1, 11: 0, 12: 0, 13: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'52.20.78.240'), 6: bytearray(b'aashish.techcrea'), 7: bytearray(b' \xba\xe7\xb8\xa8\xb8\x05\x00'), 8: bytearray(b'\x00\xa0&\xb4\xa8\xb8\x05\x00'), 9: bytearray(b'-100210680'), 10: bytearray(b'-1000095488'), 11: bytearray(b'%E3%82%A6%E3%82%'), 12: bytearray(b'Android'), 13: bytearray(b'17MB150WB')}","{1: bytearray(b'/zzageqnf.php?Fp'), 2: bytearray(b'zachwilson.tech'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xe8\xb0\x1b\x8ec\x03\x06\x00'), 8: bytearray(b'\x00\xe0dqO\x03\x06\x00'), 9: bytearray(b'999535123'), 10: bytearray(b'999884938'), 11: bytearray(b'webprosbot'), 12: bytearray(b'iOS'), 13: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_type=Row(column_size=11437, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='%E3%82%A6%E3%82%', upper_bound='webprosbot'), device_id=Row(column_size=77406, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='-100210680', upper_bound='999535123'), device_type=Row(column_size=10692, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='17MB150WB', upper_bound='vivo $2'), event_date=Row(column_size=2274, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 0), upper_bound=datetime.datetime(2023, 8, 20, 0, 0)), event_time=Row(column_size=426434, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 1, 19, 764000), upper_bound=datetime.datetime(2023, 8, 20, 23, 59, 41, 89000)), 
host=Row(column_size=7365, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='aashish.techcrea', upper_bound='zachwilson.techd'), os_type=Row(column_size=12908, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=61005, value_count=89391, null_value_count=46359, nan_value_count=None, lower_bound='52.20.78.240', upper_bound='zachwilson.tech'), url=Row(column_size=107448, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zzageqnf.php?Fp'), user_id=Row(column_size=310046, value_count=89391, null_value_count=1, nan_value_count=None, lower_bound='-1000095488', upper_bound='999884938'))"
0,s3://warehouse/bootcamp/events_sorted/data/00001-142-537288d5-5451-4fbc-9e53-83941bf7cd59-0-00001.parquet,PARQUET,1,Row(event_date=None),99232,1164679,"{1: 142161, 2: 67344, 6: 9100, 7: 475847, 8: 2355, 9: 86496, 10: 336994, 11: 11896, 12: 16525, 13: 11505}","{1: 99232, 2: 99232, 6: 99232, 7: 99232, 8: 99232, 9: 99232, 10: 99232, 11: 99232, 12: 99232, 13: 99232}","{1: 0, 2: 49299, 6: 0, 7: 0, 8: 0, 9: 0, 10: 58, 11: 0, 12: 0, 13: 0}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 6: bytearray(b'abhishekanand.te'), 7: bytearray(b'(\x83\xb2EX\xb8\x05\x00'), 8: bytearray(b'\x00 \xc9<X\xb8\x05\x00'), 9: bytearray(b'-100210680'), 10: bytearray(b'-1000370060'), 11: bytearray(b') Bot'), 12: bytearray(b'Android'), 13: bytearray(b'13 Pro Max')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 6: bytearray(b'zsavi524.techcrf'), 7: bytearray(b'\x88\xb8\x07P;\x03\x06\x00'), 8: bytearray(b""\x00 \xb65\'\x03\x06\x00""), 9: bytearray(b'999535123'), 10: bytearray(b'999956796'), 11: bytearray(b'webprosbot'), 12: bytearray(b'iOS'), 13: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_type=Row(column_size=11896, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=86496, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='-100210680', upper_bound='999535123'), device_type=Row(column_size=11505, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='13 Pro Max', upper_bound='vivo $2'), event_date=Row(column_size=2355, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 0), upper_bound=datetime.datetime(2023, 8, 18, 0, 0)), event_time=Row(column_size=475847, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 2, 29, 513000), upper_bound=datetime.datetime(2023, 8, 18, 23, 59, 0, 901000)), host=Row(column_size=9100, 
value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='abhishekanand.te', upper_bound='zsavi524.techcrf'), os_type=Row(column_size=16525, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=67344, value_count=99232, null_value_count=49299, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=142161, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz.php'), user_id=Row(column_size=336994, value_count=99232, null_value_count=58, nan_value_count=None, lower_bound='-1000370060', upper_bound='999956796'))"
0,s3://warehouse/bootcamp/events_sorted/data/00002-143-537288d5-5451-4fbc-9e53-83941bf7cd59-0-00001.parquet,PARQUET,1,Row(event_date=None),93956,1354000,"{1: 345957, 2: 86593, 6: 8573, 7: 447228, 8: 2019, 9: 86867, 10: 336417, 11: 10948, 12: 12876, 13: 12120}","{1: 93956, 2: 93956, 6: 93956, 7: 93956, 8: 93956, 9: 93956, 10: 93956, 11: 93956, 12: 93956, 13: 93956}","{1: 0, 2: 48227, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 1, 12: 1, 13: 1}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 6: bytearray(b'ablumhardt.techc'), 7: bytearray(b'\x18\xe8_\xb2\xf3\xb7\x05\x00'), 8: bytearray(b'\x00@\x94\xa7\xf3\xb7\x05\x00'), 9: bytearray(b'-1000866068'), 10: bytearray(b'-1000675882'), 11: bytearray(b') Bot'), 12: bytearray(b'Android'), 13: bytearray(b'ALP-AL00')}","{1: bytearray(b'/zz/address.php@'), 2: bytearray(b'zachwilson.tech'), 6: bytearray(b'zzz.techcreator/'), 7: bytearray(b'HE\xdbM\xb3\x03\x06\x00'), 8: bytearray(b'\x00`\xc2\xe8\x9f\x03\x06\x00'), 9: bytearray(b'998961543'), 10: bytearray(b'999956796'), 11: bytearray(b'webprosbot'), 12: bytearray(b'webOS'), 13: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_type=Row(column_size=10948, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=86867, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='-1000866068', upper_bound='998961543'), device_type=Row(column_size=12120, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='ALP-AL00', upper_bound='vivo $2'), event_date=Row(column_size=2019, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 0), upper_bound=datetime.datetime(2023, 8, 24, 0, 0)), event_time=Row(column_size=447228, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 3, 1, 119000), upper_bound=datetime.datetime(2023, 8, 24, 23, 8, 20, 509000)), 
host=Row(column_size=8573, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='ablumhardt.techc', upper_bound='zzz.techcreator/'), os_type=Row(column_size=12876, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='Android', upper_bound='webOS'), referrer=Row(column_size=86593, value_count=93956, null_value_count=48227, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=345957, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz/address.php@'), user_id=Row(column_size=336417, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='-1000675882', upper_bound='999956796'))"
0,s3://warehouse/bootcamp/events_sorted/data/00003-144-537288d5-5451-4fbc-9e53-83941bf7cd59-0-00001.parquet,PARQUET,1,Row(event_date=None),122235,1541345,"{1: 284335, 2: 87432, 6: 9324, 7: 558656, 8: 2154, 9: 110112, 10: 442464, 11: 11501, 12: 16872, 13: 13540}","{1: 122235, 2: 122235, 6: 122235, 7: 122235, 8: 122235, 9: 122235, 10: 122235, 11: 122235, 12: 122235, 13: 122235}","{1: 0, 2: 53009, 6: 0, 7: 0, 8: 0, 9: 0, 10: 8, 11: 0, 12: 0, 13: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'3.220.57.224'), 6: bytearray(b'accc.techcreator'), 7: bytearray(b'@n.\xbd\xdf\xb7\x05\x00'), 8: bytearray(b'\x00\xe0\xbc\x89\xdf\xb7\x05\x00'), 9: bytearray(b'-1001669954'), 10: bytearray(b'-1000015881'), 11: bytearray(b') Bot'), 12: bytearray(b'Android'), 13: bytearray(b'$2')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xd8\xaf\x9a\xe8\x9f\x03\x06\x00'), 8: bytearray(b'\x00\x00\xeb\xca\x8b\x03\x06\x00'), 9: bytearray(b'998766634'), 10: bytearray(b'999882344'), 11: bytearray(b'webprosbot'), 12: bytearray(b'iOS'), 13: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_type=Row(column_size=11501, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=110112, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='-1001669954', upper_bound='998766634'), device_type=Row(column_size=13540, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='$2', upper_bound='vivo $2'), event_date=Row(column_size=2154, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 0), upper_bound=datetime.datetime(2023, 8, 23, 0, 0)), event_time=Row(column_size=558656, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 14, 23, 80000), upper_bound=datetime.datetime(2023, 8, 23, 23, 59, 57, 399000)), host=Row(column_size=9324, 
value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='accc.techcreator', upper_bound='zachwilson.techd'), os_type=Row(column_size=16872, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=87432, value_count=122235, null_value_count=53009, nan_value_count=None, lower_bound='3.220.57.224', upper_bound='zachwilson.tech'), url=Row(column_size=284335, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zz.php'), user_id=Row(column_size=442464, value_count=122235, null_value_count=8, nan_value_count=None, lower_bound='-1000015881', upper_bound='999882344'))"
