In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf

In [4]:
conf = SparkConf().setMaster("local").setAppName("applicaiton").set("spark.network.timeout", "10000000").set("spark.executor.heartbeatInterval", "10000000")
conf.set("spark.driver.memory", "5g") 
conf.set("spark.executor.memory", "2g")
conf.set("spark.cores.max", "2")
sc = SparkContext(conf=conf)

In [5]:
path = "C:\\Dataset\\nypd_7_major_felony_incidents.csv"
#path = "C:\\Dataset\\nypd_statistic.csv"
data = sc.textFile(path,2000)

In [8]:
header=data.first()
header

'Unique Key,Created Date,Closed Date,Agency,Agency Name,Complaint Type,Descriptor,Location Type,Incident Zip,Incident Address,Street Name,Cross Street 1,Cross Street 2,Intersection Street 1,Intersection Street 2,Address Type,City,Landmark,Facility Type,Status,Due Date,Resolution Action Updated Date,Community Board,Borough,X Coordinate (State Plane),Y Coordinate (State Plane),Park Facility Name,Park Borough,Vehicle Type,Taxi Company Borough,Taxi Pick Up Location,Bridge Highway Name,Bridge Highway Direction,Road Ramp,Bridge Highway Segment,Latitude,Longitude,Location'

In [75]:
#remove header
dataWoHeader = data.filter(lambda x: x != header)
dataWoHeader.first()

'16358884,03/31/2010 10:21:08 PM,06/01/2010 12:00:00 AM,DOB,Department of Buildings,Emergency Response Team (ERT),After Hours Work - Illegal,,11385,75-40 60 LANE,60 LANE,75 AVENUE,ST FELIX AVENUE,,,ADDRESS,RIDGEWOOD,,N/A,Closed,,06/01/2010 12:00:00 AM,Unspecified QUEENS,QUEENS,1013431,193566,Unspecified,QUEENS,,,,,,,,40.69792261475524,-73.89476337774164,"(40.69792261475524, -73.89476337774164)"'

In [46]:
fields = header.replace(" ","_").replace("/","_").replace("(","_").replace(")","_").split(",")
fields

['Unique_Key',
 'Created_Date',
 'Closed_Date',
 'Agency',
 'Agency_Name',
 'Complaint_Type',
 'Descriptor',
 'Location_Type',
 'Incident_Zip',
 'Incident_Address',
 'Street_Name',
 'Cross_Street_1',
 'Cross_Street_2',
 'Intersection_Street_1',
 'Intersection_Street_2',
 'Address_Type',
 'City',
 'Landmark',
 'Facility_Type',
 'Status',
 'Due_Date',
 'Resolution_Action_Updated_Date',
 'Community_Board',
 'Borough',
 'X_Coordinate__State_Plane_',
 'Y_Coordinate__State_Plane_',
 'Park_Facility_Name',
 'Park_Borough',
 'Vehicle_Type',
 'Taxi_Company_Borough',
 'Taxi_Pick_Up_Location',
 'Bridge_Highway_Name',
 'Bridge_Highway_Direction',
 'Road_Ramp',
 'Bridge_Highway_Segment',
 'Latitude',
 'Longitude',
 'Location']

In [11]:
import csv
from collections import namedtuple

In [12]:
Crime = namedtuple("Crime", fields)

In [13]:
def parse(row):
    list_eles = row.replace(", ",";").split(',')
    return Crime(*list_eles)

In [14]:
crimes = dataWoHeader.map(lambda x:parse(x))

In [38]:
crimesList = []
for crime in crimes.take(10000):
    crimesList.append(crime)

In [58]:
crimes = sc.parallelize(crimesList)
crimes.map(lambda x:x.Location_Type).countByValue()

defaultdict(int,
            {'': 2780,
             'Lot': 109,
             'RESIDENTIAL BUILDING': 6061,
             'Sidewalk': 792,
             'Street': 253,
             'Street/Sidewalk': 4,
             'Residential Building/House': 1})

