Temasek Polytechnic  
Specialist Diploma in Big Data Management  
Big Data Programming (CBG1C04)  
Assignment  
  
By Lup Peng 
  
----------------------------------------
### Overview 
The main datasets used for this project are the Austin B-Cycle Trips, and Kiosk Locations. They were downloaded from data.austintexas.gov, the official city of Austin open data portal.

A synthetic data set of B-Cycle Customer Data for Memberships has been generated as well.

#### Dataset Links
- https://data.austintexas.gov/dataset/Austin-B-Cycle-Kiosk-Locations/qd73-bsdg
- https://data.austintexas.gov/dataset/Austin-B-Cycle-Trips/tyfh-5r8s

(Data is in Public Domain)

#### Data Set Headers
- B-Cycle Kiosk Locations
 - kiosk_id
 - kiosk_name
 - kiosk_status
 - latitude
 - longitude
 - location
- B-Cycle Trips
 - trip_id
 - membership_type
 - bicycle_id
 - checkout_date
 - checkout_time
 - checkout_kiosk_id
 - checkout_kiosk
 - return_kiosk_id
 - return_kiosk
 - trip_duration_minutes
 - month
 - year
- B-Cycle Customer Data for Memberships (Added as synthetic data to B-Cycle Trips)
 - age_range
 - gender 
 - payment_type

### Use Cases 
Given the dataset, the four analytical use cases are as follows:
* What is the average rental duration of each bicycle for a given period? (Used for Predictive Maintenance) 
* Which bicycle kiosk(s) reaches full capacity, based on real-time streaming of bicycle rental records? (Used for Growth Planning)
* How can B-cycle identify less frequently used kiosks to rotate their bicycles so as to even out wear and tear across their fleet? (Used for Preventive Maintenance)
* Which membership can be recommended to walk-up customers based on Customer Segmentation of existing members? (Operations Optimization and Sales)

In [1]:
# Preparation Code

# Load the two data sets 
bcycleKiosk = sc.textFile("/home/training/assignmentdata/Austin_B-Cycle_Kiosk_Locations.csv")
bcycleTrips = sc.textFile("/home/training/assignmentdata/Austin_B-Cycle_Trips.csv")

# Remove Header for bcycleKiosk 
kioskHeader = bcycleKiosk.first()
bcycleKioskNoHeader = bcycleKiosk.filter(lambda line: line != kioskHeader)

# Remove Header for bcycleTrips 
tripHeader = bcycleTrips.first()
bcycleTripsNoHeader = bcycleTrips.filter(lambda line: line != tripHeader)

# Credit from https://community.hortonworks.com/questions/113255/removing-header-from-csv-file-through-pyspark.html

### Use Case 1
#### What is the average rental duration of each bicycle for a given period?
Application: Predictive Maintenance

Description: Wear and tear is inevitable to both bicycles and kiosks. The aim of this use case is to calculate the average rental duration of each bicycle for a given period, with the assumption of a positive correlation between the distance traveled and the rental duration. This will help ground staff identify bicycles that need maintenance before it actually becomes unsafe/unusable (e.g. broken chain, worn brake pads).

In [2]:
# Assumptions: 
# 1) To avoid skewing the results,trip duration of < 1 minute will not be considered
# 2) Duration of interest: 2016
# 3) Expected Output Format: (bicycle_id, number_of_trips, total_distance_traveled, avg_rental_duration)
# 4) Scenario A: Find the top 5 highest average rental duration
# 5) Scenario B: Find the bicycle with the most number of trips 

period = '2016'

# Filter only 2016 data, and remove records with empty/null bicycle Id 
# Extract only required fields: bicycle_id, checkout_date, and trip_duration_minutes fields
bcycleTripsNoHeaderIn2016 = bcycleTripsNoHeader.map(lambda line: line.split(",")).filter(lambda fields: fields[11] == period).filter(lambda fields: len(fields[2])>0)
bcycleTripPerBikeDate = bcycleTripsNoHeaderIn2016.filter(lambda fields: int(fields[9]) > 0).map(lambda fields:((fields[2],fields[3]),fields[9]))

