In [16]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [23]:
import json
import os
from datetime import datetime

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType
)



ModuleNotFoundError: No module named 'utils'

In [2]:
BASE_URL = "http://irctc-connect-main:3001/api/train"

In [3]:
def get_train_info(train_number: str):
    """Look up a train's details and route via the ``trainInfo`` endpoint.

    Prints a short summary (name, type, route stops) followed by the raw JSON.

    Args:
        train_number: Train number sent as the ``trainNumber`` query parameter.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    try:
        resp = requests.get(
            f"{BASE_URL}/trainInfo",
            params={"trainNumber": train_number},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching train info: {e}")
        return None

    print("\n===== TRAIN INFO =====")
    print(f"Train Number: {train_number}")
    if not payload.get("success"):
        print("Error:", payload.get("error"))
    else:
        details = payload["data"]
        info = details.get("trainInfo", {})
        stops = details.get("route", [])
        print(f"Name        : {info.get('name', 'N/A')}")
        print(f"Type        : {info.get('type', 'N/A')}")
        print("\n--- ROUTE ---")
        for stop in stops:
            print(f"  {stop}")

    print("\n--- RAW RESPONSE ---")
    print(json.dumps(payload, indent=4))
    return payload

In [4]:
def track_train(train_number: str, date_str: str):
    """Fetch the live running status of a train for a given date.

    POSTs ``trainNumber`` and ``date`` to the ``trackTrain`` endpoint and prints
    a header plus any of the ``current_station`` / ``status`` /
    ``upcoming_stations`` keys found at the top level of the response.

    Note: a later cell in this notebook defines ``track_train`` again; whichever
    cell ran last is the definition in effect.

    Args:
        train_number: Train number to track.
        date_str: Journey date string, passed through to the API unchanged.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/trackTrain",
            headers={"Content-Type": "application/json"},
            json={"trainNumber": train_number, "date": date_str},
            timeout=10,
        )
        resp.raise_for_status()
        status = resp.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching train status: {e}")
        return None

    print("\n===== LIVE TRAIN STATUS =====")
    print(f"Train Number : {train_number}")
    print(f"Date         : {date_str}")
    print(f"Last Updated : {datetime.now().strftime('%d-%m-%Y %H:%M:%S')}")
    print("\n--- RAW RESPONSE ---")

    if not isinstance(status, dict):
        return status

    if "current_station" in status:
        print("\nCurrent Station:", status["current_station"])
    if "status" in status:
        print("Status:", status["status"])
    if "upcoming_stations" in status:
        print("\nUpcoming Stations:")
        for upcoming in status["upcoming_stations"]:
            print(f"  - {upcoming}")
    return status


