In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [4]:
spark = SparkSession.builder.appName('data').getOrCreate()
spark

24/04/28 18:07:09 WARN Utils: Your hostname, Andrews-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.6.209.31 instead (on interface en0)
24/04/28 18:07:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/28 18:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/04/28 18:07:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [32]:
df_spark = spark.read.csv('groups.csv')
# change column names
df_spark = spark.read.option('header', 'true').csv('groups.csv')
df_spark.show()

+--------+--------------+--------------------+--------------------+--------------------+
|group_id|deviceIDs_list|          created_at|          updated_at|          deleted_at|
+--------+--------------+--------------------+--------------------+--------------------+
|       1|       [1,2,3]|2022-07-14 21:14:...|                null|                null|
|       1|     [1,2,3,4]|2022-07-14 21:14:...|2022-07-15 21:14:...|                null|
|       1|   [1,2,3,4,5]|2022-07-14 21:14:...|2022-07-17 21:14:...|2022-07-18 21:14:...|
|       2|       [1,2,3]|2022-08-14 21:14:...|                null|                null|
|       2|     [1,2,3,4]|2022-08-14 21:14:...|2022-08-15 21:14:...|                null|
|       3|       [1,2,3]|2022-09-14 21:14:...|                null|                null|
|       3|     [1,2,3,4]|2022-09-14 21:14:...|2022-09-15 21:14:...|                null|
+--------+--------------+--------------------+--------------------+--------------------+



# Lifetime

In [34]:
# Calculate the lifetime of each group by substracting the timestamp of 'created_at' from the timestamp of 'deleted_at'
# If 'deleted_at' is null, then the group is still active, so we will use the current timestamp
from pyspark.sql.functions import col

df_spark = df_spark.withColumn('created_at', col('created_at').cast('timestamp'))
df_spark = df_spark.withColumn('deleted_at', col('deleted_at').cast('timestamp'))
df_spark.show()

# replace null values in 'deleted_at' with the current timestamp
from pyspark.sql.functions import current_timestamp, coalesce
df_spark_non_null = df_spark.withColumn('deleted_at', coalesce(col("deleted_at"), current_timestamp()))
df_spark_non_null.show()

# if there's a timestamp in 'deleted_at', remove all the other rows with the same 'group_id'
# TODO

# if there are multiple rows with the same 'group_id' just keep the first one and remove the rest
# for the lifetime calculation, it doesn't matter if we keep the first or the last row because 
# the lifetime is calculated by substracting the created_at from the now()
# TODO


# calculate the lifetime of each group
from pyspark.sql.functions import datediff
df_spark_lifetime = df_spark_non_null.withColumn('lifetime', datediff(col('deleted_at'), col('created_at')))
df_spark_lifetime.show()

+--------+--------------+--------------------+--------------------+--------------------+
|group_id|deviceIDs_list|          created_at|          updated_at|          deleted_at|
+--------+--------------+--------------------+--------------------+--------------------+
|       1|       [1,2,3]|2022-07-14 21:14:...|                null|                NULL|
|       1|     [1,2,3,4]|2022-07-14 21:14:...|2022-07-15 21:14:...|                NULL|
|       1|   [1,2,3,4,5]|2022-07-14 21:14:...|2022-07-17 21:14:...|2022-07-18 21:14:...|
|       2|       [1,2,3]|2022-08-14 21:14:...|                null|                NULL|
|       2|     [1,2,3,4]|2022-08-14 21:14:...|2022-08-15 21:14:...|                NULL|
|       3|       [1,2,3]|2022-09-14 21:14:...|                null|                NULL|
|       3|     [1,2,3,4]|2022-09-14 21:14:...|2022-09-15 21:14:...|                NULL|
+--------+--------------+--------------------+--------------------+--------------------+

+--------+----------

# Avg, Min, Max Cardinality

In [31]:
# idea: group by 'group_id' and calculate the average size of devicedIDs_list for each group 
from pyspark.sql.functions import avg
df_spark_avg = df_spark_lifetime.groupBy('group_id').agg(avg('deviceIDs_list'))
df_spark_avg.show()


+--------------------+-------------------+
|            group_id|avg(deviceIDs_list)|
+--------------------+-------------------+
|c15eb63e-bc0f-412...|               NULL|
|160a2907-ce33-43a...|               NULL|
|5809f334-5f9e-44b...|               NULL|
|e959e56f-9df2-425...|               NULL|
|a91a31cd-ce59-4b9...|               NULL|
|d5405986-c54c-474...|               NULL|
|2a8fd6e1-4fae-478...|               NULL|
|b5e39e1e-ea8a-4da...|               NULL|
|4b597716-2155-498...|               NULL|
|9c1e473b-b2e7-45b...|               NULL|
|4d69f5cf-8957-464...|               NULL|
|cfba891c-256d-4f0...|               NULL|
|0034ffcf-77d8-486...|               NULL|
|3e5f13bb-0ea5-43f...|               NULL|
|d668c66e-9d36-4c9...|               NULL|
|f346d3f4-93b5-40e...|               NULL|
|78b43764-17c9-48b...|               NULL|
|93786375-d922-466...|               NULL|
|8ba4b09f-d214-458...|               NULL|
|e60c13cd-1f0d-4f8...|               NULL|
+----------