# Databricks Notebook for UK Railway Data Analysis

**Objective**: Analyse railway ticket transactions to uncover insights on:
- Journey performance (on-time, delayed, cancelled)
- Revenue patterns and pricing
- Station performance metrics
- Customer behavior analysis

**Dataset**: 31,653 railway transactions with 18 features

In [0]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import count, when, col, sum, round
from pyspark.sql.window import Window

## 1. Data Loading and Schema Definition

In [0]:
# Obtain a Spark session registered under the application name "Spark DataFrames".
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)
# Earlier approach, kept for reference: read the Unity Catalog table and
# convert it to a pandas DataFrame.
#spark_df = spark.read.table("workspace.default.railway")
#df = spark_df.toPandas()


In [0]:
# Schema for the railway dataset. Every field is nullable; all are strings
# except the two date columns and the integer Price.
# NOTE(review): no visible cell passes this schema to a reader, and its field
# names use underscores while the analysis cells reference space-separated
# column names (e.g. "Journey Status") — confirm which naming is correct.
_railway_fields = [
    ("Transaction_ID", StringType()),
    ("Date_of_Purchase", DateType()),
    ("Time_of_Purchase", StringType()),
    ("Purchase_Type", StringType()),
    ("Payment_Method", StringType()),
    ("Railcard", StringType()),
    ("Ticket_Class", StringType()),
    ("Ticket_Type", StringType()),
    ("Price", IntegerType()),
    ("Departure_Station", StringType()),
    ("Arrival_Destination", StringType()),
    ("Date_of_Journey", DateType()),
    ("Departure_Time", StringType()),
    ("Arrival_Time", StringType()),
    ("Actual_Arrival_Time", StringType()),
    ("Journey_Status", StringType()),
    ("Reason_for_Delay", StringType()),
    ("Refund_Request", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _railway_fields]
)

In [0]:
# Read the railway data from the managed Unity Catalog table rather than DBFS,
# which avoids DBFS_DISABLED errors.
RAILWAY_TABLE = "workspace.default.railway"

df_spark = spark.read.table(RAILWAY_TABLE)

In [0]:
# Report the dataset dimensions. If that fails, print the error and fall back
# to listing the column names. The schema is printed in either case.
try:
    row_total = df_spark.count()
    column_total = len(df_spark.columns)
    print(f"Dataset Shape: {row_total} rows x {column_total} columns")
except Exception as count_error:
    print("Could not count rows:", count_error)
    print(f"Columns: {df_spark.columns}")
df_spark.printSchema()

## 2. Data Quality Check

In [0]:
# Count the NULL entries in every column with a single select.
from pyspark.sql.functions import col, sum as spark_sum, when, count

null_count_exprs = []
for column_name in df_spark.columns:
    is_missing = col(column_name).isNull()
    # when() without otherwise() yields NULL for rows that are not missing,
    # and count() skips NULLs, so only the missing rows are counted.
    null_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))

missing_values = df_spark.select(null_count_exprs)
missing_values.show()

## 3. Key Business Metrics

In [0]:
# Journey status distribution: bookings per status and each status's share
# of all rows, largest group first.
total_rows = df_spark.count()
status_count = count("*")

journey_status = (
    df_spark
    .groupBy("Journey Status")
    .agg(
        status_count.alias("Count"),
        round(status_count / total_rows * 100, 2).alias("Percentage"),
    )
    .orderBy(col("Count").desc())
)

journey_status.show()


In [0]:
# Overall revenue: total, mean, highest and lowest ticket price.
# sum/avg/max/min here are the Spark column functions imported at the top of
# the notebook, not the Python builtins.
price_summaries = [
    sum("Price").alias("Total_Revenue"),
    avg("Price").alias("Avg_Price"),
    max("Price").alias("Max_Price"),
    min("Price").alias("Min_Price"),
]
revenue_metrics = df_spark.agg(*price_summaries)
revenue_metrics.show()

