# find n consecutive date records

In [1]:
import os
os.getcwd()
os.chdir("H:\pyspark_advanced-coding_interview")
os.getcwd()

'H:\\pyspark_advanced-coding_interview'

# Spark SQL

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime

# Initialize Spark Session
spark = SparkSession.builder.appName("ConsecutiveSalesDays").getOrCreate()

# Define schema using StructType and StructField
schema = StructType([
    StructField("SaleID", IntegerType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("SaleDate", DateType(), True),
    StructField("SalesAmount", IntegerType(), True)
])

# Sample sales data
data = [
    (1, 101, datetime(2024, 10, 1), 200),
    (2, 101, datetime(2024, 10, 2), 150),
    (3, 101, datetime(2024, 10, 3), 180),
    (4, 101, datetime(2024, 10, 5), 220),
    (5, 101, datetime(2024, 10, 6), 210),
    (6, 101, datetime(2024, 10, 7), 190),
    (7, 102, datetime(2024, 10, 2), 300),
    (8, 102, datetime(2024, 10, 3), 250),
    (9, 102, datetime(2024, 10, 4), 350),
    (10, 102, datetime(2024, 10, 6), 400),
    (11, 102, datetime(2024, 10, 8), 450),
    (12, 103, datetime(2024, 10, 1), 100),
    (13, 103, datetime(2024, 10, 2), 120),
    (14, 103, datetime(2024, 10, 3), 140),
    (15, 103, datetime(2024, 10, 7), 160),
]

# Create DataFrame
df = spark.createDataFrame(data, schema)
df.show(truncate=False)


+------+---------+----------+-----------+
|SaleID|ProductID|SaleDate  |SalesAmount|
+------+---------+----------+-----------+
|1     |101      |2024-10-01|200        |
|2     |101      |2024-10-02|150        |
|3     |101      |2024-10-03|180        |
|4     |101      |2024-10-05|220        |
|5     |101      |2024-10-06|210        |
|6     |101      |2024-10-07|190        |
|7     |102      |2024-10-02|300        |
|8     |102      |2024-10-03|250        |
|9     |102      |2024-10-04|350        |
|10    |102      |2024-10-06|400        |
|11    |102      |2024-10-08|450        |
|12    |103      |2024-10-01|100        |
|13    |103      |2024-10-02|120        |
|14    |103      |2024-10-03|140        |
|15    |103      |2024-10-07|160        |
+------+---------+----------+-----------+



In [3]:
# Register the DataFrame as a temporary table
df.createOrReplaceTempView("Sales")


In [4]:

query = spark.sql("""    
  WITH ConsecutiveSales AS (
    SELECT *,
           DATEDIFF(SaleDate, LAG(SaleDate, 1) OVER (PARTITION BY ProductID ORDER BY SaleDate)) AS PrevDiff,
           DATEDIFF(LEAD(SaleDate, 1) OVER (PARTITION BY ProductID ORDER BY SaleDate), SaleDate) AS NextDiff
    FROM Sales
)
SELECT ProductID, SaleDate, SalesAmount
FROM ConsecutiveSales
WHERE PrevDiff = 1 AND NextDiff = 1
ORDER BY ProductID, SaleDate;                
                  """)

query.show()




+---------+----------+-----------+
|ProductID|  SaleDate|SalesAmount|
+---------+----------+-----------+
|      101|2024-10-02|        150|
|      101|2024-10-06|        210|
|      102|2024-10-03|        250|
|      103|2024-10-02|        120|
+---------+----------+-----------+



In [9]:
# Execute the corrected query
query2 = spark.sql(""" 
WITH NumberedSales AS (
    SELECT ProductID, SaleDate,
           ROW_NUMBER() OVER (PARTITION BY ProductID ORDER BY SaleDate) AS RowNum
    FROM Sales
),
DateGroups AS (
    SELECT ProductID, SaleDate,
           DATE_SUB(SaleDate, RowNum) AS GroupID
    FROM NumberedSales
)
SELECT ProductID, MIN(SaleDate) AS StartDate, MAX(SaleDate) AS EndDate, COUNT(*) AS ConsecutiveDays
FROM DateGroups
GROUP BY ProductID, GroupID
HAVING COUNT(*) >= 3
ORDER BY ProductID, StartDate;
""")

# Show the results
query2.show(truncate=False)


+---------+----------+----------+---------------+
|ProductID|StartDate |EndDate   |ConsecutiveDays|
+---------+----------+----------+---------------+
|101      |2024-10-01|2024-10-03|3              |
|101      |2024-10-05|2024-10-07|3              |
|102      |2024-10-02|2024-10-04|3              |
|103      |2024-10-01|2024-10-03|3              |
+---------+----------+----------+---------------+



# Pyspark

In [10]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead, datediff

# Define a window specification
window_spec = Window.partitionBy("ProductID").orderBy("SaleDate")

# Calculate difference with previous and next date
df_consecutive = df.withColumn("PrevDateDiff", datediff(col("SaleDate"), lag("SaleDate", 1).over(window_spec))) \
                   .withColumn("NextDateDiff", datediff(lead("SaleDate", 1).over(window_spec), col("SaleDate")))

# Filter to find records where both previous and next dates are consecutive
df_filtered = df_consecutive.filter((col("PrevDateDiff") == 1) & (col("NextDateDiff") == 1))
df_filtered.show(truncate=False)


+------+---------+----------+-----------+------------+------------+
|SaleID|ProductID|SaleDate  |SalesAmount|PrevDateDiff|NextDateDiff|
+------+---------+----------+-----------+------------+------------+
|2     |101      |2024-10-02|150        |1           |1           |
|5     |101      |2024-10-06|210        |1           |1           |
|8     |102      |2024-10-03|250        |1           |1           |
|13    |103      |2024-10-02|120        |1           |1           |
+------+---------+----------+-----------+------------+------------+



In [11]:
from pyspark.sql.functions import row_number, date_sub

# Create a window with row number
window_spec = Window.partitionBy("ProductID").orderBy("SaleDate")

# Apply row number and create a group ID
df_grouped = df.withColumn("RowNum", row_number().over(window_spec)) \
               .withColumn("GroupID", date_sub(col("SaleDate"), col("RowNum")))

# Group by ProductID and GroupID to find consecutive sequences
df_consecutive_grouped = df_grouped.groupBy("ProductID", "GroupID") \
                                   .agg({"SaleDate": "min", "SaleDate": "max", "SaleID": "count"}) \
                                   .withColumnRenamed("min(SaleDate)", "StartDate") \
                                   .withColumnRenamed("max(SaleDate)", "EndDate") \
                                   .withColumnRenamed("count(SaleID)", "ConsecutiveDays") \
                                   .filter(col("ConsecutiveDays") >= 3)

df_consecutive_grouped.show(truncate=False)


+---------+----------+---------------+----------+
|ProductID|GroupID   |ConsecutiveDays|EndDate   |
+---------+----------+---------------+----------+
|101      |2024-09-30|3              |2024-10-03|
|101      |2024-10-01|3              |2024-10-07|
|102      |2024-10-01|3              |2024-10-04|
|103      |2024-09-30|3              |2024-10-03|
+---------+----------+---------------+----------+



In [13]:
from pyspark.sql.functions import row_number, datediff, min, max, count, expr

# Define a window specification
window_spec = Window.partitionBy("ProductID").orderBy("SaleDate")

# Assign RowNum for each SaleDate and create a consistent offset
df_advanced = df.withColumn("RowNum", row_number().over(window_spec)) \
                .withColumn("DateOffset", expr("date_sub(SaleDate, RowNum)"))

# Group by ProductID and DateOffset to identify consecutive dates
df_grouped_advanced = df_advanced.groupBy("ProductID", "DateOffset") \
                                 .agg(min("SaleDate").alias("StartDate"), 
                                      max("SaleDate").alias("EndDate"), 
                                      count("SaleID").alias("ConsecutiveDays")) \
                                 .filter(col("ConsecutiveDays") >= 3) \
                                 .orderBy("ProductID", "StartDate")

df_grouped_advanced.show(truncate=False)



+---------+----------+----------+----------+---------------+
|ProductID|DateOffset|StartDate |EndDate   |ConsecutiveDays|
+---------+----------+----------+----------+---------------+
|101      |2024-09-30|2024-10-01|2024-10-03|3              |
|101      |2024-10-01|2024-10-05|2024-10-07|3              |
|102      |2024-10-01|2024-10-02|2024-10-04|3              |
|103      |2024-09-30|2024-10-01|2024-10-03|3              |
+---------+----------+----------+----------+---------------+

