In [87]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import window, count, session_window


In [88]:
spark = SparkSession.builder.master("local").appName("Windowing").getOrCreate()

# Sample Data is generated for windowing examples
windowingData = (
("12", "2019-01-02 15:30:00"),
("12",  "2019-01-02 15:30:30"),
("12",  "2019-01-02 15:31:00"),
("12",  "2019-01-02 15:31:50"),
("12",  "2019-01-02 15:31:55"),
("16",  "2019-01-02 15:33:00"),
("16",  "2019-01-02 15:35:20"),
("16",  "2019-01-02 15:37:00"),
("20",  "2019-01-02 15:30:30"),
("20",  "2019-01-02 15:31:00"),
("20",  "2019-01-02 15:31:50"),
("20",  "2019-01-02 15:31:55"),
("20",  "2019-01-02 15:33:00"),
("20",  "2019-01-02 15:35:20"),
("20",  "2019-01-02 15:37:00"),
("20",  "2019-01-02 15:40:00"),
("20",  "2019-01-02 15:45:00"),
("20",  "2019-01-02 15:46:00"),
("20",  "2019-01-02 15:47:30"),
("20",  "2019-01-02 15:48:00"),
("20",  "2019-01-02 15:48:10"),
("20",  "2019-01-02 15:48:20"),
("20",  "2019-01-02 15:48:30"),
("20",  "2019-01-02 15:50:00"),
("20",  "2019-01-02 15:53:00"),
("20",  "2019-01-02 15:54:30"),
("20",  "2019-01-02 15:55:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"))

columns = ["eventId", "timeReceived"]

windowing_df = spark.createDataFrame(data = windowingData, schema = columns)

windowing_df = windowing_df.sort(windowing_df.timeReceived.asc()) # .withWatermark("timeReceived", "10 minutes")

windowing_df.show(40, truncate=False)



+-------+-------------------+
|eventId|timeReceived       |
+-------+-------------------+
|12     |2019-01-02 15:30:00|
|12     |2019-01-02 15:30:30|
|20     |2019-01-02 15:30:30|
|20     |2019-01-02 15:31:00|
|12     |2019-01-02 15:31:00|
|12     |2019-01-02 15:31:50|
|20     |2019-01-02 15:31:50|
|12     |2019-01-02 15:31:55|
|20     |2019-01-02 15:31:55|
|16     |2019-01-02 15:33:00|
|20     |2019-01-02 15:33:00|
|16     |2019-01-02 15:35:20|
|20     |2019-01-02 15:35:20|
|20     |2019-01-02 15:37:00|
|16     |2019-01-02 15:37:00|
|20     |2019-01-02 15:40:00|
|20     |2019-01-02 15:45:00|
|20     |2019-01-02 15:46:00|
|20     |2019-01-02 15:47:30|
|20     |2019-01-02 15:48:00|
|20     |2019-01-02 15:48:10|
|20     |2019-01-02 15:48:20|
|20     |2019-01-02 15:48:30|
|20     |2019-01-02 15:50:00|
|22     |2019-01-02 15:50:30|
|22     |2019-01-02 15:50:30|
|22     |2019-01-02 15:50:30|
|22     |2019-01-02 15:52:00|
|22     |2019-01-02 15:52:00|
|22     |2019-01-02 15:52:00|
|20     |2

In [93]:
session = windowing_df.groupBy(session_window("timeReceived", "5 minutes")) \
                       .agg(count("eventId").alias("conteo")) 
                    


session = windowing_df.select("eventId","timeReceived",session_window("timeReceived", "5 minutes")).orderBy("timeReceived")


session.show(1000,truncate=False)

+-------+-------------------+------------------------------------------+
|eventId|timeReceived       |session_window                            |
+-------+-------------------+------------------------------------------+
|12     |2019-01-02 15:30:00|{2019-01-02 15:30:00, 2019-01-02 15:35:00}|
|12     |2019-01-02 15:30:30|{2019-01-02 15:30:30, 2019-01-02 15:35:30}|
|20     |2019-01-02 15:30:30|{2019-01-02 15:30:30, 2019-01-02 15:35:30}|
|20     |2019-01-02 15:31:00|{2019-01-02 15:31:00, 2019-01-02 15:36:00}|
|12     |2019-01-02 15:31:00|{2019-01-02 15:31:00, 2019-01-02 15:36:00}|
|12     |2019-01-02 15:31:50|{2019-01-02 15:31:50, 2019-01-02 15:36:50}|
|20     |2019-01-02 15:31:50|{2019-01-02 15:31:50, 2019-01-02 15:36:50}|
|12     |2019-01-02 15:31:55|{2019-01-02 15:31:55, 2019-01-02 15:36:55}|
|20     |2019-01-02 15:31:55|{2019-01-02 15:31:55, 2019-01-02 15:36:55}|
|16     |2019-01-02 15:33:00|{2019-01-02 15:33:00, 2019-01-02 15:38:00}|
|20     |2019-01-02 15:33:00|{2019-01-02 15:33:00, 

In [86]:
spark.stop()