In [63]:
crimesWithClosed = crimes.filter(lambda x: x.Status=='Closed' and x.Location_Type != '')


In [65]:
crimesWithClosed.map(lambda x:x.Location_Type).countByValue()

defaultdict(int,
            {'Lot': 109,
             'RESIDENTIAL BUILDING': 6057,
             'Sidewalk': 791,
             'Street': 253,
             'Street/Sidewalk': 4,
             'Residential Building/House': 1})

In [67]:
crimesWithClosed.count()

7215

In [92]:
crimesWithClosed.first()

Crime(Unique_Key='16359163', Created_Date='04/01/2010 12:04:00 PM', Closed_Date='04/06/2010 12:00:00 PM', Agency='DSNY', Agency_Name='A - Queens', Complaint_Type='Dirty Conditions', Descriptor='E3A Dirty Area/Alleyway', Location_Type='Lot', Incident_Zip='11694', Incident_Address='122-03 BEACH CHANNEL DRIVE', Street_Name='BEACH CHANNEL DRIVE', Cross_Street_1='BEACH 122 STREET', Cross_Street_2='BEACH 123 STREET', Intersection_Street_1='', Intersection_Street_2='', Address_Type='ADDRESS', City='ROCKAWAY PARK', Landmark='', Facility_Type='N/A', Status='Closed', Due_Date='', Resolution_Action_Updated_Date='04/06/2010 12:00:00 AM', Community_Board='14 QUEENS', Borough='QUEENS', X_Coordinate__State_Plane_='1027501', Y_Coordinate__State_Plane_='151166', Park_Facility_Name='Unspecified', Park_Borough='QUEENS', Vehicle_Type='', Taxi_Company_Borough='', Taxi_Pick_Up_Location='', Bridge_Highway_Name='', Bridge_Highway_Direction='', Road_Ramp='', Bridge_Highway_Segment='', Latitude='40.581486309560

In [68]:
def extracCorordinate(location):
    cor =location.split(';')
    return cor

In [82]:
cord = crimesWithClosed.map(lambda x:extracCorordinate(x.Location.replace('"(','').replace(')"','')))

In [80]:
cord.reduce(lambda x,y:(min(x[0],y[0]),min(x[1],y[1])))

('40.5087817135548', '-73.70373052737233')

In [83]:
cord.reduce(lambda x,y:(max(x[0],y[0]),max(x[1],y[1])))

('40.91044110216951', '-74.24914585014686')

In [98]:
target = crimesWithClosed.filter(lambda x: '-74.24914585014686' in x.Location)
target.take(2)

[Crime(Unique_Key='16366790', Created_Date='04/02/2010 09:53:00 PM', Closed_Date='04/03/2010 12:00:00 PM', Agency='DSNY', Agency_Name='BCC - Staten Island', Complaint_Type='Sanitation Condition', Descriptor='12 Dead Animals', Location_Type='Sidewalk', Incident_Zip='10307', Incident_Address='467 CRAIG AVENUE', Street_Name='CRAIG AVENUE', Cross_Street_1='MAIN STREET', Cross_Street_2='BUTLER AVENUE', Intersection_Street_1='', Intersection_Street_2='', Address_Type='ADDRESS', City='STATEN ISLAND', Landmark='', Facility_Type='DSNY Garage', Status='Closed', Due_Date='', Resolution_Action_Updated_Date='04/03/2010 12:00:00 PM', Community_Board='03 STATEN ISLAND', Borough='STATEN ISLAND', X_Coordinate__State_Plane_='914970', Y_Coordinate__State_Plane_='125237', Park_Facility_Name='Unspecified', Park_Borough='STATEN ISLAND', Vehicle_Type='', Taxi_Company_Borough='', Taxi_Pick_Up_Location='', Bridge_Highway_Name='', Bridge_Highway_Direction='', Road_Ramp='', Bridge_Highway_Segment='', Latitude='4