#### Import packages to establish spark connection

In [1]:
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

#### Load Data

In [2]:
Sensor_Locationsdf = spark.read.csv(path='Pedestrian_Counting_System_-_Sensor_Locations.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

counts_per_hourdf = spark.read.csv(path='Pedestrian_Counting_System___2009_to_Present__counts_per_hour_.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

In [3]:
counts_per_hourdf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)



In [4]:
counts_per_hourdf.select('Date_Time').show()

+--------------------+
|           Date_Time|
+--------------------+
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
|11/01/2019 05:00:...|
+--------------------+
only showing top 20 rows



#### Data Wrangling

In [5]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp, from_unixtime

In [6]:
counts_per_hourdf = counts_per_hourdf.withColumn("Date_Time", 
                    to_timestamp(unix_timestamp(counts_per_hourdf.Date_Time, 'MM/dd/yyyy hh:mm:ss aa').cast("timestamp")))

In [7]:
counts_per_hourdf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)



In [8]:
counts_per_hourdf.show()

+-------+-------------------+----+--------+-----+------+----+---------+--------------------+-------------+
|     ID|          Date_Time|Year|   Month|Mdate|   Day|Time|Sensor_ID|         Sensor_Name|Hourly_Counts|
+-------+-------------------+----+--------+-----+------+----+---------+--------------------+-------------+
|2887628|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       34|Flinders St-Spark La|          300|
|2887629|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       39|        Alfred Place|          604|
|2887630|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       37|     Lygon St (East)|          216|
|2887631|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       40|Lonsdale St-Sprin...|          627|
|2887632|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       36|     Queen St (West)|          774|
|2887633|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       29|St Kilda Rd-Alexa...|          644|
|2887634|2019-11-01 17:00:00|2019|Nov

#### Convert Datetime to a Datekey

In [9]:
from pyspark.sql.functions import date_format, col
counts_per_hourdf = counts_per_hourdf.withColumn("DateKey", date_format(col("Date_Time"), "yyyyMMdd"))
counts_per_hourdf.show()

+-------+-------------------+----+--------+-----+------+----+---------+--------------------+-------------+--------+
|     ID|          Date_Time|Year|   Month|Mdate|   Day|Time|Sensor_ID|         Sensor_Name|Hourly_Counts| DateKey|
+-------+-------------------+----+--------+-----+------+----+---------+--------------------+-------------+--------+
|2887628|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       34|Flinders St-Spark La|          300|20191101|
|2887629|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       39|        Alfred Place|          604|20191101|
|2887630|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       37|     Lygon St (East)|          216|20191101|
|2887631|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       40|Lonsdale St-Sprin...|          627|20191101|
|2887632|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       36|     Queen St (West)|          774|20191101|
|2887633|2019-11-01 17:00:00|2019|November|    1|Friday|  17|       29|S

#### Creation of Dimensions and Facts

In [10]:
DimDateDF = counts_per_hourdf.select("DateKey","Year", "Month","Mdate")
DimDateDF = DimDateDF.dropDuplicates()
DimDateDF.count()


3963

In [11]:
FactPedCountPerSensorDF = counts_per_hourdf.select("ID", "DateKey","Date_Time", "Sensor_ID","Hourly_Counts")
FactPedCountPerSensorDF.count()

3048367

In [12]:

DimSensorsDF=Sensor_Locationsdf

In [13]:
DimSensorsDF.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- installation_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- note: string (nullable = true)
 |-- direction_1: string (nullable = true)
 |-- direction_2: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- location: string (nullable = true)



### Load- Data into S3 in an appropriate format for future querying

In [14]:
FactPedCountPerSensorDF.write.format("csv").mode("overwrite").save("FactPedCountPerSensor.csv",header=True)
DimSensorsDF.write.format("csv").mode("overwrite").save("DimSensors.csv",header=True)
DimDateDF.write.format("csv").mode("overwrite").save("DimDate.csv",header=True)

#### Querying for future Process

In [15]:
FactPedCountPerSensorDF.createGlobalTempView("PedCountPerSensor")
DimDateDF.createGlobalTempView("DimDate")
DimSensorsDF.createGlobalTempView("DimSensors")