# Find the total trip duration per bike, per day
bcycleTripPerBikeDateTotalTrip = bcycleTripPerBikeDate.reduceByKey(lambda v1, v2: int(v1) + int(v2))

# FlatMapValue the RDD to create two records per trip: (bike_id, checkout_date) and (bike_id,trip_duration_minutes)
bcycleTripPerBikeDateTotalTripKeyByBikeDate = bcycleTripPerBikeDateTotalTrip.flatMapValues(lambda x : str(x)).map(lambda line: (line[0][0],[line[0][1],line[1]]))
bcycleTripPerBikeFlatMap = bcycleTripPerBikeDateTotalTripKeyByBikeDate.flatMapValues(lambda x: x)
tupleBikeDate = bcycleTripPerBikeFlatMap.filter(lambda line: "/" in line[1])
tupleBikeDuration = bcycleTripPerBikeFlatMap.filter(lambda line: "/" not in line[1])

# Sum the total trip duration
tupleBikeSumDurationKeyBikeId = tupleBikeDuration.reduceByKey(lambda v1, v2: str(int(v1) + int(v2)))

# Count the total number of trips 
tupleBikeSumTripsKeyBikeId = tupleBikeDate.map(lambda fields: (fields[0],1)).reduceByKey(lambda v1, v2: str(int(v1) + int(v2)))

# Produce the Expected Output Format: (bike_id, number_of_trips, total_trip_duration, avg_rental_duration)
expectedOutputFormat = tupleBikeSumTripsKeyBikeId.join(tupleBikeSumDurationKeyBikeId).map(lambda x: (int(x[0]),[x[1][0],x[1][1],int(x[1][1])/float(x[1][0])]))

# Scenario A: Find the top 5 highest average rental duration
expectedOutputFormat.sortBy((lambda fields: fields[1][2]),False).take(5)

# Scenario B: Find the bicycle with the most number of trips 
# expectedOutputFormat.sortBy((lambda fields: fields[1][0]),False).take(5)

[(113, ['248', '1040', 4.193548387096774]),
 (1000, ['307', '1278', 4.162866449511401]),
 (114, ['330', '1364', 4.133333333333334]),
 (680, ['299', '1232', 4.120401337792642]),
 (356, ['361', '1481', 4.1024930747922435])]

### Use Case 2
#### Which bicycle kiosk(s) reaches full capacity, based on real-time streaming of bicycle rental records?
Application: Growth Planning

Description: Bicycle kiosks with no empty space means customers have to cycle further to find another one. This results in lower customer satisfaction and return rate. The aim of this use case is to identify bicycle kiosks that experience full occupancy rates so as to help the operations team focus their upgrading/expansion efforts.

In [4]:
# Assumptions: 
# 1) Each kiosk can hold up to 13 bicycles
# 2) Each kiosk starts with 8 existing bikes, and 5 vacancies
# 3) Duration of interest: Records will stream in for the period of 1 January 2017 to 31 March 2017
# 4) Every checkout kiosk will +1 vacancy, and checkin kiosk will -1 vacancy, bounded at 13 and 0 respectively 
# 5) Kiosks that reach full capacity (i.e. zero vacancies) will have their kiosk ID displayed via streaming

# Prepare the data - extract B-cycle data from the period of 1 January 2017 to 31 March 2017, and sort it based on timestamp
JanMar2017Trip = bcycleTripsNoHeader.map(lambda line: line.split(',')).filter(lambda fields: fields[11] == '2017').filter(lambda fields: fields[10] in ('1','2','3'))

# Prepare the data - pad the timestamp to be all double digits 
def padTime (fields):
    if len(fields[4]) == 7:
        fields[4] = '0' + fields[4]
    return fields

JanMar2017TripPaddedTime = JanMar2017Trip.map(padTime)

