In [1]:
# load the data into an RDD
path = "file:///Users/delshawnkirksey/Projects/spark/apps/tutorial_2_nycrime/NYPD_7_Major_Felony_Incidents.csv"
data = sc.textFile(path)

In [2]:
data.take(10)

['OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1',
 '1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"',
 '5,6e410287,05/21/1987 12:01:00

In [5]:
# filter the header row
header = data.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [9]:
# create a new RDD without the header row
dataWoHeader = data.filter(lambda x: x != header)

In [12]:
# read the first row
dataWoHeader.first()

'1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"'

In [13]:
# parse the rows to extract the fields

In [26]:
dataWoHeader.map(lambda x: x.split(","))

PythonRDD[12] at RDD at PythonRDD.scala:48

In [19]:
import csv 
from io import StringIO
from collections import namedtuple

In [20]:
fields = header.replace(" ","_").replace("/","_").split(",")
print(fields)

['OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1']


In [21]:
Crime = namedtuple('Crime', fields, verbose=True)

from builtins import property as _property, tuple as _tuple
from operator import itemgetter as _itemgetter
from collections import OrderedDict

class Crime(tuple):
    'Crime(OBJECTID, Identifier, Occurrence_Date, Day_of_Week, Occurrence_Month, Occurrence_Day, Occurrence_Year, Occurrence_Hour, CompStat_Month, CompStat_Day, CompStat_Year, Offense, Offense_Classification, Sector, Precinct, Borough, Jurisdiction, XCoordinate, YCoordinate, Location_1)'

    __slots__ = ()

    _fields = ('OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1')

    def __new__(_cls, OBJECTID, Identifier, Occurrence_Date, Day_of_Week, Occurrence_Month, Occurrence_Day, Occurrence_Year, Occurrence_Hour, CompStat_Month, CompStat_Day, CompStat_Year, Offens

In [32]:
# define a function to parse each row into a Crime object
def parse(row):
    reader = csv.reader(StringIO(row))
    row=next(reader)
    return Crime(*row)

In [33]:
# use the parse function on each row of data -- storing in crimes variable
crimes = dataWoHeader.map(parse)

In [34]:
# read the first row of the new RDD
crimes.first()

Crime(OBJECTID='1', Identifier='f070032d', Occurrence_Date='09/06/1940 07:30:00 PM', Day_of_Week='Friday', Occurrence_Month='Sep', Occurrence_Day='6', Occurrence_Year='1940', Occurrence_Hour='19', CompStat_Month='9', CompStat_Day='7', CompStat_Year='2010', Offense='BURGLARY', Offense_Classification='FELONY', Sector='D', Precinct='66', Borough='BROOKLYN', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='987478', YCoordinate='166141', Location_1='(40.6227027620001, -73.9883732929999)')

In [35]:
crimes.first().Offense

'BURGLARY'

In [36]:
# summarize the data by offense type by using the map operation
# countByValue returns a summary of each distinct value and the number of times it occurs
crimes.map(lambda x: x.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'FELONY ASSAULT': 184042,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1,
             'RAPE': 13779,
             'ROBBERY': 198744})

In [37]:
# the 'NA' above represents one record without a value

In [47]:
crimes.map(lambda x: x.Occurrence_Year).countByValue()

defaultdict(int,
            {'': 244,
             '1905': 2,
             '1908': 1,
             '1910': 3,
             '1911': 1,
             '1912': 1,
             '1913': 4,
             '1914': 2,
             '1915': 3,
             '1920': 1,
             '1940': 1,
             '1945': 2,
             '1946': 1,
             '1950': 1,
             '1954': 1,
             '1955': 1,
             '1956': 1,
             '1958': 1,
             '1959': 1,
             '1960': 1,
             '1964': 1,
             '1965': 2,
             '1966': 7,
             '1968': 1,
             '1969': 1,
             '1970': 2,
             '1971': 1,
             '1972': 2,
             '1973': 5,
             '1974': 3,
             '1975': 2,
             '1976': 2,
             '1977': 3,
             '1978': 2,
             '1979': 6,
             '1980': 5,
             '1981': 1,
             '1982': 5,
             '1983': 1,
             '1984': 4,
             '1985': 8,
 

In [43]:
# the map above shows that 244 records are missing an Occurrence_Year
# and that crimes data is likely missing in years before 2006

In [52]:
# filter out the records where offense is NA or Occurence_Year is blank and where Occurrence_Year < 2006
crimesFiltered = crimes.filter(lambda x: not (x.Offense=="NA" or x.Occurrence_Year==''))\
                    .filter(lambda x: int(x.Occurrence_Year)>=2006)

In [54]:
# count the count of crimes by year of the new RDD
crimesFiltered.map(lambda x: x.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '2015': 102657})

In [55]:
# function to take the locations string and return the coordinates as latitude and longitude numbers
def extractCoords(location):
    location_lat = float(location[1:location.index(",")])
    location_lon = float(location[location.index(",")+1:-1])
    return (location_lat, location_lon)

In [60]:
# extracts the location from the RDD to lat and long and
# reduces records to the min lat and long for checking anomalies
crimesFiltered.map(lambda x: extractCoords(x.Location_1))\
    .reduce(lambda x,y: (min(x[0],y[0]),min(x[1],y[1])))

(40.112709974, -77.519206334)

In [61]:
# extracts the location from the RDD to lat and long and
# reduces records to the max lat and long for checking anomalies
crimesFiltered.map(lambda x: extractCoords(x.Location_1))\
    .reduce(lambda x,y: (max(x[0],y[0]),max(x[1],y[1])))

(59.5805088160001, -73.700716685)

In [63]:
# uses the extractCoords function to filter out crimes committed outside of the NY bounding box (from google search)
crimesFinal = crimesFiltered.filter(lambda x: extractCoords(x.Location_1)[0]>=40.477399 and \
                                   extractCoords(x.Location_1)[0]<=40.917577 and \
                                   extractCoords(x.Location_1)[1]>=-74.25909 and \
                                   extractCoords(x.Location_1)[1]<=-73.700009)

In [66]:
# we now have a clean dataset -- YAY! :)

In [68]:
# look at the trend by year
crimesFinal.map(lambda x: x.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2007': 120491,
             '2008': 117375,
             '2009': 106018,
             '2010': 105639,
             '2011': 107203,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '2015': 102657})

In [71]:
# look at burglaries by year
crimesFinal.filter(lambda x: x.Offense=='BURGLARY')\
    .map(lambda x: x.Occurrence_Year)\
    .countByValue()

defaultdict(int,
            {'2006': 23069,
             '2007': 21715,
             '2008': 20732,
             '2009': 19441,
             '2010': 18700,
             '2011': 18860,
             '2012': 19309,
             '2013': 17419,
             '2014': 16832,
             '2015': 14967})

In [73]:
"""
the use case below involves creating a visual representation
of the crime statistics using Google's gmplot library
"""

import gmplot
gmap = gmplot.GoogleMapPlotter(37.428, -122.145, 16).from_geocode("New York City")


In [74]:
# a list of all latitudes for burglaries
b_lats = crimesFinal.filter(lambda x: x.Offense=='BURGLARY' and x.Occurrence_Year=='2015')\
                    .map(lambda x: extractCoords(x.Location_1)[0])\
                    .collect()
        
# a list of all longitudes for burglaries
b_lons = crimesFinal.filter(lambda x: x.Offense=='BURGLARY' and x.Occurrence_Year=='2015')\
                    .map(lambda x: extractCoords(x.Location_1)[1])\
                    .collect()

In [75]:
# uses the gmplot api library to render a map containing the given latitudes and longitudes 
# in the given color and size
gmap.scatter(b_lats, b_lons, '#4440C8', size=40, marker=False)

In [77]:
# creates an html file displaying the map given above
gmap.draw("mymap.html")