### Case Study - Aggregation and Summarization with MapReduce and PySpark

In [1]:
from platform import python_version

In [2]:
print(f'Jupyter Notebook Python Version: {python_version()}')

Jupyter Notebook Python Version: 3.8.3


**Installing (or updating) the watermark package.**<br />
This package is used to record versions of other packages used on this jupyter notebook.<br />
After installing or updating the package, restart the jupyter notebook.

In [3]:
# !pip install -q -U watermark

In [4]:
import argparse
import csv
import numpy as np
import pandas as pd
import pyspark
import time

from datetime import datetime
from pathlib import Path
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

**Showing Jupyter Notebook Packages Version**

In [5]:
%reload_ext watermark
%watermark -a 'Case Study' --iversions

csv      1.0
argparse 1.1
py4j     0.10.9
pyspark  3.0.0
pandas   1.0.5
numpy    1.18.5
platform 1.0.8
Case Study


In [6]:
spark = SparkSession(sc)

In [7]:
file = 'aux/datasets/bikes.csv'

In [8]:
dfBikes = pd.read_csv(file)

In [9]:
dfBikes.head()

Unnamed: 0,Genero_Usuario,Idade_Usuario,Bike,Estacao_Aluguel,Data_Aluguel,Hora_Aluguel,Estacao_Chegada,Data_Chegada,Hora_Chegada
0,M,44,4357,442,01/02/2020,0:00:38,116,01/02/2020,0:35:17
1,M,22,12083,66,01/02/2020,0:00:53,37,01/02/2020,0:06:23
2,M,29,11562,331,01/02/2020,0:00:55,341,01/02/2020,0:26:47
3,M,27,10206,164,01/02/2020,0:01:18,35,01/02/2020,0:16:51
4,M,27,10101,120,01/02/2020,0:01:18,47,01/02/2020,0:12:39


In [10]:
dfBikes.shape

(686327, 9)

In [11]:
dfBikes.dtypes

Genero_Usuario     object
Idade_Usuario       int64
Bike                int64
Estacao_Aluguel     int64
Data_Aluguel       object
Hora_Aluguel       object
Estacao_Chegada     int64
Data_Chegada       object
Hora_Chegada       object
dtype: object

### Functions

In [12]:
def formatDate(date, time):
    dateFormatted = datetime.strptime(date + ' ' + time, '%d/%m/%Y %H:%M:%S')
    
    return dateFormatted

In [13]:
def getAgeGroup(age):
    if age < 18:
        return '00-17'
    elif age >= 18 and age < 35:
        return '18-34'
    elif age >= 35 and age < 45:
        return '35-44'
    elif age >= 45 and age < 55:
        return '45-54'
    elif age >= 55 and age < 65:
        return '55-64'
    else:
        return '65+'

In [14]:
def dataCleaning(partitionId, records):
    if partitionId == 0:
        next(records)
    
    reader = csv.reader(records)
    
    for row in reader:
        gender        = row[0]
        age           = int(row[1])
        bikeId        = int(row[2])
        stationStart  = row[3]
        stationEnd    = row[6]
        dateTimeStart = formatDate(row[4], row[5])
        dateTimeEnd   = formatDate(row[7], row[8])
    
        yield (
            bikeId, 
            gender, 
            getAgeGroup(age),
            stationStart,
            stationEnd,
            dateTimeStart,
            dateTimeEnd
        )

In [15]:
def setRdd(sc, file):
    print('Creating the RDD...')
    print(f'Reading file: {file}')
    rddRides = sc.textFile(file, use_unicode = True).mapPartitionsWithIndex(dataCleaning).cache()
    print(f'Partitions number: {rddRides.getNumPartitions()}')
    print('Done.')
    
    return rddRides

In [16]:
def getTopStartStations(num, rddRides):
    result = \
        rddRides \
            .map(lambda line: (line[3], line[6] - line[5])) \
            .filter(lambda line: line[1].total_seconds() <= 60 * 60 * 2) \
            .mapValues(lambda line: 1) \
            .reduceByKey(lambda key, value: key + value) \
            .map(lambda line: (line[1], line[0])) \
            .top(num, key = lambda n: n)
    
    return result

In [17]:
def getTopRoutes(num, rddRides):
    results = \
        rddRides \
            .map(lambda line: ((line[3], line[4]), line[6] - line[5])) \
            .filter(lambda line: line[1].total_seconds() <= 60 * 60 * 2) \
            .mapValues(lambda line: (line.total_seconds(), 1)) \
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
            .map(lambda line: (line[1][1], (line[0], line[1][0] / line[1][1]))) \
            .top(num, key = lambda x: x)
    
    return results

