# Introduction to Spark
Master M2 – Université Grenoble Alpes & Grenoble INP - Alvaro Gonzalez Jimenez

In [None]:
import sys
from pyspark import SparkContext
import time

def findCol(firstLine, name):
    """Return the position of ``name`` in ``firstLine``, or -1 when absent.

    Args:
        firstLine: sequence of column names (the parsed header line).
        name: column name to look up.

    Returns:
        The index of the first occurrence of ``name`` in ``firstLine``,
        or -1 if ``firstLine`` does not contain it.
    """
    # Guard clause: a missing column is reported with the -1 sentinel.
    if name not in firstLine:
        return -1
    return firstLine.index(name)


#### Driver program

# Start Spark locally with a single worker thread and only log errors so the
# notebook output stays readable.
sc = SparkContext("local[1]")
sc.setLogLevel("ERROR")


# Read the input file into an RDD[String] (one element per line).
wholeFile = sc.textFile("./data/CLIWOC15.csv")

# The header line names each column of the CSV file; it is recognised by the
# "RecID" column name it contains. first() stops at the first matching line
# instead of collecting every match to the driver just to keep one of them.
header_line = wholeFile.filter(lambda line: "RecID" in line).first()

# Column names, stored as a plain list in the driver program
# (double quotes are stripped from the header).
firstLine = header_line.replace('"', "").split(",")

# Drop the header line from the initial RDD and split each remaining line into
# a list of fields.
# NOTE: a plain split(",") does not honour quoted fields that contain commas.
entries = wholeFile.filter(lambda line: "RecID" not in line).map(
    lambda line: line.split(",")
)

# Keep the RDD in memory: every query below reuses it.
entries.cache()

##### Create an RDD that contains all nationalities observed in the
##### different entries

# The nationality is stored in the column named "Nationality"; look up its
# position in the header.
column_index = findCol(firstLine, "Nationality")
if column_index < 0:
    # findCol signals a missing column with -1, which would otherwise be used
    # as a valid index and silently select the last field of every entry.
    raise ValueError('Column "Nationality" not found in the CSV header')
print("{} corresponds to column {}".format("Nationality", column_index))

# Project each entry onto its nationality, remove the spaces and use
# 'distinct' to drop duplicates.
nationalities = (
    entries.map(lambda x: x[column_index])
    .map(lambda x: x.replace(" ", ""))
    .distinct()
)

# Display the first 5 nationalities in ascending order. On distinct values
# takeOrdered(5) returns the same elements as sortBy(...).take(5) without
# sorting the whole RDD.
print("A few examples of nationalities:")
for elem in nationalities.takeOrdered(5):
    print(elem)

## Count the total number of observations included in the dataset (each line corresponds to one observation)

In [None]:
from timeit import default_timer as timer

# Time a full count of the entries RDD (one entry per observation line).
start = timer()
count = entries.count()
end = timer()
elapsed = end - start

print("Total number of observations is", count)

print("Time: ", elapsed, "seg")

## Count the number of years over which observations have been made (Column "Year" should be used)

In [None]:
# Count the distinct values of the "Year" column, ignoring missing ("NA") ones.
start = timer()
column_index_year = findCol(firstLine, "Year")
year_observations = (
    entries.map(lambda fields: fields[column_index_year])
    .filter(lambda year: year != "NA")
    .distinct()
)
count = year_observations.count()
end = timer()
print("Total number: ", count)
print("Time: ", end - start, "seg")

 ## Display the oldest and the newest year of observation

In [None]:
# Report the extreme values among the distinct years held by year_observations.
# NOTE(review): the years are still strings here, so max()/min() compare them
# lexicographically; that matches numeric order only if every value has the
# same number of digits -- confirm against the data.
start = timer()
newest_year = year_observations.max()
print("The newest year is:", newest_year)
oldest_year = year_observations.min()
print("The oldest year is:", oldest_year)
end = timer()
print("Time: ", end - start, "seg")

## Display the years with the minimum and the maximum number of observations (and the corresponding number of observations)

In [None]:
import operator
from operator import add

