In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import to_timestamp, concat_ws, col
from pyspark.ml import Pipeline

# Data pre-processing

## Traffic data

In [0]:
# Read the complete traffic table from the workspace catalog into a Spark DataFrame.
traffic_query = "SELECT * FROM workspace.default.samyuktha_trafficdata"
sam_traffic_data = spark.sql(traffic_query)

In [0]:
# Render the traffic DataFrame in the notebook for a visual check of the loaded rows.
display(sam_traffic_data)

In [0]:
# Shape of the traffic table, shown as (row count, column count).
traffic_row_count = sam_traffic_data.count()
traffic_col_count = len(sam_traffic_data.columns)
(traffic_row_count, traffic_col_count)

## Weather data

In [0]:
# Read the complete weather table from the workspace catalog into a Spark DataFrame.
weather_query = "SELECT * FROM workspace.default.samyuktha_weatherdata"
sam_weather_data = spark.sql(weather_query)

In [0]:
# Render the weather DataFrame in the notebook for a visual check of the loaded rows.
display(sam_weather_data)

In [0]:
# Shape of the weather table, shown as (row count, column count).
weather_row_count = sam_weather_data.count()
weather_col_count = len(sam_weather_data.columns)
(weather_row_count, weather_col_count)

In [0]:
# Print the unique destination values that occur in the traffic data.
distinct_destinations = sam_traffic_data.select("destination").distinct()
distinct_destinations.show()

In [0]:
# Print the unique origin values that occur in the traffic data.
distinct_origins = sam_traffic_data.select("origin").distinct()
distinct_origins.show()

In [0]:
# Print the unique location values that occur in the weather data.
distinct_locations = sam_weather_data.select('location').distinct()
distinct_locations.show()

In [0]:
from pyspark.sql.functions import col, concat, lit, when

# Append ", Hyderabad" to every weather location.
# NOTE(review): presumably this makes `location` match the traffic `origin`
# values used as the join key further down — confirm against the data.
CITY_SUFFIX = ', Hyderabad'
location_col = col('location')

# This cell overwrites `sam_weather_data`, so it must be safe to run more than
# once: values that already end with the suffix are left untouched instead of
# being suffixed a second time. Null locations stay null either way.
sam_weather_data = sam_weather_data.withColumn(
    'location',
    when(location_col.endswith(CITY_SUFFIX), location_col)
    .otherwise(concat(location_col, lit(CITY_SUFFIX)))
)

In [0]:
# Show the unique weather locations after the location column was rewritten.
weather_locations = sam_weather_data.select('location').distinct()
display(weather_locations)

## Merge datasets

In [0]:
# Build a single timestamp column from the separate `date` and `hour` columns.
# The two values are joined with a space and parsed with the "yyyy-MM-dd H" pattern.
date_hour_text = concat_ws(" ", col("date"), col("hour"))
sam_traffic_data = sam_traffic_data.withColumn(
    "datetime",
    to_timestamp(date_hour_text, "yyyy-MM-dd H"),
)

In [0]:
# Left-join weather onto traffic: a weather row matches when its location equals
# the trip origin and both rows carry the same timestamp. Traffic rows without a
# matching weather row are kept (left join).
same_place = sam_traffic_data.origin == sam_weather_data.location
same_time = sam_traffic_data.datetime == sam_weather_data.datetime
merged_df = (
    sam_traffic_data
    .join(sam_weather_data, same_place & same_time, "left")
    # Remove the weather-side key columns so only the traffic-side copies remain.
    .drop(sam_weather_data.location, sam_weather_data.datetime)
)

In [0]:
# Render the joined traffic + weather DataFrame to inspect the result of the merge.
display(merged_df)

In [0]:
# Derive the day of the week from the timestamp column
# (Spark's dayofweek numbers days 1 = Sunday through 7 = Saturday).
weekday_number = dayofweek(col("datetime"))
merged_df = merged_df.withColumn("day_of_week", weekday_number)

In [0]:
# Render merged_df again; it now carries the day_of_week column added in the previous cell.
display(merged_df)

In [0]:
%python
from pyspark.ml.feature import StringIndexer