In [18]:
def getGenderStats(rddRides):
    results = \
        rddRides \
            .map(lambda line: (line[1], line[6] - line[5])) \
            .filter(lambda line: line[1].total_seconds() <= 60 * 60 * 2) \
            .mapValues(lambda line: (line.total_seconds(), 1)) \
            .reduceByKey(lambda x, y: ((x[0] + y[0]), x[1] + y[1])) \
            .mapValues(lambda line: (line[0] / line[1], line[1])) \
            .collect()
    
    return results

In [19]:
def getAgeStats(rddRides):
    results = \
        rddRides \
            .map(lambda line: (line[2], line[6] - line[5])) \
            .filter(lambda line: line[1].total_seconds() <= 60 * 60 * 2) \
            .mapValues(lambda line: (line.total_seconds(), 1)) \
            .reduceByKey(lambda x, y: ((x[0] + y[0]), x[1] + y[1])) \
            .mapValues(lambda line: (line[0] / line[1], line[1])) \
            .collect()
    
    return results

In [20]:
def getTopBusyStations(num, rddRides, by_count = False):
    results = \
        rddRides \
            .map(lambda line: (line[0], line[6] - line[5])) \
            .filter(lambda line: line[1].total_seconds() <= 60 * 60 * 2) \
            .mapValues(lambda line: (1, line.total_seconds())) \
            .reduceByKey(lambda x, y: ((x[0] + y[0]), x[1] + y[1])) \
            .map(lambda line: (line[1], line[0])) \
            
    if by_count:
        return results.top(num, key = lambda x: x[0][0])
    else:
        return results.top(num, key = lambda x: x[0][1])

### Creating the RDD

In [21]:
rddRides = setRdd(sc, file)

Creating the RDD...
Reading file: aux/datasets/bikes.csv
Partitions number: 2
Done.


### 1. What are the 5 stations with the highest number of bicycle rentals?

In [22]:
topStartStations = getTopStartStations(5, rddRides)

In [23]:
for station in topStartStations:
    print('Station: {:03d} - Rentals: {:03d}'.format(int(station[1]), station[0]))

Station: 001 - Rentals: 6298
Station: 027 - Rentals: 6201
Station: 271 - Rentals: 5262
Station: 064 - Rentals: 4825
Station: 041 - Rentals: 4621


### 2. What are the 5 most used routes (start and end station) and the average duration of each rental?

In [24]:
topRoutes = getTopRoutes(5, rddRides)

In [25]:
for route in topRoutes:
    print('From Station {:03d} to Station {:03d} - Rentals: {:03d} - Avg Duration(min): {:05.2f}' \
          .format(int(route[1][0][0]), int(route[1][0][1]), route[0], route[1][1] / 60)
    )

From Station 033 to Station 033 - Rentals: 375 - Avg Duration(min): 30.23
From Station 018 to Station 001 - Rentals: 319 - Avg Duration(min): 05.58
From Station 211 to Station 217 - Rentals: 303 - Avg Duration(min): 03.54
From Station 449 to Station 449 - Rentals: 301 - Avg Duration(min): 15.34
From Station 208 to Station 206 - Rentals: 297 - Avg Duration(min): 08.53


### 3. Which sex rents more bicyles? What is the average rental time by sex?

In [26]:
genderStats = getGenderStats(rddRides)

In [27]:
for item in genderStats:
    print('Gender: {} - Rentals: {:,} - Avg Duration(min): {:.2f}' \
          .format(item[0], item[1][1], item[1][0] / 60)
    )

Gender: M - Rentals: 509,782 - Avg Duration(min): 13.62
Gender: F - Rentals: 174,808 - Avg Duration(min): 14.30


### 4. Which age group rents more bicyles? What is the average rental time by age group?

In [28]:
ageStats = getAgeStats(rddRides)

In [29]:
for item in ageStats:
    print('Age: {} - Rentals: {:07,} - Avg Duration(min): {:.2f}' \
          .format(item[0], item[1][1], item[1][0] / 60)
    )

Age: 35-44 - Rentals: 167,715 - Avg Duration(min): 13.58
Age: 18-34 - Rentals: 392,040 - Avg Duration(min): 14.03
Age: 45-54 - Rentals: 077,990 - Avg Duration(min): 13.30
Age: 00-17 - Rentals: 001,332 - Avg Duration(min): 12.78
Age: 55-64 - Rentals: 035,831 - Avg Duration(min): 13.30
Age: 65+ - Rentals: 009,682 - Avg Duration(min): 13.69


### 5. Which stations have the highest number of rented/returned bicyles? What is the rentals total time (in minutes) of each one?

In [30]:
topBusyStations = getTopBusyStations(3, rddRides, True)

In [31]:
for item in topBusyStations:
    print('Station: {:05d} - Rentals: {} - Minutes: {:,.2f}' \
          .format(item[1], item[0][0], item[0][1] / 60)
    )

Station: 10771 - Rentals: 217 - Minutes: 1,523.15
Station: 10810 - Rentals: 208 - Minutes: 2,785.77
Station: 07854 - Rentals: 192 - Minutes: 2,633.07
