In [29]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

import csv
import os
import pandas as pd

import datetime as dt
import re

In [2]:
sc = SparkContext()

In [3]:
path = 'data\\nyc_cscl.csv'

"""
New Index
0: PHYSICALID
1: L_LOW_HN
2: L_HIGH_HN
3: R_LOW_HN
4: R_HIGH_HN
5: ST_LABEL
6: BOROCODE
7: FULL_STREE
"""

data = sc.textFile(path)
header = data.first()

out = sc.textFile(path) \
        .filter(lambda x: x!= header) \
        .mapPartitions(lambda x: csv.reader(x)) \
        .filter(lambda x: len(x) >= 30) \
        .map(lambda x: (x[0], x[2], x[3], x[4], x[5], x[10], x[13], x[28])) \
        .collect()

In [4]:
print(len(out), len(out[0]))

547313 8


In [5]:
bc = sc.broadcast(out)

In [6]:
for o, b in zip(out[:3], bc.value[:3]):
    print(o)
    print(b)
    print()

('164809', '', '', '', '', 'MITSUBISHI WILD WETLAND TRL', '2', 'MITSUBISHI WILD WETLAND TRL')
('164809', '', '', '', '', 'MITSUBISHI WILD WETLAND TRL', '2', 'MITSUBISHI WILD WETLAND TRL')

('6110', '215-001', '215-027', '215-000', '215-026', '28 AV', '4', '28 AVE')
('6110', '215-001', '215-027', '215-000', '215-026', '28 AV', '4', '28 AVE')

('145494', '317', '399', '316', '360', 'SCHERMERHORN ST', '3', 'SCHERMERHORN ST')
('145494', '317', '399', '316', '360', 'SCHERMERHORN ST', '3', 'SCHERMERHORN ST')



In [47]:
out[:3]

[('164809',
  '',
  '',
  '',
  '',
  'MITSUBISHI WILD WETLAND TRL',
  'MITSUBISHI WILD WETLAND TRL'),
 ('6110', '215-001', '215-027', '215-000', '215-026', '28 AV', '28 AVE'),
 ('145494', '317', '399', '316', '360', 'SCHERMERHORN ST', 'SCHERMERHORN ST')]

In [15]:
NYC_CSCL_PATH = 'data\\nyc_cscl.csv'
root = 'test'
violation_records = [os.path.join(root, 'violation_small1.csv'),
                     os.path.join(root, 'violation_small2.csv')]
VIOLATION_PATH = ','.join(violation_records)
# indices for lookup table
PHYSICALID = 0
L_LOW_HN = 1
L_HIGH_HN = 2
R_LOW_HN = 3
R_HIGH_HN = 4
ST_LABEL = 5
BOROCODE_IDX = 6
FULL_STREE = 7

In [26]:
def match_house_number(hn_record, segment):
    # exclude single character house numbers
    if len(hn_record) == 1 and (not hn_record.isnumeric()):
        return False
    # exlude cases like 789A
    if (hn_record.find('-') == -1) and (not hn_record.isnumeric()):
        return False
    # if a record is empty, assigns 0
    if len(hn_record) == 0:
        hn_record = 0
    # otherwise concatenate two values together
    # example: '187-09' = 18709 <int>
    # example: '187' = 187 <int>
    else:
        hn_record = int(hn_record.replace('-', ''))
    # format house numbers in lookup segment in the same way
    # if hn_record is even, we should use 'R'; otherwise, 'L'
    if hn_record%2 == 0:
        if len(segment[R_LOW_HN]) == 0:
            lower = 0
        else:
            lower = int(re.sub('-0|-', '', segment[R_LOW_HN]))
        if len(segment[R_HIGH_HN]) == 0:
            high = 0
        else:
            high = int(re.sub('-0|-', '', segment[R_HIGH_HN]))
    else:
        if len(segment[L_LOW_HN]) == 0:
            lower = 0
        else:
            lower = int(re.sub('-0|-', '', segment[L_LOW_HN]))
        if len(segment[L_HIGH_HN]) == 0:
            high = 0
        else:
            high = int(re.sub('-0|-', '', segment[L_HIGH_HN]))
    return (lower <= hn_record) and (hn_record <= high)


def countyname2borocode(county_name):
    if (county_name == 'NEW Y') or (county_name == 'NEWY') or (county_name == 'NY') or (county_name == 'MH') or (county_name == 'MAN'):
        return 1
    elif (county_name == 'BRONX') or (county_name == 'BX'):
        return 2
    elif (county_name == 'KINGS') or (county_name == 'KING') or (county_name == 'K'):
        return 3
    elif (county_name == 'QUEEN') or (county_name == 'QU') or (county_name == 'Q'):
        return 4
    elif (county_name == 'R'):
        return 5
    else:
        return -1