# Prepare the data - combine date and time fields to form a single datetime stamp key, and sort by it
JanMar2017TripSorted = JanMar2017TripPaddedTime.map(lambda fields: ((fields[3],fields[4]),fields)).sortByKey()

# Prepare the data - Combine the string tokens into a single string, set encoding to ignore ascii (to avoid printing u'text'), and save it.
JanMar2017TripData = JanMar2017TripSorted.map(lambda fields: ','.join(fields[1])).coalesce(1)
JanMar2017TripData.map(lambda line: line.encode('ascii', 'ignore')).saveAsTextFile('/home/training/assignmentdata/streamingFolder/trip')

# Prepare the data - Create kiosk capacity list - extract name and id of active stations/kiosks, and set capacity to 5
bcycleKioskKioskCapacity = bcycleKioskNoHeader.map(lambda line: line.split(',')).filter(lambda fields: fields[2] == 'active').map(lambda fields: (fields[0],fields[1],str(5))).map(lambda fields: ','.join(fields)).coalesce(1)
bcycleKioskKioskCapacity.saveAsTextFile('/home/training/assignmentdata/streamingFolder/kiosk')

In [7]:
# FOR STREAMING KIOSK DATA
# Pre-requisite: Start the process processTrips.py in terminal before starting this cell

# Prepare the Spark Streaming Context
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 3)

# Create a SocketTextStream DStream to listen to the port
from pyspark import StorageLevel
lines = ssc.socketTextStream("127.0.0.1", 8586, StorageLevel.MEMORY_ONLY)

# Print the kiosks that are full, based on the messages sent over the socket 
lines.pprint()

ssc.start()

-------------------------------------------
Time: 2017-09-01 11:14:51
-------------------------------------------

-------------------------------------------
Time: 2017-09-01 11:14:54
-------------------------------------------

-------------------------------------------
Time: 2017-09-01 11:14:57
-------------------------------------------

-------------------------------------------
Time: 2017-09-01 11:15:00
-------------------------------------------

-------------------------------------------
Time: 2017-09-01 11:15:03
-------------------------------------------
B-Cycle kiosk 2499 has reached maximum capacity

-------------------------------------------
Time: 2017-09-01 11:15:06
-------------------------------------------

-------------------------------------------
Time: 2017-09-01 11:15:09
-------------------------------------------
B-Cycle kiosk 2711 has reached maximum capacity
B-Cycle kiosk 2503 has reached maximum capacity

-------------------------------------------
Time: 2

In [9]:
# Stop the Spark Streaming, but keep-alive the Spark Context
ssc.stop(False)

### Use Case 3
#### How can B-cycle identify most/less frequently used kiosks to rotate their bicycles so as to even out wear and tear across their fleet?
Application: Preventive Maintenance

Description: If most rental usage is focused on a small subset of bicycles, their overall lifespan will be greatly reduced. This translates to poor customer experience, and higher maintenance costs. The aim of this use case is to identify kiosks with low levels of usage, and rotate their bicycles with those at kiosks with high levels of usage. This will help to even out the rental usage across the entire fleet of bicycles.

In [14]:
# Assumptions
# 1) In order to achieve preventive maintenance, we will first identify the top and bottom 3 kiosks, based on rental activity.
#    Bicycles from these two groups should then be rotated 
# 2) Duration of interest: March 2016
# 3) The analysis of each month is independent of the preceding months 
# 4) Expected Output Format: (most_active_kiosk_id, most_active_kiosk_name, least_active_kiosk_id, least_active_kiosk_name)
# 5) Scenario A: It is 31 March 2016. The company wants to know which kiosks' bicycles they should rotate on 1 April 2016