In [0]:
# Revenue per ticket type: total revenue, average price and number of
# tickets, sorted by total revenue (highest first).
tickets_grouped = df_spark.groupBy("Ticket Type")
revenue_by_type = (
    tickets_grouped
    .agg(
        sum("Price").alias("Total_Revenue"),
        avg("Price").alias("Avg_Price"),
        count("*").alias("Ticket_Count"),
    )
    .orderBy(col("Total_Revenue").desc())
)

revenue_by_type.show()

In [0]:
# Purchase Type Distribution: number of transactions per purchase type.
# alias() is applied to the count column so the result column is named
# "count"; calling alias() on the aggregated DataFrame only aliases the
# DataFrame and leaves the column with its generated name.
purchase_type = df_spark.groupBy("Purchase Type").agg(count("*").alias("count"))
display(purchase_type)


In [0]:
# Payment Method Distribution: number of transactions per payment method.
# alias() is applied to the count column so the result column is named
# "count"; calling alias() on the aggregated DataFrame only aliases the
# DataFrame and leaves the column with its generated name.
payment_method = df_spark.groupBy("Payment Method").agg(count("*").alias("count"))
display(payment_method)

## 4. Delay Analysis

In [0]:
# Delay reasons: number of journeys per recorded reason, most frequent first.
# Rows without a recorded reason are excluded before grouping.
rows_with_reason = df_spark.where(col("Reason for Delay").isNotNull())
delay_reasons = (
    rows_with_reason
    .groupBy("Reason for Delay")
    .agg(count("*").alias("Incident_Count"))
    .orderBy(col("Incident_Count").desc())
)

delay_reasons.show(10)

In [0]:
# Delay rate per departure station, restricted to stations with at least
# 100 bookings and sorted by delay rate (highest first).
from pyspark.sql.functions import count, when, col, sum, round
from pyspark.sql.window import Window

# 1 for a delayed journey, 0 otherwise, so that summing it counts the delays.
is_delayed = when(col("Journey Status") == "Delayed", 1).otherwise(0)

station_stats = (
    df_spark
    .groupBy("Departure Station")
    .agg(
        count("*").alias("Total Bookings"),
        sum(is_delayed).alias("Delayed Count"),
    )
    .filter(col("Total Bookings") >= 100)
)

# Percentage of a station's bookings that were delayed, to two decimals.
delay_rate_pct = round(col("Delayed Count") / col("Total Bookings") * 100, 2)
station_delay_rate = (
    station_stats
    .withColumn("Delay Rate", delay_rate_pct)
    .orderBy(col("Delay Rate").desc())
)

station_delay_rate.show(10)


## 5. Refund Analysis

In [0]:
# Refund request rate by journey status: journeys, refund requests, and the
# percentage of journeys in each status that requested a refund.
# 1 when a refund was requested, 0 otherwise, so that summing it counts requests.
refund_requested = when(col("Refund Request") == "Yes", 1).otherwise(0)

refund_counts = df_spark.groupBy("Journey Status").agg(
    count("*").alias("Total Journeys"),
    sum(refund_requested).alias("Refund Requests"),
)
refund_rate_pct = round(col("Refund Requests") / col("Total Journeys") * 100, 2)
refund_analysis = refund_counts.withColumn("Refund Rate Pct", refund_rate_pct)

refund_analysis.show()

## 6. Customer Behavior Insights

In [0]:
# Booking count and average price for each purchase type / ticket class
# combination, sorted by those two keys.
behavior_keys = ["Purchase Type", "Ticket Class"]
purchase_behavior = (
    df_spark
    .groupBy(*behavior_keys)
    .agg(
        count("*").alias("Count"),
        avg("Price").alias("Avg_Price"),
    )
    .orderBy(*behavior_keys)
)

purchase_behavior.show()

In [0]:
# Payment method preferences: bookings per method and each method's share
# of all rows, most used first.
all_rows = df_spark.count()
method_count = count("*")

payment_stats = (
    df_spark
    .groupBy("Payment Method")
    .agg(
        method_count.alias("Count"),
        round(method_count / all_rows * 100, 2).alias("Percentage"),
    )
    .orderBy(col("Count").desc())
)

payment_stats.show()


## 7. Top Routes Analysis

