In [1]:
# install required packages
# pip install pyspark
# pip install requests

In [2]:
# import relevant libraries
import os
import json
import pyspark
import requests
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

In [3]:
# Monthly trip-data archives. Every URL follows the same
# "<bucket>/<YYYYMM>-capitalbikeshare-tripdata.zip" pattern, so the list is
# built from month stamps (it could equally be scraped from the bucket listing).
zipLinks = [
    f"https://s3.amazonaws.com/capitalbikeshare-data/{month}-capitalbikeshare-tripdata.zip"
    for month in ("202210", "202211")
]

# Spark session in local mode with a single worker thread ("local[1]").
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("BluePrintAssessment")
    .getOrCreate()
)

22/12/09 09:18:54 WARN Utils: Your hostname, algorithmspath-HP-EliteBook-840-G2 resolves to a loopback address: 127.0.1.1; using 192.168.1.229 instead (on interface enp0s25)
22/12/09 09:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/09 09:18:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def selectFilePart(zipUrl):
    """Return the text after the last '/' in ``zipUrl``.

    The whole string is returned when it contains no '/', and an empty
    string when it ends with '/'.
    """
    # rpartition yields ('', '', zipUrl) when the separator is absent,
    # so the tail element covers both cases without a branch.
    _head, _sep, tail = zipUrl.rpartition('/')
    return tail

# initialize data reader for ETL: (zip) => (csv) @ local machine
class DataReader():
    """Download zip archives over HTTP and extract them under a local base directory.

    Layout created under ``baseDir``:
      * ``archive/`` - downloaded archives, named after the last '/' segment of the URL
      * ``data/``    - members extracted from those archives
    """

    def __init__(self, baseDir, timeout=60):
        """Create the ``archive`` and ``data`` directories under ``baseDir``.

        Args:
            baseDir: root directory; it (and any missing parents) is created if absent.
            timeout: seconds passed to ``requests.get`` as its timeout, so a stalled
                server cannot hang the notebook indefinitely.
        """
        self.baseDir = baseDir
        self.timeout = timeout
        self.archiveDir = f'{self.baseDir}/archive'
        self.dataDir = f'{self.baseDir}/data'

        for directory in (self.archiveDir, self.dataDir):
            # makedirs(exist_ok=True) also creates a missing baseDir and cannot race
            # the way an isdir() check followed by mkdir() can.
            os.makedirs(directory, exist_ok=True)

    def _archivePath(self, zipUrl):
        """Local path under ``archiveDir`` where the archive for ``zipUrl`` is stored."""
        return f'{self.archiveDir}/{selectFilePart(zipUrl)}'

    def downloadZip(self, zipUrl):
        """Download ``zipUrl`` into ``archiveDir`` and return the saved path.

        Raises:
            requests.HTTPError: on a 4xx/5xx response, instead of saving the
                error body as if it were the archive.
        """
        savePath = self._archivePath(zipUrl)
        # Stream into a temporary name first so an interrupted download never
        # leaves a truncated file under the final archive name.
        partPath = f'{savePath}.part'
        with requests.get(zipUrl, stream=True, timeout=self.timeout) as resp:
            resp.raise_for_status()
            with open(partPath, 'wb') as fp:
                # Write in 1 MiB chunks rather than buffering the whole archive in memory.
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    fp.write(chunk)
        os.replace(partPath, savePath)
        return savePath

    def extractZip(self, zipUrl):
        """Extract the previously downloaded archive for ``zipUrl`` into ``dataDir``."""
        # The context manager closes the archive's file handle, even if extraction fails.
        with zipfile.ZipFile(self._archivePath(zipUrl)) as zipObj:
            zipObj.extractall(self.dataDir)

    def downloadExtract(self, zipUrl):
        """Download ``zipUrl`` and extract its contents into ``dataDir``."""
        self.downloadZip(zipUrl)
        self.extractZip(zipUrl)

In [5]:
### PART 1: use spark to read data ###

dataReader = DataReader( os.getcwd() )

# download: last 2 links, newest first. Slicing replaces manual index
# arithmetic that re-downloaded the only link when just one was configured
# and raised IndexError when none were.
for zipLink in reversed(zipLinks[-2:]):
    print( f'loading: {zipLink}' )
    dataReader.downloadExtract(zipLink)

# The glob reads every CSV in dataDir, so all extracted months are merged
# into a single DataFrame.
# NOTE(review): CSVs extracted by earlier runs (e.g. with other links) are
# picked up too; clear dataDir first if only the links above should be read.
csvPath = f'{dataReader.dataDir}/*.csv'

# Reference schema for these files. It is NOT passed to the reader:
# inferSchema=True infers column types instead, at the cost of an extra
# pass over the data.
# {
#     "ride_id": "string",
#     "rideable_type": "string",
#     "started_at": "timestamp",
#     "ended_at": "timestamp",
#     "start_station_name": "string",
#     "start_station_id": "integer",
#     "end_station_name": "string",
#     "end_station_id": "integer",
#     "start_lat": "double",
#     "start_lng": "double",
#     "end_lat": "double",
#     "end_lng": "double",
#     "member_casual": "string"
# }
df = spark.read.csv(
    csvPath,
    header=True,
    mode="DROPMALFORMED",  # skip rows that fail to parse instead of erroring
    inferSchema=True
)
df.show(3)
print( df.count() )

loading: https://s3.amazonaws.com/capitalbikeshare-data/202211-capitalbikeshare-tripdata.zip
loading: https://s3.amazonaws.com/capitalbikeshare-data/202210-capitalbikeshare-tripdata.zip


                                                                                

