<a href="https://colab.research.google.com/github/bsrikanth24/Python-Interview/blob/main/MergeContinuousDates.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, sum as _sum, min as _min, max as _max, date_add
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("MergeContinuousDates").getOrCreate()

# Sample data
data = [
    (1, "2025-01-01", "2025-01-10"),
    (1, "2025-01-11", "2025-01-15"),
    (2, "2025-02-01", "2025-02-05"),
    (2, "2025-02-10", "2025-02-15")
]

# Create DataFrame
df = spark.createDataFrame(data, ["id", "start_date", "end_date"])

# Convert dates to proper date format
df = df.withColumn("start_date", col("start_date").cast("date")) \
       .withColumn("end_date", col("end_date").cast("date"))

# Define a window partitioned by ID and ordered by start_date
window_spec = Window.partitionBy("id").orderBy("start_date")

# Use lag to find the previous row's end_date
df = df.withColumn("prev_end_date", lag("end_date").over(window_spec))

# Identify gaps: If the current start_date is NOT continuous with the previous end_date, mark it as a new group
# Use date_add to add 1 day to prev_end_date for comparison
df = df.withColumn(
    "is_new_group",
    (col("start_date") > date_add(col("prev_end_date"), 1)).cast("int")
)

# Use a cumulative sum to create a group identifier for continuous ranges
df = df.withColumn("grp", _sum("is_new_group").over(window_spec))

# Aggregate by ID and group to find the min(start_date) and max(end_date) for each group
result = df.groupBy("id", "grp") \
           .agg(_min("start_date").alias("start_date"), _max("end_date").alias("end_date")) \
           .orderBy("id", "start_date")

# Show the result
result.show(truncate=False)

+---+----+----------+----------+
|id |grp |start_date|end_date  |
+---+----+----------+----------+
|1  |NULL|2025-01-01|2025-01-10|
|1  |0   |2025-01-11|2025-01-15|
|2  |NULL|2025-02-01|2025-02-05|
|2  |1   |2025-02-10|2025-02-15|
+---+----+----------+----------+



In [None]:
"""CREATE TABLE date_ranges (
    id INT,
    start_date DATE,
    end_date DATE
);

INSERT INTO date_ranges (id, start_date, end_date) VALUES
(1, DATE'2025-01-01', DATE'2025-01-10'),
(1, DATE'2025-01-11', DATE'2025-01-15'),
(2, DATE'2025-02-01', DATE'2025-02-05'),
(2, DATE'2025-02-10', DATE'2025-02-15'); """

code = """
WITH step1 AS (
    SELECT
        id,
        start_date,
        end_date,
        LAG(end_date) OVER (PARTITION BY id ORDER BY start_date) AS prev_end_date
    FROM date_ranges
),
step2 AS (
    SELECT
        id,
        start_date,
        end_date,
        CASE
            WHEN prev_end_date IS NULL THEN 0
            WHEN start_date > prev_end_date + 1 THEN 1
            ELSE 0
        END AS is_new_group
    FROM step1
),
step3 AS (
    SELECT
        id,
        start_date,
        end_date,
        SUM(is_new_group) OVER (PARTITION BY id ORDER BY start_date ROWS UNBOUNDED PRECEDING) AS grp
    FROM step2
)
SELECT
    id,
    MIN(start_date) AS start_date,
    MAX(end_date) AS end_date
FROM step3
GROUP BY id, grp
ORDER BY id, start_date;
"""

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, sum as _sum, min as _min, max as _max, date_add, when, coalesce, lit
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("MergeContinuousDates").getOrCreate()

data = [
    (1, "2025-01-01", "2025-01-10"),
    (1, "2025-01-11", "2025-01-15"),
    (2, "2025-02-01", "2025-02-05"),
    (2, "2025-02-10", "2025-02-15")
]

df = spark.createDataFrame(data, ["id", "start_date", "end_date"]) \
          .withColumn("start_date", col("start_date").cast("date")) \
          .withColumn("end_date", col("end_date").cast("date"))

window_spec = Window.partitionBy("id").orderBy("start_date")

df = df.withColumn("prev_end_date", lag("end_date").over(window_spec))
print("After lag:")
df.show(truncate=False)

df = df.withColumn(
    "is_new_group",
    when(col("prev_end_date").isNull(), 0)
    .when(col("start_date") > date_add(col("prev_end_date"), 1), 1)
    .otherwise(0)
)
print("After is_new_group:")
df.show(truncate=False)

df = df.withColumn("grp", _sum("is_new_group").over(window_spec))
df = df.withColumn("grp", coalesce(col("grp"), lit(0)))  # fill NULL grp with 0
print("After cumulative sum grp:")
df.show(truncate=False)

result = df.groupBy("id", "grp") \
           .agg(_min("start_date").alias("start_date"), _max("end_date").alias("end_date")) \
           .orderBy("id", "start_date")

print("Final result:")
result.show(truncate=False)

After lag:
+---+----------+----------+-------------+
|id |start_date|end_date  |prev_end_date|
+---+----------+----------+-------------+
|1  |2025-01-01|2025-01-10|NULL         |
|1  |2025-01-11|2025-01-15|2025-01-10   |
|2  |2025-02-01|2025-02-05|NULL         |
|2  |2025-02-10|2025-02-15|2025-02-05   |
+---+----------+----------+-------------+

After is_new_group:
+---+----------+----------+-------------+------------+
|id |start_date|end_date  |prev_end_date|is_new_group|
+---+----------+----------+-------------+------------+
|1  |2025-01-01|2025-01-10|NULL         |0           |
|1  |2025-01-11|2025-01-15|2025-01-10   |0           |
|2  |2025-02-01|2025-02-05|NULL         |0           |
|2  |2025-02-10|2025-02-15|2025-02-05   |1           |
+---+----------+----------+-------------+------------+

After cumulative sum grp:
+---+----------+----------+-------------+------------+---+
|id |start_date|end_date  |prev_end_date|is_new_group|grp|
+---+----------+----------+-------------+------