In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, HiveContext
from pyspark.streaming import StreamingContext
from pyspark.conf import SparkConf

In [3]:
# spark.local.dir: default /tmp (overridden by SPARK_LOCAL_DIRS)
# spark.sql.warehouse.dir: default $PWD/spark-warehouse

spark = SparkSession.builder \
        .master("local") \
        .appName("sandbox") \
        .config("spark.local.dir", "/home/jovyan/spark_local_dir/") \
        .config("spark.sql.warehouse.dir", "/home/jovyan/spark-warehouse") \
        .config("spark.driver.extraJavaOptions", "-Dderby.stream.error.file=/home/jovyan") \
        .config("spark.driver.extraJavaOptions", "-Dderby.system.home=/home/jovyan/derby_sys") \
        .enableHiveSupport() \
        .getOrCreate()

spark

In [75]:
report_filename = 'report_daily_8188_31012020_31072020.json'

raw_df = spark.read.json(report_filename)

raw_df.printSchema()

root
 |-- 0 - 10 mph: string (nullable = true)
 |-- 0 - 520 cm: string (nullable = true)
 |-- 11 - 15 mph: string (nullable = true)
 |-- 1160+ cm: string (nullable = true)
 |-- 16 - 20 mph: string (nullable = true)
 |-- 21 - 25 mph: string (nullable = true)
 |-- 26 - 30 mph: string (nullable = true)
 |-- 31 - 35 mph: string (nullable = true)
 |-- 36 - 40 mph: string (nullable = true)
 |-- 41 - 45 mph: string (nullable = true)
 |-- 46 - 50 mph: string (nullable = true)
 |-- 51 - 55 mph: string (nullable = true)
 |-- 521 - 660 cm: string (nullable = true)
 |-- 56 - 60 mph: string (nullable = true)
 |-- 61 - 70 mph: string (nullable = true)
 |-- 661 - 1160 cm: string (nullable = true)
 |-- 71 - 80 mph: string (nullable = true)
 |-- 80+ mph: string (nullable = true)
 |-- Avg mph: string (nullable = true)
 |-- Report Date: string (nullable = true)
 |-- Site Name: string (nullable = true)
 |-- Time Interval: string (nullable = true)
 |-- Time Period Ending: string (nullable = true)
 |-- Tota

In [76]:
raw_df.show(1, vertical=True)

-RECORD 0---------------------------------
 0 - 10 mph         |                     
 0 - 520 cm         | 12                  
 11 - 15 mph        |                     
 1160+ cm           | 6                   
 16 - 20 mph        |                     
 21 - 25 mph        |                     
 26 - 30 mph        |                     
 31 - 35 mph        |                     
 36 - 40 mph        |                     
 41 - 45 mph        |                     
 46 - 50 mph        |                     
 51 - 55 mph        |                     
 521 - 660 cm       | 0                   
 56 - 60 mph        |                     
 61 - 70 mph        |                     
 661 - 1160 cm      | 1                   
 71 - 80 mph        |                     
 80+ mph            |                     
 Avg mph            | 63                  
 Report Date        | 2020-01-31T00:00:00 
 Site Name          | 7001/1              
 Time Interval      | 0                   
 Time Perio

In [77]:
columns = [
#     'Site Name',  # 7001/1
    'Report Date',
    'Time Interval',
    'Time Period Ending',
    '0 - 520 cm',
    '521 - 660 cm',
    '661 - 1160 cm',
    '1160+ cm',
    'Total Volume',
    'Avg mph'
]

report_df = raw_df.select(columns)

report_df.show()

+-------------------+-------------+------------------+----------+------------+-------------+--------+------------+-------+
|        Report Date|Time Interval|Time Period Ending|0 - 520 cm|521 - 660 cm|661 - 1160 cm|1160+ cm|Total Volume|Avg mph|
+-------------------+-------------+------------------+----------+------------+-------------+--------+------------+-------+
|2020-01-31T00:00:00|            0|          00:14:00|        12|           0|            1|       6|          19|     63|
|2020-01-31T00:00:00|            1|          00:29:00|         8|           1|            0|       5|          14|     54|
|2020-01-31T00:00:00|            2|          00:44:00|         0|           0|            0|       6|           6|     58|
|2020-01-31T00:00:00|            3|          00:59:00|         6|           0|            0|       3|           9|     59|
|2020-01-31T00:00:00|            4|          01:14:00|         7|           0|            0|       8|          15|     60|
|2020-01-31T00:0

In [78]:
# Cast types

from pyspark.sql.functions import col

cast_as_ints = [
    'Time Interval',
    '0 - 520 cm',
    '521 - 660 cm',
    '661 - 1160 cm',
    '1160+ cm',
    'Total Volume',
    'Avg mph'
]

for column in cast_as_ints:
    
    report_df = report_df.withColumn(column, col(column).cast('int'))


report_df.printSchema()

