In [0]:
// Load the bus and subway ridership CSV exports into DataFrames.
val busFilePath = "MTA_Bus_Hourly_Ridership__Beginning_February_2022_20240329.csv"
val subwayFilePath = "MTA_Subway_Hourly_Ridership__Beginning_February_2022_20240327.csv"

// Reader settings shared by both files: header row present, multi-line
// records enabled, column types inferred, and `"` used as the escape character.
val csvReadOptions = Map(
  "header" -> "true",
  "multiLine" -> "true",
  "inferSchema" -> "true",
  "escape" -> "\""
)

val busDf = spark.read.options(csvReadOptions).csv(busFilePath)
val subwayDf = spark.read.options(csvReadOptions).csv(subwayFilePath)

// Render both raw DataFrames in the notebook, each under its own label.
println("Bus DataFrame:")
z.show(busDf)

println("Subway DataFrame:")
z.show(subwayDf)

In [1]:
// Project both raw DataFrames down to the transit_timestamp and ridership columns.
val baseColumns = Seq("transit_timestamp", "ridership")

val BusbaseDF = busDf.select(baseColumns.head, baseColumns.tail: _*)
val SubwaybaseDF = subwayDf.select(baseColumns.head, baseColumns.tail: _*)

// Mark each projection as cached, then run count() as the action on it.
BusbaseDF.cache().count()
SubwaybaseDF.cache().count()

In [2]:
// Display the trimmed bus DataFrame, then the trimmed subway DataFrame.
Seq(BusbaseDF, SubwaybaseDF).foreach(df => z.show(df))

In [3]:
// Show describe() summary statistics for the bus frame, then the subway frame.
Seq(BusbaseDF, SubwaybaseDF).foreach { df =>
  val summary = df.describe()
  z.show(summary)
}

In [4]:
// Dump the column names and inferred types of the trimmed bus DataFrame.
BusbaseDF.printSchema()

In [5]:
import org.apache.spark.sql.types._
// Convert the transit_timestamp column from string to timestamp format.
//
// Seconds are parsed with `ss` rather than matched against a literal `00`, so a
// value with non-zero seconds is converted instead of failing to parse.
// NOTE(review): assumes values are 12-hour clock with an AM/PM marker, as the
// `hh ... a` pattern implies -- confirm against the CSV.
val transitTimestampFormat = "MM/dd/yyyy hh:mm:ss a"

// Names of the columns that still need parsing: only `transit_timestamp`, and
// only while it is a string. If schema inference already produced a non-string
// type the result is empty and the conversion below is a no-op.
def stringTimestampColumns(df: org.apache.spark.sql.DataFrame): Array[String] =
  df.schema.fields.collect {
    case field if field.dataType == StringType && field.name == "transit_timestamp" => field.name
  }

// Replaces each named column with its parsed timestamp; all other columns are untouched.
def parseTimestampColumns(
    df: org.apache.spark.sql.DataFrame,
    columns: Seq[String]
): org.apache.spark.sql.DataFrame =
  columns.foldLeft(df) { (acc, name) =>
    acc.withColumn(name, to_timestamp(col(name), transitTimestampFormat))
  }

val timestampColumns = stringTimestampColumns(BusbaseDF)
val convertedBusDF = parseTimestampColumns(BusbaseDF, timestampColumns)

z.show(convertedBusDF, 10)

val subwayTimestampColumns = stringTimestampColumns(SubwaybaseDF)
val convertedSubwayDF = parseTimestampColumns(SubwaybaseDF, subwayTimestampColumns)

z.show(convertedSubwayDF, 10)

In [6]:
// Show describe() summary statistics for the trimmed bus and subway frames.
// NOTE(review): this runs on BusbaseDF / SubwaybaseDF, not on the converted
// frames built in the previous cell -- confirm that is the intent.
Seq(BusbaseDF, SubwaybaseDF).foreach { df =>
  val summary = df.describe()
  z.show(summary)
}

In [7]:
// Earliest transit_timestamp in each converted DataFrame.
// Each aggregate yields a single row whose first field is the minimum.
val busMinRow = convertedBusDF.agg(min("transit_timestamp")).head()
val busMinTimestamp = busMinRow.getTimestamp(0)
println(s"Bus data min timestamp: $busMinTimestamp")

val subwayMinRow = convertedSubwayDF.agg(min("transit_timestamp")).head()
val subwayMinTimestamp = subwayMinRow.getTimestamp(0)
println(s"Subway data min timestamp: $subwayMinTimestamp")

In [8]:
// Latest transit_timestamp in each converted DataFrame.
// Each aggregate yields a single row whose first field is the maximum.
val busMaxRow = convertedBusDF.agg(max("transit_timestamp")).head()
val busMaxTimestamp = busMaxRow.getTimestamp(0)
println(s"Bus data max timestamp: $busMaxTimestamp")

val subwayMaxRow = convertedSubwayDF.agg(max("transit_timestamp")).head()
val subwayMaxTimestamp = subwayMaxRow.getTimestamp(0)
println(s"Subway data max timestamp: $subwayMaxTimestamp")

In [9]:
// Number of bus rows whose ridership value is exactly 0.
convertedBusDF.where($"ridership" === 0).count()

In [10]:
// Number of subway rows whose ridership value is exactly 0.
convertedSubwayDF.where($"ridership" === 0).count()

In [11]:
// List every transit_timestamp that occurs on more than one row, per dataset.
val busRowsPerTimestamp = convertedBusDF.groupBy("transit_timestamp").count()
val busDuplicates = busRowsPerTimestamp.where("count > 1")
println("Bus data duplicates:")
z.show(busDuplicates)

val subwayRowsPerTimestamp = convertedSubwayDF.groupBy("transit_timestamp").count()
val subwayDuplicates = subwayRowsPerTimestamp.where("count > 1")
println("Subway data duplicates:")
z.show(subwayDuplicates)

In [12]:
// Collapse rows sharing a transit_timestamp into one row per timestamp,
// summing their ridership into a total_ridership column.
val busByTimestamp = convertedBusDF.groupBy("transit_timestamp")
val mergedBusDF = busByTimestamp.agg(sum("ridership").as("total_ridership"))
println("Merged bus data:")
z.show(mergedBusDF)

val subwayByTimestamp = convertedSubwayDF.groupBy("transit_timestamp")
val mergedSubwayDF = subwayByTimestamp.agg(sum("ridership").as("total_ridership"))
println("Merged subway data:")
z.show(mergedSubwayDF)

In [13]:
// Persist the merged bus and subway DataFrames as Parquet, replacing any
// existing output at the same path.
val outputBusPath = "bus-clean.parquet"
val outputSubwayPath = "subway-clean.parquet"

mergedBusDF.write.mode("overwrite").parquet(outputBusPath)
mergedSubwayDF.write.mode("overwrite").parquet(outputSubwayPath)