In [1]:
from pyspark import SparkContext, SparkConf
cf = SparkConf()
cf.set("spark.submit.deployMode","client")
sc = SparkContext.getOrCreate(cf)
from pyspark.sql import SparkSession
spark = SparkSession \
	    .builder \
	    .appName("Python Spark SQL basic example") \
	    .config("spark.some.config.option", "some-value") \
	    .getOrCreate()
                            

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/21 04:18:54 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/21 04:18:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/21 04:18:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/21 04:18:54 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [5]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, TimestampType, DateType
from pyspark.sql import Window


# Load the data into a Spark DataFrame
data = spark.read.csv(path='/shared/TransitTrends/MTA_RAW', header=True, inferSchema=True)

#Load the reference data for complex id's for stations
rcl=spark.read.csv('/shared/TransitTrends/rcl.csv' ,header=True)

# Clean the column names
data = data.select([F.col(column).alias(column.strip()) for column in data.columns])

# Combine DATE and TIME into a single column, and convert to timestamp
data = data.withColumn("DATE_TIME", F.to_timestamp(F.concat("DATE", "TIME"), "MM/dd/yyyyHH:mm:ss"))

# Remove duplicates
data = data.dropDuplicates(["C/A", "UNIT", "SCP", "STATION", "DATE_TIME"])

# Drop unnecessary columns
data = data.drop("DESC", "LINENAME", "DIVISION")

# Aggregate ENTRIES and EXITS by taking the maximum value for each group

w = Window.partitionBy("C/A", "UNIT", "SCP", "STATION", "DATE")
data = data.withColumn('LastTimes', F.max('DATE_TIME').over(w))\
    .where(F.col('DATE_TIME') == F.col('LastTimes'))\
    .drop('LastTimes')

# Merge with rcl DataFrame and drop duplicates
data = data.join(rcl.select("complex_id", "remote"), F.col("UNIT") == F.col("remote"), "left").drop("remote").dropDuplicates()

#Rename complex_id
data = data.withColumnRenamed("complex_id", "COMPLEX_ID")

#Convert DATE_TIME to DATE
data = data.withColumn("DATE", F.to_date(F.col("DATE_TIME")))

# Drop unnecessary columns
data = data.drop("TIME", "DATE_TIME")

                                                                                

In [6]:
#Add Prev_Entries and Prev_Exits columns to dataframe based on previous days entries and exits
w = Window.partitionBy("C/A", "UNIT", "SCP", "STATION").orderBy("DATE")
data = data.withColumn("PREV_DATE", F.lag(F.col("DATE"), 1).over(w))
data = data.withColumn("PREV_ENTRIES", F.lag(F.col("ENTRIES"), 1).over(w))
data = data.withColumn("PREV_EXITS", F.lag(F.col("EXITS"), 1).over(w))

In [7]:
#Fi;ter for data in 2019-2022, removing the last week of 2018 from the dataset
data = data.filter(F.year("DATE").isin([2019, 2020, 2021, 2022]))


In [9]:
#Calculating the daily Entries and Exits for each turnstile and setting limits to ensure bad values are filtered out
data = data.withColumn("DAILY_ENTRIES", F.col("ENTRIES") - F.col("PREV_ENTRIES"))
data = data.withColumn("DAILY_EXITS", F.col("EXITS") - F.col("PREV_EXITS"))

data = data.withColumn("DAILY_ENTRIES", F.when(F.col("DAILY_ENTRIES") < 0, 0)
                       .when(F.col("DAILY_ENTRIES") > 100000, 100000)
                       .otherwise(F.col("DAILY_ENTRIES")))
data = data.withColumn("DAILY_EXITS", F.when(F.col("DAILY_EXITS") < 0, 0)
                       .when(F.col("DAILY_EXITS") > 100000, 100000)
                       .otherwise(F.col("DAILY_EXITS")))


In [10]:

agg_data = data.groupBy("STATION", "COMPLEX_ID", "DATE").agg(F.sum("DAILY_ENTRIES").alias("ENTRIES"), F.sum("DAILY_EXITS").alias("EXITS")).orderBy("STATION","COMPLEX_ID", "DATE")


In [11]:
agg_data.show()



+-------+----------+----------+-------+-----+
|STATION|COMPLEX_ID|      DATE|ENTRIES|EXITS|
+-------+----------+----------+-------+-----+
|   1 AV|     119.0|2019-01-01|   9989|12097|
|   1 AV|     119.0|2019-01-02|  18476|21058|
|   1 AV|     119.0|2019-01-03|  19866|22531|
|   1 AV|     119.0|2019-01-04|  20389|23633|
|   1 AV|     119.0|2019-01-05|  13930|16661|
|   1 AV|     119.0|2019-01-06|  12021|13789|
|   1 AV|     119.0|2019-01-07|  19538|22308|
|   1 AV|     119.0|2019-01-08|  20414|22397|
|   1 AV|     119.0|2019-01-09|  21066|23373|
|   1 AV|     119.0|2019-01-10|  21178|23377|
|   1 AV|     119.0|2019-01-11|  21612|24818|
|   1 AV|     119.0|2019-01-12|  15137|18197|
|   1 AV|     119.0|2019-01-13|  12232|14431|
|   1 AV|     119.0|2019-01-14|  19474|22219|
|   1 AV|     119.0|2019-01-15|  20349|22619|
|   1 AV|     119.0|2019-01-16|  20866|23107|
|   1 AV|     119.0|2019-01-17|  21631|23915|
|   1 AV|     119.0|2019-01-18|  21097|24298|
|   1 AV|     119.0|2019-01-19|  1

                                                                                

In [12]:
#Write CSV
agg_data.write.csv("/shared/TransitTrends/MTA_CLEAN/daily_counts.csv")

AnalysisException: path hdfs://nyu-dataproc-m/shared/TransitTrends/MTA_CLEAN/daily_counts.csv already exists.