def street_segmentid_lookup(HN, STREET_NAME, BOROCODE, physicalID_list):
    for segment in physicalID_list:
        street = STREET_NAME.lower()
        # print(type(int(segment['BOROCODE'])), type(v_record['Violation County']))
        # first check county code and street name
        if (BOROCODE == int(segment[BOROCODE_IDX])) and \
           ((street == segment[FULL_STREE].lower()) or (street == segment[ST_LABEL].lower())):
           # then, check house number: odd number is stored in left
           if match_house_number(HN, segment):
                return segment[PHYSICALID]
    # returns -1 if there is no match
    return -1


def export_csv(output, lookup_table):
    """ Export output in csv format """
    # build lookup table with counts
    physicalIDs = {}
    for row in lookup_table:
        if row[PHYSICALID].isnumeric():
            id = int(row[PHYSICALID])
            physicalIDs.update({id:[0, 0, 0, 0, 0, 0]})
    # assign the count in output
    for out in output:
        try:
            lookup = int(out[0])
            for idx in range(6):
                physicalIDs[lookup][idx] = out[1][idx][1]
        except KeyError:
            pass
    # export the resutl as csv
    with open('temp.csv', 'w', newline='\n') as f:
        writer = csv.writer(f)
        for key in sorted(physicalIDs.keys()):
            writer.writerow([key] + physicalIDs[key])


def ols(data):
    """ data = [(x1, y1), ..., (xi, yi), ..., (xN, yN)] """
    x_bar = sum([d[0] for d in data])/len(data)
    y_bar = sum([d[1] for d in data])/len(data)
    numerator = sum([(d[0] - x_bar)*(d[1] - y_bar) for d in data])
    denomenator = sum([(d[0] - x_bar)**2 for d in data])
    if denomenator == 0:
        return 0
    else:
        return numerator/denomenator


def fill_zer0(row):
    expected = {2015: 0, 2016:0, 2017:0, 2018:0, 2019:0}
    for x in row:
        expected[x[0]] += x[1]
    expected = [(k, v) for k, v in expected.items()]
    return expected


# test_id_assignment

In [7]:
data = sc.textFile(NYC_CSCL_PATH)
header = data.first()
# start testing
lookup = sc.textFile(NYC_CSCL_PATH) \
           .filter(lambda x: x != header) \
           .mapPartitions(lambda x: csv.reader(x)) \
           .filter(lambda x: len(x) >= 30) \
           .map(lambda x: (x[0], x[2], x[3], x[4], x[5], x[10], x[13], x[28])) \
           .collect()
LOOKUP_BCAST = sc.broadcast(lookup)
# skip headers
file = 'test\\violation_small.csv'
data = sc.textFile(file)
header = data.first()
# load data
res = sc.textFile(file) \
        .filter(lambda x: x != header) \
        .mapPartitions(lambda x: csv.reader(x)) \
        .map(lambda x: (int(dt.datetime.strptime(x[4], '%m/%d/%Y').year), x[21], x[23], x[24])) \
        .filter(lambda x: (2015 <= x[0] and x[0] <= 2019)) \
        .map(lambda x: (x[0], countyname2borocode(x[1]), x[2], x[3])) \
        .filter(lambda x: x[1] > 0) \
        .map(lambda x: (x[0], street_segmentid_lookup(x[2], x[3], x[1], LOOKUP_BCAST.value))) \
        .filter(lambda x: int(x[1]) > 0) \
        .collect()

In [8]:
len(res)

49

# test_whole_process

In [10]:
data = sc.textFile(NYC_CSCL_PATH)
header = data.first()
# start testing
lookup = sc.textFile(NYC_CSCL_PATH) \
           .filter(lambda x: x != header) \
           .mapPartitions(lambda x: csv.reader(x)) \
           .filter(lambda x: len(x) >= 30) \
           .map(lambda x: (x[0], x[2], x[3], x[4], x[5], x[10], x[13], x[28])) \
           .collect()
LOOKUP_BCAST = sc.broadcast(lookup)

