In [2]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077") \
        .appName("Daniel_Agstrand_A3_Part1")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
        
# Old API (RDD)
spark_context = spark_session.sparkContext



In [2]:
# Q A.1.1

rdd_en = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.en',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text'
)\
.cache() # Keep this RDD in memory!

line_count_en = rdd_en.count()

In [3]:
# Q A.1.2

rdd_sv = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.sv',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text'
)\
.cache() # Keep this RDD in memory!

line_count_sv = rdd_sv.count()

In [4]:
# Q A.1.3

if line_count_sv == line_count_en:
    print ("Equal amount of lines! Number of lines: {}".format(line_count_en))
else:
    print ("Unequal amount of lines! Number of lines in en-sv = {} and Number of lines in sv-en = {}"\
           .format(line_count_en, line_count_sv))

Equal amount of lines! Number of lines: 1862234


In [5]:
# Q A.1.4

rdd_en_partitions_count = rdd_en.getNumPartitions()
rdd_sv_partitions_count = rdd_sv.getNumPartitions()

total_partitions_count = rdd_en_partitions_count + rdd_sv_partitions_count

print("Number of partions in rdd_en: {}\nNumber of partions in rdd_sv: {}\nNumber of total partions in all rdd: {}"\
      .format(rdd_en_partitions_count, rdd_sv_partitions_count, total_partitions_count))

Number of partions in rdd_en: 2
Number of partions in rdd_sv: 3
Number of total partions in all rdd: 5


In [6]:
# Q A.2.1
from operator import add

def lowercase_split(line):
    line = str(line[1]).lower().split(" ")
    return line

rdd_en_mapped = rdd_en.map(lowercase_split) 
rdd_sv_mapped = rdd_sv.map(lowercase_split)

In [7]:
# Q A.2.1

print("10 entries in rdd_en_mapped: " + str(rdd_en_mapped.take(10)) + "\n10 entries in rdd_vs_mapped: " + str(rdd_sv_mapped.take(10)))

