In [1]:
# Initialization: expose the local Spark install to this kernel before pyspark is imported.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Python interpreters for PySpark.
# Use /usr/bin/python2.7 for both entries if you want to use Python 2.
os.environ.update(
    PYSPARK_PYTHON="/usr/bin/python3.6",
    PYSPARK_DRIVER_PYTHON="/usr/bin/python3",
)

# Put the bundled pyspark and py4j archives at the front of the import path
# (pyspark.zip first, py4j second).
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

# NOTE: list every extra package you need in --packages (comma-separated).
# Alternative settings kept for reference:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [2]:
# Entry point (Spark 2.x): get or create the SparkSession with Hive support enabled.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Project")
    .enableHiveSupport()
    .getOrCreate()
)

# To run on YARN instead, specify .master("yarn") on the builder, e.g.:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# SparkContext that backs the session.
sc = spark.sparkContext

In [3]:
from pyspark.sql.functions import col, sum, round ,split

In [4]:
# Load schedules.csv: the first line is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("schedules.csv")
)

In [5]:
# Preview the first 5 rows of the freshly loaded data.
df.show(5)

+-----------+----------------+-----------+---------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+
|trainNumber|       trainName|stationFrom|stationTo|trainRunsOnMon|trainRunsOnTue|trainRunsOnWed|trainRunsOnThu|trainRunsOnFri|trainRunsOnSat|trainRunsOnSun|           timeStamp|         stationList|
+-----------+----------------+-----------+---------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+
|        961|VALLEY QUEEN SPL|         MJ|     MJMG|             Y|             Y|             N|             Y|             Y|             N|             N|2023-10-15 13:04:...|[{'stationCode': ...|
|       1027|  DR GKP SPECIAL|         DR|      GKP|             N|             Y|             N|             Y|             N|             Y|             Y|2023-10-15 13:04:...|[{'stationCode': ...|


In [6]:
# Number of rows in the loaded DataFrame.
df.count()

3292

In [7]:
# List the column names as read from the CSV header.
df.columns

['trainNumber',
 'trainName',
 'stationFrom',
 'stationTo',
 'trainRunsOnMon',
 'trainRunsOnTue',
 'trainRunsOnWed',
 'trainRunsOnThu',
 'trainRunsOnFri',
 'trainRunsOnSat',
 'trainRunsOnSun',
 'timeStamp',
 'stationList']

In [8]:
# Remove the stationList column (the preview above shows it holds a
# stringified list of station dicts).
df=df.drop("stationList")

In [9]:
# Confirm that stationList is no longer among the columns.
df.columns

['trainNumber',
 'trainName',
 'stationFrom',
 'stationTo',
 'trainRunsOnMon',
 'trainRunsOnTue',
 'trainRunsOnWed',
 'trainRunsOnThu',
 'trainRunsOnFri',
 'trainRunsOnSat',
 'trainRunsOnSun',
 'timeStamp']

In [10]:
# Preview the first 4 rows after dropping stationList.
df.show(4)

+-----------+----------------+-----------+---------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+
|trainNumber|       trainName|stationFrom|stationTo|trainRunsOnMon|trainRunsOnTue|trainRunsOnWed|trainRunsOnThu|trainRunsOnFri|trainRunsOnSat|trainRunsOnSun|           timeStamp|
+-----------+----------------+-----------+---------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+
|        961|VALLEY QUEEN SPL|         MJ|     MJMG|             Y|             Y|             N|             Y|             Y|             N|             N|2023-10-15 13:04:...|
|       1027|  DR GKP SPECIAL|         DR|      GKP|             N|             Y|             N|             Y|             N|             Y|             Y|2023-10-15 13:04:...|
|       1065| DADAR DHULE SPL|         DR|      DHI|             Y|             N|             N|        

In [11]:
# Mapping from the trainRunsOn<Day> flag columns to plain weekday names.
new_column = {
    "trainRunsOnMon": "Monday",
    "trainRunsOnTue": "Tuesday",
    "trainRunsOnWed": "Wednesday",
    "trainRunsOnThu": "Thursday",
    "trainRunsOnFri": "Friday",
    "trainRunsOnSat": "Saturday",
    "trainRunsOnSun": "Sunday",
}

# Apply the renames one column at a time.
for old_name in new_column:
    new_name = new_column[old_name]
    df = df.withColumnRenamed(old_name, new_name)

In [12]:
# Preview the first 4 rows with the weekday columns renamed.
df.show(4)

+-----------+----------------+-----------+---------+------+-------+---------+--------+------+--------+------+--------------------+
|trainNumber|       trainName|stationFrom|stationTo|Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday|           timeStamp|
+-----------+----------------+-----------+---------+------+-------+---------+--------+------+--------+------+--------------------+
|        961|VALLEY QUEEN SPL|         MJ|     MJMG|     Y|      Y|        N|       Y|     Y|       N|     N|2023-10-15 13:04:...|
|       1027|  DR GKP SPECIAL|         DR|      GKP|     N|      Y|        N|       Y|     N|       Y|     Y|2023-10-15 13:04:...|
|       1065| DADAR DHULE SPL|         DR|      DHI|     Y|      N|        N|       N|     Y|       N|     Y|2023-10-15 13:05:...|
|       1066| DHULE DADAR SPL|        DHI|       DR|     Y|      Y|        N|       N|     N|       Y|     N|2023-10-15 13:05:...|
+-----------+----------------+-----------+---------+------+-------+---------+------

In [13]:
# Split the timestamp text on the single space into a date part and a time part.
# The column is referenced by its exact name, "timeStamp": the lowercase
# spelling "timestamp" only resolves while Spark's column resolution is
# case-insensitive (spark.sql.caseSensitive=false).
timestamp_parts = split(col("timeStamp"), " ")

df_split = (
    df.withColumn("date", timestamp_parts[0])
      .withColumn("time", timestamp_parts[1])
)

In [14]:
# Show the first 4 rows of the two new columns to check the split.
df_split.select("date","time").show(4)

+----------+------------+
|      date|        time|
+----------+------------+
|2023-10-15|13:04:05.392|
|2023-10-15|13:04:36.625|
|2023-10-15|13:05:16.833|
|2023-10-15|13:05:17.545|
+----------+------------+
only showing top 4 rows



In [15]:
# List the columns of df_split, which now include date and time.
df_split.columns

['trainNumber',
 'trainName',
 'stationFrom',
 'stationTo',
 'Monday',
 'Tuesday',
 'Wednesday',
 'Thursday',
 'Friday',
 'Saturday',
 'Sunday',
 'timeStamp',
 'date',
 'time']

In [16]:
# Drop the original timeStamp column now that date and time have been split out.
df_split = df_split.drop("timeStamp")

In [17]:
# Keep the current list of column names.
# NOTE(review): no later cell visible in this notebook reads this variable —
# confirm it is still needed before removing it.
col_name = df_split.columns

In [18]:
# Re-alias every column with str.capitalize(): the first character is
# upper-cased and the rest lower-cased (e.g. "trainNumber" -> "Trainnumber").
df_capital = df_split.select(
    [col(name).alias(name.capitalize()) for name in df_split.columns]
)

In [19]:
# Preview the first 5 rows with the capitalized column names.
df_capital.show(5)

+-----------+----------------+-----------+---------+------+-------+---------+--------+------+--------+------+----------+------------+
|Trainnumber|       Trainname|Stationfrom|Stationto|Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday|      Date|        Time|
+-----------+----------------+-----------+---------+------+-------+---------+--------+------+--------+------+----------+------------+
|        961|VALLEY QUEEN SPL|         MJ|     MJMG|     Y|      Y|        N|       Y|     Y|       N|     N|2023-10-15|13:04:05.392|
|       1027|  DR GKP SPECIAL|         DR|      GKP|     N|      Y|        N|       Y|     N|       Y|     Y|2023-10-15|13:04:36.625|
|       1065| DADAR DHULE SPL|         DR|      DHI|     Y|      N|        N|       N|     Y|       N|     Y|2023-10-15|13:05:16.833|
|       1066| DHULE DADAR SPL|        DHI|       DR|     Y|      Y|        N|       N|     N|       Y|     N|2023-10-15|13:05:17.545|
|       1127|     LTT BPQ SPL|        LTT|      BPQ|     N|   

In [20]:
# Print column names, types and nullability of df_capital.
df_capital.printSchema()

root
 |-- Trainnumber: integer (nullable = true)
 |-- Trainname: string (nullable = true)
 |-- Stationfrom: string (nullable = true)
 |-- Stationto: string (nullable = true)
 |-- Monday: string (nullable = true)
 |-- Tuesday: string (nullable = true)
 |-- Wednesday: string (nullable = true)
 |-- Thursday: string (nullable = true)
 |-- Friday: string (nullable = true)
 |-- Saturday: string (nullable = true)
 |-- Sunday: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)



In [24]:
import pandas as pd

# Destination CSV file.
output_path = "/home/talentum/shared/Project/Train Schedules1.csv"

# Bring the Spark DataFrame into a pandas DataFrame, then write it to the CSV
# with a header row and without the pandas index column.
pandas_df = df_capital.toPandas()
pandas_df.to_csv(output_path, index=False, header=True)

In [25]:
# Print the schema of df_capital before it is written to MySQL.
df_capital.printSchema()

root
 |-- Trainnumber: integer (nullable = true)
 |-- Trainname: string (nullable = true)
 |-- Stationfrom: string (nullable = true)
 |-- Stationto: string (nullable = true)
 |-- Monday: string (nullable = true)
 |-- Tuesday: string (nullable = true)
 |-- Wednesday: string (nullable = true)
 |-- Thursday: string (nullable = true)
 |-- Friday: string (nullable = true)
 |-- Saturday: string (nullable = true)
 |-- Sunday: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)



In [26]:
import os
from getpass import getpass

# JDBC URL of the "project" database on the local MySQL server.
mysql_url = "jdbc:mysql://127.0.0.1:3306/project?useSSL=false&allowPublicKeyRetrieval=true"

# The password is not stored in the notebook. It is read from the
# MYSQL_PASSWORD environment variable; if that is unset or empty, the user is
# prompted for it. The user name can be overridden with MYSQL_USER and
# otherwise defaults to "bigdata".
conn_prop = {
    "user": os.environ.get("MYSQL_USER", "bigdata"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
    "driver": "com.mysql.jdbc.Driver",
}

table_name = "train_schedule"

# mode="overwrite" replaces the contents of an existing train_schedule table.
df_capital.write.jdbc(url=mysql_url, table=table_name, mode="overwrite", properties=conn_prop)