In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [2]:
#Spark cofiguration
spark = SparkSession.builder.master('local')\
                            .appName('spark')\
                            .getOrCreate()

In [3]:
def extract_from_csv(schema, csvfile):
    '''Takes csv file's name and schema, returns the dataframe'''
    df = spark.read.option("header", True)\
                .schema(schema)\
                .csv(csvfile, header=True)
    return df

In [4]:
from pyspark.sql.functions import col, rank 
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, IntegerType, StructType, StringType, StructField, TimestampType

In [5]:
sensor_schema = StructType([\
    StructField("sensor_id",IntegerType(),True),\
    StructField("sensor_description",StringType(),True),\
    StructField("sensor_name",StringType(),True),\
    StructField("installation_date",TimestampType(),True),\
    StructField("status",StringType(),True),\
    StructField("note",StringType(),True),\
    StructField("direction_1",StringType(),True),\
    StructField("direction_2",StringType(),True),\
    StructField("latitude",IntegerType(),True),\
    StructField("longitude",IntegerType(),True),\
    StructField("location",StringType(),True)\
    ])
pedestrian_counting_schema = StructType([\
    StructField("ID",IntegerType(),True),\
    StructField("Date_Time",StringType(),True),\
    StructField("Year",IntegerType(),True),\
    StructField("Month",StringType(),True),\
    StructField("Mdate",IntegerType(),True),\
    StructField("Day",StringType(),True),\
    StructField("Time",IntegerType(),True),\
    StructField("Sensor_ID",IntegerType(),True),\
    StructField("Sensor_Name",StringType(),True),\
    StructField("Hourly_Counts",IntegerType(),True),\
    ])    

In [6]:
sensor = extract_from_csv(sensor_schema,'./Pedestrian_Counting_System_-_Sensor_Locations.csv')
pedestrian_counting = extract_from_csv(pedestrian_counting_schema,'./Pedestrian_Counting_System_-_Monthly__counts_per_hour_.csv')

In [7]:
sensor.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- note: string (nullable = true)
 |-- direction_1: string (nullable = true)
 |-- direction_2: string (nullable = true)
 |-- latitude: integer (nullable = true)
 |-- longitude: integer (nullable = true)
 |-- location: string (nullable = true)



In [8]:
#Preprocessing data
from pyspark.sql.functions import to_date, date_format, to_csv
pedestrian_counting = pedestrian_counting.withColumn("Date_Time", to_date(col('Date_Time'), 'MMMM dd, yyyy hh:mm:ss a'))\
                                    .withColumn('Month', date_format(col('Date_Time'), 'M'))\
                                    .withColumn('Month', col('Month').cast('int'))\
                                    .withColumn('Hourly_Counts', col('Hourly_Counts').cast('int'))\
                                    .drop('Date_Time', 'Day', 'Time')

### TOP 10 (MOST PEDESTRIANS) LOCATIONS BY DAY ###

In [9]:
window = Window.partitionBy(['Year','Month', 'Mdate'])\
                            .orderBy(col('sum(Hourly_Counts)').desc())

top_by_day = pedestrian_counting.groupBy(['Year','Month','Mdate','Sensor_ID']).sum('Hourly_Counts')\
                    .select('*', rank().over(window).alias('rank'))\
                    .filter(col('rank') <= 10)\
                    .withColumnRenamed('sum(Hourly_Counts)', 'Daily_Counts')\
                    .drop('rank')\
                    .orderBy(col('Year').asc(), col('Month').asc(),col('Mdate').asc(), col('Daily_Counts').desc())

### TOP 10 (MOST PEDESTRIANS) LOCATIONS BY MONTH ###

In [10]:
window = Window.partitionBy(['Year','Month'])\
                            .orderBy(col('sum(Hourly_Counts)').desc())

top_by_month = pedestrian_counting.groupBy(['Year','Month','Sensor_ID']).sum('Hourly_Counts')\
                    .select('*', rank().over(window).alias('rank'))\
                    .filter(col('rank') <= 10)\
                    .withColumnRenamed('sum(Hourly_Counts)', 'Monthly_Counts')\
                    .drop('rank')\
                    .orderBy(col('Year').asc(), col('Month').asc(), col('Monthly_Counts').desc())

### MOST DECLINE PAST 2 YEARS ###

In [17]:
from pyspark.sql.functions import isnan, when, count


most_decline = pedestrian_counting[pedestrian_counting['Year'].isin([2020,2021])]\
                                            .groupBy(['Sensor_ID'])\
                                            .pivot('Year')\
                                            .sum('Hourly_Counts')\
                                            .orderBy(col('Sensor_ID').asc())\
                                            .withColumn('Coef', col('2020') - col('2021'))\
                                            .orderBy(col('Coef').desc())\
                                            .limit(1)


### MOST GROWTH LOCATION ###

In [16]:
most_growth = pedestrian_counting[pedestrian_counting['Year'].isin([2020,2021])]\
                                            .groupBy(['Sensor_ID'])\
                                            .pivot('Year')\
                                            .sum('Hourly_Counts')\
                                            .orderBy(col('Sensor_ID').asc())\
                                            .withColumn('Coef', col('2021') - col('2020'))\
                                            .orderBy(col('Coef').desc())\
                                            .limit(1)

In [18]:
import pandas as pd
with pd.ExcelWriter('./results.xlsx', engine="xlsxwriter") as writer:
    top_by_day.toPandas().to_excel(writer, sheet_name="Top_By_Day", index=False)
    top_by_month.toPandas().to_excel(writer, sheet_name="Top_By_Month", index=False)
    most_decline.toPandas().to_excel(writer, sheet_name="Most_Decline_Location", index=False)
    most_growth.toPandas().to_excel(writer, sheet_name="Most_Growth_Location", index=False)