# Firstly, create the Schema for the Trips Data Frame 
from pyspark.sql.types import *
trip_schema = StructType( [
    StructField('trip_id', StringType(), True),
    StructField('membership_type', StringType(), True),
    StructField('bicycle_id', StringType(), True),
    StructField('checkout_date', StringType(), True),
    StructField('checkout_time', StringType(), True),
    StructField('checkout_kiosk_id', StringType(), True),
    StructField('checkout_kiosk', StringType(), True),
    StructField('return_kiosk_id', StringType(), True),
    StructField('return_kiosk', StringType(), True),
    StructField('trip_duration_minutes', StringType(), True),
    StructField('month', StringType(), True),
    StructField('year', StringType(), True) ])

kiosk_schema = StructType( [
    StructField('kiosk_id', StringType(), True),
    StructField('kiosk_name', StringType(), True),
    StructField('kiosk_status', StringType(), True),
    StructField('latitude', StringType(), True),
    StructField('longitude', StringType(), True),
    StructField('location', StringType(), True) ])

# Create the Data Frame for Trips and Kiosk
trips = spark.read.csv("/home/training/assignmentdata/Austin_B-Cycle_Trips.csv", header=False, nullValue="na", schema=trip_schema)
kiosks = spark.read.csv("/home/training/assignmentdata/Austin_B-Cycle_Kiosk_Locations.csv", header=False, nullValue="na", schema=kiosk_schema)

# Filter only records from March 2016 
trips2016 = trips.filter('year == 2016').filter('month == 3')

# Count the number of checkin and checkout activities that each kiosk experienced
trips2016Checkout = trips2016.groupBy('checkout_kiosk_id').agg({'checkout_kiosk_id': 'count'})
trips2016Checkin = trips2016.groupBy('return_kiosk_id').agg({'return_kiosk_id': 'count'})

# Get top 3 kiosks for checkout/checkin activity
top3Checkout = trips2016Checkout.orderBy('count(checkout_kiosk_id)', ascending = False).limit(3)
top3Checkin = trips2016Checkin.orderBy('count(return_kiosk_id)', ascending = False).limit(3)

# Get bottom 3 kiosks for checkout/checkin activity
bottom3Checkout = trips2016Checkout.orderBy('count(checkout_kiosk_id)', ascending = True).limit(3)
bottom3Checkin = trips2016Checkin.orderBy('count(return_kiosk_id)', ascending = True).limit(3)

# Show the distinct kiosks that have the LEAST checkout/checkin activity 
bottom3Union = bottom3Checkin.unionAll(bottom3Checkout).select('return_kiosk_id').selectExpr('return_kiosk_id as kiosk_id_least_activity').distinct()
bottom3Union.join(kiosks, bottom3Union.kiosk_id_least_activity == kiosks.kiosk_id).select('kiosk_id_least_activity', "kiosk_name").show()

# 2561 - State Capitol Visitors Garage @ San Jacinto & 12th
# 2823 - Capital Metro HQ - East 5th at Broadway
# 3381 - East 7th & Pleasant Valley --> closed in 2017
# 3293 - East 2nd & Pedernales

+-----------------------+--------------------+
|kiosk_id_least_activity|          kiosk_name|
+-----------------------+--------------------+
|                   2561|State Capitol Vis...|
|                   2823|Capital Metro HQ ...|
|                   3381|East 7th & Pleasa...|
|                   3293|East 2nd & Pedern...|
+-----------------------+--------------------+



In [16]:
# Show the distinct kiosks that have the MOST checkout/checkin activity 
top3Union = top3Checkin.unionAll(top3Checkout).select('return_kiosk_id').selectExpr('return_kiosk_id as kiosk_id_most_activity').distinct()
top3Union.join(kiosks, top3Union.kiosk_id_most_activity == kiosks.kiosk_id).select('kiosk_id_most_activity', "kiosk_name").show()

# 2498 - Convention Center / 4th St. @ MetroRail
# 2563 - Davis at Rainey Street
# 2539 - Convention Center / 3rd & Trinity

+----------------------+--------------------+
|kiosk_id_most_activity|          kiosk_name|
+----------------------+--------------------+
|                  2498|Convention Center...|
|                  2563|Davis at Rainey S...|
|                  2539|Convention Center...|
+----------------------+--------------------+