In [5]:
def check_pnr_status(pnr: str):
    """Check PNR status.

    Queries the ``checkPNRStatus`` endpoint, prints the train name, journey
    endpoints and booking status, then prints the raw JSON.

    Args:
        pnr: PNR number sent as the ``pnr`` query parameter.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    url = f"{BASE_URL}/checkPNRStatus"
    params = {"pnr": pnr}
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        print("\n===== PNR STATUS =====")
        print(f"PNR Number  : {pnr}")
        if data.get("success"):
            # Look fields up defensively: a successful response with a missing
            # or null section should print "N/A" instead of raising KeyError /
            # TypeError, which the RequestException handler would not catch.
            booking = data.get("data") or {}
            train = booking.get("train") or {}
            journey = booking.get("journey") or {}
            origin = journey.get("from") or {}
            destination = journey.get("to") or {}
            print("Train Name  :", train.get("name", "N/A"))
            print("From        :", origin.get("name", "N/A"))
            print("To          :", destination.get("name", "N/A"))
            print("Status      :", booking.get("status", "N/A"))
        else:
            print("Error:", data.get("error"))
        print("\n--- RAW RESPONSE ---")
        print(json.dumps(data, indent=4))
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching PNR status: {e}")
        return None

In [6]:

def live_at_station(stn_code: str):
    """List the trains currently expected at a station.

    Queries the ``liveAtStation`` endpoint, prints how many trains were
    returned and each entry, then prints the raw JSON.

    Args:
        stn_code: Station code sent as the ``stnCode`` query parameter.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    try:
        resp = requests.get(
            f"{BASE_URL}/liveAtStation",
            params={"stnCode": stn_code},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching live station data: {e}")
        return None

    print("\n===== LIVE AT STATION =====")
    print(f"Station Code: {stn_code}")
    if not payload.get("success"):
        print("Error:", payload.get("error"))
    else:
        arrivals = payload.get("data", [])
        print(f"Upcoming Trains: {len(arrivals)}")
        for entry in arrivals:
            print(f"  Train: {entry}")

    print("\n--- RAW RESPONSE ---")
    print(json.dumps(payload, indent=4))
    return payload

In [7]:
def track_train(train_number: str, date_str: str):
    """Track live train status.

    POSTs the train number and date to the ``trackTrain`` endpoint, prints a
    summary followed by the raw JSON, and returns the decoded response.

    Note: this is the second ``track_train`` definition in the notebook; when
    the cells run top to bottom it replaces the earlier one.

    Args:
        train_number: Train number to track.
        date_str: Journey date string, passed through to the API unchanged.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    url = f"{BASE_URL}/trackTrain"
    headers = {"Content-Type": "application/json"}
    payload = {"trainNumber": train_number, "date": date_str}
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
        data = response.json()

        print("\n===== LIVE TRAIN STATUS =====")
        print(f"Train Number : {train_number}")
        print(f"Date         : {date_str}")
        print(f"Last Updated : {datetime.now().strftime('%d-%m-%Y %H:%M:%S')}")
        print(data)

        if isinstance(data, dict) and data.get("success"):
            details = data.get("data")
            # "data" is not always an object: the response dumped later in this
            # notebook carries a list of per-station entries there, and calling
            # .get() on a list raises AttributeError. Only read "status" from a
            # dict; otherwise fall back to "N/A".
            status = details.get("status", "N/A") if isinstance(details, dict) else "N/A"
            print("Status:", status)
        elif isinstance(data, dict):
            print("Error:", data.get("error"))

        print("\n--- RAW RESPONSE ---")
        print(json.dumps(data, indent=4))
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching train status: {e}")
        return None

In [8]:
def search_train_between_stations(from_stn_code: str, to_stn_code: str):
    """Find trains that run between two stations.

    POSTs both station codes to the ``searchTrainBetweenStations`` endpoint and
    prints how many trains were found (the raw JSON itself is not printed).

    Args:
        from_stn_code: Code of the boarding station.
        to_stn_code: Code of the destination station.

    Returns:
        The decoded JSON response, or ``None`` when the HTTP request fails.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/searchTrainBetweenStations",
            headers={"Content-Type": "application/json"},
            json={"fromStnCode": from_stn_code, "toStnCode": to_stn_code},
            timeout=10,
        )
        resp.raise_for_status()
        result = resp.json()
    except requests.exceptions.RequestException as e:
        print(f"Error searching trains: {e}")
        return None

    print("\n===== SEARCH BETWEEN STATIONS =====")
    print(f"From: {from_stn_code}  To: {to_stn_code}")
    if not result.get("success"):
        print("Error:", result.get("error"))
    else:
        matches = result.get("data", [])
        print(f"Found {len(matches)} trains")
    print("\n--- RAW RESPONSE ---")
    return result

In [9]:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.exc import SQLAlchemyError

# SECURITY: prefer supplying the connection string through the DATABASE_URL
# environment variable. The literal below is only a backward-compatible
# fallback; it embeds a live password and should be rotated and removed.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://postgres:iaCkmHPhuyhFLEBDGdwxQGGqlHvdgWJA@yamanote.proxy.rlwy.net:29855/railway",
)

def drop_all_tables():
    """Drop every table found in the database at ``DATABASE_URL``.

    Reflects the existing tables, prints their names and drops them all.
    SQLAlchemy errors are reported on stdout rather than raised.

    WARNING: destructive and irreversible.

    Returns:
        None.
    """
    engine = None
    try:
        # Create DB engine
        engine = create_engine(DATABASE_URL)

        # Reflect existing tables
        metadata = MetaData()
        metadata.reflect(bind=engine)

        if not metadata.tables:
            print("No tables found in the database.")
            return

        print(f"Found tables: {list(metadata.tables.keys())}")

        # Drop all tables
        metadata.drop_all(bind=engine)
        print("✅ All tables dropped successfully.")

    except SQLAlchemyError as e:
        print(f"❌ Error while dropping tables: {e}")
    finally:
        # Release the pooled connections; without this every call leaves an
        # engine (and its open connections) behind in the kernel.
        if engine is not None:
            engine.dispose()