In [0]:
# Build a "Route" label of the form "<departure> → <arrival>" and rank the
# routes by number of bookings.
route_label = concat(
    col("Departure Station"),
    lit(" → "),
    col("Arrival Destination"),
)
df_with_route = df_spark.withColumn("Route", route_label)

# Bookings, average price and total revenue per route, busiest first.
top_routes = (
    df_with_route
    .groupBy("Route")
    .agg(
        count("*").alias("Booking_Count"),
        avg("Price").alias("Avg_Price"),
        sum("Price").alias("Total_Revenue"),
    )
    .orderBy(col("Booking_Count").desc())
)

# truncate=False keeps the full route text visible.
top_routes.show(10, truncate=False)

## 8. Time-Based Analysis

In [0]:
# Derive the departure hour from the departure time.
df_with_hour = df_spark.withColumn(
    "Departure Hour",
    hour(col("Departure Time"))
)

# Peak hours analysis: bookings per departure hour, ordered by hour.
peak_hours = df_with_hour.groupBy("Departure Hour").agg(
    count("*").alias("Booking_Count")
).orderBy("Departure Hour")
# show(24) prints up to 24 rows, one per hour of the day. The previous
# display(24) call passed a row count to the Databricks display helper,
# which is not a row-limited PySpark DataFrame method.
peak_hours.show(24)

## 9. Visualisation (Matplotlib)

In [0]:
# Convert the aggregated journey-status table to pandas for plotting with matplotlib.
journey_status_pd = journey_status.toPandas()


In [0]:
# Pie chart of the share of journeys in each status.
fig, ax = plt.subplots(figsize=(10, 6))
colors = ['#2ecc71', '#f39c12', '#e74c3c']
# matplotlib requires one explode offset per wedge, so size the list from the
# data instead of hard-coding three entries. A fixed 3-tuple raises ValueError
# when the number of status groups is not exactly three.
explode = [0.05] * len(journey_status_pd)
ax.pie(
    journey_status_pd['Count'],
    labels=journey_status_pd['Journey Status'],
    autopct='%1.1f%%',
    colors=colors,  # matplotlib cycles through these if there are more wedges
    explode=explode,
)
ax.set_title('Journey Status Distribution', fontsize=14, fontweight='bold')
plt.show()


## 10. Key Insights Summary

In [0]:
# Key metrics for the summary, computed in a single aggregation pass over the
# table instead of one Spark action per metric.
summary_row = df_spark.agg(
    sum("Price").alias("total_revenue"),
    count("*").alias("total_bookings"),
    sum(when(col("Journey Status") == "Delayed", 1).otherwise(0)).alias("delayed"),
    sum(when(col("Journey Status") == "Cancelled", 1).otherwise(0)).alias("cancelled"),
    sum(when(col("Refund Request") == "Yes", 1).otherwise(0)).alias("refunds"),
).collect()[0]

total_bookings = summary_row["total_bookings"]
# sum() yields NULL when there is no non-null price; report 0 in that case so
# the thousands-separator format below does not fail on None.
total_revenue = summary_row["total_revenue"] or 0

if total_bookings:
    delay_rate = summary_row["delayed"] / total_bookings * 100
    cancellation_rate = summary_row["cancelled"] / total_bookings * 100
    refund_rate = summary_row["refunds"] / total_bookings * 100
    # Everything that is neither delayed nor cancelled is treated as on time.
    on_time_rate = 100 - delay_rate - cancellation_rate
else:
    # Empty table: avoid dividing by zero and report every rate as 0.
    delay_rate = cancellation_rate = refund_rate = on_time_rate = 0.0

print("=" * 50)
print("KEY INSIGHTS SUMMARY")
print("=" * 50)
print(f"Total Revenue: £{total_revenue:,}")
print(f"Total Bookings: {total_bookings:,}")
print(f"On-Time Performance: {on_time_rate:.1f}%")
print(f"Delay Rate: {delay_rate:.1f}%")
print(f"Cancellation Rate: {cancellation_rate:.1f}%")
print(f"Refund Request Rate: {refund_rate:.1f}%")
print("=" * 50)