# Indexer that maps the string column "origin" to the numeric column "origin_index".
# handleInvalid="keep" asks Spark to keep invalid/unseen values rather than raise.
# NOTE: the same indexer is constructed again in later cells of this notebook.
origin_indexer = StringIndexer(     
    inputCol="origin",
    outputCol="origin_index",
    handleInvalid="keep"
)

In [0]:
# String -> numeric index stages for the three categorical columns.
# All three keep invalid/unseen values instead of raising (handleInvalid="keep").
origin_indexer = StringIndexer(
    inputCol="origin",
    outputCol="origin_index",
    handleInvalid="keep",
)
dest_indexer = StringIndexer(
    inputCol="destination",
    outputCol="destination_index",
    handleInvalid="keep",
)
weather_indexer = StringIndexer(
    inputCol="condition",
    outputCol="condition_index",
    handleInvalid="keep",
)

In [0]:
# One-hot encoders that turn each category index column into a vector column.
# The destination output is named "destination_vec" because that is the column
# name the feature list / VectorAssembler in this notebook reads; the previous
# name "dest_vec" did not match it.
origin_encoder = OneHotEncoder(inputCol="origin_index", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="destination_index", outputCol="destination_vec")
weather_encoder = OneHotEncoder(inputCol="condition_index", outputCol="condition_vec")

In [0]:
# Categorical encodings: one StringIndexer per categorical column.
def _keep_invalid_indexer(source_col, index_col):
    """Return a StringIndexer from source_col to index_col that keeps invalid values."""
    return StringIndexer(inputCol=source_col, outputCol=index_col, handleInvalid="keep")


origin_indexer = _keep_invalid_indexer("origin", "origin_index")
dest_indexer = _keep_invalid_indexer("destination", "destination_index")
weather_indexer = _keep_invalid_indexer("condition", "condition_index")

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# Attach a numeric index to every distinct origin and destination.
#
# The indices are dense and 0-based (0, 1, 2, ...), assigned in sorted label
# order. monotonically_increasing_id() is deliberately not used here: its ids
# are unique but NOT consecutive, which makes them unusable as category indices
# for a OneHotEncoder, and they can change between runs.

# This cell overwrites `sam_traffic_data`; drop index columns left behind by an
# earlier run so that re-running the cell does not create duplicate columns.
traffic_without_indices = sam_traffic_data.drop("origin_index", "destination_index")

# Generate index for 'origin' (the window spans only the few distinct labels).
origin_labels = (
    traffic_without_indices.select("origin").distinct()
    .withColumn("origin_index", dense_rank().over(Window.orderBy("origin")) - 1)
)

# Generate index for 'destination'
dest_labels = (
    traffic_without_indices.select("destination").distinct()
    .withColumn("destination_index", dense_rank().over(Window.orderBy("destination")) - 1)
)

# Join indices back
sam_traffic_data = (
    traffic_without_indices
    .join(origin_labels, on="origin", how="left")
    .join(dest_labels, on="destination", how="left")
)



In [0]:
# One-hot encoders for the origin and destination index columns; their vector
# outputs are the "origin_vec" and "destination_vec" columns.
origin_encoder = OneHotEncoder(
    inputCol="origin_index",
    outputCol="origin_vec",
)
dest_encoder = OneHotEncoder(
    inputCol="destination_index",
    outputCol="destination_vec",
)

In [0]:
# Columns that make up the model's feature vector, in assembly order:
# first the plain numeric columns, then the one-hot encoded vector columns.
numeric_feature_cols = [
    "hour_of_day",
    "day_of_week",
    "is_weekend",
    "distance_value",
    "temperature_C",
    "precipitation_mm",
    "cloudcover_percent",
    "windspeed_kmph",
]
encoded_feature_cols = ["origin_vec", "destination_vec"]
feature_cols = numeric_feature_cols + encoded_feature_cols


In [0]:
# Combine the feature columns into the single vector column "features".
# The input list is taken from `feature_cols` (defined in the cell above) rather
# than typed out a second time, so the two cannot drift apart.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

In [0]:

