In [6]:
#using lag first and last functions

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [8]:
# Get the active Spark session, or create one with the app name "event_status".
spark = (
    SparkSession.builder
    .appName("event_status")
    .getOrCreate()
)
# Sample input: one (date string, status) tuple per row, already in date order.
# 2020-06-06 is absent, so the last "Won" row is separated from the rest by a gap.
data = [
    ("2020-06-01", "Won"),
    ("2020-06-02", "Won"),
    ("2020-06-03", "Won"),
    ("2020-06-04", "Lost"),
    ("2020-06-05", "Lost"),
    ("2020-06-07", "Won"),
]

In [9]:
# Build the DataFrame from the sample rows, naming the two columns explicitly.
column_names = ["event_date", "event_status"]
df = spark.createDataFrame(data, schema=column_names)

In [10]:
# Preview the raw rows; at this point in the notebook event_date has not been cast to a date yet.
df.show()

+----------+------------+
|event_date|event_status|
+----------+------------+
|2020-06-01|         Won|
|2020-06-02|         Won|
|2020-06-03|         Won|
|2020-06-04|        Lost|
|2020-06-05|        Lost|
|2020-06-07|         Won|
+----------+------------+



In [11]:
# Replace the event_date column with its value cast to the Spark "date" type.
event_date_as_date = col("event_date").cast("date")
df = df.withColumn("event_date", event_date_as_date)

In [14]:


from pyspark.sql.window import Window
# One window over the entire frame (no partitionBy), ordered chronologically.
by_date = Window.orderBy("event_date")

# Status of the previous row in date order; there is no previous row for the first one.
previous_status = lag("event_status").over(by_date)

# event_change is 1 when the status differs from the previous row's, else 0.
# The first row has nothing to compare against, so it falls through to otherwise(0).
status_changed = col("event_status") != previous_status
df1 = df.withColumn("event_change", when(status_changed, 1).otherwise(0))
df1.show()

+----------+------------+------------+
|event_date|event_status|event_change|
+----------+------------+------------+
|2020-06-01|         Won|           0|
|2020-06-02|         Won|           0|
|2020-06-03|         Won|           0|
|2020-06-04|        Lost|           1|
|2020-06-05|        Lost|           0|
|2020-06-07|         Won|           1|
+----------+------------+------------+



In [17]:
# Running total of the change flags in date order: every row of the same unbroken
# streak gets the same event_group number, and each status change starts a new one.
# `sum` here is pyspark.sql.functions.sum (from the star import), not the builtin.
chronological = Window.orderBy("event_date")
streak_id = sum("event_change").over(chronological)
df2 = df1.withColumn("event_group", streak_id)

In [18]:
# Inspect the event_group ids next to the event_change flags they were summed from.
df2.show()

+----------+------------+------------+-----------+
|event_date|event_status|event_change|event_group|
+----------+------------+------------+-----------+
|2020-06-01|         Won|           0|          0|
|2020-06-02|         Won|           0|          0|
|2020-06-03|         Won|           0|          0|
|2020-06-04|        Lost|           1|          1|
|2020-06-05|        Lost|           0|          1|
|2020-06-07|         Won|           1|          2|
+----------+------------+------------+-----------+



In [19]:
# Collapse each streak (event_group) into one row with its start and end date.
#
# min/max are used instead of first/last: after the groupBy shuffle the order of
# rows inside a group is not guaranteed, so first/last can return any row's date,
# whereas min/max of event_date always yield the streak boundaries. This also
# matches the MIN/MAX used by the Spark SQL version of this query.
#
# `min` and `max` here are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
output_df = (
    df2.groupBy("event_group", "event_status")
    .agg(
        min("event_date").alias("start_date"),
        max("event_date").alias("end_date"),
    )
    .drop("event_group")
    # groupBy output order is not guaranteed; sort so the result is stable
    # and comes out in the same order as the SQL version (ORDER BY start_date).
    .orderBy("start_date")
)

output_df.show()

+------------+----------+----------+
|event_status|start_date|  end_date|
+------------+----------+----------+
|         Won|2020-06-01|2020-06-03|
|        Lost|2020-06-04|2020-06-05|
|         Won|2020-06-07|2020-06-07|
+------------+----------+----------+



In [None]:
# Writing the same logic in Spark SQL

In [20]:
# Expose df to Spark SQL as a temporary view named "events"
# (an existing temp view with that name is replaced).
df.createOrReplaceTempView("events")

In [24]:
# Streak detection over the "events" view, expressed as a single SQL statement:
#   CTE1         - flags each row whose event_status differs from the previous
#                  row's (previous = LAG ordered by event_date); 0 otherwise.
#   CTE2         - running SUM of those flags in date order, so every unbroken
#                  streak of the same status shares one event_group value.
#   final SELECT - one row per (event_group, event_status) with the MIN and MAX
#                  event_date as start_date / end_date, sorted by start_date.
query = """
WITH CTE1 AS (
    SELECT
        event_date,
        event_status,
        CASE 
            WHEN event_status != LAG(event_status) OVER (ORDER BY event_date) 
            THEN 1 
            ELSE 0 
        END AS event_change
    FROM events
),
CTE2 AS (
    SELECT
        event_date,
        event_status,
        SUM(event_change) OVER (ORDER BY event_date) AS event_group
    FROM CTE1
)
SELECT
    event_status,
    MIN(event_date) AS start_date,
    MAX(event_date) AS end_date
FROM CTE2
GROUP BY event_group, event_status
ORDER BY start_date
"""

In [25]:
# Run the SQL text held in `query` through the Spark session.
result = spark.sql(query)

# Display the streak rows; the query's ORDER BY start_date determines their order.
result.show()

+------------+----------+----------+
|event_status|start_date|  end_date|
+------------+----------+----------+
|         Won|2020-06-01|2020-06-03|
|        Lost|2020-06-04|2020-06-05|
|         Won|2020-06-07|2020-06-07|
+------------+----------+----------+

