In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.appName('Jupyter')\
    .config('spark.sql.autoBroadcastJoinThreshold', '-1')\
    .config('spark.sql.analyzer.failAmbiguousSelfJoin', 'false')\
    .config('spark.sql.shuffle.partitions', '4')\
    .getOrCreate()

25/08/04 18:17:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
events = spark.read.options(header=True, inferSchema=True)\
    .csv('/home/iceberg/data/events.csv')\
    .where(sf.column('user_id').isNotNull())

devices = spark.read.options(header=True, inferSchema=True)\
    .csv('/home/iceberg/data/devices.csv')

                                                                                

In [3]:
events.createOrReplaceTempView('events')
devices.createOrReplaceTempView('devices')

In [4]:
devices.schema

StructType([StructField('device_id', IntegerType(), True), StructField('browser_type', StringType(), True), StructField('os_type', StringType(), True), StructField('device_type', StringType(), True)])

In [5]:
events_aggregated = spark.sql("""
    SELECT
        user_id,
        device_id,
        COUNT(1) as event_counts,
        ARRAY_AGG(DISTINCT host) as host_array
    FROM events
    GROUP BY 1, 2
""").cache()

In [6]:
events_aggregated.show()

                                                                                

+-----------+-----------+------------+--------------------+
|    user_id|  device_id|event_counts|          host_array|
+-----------+-----------+------------+--------------------+
|-2147421007| -807271869|           1|[admin.zachwilson...|
|-2147340867| 1324700293|           1|  [www.eczachly.com]|
|-2147051672|  583904608|           1|  [www.eczachly.com]|
|-2146961540| 1436415199|           1|   [dataengineer.io]|
|-2146752203| -887516106|           1|  [www.eczachly.com]|
|-2146501484| -240714045|           1|  [www.eczachly.com]|
|-2146204366|-1893334816|           1|  [www.eczachly.com]|
|-2146151971| 1177446421|           5|  [www.eczachly.com]|
|-2146090945|-1238295066|           1|  [www.eczachly.com]|
|-2146031264| 1439772194|           1|  [www.eczachly.com]|
|-2146003818| 1348650756|           5|[www.zachwilson.t...|
|-2145736510|-1243615787|           2|[www.zachwilson.t...|
|-2145518400|  298775979|           3|[www.zachwilson.t...|
|-2145042879| -339390162|           2|[w

In [7]:
users_and_devices = events.join(
    events_aggregated,
    'user_id'
).groupBy('user_id').agg(
    sf.sum(sf.col('event_counts')).alias('total_hits'), 
    sf.array_agg(sf.col('events.device_id')).alias('devices')
)
users_and_devices.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ObjectHashAggregate(keys=[user_id#17], functions=[sum(event_counts#55L), collect_list(device_id#217, 0, 0)])
   +- ObjectHashAggregate(keys=[user_id#17], functions=[partial_sum(event_counts#55L), partial_collect_list(device_id#217, 0, 0)])
      +- Project [user_id#17, device_id#217, event_counts#55L]
         +- SortMergeJoin [user_id#17], [user_id#216], Inner
            :- Sort [user_id#17 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(user_id#17, 4), ENSURE_REQUIREMENTS, [plan_id=187]
            :     +- Filter isnotnull(user_id#17)
            :        +- FileScan csv [user_id#17] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int>
            +- Sort [user_id#216 ASC NULLS FIRST], false, 0
               +- Exchange hash

In [8]:
devices_on_events = devices.join(
    events_aggregated,
    'device_id'
).groupBy('device_id').agg(
    sf.max(sf.col('device_type')).alias('device_type'),
    sf.array_agg(sf.col('events.user_id')).alias('users')
)

devices_on_events.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ObjectHashAggregate(keys=[device_id#47], functions=[max(device_type#50), collect_list(user_id#17, 0, 0)])
   +- ObjectHashAggregate(keys=[device_id#47], functions=[partial_max(device_type#50), partial_collect_list(user_id#17, 0, 0)])
      +- Project [device_id#47, device_type#50, user_id#17]
         +- SortMergeJoin [device_id#47], [device_id#18], Inner
            :- Sort [device_id#47 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(device_id#47, 4), ENSURE_REQUIREMENTS, [plan_id=229]
            :     +- Filter isnotnull(device_id#47)
            :        +- FileScan csv [device_id#47,device_type#50] Batched: false, DataFilters: [isnotnull(device_id#47)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>
            +- Sort [device_id#18 ASC NULLS FIRS