# Problem Statement
https://medium.com/data-engineer-things/goldman-sachs-pyspark-interview-question-hard-level-8ed11a49c9c7

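Calculate the minimum number of platforms a train station needs, given each train's arrival and departure times, so that no arriving train has to wait for a free platform. A platform counts as free again at the instant its train departs, so another train may arrive at that same instant.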

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start (or reuse) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("train-station")
    .getOrCreate()
)

# Sample timetable: one (train_id, arrival, departure) row per train. Keeping the
# two times side by side makes it easy to check each train's stay at a glance.
schedule = [
    (1, '2024-11-17 08:00', '2024-11-17 08:15'),
    (2, '2024-11-17 08:05', '2024-11-17 08:10'),
    (3, '2024-11-17 08:05', '2024-11-17 08:20'),
    (4, '2024-11-17 08:10', '2024-11-17 08:25'),
    (5, '2024-11-17 08:10', '2024-11-17 08:20'),
    (6, '2024-11-17 12:15', '2024-11-17 13:00'),
    (7, '2024-11-17 12:20', '2024-11-17 12:25'),
    (8, '2024-11-17 12:25', '2024-11-17 12:30'),
    (9, '2024-11-17 15:00', '2024-11-17 15:05'),
    (10, '2024-11-17 15:00', '2024-11-17 15:10'),
    (11, '2024-11-17 15:00', '2024-11-17 15:15'),
    (12, '2024-11-17 15:06', '2024-11-17 15:15'),
    (13, '2024-11-17 20:00', '2024-11-17 20:15'),
    (14, '2024-11-17 20:10', '2024-11-17 20:15'),
]

# Split the timetable into separate arrival and departure tables, mirroring
# the two-input shape of the original problem.
arrivals_data = [(train_id, arrival) for train_id, arrival, _ in schedule]
departures_data = [(train_id, departure) for train_id, _, departure in schedule]

arrivals_df = spark.createDataFrame(arrivals_data, ['train_id', 'arrival_time'])
departures_df = spark.createDataFrame(departures_data, ['train_id', 'departure_time'])

# Re-attach each train's departure to its arrival (inner join on train_id).
train_station_df = arrivals_df.join(departures_df, 'train_id')

In [None]:
# Replace the 'yyyy-MM-dd HH:mm' string columns with parsed timestamp columns,
# keeping the same column names (arrival first, then departure).
for time_column in ('arrival_time', 'departure_time'):
    train_station_df = train_station_df.withColumn(time_column, F.to_timestamp(time_column))

train_station_df.show()

In [None]:
# Inspect the schema to check that arrival_time and departure_time are now timestamps rather than strings.
train_station_df.printSchema()

In [None]:
# Length of each train's stay at the station (departure_time - arrival_time).
# Parenthesised instead of a trailing backslash: the original line ended in a
# `\` continuation onto a blank line, which breaks as soon as anything changes.
intervals_df = train_station_df.withColumn(
    'interval', F.expr('departure_time - arrival_time')
)

# get minimum interval
intervals_min_df = intervals_df.select(F.min('interval'))
intervals_min_df.show()

# The shortest stay, collected as a Python timedelta (None if there are no rows).
min_interval = intervals_min_df.first()[0]
if min_interval is None:
    raise ValueError("No trains with both an arrival and a departure time to analyse.")
print(f"Minimum interval: {min_interval}")

# The sampling step can be anything less than or equal to the shortest stay,
# so round down to whole minutes. Clamp to at least 1 minute: a stay shorter
# than a minute would otherwise give a step of 0, which F.sequence rejects.
interval_minutes = max(1, int(min_interval.total_seconds() // 60))
print(f"Interval in minutes: {interval_minutes}")

In [None]:
# Sample every train at each event timestamp (any arrival or departure in the
# data) that falls inside its [arrival_time, departure_time] stay.
#
# Why not step from each train's own arrival_time by `interval_minutes`?
# Those samples only line up across trains when every timestamp sits on a
# common grid. Train 12 arrives at 15:06, so it would be sampled at 15:06 and
# 15:11 and never at 15:10, where it overlaps trains 10 and 11. Occupancy only
# changes at event timestamps, so sampling exactly those instants (shared by
# all trains) is sufficient to find the peak.
event_times = (
    train_station_df.select(F.col("arrival_time").alias("at_station_sequence"))
    .union(train_station_df.select(F.col("departure_time").alias("at_station_sequence")))
    .distinct()
)

# Non-equi join: one row per (train, event timestamp within its stay), with the
# same columns as before: train_id, arrival_time, departure_time, at_station_sequence.
trains_at_station = train_station_df.join(
    event_times,
    F.col("at_station_sequence").between(F.col("arrival_time"), F.col("departure_time")),
)

trains_at_station.show()

In [None]:
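# Recommended by Copilot for comparing departures to the sequence timestamp;
# it subtracts every train departing at that instant:
# .withColumn("train_count_minus_departures", F.size("trains_at_station") - F.size(F.expr("filter(trains_at_station, x -> x.departure_time = at_station_sequence)"))) 

# My version; it subtracts at most one train, even when several depart at the same instant: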
# .withColumn(
#     "train_count_minus_departures", 
#     F.when(
#         F.expr("array_contains(trains_at_station.departure_time, at_station_sequence)"), 
#         F.col("train_count") - 1
#     ).otherwise(F.col("train_count"))
# )

In [None]:
# For each sampled timestamp: which trains are at the station, how many there
# are, and how many still occupy a platform once the trains departing at that
# exact instant are excluded (a departing train frees its platform).
trains_at_station_during_sequence = (
    trains_at_station
    .groupBy("at_station_sequence")
    .agg(
        F.collect_list(
            F.struct("train_id", "departure_time")
        ).alias("trains_at_station")
    )
    .withColumn("train_count", F.size("trains_at_station"))
    .withColumn(
        "train_count_minus_departures",
        # Subtract EVERY train departing at this instant. An array_contains
        # check only says whether at least one departs, so trains leaving
        # together (e.g. trains 3 and 5 at 08:20) would be subtracted once.
        F.col("train_count")
        - F.expr(
            "size(filter(trains_at_station, t -> t.departure_time = at_station_sequence))"
        ),
    )
    .sort("at_station_sequence")
)

trains_at_station_during_sequence.show(truncate=False)

In [49]:
# The sampled instant with the most trains occupying a platform; its
# train_count_minus_departures is the notebook's answer (minimum platforms).
# Ordering by time as well makes the shown row deterministic when several
# instants share the same peak (the earliest one is reported).
platform_count = (
    trains_at_station_during_sequence
    .orderBy(F.desc("train_count_minus_departures"), F.asc("at_station_sequence"))
    .limit(1)
)

platform_count.show(truncate=False)

# Extract the answer as a plain number; no sampled rows means no trains, so 0 platforms.
peak_row = platform_count.first()
min_platforms = peak_row["train_count_minus_departures"] if peak_row is not None else 0
print(f"Minimum number of platforms required: {min_platforms}")

+-------------------+----------------------------------------------------------------------------------------------------------------------------------+-----------+----------------------------+
|at_station_sequence|trains_at_station                                                                                                                 |train_count|train_count_minus_departures|
+-------------------+----------------------------------------------------------------------------------------------------------------------------------+-----------+----------------------------+
|2024-11-17 08:10:00|[{1, 2024-11-17 08:15:00}, {2, 2024-11-17 08:10:00}, {3, 2024-11-17 08:20:00}, {4, 2024-11-17 08:25:00}, {5, 2024-11-17 08:20:00}]|5          |4                           |
+-------------------+----------------------------------------------------------------------------------------------------------------------------------+-----------+----------------------------+