root
 |-- Report Date: string (nullable = true)
 |-- Time Interval: integer (nullable = true)
 |-- Time Period Ending: string (nullable = true)
 |-- 0 - 520 cm: integer (nullable = true)
 |-- 521 - 660 cm: integer (nullable = true)
 |-- 661 - 1160 cm: integer (nullable = true)
 |-- 1160+ cm: integer (nullable = true)
 |-- Total Volume: integer (nullable = true)
 |-- Avg mph: integer (nullable = true)



### Reading using a schema

In [116]:
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
    DateType
)

schema = StructType([
    StructField('Site Name', StringType(), False),
    StructField('Report Date', TimestampType(), True),
    StructField('Time Period Ending', TimestampType(), True),
    StructField('Time Interval', StringType(), True),
    StructField('0 - 520 cm', StringType(), True),
    StructField('521 - 660 cm', StringType(), True),
    StructField('661 - 1160 cm', StringType(), True),
    StructField('1160+ cm', StringType(), True),
    StructField('Total Volume', StringType(), True)
])


from_schema_df = spark \
            .read \
            .schema(schema) \
            .json(report_filename)

from_schema_df.printSchema()

root
 |-- Site Name: string (nullable = true)
 |-- Report Date: timestamp (nullable = true)
 |-- Time Period Ending: timestamp (nullable = true)
 |-- Time Interval: string (nullable = true)
 |-- 0 - 520 cm: string (nullable = true)
 |-- 521 - 660 cm: string (nullable = true)
 |-- 661 - 1160 cm: string (nullable = true)
 |-- 1160+ cm: string (nullable = true)
 |-- Total Volume: string (nullable = true)



In [117]:
from_schema_df.show()

+---------+-------------------+-------------------+-------------+----------+------------+-------------+--------+------------+
|Site Name|        Report Date| Time Period Ending|Time Interval|0 - 520 cm|521 - 660 cm|661 - 1160 cm|1160+ cm|Total Volume|
+---------+-------------------+-------------------+-------------+----------+------------+-------------+--------+------------+
|   7001/1|2020-01-31 00:00:00|2021-05-06 00:14:00|            0|        12|           0|            1|       6|          19|
|   7001/1|2020-01-31 00:00:00|2021-05-06 00:29:00|            1|         8|           1|            0|       5|          14|
|   7001/1|2020-01-31 00:00:00|2021-05-06 00:44:00|            2|         0|           0|            0|       6|           6|
|   7001/1|2020-01-31 00:00:00|2021-05-06 00:59:00|            3|         6|           0|            0|       3|           9|
|   7001/1|2020-01-31 00:00:00|2021-05-06 01:14:00|            4|         7|           0|            0|       8|      

### Simple Aggergations

In [62]:
# small vehicles by hour of the day

from pyspark.sql.functions import to_timestamp, date_format


report_df \
    .withColumn('Hour', date_format(col("Time Period Ending"), "k").cast('int')) \
    .groupBy('Hour') \
    .sum('0 - 520 cm') \
    .orderBy('Hour') \
    .show(100) 

+----+---------------+
|Hour|sum(0 - 520 cm)|
+----+---------------+
|   1|           1967|
|   2|           1438|
|   3|           1778|
|   4|           4113|
|   5|          15748|
|   6|          47194|
|   7|          74069|
|   8|          71276|
|   9|          57701|
|  10|          62063|
|  11|          65850|
|  12|          70568|
|  13|          71711|
|  14|          78743|
|  15|          81461|
|  16|          92098|
|  17|          82934|
|  18|          56308|
|  19|          38152|
|  20|          27341|
|  21|          18723|
|  22|          14311|
|  23|           7849|
|  24|           3945|
+----+---------------+



In [51]:
# total volume by month

from pyspark.sql.functions import month

report_df \
    .withColumn('Month', month(report_df['Report Date'])) \
    .groupBy('Month') \
    .sum('Total Volume') \
    .orderBy('Month') \
    .show()

+-----+-----------------+
|Month|sum(Total Volume)|
+-----+-----------------+
|    1|            11172|
|    2|           296970|
|    3|           247328|
|    4|            96891|
|    5|           152918|
|    6|           218275|
|    7|           279321|
+-----+-----------------+



In [52]:
# total volume by day of the week

from pyspark.sql.functions import date_format
from pyspark.sql.functions import col


report_df \
    .withColumn("Week Day", date_format(col("Report Date"), "E")) \
    .groupBy('Week Day') \
    .sum('Total Volume') \
    .withColumnRenamed('Sum(Total Volume)', 'Total Volume') \
    .orderBy('Total Volume', ascending=False) \
    .show()


+--------+------------+
|Week Day|Total Volume|
+--------+------------+
|     Fri|      215643|
|     Thu|      205929|
|     Wed|      201701|
|     Tue|      198268|
|     Mon|      193474|
|     Sat|      150390|
|     Sun|      137470|
+--------+------------+