# NOTE(review): inside a notebook kernel ``__name__`` is "__main__", so this
# guard does not prevent the call: running this cell drops every table in the
# target database (the recorded output shows 'stations' and 'trains' dropped).
if __name__ == "__main__":
    drop_all_tables()


Found tables: ['stations', 'trains']
✅ All tables dropped successfully.


In [10]:
# Local Spark session; the PostgreSQL JDBC driver is requested as a package so
# that DataFrames can be read from / written to the database over JDBC.
spark = (
    SparkSession.builder
    .appName("IndianRailwayTrainTracker")
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")
    .getOrCreate()
)

In [19]:
spark

In [20]:
print(spark.sparkContext.getConf().getAll())

[('spark.driver.extraJavaOptions', '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'), ('spark.app.initial.jar.urls', 'spark://d1bd379c1917:33579/jars/org.checkerframework_checker-qual-3.42.0.jar,spark://d1bd379c1917:33579/jars/org.postgresql_postgresql-42.7.4.jar'), ('spark.repl.local.jars', 'file:///root/.ivy2/jars/org

In [11]:
stations_df = spark.read.format("json").option("multiline", "true").load("/home/jupyter/data/stations1.json")

In [12]:
stations_df.printSchema()

root
 |-- district: string (nullable = true)
 |-- division: string (nullable = true)
 |-- new_station_category: string (nullable = true)
 |-- old_station_category: string (nullable = true)
 |-- sno: string (nullable = true)
 |-- state: string (nullable = true)
 |-- station_code: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- zone: string (nullable = true)



In [13]:
# JDBC connection settings used for the Spark reads and writes.
jdbc_url = "jdbc:postgresql://yamanote.proxy.rlwy.net:29855/railway"
table_name = "stations"
# SECURITY: credentials can be supplied through RAILWAY_DB_USER /
# RAILWAY_DB_PASSWORD. The literals are only backward-compatible fallbacks;
# the embedded password should be rotated and removed from the notebook.
db_properties = {
    "user": os.environ.get("RAILWAY_DB_USER", "postgres"),
    "password": os.environ.get("RAILWAY_DB_PASSWORD", "iaCkmHPhuyhFLEBDGdwxQGGqlHvdgWJA"),
    "driver": "org.postgresql.Driver"
}

In [14]:
stations_df_casted = stations_df \
    .withColumn("sno", col("sno").cast("int"))

In [15]:
# JDBC connection settings used for the Spark reads and writes (this cell
# assigns the same three names as an earlier cell in the notebook).
jdbc_url = "jdbc:postgresql://yamanote.proxy.rlwy.net:29855/railway"
table_name = "stations"
# SECURITY: credentials can be supplied through RAILWAY_DB_USER /
# RAILWAY_DB_PASSWORD. The literals are only backward-compatible fallbacks;
# the embedded password should be rotated and removed from the notebook.
db_properties = {
    "user": os.environ.get("RAILWAY_DB_USER", "postgres"),
    "password": os.environ.get("RAILWAY_DB_PASSWORD", "iaCkmHPhuyhFLEBDGdwxQGGqlHvdgWJA"),
    "driver": "org.postgresql.Driver"
}

In [17]:
# Row count before de-duplication.
print(stations_df_casted.count())
# Keep a single row per station_code.
# NOTE(review): nothing here controls which of the duplicate rows is kept —
# confirm the duplicates are interchangeable before relying on other columns.
stations_df_unique = stations_df_casted.dropDuplicates(["station_code"])
# Row count after de-duplication.
print(stations_df_unique.count())

8194
4475


In [20]:
stations_df_no_sno = stations_df.drop("sno")

AnalysisException: Column 'created_at' does not exist. Did you mean one of the following? [state, district, station_name, zone, division, station_code, new_station_category, old_station_category];
'Project [district#0, division#1, new_station_category#2, old_station_category#3, state#5, station_code#6, station_name#7, zone#8, to_timestamp('created_at, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Etc/UTC)) AS created_at#194]
+- Project [district#0, division#1, new_station_category#2, old_station_category#3, state#5, station_code#6, station_name#7, zone#8]
   +- Relation [district#0,division#1,new_station_category#2,old_station_category#3,sno#4,state#5,station_code#6,station_name#7,zone#8] json


In [237]:
from pyspark.sql.functions import col
stations_df_unique.filter(col("station_name") == "HATKANANGALE").show()

+--------+--------+--------------------+--------------------+---+-----------+------------+------------+----+
|district|division|new_station_category|old_station_category|sno|      state|station_code|station_name|zone|
+--------+--------+--------------------+--------------------+---+-----------+------------+------------+----+
|KOLHAPUR|    PUNE|                NSG5|                   D|334|MAHARASHTRA|         HTK|HATKANANGALE|  CR|
+--------+--------+--------------------+--------------------+---+-----------+------------+------------+----+



In [130]:
stations_df_unique.filter(col('station_code') == 'CSMT').show()

+--------+--------+--------------------+--------------------+---+-----------+------------+---------------+----+
|district|division|new_station_category|old_station_category|sno|      state|station_code|   station_name|zone|
+--------+--------+--------------------+--------------------+---+-----------+------------+---------------+----+
|  MUMBAI|    CSTM|                NSG1|                  A1|130|MAHARASHTRA|        CSMT|C SHIVAJI MAH T|  CR|
+--------+--------+--------------------+--------------------+---+-----------+------------+---------------+----+



In [124]:
final_stations_df = stations_df_unique.select("station_code", "district", "division", "new_station_category", "old_station_category", "state", "station_name", "zone")
final_stations_df.write \
    .jdbc(url=jdbc_url, table=table_name, mode="append", properties=db_properties)

In [176]:
train_data = search_train_between_stations("KYN", "DR").get("data")


===== SEARCH BETWEEN STATIONS =====
From: KYN  To: DR
Found 138 trains

--- RAW RESPONSE ---


In [97]:
# Field layout for one train record returned by the station-to-station search.
# NOTE(review): no visible cell passes `schema` to createDataFrame (the trains
# DataFrame is built by inference), so this definition appears unused.
# NOTE(review): the sample record quoted further down has "running_days" as a
# string such as "1011010", while this schema declares array<int>; that matches
# the column only after the UDF conversion, not the raw payload.
schema = StructType([
    StructField("train_no", StringType(), True),
    StructField("train_name", StringType(), True),
    StructField("source_stn_name", StringType(), True),
    StructField("source_stn_code", StringType(), True),
    StructField("dstn_stn_name", StringType(), True),
    StructField("dstn_stn_code", StringType(), True),
    StructField("from_stn_name", StringType(), True),
    StructField("from_stn_code", StringType(), True),
    StructField("to_stn_name", StringType(), True),
    StructField("to_stn_code", StringType(), True),
    StructField("from_time", StringType(), True),
    StructField("to_time", StringType(), True),
    StructField("travel_time", StringType(), True),
    StructField("running_days", ArrayType(IntegerType()), True),
    StructField("distance", StringType(), True),
    StructField("halts", IntegerType(), True),
])

In [98]:
trains_df = spark.createDataFrame(train_data)

In [99]:
def running_days_to_array(rd_str):
    """Convert a digit string such as ``"1011010"`` into a list of ints.

    Each character becomes one integer, in order. ``None`` yields an empty
    list; a non-numeric character raises ``ValueError``.
    """
    return [] if rd_str is None else list(map(int, rd_str))

udf_running_days = udf(running_days_to_array, ArrayType(IntegerType()))

In [100]:
df = trains_df.withColumn("running_days_array", udf_running_days(col("running_days"))).drop("running_days") \
    .withColumnRenamed("running_days_array", "running_days")

In [101]:
df = df.withColumn("halts", col("halts").cast(IntegerType()))

In [108]:
trains_df = df.select("distance", "dstn_stn_code", "from_time", "halts", "source_stn_code", "to_time", "train_name", "train_no", "travel_time", "running_days") 

In [107]:
"""
{
            "train_no": "01028",
            "train_name": "GKP DR SPL",
            "source_stn_name": "Mau Jn",
            "source_stn_code": "MAU",
            "dstn_stn_name": "Dadar",
            "dstn_stn_code": "DR",
            "from_stn_name": "Kalyan Jn",
            "from_stn_code": "KYN",
            "to_stn_name": "Dadar",
            "to_stn_code": "DR",
            "from_time": "02:43",
            "to_time": "03:30",
            "travel_time": "00:47 hrs",
            "running_days": "1011010",
            "distance": "47",
            "halts": 0
        },
        {
            "train_no": "01026",
            "train_name": "BUI DR SPL",
            "source_stn_name": "Ballia",
            "source_stn_code": "BUI",
            "dstn_stn_name": "Dadar",
            "dstn_stn_code": "DR",
            "from_stn_name": "Kalyan Jn",
            "from_stn_code": "KYN",
            "to_stn_name": "Dadar",
            "to_stn_code": "DR",
            "from_time": "02:43",
            "to_time": "03:35",
            "travel_time": "00:52 hrs",
            "running_days": "0100101",
            "distance": "44",
            "halts": 0
        }
"""

'\n{\n            "train_no": "01028",\n            "train_name": "GKP DR SPL",\n            "source_stn_name": "Mau Jn",\n            "source_stn_code": "MAU",\n            "dstn_stn_name": "Dadar",\n            "dstn_stn_code": "DR",\n            "from_stn_name": "Kalyan Jn",\n            "from_stn_code": "KYN",\n            "to_stn_name": "Dadar",\n            "to_stn_code": "DR",\n            "from_time": "02:43",\n            "to_time": "03:30",\n            "travel_time": "00:47 hrs",\n            "running_days": "1011010",\n            "distance": "47",\n            "halts": 0\n        },\n        {\n            "train_no": "01026",\n            "train_name": "BUI DR SPL",\n            "source_stn_name": "Ballia",\n            "source_stn_code": "BUI",\n            "dstn_stn_name": "Dadar",\n            "dstn_stn_code": "DR",\n            "from_stn_name": "Kalyan Jn",\n            "from_stn_code": "KYN",\n            "to_stn_name": "Dadar",\n            "to_stn_code": "DR",\n   

In [109]:
trains_df.show(5)

+--------+-------------+---------+-----+---------------+-------+---------------+--------+-----------+--------------------+
|distance|dstn_stn_code|from_time|halts|source_stn_code|to_time|     train_name|train_no|travel_time|        running_days|
+--------+-------------+---------+-----+---------------+-------+---------------+--------+-----------+--------------------+
|      47|           DR|    02:43|    0|            MAU|  03:30|     GKP DR SPL|   01028|  00:47 hrs|[1, 0, 1, 1, 0, 1...|
|      44|           DR|    02:43|    0|            BUI|  03:35|     BUI DR SPL|   01026|  00:52 hrs|[0, 1, 0, 0, 1, 0...|
|      44|         CSMT|    03:10|    0|            HWH|  03:52|  HWH CSMT MAIL|   12810|  00:42 hrs|[1, 1, 1, 1, 1, 1...|
|      44|           DR|    03:22|    1|           SNSI|  04:30|SAINAGAR DR EXP|   12132|  01:08 hrs|[1, 0, 1, 0, 1, 0...|
|      44|         CSMT|    03:28|    0|            HYB|  04:15|HUSSAINSAGAR SF|   12702|  00:47 hrs|[1, 1, 1, 1, 1, 1...|
+--------+------

In [110]:
trains_df.printSchema()

root
 |-- distance: string (nullable = true)
 |-- dstn_stn_code: string (nullable = true)
 |-- from_time: string (nullable = true)
 |-- halts: integer (nullable = true)
 |-- source_stn_code: string (nullable = true)
 |-- to_time: string (nullable = true)
 |-- train_name: string (nullable = true)
 |-- train_no: string (nullable = true)
 |-- travel_time: string (nullable = true)
 |-- running_days: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [None]:
# Trains whose source station code has no match in the de-duplicated station
# reference data (left anti join keeps only the unmatched left-side rows).
# The condition refers to `df`, the DataFrame actually being joined; the
# original referenced `trains_df`, a different DataFrame derived from `df`.
final_df = df.join(
    stations_df_unique,
    stations_df_unique["station_code"] == df["source_stn_code"],
    how="left_anti",
)

print(final_df.count())

final_df.show()

In [165]:
# Build placeholder station rows from the source stations carried by final_df,
# shaped like the station reference data.
remaining_stations_df = final_df.select("source_stn_name", "source_stn_code")

# Attributes that are not known for these stations are filled with "Unknown".
# The list gets its own name so it does not rebind `col`, which this notebook
# imports from pyspark.sql.functions and calls as a function in other cells.
placeholder_columns = ["district","division","new_station_category","old_station_category", "state", "zone"]
for column_name in placeholder_columns:
    remaining_stations_df = remaining_stations_df.withColumn(column_name, lit("Unknown"))

remaining_stations_df = (
    remaining_stations_df
    .withColumnRenamed("source_stn_name", "station_name")
    .withColumnRenamed("source_stn_code", "station_code")
    .distinct()
)
# NOTE(review): HWH is excluded explicitly; the reason is not visible in this
# notebook — confirm it is still intended.
remaining_stations_df = remaining_stations_df.filter(~(remaining_stations_df.station_code == "HWH"))
remaining_stations_df.show()
stations_df_unique.show()

+--------------------+------------+--------+--------+--------------------+--------------------+-------+-------+
|        station_name|station_code|district|division|new_station_category|old_station_category|  state|   zone|
+--------------------+------------+--------+--------+--------------------+--------------------+-------+-------+
|          Asansol Jn|         ASN| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|              Ballia|         BUI| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|      Chennai Egmore|          MS| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|         Tirunelveli|         TEN| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|              Mau Jn|         MAU| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|C Sahumaharaj T K...|         KOP| Unknown| Unknown|             Unknown|             Unknown|Unknown|U

In [173]:
stations_df = spark.read.jdbc(
    url=jdbc_url,
    table=table_name,
    properties=db_properties
)

remaining_stations_df = remaining_stations_df.join(stations_df, stations_df.station_code == remaining_stations_df.station_code, "left_anti")
remaining_stations_df.show()

stations_df.filter(stations_df.station_code == "BSB").show()

+--------------------+------------+--------+--------+--------------------+--------------------+-------+-------+
|        station_name|station_code|district|division|new_station_category|old_station_category|  state|   zone|
+--------------------+------------+--------+--------+--------------------+--------------------+-------+-------+
|          Asansol Jn|         ASN| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|         Varanasi Jn|         BSB| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|              Ballia|         BUI| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|            Jabalpur|         JBP| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|         Lingampalli|         LPI| Unknown| Unknown|             Unknown|             Unknown|Unknown|Unknown|
|     Mgr Chennai Ctr|         MAS| Unknown| Unknown|             Unknown|             Unknown|Unknown|U

In [141]:
# Show the source station codes of the rows in final_df. The original line
# (`withColumn(select(...`) was not valid Python; the recorded output is a
# single source_stn_code column, which is what select() produces.
final_df.select("source_stn_code").show()

+---------------+
|source_stn_code|
+---------------+
|            MAU|
|            BUI|
|            HWH|
|             MS|
|            TEN|
|            ASN|
|            LPI|
|            KOP|
|            BSB|
|            MAS|
|            JBP|
|           REWA|
|            HWH|
|            TVC|
|            NCJ|
|            NCJ|
|            KOP|
|            SBC|
|            HWH|
|            HWH|
+---------------+



In [140]:
stations_df_unique.columns

['district',
 'division',
 'new_station_category',
 'old_station_category',
 'sno',
 'state',
 'station_code',
 'station_name',
 'zone']

In [180]:
trains_df= trains_df.distinct()
print(trains_df.count())

138


In [181]:
trains_df.write \
    .jdbc(url=jdbc_url, table="trains", mode="append", properties=db_properties)

In [187]:
trains_df.printSchema()

trains_df.show()

root
 |-- distance: string (nullable = true)
 |-- dstn_stn_code: string (nullable = true)
 |-- from_time: string (nullable = true)
 |-- halts: integer (nullable = true)
 |-- source_stn_code: string (nullable = true)
 |-- to_time: string (nullable = true)
 |-- train_name: string (nullable = true)
 |-- train_no: string (nullable = true)
 |-- travel_time: string (nullable = true)
 |-- running_days: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+--------+-------------+---------+-----+---------------+-------+----------------+--------+-----------+--------------------+
|distance|dstn_stn_code|from_time|halts|source_stn_code|to_time|      train_name|train_no|travel_time|        running_days|
+--------+-------------+---------+-----+---------------+-------+----------------+--------+-----------+--------------------+
|      44|           DR|    03:22|    1|           SNSI|  04:30| SAINAGAR DR EXP|   12132|  01:08 hrs|[1, 0, 1, 0, 1, 0...|
|      44|           DR|    04:

In [248]:
live_status = track_train("17412", "12-08-2025").get("data")


===== LIVE TRAIN STATUS =====
Train Number : 17412
Date         : 12-08-2025
Last Updated : 12-08-2025 16:25:33

--- RAW RESPONSE ---


In [250]:
# Find the most recent stop the train has passed: a "crossed" stop whose
# successor is still "upcoming". Pairing each stop with the next one avoids
# indexing past the end of the list when the final stop is already "crossed".
last_crossed = None  # stays None when no stop qualifies
stops = live_status or []
for stop, next_stop in zip(stops, stops[1:]):
    if stop.get("status") == "crossed" and next_stop.get("status") == "upcoming":
        last_crossed = stop

# Boundary case: no crossed -> upcoming transition was found but the final
# stop is "crossed", so that final stop is the last one crossed.
if last_crossed is None and stops and stops[-1].get("status") == "crossed":
    last_crossed = stops[-1]

In [251]:
print(json.dumps(live_status, indent=2))

[
  {
    "index": 0,
    "station": "C SHAHUMHARAJ T",
    "arr": "",
    "dep": "21:01",
    "delay": " 6 min",
    "status": "crossed",
    "current": "false"
  },
  {
    "index": 1,
    "station": "Hatkanagale",
    "arr": "21:21",
    "dep": "21:24",
    "delay": " 4 min",
    "status": "crossed",
    "current": "false"
  },
  {
    "index": 2,
    "station": "Jayasingpur",
    "arr": "21:39",
    "dep": "21:40",
    "delay": " 5 min",
    "status": "crossed",
    "current": "true"
  },
  {
    "index": 3,
    "station": "Miraj Jn",
    "arr": "22:06",
    "dep": "22:11",
    "delay": " 16 min",
    "status": "upcoming",
    "current": "false"
  },
  {
    "index": 4,
    "station": "Sangli",
    "arr": "22:27",
    "dep": "22:30",
    "delay": " 20 min",
    "status": "upcoming",
    "current": "false"
  },
  {
    "index": 5,
    "station": "Kirloskarvadi",
    "arr": "22:57",
    "dep": "22:59",
    "delay": " 19 min",
    "status": "upcoming",
    "current": "false"
  },
  {


In [252]:
train_live_status_df = spark.createDataFrame([last_crossed])

In [253]:
train_live_status_df.printSchema()

root
 |-- arr: string (nullable = true)
 |-- current: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- dep: string (nullable = true)
 |-- index: long (nullable = true)
 |-- station: string (nullable = true)
 |-- status: string (nullable = true)



In [259]:
def extract_int_from_delay(delay_str):
    """Pull the digits out of a delay string and return them as one integer.

    All digit characters are concatenated in order and parsed together, so
    ``" 6 min"`` gives ``6`` and ``"gysdf52min"`` gives ``52``.

    Returns:
        The parsed integer, or ``None`` when the input is ``None`` or contains
        no parseable digits.
    """
    if delay_str is None:
        return None
    digits = "".join(ch for ch in delay_str if str.isdigit(ch))
    try:
        parsed = int(digits)
    except ValueError:
        return None
    return parsed


print(extract_int_from_delay("gysdf52min"))

52


In [255]:
extract_int_udf = udf(extract_int_from_delay, IntegerType())

# train_live_status_df = train_live_status_df.withColumn("delay", udf(extract_int_from_delay, IntegerType()))
train_live_status_df = train_live_status_df.withColumn("delay", extract_int_udf(train_live_status_df["delay"]))

train_live_status_df.show()

+-----+-------+-----+-----+-----+-----------+-------+
|  arr|current|delay|  dep|index|    station| status|
+-----+-------+-----+-----+-----+-----------+-------+
|21:39|   true|    5|21:40|    2|Jayasingpur|crossed|
+-----+-------+-----+-----+-----+-----------+-------+

