In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, avg
from pyspark.sql.functions import initcap

In [38]:
# Initialize Spark Session
spark = SparkSession.builder.appName("SparkChallenge").getOrCreate()

In [3]:
# Step 1: Read the CSV file
df = spark.read.csv("/app/data/events.csv", header=True, inferSchema=True)

                                                                                

In [30]:
df = df.withColumn("action", initcap(col("action")))

In [32]:
df_grouped = df.groupBy(window(col("time"), "10 minutes"), col("action")) \
               .count() \
               .groupBy("window") \
               .pivot("action", ["Open", "Close"]) \
               .sum("count") \
               .withColumnRenamed("Open", "open_count") \
               .withColumnRenamed("Close", "close_count") \
               .na.fill(0)  # Replace nulls with zeros for missing actions in some windows

In [34]:
df_grouped = df_grouped.withColumn("avg_actions", (col("open_count") + col("close_count")) / 2)

In [35]:
df_grouped.show(1, 1000, vertical=True)

-RECORD 0-------------------------------------------------
 window      | {2016-07-27 06:00:00, 2016-07-27 06:10:00} 
 open_count  | 179                                        
 close_count | 184                                        
 avg_actions | 181.5                                      
only showing top 1 row



In [36]:
top_10_opens = df_grouped.orderBy(col("open_count").desc()).limit(10)

In [37]:
top_10_opens.show()

+--------------------+----------+-----------+-----------+
|              window|open_count|close_count|avg_actions|
+--------------------+----------+-----------+-----------+
|{2016-07-26 20:10...|       185|        189|      187.0|
|{2016-07-27 15:00...|       184|        160|      172.0|
|{2016-07-27 00:00...|       184|        170|      177.0|
|{2016-07-27 07:40...|       184|        164|      174.0|
|{2016-07-27 08:20...|       184|        165|      174.5|
|{2016-07-27 20:50...|       184|        210|      197.0|
|{2016-07-27 13:20...|       182|        173|      177.5|
|{2016-07-27 14:30...|       182|        152|      167.0|
|{2016-07-27 06:50...|       182|        164|      173.0|
|{2016-07-27 05:40...|       180|        177|      178.5|
+--------------------+----------+-----------+-----------+



In [27]:
df.groupBy(window(col("time"), "10 minutes"), col("action")).count().groupBy("window").pivot("action", ["Open", "Close"]).sum("count").orderBy(col('window')).show(4, 1000, vertical=True)

-RECORD 0--------------------------------------------
 window | {2016-07-26 02:40:00, 2016-07-26 02:50:00} 
 Open   | 32                                         
 Close  | NULL                                       
-RECORD 1--------------------------------------------
 window | {2016-07-26 02:50:00, 2016-07-26 03:00:00} 
 Open   | 147                                        
 Close  | 11                                         
-RECORD 2--------------------------------------------
 window | {2016-07-26 03:00:00, 2016-07-26 03:10:00} 
 Open   | 162                                        
 Close  | 19                                         
-RECORD 3--------------------------------------------
 window | {2016-07-26 03:10:00, 2016-07-26 03:20:00} 
 Open   | 169                                        
 Close  | 42                                         
only showing top 4 rows



In [22]:
df.groupBy(window(col("time"), "10 minutes"), col("action")).count().orderBy(col('window')).show(4, 1000, vertical=True)

-RECORD 0--------------------------------------------
 window | {2016-07-26 02:40:00, 2016-07-26 02:50:00} 
 action | Open                                       
 count  | 32                                         
-RECORD 1--------------------------------------------
 window | {2016-07-26 02:50:00, 2016-07-26 03:00:00} 
 action | Close                                      
 count  | 11                                         
-RECORD 2--------------------------------------------
 window | {2016-07-26 02:50:00, 2016-07-26 03:00:00} 
 action | Open                                       
 count  | 147                                        
-RECORD 3--------------------------------------------
 window | {2016-07-26 03:00:00, 2016-07-26 03:10:00} 
 action | Close                                      
 count  | 19                                         
only showing top 4 rows

