# Testing spark methods

In [1]:
import pyspark

In [2]:
sc

<pyspark.context.SparkContext at 0x1005755d0>

In [3]:
nums = sc.parallelize([1,2,3])

In [4]:
nums.map(lambda x: 2*x).take(2)

[2, 4]

In [5]:
nums2 = sc.parallelize([4,5,6])

In [6]:
#using union to combine RDDS!
nums.union(nums2).collect()

[1, 2, 3, 4, 5, 6]

In [7]:
pets = sc.parallelize(\
[(u"cat", 1), (u"dog", 1), (u"cat", 2)])
pets_sorted = pets.sortByKey()

In [8]:
pets2 = sc.parallelize([(u"simon",1)])

In [9]:
#inner join test
rdd1 =  sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 =  sc.parallelize([("foo", None), ("bar", 6)])

In [10]:
rdd1.join(rdd2).collect()

[('foo', (1, None)), ('bar', (2, 6))]

# Task1

In [1]:
#import dataset
from csv import reader
parking_csv = sc.textFile("file:/Users/zhuorulin/Documents/DataScience/datasets/parking-violations.csv")
open_csv = sc.textFile('file:/Users/zhuorulin/Documents/DataScience/datasets/open-violations.csv')

In [2]:
parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
open_lines = open_csv.mapPartitions(lambda x: reader(x))

In [3]:
#map each entry in parking with value 1
in_parking = parking_lines.map(lambda x: (x[0],1))
in_open = open_lines.map(lambda x: (x[0],-1))

In [72]:
combined = in_parking.union(in_open)

In [74]:
reduced = combined.reduceByKey(lambda x,y:x+y).filter(lambda x:x[1]==1)

In [75]:
def extract_parking_info(parking_line):
    return (parking_line[0],parking_line[14]+' '+parking_line[6]+' '+parking_line[2]+parking_line[1])

In [76]:
#Trying to take
parking_info = parking_lines.map(extract_parking_info)

In [173]:
results = parking_info.join(reduced).map(lambda x: (x[0],x[1][0])).sortByKey()\
.map(lambda x:'%s\t%s'%(x[0],x[1])).saveAsTextFile('task1.out')

In [85]:
results.take(10)

['1017773099\tGTR366A 23 242016-03-15',
 '1028883584\tGJD3754 14 172016-03-10',
 '1028883651\tJMM3606 103 202016-03-15',
 '1053108965\tFMD6397 108 202016-03-28',
 '1076818079\tDSB7845 108 142016-03-20',
 '1131602810\tGWV8673 103 192016-03-25',
 '1131605718\tFHV4978 109 202016-03-09',
 '1131606243\tGPW5019 104 202016-03-08',
 '1131606267\tGWX9386 103 242016-03-29',
 '1131607284\tT655859C 103 142016-03-25']

In [204]:
%%writefile ./py_files/task1.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    open_lines = open_csv.mapPartitions(lambda x: reader(x))
    in_parking = parking_lines.map(lambda x: (x[0],1))
    in_open = open_lines.map(lambda x: (x[0],-1))
    combined = in_parking.union(in_open)
    reduced = combined.reduceByKey(lambda x,y:x+y).filter(lambda x:x[1]==1)
    def extract_parking_info(parking_line):
        return (parking_line[0],parking_line[14]+', '+parking_line[6]+', '+parking_line[2]+', '+parking_line[1])
    parking_info = parking_lines.map(extract_parking_info)
    parking_info.join(reduced).map(lambda x: (x[0],x[1][0])).sortByKey().map(lambda x:'%s\t%s'%(x[0],x[1])).saveAsTextFile("task1.out")


Overwriting ./py_files/task1.py


# Task2

In [83]:
violations_count = parking_lines.map(lambda x: (int(x[2]),1)).reduceByKey(lambda x,y:x+y)\
.sortBy(keyfunc = lambda x:x[0]).map(lambda x: '%s\t%s'%(x[0],x[1]))
violations_count.take(10)

['1\t159',
 '2\t5',
 '3\t92',
 '4\t86',
 '5\t10941',
 '6\t23',
 '7\t37584',
 '8\t249',
 '9\t733',
 '10\t3930']

In [203]:
%%writefile ./py_files/task2.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    #open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    #open_lines = open_csv.mapPartitions(lambda x: reader(x))
    violations_count = parking_lines.map(lambda x: (int(x[2]),1)).reduceByKey(lambda x,y:x+y)\
.sortBy(keyfunc = lambda x:x[0]).map(lambda x: '%s\t%s'%(x[0],x[1])).saveAsTextFile('task2.out')

Overwriting ./py_files/task2.py


# Task3

In [61]:
open_due = open_lines.map(lambda x : (x[2],(float(x[12]),1))).sortByKey()\
.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).sortByKey()\
.map(lambda x: '%s\t%.2f, %.2f'%(x[0],x[1][0],x[1][0]/x[1][1]))

In [62]:
open_due.take(10)

['999\t246785.51, 85.51',
 'AGC\t255.00, 51.00',
 'AGR\t699.07, 58.26',
 'AMB\t0.00, 0.00',
 'APP\t96574.46, 63.12',
 'ARG\t0.00, 0.00',
 'AYG\t75.00, 18.75',
 'BOB\t0.00, 0.00',
 'BOT\t115.00, 115.00',
 'CBS\t5560.00, 86.88']

In [200]:
%%writefile ./py_files/task3.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    #parking_csv = sc.textFile(sys.argv[1],1)
    open_csv = sc.textFile(sys.argv[1],1)
    #parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    open_lines = open_csv.mapPartitions(lambda x: reader(x))
    open_lines.map(lambda x : (x[2],(float(x[12]),1))).sortByKey()\
