In [None]:
!pip install findspark

In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from IPython.core.magic import register_cell_magic

In [52]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Iceberg-MinIO") \
    .config("spark.sql.catalog", "mycatalog") \
    .config("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.mycatalog.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.mycatalog.uri", "http://iceberg-rest:8181") \
    .config("spark.sql.catalog.mycatalog.warehouse", "s3://warehouse/") \
    .config("spark.sql.catalog.mycatalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.mycatalog.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.mycatalog.s3.region", "us-east-1") \
    .config("spark.sql.catalog.mycatalog.s3.path-style-access", "true") \
    .config("spark.sql.catalog.mycatalog.s3.signing-region", "us-east-1") \
    .config("spark.sql.catalog.mycatalog.s3.signing-name", "s3") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()


In [20]:
# Define the %%sparksql cell magic
@register_cell_magic
def sparksql(line, cell):
    """Run SQL queries using %%sparksql in a Jupyter cell."""
    result = spark.sql(cell)
    result.show(truncate=False)
    return result

print("Spark session and %%sparksql magic initialized!")

Spark session and %%sparksql magic initialized!


In [21]:
%%sparksql
SHOW CATALOGS

+-------------+
|catalog      |
+-------------+
|mycatalog    |
|spark_catalog|
+-------------+



DataFrame[catalog: string]

In [41]:
%%sparksql
SHOW TABLES IN iceberg.bootcamp

+---------+---------------+-----------+
|namespace|tableName      |isTemporary|
+---------+---------------+-----------+
|bootcamp |events         |false      |
|bootcamp |events_sorted  |false      |
|bootcamp |events_unsorted|false      |
+---------+---------------+-----------+



DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [44]:
!pip install jupyter-spark

Collecting jupyter-spark
  Downloading jupyter_spark-0.4.0-py2.py3-none-any.whl (7.5 kB)
Collecting jupyter
  Downloading jupyter-1.1.1-py2.py3-none-any.whl (2.7 kB)
Collecting widgetsnbextension
  Downloading widgetsnbextension-4.0.14-py3-none-any.whl (2.2 MB)
