In [30]:
# Import SparkConf class into program
from pyspark import SparkConf

# Spark master URL. "local[*]" runs Spark in local mode with as many worker
# threads as there are logical cores; "local[k]" would use exactly k threads.
master = "local[*]"
# Application name, shown on the Spark cluster UI page.
app_name = "Data processing"
# Assemble the Spark configuration one setting at a time.
spark_conf = SparkConf()
spark_conf.setMaster(master)
spark_conf.setAppName(app_name)

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Obtain a SparkSession configured from `spark_conf` (an already-running
# session is reused by getOrCreate), then expose its SparkContext as `sc`.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)
sc = spark.sparkContext
# Restrict Spark logging to ERROR-level messages.
sc.setLogLevel('ERROR')

In [31]:
# Load the three source CSV files, treating the first row of each as the
# header, and redistribute every resulting DataFrame across 4 partitions.
df_node = spark.read.option("header", True).csv("NODE.csv").repartition(4)
df_loc = spark.read.option("header", True).csv("ACCIDENT_LOCATION.csv").repartition(4)
df_events = spark.read.option("header", True).csv("ACCIDENT_EVENT.csv").repartition(4)


In [32]:
# Combine node and location records that share an accident number.
# The location key is renamed to "id" first so the two key columns can be
# told apart in the join condition; the duplicate "id" key is dropped afterwards.
df_loc = df_loc.withColumnRenamed("ACCIDENT_NO", "id")
df_temp = (
    df_node
    .join(df_loc, on=df_node["ACCIDENT_NO"] == df_loc["id"], how="inner")
    .drop("id")
)

In [33]:
# Attach event records to the node/location data by accident number.
# The event key is renamed to "id" and is kept in the joined result.
df_events = df_events.withColumnRenamed("ACCIDENT_NO", "id")
df_t = df_temp.join(
    df_events,
    on=df_temp["ACCIDENT_NO"] == df_events["id"],
    how="inner",
)
# Preview the first 10 joined rows.
df_t.show(n=10)

[Stage 74:>                                                         (0 + 3) / 3]

+------------+-----------+------------+-----------+-----------+----------+---------+------------+--------------------+
| ACCIDENT_NO|   LGA_NAME|         Lat|       Long|POSTCODE_NO| ROAD_NAME|ROAD_TYPE|          id|     Event Type Desc|
+------------+-----------+------------+-----------+-----------+----------+---------+------------+--------------------+
|T20090026133|STONNINGTON|-37.85465868|145.0522587|       3146|   MALVERN|     ROAD|T20090026133|           Collision|
|T20160001873|    BANYULE|-37.68677281|145.1429409|       3089|     RYANS|     ROAD|T20160001873|           Collision|
|T20090026994|    WYNDHAM|-37.88485681|144.7002177|       3030|   PRINCES|  HIGHWAY|T20090026994|           Collision|
|T20170024322|  GLEN EIRA|-37.90751665|145.0690223|       3163|     NORTH|     ROAD|T20170024322|           Collision|
|T20150026265| SHEPPARTON|-36.37758557|145.4318101|       3631|    DOYLES|     ROAD|T20150026265|           Collision|
|T20160028004|    BAW BAW|-38.16140618| 145.9785

                                                                                

In [28]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Build the analysis dataset from the joined frame:
#   - Year: the 4 characters starting at position 2 of the accident id
#     (e.g. "T20160012885" -> 2016), cast to an integer.
#   - Area: LGA_NAME under a shorter name.
# The cast is applied inside the select because DataFrames are immutable:
# a bare `df_data.withColumn(...)` returns a new frame and leaves `df_data`
# untouched, so the previous stand-alone cast never took effect.
df_data = (
    df_t.select(
        'ACCIDENT_NO',
        F.substring(df_t.id, 2, 4).cast(IntegerType()).alias('Year'),
        df_t.LGA_NAME.alias('Area'),
        'Lat',
        'Long',
        'Event Type Desc',
        'ROAD_NAME',
        'ROAD_TYPE',
        'POSTCODE_NO',
    )
    # Keep only accidents from 2015 onwards.
    .filter(F.col('Year') > 2014)
)

# Preview the first 10 rows of the filtered dataset.
df_data.show(10)


                                                                                

+------------+----+------------+------------+-----------+-------------------+--------------------+---------+-----------+
| ACCIDENT_NO|Year|        Area|         Lat|       Long|    Event Type Desc|           ROAD_NAME|ROAD_TYPE|POSTCODE_NO|
+------------+----+------------+------------+-----------+-------------------+--------------------+---------+-----------+
|T20160012885|2016|  BOROONDARA|-37.86425636|145.0865011|          Collision|                HIGH|   STREET|       3147|
|T20150023790|2015|     GEELONG|-38.20857249|144.3398647|          Collision|          SURF COAST|  HIGHWAY|       3216|
|T20190007447|2019|  WHITTLESEA|-37.67874172|144.9992489|          Collision|              EDGARS|     ROAD|       3074|
|T20160024678|2016|      MONASH|-37.87249794| 145.150876|          Collision|         HIGH STREET|     ROAD|       3150|
|T20170021878|2017|        HUME|-37.69010132|144.8979313|          Collision|             CARRICK|    DRIVE|       3043|
|T20180016385|2018| MARIBYRNONG|

In [35]:
# Write the dataset to "traffic_data.json" in JSON format.
# The save mode is set explicitly to "overwrite": with the default mode the
# write raises an error when the target path already exists, which made this
# cell fail on every re-run of the notebook. NOTE: any previous output at this
# path is replaced.
(
    df_data.write
    .format("json")
    .mode("overwrite")
    .save("traffic_data.json")
)

                                                                                