.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).sortByKey()\
.map(lambda x: '%s\t%.2f, %.2f'%(x[0],x[1][0],x[1][0]/x[1][1])).saveAsTextFile('task3.out')

Overwriting ./py_files/task3.py


# Task4

In [103]:
def task4map(line):
    #If new york return ('NY',1)
    #else return ('Other',1)
    if line[16] =='NY':
        return ('NY',1)
    else:
        return ('Other',1)
parking_inNY_count = parking_lines.map(task4map).reduceByKey(lambda x,y:x+y).map(lambda x:'%s\t%s'%(x[0],x[1]))

In [104]:
parking_inNY_count.collect()

['NY\t794106', 'Other\t219911']

In [205]:
%%writefile ./py_files/task4.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    #open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    #open_lines = open_csv.mapPartitions(lambda x: reader(x))
    def task4map(line):
    #If new york return ('NY',1)
    #else return ('Other',1)
        if line[16] =='NY':
            return ('NY',1)
        else:
            return ('Other',1)
    parking_lines.map(task4map).reduceByKey(lambda x,y:x+y).map(lambda x:'%s\t%s'%(x[0],x[1]))\
    .saveAsTextFile('task4.out')

Overwriting ./py_files/task4.py


# Task 5

In [116]:
parking_vehicle_count = parking_lines.map(lambda x:('%s, %s'%(x[14],x[16]),1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],False)

In [167]:
evil = sc.parallelize(parking_vehicle_count.take(1)).map(lambda x:'%s\t%s'%(x[0],x[1]))

In [168]:
evil.collect()

['BLANKPLATE, 99/t1203']

In [201]:
%%writefile ./py_files/task5.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    #open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    #open_lines = open_csv.mapPartitions(lambda x: reader(x))
    parking_vehicle_count = parking_lines.map(lambda x:('%s, %s'%(x[14],x[16]),1))\
    .reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],False)
    sc.parallelize(parking_vehicle_count.take(1)).map(lambda x:'%s\t%s'%(x[0],x[1]))\
    .saveAsTextFile('task5.out')

Overwriting ./py_files/task5.py


# Task 6

In [166]:
parking_vehicle_count = parking_lines.map(lambda x:('%s, %s'%(x[14],x[16]),1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],False)
evil = sc.parallelize(parking_vehicle_count.take(20)).map(lambda x:'%s\t%s'%(x[0],x[1]))
evil.collect()

['BLANKPLATE, 99/t1203',
 'N/A, NY/t155',
 'AP501F, NJ/t138',
 '56207MG, NY/t125',
 '20302TC, NY/t116',
 '96087MA, NY/t116',
 'AR290A, NJ/t114',
 '12359MG, NY/t110',
 '17741MD, NY/t107',
 '12817KA, NY/t107',
 '96091MA, NY/t103',
 '96089MA, NY/t102',
 'AP300F, NJ/t101',
 '62546JM, NY/t100',
 '81091MB, NY/t100',
 '14483JY, NY/t99',
 'AL353U, NJ/t99',
 '16206TC, NY/t97',
 '55109MB, NY/t97',
 '30954JX, NY/t96']

In [202]:
%%writefile ./py_files/task6.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    #open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    #open_lines = open_csv.mapPartitions(lambda x: reader(x))
    parking_vehicle_count = parking_lines.map(lambda x:('%s, %s'%(x[14],x[16]),1))\
    .reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],False)
    sc.parallelize(parking_vehicle_count.take(20)).map(lambda x:'%s\t%s'%(x[0],x[1]))\
    .saveAsTextFile('task6.out')

Overwriting ./py_files/task6.py


# Task 7

In [164]:
def task7map(line):
    issue_date = line[1]
    issue_day = int(issue_date.split('-')[2])
    violation_code = line[2]
    if issue_day in [5,6,12,13,19,20,26,27]:
        return ((int(violation_code),(1,0)))
    else:
        return ((int(violation_code),(0,1)))
code_isWeekend_count = parking_lines.map(task7map).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).sortByKey()\
.map(lambda x: '%s\t%.2f,%.2f'%(x[0],x[1][0]/8.,x[1][1]/23.))

In [165]:
code_isWeekend_count.take(10)

['1\t3.25,5.78',
 '2\t0.12,0.17',
 '3\t1.12,3.61',
 '4\t6.25,1.57',
 '5\t0.00,475.70',
 '6\t0.88,0.70',
 '7\t1359.75,1161.13',
 '8\t4.38,9.30',
 '9\t2.00,31.17',
 '10\t43.62,155.70']

In [197]:
%%writefile ./py_files/task7.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

def task7map(line):
    issue_date = line[1]
    issue_day = int(issue_date.split('-')[2])
    violation_code = line[2]
    if issue_day in [5,6,12,13,19,20,26,27]:
        return ((int(violation_code),(1,0)))
    else:
        return ((int(violation_code),(0,1)))

if __name__ == "__main__":
    #suppose: argv[1]:parking.csv argv[2]open.csv
    sc = SparkContext()
    parking_csv = sc.textFile(sys.argv[1],1)
    #open_csv = sc.textFile(sys.argv[2],1)
    parking_lines = parking_csv.mapPartitions(lambda x: reader(x))
    #open_lines = open_csv.mapPartitions(lambda x: reader(x))
    parking_lines.map(task7map).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).sortByKey()\
.map(lambda x: '%s\t%.2f,%.2f'%(x[0],x[1][0]/8.,x[1][1]/23.)).saveAsTextFile('task7.out')

Writing task7.py
