In [4]:
import os
# Path to the crimes CSV, resolved relative to the parent of the working directory.
filePath = os.path.join(os.pardir,'data','crimesInNY.csv')
# Load the file as an RDD with one element per text line.
# NOTE(review): `sc` is never defined in this notebook; presumably the SparkContext
# injected by the PySpark kernel — confirm, otherwise Restart & Run All fails here.
data = sc.textFile(filePath)

# QUICK EXPLORATION

In [5]:

# Peek at the first five raw lines: the header row followed by four records.
data.take(5)

['OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1',
 '1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"']

# PRELIMINARY TRANSFORMATION

In [6]:
# The first line of the file holds the comma-separated column names; keep it
# so it can be removed from the data and reused to name the record fields.
header = data.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [6]:
# filter() keeps only the lines for which the predicate is true; here every
# line that is not identical to the header row.
dataWoHeader = data.filter(lambda line: line != header)

In [7]:
# Check that the first element is now a data record rather than the header.
dataWoHeader.first()

'1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"'

In [8]:
# map() applies a function to every element. A plain split on "," is only a
# demonstration: it also cuts the quoted "(lat, lon)" field in two, which is
# why a real CSV parser is used further down.
dataWoHeader.map(lambda line: line.split(",")).take(10)

[['1',
  'f070032d',
  '09/06/1940 07:30:00 PM',
  'Friday',
  'Sep',
  '6',
  '1940',
  '19',
  '9',
  '7',
  '2010',
  'BURGLARY',
  'FELONY',
  'D',
  '66',
  'BROOKLYN',
  'N.Y. POLICE DEPT',
  '987478',
  '166141',
  '"(40.6227027620001',
  ' -73.9883732929999)"'],
 ['2',
  'c6245d4d',
  '12/14/1968 12:20:00 AM',
  'Saturday',
  'Dec',
  '14',
  '1968',
  '0',
  '12',
  '14',
  '2008',
  'GRAND LARCENY',
  'FELONY',
  'G',
  '28',
  'MANHATTAN',
  'N.Y. POLICE DEPT',
  '996470',
  '232106',
  '"(40.8037530600001',
  ' -73.955861904)"'],
 ['3',
  '716dbc6f',
  '10/30/1970 03:30:00 PM',
  'Friday',
  'Oct',
  '30',
  '1970',
  '15',
  '10',
  '31',
  '2008',
  'BURGLARY',
  'FELONY',
  'H',
  '84',
  'BROOKLYN',
  'N.Y. POLICE DEPT',
  '986508',
  '190249',
  '"(40.688874254',
  ' -73.9918594329999)"'],
 ['4',
  '638cd7b7',
  '07/18/1972 11:00:00 PM',
  'Tuesday',
  'Jul',
  '18',
  '1972',
  '23',
  '7',
  '19',
  '2012',
  'GRAND LARCENY OF MOTOR VEHICLE',
  'FELONY',
  'F',
  '73

In [9]:
import csv
from io import StringIO
from collections import namedtuple

In [10]:
# Turn the column names into valid Python identifiers (spaces and slashes
# become underscores) so they can be used as namedtuple field names.
fields = header.replace(" ","_").replace("/","_").split(",")
print(fields)

['OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1']


In [11]:
# Record type with one attribute per CSV column.
# The `verbose` argument is intentionally not passed: it defaulted to False and
# was removed from namedtuple() in Python 3.7, where passing it raises TypeError.
Crime = namedtuple('Crime', fields)

In [12]:
def parse(row):
    """Convert one raw CSV line into a Crime record.

    The csv module is used instead of str.split so that quoted fields that
    contain commas stay in one piece. Every value is kept as a string.
    The line must yield exactly as many values as Crime has fields.
    """
    values = next(csv.reader(StringIO(row)))
    return Crime(*values)

In [13]:
# Parse every data line into a Crime record by mapping parse() over the RDD.
crimes=dataWoHeader.map(parse)

In [14]:
# Inspect the first parsed record.
crimes.first()


Crime(OBJECTID='1', Identifier='f070032d', Occurrence_Date='09/06/1940 07:30:00 PM', Day_of_Week='Friday', Occurrence_Month='Sep', Occurrence_Day='6', Occurrence_Year='1940', Occurrence_Hour='19', CompStat_Month='9', CompStat_Day='7', CompStat_Year='2010', Offense='BURGLARY', Offense_Classification='FELONY', Sector='D', Precinct='66', Borough='BROOKLYN', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='987478', YCoordinate='166141', Location_1='(40.6227027620001, -73.9883732929999)')

In [15]:
# Fields of a Crime record can be read by name.
crimes.first().Identifier

'f070032d'

# SUMMARY STATISTICS

In [16]:
# Number of records per offense type.
crimes.map(lambda crime: crime.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'RAPE': 13779,
             'ROBBERY': 198744,
             'FELONY ASSAULT': 184042,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1})

In [17]:
# Number of records per occurrence year (years are still strings here).
crimes.map(lambda crime: crime.Occurrence_Year).countByValue()

defaultdict(int,
            {'1940': 1,
             '1968': 1,
             '1970': 2,
             '1972': 2,
             '1987': 6,
             '1990': 17,
             '1992': 12,
             '1994': 19,
             '1995': 27,
             '1996': 34,
             '1998': 74,
             '1999': 124,
             '2000': 282,
             '2001': 343,
             '2002': 368,
             '2003': 490,
             '2004': 692,
             '2005': 3272,
             '2006': 127887,
             '1910': 3,
             '1913': 4,
             '1945': 2,
             '1981': 1,
             '1985': 8,
             '1988': 6,
             '1991': 12,
             '1905': 2,
             '1971': 1,
             '1997': 40,
             '1914': 2,
             '1956': 1,
             '1989': 12,
             '1993': 23,
             '2015': 102657,
             '1954': 1,
             '1982': 5,
             '1950': 1,
             '1959': 1,
             '1966': 7,
            

# FILTERING OUT ANOMALIES

In [18]:
# Drop unwanted records in two steps:
#   1. remove rows whose offense is "NA" or whose occurrence year is empty;
#   2. keep only occurrences from 2006 onwards. This step must come second,
#      because int() would fail on an empty year.
crimesFiltered = (
    crimes
    .filter(lambda crime: crime.Offense != "NA" and crime.Occurrence_Year != '')
    .filter(lambda crime: int(crime.Occurrence_Year) >= 2006)
)

In [19]:
# Sanity check: the "NA" offense should no longer appear in the counts.
crimesFiltered.map(lambda crime: crime.Offense).countByValue()


defaultdict(int,
            {'GRAND LARCENY': 424635,
             'BURGLARY': 191045,
             'GRAND LARCENY OF MOTOR VEHICLE': 101728,
             'FELONY ASSAULT': 183879,
             'ROBBERY': 198569,
             'RAPE': 12974,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4443})

In [20]:
# Sanity check: only occurrence years from 2006 onwards should remain.
crimesFiltered.map(lambda crime: crime.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2015': 102657,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849})

In [21]:
def extractCoords(location):
    """Parse a "(lat, lon)" string into a (lat, lon) tuple of floats.

    The text is split at its first comma; the first and last characters of
    the string (the surrounding parentheses) are discarded.

    Raises ValueError if the string has no comma or if either part is not a
    valid float.
    """
    comma_at = location.index(",")
    lat_text = location[1:comma_at]
    lon_text = location[comma_at + 1:-1]
    return (float(lat_text), float(lon_text))

In [22]:
# reduce() folds all (lat, lon) pairs into a single pair: the smallest latitude
# and the smallest longitude seen. The two values are taken independently, so
# the result need not be the location of an actual record.
crimesFiltered.map(lambda crime: extractCoords(crime.Location_1))\
            .reduce(lambda a, b: (min(a[0], b[0]), min(a[1], b[1])))

(40.112709974, -77.519206334)

In [23]:
# Same fold as for the minimum, but keeping the largest latitude and the
# largest longitude (again taken independently of each other).
crimesFiltered.map(lambda crime: extractCoords(crime.Location_1))\
            .reduce(lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))

(59.5805088160001, -73.700716685)

In [24]:
# Burglaries per occurrence year.
crimesFiltered.filter(lambda crime: crime.Offense == 'BURGLARY')\
                .map(lambda crime: crime.Occurrence_Year)\
                .countByValue()

defaultdict(int,
            {'2006': 23069,
             '2007': 21716,
             '2008': 20732,
             '2009': 19441,
             '2010': 18700,
             '2011': 18860,
             '2012': 19309,
             '2013': 17419,
             '2014': 16832,
             '2015': 14967})

In [25]:
import gmplot
# Create a map centred on New York City.
# from_geocode is a classmethod that builds its own plotter from the geocoded
# location, so it is called on the class. The previous code first constructed a
# plotter at (37.428, -122.145) with zoom 16 and then discarded it; those values
# were not passed on to the map that from_geocode returns.
gmap = gmplot.GoogleMapPlotter.from_geocode("New York City")

In [27]:
# b_lats = crimesFiltered.filter(lambda x:x.Offense=='BURGLARY' and x.Occurrence_Year=='2015')\
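#                         .map(lambda x:extractCoords(x.Location_1[0])).collect()  # FIXME: x.Location_1[0] is only the first character of the string, so extractCoords fails with ValueError; pass x.Location_1 instead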