start = timer()
# Build one (year, 1) pair per entry (all entries, not just the distinct
# years) and sum the ones per year. Missing years ("NA") are dropped, as in
# the distinct-year count, so that "NA" cannot be reported as a year.
# A dedicated name is used so that the distinct-years RDD held by
# year_observations is not overwritten by this cell.
year_counts = (
    entries.map(lambda x: x[column_index_year])
    .filter(lambda x: x != "NA")
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)
year_observations_tuples = year_counts.collect()
dictionary = dict(year_observations_tuples)

# Each result below is a (year, number of observations) pair.
year_max_observations = max(dictionary.items(), key=operator.itemgetter(1))
print("The year with the maximum number of observations is", year_max_observations)

year_min_observations = min(dictionary.items(), key=operator.itemgetter(1))
print("The year with the minimum number of observations is", year_min_observations)
end = timer()

print("Time: ", end - start, "seg")

## Count the distinct departure places (column "VoyageFrom") using two methods (i.e., using the function distinct() or reduceByKey()) and compare the execution time.

### With the function distinct()

In [None]:
# Method 1: let distinct() remove the duplicate departure places.
start = timer()
column_index_voyageFrom = findCol(firstLine, "VoyageFrom")
voyageFrom_observations = (
    entries.map(lambda fields: fields[column_index_voyageFrom])
    .filter(lambda place: place != "NA")
    .distinct()
)
count = voyageFrom_observations.count()
end = timer()
print("The number of distinct departure places is", count)
print("Time: ", end - start, "seg")

### With the function reduceByKey()

In [None]:
# Method 2: key every departure place and merge the pairs with reduceByKey().
start = timer()
column_index_voyageFrom = findCol(firstLine, "VoyageFrom")
voyageFrom_observations = (
    entries.map(lambda fields: fields[column_index_voyageFrom])
    .filter(lambda place: place != "NA")
    .map(lambda place: (place, 1))
    # Only the number of resulting keys is used below; the summed value
    # produced by 'add' is not read.
    .reduceByKey(add)
)
count = voyageFrom_observations.count()
end = timer()

print("The number of distinct departure places is", count)
print("Time: ", end - start, "seg")

## Display the 10 most popular departure places

In [None]:
start = timer()
column_index_voyageFrom = findCol(firstLine, "VoyageFrom")
if column_index_voyageFrom < 0:
    # findCol signals a missing column with -1, which would otherwise be used
    # as a valid index and silently select the last field of every entry.
    raise ValueError('Column "VoyageFrom" not found in the CSV header')

# Count the observations per departure place, ignoring missing ("NA") values.
voyageFrom_observations = (
    entries.map(lambda x: x[column_index_voyageFrom])
    .filter(lambda x: x != "NA")
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)

# Display the 10 most frequent departure places as (place, count) pairs.
# takeOrdered keeps only the 10 best candidates instead of sorting the whole
# RDD; the negated count makes the largest counts come first.
print("The Top-10 departures are:")
for elem in voyageFrom_observations.takeOrdered(10, key=lambda x: -x[1]):
    print(elem)

end = timer()
print("Time: ", end - start, "seg")

## Display the 10 most frequently taken routes (a route is defined by a pair of "VoyageFrom" and "VoyageTo" values).

In [None]:
start = timer()
column_index_voyageFrom = findCol(firstLine, "VoyageFrom")
column_index_voyageTo = findCol(firstLine, "VoyageTo")
# findCol signals a missing column with -1, which would otherwise be used as a
# valid index and silently select the last field of every entry.
if column_index_voyageFrom < 0:
    raise ValueError('Column "VoyageFrom" not found in the CSV header')
if column_index_voyageTo < 0:
    raise ValueError('Column "VoyageTo" not found in the CSV header')

# Key every entry by its (departure, destination) pair, drop the pairs where
# either side is missing ("NA"), then count the observations per pair.
voyageFrom_voyageTo = (
    entries.map(
        lambda x: ((x[column_index_voyageFrom], x[column_index_voyageTo]), 1)
    )
    .filter(lambda x: "NA" not in x[0])
    .reduceByKey(lambda x, y: x + y)
)

# Display the 10 most frequent pairs as ((from, to), count). takeOrdered keeps
# only the 10 best candidates instead of sorting the whole RDD; the negated
# count makes the largest counts come first.
for elem in voyageFrom_voyageTo.takeOrdered(10, key=lambda x: -x[1]):
    print(elem)

end = timer()
print("Time: ", end - start, "seg")