+----------------+-------------+-------------------+-------------------+--------------------+----------------+----------------+--------------+-----------------+------------------+-------+-------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|end_station_name|end_station_id|        start_lat|         start_lng|end_lat|end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+----------------+--------------+-----------------+------------------+-------+-------+-------------+
|636812F7EDA843A3|electric_bike|2022-10-21 07:19:55|2022-10-21 07:32:47|Fairfax Dr & N Ta...|           31049|            null|          null|       38.8823895|         -77.11283|  38.88| -77.11|       member|
|2963CAC314D0C593|electric_bike|2022-10-21 16:52:10|2022-10-21 17:07:43| Eads St & 12th St S|           31071|            null|          null|       38.8630035|

[Stage 3:>                                                          (0 + 1) / 1]

593026


                                                                                

In [6]:
### PART 2: transform data to run efficient analytics ###

# logic: discard every row that has a null in at least one column (how="any")
df = df.na.drop(how="any")
print( df.count() )

[Stage 6:>                                                          (0 + 1) / 1]

557675


                                                                                

In [7]:
### PART 3: delete latitude/longitude columns ###

# logic: drop all four coordinate columns (start/end latitude and longitude)
df = df.drop(*["start_lat", "start_lng", "end_lat", "end_lng"])
df.show(3)

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+-------------+
|88E008078B2E7FFC|electric_bike|2022-10-19 20:16:01|2022-10-19 20:22:35|14th & Rhode Isla...|           31203|New Hampshire Ave...|         31275|       member|
|53D58D1427BF475B| classic_bike|2022-10-25 21:10:30|2022-10-25 21:18:51|   Potomac & M St NW|           31295|New Hampshire Ave...|         31212|       member|
|69247684F27CAB16| classic_bike|2022-10-20 18:07:02|2022-10-20 18:11:37|14th & Rhode Isla...|           31203|Rhode Island & Co...|         31239|       member|
+----------------+-------------+--

In [8]:
### PART 4: add duration of each ride ###

# duration (seconds) = ended_at - started_at; casting a timestamp to long
# gives whole seconds since the Unix epoch.
# NOTE(review): nothing here guards against ended_at < started_at, which would
# produce negative durations — confirm whether the data can contain such rows.
df = df.withColumn(
    'duration',
    col('ended_at').cast('long') - col('started_at').cast('long'),
)
df.show(3)

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+-------------+--------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|member_casual|duration|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+-------------+--------+
|88E008078B2E7FFC|electric_bike|2022-10-19 20:16:01|2022-10-19 20:22:35|14th & Rhode Isla...|           31203|New Hampshire Ave...|         31275|       member|     394|
|53D58D1427BF475B| classic_bike|2022-10-25 21:10:30|2022-10-25 21:18:51|   Potomac & M St NW|           31295|New Hampshire Ave...|         31212|       member|     501|
|69247684F27CAB16| classic_bike|2022-10-20 18:07:02|2022-10-20 18:11:37|14th & Rhode Isla...|           31203|Rhode Island & Co...|         31239|    

In [9]:
### PART 5: calculate average ride duration for each rideable type ###

# logic: mean of [duration] per [rideable_type]; the dict form of agg()
# names the result column "avg(duration)"
avgDurationPerType = df.groupBy('rideable_type').agg({'duration': 'avg'})
avgDurationPerType.show()

[Stage 11:>                                                         (0 + 1) / 1]

+-------------+------------------+
|rideable_type|     avg(duration)|
+-------------+------------------+
|  docked_bike| 3906.955096196211|
|electric_bike| 851.2544860576362|
| classic_bike|1045.9276883572882|
+-------------+------------------+



                                                                                

In [10]:
### PART 6: calculate top 10 ride durations (longer than 24 hours) for each start station ###

# logic: filter [duration > 24hr] -> partition [start_station_name]
#        sort [duration desc] -> keep [partition_row_id <= 10]
# Filtering BEFORE the window selects the same rows as ranking first and
# filtering afterwards: with a descending sort, every ride ranked above a
# >24h ride is itself >24h, so its rank is unchanged. Only the few long
# rides are then shuffled and sorted, instead of the whole dataset.
longRideSecs = 24 * 3600
topN = 10
startStationPartition = Window.partitionBy("start_station_name").orderBy(col("duration").desc())
resultDf = (
    df
    .filter(col("duration") > longRideSecs)
    .withColumn("partition_row_id", row_number().over(startStationPartition))
    .filter(col("partition_row_id") <= topN)
    .select('duration', 'start_station_name')
    # explicit ordering so show() output is reproducible across runs
    .orderBy('start_station_name', col('duration').desc())
)

resultDf.show()

[Stage 16:>                                                         (0 + 1) / 1]

+--------+--------------------+
|duration|  start_station_name|
+--------+--------------------+
|  110031|      10th & K St NW|
|   88236|      13th & D St NE|
| 1946330|14th & Belmont St NW|
|  149613|14th & D St NW / ...|
|   89658|14th & D St NW / ...|
|   94628|      14th & D St SE|
|  162111|      14th & G St NW|
|   87631|   14th & Otis Pl NW|
|  691477|15th St & Pennsyl...|
|  124772|17th & K St NW / ...|
|  356007|      18th & L St NW|
|   88095|18th St & Wyoming...|
|   92547|19th St & Constit...|
|   97124|       1st & M St NE|
|  335469|       1st & M St SE|
|  785165|1st & Washington ...|
|   92606|      20th & E St NW|
|   88259|20th St & Virgini...|
|  108315|      21st & I St NW|
|   87711|      21st & I St NW|
+--------+--------------------+
only showing top 20 rows



                                                                                