10 entries in rdd_en_mapped: [['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'], ['although,', 'as', 'you', 'will', 'have', 'seen,', 'the', 'dreaded', "'millennium", "bug'", 'failed', 'to', 'materialise,', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful.'], ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days,', 'during', 'this', 'part-session.'], ['in', 'the', 'meantime,', 'i', 'should', 'like', 'to', 'observe', 'a', "minute'", 's', 'silence,', 'as', 'a', 'number', 'of', 'members', 'have', 'request

In [8]:
# Q A.2.2

line_count_en_mapped = rdd_en_mapped.count()
line_count_sv_mapped = rdd_sv_mapped.count()

if ((line_count_en_mapped == line_count_en) & (line_count_sv_mapped == line_count_sv)):
    print("Line count still match!")
else:
    print("Line count doesnt match any more!")

Line count still match!


In [9]:
# Q A.3.1
from operator import add

wordCounts_en = rdd_en_mapped.flatMap(lambda x: x).map(lambda word: (word, 1)).reduceByKey(add)
wordCounts_sv = rdd_sv_mapped.flatMap(lambda x: x).map(lambda word: (word, 1)).reduceByKey(add)

In [10]:
# Q A.3.2

print("10 most common words in the english text: {}".format(wordCounts_en.takeOrdered(10, key=lambda x: -x[1])))
print("10 most common words in the swedish text: {}".format(wordCounts_sv.takeOrdered(10, key=lambda x: -x[1])))

10 most common words in the english text: [('the', 3498375), ('of', 1659758), ('to', 1539760), ('and', 1288401), ('in', 1085993), ('that', 797516), ('a', 773522), ('is', 758050), ('for', 534242), ('we', 522849)]
10 most common words in the swedish text: [('att', 1706293), ('och', 1344830), ('i', 1050774), ('det', 924866), ('som', 913276), ('för', 908680), ('av', 738068), ('är', 694381), ('en', 620310), ('vi', 539797)]


In [11]:
# Q A.4.1

rdd_en_1 = rdd_en_mapped.zipWithIndex()
rdd_sv_1 = rdd_sv_mapped.zipWithIndex()

rdd_en_1.take(5)

[(['resumption', 'of', 'the', 'session'], 0),
 (['i',
   'declare',
   'resumed',
   'the',
   'session',
   'of',
   'the',
   'european',
   'parliament',
   'adjourned',
   'on',
   'friday',
   '17',
   'december',
   '1999,',
   'and',
   'i',
   'would',
   'like',
   'once',
   'again',
   'to',
   'wish',
   'you',
   'a',
   'happy',
   'new',
   'year',
   'in',
   'the',
   'hope',
   'that',
   'you',
   'enjoyed',
   'a',
   'pleasant',
   'festive',
   'period.'],
  1),
 (['although,',
   'as',
   'you',
   'will',
   'have',
   'seen,',
   'the',
   'dreaded',
   "'millennium",
   "bug'",
   'failed',
   'to',
   'materialise,',
   'still',
   'the',
   'people',
   'in',
   'a',
   'number',
   'of',
   'countries',
   'suffered',
   'a',
   'series',
   'of',
   'natural',
   'disasters',
   'that',
   'truly',
   'were',
   'dreadful.'],
  2),
 (['you',
   'have',
   'requested',
   'a',
   'debate',
   'on',
   'this',
   'subject',
   'in',
   'the',
   'course',
  

In [12]:
# Q A.4.2

rdd_en_2 = rdd_en_1.map(lambda x: (x[1], x[0]))
rdd_sv_2 = rdd_sv_1.map(lambda x: (x[1], x[0]))

rdd_en_2.take(5)

[(0, ['resumption', 'of', 'the', 'session']),
 (1,
  ['i',
   'declare',
   'resumed',
   'the',
   'session',
   'of',
   'the',
   'european',
   'parliament',
   'adjourned',
   'on',
   'friday',
   '17',
   'december',
   '1999,',
   'and',
   'i',
   'would',
   'like',
   'once',
   'again',
   'to',
   'wish',
   'you',
   'a',
   'happy',
   'new',
   'year',
   'in',
   'the',
   'hope',
   'that',
   'you',
   'enjoyed',
   'a',
   'pleasant',
   'festive',
   'period.']),
 (2,
  ['although,',
   'as',
   'you',
   'will',
   'have',
   'seen,',
   'the',
   'dreaded',
   "'millennium",
   "bug'",
   'failed',
   'to',
   'materialise,',
   'still',
   'the',
   'people',
   'in',
   'a',
   'number',
   'of',
   'countries',
   'suffered',
   'a',
   'series',
   'of',
   'natural',
   'disasters',
   'that',
   'truly',
   'were',
   'dreadful.']),
 (3,
  ['you',
   'have',
   'requested',
   'a',
   'debate',
   'on',
   'this',
   'subject',
   'in',
   'the',
   'course

In [13]:
# Q A.4.3

rdd_en_sv_1 = rdd_en_2.join(rdd_sv_2).map(lambda x: (x[1]))

rdd_en_sv_1.take(5)

[(['the',
   'union',
   'cannot',
   'achieve',
   'its',
   'environmental',
   'goals',
   'unless',
   'the',
   'use',
   'of',
   'renewables',
   'is',
   'actively',
   'stepped',
   'up.'],
  ['unionen',
   'kan',
   'inte',
   'uppnå',
   'sina',
   'miljömål',
   'om',
   'man',
   'inte',
   'aktivt',
   'ökar',
   'användningen',
   'av',
   'förnybara',
   'energikällor.']),
 (['in',
   'relation',
   'to',
   'china,',
   'we',
   'should',
   'not',
   'be',
   'dissuaded',
   'from',
   'raising',
   'the',
   'issue',
   'simply',
   'because',
   'discussions',
   'on',
   'it',
   'may',
   'be',
   'blocked.'],
  ['vad',
   'kina',
   'beträffar',
   'bör',
   'vi',
   'inte',
   'låta',
   'bli',
   'att',
   'ta',
   'upp',
   'frågan',
   'bara',
   'för',
   'att',
   'diskussionen',
   'om',
   'den',
   'kan',
   'bli',
   'blockerad.']),
 (['and',
   'once',
   'again',
   'we',
   'draw',
   'the',
   'same',
   'lesson:',
   'the',
   'citizens',
   'must'

In [14]:
# Q A.4.4

rdd_en_sv_2 = rdd_en_sv_1.filter(lambda x: len(x[0])>0)

rdd_en_sv_2.take(5)

[(['what',
   'is',
   'the',
   'commission',
   'actually',
   'doing',
   'to',
   'make',
   'sure',
   'that',
   'they',
   'are',
   'in',
   'fact',
   'completed?'],
  ['vad',
   'gör',
   'egentligen',
   'kommissionen',
   'för',
   'att',
   'det',
   'också',
   'skall',
   'kunna',
   'bli',
   'så?']),
 (['mr',
   'president,',
   'i',
   'am',
   'pleased',
   'to',
   'have',
   'this',
   'opportunity',
   'to',
   'address',
   'parliament',
   'on',
   'this',
   'very',
   'important',
   'issue.'],
  ['herr',
   'talman!',
   'det',
   'gläder',
   'mig',
   'att',
   'få',
   'detta',
   'tillfälle',
   'att',
   'uttala',
   'mig',
   'i',
   'parlamentet',
   'om',
   'denna',
   'mycket',
   'viktiga',
   'fråga.']),
 (['in',
   'addition,',
   'apart',
   'from',
   'humanitarian',
   'aid',
   'proper,',
   'we',
   'need',
   'to',
   'find',
   'ways',
   'of',
   'improving',
   'conflict',
   'prevention',
   'because',
   '25%',
   'of',
   'the',
   'a

In [15]:
# Q A.4.5

rdd_en_sv_3 = rdd_en_sv_2.filter(lambda x: len(x[0])<10)

rdd_en_sv_3.take(5)

[(['we', 'would', 'be', 'better', 'off', 'without', 'it.'],
  ['vi', 'skulle', 'klara', 'oss', 'bättre', 'utan', 'den.']),
 (['thank', 'you,', 'mrs', 'hautala.'], ['tack,', 'fru', 'hautala.']),
 (['\xa0\xa0', '.'], ['\xa0\xa0', '.', '–', 'herr', 'talman!']),
 (['will', 'the', 'regulations', 'be', 'american?'],
  ['kommer', 'lagstiftningen', 'att', 'vara', 'amerikansk?']),
 (['president.'], ['talmannen.'])]

In [16]:
# Q A.4.6

rdd_en_sv_4 = rdd_en_sv_3.filter(lambda x: len(x[0])==len(x[1]))

rdd_en_sv_4.take(5)

[(['on',
   'other',
   'continents,',
   'decoupling',
   'as',
   'an',
   'instrument',
   'has',
   'failed.'],
  ['på',
   'andra',
   'kontinenter',
   'har',
   'frikopplingen',
   'inte',
   'fungerat',
   'som',
   'instrument.']),
 (['that', 'is', 'the', 'first', 'consideration.'],
  ['det', 'är', 'den', 'första', 'synpunkten.']),
 (['.'], ['.']),
 (['we',
   'therefore',
   'support',
   'the',
   'amendments',
   'tabled',
   'by',
   'mr',
   'sterckx.'],
  ['vi',
   'stöder',
   'därför',
   'de',
   'ändringsförslag',
   'som',
   'sterckx',
   'lagt',
   'fram.']),
 (['\xa0\xa0',
   'yes',
   'to',
   'liberalisation,',
   'but',
   'on',
   'the',
   'right',
   'terms.'],
  ['\xa0\xa0',
   '.–',
   'ja',
   'till',
   'avreglering,',
   'men',
   'på',
   'rätt',
   'villkor.'])]

In [17]:
# Q A.4.7

rdd_en_sv_5  = rdd_en_sv_4.map(lambda x: list(zip(x[0], x[1])))

rdd_en_sv_5.take(5)

[[('next', 'efter'),
  ('will', 'irak'),
  ('be', 'kommer'),
  ('syria,', 'syrien,'),
  ('iran,', 'iran,'),
  ('korea,', 'korea,'),
  ('who', 'vem'),
  ('knows?', 'vet?')],
 [('we', 'vi'),
  ('must', 'måste'),
  ('continue', 'fortsätta'),
  ('to', 'att'),
  ('give', 'ge'),
  ('them', 'dem'),
  ('our', 'vårt'),
  ('support.', 'stöd.')],
 [('\xa0\xa0', '\xa0\xa0'),
  ('.', '.'),
  ('i', '–'),
  ('am', 'jag'),
  ('not', 'känner'),
  ('aware', 'inte'),
  ('of', 'till'),
  ('it.', 'det.')],
 [('2002/2007(cos));', '2002/2007(cos)),')],
 [('that', 'det'),
  ('is', 'är'),
  ('very', 'mycket'),
  ('important', 'viktigt'),
  ('to', 'att'),
  ('emphasise.', 'betona.')]]

In [18]:
# Q A.4.7

rdd_en_sv_6 = rdd_en_sv_5.flatMap(lambda x: x).map(lambda word: (word, 1)).reduceByKey(add)
rdd_en_sv_6.takeOrdered(10, key=lambda x: -x[1])

[(('is', 'är'), 10040),
 (('we', 'vi'), 5530),
 (('i', 'jag'), 5020),
 (('this', 'detta'), 3252),
 (('closed.', 'avslutad.'), 2964),
 (('and', 'och'), 2917),
 (('a', 'en'), 2888),
 (('it', 'det'), 2866),
 (('that', 'det'), 2806),
 (('not', 'inte'), 2650)]

In [3]:
# Q B.1

data_frame = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.87:9000/parking-citations.csv')\
    .cache()
    
data_frame.show()

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [31]:
#Q B.2

data_frame.rdd.getNumPartitions()

11

In [32]:
# Q B.3

data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [33]:
# Q B.4

data_frame.count()

9881842

In [4]:
# Q B.5

drop_columns = ['Agency Description', 'Agency', 'Route']

data_frame = data_frame.drop(*drop_columns)

data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [48]:
# Q B.6
#from pyspark.sql.functions import mean, col
import statistics as s 

def float_fine(fine):
    float_fine = map(float, filter(None, fine))
    return float_fine

#fine = data_frame.select(mean(col('Fine amount'))).rdd.flatMap(lambda x: x).collect()[0]

fine = s.mean(data_frame.select('Fine amount').rdd.map(float_fine).flatMap(lambda x: x).collect())

print(fine)

70.1855354220642


In [89]:
# Q B.7
total = data_frame.select('Make').count()

make = data_frame.select('Make').rdd.\
flatMap(lambda x: x).\
map(lambda word: (word, 1)).\
reduceByKey(lambda a, b: a + b).\
takeOrdered(10, key=lambda x: -x[1])

most_popular = []
for i in range(len(most_populare)):
    freq = most_populare[i][1]/total
    temp = (most_populare[i][0], most_populare[i][1], freq)
    most_popular.append(temp)
print(most_popular)

[('TOYT', 1633266, 0.1652795096298848), ('HOND', 1113834, 0.11271522050241241), ('FORD', 860828, 0.08711209914103059), ('NISS', 709250, 0.0717730560759826), ('CHEV', 674422, 0.0682486119490678), ('BMW', 450909, 0.04563005561108951), ('MERZ', 402126, 0.040693425375552456), ('VOLK', 335618, 0.03396310121129239), ('HYUN', 304934, 0.03085801209936366), ('DODG', 290979, 0.029445825990741404)]


In [117]:
color = data_frame.select('Color').rdd

In [126]:
# Q B.7
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def long_color(color):
    COLORS = {
            'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
            'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
            'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
            'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
            'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
            'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
            'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
            'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
            }
    for key in COLORS.keys():
        if (key == color):
            return COLORS[key]
    return color

data = data_frame.select('Color').rdd.flatMap(lambda x: x).map(long_color)
data_frame.withColumn("Color Long", udf(data, StringType()))
color = data_frame.select('Color Long').rdd.flatMap(lambda x: x).collect()
print(color[1:10])

TypeError: Invalid function: not a function or callable (__call__ is not defined): <class 'list'>

In [121]:
#spark_session.stop()