[K     |████████████████████████████████| 2.2 MB 249 kB/s eta 0:00:01
[?25hCollecting notebook>=4.2
  Downloading notebook-7.4.4-py3-none-any.whl (14.3 MB)
[K     |████████████████████████████████| 14.3 MB 9.7 MB/s eta 0:00:01    |███████                         | 3.1 MB 7.4 MB/s eta 0:00:02
Collecting ipywidgets
  Downloading ipywidgets-8.1.7-py3-none-any.whl (139 kB)
[K     |████████████████████████████████| 139 kB 9.4 MB/s eta 0:00:01
[?25hCollecting jupyter-console
  Downloading jupyter_console-6.6.3-py3-none-any.whl (24 kB)
Collecting jupyterlab_widgets~=3.0.15
  Downloading jupyterlab_widgets-3.0.15-py3-none-any.whl (216 kB)
[K     |████████████████████████████████| 216 kB 6.0 MB/s eta 0:00:01
[31mERROR: notebook 7.

In [43]:
spark.conf.get("spark.sql.catalog.mycatalog.warehouse")
spark.conf.get("spark.sql.catalog.iceberg.warehouse")


's3://warehouse/'

In [42]:
%%sparksql
SHOW TABLES IN mycatalog.bootcamp

+---------+---------------+-----------+
|namespace|tableName      |isTemporary|
+---------+---------------+-----------+
|bootcamp |events         |false      |
|bootcamp |events_sorted  |false      |
|bootcamp |events_unsorted|false      |
+---------+---------------+-----------+



DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# spark

events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
devices = spark.read.option("header","true").csv("/home/iceberg/data/devices.csv")

df = events.join(devices,on="device_id",how="left")
df = df.withColumnsRenamed({'browser_type': 'browser_family', 'os_type': 'os_family'})

df.show()

+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| device_id|    user_id|referrer|                host|       url|          event_time|         event_date|browser_family|os_family|device_type|
+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| 532630305| 1037710827|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|         Other|    Other|      Other|
| 532630305|  925588856|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|         Other|    Other|      Other|
| 532630305|-1180485268|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|         Other|    Other|      Other|
| 532630305|-1044833855|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|         Other|    Other|     

In [23]:
sorted = df.repartition(10, col("event_date"))\
    .sortWithinPartitions(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sortedTwo = df.repartition(10, col("event_date"))\
    .sort(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sorted.show()
sortedTwo.show()


                                                                                

+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
|  device_id|    user_id|            referrer|                host|                 url|          event_time|         event_date|browser_family|os_family|       device_type|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
|  532630305| 1129583063|                NULL|admin.zachwilson....|                   /|2021-01-07 09:21:...|2021-01-07 00:00:00|         Other|    Other|             Other|
| 1088283544| -648945006|                NULL|    www.eczachly.com|                   /|2021-01-07 02:58:...|2021-01-07 00:00:00|      PetalBot|  Android|Generic Smartphone|
| -158310583|-1871780024|                NULL|    www.eczachly.com|                   /|2021-01-07 04:17:...|2021-01-07 00:00:00| 



+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
|  device_id|    user_id|            referrer|                host|                 url|          event_time|         event_date|browser_family|os_family|       device_type|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------+---------+------------------+
| -643696601| 1272828233|                NULL|admin.zachwilson....|                   /|2021-01-02 13:53:...|2021-01-02 00:00:00|        Chrome|  Windows|             Other|
|  532630305|  747494706|                NULL|admin.zachwilson....|                   /|2021-01-02 19:36:...|2021-01-02 00:00:00|         Other|    Other|             Other|
|  898871897| 2110046626|                NULL|admin.zachwilson....|       /wp-login.php|2021-01-02 19:57:...|2021-01-02 00:00:00| 

                                                                                

In [None]:
# .sortWithinPartitions() sorts within partitions, whereas .sort() is a global sort, which is very slow

# Note - exchange is synonymous with Shuffle

In [24]:
sorted = df.repartition(10, col("event_date"))\
    .sortWithinPartitions(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sortedTwo = df.repartition(10, col("event_date"))\
    .sort(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sorted.explain()
sortedTwo.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_id#658, user_id#657, referrer#659, host#660, url#661, cast(event_time#662 as timestamp) AS event_time#901, event_date#669, browser_family#714, os_family#715, device_type#698]
   +- Sort [event_date#669 ASC NULLS FIRST, host#660 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#669, 10), REPARTITION_BY_NUM, [plan_id=1254]
         +- Project [device_id#658, user_id#657, referrer#659, host#660, url#661, event_time#662, event_date#669, browser_type#696 AS browser_family#714, os_type#697 AS os_family#715, device_type#698]
            +- BroadcastHashJoin [device_id#658], [device_id#695], LeftOuter, BuildRight, false
               :- Project [user_id#657, device_id#658, referrer#659, host#660, url#661, event_time#662, date_trunc(day, cast(event_time#662 as timestamp), Some(Etc/UTC)) AS event_date#669]
               :  +- FileScan csv [user_id#657,device_id#658,referrer#659,host#660,url#661,ev

In [25]:
%%sparksql
CREATE DATABASE IF NOT EXISTS bootcamp10

++
||
++
++



DataFrame[]

In [26]:
%%sparksql
DROP TABLE IF EXISTS bootcamp.events


++
||
++
++



DataFrame[]

In [27]:
%%sparksql
CREATE NAMESPACE IF NOT EXISTS mycatalog.bootcamp


++
||
++
++



DataFrame[]

In [28]:
%%sparksql
DROP TABLE IF EXISTS bootcamp.events_sorted

++
||
++
++



DataFrame[]

In [29]:
%%sparksql
CREATE TABLE IF NOT EXISTS mycatalog.bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date)



++
||
++
++



DataFrame[]

In [31]:
%%sparksql
SHOW TABLES IN mycatalog.bootcamp


+---------+---------------+-----------+
|namespace|tableName      |isTemporary|
+---------+---------------+-----------+
|bootcamp |events         |false      |
|bootcamp |events_sorted  |false      |
|bootcamp |events_unsorted|false      |
+---------+---------------+-----------+



DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [32]:
%%sparksql
CREATE TABLE IF NOT EXISTS mycatalog.bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date)


++
||
++
++



DataFrame[]

In [33]:
%%sparksql

CREATE TABLE IF NOT EXISTS mycatalog.bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date) 


++
||
++
++



DataFrame[]

In [34]:
start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    
first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("host"))

start_df.write.mode("overwrite").saveAsTable("mycatalog.bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("mycatalog.bootcamp.events_sorted")

                                                                                

In [35]:
%%sparksql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM mycatalog.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM mycatalog.bootcamp.events_unsorted.files


[Stage 46:>                                                         (0 + 1) / 1]

+-------+---------+--------+
|size   |num_files|sorted  |
+-------+---------+--------+
|5444855|4        |sorted  |
|5575558|4        |unsorted|
+-------+---------+--------+



                                                                                

DataFrame[size: bigint, num_files: bigint, sorted: string]

In [37]:
%%sparksql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM mycatalog.bootcamp.events.files;

+----+---------+
|size|num_files|
+----+---------+
|NULL|0        |
+----+---------+



DataFrame[size: bigint, num_files: bigint]

In [40]:
%%sparksql
SELECT COUNT(1) FROM mycatalog.bootcamp.matches_bucketed.files

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `mycatalog`.`bootcamp`.`matches_bucketed`.`files` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 21;
'Aggregate [unresolvedalias(count(1), None)]
+- 'UnresolvedRelation [mycatalog, bootcamp, matches_bucketed, files], [], false
