# Laboratory 03

Using Spark SQL to analyze historical data about the usage of bike sharing in Barcelona.

## Using RDDs

First, the exercises are solved using RDDs.


In [1]:
# Clear the destination folder to prevent overwriting
!hdfs dfs -rm -r /user/s315054/lab03/output_RDD.csv

23/01/19 11:14:36 INFO fs.TrashPolicyDefault: Moved: 'hdfs://BigDataHA/user/s315054/lab03/output_RDD.csv' to trash at: hdfs://BigDataHA/user/s315054/.Trash/Current/user/s315054/lab03/output_RDD.csv


### 1. Input files

* `register.csv` contains the number of used and free bike parking slots in approx. 3000 stations. It includes a header line, and erroneous records (used slots = 0 and free slots = 0) must be filtered out; format: `station\ttimestamp\tused_slots\tfree_slots`
* `stations.csv` contains the description of the stations; format: `id\tlongitude\tlatitude\tname`

We first need to remove the header (the first line of each CSV file) and split each line into its elements.
Then, in the first file, we need to remove the "wrong" lines, i.e., those in which both the used slots and the free slots of the station are 0 (errors).
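The two cleaning steps can be tried on a handful of lines without a cluster. The following is only a sketch in plain Python (lists instead of RDDs), and the sample records are invented for illustration:

```python
# Toy stand-in for register.csv (tab-separated, header on the first line).
# The records below are made up for illustration.
lines = [
    "station\ttimestamp\tused_slots\tfree_slots",
    "1\t2008-05-15 12:01:00\t0\t18",
    "1\t2008-05-15 12:02:00\t0\t0",   # error: both counters are 0
    "2\t2008-05-15 12:01:00\t7\t0",   # valid: the station is simply full
]

header = lines[0]
# Drop the header, then split each remaining line on tabs.
rows = [line.split("\t") for line in lines if line != header]
# Keep a row unless used_slots and free_slots are both 0.
valid_rows = [r for r in rows if int(r[2]) != 0 or int(r[3]) != 0]

print(len(rows), len(valid_rows))
```

A row with only one of the two counters at 0 is kept, since a completely full or completely empty station is a legitimate reading.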

In [2]:
from datetime import datetime
import time

path_register = "/data/students/bigdata_internet/lab3/register.csv"
path_stations = "/data/students/bigdata_internet/lab3/stations.csv"
# out_path = '/user/s315054/lab03/output_RDD.csv'

In [3]:
# File 1
RDD_reg_file = sc.textFile(path_register)

In [None]:
# Remove 1st line (header)
header_reg = RDD_reg_file.first()
RDD_reg_noHead = RDD_reg_file.filter(lambda l: l != header_reg)
# Output was removed because it contained logs

In [5]:
# Separate elements at (`\t`)
RDD_reg = RDD_reg_noHead.map(lambda line: line.split('\t'))

#### 1.1 - Count the rows before and after removing the wrong elements
* Before filtering: 25319028 rows
* After filtering: 25104121 rows

In [6]:
# Remove wrong lines, i.e., the ones in which both used and free slots are 0
RDD_reg_correct = RDD_reg\
    .filter(lambda line: (int(line[2]) != 0) or (int(line[3]) != 0))

lines_before = RDD_reg.count()
print(f"The number of lines before filtering is: {lines_before}")
lines_after = RDD_reg_correct.count()
print(f"The number of lines after filtering is: {lines_after}")

                                                                                

The number of lines before filtering is: 25319028
The number of lines after filtering is: 25104121


                                                                                

#### 1.2 - Stations

In [7]:
# Opening file 2
RDD_stat_file = sc.textFile(path_stations)

# Remove 1st line (header)
header_stat = RDD_stat_file.first()
RDD_stat_noHead = RDD_stat_file.filter(lambda l: l != header_stat)

In [8]:
# Separate elements
RDD_stat = RDD_stat_noHead.map(lambda line: line.split('\t'))
n_stations = RDD_stat.count()
print(f"The number of lines in the stations file is {n_stations}")

The number of lines in the stations file is 3301


### 2. Exercise

From the RDD of the 1st file, create a pair RDD containing as key the tuple `((day_of_the_week, hour), station_ID)`, i.e., the timeslot and the station, and as value the number of free slots. Notice that entries with the same day of the week and hour share the same key, regardless of the date and minute. This is crucial for the next steps.
Then, the function `critCount()` maps critical elements (free slots = 0) to `(key, [+1, +1])` and all the others to `(key, [0, +1])`. Applying a `reduceByKey()` transformation sums the corresponding elements of the lists and yields, for each key: `[number_of_critical_measurements, number_of_measurements]`. These are the two values to divide in order to obtain the criticality of the corresponding station and timeslot pair.

The next operation consists in dividing the first element of each list by the second one to get the criticality as values in a new pair RDD.
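As a rough illustration of this map/reduce pattern, the sketch below reproduces it with a plain dictionary instead of `reduceByKey()`. The helper name `to_key` and the sample records are assumptions made for the example, and the English day names rely on the default locale:

```python
from datetime import datetime

# Toy records: (station, timestamp, free_slots); values are made up.
records = [
    ("1", "2008-05-15 12:01:00", 0),   # Thursday 12, critical
    ("1", "2008-05-22 12:30:00", 4),   # Thursday 12, one week later
    ("1", "2008-05-16 09:00:00", 0),   # Friday 09, critical
]

def to_key(station, timestamp):
    """Build the ((day_of_week, hour), station) key (hypothetical helper)."""
    ts = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    return (tuple(ts.strftime("%A %H").split(" ")), station)

# Dictionary equivalent of map + reduceByKey: sum [critical, total] per key.
counts = {}
for station, timestamp, free_slots in records:
    key = to_key(station, timestamp)
    critical, total = counts.get(key, (0, 0))
    counts[key] = (critical + (1 if free_slots == 0 else 0), total + 1)

# Criticality = critical measurements / all measurements, per key.
criticality = {key: c / t for key, (c, t) in counts.items()}
print(criticality)
```

The two Thursday readings fall on different dates but end up under the same key, which is exactly the grouping the exercise relies on.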


#### 2.1 - Evaluate criticality

In [9]:
# Isolate pairs (timestamp, id)
RDD_pair_all = RDD_reg_correct\
.map(lambda l: ((tuple(
    datetime.strptime(l[1], "%Y-%m-%d %H:%M:%S")\
    .strftime("%A %H")\
    .split(' ')), l[0]), int(l[3])))

In [10]:
def critCount(pair):
    if pair[1] == 0:
        return (pair[0], [+1, +1])
    else:
        return (pair[0], [0, +1])

In [11]:
# Count critical for each key ((day, hour), ID) and total elements
# Assign +1 to elements whose value is 0 and 0 to the ones whose 
# value is not; after this, sum the values by using reduceByKey
RDD_n_crit = RDD_pair_all.map(lambda pair: critCount(pair))\
.reduceByKey(lambda v1, v2: [v1[0]+v2[0], v1[1]+v2[1]])

In [12]:
print(f"The number of distinct (station, timeslot) pairs is: {RDD_n_crit.count()}")



The number of distinct (station, timeslot) pairs is: 47550


                                                                                

In [13]:
# Evaluate criticality:
RDD_criticality = RDD_n_crit\
.map(lambda pair: (pair[0], float(pair[1][0])/float(pair[1][1])))

#### 2.2 - Find elements above a certain threshold

Let the user decide the threshold value.

In [14]:
# Define the threshold
val_is_good = False
while (not val_is_good):
    threshold = float(input("Enter the threshold: "))
    if (threshold >= 0.0) and (threshold <= 1.0):  # Check for validity
        val_is_good = True        
    else:
        print("Invalid threshold: the value needs to lie between 0 and 1")

Enter the threshold:  0.6


In [15]:
# Isolate elements whose criticality is at or above the threshold
RDD_thresh = RDD_criticality.filter(lambda pair: pair[1]>=threshold)

#### 2.3 - Order by increasing criticality

If the criticality is the same, sort by station ID; if the ID is also the same, sort by the day of the week (Monday < Tuesday < ... < Sunday) and then by the hour of the day.
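The tie-breaking rule amounts to sorting on a tuple. Here is a small sketch with Python's built-in `sorted()` in place of `sortBy()`; the entries and the helper name `sort_key` are invented for the example:

```python
import time

# Toy entries: (((day, hour), station), criticality); values are made up.
entries = [
    ((("Sunday", "08"), "10"), 0.7),
    ((("Monday", "23"), "10"), 0.7),
    ((("Monday", "09"), "10"), 0.7),
    ((("Friday", "10"), "9"), 0.7),
    ((("Tuesday", "01"), "3"), 0.9),
]

def sort_key(pair):
    """Return (criticality, station ID, weekday index, hour) for one entry."""
    (day, hour), station = pair[0]
    weekday = time.strptime(day, "%A").tm_wday  # Monday -> 0, Sunday -> 6
    return (pair[1], int(station), weekday, int(hour))

ordered = sorted(entries, key=sort_key)
for entry in ordered:
    print(entry)
```

Converting the station ID and the hour to `int` matters: compared as strings, `"10"` would come before `"9"`.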

In [16]:
RDD_sorted = RDD_thresh\
.sortBy(lambda pair: \
        (pair[1], int(pair[0][1]), int(time.strptime(pair[0][0][0], "%A")\
                                       .tm_wday), int(pair[0][0][1])))

#### 2.4 - Storing on output file

Format: .csv with elements separated by tabs (\t).

First, create a pair RDD from the RDD of stations, using the station ID as key.
Then, rearrange the sorted RDD so that the keys are the station IDs and the values are lists: `[(day_of_the_week, hour), criticality]`.
With the IDs as keys, the two RDDs can be joined (`join()` transformation), so that each entry of the criticality RDD is associated with the longitude and latitude of the corresponding station. Note that `join()` shuffles the data, so the order produced by `sortBy()` is not guaranteed to be preserved in the joined RDD.
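To make the shape of the joined records concrete, the sketch below performs the same inner join on station IDs with a plain dictionary and then builds the tab-separated lines. The coordinates and criticality values are invented, and the variable names are only illustrative:

```python
# Toy inputs keyed by station ID; coordinates and values are made up.
criticality = [
    ("9", [("Friday", "10"), 0.7]),
    ("10", [("Monday", "09"), 0.8]),
]
stations = {
    "9": ["2.17", "41.39"],    # [longitude, latitude]
    "10": ["2.18", "41.40"],
    "11": ["2.19", "41.41"],   # no criticality entry, so it drops out
}

# Inner join on the station ID: only IDs present on both sides survive.
joined = [
    (sid, (value, stations[sid]))
    for sid, value in criticality
    if sid in stations
]

# Field order: station, longitude, latitude, day, hour, criticality.
tsv_lines = [
    f"{sid}\t{coords[0]}\t{coords[1]}\t{value[0][0]}\t{value[0][1]}\t{value[1]}"
    for sid, (value, coords) in joined
]
print("\n".join(tsv_lines))
```

Each joined element has the form `(ID, ([(day, hour), criticality], [longitude, latitude]))`, which is why the output line is assembled from nested indices.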


In [17]:
# Working on the RDD of stations
# RDD_stat_ID will contain: (ID, [longitude, latitude])
RDD_stat_ID = RDD_stat.map(lambda row: (row[0], row[1:3]))

In [18]:
# Rearrange the sorted RDD:
# RDD_sorted_ID will contain: (ID, [(Day, Hour), criticality])
RDD_sorted_ID = RDD_sorted.map(lambda pair: (pair[0][1], [pair[0][0], pair[1]]))

In [19]:
# Join the two RDDs
RDD_comb = RDD_sorted_ID.join(RDD_stat_ID)

In [20]:
# Put together RDD to be stored as CSV (actually, TSV):
RDD_out = RDD_comb.map(lambda elem: f"{elem[0]}\t{elem[1][1][0]}\t{elem[1][1][1]}\t{elem[1][0][0][0]}\t{elem[1][0][0][1]}\t{elem[1][0][1]}")
# Need to add the header:
RDD_header = sc.parallelize(["station\tlongitude\tlatitude\tday\thour\tcriticality"])
# Include the header as 1st element of the RDD
RDD_out_header = RDD_header.union(RDD_out)

In [21]:
## The output path needs to be decided by the user:
out_path = input("Enter output file path: ")
## '/user/s315054/lab03/output_RDD.csv'

Enter output file path:  /user/s315054/lab03/output_RDD.csv


In [22]:
RDD_out_header.saveAsTextFile(out_path)

                                                                                

#### 2.5 - Results for threshold = 0.6

The output file for $threshold = 0.6$ contains 6 lines (the header plus 5 records), which are displayed 4 cells above.

---

## Using SparkSQL

The same exercise has also been solved using Spark SQL.


### 1. Import files

In [23]:
!hdfs dfs -rm -r /user/s315054/lab03/output_DF.csv

23/01/19 11:17:12 INFO fs.TrashPolicyDefault: Moved: 'hdfs://BigDataHA/user/s315054/lab03/output_DF.csv' to trash at: hdfs://BigDataHA/user/s315054/.Trash/Current/user/s315054/lab03/output_DF.csv


In [24]:
from datetime import datetime
import time

path_register = "/data/students/bigdata_internet/lab3/register.csv"
path_stations = "/data/students/bigdata_internet/lab3/stations.csv"

In [25]:
reg_DF = spark.read.option("header","true")\
                .option("sep", "\t")\
                .option("multiLine", "true")\
                .option("ignoreTrailingWhiteSpace", "true")\
                .csv(path_register)

reg_DF.cache()
reg_DF.createOrReplaceTempView("register")

                                                                                

In [26]:
# Remove wrong lines
correct_reg_DF = spark.sql("SELECT * \
                            FROM register \
                            WHERE used_slots != 0 OR free_slots != 0 \
                            ")

#### 1.1 - Count the rows before and after filtering

In [27]:
# Count the rows before and after the filtering operation
n_before = reg_DF.count()
n_after = correct_reg_DF.count()

print(f"Number of rows before: {n_before}\nNumber of rows after: {n_after}")

Number of rows before: 25319028
Number of rows after: 25104121


                                                                                

Read second file:

In [28]:
stations_DF = spark.read.option("header","true")\
                    .option("sep", "\t")\
                    .option("multiLine", "true")\
                    .option("ignoreTrailingWhiteSpace", "true")\
                    .csv(path_stations)

### 2. Criticality evaluation

The idea was to first isolate the useful columns (station ID, timeslot and number of free slots), and then work on `free_slots` to understand if the table entry was critical and evaluate the terms needed for the computation of criticality.

* The number of critical records for a given station and timeslot was obtained by assigning each record 1 if it was critical and 0 otherwise (`critical` column); then we could group records according to station ID and timeslot and perform an aggregate sum on `critical`.
* The number of records for each station and timeslot pair was simply obtained by performing a `count()` operation on the grouped rows.
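The same sum-over-count aggregation can be written as a single SQL query. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, purely so that it runs without a cluster; the table name `readings` and its rows are assumptions made for the example:

```python
import sqlite3

# In-memory SQLite database used as a stand-in for a Spark temp view.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE readings (station INTEGER, timeslot TEXT, free_slots INTEGER)"
)
# Toy rows; values are made up.
conn.executemany(
    "INSERT INTO readings VALUES (?, ?, ?)",
    [
        (1, "Thursday 12", 0),
        (1, "Thursday 12", 4),
        (1, "Friday 09", 0),
        (2, "Friday 09", 3),
    ],
)

# Flag each record as critical (1.0) or not (0.0), then divide the sum of
# the flags by the number of records in each (station, timeslot) group.
rows = conn.execute(
    """
    SELECT station, timeslot,
           SUM(CASE WHEN free_slots = 0 THEN 1.0 ELSE 0.0 END) / COUNT(*)
               AS criticality
    FROM readings
    GROUP BY station, timeslot
    ORDER BY station, timeslot
    """
).fetchall()
conn.close()
print(rows)
```

Using `1.0`/`0.0` for the flag keeps the division in floating point, which plays the same role as the cast to `double` applied to the flag column.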

In [29]:
# Define a function to convert the date into a timeslot
spark.udf.register("timeStamp2timeSlot",\
                   lambda datestring: \
                   datetime.strptime(datestring, "%Y-%m-%d %H:%M:%S")\
                   .strftime("%A %H"))

<function __main__.<lambda>(datestring)>

In [30]:
# Create DF containing station ID, timeslot, 
# 1 if critical/0 if not and free slots
test_DF = correct_reg_DF.selectExpr('station', \
                    'timeStamp2timeSlot(timestamp) AS timeslot', \
                    'CAST(free_slots == 0 AS double) AS critical', \
                    'free_slots')
# Speed up calculations:
test_DF.cache()

DataFrame[station: string, timeslot: string, critical: double, free_slots: string]

In [31]:
# For each timeslot and station, get the total critical 
# records and total n. of records
critical_fact_DF = test_DF.groupBy('station', 'timeslot')\
        .agg({'critical':'sum', 'free_slots':'count'})\
        .withColumnRenamed('sum(critical)', 'crit_sum')\
        .withColumnRenamed('count(free_slots)', 'free_count')

#### 2.1 - Evaluate criticality

In [32]:
# To get criticality, simply divide the count of critical
# records per timeslot by the total n. of records
critical_DF = critical_fact_DF.selectExpr('CAST(station AS int)',\
                'timeslot', 'crit_sum/free_count AS criticality')

#### 2.2 Select criticality values > threshold

In [33]:
# Define the threshold
val_is_good = False
while (not val_is_good):
    threshold = float(input("Enter the threshold: "))
    if (threshold >= 0.0) and (threshold <= 1.0):       # Check for validity
        val_is_good = True    
    else:
        print("Invalid threshold: the value needs to lie between 0 and 1")

Enter the threshold:  0.6


In [34]:
# Get records for which the criticality is above threshold
thresh_DF = critical_DF.filter(f'criticality > {threshold}')

In [35]:
thresh_DF.createOrReplaceTempView('ordered')

#### 2.3 Order results in increasing order

To carry out this task, some UDFs were needed to handle the timeslot.

* `dayValue()` returns an integer associated with the day of the week of the input timeslot (Monday: 0, Sunday: 6).
* `hourValue()` returns the hour of the input timeslot (as int).
* `dayOfWeek()` returns the string containing the day of the week of the input timeslot.
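The behaviour of the three helpers can be checked in plain Python before registering them as UDFs. The sketch below uses hypothetical snake_case names and also shows why the hour should be compared as an integer: a Python function registered with `spark.udf.register` without an explicit return type is treated as returning a string, and strings sort lexicographically.

```python
DAY_INDEX = {
    "Monday": 0, "Tuesday": 1, "Wednesday": 2, "Thursday": 3,
    "Friday": 4, "Saturday": 5, "Sunday": 6,
}

def day_value(ts):
    """Integer index of the day in a 'Day HH' timeslot (Monday: 0)."""
    return DAY_INDEX[ts.split()[0]]

def hour_value(ts):
    """Hour of a 'Day HH' timeslot, as an int."""
    return int(ts.split()[1])

def day_of_week(ts):
    """Day name of a 'Day HH' timeslot."""
    return ts.split()[0]

slots = ["Monday 10", "Monday 08", "Sunday 00"]

# Numeric comparison of the hour: 8 < 10.
by_int = sorted(slots, key=lambda ts: (day_value(ts), hour_value(ts)))
# Text comparison of the hour: "10" < "8", which reverses the two Mondays.
by_str = sorted(slots, key=lambda ts: (day_value(ts), str(hour_value(ts))))

print(by_int)
print(by_str)
```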

In [36]:
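from pyspark.sql.types import IntegerType

def dayValue(ts):
    # Map the day name of a "Day HH" timeslot to an integer (Monday: 0)
    mapping = { "Monday":0, "Tuesday":1, 
               "Wednesday":2, "Thursday":3, 
               "Friday":4, "Saturday":5, "Sunday":6}
    sepLine = ts.split(' ')
    return mapping[sepLine[0]]

# Without an explicit return type, registered Python UDFs return strings,
# so the hours would be compared as text ("10" before "8")
spark.udf.register("dayValue", dayValue, IntegerType())
spark.udf.register("hourValue", lambda ts: int(ts.split()[1]), IntegerType())
spark.udf.register("dayOfWeek", lambda ts: ts.split()[0])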

23/01/19 11:20:16 WARN analysis.SimpleFunctionRegistry: The function dayofweek replaced a previously registered function.


<function __main__.<lambda>(ts)>

In [37]:
# Ordering
ordered_DF = spark.sql("""SELECT * 
    FROM ordered 
    ORDER BY criticality, station, 
            dayValue(timeslot), hourValue(timeslot)""")

#### 2.4 Store pairs in a csv file inside the HDFS

Again, a join is needed between the DF containing the criticality values and the one containing the station coordinates.

In [38]:
joined_DF = ordered_DF.join(stations_DF\
    .selectExpr('id AS station', 'longitude', 'latitude'), 'station')\
    .selectExpr('station', 'longitude', 'latitude',\
                'dayOfWeek(timeslot) AS day', \
                'hourValue(timeslot) AS hour', 'criticality')

In [39]:
## The output path needs to be decided by the user:
out_path = input("Enter output file path: ")
#/user/s315054/lab03/output_DF.csv

Enter output file path:  /user/s315054/lab03/output_DF.csv


In [40]:
joined_DF.write.csv(out_path, header=True)

                                                                                

#### 2.5 - Results for threshold = 0.6

For $threshold = 0.6$, 5 records are obtained. They match the ones obtained in the first part, using RDDs.

Above, the final result (content of the file) is shown.

---

## Part 3 - Bonus Task



In [41]:
# Evaluating the distance from the city center
center_lat = 41.386904
center_lon = 2.169989

import numpy as np
from pyspark.sql.types import DoubleType

def centerDist(coords1):
    # Haversine distance (km) between (latitude, longitude) and the center
    lat1 = float(coords1[0]) * (np.pi/180)
    lon1 = float(coords1[1]) * (np.pi/180)
    lat2 = center_lat * (np.pi/180)
    lon2 = center_lon * (np.pi/180)
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = np.sin(dlat/2.0)**2+np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2.0)**2
    c = 2 * np.arcsin(np.sqrt(a))
    km = 6367 * c  # Earth radius in km
    return float(km)

# Declare the return type so that center_dist is numeric, not a string
spark.udf.register("centerDist", centerDist, DoubleType())

<function __main__.centerDist(coords1)>
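The distance function above follows the haversine formula, and it can be sanity-checked outside Spark. The sketch below is a plain-Python version using only the `math` module; the function name `center_dist_km` and the test points are assumptions made for the check, while the center coordinates and the 6367 km radius are the ones used in the notebook:

```python
import math

CENTER_LAT, CENTER_LON = 41.386904, 2.169989
EARTH_RADIUS_KM = 6367  # same radius as in the notebook

def center_dist_km(lat, lon):
    """Haversine distance in km between (lat, lon) and the city center."""
    lat1, lon1 = math.radians(lat), math.radians(lon)
    lat2, lon2 = math.radians(CENTER_LAT), math.radians(CENTER_LON)
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# The center itself is at distance 0.
zero = center_dist_km(CENTER_LAT, CENTER_LON)
# 0.01 degrees of latitude is roughly 1.11 km on a sphere of this radius.
north = center_dist_km(CENTER_LAT + 0.01, CENTER_LON)
print(zero, round(north, 3))
```

For a displacement along a meridian the formula reduces to radius times the latitude difference in radians, which gives an easy value to check against.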

In [42]:
# Get center distance for each station
dist_DF = stations_DF\
    .selectExpr("id", "centerDist((latitude, longitude)) AS center_dist")

In [43]:
correct_reg_DF.createOrReplaceTempView('readings_correct') # from exercise 2
# Evaluate average utilization
avg_used_DF = spark.sql("""
    SELECT station, sum(used_slots)/count(used_slots) AS avg_used 
    FROM readings_correct 
    GROUP BY station 
    ORDER BY CAST(station AS int)""")

In [44]:
# Join with the DF of distances from center
avg_and_dist_DF = avg_used_DF.join(dist_DF\
                        .withColumnRenamed("id", "station"), "station")

In [45]:
avg_and_dist_DF.createOrReplaceTempView("avg_and_dist")

In [46]:
U1_DF = avg_and_dist_DF.filter("center_dist < 1.5")\
        .selectExpr("sum(avg_used)/count(avg_used) AS U1")
U1 = U1_DF.collect()[0][0]

                                                                                

In [47]:
U2_DF = avg_and_dist_DF.filter("center_dist >= 1.5")\
        .selectExpr("sum(avg_used)/count(avg_used) AS U2")
U2 = U2_DF.collect()[0][0]

                                                                                

In [48]:
print(f"Average number of used slots in stations at less than 1.5 km from the center: {U1}\nAverage number of used slots in stations at more than 1.5 km from center: {U2}")

Average number of used slots in stations at less than 1.5 km from the center: 8.1756828331052
Average number of used slots in stations at more than 1.5 km from center: 7.8692836432461055


### 3.1 - Where are the most used stations located?

The average number of used slots is slightly higher in stations located closer to the city center (about 8.18 against 7.87 slots).
Indeed:
* $U1 = avg(U(S_i))\ where\ S_i :\ d(S_i, center) < 1.5\ km$, which gives $U1 = 8.176\ slots$
* $U2 = avg(U(S_i))\ where\ S_i :\ d(S_i, center) \geq 1.5\ km$, which gives $U2 = 7.869\ slots$