### Use Case 4
#### Which membership can be recommended to walk-up customers based on Customer Segmentation of existing members?
Application: Operations Optimization, Sales Growth

Description: There are two main categories of customers - those with pre-paid memberships, and walk-up customers. Based on customer segmentation of existing customers from membership purchases, recommend memberships to walk-up customers based on their age, gender, payment type.

In [17]:
# Assumptions
# 1) Synthetic customer data is reflective of membership trends, and are critical fields for customer segmentation
# 2) Duration of interest: 2016
# 3) The analysis outcome will be used to influence walk-up customers in 2016
# 4) Scenario A: Based on customer segmentation of 2016, the company wants to send a personalized message to
#    recommend a walk-up customer to purchase a membership that best suits him/her, based on the age, gender, 
#    and trip duration

# Reference: 
# - https://spark.apache.org/docs/latest/mllib-clustering.html#k-means
# - https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel
# - http://vargas-solar.com/big-data-analytics/hands-on/k-means-with-spark-hadoop/

# Load the two data sets 
bcycleCust = sc.textFile("/home/training/assignmentdata/Austin_B-Cycle_Trips_Cust.csv")

# Remove Header for bcycleKiosk 
custHeader = bcycleCust.first()
bcycleCustNoHeader = bcycleCust.filter(lambda line: line != custHeader)

# Filter only 2016 records 
bcycleCust2016 = bcycleCustNoHeader.map(lambda line: line.split(',')).filter(lambda fields: fields[11] == '2016')

# Extract fields of interest: membership_type, age_range, gender, payment_type
bcycleCust2016Fields = bcycleCust2016.map(lambda fields: [fields[1],fields[12],fields[13],fields[14]])

# Exclude the walk-up members - cause we want to sell membership to them! 
bcycleCust2016Members = bcycleCust2016Fields.filter(lambda fields: fields[0] != 'Walk Up')

# Python function to convert from predefined string to int 
def preprocessClustingData(fields):
    membershipInt = 0;
    ageInt = 0;
    genderInt = 0;
    paymentInt = 0;
    
    # Convert membership string
    if (fields[0] == 'Walk Up'):
        membershipInt = 0.0
    elif (fields[0] == 'Weekender'): 
        membershipInt = 1.0
    elif (fields[0] == 'Local30'): 
        membershipInt = 2.0
    elif (fields[0] == 'Local365'): 
        membershipInt = 3.0
    else: 
        membershipInt = 4.0
        
    # Convert age string
    if (fields[1] == 'below 21'):
        ageInt = 0.0
    elif (fields[1] == '21 to 35'): 
        ageInt = 1.0
    elif (fields[1] == '36 to 50'): 
        ageInt = 2.0
    elif (fields[1] == '51 to 65'): 
        ageInt = 3.0
    elif (fields[1] == 'above 65'): 
        ageInt = 4.0
    
    # Convert gender string
    if (fields[2] == 'male'):
        genderInt = 0.0
    else: 
        genderInt = 1.0
    
    # Convert payment string
    if (fields[3] == 'visa'):
        paymentInt = 0.0
    elif (fields[3] == 'mastercard'): 
        paymentInt = 1.0
    elif (fields[3] == 'amex'): 
        paymentInt = 2.0
    elif (fields[3] == 'unionpay'): 
        paymentInt = 3.0
    elif (fields[3] == 'others'): 
        paymentInt = 4.0

    return [membershipInt,ageInt,genderInt,paymentInt]
        
# Preprocess the data - convert from string to int 
bcycleCust2016FieldsPreprocessed = bcycleCust2016Members.map(preprocessClustingData)

# Build the model (cluster the data)
from pyspark.mllib.clustering import KMeans
model = KMeans.train(bcycleCust2016FieldsPreprocessed, 10, maxIterations=20, initializationMode='random')