file = 'test\\violation_small.csv'
# to skip header
data = sc.textFile(file)
header = data.first()
# start computation
res = sc.textFile(file) \
        .filter(lambda x: x != header) \
        .mapPartitions(lambda x: csv.reader(x)) \
        .filter(lambda x: len(x) >= 25) \
        .map(lambda x: (int(dt.datetime.strptime(x[4], '%m/%d/%Y').year), x[21], x[23], x[24])) \
        .filter(lambda x: (2015 <= x[0] and x[0] <= 2019)) \
        .map(lambda x: (x[0], countyname2borocode(x[1]), x[2], x[3])) \
        .filter(lambda x: x[1] > 0) \
        .map(lambda x: (x[0], street_segmentid_lookup(x[2], x[3], x[1], LOOKUP_BCAST.value))) \
        .filter(lambda x: int(x[1]) > 0) \
        .map(lambda x: ((x[1], x[0]), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortByKey(True, 1) \
        .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
        .reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda x: fill_zer0(x) + [('OLS_COEF', ols(x))]) \
        .collect()
# count the number of total violations
count = 0
for segment in res:
    for year in segment[1]:
        if year[0] != 'OLS_COEF':
            count += year[1]
assert count == 49

In [11]:
count

49

# test_computation_and_export_csv

In [27]:
data = sc.textFile(NYC_CSCL_PATH)
header = data.first()
# start testing
lookup = sc.textFile(NYC_CSCL_PATH) \
           .filter(lambda x: x != header) \
           .mapPartitions(lambda x: csv.reader(x)) \
           .filter(lambda x: len(x) >= 30) \
           .map(lambda x: (x[0], x[2], x[3], x[4], x[5], x[10], x[13], x[28])) \
           .collect()
LOOKUP_BCAST = sc.broadcast(lookup)
# skip headers
data = sc.textFile(VIOLATION_PATH)
header = data.first()
# load data
res = sc.textFile(VIOLATION_PATH) \
        .filter(lambda x: x != header) \
        .mapPartitions(lambda x: csv.reader(x)) \
        .filter(lambda x: len(x) >= 25) \
        .map(lambda x: (int(dt.datetime.strptime(x[4], '%m/%d/%Y').year), x[21], x[23], x[24])) \
        .filter(lambda x: (2015 <= x[0] and x[0] <= 2019)) \
        .map(lambda x: (x[0], countyname2borocode(x[1]), x[2], x[3])) \
        .filter(lambda x: x[1] > 0) \
        .map(lambda x: (x[0], street_segmentid_lookup(x[2], x[3], x[1], LOOKUP_BCAST.value))) \
        .filter(lambda x: int(x[1]) > 0) \
        .map(lambda x: ((x[1], x[0]), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortByKey(True, 1) \
        .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
        .reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda x: fill_zer0(x) + [('OLS_COEF', ols(x))]) \
        .collect()
export_csv(res, LOOKUP_BCAST.value)

119801
101337
1024
1124
1144
114456
11771
119674
123542
12419
1398
16628
16742
168363
169054
174976
181289
181296
181509
182005
183597
184762
1884
19364
19562
19664
19726
2066
21812
2258
22844
22945
23309
2367
24767
24931
26804
2683
29155
30366
35349
35889
35908
36371
3641
3642
3710
38190
38706
39249
39746
41354
4208
4286
43129
43130
43676
44303
44449
4599
46717
48350
50603
51135
5119
51251
53445
53967
55649
5568
56666
58790
59154
5946
61096
6497
6577
66252
66890
67443
68931
69033
69087
70468
71256
72708
72709
72762
73434
759
770
78670
79853
79943
80600
82118
82152
86561
9126
91648
91797
92949
95539
9655


PermissionError: [Errno 13] Permission denied: 'temp.csv'

In [30]:
a = [[1, 2, 3],
     [4, 5, 6],
     [7, 8, 9]]
df = pd.DataFrame(a)
df

Unnamed: 0,0,1,2
0,1,2,3
1,4,5,6
2,7,8,9


In [31]:
df.write.csv('t.csv')

AttributeError: 'DataFrame' object has no attribute 'write'

In [34]:
not '123A'.isnumeric()

True

In [37]:
'123A'.isdigit()

False

In [36]:
'123'.isdigit()

True

In [38]:
a = '123A'

In [40]:
a[-1].isdigit()

False

In [48]:
re.sub('\s', '-', '70-04')

'70-04'

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField, DecimalType

In [50]:
spark = SparkSession.builder \
                    .appName('BDA Final Project') \
                    .master('local') \
                    .getOrCreate()

In [88]:
schema = StructType([
    StructField('Physical ID', IntegerType(), False),
    StructField('2015_count', IntegerType(), False),
    StructField('2016_count', IntegerType(), False),
    StructField('2017_count', IntegerType(), False),
    StructField('2018_count', IntegerType(), False),
    StructField('2019_count', IntegerType(), False),
    StructField('OLS_coef', IntegerType(), False)
])

In [97]:
data = [
    (0, 0, 0, 0, 0, 0.8),
    (0, 0, 0, 0, 1, 0.2),
    (1, 0, 0, 0, 0, 0.1),
]

In [98]:
rdd = spark.sparkContext.parallelize(data)

In [99]:
spark_df = spark.createDataFrame(rdd)

In [100]:
spark_df.show()

+---+---+---+---+---+---+
| _1| _2| _3| _4| _5| _6|
+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|0.8|
|  0|  0|  0|  0|  1|0.2|
|  1|  0|  0|  0|  0|0.1|
+---+---+---+---+---+---+



In [103]:
spark_df.write.csv('t.csv')

AnalysisException: 'path file:/C:/Users/under/GitHub/Big-Data-Analytics-Final/t.csv already exists.;'

# Reduce lookup table size

In [133]:
data = sc.textFile('data\\nyc_cscl.csv')
header = data.first()
# start testing
lookup = sc.textFile(NYC_CSCL_PATH) \
           .filter(lambda x: x != header) \
           .mapPartitions(lambda x: csv.reader(x)) \
           .filter(lambda x: len(x) >= 30) \
           .map(lambda x: (x[0], (x[2], x[3], x[4], x[5], x[10], x[13], x[28]))) \
           .reduceByKey(lambda x, y: x) \
           .map(lambda x: (x[0], x[1][0], x[1][1], x[1][2], x[1][3], x[1][4], x[1][5], x[1][6])) \
           .collect()
LOOKUP_BCAST = sc.broadcast(lookup)

In [134]:
print(len(lookup), len(LOOKUP_BCAST.value))
print(len(lookup[0]), len(LOOKUP_BCAST.value[0]))

119801 119801
8 8


In [138]:
lookup[10:20]

[('155363', '', '', '', '', 'BQE', '4', 'B Q E'),
 ('56796', '1', '35', '2', '34', 'THORNYCROFT AV', '5', 'THORNY CROFT AVE'),
 ('82534',
  '115-001',
  '115-099',
  '115-000',
  '115-098',
  '148 ST',
  '4',
  '148 ST'),
 ('7962', '11-001', '11-009', '0', '0', '50 AV', '4', '50 AVE'),
 ('50539', '1301', '1359', '1300', '1378', 'TAYLOR AV', '2', 'TAYLOR AVE'),
 ('72929', '97', '159', '98', '160', 'WOODBINE ST', '3', 'WOODBINE ST'),
 ('97577', '14-001', '14-071', '14-000', '14-070', '156 ST', '4', '156 ST'),
 ('7136', '', '', '', '', 'GRAND CENTRAL PKWY', '4', 'GCP'),
 ('176643',
  '',
  '',
  '',
  '',
  'FRANKLIN D ROOSEVELT DRIVE',
  '1',
  'F ROOSEVELT DR'),
 ('92124', '87', '111', '84', '110', 'HECKER ST', '5', 'HECKER ST')]

In [104]:
df = pd.read_csv('data\\nyc_cscl.csv')

In [105]:
df.shape

(547313, 32)

In [107]:
len(df['PHYSICALID'].unique())

119801

In [135]:
file = 'test\\violation_small.csv'
# to skip header
data = sc.textFile(file)
header = data.first()
# start computation
res = sc.textFile(file) \
        .filter(lambda x: x != header) \
        .mapPartitions(lambda x: csv.reader(x)) \
        .filter(lambda x: len(x) >= 25) \
        .map(lambda x: (int(dt.datetime.strptime(x[4], '%m/%d/%Y').year), x[21], x[23], x[24])) \
        .filter(lambda x: (2015 <= x[0] and x[0] <= 2019)) \
        .map(lambda x: (x[0], countyname2borocode(x[1]), x[2], x[3])) \
        .filter(lambda x: x[1] > 0) \
        .map(lambda x: (x[0], street_segmentid_lookup(x[2], x[3], x[1], LOOKUP_BCAST.value))) \
        .filter(lambda x: int(x[1]) > 0) \
        .map(lambda x: ((x[1], x[0]), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortByKey(True, 1) \
        .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
        .reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda x: fill_zer0(x) + [('OLS_COEF', ols(x))]) \
        .collect()

In [136]:
count = 0
for segment in res:
    for year in segment[1]:
        if year[0] != 'OLS_COEF':
            count += year[1]

In [137]:
count

21