### ETL of raw data into clean data

In [37]:
import json
import re

In [10]:
with open('./data/LAPD_Crime_and_Collision_Raw_Data.json') as crime_json:    
    crime_json = json.load(crime_json)
    crime_data = crime_json['data']
    print len(crime_data)

228017


In [44]:
for crime_row in crime_data:
    crime_dict = {}
    crime_dict['date_occ'] = crime_row[10].strip() if crime_row[10] is not None else None
    crime_dict['time_occ'] = crime_row[11].strip() if crime_row[11] is not None else None
    crime_dict['area'] = crime_row[12].strip() if crime_row[12] is not None else None
    crime_dict['area_name'] = crime_row[13].strip() if crime_row[13] is not None else None
    crime_dict['road'] = crime_row[14].strip() if crime_row[14] is not None else None
    crime_dict['crime_code'] = crime_row[15].strip() if crime_row[15] is not None else None
    crime_dict['crime_desc'] = crime_row[16].strip() if crime_row[16] is not None else None
    crime_dict['location'] = re.sub(' +', '', crime_row[19].strip()) if crime_row[19] is not None else None
    crime_dict['cross_street'] = crime_row[20].strip() if crime_row[20] is not None else None
    crime_dict['cross_street'] = re.sub(' +', '', crime_dict['cross_street']) if crime_dict['cross_street'] is not None else None
    crime_dict['latitude'] = crime_row[21][1].strip()
    crime_dict['longitude'] = crime_row[21][2].strip()
    with open('./data/lapd_clean_Data.json', 'a') as clean_data:
        json.dump(crime_dict, clean_data)
        clean_data.write("\n"); 

### Load data into spark

In [1]:
from pyspark import SparkContext

In [2]:
from pyspark.sql import SQLContext

In [3]:
sqlContext = SQLContext(sc)

In [4]:
crime = sqlContext.read.json('./data/lapd_clean_Data.json')

In [5]:
crime.printSchema()

root
 |-- area: string (nullable = true)
 |-- area_name: string (nullable = true)
 |-- crime_code: string (nullable = true)
 |-- crime_desc: string (nullable = true)
 |-- cross_street: string (nullable = true)
 |-- date_occ: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- location: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- road: string (nullable = true)
 |-- time_occ: string (nullable = true)



In [6]:
crime.registerTempTable('crime')

### Top areas with the most serious crime

In [8]:
sql_script = 'SELECT area_name, COUNT(crime_code) AS crime_count FROM crime GROUP BY area_name ORDER BY crime_count DESC LIMIT 5'
sqlContext.sql(sql_script).show()

+-----------+-----------+
|  area_name|crime_count|
+-----------+-----------+
|77th Street|      15308|
|  Southwest|      14733|
|    Pacific|      12376|
|N Hollywood|      12229|
|  Southeast|      11399|
+-----------+-----------+



### Top time with the most serious crime in one place

In [11]:
sql_script = 'SELECT location, time_occ, COUNT(crime_code) AS crime_count FROM crime GROUP BY location, time_occ ORDER BY crime_count DESC LIMIT 5'
sqlContext.sql(sql_script).show()

+-------------------+--------+-----------+
|           location|time_occ|crime_count|
+-------------------+--------+-----------+
|9300TOPANGACANYONBL|    1200|         49|
|      15100RAYMERST|    2330|         30|
|              6THST|    1900|         23|
|     11800SHELDONST|    1600|         23|
|              7THST|    2000|         19|
+-------------------+--------+-----------+



### Top crime occurence within one time

In [12]:
sql_script = 'SELECT time_occ, COUNT(crime_code) AS crime_count FROM crime GROUP BY time_occ ORDER BY crime_count DESC LIMIT 5'
sqlContext.sql(sql_script).show()

+--------+-----------+
|time_occ|crime_count|
+--------+-----------+
|    1200|      10102|
|    1800|       6242|
|    2000|       5751|
|    1900|       5531|
|    2100|       5453|
+--------+-----------+



### Top crime types in one place

In [13]:
sql_script = 'SELECT location, crime_code, COUNT(*) AS crime_count FROM crime GROUP BY location, crime_code ORDER BY crime_count DESC LIMIT 5'
sqlContext.sql(sql_script).show()

+---------+----------+-----------+
| location|crime_code|crime_count|
+---------+----------+-----------+
|WESTERNAV|       997|        514|
|VENTURABL|       997|        510|
|VICTORYBL|       997|        438|
|SHERMANWY|       997|        436|
|VERMONTAV|       997|        428|
+---------+----------+-----------+



### Spark Streaming

In [None]:
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
ssc = StreamingContext(sc, 2)

In [None]:
# Discretized Streams
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))

In [None]:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate