In [1]:
from pyspark.sql import SparkSession


# Build (or reuse, via getOrCreate) the Spark session used throughout this notebook.
# NOTE(review): master("local[1]") runs Spark in-process with a single worker thread,
# so the dynamic-allocation / shuffle-service / executor-cores settings below are
# presumably ignored unless the master is switched to a cluster URL — confirm intent.
spark_session = (
    SparkSession.builder
    .master("local[1]")
    .appName("novellarausell_lecture1_simple_example")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Low-level entry point used by the RDD exercises in Part A.
spark_context = spark_session.sparkContext


# Part A - Working with the RDD API

## Question A.1

### A.1.1 Read the English transcripts with Spark, and count the number of lines.

In [None]:
# The transcripts are stored in HDFS; the NameNode runs on the master host
# 192.168.1.153 and listens on port 9000.
en_path = "hdfs://192.168.1.153:9000/europarl/europarl-v7.sv-en.en"
en_lines = spark_context.textFile(en_path)
nr_en = en_lines.count()
print("The number of lines in the English transcript: {}".format(nr_en))

### A.1.2 Do the same with the other language (so that you have a separate lineage of RDDs for each).

In [None]:
# Separate lineage for the Swedish side of the parallel corpus.
sv_path = "hdfs://192.168.1.153:9000/europarl/europarl-v7.sv-en.sv"
sv_lines = spark_context.textFile(sv_path)
nr_sv = sv_lines.count()
print("The number of lines in the Swedish transcript: {}".format(nr_sv))

### A.1.3 Verify that the line counts are the same for the two languages.

In [None]:
# The corpus is sentence-aligned, so both languages must have the same number of lines.
assert nr_en == nr_sv, "Not the same length!"

### A.1.4 Count the number of partitions.

In [None]:
# textFile() creates one partition per input split (and at least minPartitions);
# for HDFS files this is typically about one partition per HDFS block.
en_partitions = en_lines.getNumPartitions()
sv_partitions = sv_lines.getNumPartitions()
print("Number of partitions in the English transcript: {} \n".format(en_partitions))
print("Number of partitions in the Swedish transcript: {} \n".format(sv_partitions))

## Question A.2

### A.2.1 Pre-process the text from both RDDs by doing the following:
 - Lowercase the text
 - Tokenize the text (split on space)

In [None]:
import string

# Translation table that deletes every ASCII punctuation character; built once
# instead of on every call because cleanstring runs once per corpus line.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def cleanstring(x):
    """Normalise one transcript line before tokenisation.

    Lowercases the text, removes ASCII punctuation (``string.punctuation``) and
    then collapses runs of whitespace into single spaces, trimming both ends.
    Whitespace is normalised *after* punctuation removal, so input such as
    ``"word ."`` or ``"a - b"`` does not leave trailing or doubled spaces behind,
    and ``split(' ')`` on the result never yields empty tokens.

    :param x: a line of text.
    :return: the cleaned line, or ``None`` when ``x`` is not a ``str``.
    """
    if not isinstance(x, str):
        return None
    without_punct = x.lower().translate(_PUNCT_TABLE)
    return ' '.join(without_punct.split())

def rddtokenizer(x):
    """Split a cleaned line on spaces and pair every word with a count of 1.

    The previous version returned from inside its loop and therefore only ever
    produced the first word. This returns one pair per word, which is the shape
    ``flatMap`` followed by ``reduceByKey`` expects.

    :param x: a pre-processed line (for example the output of ``cleanstring``).
    :return: list of ``(word, 1)`` tuples in line order; empty tokens are skipped.
    """
    return [(word, 1) for word in x.split(' ') if word]

In [None]:
# Clean each line once (lowercase, strip punctuation), then tokenise it into a list
# of words. The original applied cleanstring twice and never split the text.
# str.split() with no argument splits on whitespace and drops empty tokens.
sv_tokens = sv_lines.map(cleanstring).map(lambda line: line.split())
en_tokens = en_lines.map(cleanstring).map(lambda line: line.split())

### A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing

In [None]:
# Pull a small sample of each pre-processed RDD to the driver for a visual check.
en_sample = en_tokens.take(10)
sv_sample = sv_tokens.take(10)
print("English transcript inspection: \n {}".format(en_sample))
print("Swedish transcript inspection: \n {}".format(sv_sample))

### A.2.3 Verify that the line counts still match after the pre-processing.

In [None]:
# Pre-processing is a 1:1 map, so both RDDs must still have the same number of lines.
# Each count() is a separate Spark job, so count each RDD exactly once. The original
# ran an extra en_tokens.count() whose result was discarded.
en_token_count = en_tokens.count()
sv_token_count = sv_tokens.count()
assert en_token_count == sv_token_count, "Not the same length!"

## Question A.3

### A.3.1 Use Spark to compute the 10 most frequently occurring words in the English language corpus. Repeat for the other language.

In [None]:
from operator import add
def _word_pairs(tokens):
    """Yield ``(word, 1)`` for every non-empty word of one pre-processed line.

    Accepts either a cleaned line (``str``, split on spaces) or an already
    tokenised list of words, so the count works however A.2.1 represents a line.
    ``None`` (what cleanstring returns for non-strings) yields nothing.
    """
    if isinstance(tokens, str):
        words = tokens.split(' ')
    else:
        words = tokens or []
    return ((word, 1) for word in words if word)


# reduceByKey needs (key, value) records, so first flatten each line into
# (word, 1) pairs; the original applied reduceByKey to whole lines and failed.
en_mostcommon = en_tokens.flatMap(_word_pairs).reduceByKey(add).takeOrdered(10, key=lambda x: -x[1])
sv_mostcommon = sv_tokens.flatMap(_word_pairs).reduceByKey(add).takeOrdered(10, key=lambda x: -x[1])

In [None]:
# Keep only the words (drop their counts) and show them comma-separated.
en_top_words = ",".join(word for word, _count in en_mostcommon)
sv_top_words = ",".join(word for word, _count in sv_mostcommon)
print("The 10 most common words on the English corpus are: \n" + en_top_words)
print("The 10 most common words on the Swedish corpus are: \n" + sv_top_words)

### A.3.2 Verify that your results are reasonable

## Question A.4

### A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated. 

#### 1. Key the lines by their line number (hint: zipWithIndex())

In [None]:
# zipWithIndex pairs each line with its position in the RDD: (line, index).
# Both files have the same line count (asserted in A.1.3), so equal indices
# identify aligned sentences.
en_1 = en_lines.zipWithIndex()
sv_1 = sv_lines.zipWithIndex()

#### 2. Swap the key and value - so that the line number is the key

In [None]:
# Swap (line, index) -> (index, line) so the line number becomes the join key.
# The original passed the RDD itself as map()'s second positional argument,
# which is the boolean `preservesPartitioning` flag; it is dropped here because
# the keys change and partitioning is therefore not preserved.
en_2 = en_1.map(lambda pair: (pair[1], pair[0]))
sv_2 = sv_1.map(lambda pair: (pair[1], pair[0]))

#### 3. Join the two RDDs together according to the line number key, so you have pairs of matching lines

In [None]:
# Inner join on the line-number key; each record becomes (index, (english_line, swedish_line)).
ensv_3 = en_2.join(sv_2)

#### 4. Filter to exclude line pairs that have an empty/missing “corresponding” sentence.
#### 5. Filter to leave only pairs of sentences with a small number of words per sentence, this should give a more reliable translation (you can experiment)
#### 6. Filter to leave only pairs of sentences with the same number of words in each sentence.

In [None]:
# Sentences must have fewer than this many words to be used for word-pair mining.
# Short sentences give more reliable word-by-word alignments; experiment with it.
MAX_WORDS = 5


def _word_count(sentence):
    """Number of space-separated words in an already cleaned sentence."""
    return len(sentence.split(' '))


# Steps 4-6: clean both sentences, then keep only pairs where
#   4. both sides are non-empty (the original used `or`, which kept pairs with one
#      empty side; "" splits into one "word" and so slipped through step 6),
#   5. both sides are short,
#   6. both sides have the same number of words.
ensv_456 = (
    ensv_3
    .map(lambda x: (x[0], tuple(cleanstring(sentence) for sentence in x[1])))
    .filter(lambda x: bool(x[1][0]) and bool(x[1][1]))
    .filter(lambda x: _word_count(x[1][0]) < MAX_WORDS and _word_count(x[1][1]) < MAX_WORDS)
    .filter(lambda x: _word_count(x[1][0]) == _word_count(x[1][1]))
)

#### 7. For each sentence pair, map so that you pair each (in order) word in the two sentences. We no longer need the line numbers. (hint: use python’s built in zip() function)

In [None]:
# Pair the i-th English word with the i-th Swedish word of every sentence pair and
# drop the line-number key (we assume word order is preserved in translation).
# The original ended with a line-continuation backslash at the end of the cell,
# which is a SyntaxError.
ensv_7 = ensv_456.flatMap(lambda x: list(zip(x[1][0].split(' '), x[1][1].split(' '))))

#### 8. Use reduce to count the number of occurrences of the word-translation-pairs.

In [None]:
def translationtokens(x):
    """Turn an ``(english_word, swedish_word)`` pair into a ``(label, 1)`` record.

    The label is a readable ``"en: <word>, sv: <word>"`` string so that
    ``reduceByKey`` can count identical translation pairs.
    """
    en_word, sv_word = x
    label = "en: " + en_word + ", " + "sv: " + sv_word
    return label, 1

In [None]:
# Count every distinct translation pair, then fetch the 10 most frequent to the driver.
ensv_pair_counts = ensv_7.map(translationtokens).reduceByKey(add)
ensv_8 = ensv_pair_counts.takeOrdered(10, key=lambda label_count: -label_count[1])

#### 9. Print some of the most frequently occurring pairs of words.

In [None]:
# One line per pair, e.g. ('en: the, sv: den', 'frequency: 42').
pair_lines = (str((label, 'frequency: {}'.format(freq))) for label, freq in ensv_8)
print('\n'.join(pair_lines))

# Part B - Working with DataFrames and SQL

### B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.

In [2]:
# Load the parking-citation CSV from HDFS, using its first row as column names.
# The frame is cached because the following cells reuse it several times.
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.1.153:9000/parking-citations.csv')
    .cache()
)

