In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

# expr lets you embed Spark SQL expressions in PySpark code
# col references a DataFrame column by name

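# DataFrame level

- DataFrames are immutable: every transformation returns a new DataFrame instead of modifying the existing one.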

In [None]:
# Build the Spark session, or reuse one that is already running.
# When a session already exists, getOrCreate() returns it and only runtime SQL
# configurations take effect (see the warning in the cell output).
spark = (
    SparkSession
    .builder
    .appName("Jupyter")
    .getOrCreate()
)

# As the last expression, the session object renders its details in the notebook.
spark

25/12/19 14:23:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| device_id|    user_id|referrer|                host|       url|          event_time|         event_date|browser_family|os_family|device_type|
+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| 532630305| 1037710827|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|         Other|    Other|      Other|
| 532630305|  925588856|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|         Other|    Other|      Other|
| 532630305|-1180485268|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|         Other|    Other|      Other|
| 532630305|-1044833855|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|         Other|    Other|     

In [None]:
# Everything up to show() is lazy: Spark only builds a plan until an action runs.
# Read the raw events (header row supplies the column names; no schema inference,
# so every column is a string) and derive `event_date`. DATE_TRUNC('day', ...)
# zeroes the time part, e.g. 2021-03-08 00:00:00.
events = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/events.csv")
    .withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
)

# Device lookup data, read the same way.
devices = spark.read.option("header", "true").csv("/home/iceberg/data/devices.csv")

# Left-join devices onto events, then rename columns to match the Iceberg tables.
df = (
    events
    .join(devices, on="device_id", how="left")
    .withColumnsRenamed({"browser_type": "browser_family", "os_type": "os_family"})
)

# Action: show() prints only the first rows.
# Avoid collect() here. It pulls every row to the driver and can OOM the kernel,
# so reserve it for small results such as aggregates.
df.show()

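`.sortWithinPartitions()` sorts rows only within each partition, whereas `.sort()` performs a global sort across all partitions. The global sort needs an extra shuffle and is much slower. In the physical plan, the second argument of the `Sort` node shows whether the sort is global (`true`) or local (`false`).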

Note: in a Spark query plan, `Exchange` means a shuffle.

In [None]:
# Compare a local sort with a global sort. Both start from a hash repartition of
# `df` into 10 partitions on event_date. That repartition is a shuffle and shows
# up as `Exchange` in the plan.
# The first result is named `sorted_df` (not `sorted`) so it does not shadow
# Python's built-in sorted() for the rest of the kernel session.

# Local sort: rows are ordered by (event_date, host) only inside each partition,
# so no further shuffle is needed. withColumn replaces the string event_time
# column with a timestamp column.
sorted_df = (
    df.repartition(10, col("event_date"))
    .sortWithinPartitions(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Global sort: .sort() orders rows across ALL partitions. That requires another
# shuffle on top of the repartition, which is much more expensive.
sortedTwo = (
    df.repartition(10, col("event_date"))
    .sort(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Actions: execute both plans and print their first rows.
sorted_df.show()
sortedTwo.show()

                                                                                

+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
|  device_id|    user_id|            referrer|                host|                 url|          event_time|         event_date|browser_family|os_family|       device_type|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
|  532630305| 1129583063|                NULL|admin.zachwilson....|                   /|2021-01-07 09:21:...|2021-01-07 00:00:00|         Other|    Other|             Other|
| 1088283544| -648945006|                NULL|    www.eczachly.com|                   /|2021-01-07 02:58:...|2021-01-07 00:00:00|      PetalBot|  Android|Generic Smartphone|
| -158310583|-1871780024|                NULL|    www.eczachly.com|                   /|2021-01-07 04:17:...|2021-01-07 00:00:00| 

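**Query plan**

- Use `explain()` to print the physical plan. Read it bottom-up: the most deeply indented operator (e.g. `FileScan`) runs first, and each line above consumes the output of the line below it.
- The word `Exchange` in a query plan means a shuffle.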

In [None]:
# Rebuild both pipelines so this cell is self-contained, then print their
# physical plans. The local-sort result is named `sorted_df` (not `sorted`) to
# avoid shadowing Python's built-in sorted().

# Local sort inside each of the 10 hash partitions.
sorted_df = (
    df.repartition(10, col("event_date"))
    .sortWithinPartitions(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Global sort across all partitions.
sortedTwo = (
    df.repartition(10, col("event_date"))
    .sort(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# explain() is NOT an action. It only prints the plan without running the job,
# which is why the plan shows isFinalPlan=false. Count the `Exchange` nodes
# (shuffles) in each plan to compare their cost.
sorted_df.explain()
sortedTwo.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, cast(event_time#22 as timestamp) AS event_time#288, event_date#29]
   +- Sort [event_date#29 ASC NULLS FIRST, host#20 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#29, 10), REPARTITION_BY_NUM, [plan_id=294]
         +- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, event_time#22, date_trunc(day, cast(event_time#22 as timestamp), Some(Etc/UTC)) AS event_date#29]
            +- FileScan csv [user_id#17,device_id#18,referrer#19,host#20,url#21,event_time#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:string,device_id:string,referrer:string,host:string,url:string,event_time:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referr

# Spark SQL

Create the `bootcamp` database (namespace) for the Iceberg tables.

In [3]:
%%sql
-- Namespace that holds the bootcamp Iceberg tables (no-op if it already exists).
CREATE NAMESPACE IF NOT EXISTS bootcamp

Drop the `events` table if it exists.

In [4]:
%%sql
-- Start from a clean slate: remove bootcamp.events if an earlier run created it.
drop table if exists bootcamp.events

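Create an Iceberg table named `events`.

An Iceberg table can be partitioned in two ways:
- by a plain column (identity partitioning, as with `event_date` here), or
- by a partition transform on a date/timestamp column, such as `years(...)`, `months(...)`, `days(...)` or `hours(...)`. `date_trunc` is not a partition transform.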

In [15]:
%%sql
-- Iceberg table with one partition per distinct event_date value.
-- NOTE(review): the joined DataFrame `df` has device_type (not device_family),
-- plus user_id and device_id, and its event_date is a timestamp. Confirm this
-- schema is the intended target.
create table if not exists bootcamp.events (
    url            string,
    referrer       string,
    browser_family string,
    os_family      string,
    device_family  string,
    host           string,
    event_time     timestamp,
    event_date     date
)
using iceberg
partitioned by (event_date);


In [5]:
%%sql
-- Remove the sorted copy from a previous run so it can be recreated.
drop table if exists bootcamp.events_sorted

In [29]:
%%sql
-- Target for the within-partition-sorted data. Same columns and partitioning
-- (identity on event_date) as bootcamp.events.
create table if not exists bootcamp.events_sorted (
    url            string,
    referrer       string,
    browser_family string,
    os_family      string,
    device_family  string,
    host           string,
    event_time     timestamp,
    event_date     date
)
using iceberg
partitioned by (event_date);

In [20]:
%%sql
-- Remove the unsorted copy from a previous run. IF EXISTS keeps this cell from
-- failing on a fresh catalog, matching the other DROP cells.
DROP TABLE IF EXISTS bootcamp.events_unsorted;

In [30]:
%%sql
-- Target for the unsorted data. Uses the same columns and the same partitioning
-- (identity on event_date) as bootcamp.events_sorted, so the file-size
-- comparison differs only in sort order. Previously this was partitioned by
-- year(event_date).
CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

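- Sorting rows within each partition can reduce total file size. When equal values sit next to each other, columnar formats such as Parquet compress them more effectively (e.g. with run-length/dictionary encoding). Sort from the lowest-cardinality column (fewest distinct values) to the highest-cardinality column.
- The query below counts the distinct values in each column to show which columns have the fewest.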

In [38]:
%%sql
-- Number of distinct values per candidate sort column (lower = sort earlier).
SELECT
    COUNT(DISTINCT event_date)     AS event_date,
    COUNT(DISTINCT browser_family) AS browser,
    COUNT(DISTINCT os_family)      AS os,
    COUNT(DISTINCT host)           AS host
FROM demo.bootcamp.events_unsorted

event_date,browser,os,host
931,216,31,261


25/12/19 18:09:57 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-efdbbb68-05f5-4c52-9940-5d8a336fe60e. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-efdbbb68-05f5-4c52-9940-5d8a336fe60e
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:166)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:109)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:90)
	at org.apache.spark.util.SparkFileUtils.deleteRecursively(SparkFileUtils.scala:121)
	at org.apache.spark.util.SparkFileUtils.deleteRecursively$(SparkFileUtils.scala:120)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1126)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimize

In [None]:
# Repartition into 4 partitions by event_date (one shuffle) and replace the string
# event_time column with a timestamp column. Rows are NOT sorted within the
# partitions. This is the "unsorted" baseline.
start_df = (
    df.repartition(4, col("event_date"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Sort each partition from the lowest- to the highest-cardinality column, using
# the distinct counts from the query above: os_family 31, browser_family 216,
# event_date 931. Equal values then sit together and compress better.
first_sort_df = start_df.sortWithinPartitions(
    col("os_family"), col("browser_family"), col("event_date")
)

# Write both versions so their file sizes can be compared through the Iceberg
# `.files` metadata tables.
# NOTE(review): for a v2 (Iceberg) catalog, mode("overwrite") + saveAsTable
# replaces the table, so the schema and partition spec from the CREATE TABLE
# cells are likely not kept. Confirm this. writeTo(...).append() or
# .overwritePartitions() write into the existing table definition instead.
start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

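**Iceberg naming order**

`demo.bootcamp.events_sorted.files` reads as `catalog.database.table.metadata_table`:
- `demo` is the catalog,
- `bootcamp` is the database,
- `events_sorted` is the table,
- `files` is the metadata table that lists the table's data files (with columns such as `file_size_in_bytes`).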



In [37]:
%%sql
-- Total data-file size and file count of the sorted vs unsorted copies, read from
-- Iceberg's `files` metadata tables. The label column is aliased so its header is
-- not the literal 'sorted' for both rows.
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted' AS table_version
FROM demo.bootcamp.events_sorted.files

UNION ALL

SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'unsorted' AS table_version
FROM demo.bootcamp.events_unsorted.files


size,num_files,sorted
4992940,4,sorted
5556664,4,unsorted


In [None]:
%%sql
-- Total data-file size and number of data files currently in bootcamp.events.
select sum(file_size_in_bytes) as size,
       count(1)                as num_files
from demo.bootcamp.events.files;

size,num_files
3145713,5


In [None]:
%%sql 
SELECT COUNT(1) 
FROM bootcamp.matches_bucketed.files