In [17]:
#FactPedCountPerSensorDF.createGlobalTempView("PedCountPerSensor")

# Global temporary view is tied to a system preserved database `global_temp`
Top10LocPerDayDF = spark.sql('''
          with Top10PerDay as (
          SELECT 
          DateKey,
          Sensor_ID,
          SUM(Hourly_Counts) AS TotalCounts,
          ROW_NUMBER() OVER(Partition by DateKey ORDER BY SUM(Hourly_Counts) DESC) AS Ranked
          FROM global_temp.PedCountPerSensor
          GROUP BY
          DateKey,
          Sensor_ID)
          
          SELECT 
          DateKey,
          Que1.Sensor_ID,
          sensor_description,
          TotalCounts
          FROM 
          Top10PerDay Que1 INNER JOIN global_temp.DimSensors DS ON Que1.Sensor_ID = DS.sensor_id 
          WHERE  
          Ranked<=10
          
          ''')

In [18]:
Top10LocPerDayDF.show()

+--------+---------+--------------------+-----------+
| DateKey|Sensor_ID|  sensor_description|TotalCounts|
+--------+---------+--------------------+-----------+
|20100117|        4|    Town Hall (West)|      27633|
|20100117|        5|      Princes Bridge|      21484|
|20100117|        2|Bourke Street Mal...|      19931|
|20100117|        3|   Melbourne Central|      19100|
|20100117|        6|Flinders Street S...|      15517|
|20100117|        1|Bourke Street Mal...|      13202|
|20100117|        7|      Birrarung Marr|      10111|
|20100117|       15|       State Library|       9820|
|20100117|       16|Australia on Collins|       8619|
|20100117|       11|     Waterfront City|       6439|
|20100315|        4|    Town Hall (West)|      32713|
|20100315|        3|   Melbourne Central|      29207|
|20100315|       13|   Flagstaff Station|      26510|
|20100315|        6|Flinders Street S...|      26052|
|20100315|        5|      Princes Bridge|      24416|
|20100315|        2|Bourke S

In [19]:
Top10LocPerMonthDF = spark.sql('''
          with Top10PerMonth as (
          SELECT 
          DD.Year,
          DD.Month,
          Sensor_ID,
          SUM(Hourly_Counts) AS TotalCounts,
          ROW_NUMBER() OVER(Partition by DD.Year,DD.Month ORDER BY SUM(Hourly_Counts) DESC) AS Ranked
          FROM global_temp.PedCountPerSensor Cnt INNER JOIN 
          global_temp.DimDate DD on Cnt.DateKey = DD.DateKey
          GROUP BY
          DD.Year,
          DD.Month,
          Sensor_ID)
          
          SELECT 
          Year,
          Month,
          Que1.Sensor_ID,
          sensor_description,
          TotalCounts
          FROM 
          Top10PerMonth Que1 INNER JOIN global_temp.DimSensors DS ON Que1.Sensor_ID = DS.sensor_id 
          WHERE  
          Ranked<=10
          
          ''')

In [20]:
Top10LocPerMonthDF.show()

+----+--------+---------+--------------------+-----------+
|Year|   Month|Sensor_ID|  sensor_description|TotalCounts|
+----+--------+---------+--------------------+-----------+
|2012|February|        4|    Town Hall (West)|    1136505|
|2012|February|        3|   Melbourne Central|     797638|
|2012|February|        6|Flinders Street S...|     796421|
|2012|February|        2|Bourke Street Mal...|     795730|
|2012|February|        1|Bourke Street Mal...|     693906|
|2012|February|        5|      Princes Bridge|     629101|
|2012|February|       13|   Flagstaff Station|     566731|
|2012|February|       15|       State Library|     548209|
|2012|February|       16|Australia on Collins|     501062|
|2012|February|       17|Collins Place (So...|     350724|
|2011|November|        4|    Town Hall (West)|    1080001|
|2011|November|        2|Bourke Street Mal...|     927147|
|2011|November|        6|Flinders Street S...|     827084|
|2011|November|        3|   Melbourne Central|     79643

##### Above results can be send to database or s3 using glue 