In [None]:
# Display the first 20 rows (show()'s default) to check that the CSV parsed correctly.
data_frame.show(n=20)

### B.2 Print the schema for the DataFrame.

In [3]:
# Print the column names and types. Only the header option is set (no schema
# inference), so every column is read as a string.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### B.3 Count the number of rows in the CSV file.

In [None]:
# Number of data rows; the header row is consumed as column names and is not counted.
data_frame.count()

### B.4 Count the number of partitions in the underlying RDD.

In [None]:
# Number of partitions of the RDD that backs the DataFrame.
data_frame.rdd.getNumPartitions()

### B.5 Drop the columns VIN, Latitude and Longitude.

In [4]:
# Columns the exercise asks us to remove.
dropped_columns = ['VIN', 'Latitude', 'Longitude']
data_frame_5 = data_frame.drop(*dropped_columns)

### B.6 Find the maximum fine amount. How many fines have this amount?

In [None]:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as func

def _to_float(value):
    """Parse a string cell as a float.

    Empty or missing cells, and cells that are not valid numbers (for example
    whitespace-only strings), become ``None`` (SQL NULL). Previously ``float()``
    raised ValueError on such a cell, which failed the whole Spark job.
    """
    if not value:
        return None
    try:
        return float(value)
    except ValueError:
        return None