# Print the center of the clusters 
clusterMembership = []
for cluster in model.clusterCenters:
    recommended = round(cluster[0])
    if (recommended == 1.0):
        clusterMembership.append('Weekender')
    elif (recommended == 2.0):
        clusterMembership.append('Local30')
    else: 
        clusterMembership.append('Local365')

print clusterMembership

['Local365', 'Local365', 'Local30', 'Local365', 'Local365', 'Local365', 'Local365', 'Local365', 'Local365', 'Weekender']


In [18]:
# Select walk up customers of bcycle in 2016
bcycleCust2016WalkUp = bcycleCust2016Fields.filter(lambda fields: fields[0] == 'Walk Up')

# Preprocess the data from string to int, and select 10 records for illustration
bcycleCust2016WalkUpPreprocessed = bcycleCust2016WalkUp.map(preprocessClustingData)
bcycleCust2016WalkUpPreprocessedSubset = bcycleCust2016WalkUpPreprocessed.take(100)

for record in bcycleCust2016WalkUpPreprocessedSubset:
    cluster = model.predict(record)
    print record, '-->', clusterMembership[model.predict(record)]

[0.0, 1.0, 1.0, 4.0] --> Local30
[0.0, 2.0, 1.0, 0.0] --> Weekender
[0.0, 3.0, 0.0, 2.0] --> Weekender
[0.0, 0.0, 1.0, 3.0] --> Local30
[0.0, 3.0, 1.0, 1.0] --> Weekender
[0.0, 0.0, 1.0, 2.0] --> Weekender
[0.0, 1.0, 0.0, 2.0] --> Weekender
[0.0, 3.0, 0.0, 2.0] --> Weekender
[0.0, 4.0, 1.0, 2.0] --> Local365
[0.0, 2.0, 0.0, 0.0] --> Weekender
[0.0, 4.0, 0.0, 1.0] --> Local365
[0.0, 1.0, 0.0, 1.0] --> Weekender
[0.0, 0.0, 0.0, 4.0] --> Local30
[0.0, 4.0, 0.0, 3.0] --> Local365
[0.0, 0.0, 0.0, 1.0] --> Weekender
[0.0, 1.0, 0.0, 1.0] --> Weekender
[0.0, 0.0, 0.0, 4.0] --> Local30
[0.0, 0.0, 1.0, 3.0] --> Local30
[0.0, 0.0, 0.0, 0.0] --> Weekender
[0.0, 3.0, 0.0, 1.0] --> Weekender
[0.0, 1.0, 0.0, 1.0] --> Weekender
[0.0, 2.0, 1.0, 4.0] --> Local30
[0.0, 4.0, 1.0, 0.0] --> Local365
[0.0, 4.0, 1.0, 2.0] --> Local365
[0.0, 3.0, 1.0, 0.0] --> Weekender
[0.0, 0.0, 1.0, 1.0] --> Weekender
[0.0, 0.0, 0.0, 3.0] --> Local30
[0.0, 4.0, 1.0, 3.0] --> Local365
[0.0, 4.0, 0.0, 1.0] --> Local365
[0.0, 

In [20]:
# For additional information only 
model.clusterCenters

[array([ 2.74007577,  3.51680119,  0.49769395,  3.51886015]),
 array([ 3.09715041,  2.39665745,  0.50054106,  2.32716124]),
 array([ 1.6015192 ,  1.00882776,  0.49866557,  3.49291727]),
 array([ 3.19000372,  0.99169044,  0.42143123,  3.8505519 ]),
 array([ 2.68821109,  3.75784717,  0.49600581,  1.02331962]),
 array([ 3.23739639,  0.49368121,  0.63011279,  0.37328441]),
 array([ 2.79381889,  0.49275989,  0.05057273,  1.55716447]),
 array([ 3.19670871,  0.3943402 ,  0.75253768,  2.53168256]),
 array([ 2.94327091,  2.38501742,  0.4992378 ,  0.35834059]),
 array([ 1.37594636,  0.81289206,  0.5299589 ,  0.78628596])]