# Define model: a 100-tree random forest that predicts duration_in_traffic_value
# from the assembled "features" vector. The seed is fixed so that repeated runs
# of the notebook train the same forest (same value as the train/test split seed
# used elsewhere in this notebook).
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="duration_in_traffic_value",
    numTrees=100,
    seed=42,
)

# Build Pipeline: index the categorical columns, one-hot encode the indices,
# assemble the feature vector, then fit the regressor.
pipeline = Pipeline(stages=[origin_indexer, dest_indexer, origin_encoder, dest_encoder, assembler, rf])


In [0]:
# Import necessary libraries
from pyspark.sql.functions import col, hour, dayofweek, to_timestamp, concat_ws
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# End-to-end run: join traffic with weather, engineer time features, train a
# random-forest regressor on duration_in_traffic_value, and report RMSE / MAE.

# Start from the frames loaded earlier in this notebook; `traffic_df` and
# `weather_df` were previously never defined, so this cell failed on a fresh run.
# Any manually attached index columns are dropped because the StringIndexer
# stages below create columns with the same names (drop ignores missing names).
traffic_df = sam_traffic_data.drop("origin_index", "destination_index")
weather_df = sam_weather_data

# Convert date + hour to timestamp
traffic_df = traffic_df.withColumn("datetime", to_timestamp(concat_ws(" ", col("date"), col("hour")), "yyyy-MM-dd H"))

# Join weather data on origin and datetime (assume weather recorded hourly).
# Both weather-side key columns are dropped: leaving the weather `datetime` in
# place would give the result two `datetime` columns and make the
# hour("datetime") / dayofweek("datetime") lookups below ambiguous.
joined_df = (
    traffic_df.join(
        weather_df,
        (traffic_df.origin == weather_df.location) & (traffic_df.datetime == weather_df.datetime),
        "left"
    )
    .drop(weather_df.location)
    .drop(weather_df.datetime)
)

# Feature engineering: extract hour and day of week
joined_df = joined_df.withColumn("hour_of_day", hour("datetime"))
joined_df = joined_df.withColumn("day_of_week", dayofweek("datetime"))

# Categorical encodings
origin_indexer = StringIndexer(inputCol="origin", outputCol="origin_index", handleInvalid="keep")
dest_indexer = StringIndexer(inputCol="destination", outputCol="destination_index", handleInvalid="keep")
weather_indexer = StringIndexer(inputCol="condition", outputCol="condition_index", handleInvalid="keep")

origin_encoder = OneHotEncoder(inputCol="origin_index", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="destination_index", outputCol="dest_vec")
weather_encoder = OneHotEncoder(inputCol="condition_index", outputCol="condition_vec")

# Define feature columns
# NOTE(review): "temperature" and "humidity" differ from the weather column names
# used by the other feature list in this notebook ("temperature_C", ...) —
# confirm which names exist in the weather table.
# NOTE(review): the left join leaves nulls in the weather columns for traffic
# rows without a weather match; VectorAssembler raises on nulls by default —
# confirm such rows cannot occur or filter them first.
features = [
    "distance_value", "hour_of_day", "day_of_week", "temperature", "humidity",
    "origin_vec", "dest_vec", "condition_vec"
]

assembler = VectorAssembler(inputCols=features, outputCol="features")

# Define regressor (seeded so repeated runs train the same forest)
rf = RandomForestRegressor(featuresCol="features", labelCol="duration_in_traffic_value", seed=42)

# Pipeline
pipeline = Pipeline(stages=[
    origin_indexer, dest_indexer, weather_indexer,
    origin_encoder, dest_encoder, weather_encoder,
    assembler, rf
])

# Train-test split
train_data, test_data = joined_df.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_data)

# Evaluate on the held-out split; the same evaluator is reused for both metrics
# by switching its metric name between the two evaluate() calls.
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol="duration_in_traffic_value", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)

print(f"RMSE: {rmse:.2f}")
print(f"MAE: {mae:.2f}")


PyTorch

In [0]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

In [0]:
%sql
-- Preview the first 10 rows of the traffic_30 table in the workspace catalog.
SELECT * FROM workspace.default.traffic_30 LIMIT 10