# UDF used to turn the string-typed "Fine amount" column into floats.
udf_tofloat = func.udf(_to_float, FloatType())

In [None]:
# Convert the fine column to numbers, find its maximum, and count the rows at that maximum.
fine_col = "Fine amount"
data_frame_6 = data_frame_5.withColumn(fine_col, udf_tofloat(fine_col))
maxFine = data_frame_6.agg(func.max(fine_col)).first()[0]
maxFineCount = data_frame_6.where(func.col(fine_col) == maxFine).count()
print("The maximum fine amount is: {}\n".format(maxFine))
print("There are {} fines that have this amount".format(maxFineCount))

### B.7 Show the top 20 most frequent vehicle makes, and their frequencies

In [5]:
# Citations per vehicle make, most frequent first.
vehicle_makes = (
    data_frame_5
    .groupBy("Make")
    .count()
    .orderBy('count', ascending=False)
)

In [6]:
# B.7 asks for the top 20 makes; 20 is also show()'s default row count.
vehicle_makes.show(n=20)

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



### B.8 Create a User Defined Function to create a new column, ‘color long’, mapping the original colors to their corresponding values in the dictionary below. If there is no key matching the original color, use the original color.

In [9]:
# Lookup table from the dataset's abbreviated colour codes to long colour names,
# as given in the exercise. Several codes map to the same name
# (e.g. 'BN'/'BR' -> 'Brown', 'SI'/'SL' -> 'Silver').
COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

from pyspark.sql.types import StringType
import pyspark.sql.functions as func


def _expand_color(code):
    """Return the long colour name for ``code``, or ``code`` unchanged when it is not in COLORS."""
    return COLORS.get(code, code)


udf_expandcolor = func.udf(_expand_color, StringType())

In [10]:
# Add the expanded colour name next to the original abbreviated "Color" column.
data_frame_8 = data_frame_5.withColumn('Color long', udf_expandcolor('Color'))

### B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

In [20]:
import pyspark.sql.functions as func

In [29]:
# Count the expanded colours of Toyota citations and keep only the most frequent one.
# take(1) returns a list with at most one Row, so guard against an empty result
# instead of raising IndexError. If two colours tie, which one is returned is arbitrary.
toyotas_colors_df = (
    data_frame_8
    .filter(data_frame_8["Make"] == 'TOYT')
    .groupby('Color long')
    .count()
    .orderBy(func.desc('count'))
    .take(1)
)
if toyotas_colors_df:
    print("The most frequent color value for Toyotas is: {}".format(toyotas_colors_df[0][0]))
else:
    print("No citations for Toyotas (TOYT) were found.")

The most frequent color value for Toyotas is: Gray
