In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("MDA2-Group Assignment")
    .getOrCreate())


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Import Data

### Dataset Info

Note: the `findspark` initialization at the top of this notebook is only needed in our course environment; other Spark environments may not need it.

Each quarter, we publish downloadable files of Capital Bikeshare trip data. The data includes:

- **Duration** – Duration of trip
- **Start Date** – Includes start date and time
- **End Date** – Includes end date and time
- **Start Station** – Includes starting station name and number
- **End Station** – Includes ending station name and number
- **Bike Number** – Includes ID number of bike used for the trip
- **Member Type** – Indicates whether user was a "registered" member (Annual Member, 30-Day Member, or Day Key Member) or a "casual" rider (Single Trip, 24-Hour Pass, 3-Day Pass, or 5-Day Pass)

This data has been processed to remove trips that are taken by staff as they service and inspect the system, trips that are taken to/from any of our “test” stations at our warehouses, and any trips lasting less than 60 seconds (potentially false starts or users trying to re-dock a bike to ensure it's secure).

**NOTE**: The 3-Day Membership replaced the 5-Day Membership in Fall 2011.

https://capitalbikeshare.com/system-data


**IMPORTANT: This documentation only applies to the CSV files up to March 2020 (2020 Q1). From April 2020 onwards, the schema of the data changes to the following form (adding new columns):**

- ride_id                object
- rideable_type          object
- started_at             object
- ended_at               object
- start_station_name     object
- start_station_id      float64
- end_station_name       object
- end_station_id        float64
- start_lat             float64
- start_lng             float64
- end_lat               float64
- end_lng               float64
- member_casual          object (corresponds to **Member Type**)



**We decided to work only with the data from April 2020 onwards, that is, only with the CSV files that use the newer schema.**
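Since the two schemas differ in their column names, a file can be assigned to one or the other by looking at its header line alone. The snippet below is a minimal sketch of that idea in plain Python; the helper `uses_new_schema` and the choice of marker columns are our own assumptions for illustration, not part of the Capital Bikeshare documentation.

```python
import csv
import io

# Marker columns that only exist in the newer (April 2020 onwards) schema.
# Which columns to use as markers is an assumption made for this sketch.
NEW_SCHEMA_MARKERS = {"ride_id", "rideable_type", "member_casual"}

def uses_new_schema(header_line: str) -> bool:
    """Return True if a CSV header line contains the newer-schema columns."""
    columns = next(csv.reader(io.StringIO(header_line)))
    normalized = {name.strip().lower() for name in columns}
    return NEW_SCHEMA_MARKERS.issubset(normalized)

old_header = ("Duration,Start date,End date,Start station number,Start station,"
              "End station number,End station,Bike number,Member type")
new_header = ("ride_id,rideable_type,started_at,ended_at,start_station_name,"
              "start_station_id,end_station_name,end_station_id,start_lat,"
              "start_lng,end_lat,end_lng,member_casual")

print(uses_new_schema(old_header))
print(uses_new_schema(new_header))
```

A check like this could be run over the first line of each CSV file before deciding which files to load.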

In [4]:
'''
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Define the schema explicitly
schema = StructType([
    StructField("Duration", StringType(), True),
    StructField("Start date", StringType(), True),
    StructField("End date", StringType(), True),
    StructField("Start station number", StringType(), True),
    StructField("Start station", StringType(), True),
    StructField("End station number", StringType(), True),
    StructField("End station", StringType(), True),
    StructField("Bike number", StringType(), True),
    StructField("Member type", StringType(), True)
])
'''


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType, IntegerType

# Define the schema explicitly
schema = StructType([
    StructField("", IntegerType(), True),
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", StringType(), True),
    StructField("ended_at", StringType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", FloatType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", FloatType(), True),
    StructField("start_lat", FloatType(), True),
    StructField("start_lng", FloatType(), True),
    StructField("end_lat", FloatType(), True),
    StructField("end_lng", FloatType(), True),
    StructField("member_casual", StringType(), True)
])
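A note on the numeric types: the column listing above reports `float64` for the station IDs and coordinates, whereas Spark's `FloatType` is a 32-bit single-precision float (`DoubleType` is the 64-bit one). The sketch below uses only the standard library to show what a 32-bit round trip does to such values; `to_float32` is a hypothetical helper written for this illustration.

```python
import struct

def to_float32(value: float) -> float:
    """Round-trip a 64-bit Python float through a 32-bit float."""
    return struct.unpack("f", struct.pack("f", value))[0]

# Station IDs are small whole numbers and survive unchanged.
print(to_float32(31239.0))

# Coordinates pick up extra trailing digits once stored as 32-bit floats.
print(to_float32(-77.0284))
print(to_float32(-77.0398))
```

This is consistent with longitudes such as -77.028397 appearing in the pandas previews further down, so `DoubleType` may be worth considering for the latitude and longitude columns if exact coordinates matter.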


In [10]:
# Read a single CSV file to check schema
single_file_df = spark.read.schema(schema).option("header", "true").csv("hdfs://localhost:9000/datalake/raw/bike_sharing/bike_sharing/202202-capitalbikeshare-tripdata.csv")

# Show the DataFrame to verify the schema
single_file_df.printSchema()
single_file_df.show()


root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+---------+----------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|  end_lat|   end_lng|member_casual|
+----------------+-------------+-

In [11]:
capitalbikeshare_raw.unpersist()

DataFrame[: float, ride_id: string, rideable_type: string, started_at: string, ended_at: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string]

In [6]:
# Read the combined CSV file (final_data.csv) using the defined schema
capitalbikeshare_raw = spark.read.schema(schema).option("header", "true").csv("hdfs://localhost:9000/datalake/raw/bike_sharing/final_data.csv")
capitalbikeshare_raw.show(5)

+---+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+--------+---------+-------------+
|   |         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng| end_lat|  end_lng|member_casual|
+---+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+--------+---------+-------------+
|  0|77A0F1B26D1597B1|  docked_bike|2020-04-25 17:28:39|2020-04-25 17:35:04|Rhode Island & Co...|         31239.0|      12th & L St NW|       31251.0|38.905994| -77.0398|38.90382| -77.0284|       casual|
|  1|8698F10128EA4F18|  docked_bike|2020-04-06 07:54:59|2020-04-06 07:57:24|      21st & I St NW|         31205.0|      18th & L St NW|       31224.0| 38.90071|-77.04645|38.90374|-77.0

                                                                                

In [14]:
# Cache the DataFrame if needed
capitalbikeshare_raw.cache()

DataFrame[: int, ride_id: string, rideable_type: string, started_at: string, ended_at: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string]

In [9]:
# Print the schema to verify
capitalbikeshare_raw.printSchema()

# Perform a count to ensure there are no read errors
print(f"Number of records: {capitalbikeshare_raw.count()}")


root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)





Number of records: 22511604


                                                                                

In [19]:
# Check for nulls or other anomalies in the 'started_at' and 'ended_at' columns as they have caused a lot of trouble
capitalbikeshare_raw.select("started_at", "ended_at").summary("count", "min", "max").show()




+-------+-------------+-------------------+
|summary|   started_at|           ended_at|
+-------+-------------+-------------------+
|  count|     11246784|           11246784|
|    min| classic_bike|2020-04-01 00:25:48|
|    max|electric_bike|2023-09-30 23:59:58|
+-------+-------------+-------------------+



                                                                                

In [13]:
from pyspark.sql import functions as F

value_counts_df = capitalbikeshare_raw.groupBy("rideable_type").count().orderBy(F.desc("count"))
value_counts_df.show()




+-------------+-------+
|rideable_type|  count|
+-------------+-------+
| classic_bike|7129646|
|electric_bike|2185882|
|  docked_bike|1931256|
+-------------+-------+



                                                                                

In [161]:
# Only if I need to restart the process from the beginning...
'''
capitalbikeshare_raw = capitalbikeshare_raw_copy
'''

In [18]:
# Copy the original df into a 'safety' variable
'''
capitalbikeshare_raw_copy = capitalbikeshare_raw

capitalbikeshare_raw_copy.limit(5).toPandas()
'''

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,ride_id,rideable_type,started_at,ended_at,start_station_name,,end_station_name,,,,,,member_casual
1,D88F352076694FFC,classic_bike,2023-08-03 17:33:41,2023-08-03 18:03:07,5th St & Massachusetts Ave NW,31265.0,20th & Columbia Rd NW,31133.0,38.900928,-77.018677,38.918037,-77.045486,member
2,6B733406B1E19446,classic_bike,2023-08-16 08:19:26,2023-08-16 08:34:07,5th & K St NW,31600.0,23rd & E St NW,31260.0,38.903042,-77.019028,38.896103,-77.049881,member
3,56145CE76A7B5A66,classic_bike,2023-08-25 08:19:20,2023-08-25 08:37:11,5th & K St NW,31600.0,23rd & E St NW,31260.0,38.903042,-77.019028,38.896103,-77.049881,member
4,11B7E5D90AE9DBE5,classic_bike,2023-08-15 17:56:22,2023-08-15 18:13:58,23rd & E St NW,31260.0,5th & K St NW,31600.0,38.896103,-77.049881,38.903042,-77.019028,member


In [15]:
from pyspark.sql.functions import col, to_timestamp

# Check for entries that are non-date values
capitalbikeshare_raw.filter(~col("started_at").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")).select("started_at").show(truncate=False)




+----------+
|started_at|
+----------+
+----------+
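The pattern used in this check can be tried out on a handful of strings before running it over millions of rows. The following is a small sketch using Python's `re` module instead of Spark's `rlike` (which uses Java regular expressions); for this simple pattern the two should agree. The sample strings are made up for illustration.

```python
import re

# Same pattern as in the Spark filter; fullmatch makes the ^ and $ anchors implicit.
TIMESTAMP_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

def looks_like_timestamp(value: str) -> bool:
    """Return True if the string has the shape 'yyyy-MM-dd HH:mm:ss'."""
    return TIMESTAMP_PATTERN.fullmatch(value) is not None

# Illustrative values: one valid timestamp and three strings that should be flagged.
samples = ["2020-04-25 17:28:39", "classic_bike", "started_at", "2020-04-25"]
non_dates = [s for s in samples if not looks_like_timestamp(s)]
print(non_dates)
```

Note that the pattern only checks the shape of the string, not whether the date itself is valid (for example, a month of 13 would still pass).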



                                                                                

In [12]:
from pyspark.sql.functions import col


# Get the first row of the DataFrame
header = capitalbikeshare_raw.first()

# Get the name of the first column
first_column_name = capitalbikeshare_raw.columns[0]

# Filter out the rows whose first column has the same value as the first row (the header)
capitalbikeshare_filtered = capitalbikeshare_raw.filter(col(first_column_name) != header[0])

# Count each DataFrame only once, then derive the number of rows that were dropped
total_before = capitalbikeshare_raw.count()
total_after = capitalbikeshare_filtered.count()
num_dropped = total_before - total_after

# Print the number of rows that were dropped and the new total number of rows in the DataFrame
print(f"Dropped {num_dropped} rows. New total number of rows: {total_after}")





Dropped 44 rows. New total number of rows: 22511560
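The dropped rows are presumably header lines repeated inside the combined file (the raw preview further up shows one such row, whose values are the column names themselves). The filtering logic can be sketched on plain Python lists as follows; the function name `drop_repeated_headers` and the shortened three-column rows are assumptions made for this illustration.

```python
# Shortened header and rows, for illustration only.
header = ["ride_id", "rideable_type", "started_at"]
rows = [
    ["77A0F1B26D1597B1", "docked_bike", "2020-04-25 17:28:39"],
    ["ride_id", "rideable_type", "started_at"],  # stray header line inside the data
    ["D88F352076694FFC", "classic_bike", "2023-08-03 17:33:41"],
]

def drop_repeated_headers(rows, header):
    """Keep only rows whose first field differs from the header's first field."""
    return [row for row in rows if row[0] != header[0]]

cleaned = drop_repeated_headers(rows, header)
dropped = len(rows) - len(cleaned)
print(f"Dropped {dropped} rows. New total number of rows: {len(cleaned)}")
```

Comparing only the first field is cheap, but it relies on no genuine record ever having the literal column name as its value.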


                                                                                

In [13]:
capitalbikeshare_raw.unpersist()
capitalbikeshare_filtered.cache()

DataFrame[ride_id: string, rideable_type: string, started_at: string, ended_at: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string]

In [14]:
capitalbikeshare_filtered.limit(5).toPandas()

                                                                                

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,C40CF15D0DBE584E,classic_bike,2022-09-23 18:42:27,2022-09-24 08:27:11,Georgia Ave & Morton St NW,31419.0,14th St Heights / 14th & Crittenden St NW,31402.0,38.932129,-77.023499,38.947773,-77.032822,member
1,3788C1944643B399,classic_bike,2022-09-22 01:35:01,2022-09-22 01:49:59,7th & T St NW,31109.0,7th St & Massachusetts Ave NE,31647.0,38.915691,-77.021706,38.89222,-76.996017,member
2,A9E14A3286BB0922,classic_bike,2022-09-16 17:04:11,2022-09-16 17:08:40,7th & F St NW / National Portrait Gallery,31232.0,North Capitol St & F St NW,31624.0,38.897282,-77.022194,38.897446,-77.009888,member
3,77518ADEB4313901,classic_bike,2022-09-29 17:24:21,2022-09-29 18:32:05,14th & D St NW / Ronald Reagan Building,31231.0,4th & East Capitol St NE,31618.0,38.894512,-77.031616,38.889954,-77.000351,casual
4,7B99FCADC829EAC0,classic_bike,2022-09-13 23:51:12,2022-09-14 00:35:14,New Hampshire Ave & 24th St NW,31275.0,New Hampshire Ave & 24th St NW,31275.0,38.901756,-77.051086,38.901756,-77.051086,casual


In [8]:
capitalbikeshare_filtered = capitalbikeshare_raw

In [18]:
# Check for entries that are non-date values
capitalbikeshare_filtered.filter(~col("started_at").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")).select("started_at").show(truncate=False)




+----------+
|started_at|
+----------+
+----------+



                                                                                

In [16]:
capitalbikeshare_filtered.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)



The date columns are still strings, so we need to convert them to timestamp columns before we can perform date-based operations on them.

In [9]:
from pyspark.sql.functions import col, to_timestamp

# Rename the original date columns to indicate they are string representations
capitalbikeshare_dateOperable = capitalbikeshare_filtered.withColumnRenamed("started_at", "Start_date_str") \
                                            .withColumnRenamed("ended_at", "End_date_str")

# Convert the string date columns to timestamp data type with the original column names
capitalbikeshare_dateOperable = capitalbikeshare_dateOperable.withColumn(
    "started_at", to_timestamp(col("Start_date_str"), "yyyy-MM-dd HH:mm:ss")
).withColumn(
    "ended_at", to_timestamp(col("End_date_str"), "yyyy-MM-dd HH:mm:ss")
)

# Drop the original string date columns
capitalbikeshare_dateOperable = capitalbikeshare_dateOperable.drop("Start_date_str", "End_date_str")

# The DataFrame now has timestamp columns with the original names for date-based operations
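To see what the conversion enables, here is a small plain-Python sketch of the same idea: parsing the two strings of one trip and subtracting them. The format string `"%Y-%m-%d %H:%M:%S"` is the `datetime` counterpart of the Spark pattern `"yyyy-MM-dd HH:mm:ss"`, and `parse_trip_time` is a hypothetical helper. Returning `None` for unparseable input is meant to mirror the null that `to_timestamp` produces when ANSI mode is off.

```python
from datetime import datetime

PYTHON_FORMAT = "%Y-%m-%d %H:%M:%S"  # counterpart of Spark's "yyyy-MM-dd HH:mm:ss"

def parse_trip_time(value: str):
    """Parse a trip timestamp string; return None if it does not match the format."""
    try:
        return datetime.strptime(value, PYTHON_FORMAT)
    except ValueError:
        return None

# Values taken from the first trip shown in the data preview.
started_at = parse_trip_time("2020-04-25 17:28:39")
ended_at = parse_trip_time("2020-04-25 17:35:04")

# Once both values are real timestamps, date arithmetic becomes possible.
duration_seconds = (ended_at - started_at).total_seconds()
print(duration_seconds)

# A non-date string cannot be parsed and yields None.
print(parse_trip_time("classic_bike"))
```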

In [10]:
capitalbikeshare_filtered.unpersist()
capitalbikeshare_dateOperable.cache()

DataFrame[: int, ride_id: string, rideable_type: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string, started_at: timestamp, ended_at: timestamp]

In [21]:
capitalbikeshare_dateOperable.limit(5).toPandas()

                                                                                

Unnamed: 0,Unnamed: 1,ride_id,rideable_type,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,started_at,ended_at
0,0,77A0F1B26D1597B1,docked_bike,Rhode Island & Connecticut Ave NW,31239.0,12th & L St NW,31251.0,38.905994,-77.039803,38.90382,-77.028397,casual,2020-04-25 17:28:39,2020-04-25 17:35:04
1,1,8698F10128EA4F18,docked_bike,21st & I St NW,31205.0,18th & L St NW,31224.0,38.900711,-77.046448,38.90374,-77.04245,member,2020-04-06 07:54:59,2020-04-06 07:57:24
2,2,AA07819DC0F58872,docked_bike,Connecticut Ave & Tilden St NW,31313.0,Connecticut Ave & Tilden St NW,31313.0,38.941139,-77.061974,38.941139,-77.061974,casual,2020-04-22 17:06:18,2020-04-22 18:08:32
3,3,DA909BCA92EF85AB,docked_bike,7th & E St SW,31294.0,7th & E St SW,31294.0,38.88345,-77.021744,38.88345,-77.021744,casual,2020-04-16 15:22:40,2020-04-16 15:58:37
4,4,B36F1E14D8C6757E,docked_bike,Potomac & Pennsylvania Ave SE,31606.0,8th & Eye St SE / Barracks Row,31608.0,38.880299,-76.986198,38.8792,-76.9953,member,2020-04-10 13:19:41,2020-04-10 13:23:05


In [22]:
# For each column, select the column, perform a count of nulls, and then show the results
for column in capitalbikeshare_dateOperable.columns:
    missing_count = capitalbikeshare_dateOperable.filter(col(column).isNull()).count()
    print(f"Column {column} has {missing_count} missing values")

                                                                                

Column  has 0 missing values
Column ride_id has 0 missing values
Column rideable_type has 0 missing values
Column start_station_name has 680271 missing values
Column start_station_id has 680273 missing values
Column end_station_name has 753012 missing values
Column end_station_id has 753014 missing values
Column start_lat has 10 missing values
Column start_lng has 10 missing values
Column end_lat has 22184 missing values
Column end_lng has 22184 missing values
Column member_casual has 0 missing values
Column started_at has 0 missing values
Column ended_at has 0 missing values
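The loop above scans the data once per column. The same tally can in principle be collected in a single pass by keeping one counter per column; the sketch below shows the idea on plain Python dictionaries rather than on a Spark DataFrame. The sample rows and the function `count_missing` are hypothetical.

```python
import math

def count_missing(rows, columns):
    """Count None/NaN values per column in a single pass over the rows."""
    missing = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            value = row.get(column)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                missing[column] += 1
    return missing

# Hypothetical sample: the second trip has no end station or end coordinates.
columns = ["ride_id", "end_station_name", "end_lat"]
rows = [
    {"ride_id": "A", "end_station_name": "12th & L St NW", "end_lat": 38.90382},
    {"ride_id": "B", "end_station_name": None, "end_lat": float("nan")},
    {"ride_id": "C", "end_station_name": "18th & L St NW", "end_lat": 38.90374},
]

for column, missing_count in count_missing(rows, columns).items():
    print(f"Column {column} has {missing_count} missing values")
```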


                                                                                

In [23]:
# Find the minimum and maximum of the start and end dates. This takes a long time to run,
# so it is only executed once to check that the date range is correct.

from pyspark.sql.functions import min, max

# Find the minimum and maximum of the Start date
start_date_range = capitalbikeshare_dateOperable.agg(
    min("started_at").alias("Start date min"),
    max("started_at").alias("Start date max")
)

# Find the minimum and maximum of the End date
end_date_range = capitalbikeshare_dateOperable.agg(
    min("ended_at").alias("End date min"),
    max("ended_at").alias("End date max")
)

# Show the results
start_date_range.show()
end_date_range.show()

                                                                                

+-------------------+-------------------+
|     Start date min|     Start date max|
+-------------------+-------------------+
|2020-04-01 00:25:48|2023-09-30 23:59:58|
+-------------------+-------------------+





+-------------------+-------------------+
|       End date min|       End date max|
+-------------------+-------------------+
|2020-04-01 00:27:59|2023-10-05 02:00:19|
+-------------------+-------------------+



                                                                                

In [24]:
# Observe schema
capitalbikeshare_dateOperable.printSchema()

root
 |-- : integer (nullable = true)
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)



### Weather Dataset Info

https://www.visualcrossing.com/resources/documentation/weather-data/weather-data-documentation/

In [11]:
weatherdata_raw = (spark.read
                    .option("inferSchema", "true")
                    .option('header', 'true')
                    .csv("hdfs://localhost:9000/datalake/raw/weather/weather")
                    .cache())

                                                                                

In [26]:
weatherdata_raw.limit(5).toPandas()

Unnamed: 0,name,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,...,solarenergy,uvindex,severerisk,sunrise,sunset,moonphase,conditions,description,icon,stations
0,washinton dc,2020-07-01,30.0,22.6,25.9,31.6,22.6,26.5,20.0,70.6,...,6.0,4,,2020-07-01 05:46:51,2020-07-01 20:37:16,0.37,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,rain,"KDCA,72405013743,72403793728,F0198,KGAI,KADW,KDAA,AS365,72033493764,74594013705"
1,washinton dc,2020-07-02,33.3,22.0,27.8,33.7,22.0,28.1,18.0,57.9,...,6.2,4,,2020-07-02 05:47:21,2020-07-02 20:37:08,0.41,Partially cloudy,Becoming cloudy in the afternoon.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KGAI,KDAA,AS365,KADW,72033493764,74594013705"
2,washinton dc,2020-07-03,35.5,24.1,30.3,37.0,24.1,31.0,18.0,49.2,...,7.1,6,,2020-07-03 05:47:52,2020-07-03 20:36:58,0.45,Partially cloudy,Partly cloudy throughout the day.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,74594013705"
3,washinton dc,2020-07-04,33.6,25.3,29.2,35.6,25.3,30.7,20.0,58.3,...,6.2,4,,2020-07-04 05:48:24,2020-07-04 20:36:45,0.48,"Rain, Partially cloudy",Becoming cloudy in the afternoon with afternoon rain.,rain,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,AS365,74594013705"
4,washinton dc,2020-07-05,33.6,24.2,28.4,38.7,24.2,30.7,22.4,71.3,...,6.1,4,,2020-07-05 05:48:58,2020-07-05 20:36:31,0.5,Partially cloudy,Partly cloudy throughout the day.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,74594013705"


In [27]:
# Observe schema
weatherdata_raw.printSchema()

root
 |-- name: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- tempmax: double (nullable = true)
 |-- tempmin: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslikemax: double (nullable = true)
 |-- feelslikemin: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipprob: integer (nullable = true)
 |-- precipcover: double (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: double (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- windgust: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- winddir: double (nullable = true)
 |-- sealevelpressure: double (nullable = true)
 |-- cloudcover: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- solarradiation: double (nullable = true)
 |-- solarenergy: double (nullable = true)
 |-- uvindex: int

Preliminary column analysis:

root
 |-- name: string (nullable = true) <--------------------- IRRELEVANT
 
 |-- datetime: string (nullable = true) <--------------------- NOT TIMESTAMP
 
 |-- tempmax: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 1
 
 |-- tempmin: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 1
 
 |-- temp: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 1
 
 |-- feelslikemax: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 2
 
 |-- feelslikemin: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 2
 
 |-- feelslike: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 2
 
 |-- dew: double (nullable = true)
 
 |-- humidity: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 3
 
 |-- precip: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 3
 
 |-- precipprob: integer (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 3
 
 |-- precipcover: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 3
 
 |-- preciptype: string (nullable = true)
 
 |-- snow: double (nullable = true)
 
 |-- snowdepth: double (nullable = true)
 
 |-- windgust: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 4
 
 |-- windspeed: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 4
 
 |-- winddir: double (nullable = true)
 
 |-- sealevelpressure: double (nullable = true)
 
 |-- cloudcover: double (nullable = true)
 
 |-- visibility: double (nullable = true)
 
 |-- solarradiation: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 5
 
 |-- solarenergy: double (nullable = true) <--------------------- POSSIBLE REDUNDANCIES GROUP 5
 
 |-- uvindex: integer (nullable = true)
 
 |-- severerisk: integer (nullable = true)
 
 |-- sunrise: timestamp (nullable = true) <--------------------- EXTRACT TO 'sunlight_hours'
 
 |-- sunset: timestamp (nullable = true) <--------------------- EXTRACT TO 'sunlight_hours'
 
 |-- moonphase: double (nullable = true)
 
 |-- conditions: string (nullable = true) <--------------------- REDUNDANT & IMPRACTICAL
 
 |-- description: string (nullable = true) <--------------------- REDUNDANT & IMPRACTICAL
 
 |-- icon: string (nullable = true) <--------------------- REDUNDANT & IMPRACTICAL
 
 |-- stations: string (nullable = true) <--------------------- IRRELEVANT
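One way to test a suspected redundancy group is to measure how strongly its columns move together, for example with the Pearson correlation coefficient. The sketch below computes it by hand for `temp` and `feelslike`, using only the five rows shown in the preview above. Five rows merely illustrate the mechanics; any actual decision to drop a column would need the full table.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# 'temp' and 'feelslike' for 2020-07-01 to 2020-07-05, from the preview above.
temp = [25.9, 27.8, 30.3, 29.2, 28.4]
feelslike = [26.5, 28.1, 31.0, 30.7, 30.7]

r = pearson(temp, feelslike)
print(f"temp vs feelslike: r = {r:.2f}")
```

A coefficient close to 1 would support keeping only one column of the pair.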

In [28]:
# For the capitalbikeshare_dateOperable DataFrame
capitalbikeshare_row_count = capitalbikeshare_dateOperable.count()
capitalbikeshare_column_count = len(capitalbikeshare_dateOperable.columns)
print(f"The shape of capitalbikeshare_dateOperable is: ({capitalbikeshare_row_count}, {capitalbikeshare_column_count})")

# For the weatherdata_raw DataFrame
weatherdata_row_count = weatherdata_raw.count()
weatherdata_column_count = len(weatherdata_raw.columns)
print(f"The shape of weatherdata_raw is: ({weatherdata_row_count}, {weatherdata_column_count})")


                                                                                

The shape of capitalbikeshare_dateOperable is: (11246784, 14)
The shape of weatherdata_raw is: (2133, 33)


#### Comment on Dataset Size

- **capitalbikeshare_dateOperable** has 11,246,784 rows and 14 columns, which is quite sizable. Operations on this DataFrame that require a full pass over the data, such as .count() or any kind of groupBy/aggregation, will take a noticeable amount of time to complete.
- **weatherdata_raw** has 2,133 rows and 33 columns, which is relatively small. Operations on this DataFrame should be much faster and are generally reasonable to perform without needing to sample or otherwise optimize for performance.

## Cleaning

In [12]:
from pyspark.sql.functions import to_timestamp

# Convert the 'datetime' column to timestamp data type
weatherdata_raw = weatherdata_raw.withColumn(
    "datetime", to_timestamp("datetime", "yyyy-MM-dd")
)

# Now the 'datetime' column is in timestamp format and ready for joining with the other dataset


weatherdata_raw.printSchema()

root
 |-- name: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- tempmax: double (nullable = true)
 |-- tempmin: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslikemax: double (nullable = true)
 |-- feelslikemin: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipprob: integer (nullable = true)
 |-- precipcover: double (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: double (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- windgust: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- winddir: double (nullable = true)
 |-- sealevelpressure: double (nullable = true)
 |-- cloudcover: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- solarradiation: double (nullable = true)
 |-- solarenergy: double (nullable = true)
 |-- uvindex: 
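Because the weather data has one row per day while each trip has a full timestamp, a join between the two needs a common key. One possible approach, sketched here in plain Python under that assumption, is to truncate each trip's `started_at` to its calendar day and look up the weather for that day. The trip IDs and the function `attach_weather` are hypothetical; the two temperatures come from the weather preview above.

```python
from datetime import date, datetime

# Hypothetical stand-ins: daily weather keyed by calendar day, and a few trips.
weather_by_day = {
    date(2020, 7, 1): {"temp": 25.9},
    date(2020, 7, 2): {"temp": 27.8},
}
trips = [
    {"ride_id": "trip-1", "started_at": datetime(2020, 7, 1, 8, 15, 0)},
    {"ride_id": "trip-2", "started_at": datetime(2020, 7, 2, 23, 59, 58)},
    {"ride_id": "trip-3", "started_at": datetime(2020, 7, 3, 0, 0, 1)},
]

def attach_weather(trips, weather_by_day):
    """Left join: every trip is kept; days without weather data get temp=None."""
    joined = []
    for trip in trips:
        day = trip["started_at"].date()  # truncate the timestamp to its day
        weather = weather_by_day.get(day, {})
        joined.append({**trip, "temp": weather.get("temp")})
    return joined

joined = attach_weather(trips, weather_by_day)
for row in joined:
    print(row["ride_id"], row["started_at"].date(), row["temp"])
```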

In [13]:
from pyspark.sql.functions import unix_timestamp, round

# Calculate 'sunlight_hours' by finding the difference between 'sunset' and 'sunrise', and convert to hours
weatherdata_raw = weatherdata_raw.withColumn(
    "sunlight_hours",
    round((unix_timestamp("sunset") - unix_timestamp("sunrise")) / 3600.0, 2)
)

# The 'sunlight_hours' column will now contain the duration of sunlight in hours, rounded to two decimal places
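As a sanity check, the same calculation can be reproduced by hand for individual days. The sketch below does this with Python's `datetime` for the first two rows of the weather preview; `sunlight_hours` here is a plain function written for this check, not the Spark column.

```python
from datetime import datetime

def sunlight_hours(sunrise: str, sunset: str) -> float:
    """Hours between sunrise and sunset, rounded to two decimal places."""
    fmt = "%Y-%m-%d %H:%M:%S"
    delta = datetime.strptime(sunset, fmt) - datetime.strptime(sunrise, fmt)
    return round(delta.total_seconds() / 3600.0, 2)

# Sunrise and sunset values from the first two rows of the weather preview.
print(sunlight_hours("2020-07-01 05:46:51", "2020-07-01 20:37:16"))
print(sunlight_hours("2020-07-02 05:47:21", "2020-07-02 20:37:08"))
```

Python's `round` and Spark's `round` can differ on exact ties (Spark rounds half up), which does not affect these two values.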


In [29]:
weatherdata_raw.limit(5).toPandas()

Unnamed: 0,name,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,...,uvindex,severerisk,sunrise,sunset,moonphase,conditions,description,icon,stations,sunlight_hours
0,washinton dc,2020-07-01,30.0,22.6,25.9,31.6,22.6,26.5,20.0,70.6,...,4,,2020-07-01 05:46:51,2020-07-01 20:37:16,0.37,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,rain,"KDCA,72405013743,72403793728,F0198,KGAI,KADW,KDAA,AS365,72033493764,74594013705",14.84
1,washinton dc,2020-07-02,33.3,22.0,27.8,33.7,22.0,28.1,18.0,57.9,...,4,,2020-07-02 05:47:21,2020-07-02 20:37:08,0.41,Partially cloudy,Becoming cloudy in the afternoon.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KGAI,KDAA,AS365,KADW,72033493764,74594013705",14.83
2,washinton dc,2020-07-03,35.5,24.1,30.3,37.0,24.1,31.0,18.0,49.2,...,6,,2020-07-03 05:47:52,2020-07-03 20:36:58,0.45,Partially cloudy,Partly cloudy throughout the day.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,74594013705",14.82
3,washinton dc,2020-07-04,33.6,25.3,29.2,35.6,25.3,30.7,20.0,58.3,...,4,,2020-07-04 05:48:24,2020-07-04 20:36:45,0.48,"Rain, Partially cloudy",Becoming cloudy in the afternoon with afternoon rain.,rain,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,AS365,74594013705",14.81
4,washinton dc,2020-07-05,33.6,24.2,28.4,38.7,24.2,30.7,22.4,71.3,...,4,,2020-07-05 05:48:58,2020-07-05 20:36:31,0.5,Partially cloudy,Partly cloudy throughout the day.,partly-cloudy-day,"KDCA,72405013743,72403793728,F0198,KADW,KDAA,74594013705",14.79


In [31]:
# Collect the distinct values in the 'name' column to confirm it is constant and can be dropped
unique_names = weatherdata_raw.select('name').distinct().collect()

# The column is expected to hold very few distinct values, so collecting them to the driver is cheap
unique_values = [row['name'] for row in unique_names]
unique_values


['washinton dc']

Explore missing values for weatherdata_raw:

In [14]:
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.types import DoubleType, FloatType

# Function to calculate missing values
def calculate_missing_values(df):
    total_rows = df.count()
    agg_exprs = []
    for c in df.columns:
        # For numerical columns, check for both null and NaN
        if isinstance(df.schema[c].dataType, (DoubleType, FloatType)):
            agg_exprs.append(count(when(col(c).isNull() | isnan(col(c)), c)).alias(c + '_count'))
            agg_exprs.append((count(when(col(c).isNull() | isnan(col(c)), c)) / total_rows * 100).alias(c + '_percent'))
        # For non-numerical columns, only check for null
        else:
            agg_exprs.append(count(when(col(c).isNull(), c)).alias(c + '_count'))
            agg_exprs.append((count(when(col(c).isNull(), c)) / total_rows * 100).alias(c + '_percent'))
    return df.agg(*agg_exprs)

# Use the function to calculate missing values on the DataFrame
missing_values_df = calculate_missing_values(weatherdata_raw)

# Convert to Pandas DataFrame for better formatting
missing_values_pandas = missing_values_df.toPandas()

# Transpose the DataFrame to have column names as the row index
missing_values_pandas = missing_values_pandas.transpose().reset_index()

# Split the 'index' column to separate the column names and the '_count' or '_percent' suffix
missing_values_pandas[['Column', 'Metric']] = missing_values_pandas['index'].str.rsplit('_', n=1, expand=True)

# Pivot the DataFrame to get the 'Missing Count' and 'Missing Percent' as separate columns
missing_values_pandas = missing_values_pandas.pivot(index='Column', columns='Metric', values=0)

# Reset the index to turn 'Column' back into a regular column
missing_values_pandas.reset_index(inplace=True)

# Rename the columns for clarity
missing_values_pandas.columns = ['Column', 'Missing Count', 'Missing Percent']

# Display the DataFrame
missing_values_pandas


                                                                                

Unnamed: 0,Column,Missing Count,Missing Percent
0,cloudcover,0.0,0.0
1,conditions,0.0,0.0
2,datetime,0.0,0.0
3,description,0.0,0.0
4,dew,0.0,0.0
5,feelslike,0.0,0.0
6,feelslikemax,0.0,0.0
7,feelslikemin,0.0,0.0
8,humidity,0.0,0.0
9,icon,0.0,0.0
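
The function checks floating-point columns for both null and NaN because the two are distinct: NaN is a valid floating-point value, so a null check alone does not catch it. A plain-Python sketch of the same per-column rule follows; the helper `count_missing` and the sample values are made up for illustration.

```python
import math

def count_missing(values, is_float_column):
    """Count missing entries; NaN only counts for floating-point columns."""
    def is_missing(v):
        if v is None:
            return True
        return is_float_column and isinstance(v, float) and math.isnan(v)

    missing = sum(1 for v in values if is_missing(v))
    percent = missing / len(values) * 100 if values else 0.0
    return missing, percent

sample = [12.5, None, float("nan"), 30.1]  # hypothetical sample values
print(count_missing(sample, is_float_column=True))   # (2, 50.0)
print(count_missing(sample, is_float_column=False))  # (1, 25.0)
```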


Drop the irrelevant columns and those with too many missing values (taking into account how many other related weather variables remain):


In [15]:

weatherdata_clean = weatherdata_raw.drop('name', 'conditions', 'description', 'icon','preciptype','severerisk','windgust','stations')


In [16]:
weatherdata_raw.unpersist()

DataFrame[name: string, datetime: timestamp, tempmax: double, tempmin: double, temp: double, feelslikemax: double, feelslikemin: double, feelslike: double, dew: double, humidity: double, precip: double, precipprob: int, precipcover: double, preciptype: string, snow: double, snowdepth: double, windgust: double, windspeed: double, winddir: double, sealevelpressure: double, cloudcover: double, visibility: double, solarradiation: double, solarenergy: double, uvindex: int, severerisk: int, sunrise: timestamp, sunset: timestamp, moonphase: double, conditions: string, description: string, icon: string, stations: string, sunlight_hours: double]

In [33]:
weatherdata_clean.limit(5).toPandas()

Unnamed: 0,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,precip,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,sunrise,sunset,moonphase,sunlight_hours
0,2020-07-01,30.0,22.6,25.9,31.6,22.6,26.5,20.0,70.6,3.133,...,1011.5,54.0,16.0,71.3,6.0,4,2020-07-01 05:46:51,2020-07-01 20:37:16,0.37,14.84
1,2020-07-02,33.3,22.0,27.8,33.7,22.0,28.1,18.0,57.9,0.0,...,1012.0,36.9,16.0,73.5,6.2,4,2020-07-02 05:47:21,2020-07-02 20:37:08,0.41,14.83
2,2020-07-03,35.5,24.1,30.3,37.0,24.1,31.0,18.0,49.2,0.0,...,1011.7,20.4,16.0,84.0,7.1,6,2020-07-03 05:47:52,2020-07-03 20:36:58,0.45,14.82
3,2020-07-04,33.6,25.3,29.2,35.6,25.3,30.7,20.0,58.3,0.537,...,1012.7,40.9,15.7,73.0,6.2,4,2020-07-04 05:48:24,2020-07-04 20:36:45,0.48,14.81
4,2020-07-05,33.6,24.2,28.4,38.7,24.2,30.7,22.4,71.3,0.0,...,1014.8,44.3,13.5,69.9,6.1,4,2020-07-05 05:48:58,2020-07-05 20:36:31,0.5,14.79


Verify that the cleaned DataFrame has no missing values:

In [17]:
# Use the function to verify missing values on the cleaned DataFrame ('wd_c' stands for 'weatherdata_clean')
missing_values_wd_c = calculate_missing_values(weatherdata_clean)

# Convert to Pandas DataFrame for better formatting
missing_values_wd_c_pandas = missing_values_wd_c.toPandas()

# Transpose the DataFrame to have column names as the row index
missing_values_wd_c_pandas = missing_values_wd_c_pandas.transpose().reset_index()

# Split the 'index' column to separate the column names and the '_count' or '_percent' suffix
missing_values_wd_c_pandas[['Column', 'Metric']] = missing_values_wd_c_pandas['index'].str.rsplit('_', n=1, expand=True)

# Pivot the DataFrame to get the 'Missing Count' and 'Missing Percent' as separate columns
missing_values_wd_c_pandas = missing_values_wd_c_pandas.pivot(index='Column', columns='Metric', values=0)

# Reset the index to turn 'Column' back into a regular column
missing_values_wd_c_pandas.reset_index(inplace=True)

# Rename the columns for clarity
missing_values_wd_c_pandas.columns = ['Column', 'Missing Count', 'Missing Percent']

# Display the DataFrame
missing_values_wd_c_pandas

Unnamed: 0,Column,Missing Count,Missing Percent
0,cloudcover,0.0,0.0
1,datetime,0.0,0.0
2,dew,0.0,0.0
3,feelslike,0.0,0.0
4,feelslikemax,0.0,0.0
5,feelslikemin,0.0,0.0
6,humidity,0.0,0.0
7,moonphase,0.0,0.0
8,precip,0.0,0.0
9,precipcover,0.0,0.0


Explore missing values for capitalbikeshare_dateOperable:

In [18]:
from pyspark.sql.functions import col, count, when

# Reuse the Spark session: getOrCreate() returns the existing session if one is already active
spark = SparkSession.builder.appName("missing_values").getOrCreate()

# Function to calculate missing values for each column.
# Note: this redefines the earlier helper and checks for nulls only (no NaN check).
def calculate_missing_values(df):
    total_rows = df.count()
    agg_exprs_counts = [
        count(when(col(c).isNull(), c)).alias(c + '_count')
        for c in df.columns
    ]
    agg_exprs_percent = [
        (count(when(col(c).isNull(), c)) / total_rows * 100).alias(c + '_percent')
        for c in df.columns
    ]
    return df.agg(*agg_exprs_counts + agg_exprs_percent)

# Use the function to calculate missing values on the DataFrame
missing_values_cbs_r = calculate_missing_values(capitalbikeshare_dateOperable)

# Convert to Pandas DataFrame for better formatting
missing_values_cbs_r_pandas = missing_values_cbs_r.toPandas()

# Prepare the data for the desired output format
missing_counts_cbs_r_final = pd.DataFrame({
    'Column': [c.replace('_count', '') for c in missing_values_cbs_r_pandas.columns[:len(capitalbikeshare_dateOperable.columns)]],
    'Missing Count': missing_values_cbs_r_pandas.iloc[0, :len(capitalbikeshare_dateOperable.columns)].astype(int),
    'Missing Percent': missing_values_cbs_r_pandas.iloc[0, len(capitalbikeshare_dateOperable.columns):].values
})

# Format the 'Missing Percent' column to show as a percentage with two decimal places
missing_counts_cbs_r_final['Missing Percent'] = missing_counts_cbs_r_final['Missing Percent'].map(lambda x: '{:.2f}%'.format(x))

missing_counts_cbs_r_final



                                                                                

Unnamed: 0,Column,Missing Count,Missing Percent
_count,,0,0.00%
ride_id_count,ride_id,0,0.00%
rideable_type_count,rideable_type,0,0.00%
start_station_name_count,start_station_name,680271,6.05%
start_station_id_count,start_station_id,680273,6.05%
end_station_name_count,end_station_name,753012,6.70%
end_station_id_count,end_station_id,753014,6.70%
start_lat_count,start_lat,10,0.00%
start_lng_count,start_lng,10,0.00%
end_lat_count,end_lat,22184,0.20%


Drop rows with missing values in the station name and coordinate columns:

In [19]:
# Columns with missing values
columns_with_missing_values = ['start_station_name', 'end_station_name', 'start_lat', 'start_lng', 'end_lat', 'end_lng']

# Drop rows with missing values in these columns
capitalbikeshare_dropped_cols = capitalbikeshare_dateOperable.dropna(subset=columns_with_missing_values)
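
With `subset`, `dropna` only looks at the listed columns: a row is removed when any of them is missing, while gaps in other columns are ignored. A plain-Python sketch of that rule, using made-up rows and a hypothetical helper `drop_missing`:

```python
def drop_missing(rows, subset):
    """Keep rows whose values in all `subset` columns are present."""
    return [r for r in rows if all(r.get(c) is not None for c in subset)]

rides = [  # made-up sample rows
    {"ride_id": "A", "start_station_name": "7th & E St SW", "end_lat": 38.88, "end_station_id": None},
    {"ride_id": "B", "start_station_name": None, "end_lat": 38.90, "end_station_id": 31224.0},
    {"ride_id": "C", "start_station_name": "21st & I St NW", "end_lat": None, "end_station_id": 31205.0},
]

kept = drop_missing(rides, subset=["start_station_name", "end_lat"])
print([r["ride_id"] for r in kept])  # ['A']
```

Row A survives even though its `end_station_id` is missing, because that column is not in the subset. This is why an unrestricted `dropna()` afterwards can still be useful.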

In [20]:
capitalbikeshare_dateOperable.unpersist()

DataFrame[: int, ride_id: string, rideable_type: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string, started_at: timestamp, ended_at: timestamp]

In [38]:
capitalbikeshare_dropped_cols.printSchema()

root
 |-- : integer (nullable = true)
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)



Drop all remaining rows that contain missing values in any column, as they are relatively few:

In [22]:
# Drop all rows that contain missing values from the 'capitalbikeshare_dropped_cols' DataFrame
capitalbikeshare_clean = capitalbikeshare_dropped_cols.dropna()

# Count the number of rows in both DataFrames
rows_cbs_dateOperable_count = capitalbikeshare_dropped_cols.count()
rows_cbs_clean_count = capitalbikeshare_clean.count()

# Calculate the number and percentage of rows that were dropped
num_rows_dropped = rows_cbs_dateOperable_count - rows_cbs_clean_count
percent_dropped = (float(num_rows_dropped) / rows_cbs_dateOperable_count) * 100

# Print the number and percentage of rows that were dropped
print(num_rows_dropped)
print(percent_dropped)






0
0.0
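
The two printed values are the number of rows dropped and the share they represent. A small sketch of the same arithmetic with a guard against an empty DataFrame (the helper name `dropped_stats` is hypothetical):

```python
def dropped_stats(rows_before: int, rows_after: int):
    """Return (rows dropped, percent dropped), guarding against an empty input."""
    dropped = rows_before - rows_after
    percent = dropped / rows_before * 100 if rows_before else 0.0
    return dropped, percent

print(dropped_stats(10214972, 10214972))  # (0, 0.0)
print(dropped_stats(200, 150))            # (50, 25.0)
```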


                                                                                

Verify no more missing values:

In [40]:
# Use the function to calculate missing values on the DataFrame
missing_values_cbs_c = calculate_missing_values(capitalbikeshare_clean)

# Convert to Pandas DataFrame for better formatting
missing_values_cbs_c_pandas = missing_values_cbs_c.toPandas()

# Prepare the data for the desired output format
missing_counts_cbs_c_final = pd.DataFrame({
    'Column': [c.replace('_count', '') for c in missing_values_cbs_c_pandas.columns[:len(capitalbikeshare_clean.columns)]],
    'Missing Count': missing_values_cbs_c_pandas.iloc[0, :len(capitalbikeshare_clean.columns)].astype(int),
    'Missing Percent': missing_values_cbs_c_pandas.iloc[0, len(capitalbikeshare_clean.columns):].values
})

# Format the 'Missing Percent' column to show as a percentage with two decimal places
missing_counts_cbs_c_final['Missing Percent'] = missing_counts_cbs_c_final['Missing Percent'].map(lambda x: '{:.2f}%'.format(x))

missing_counts_cbs_c_final

                                                                                

Unnamed: 0,Column,Missing Count,Missing Percent
_count,,0,0.00%
ride_id_count,ride_id,0,0.00%
rideable_type_count,rideable_type,0,0.00%
start_station_name_count,start_station_name,0,0.00%
start_station_id_count,start_station_id,0,0.00%
end_station_name_count,end_station_name,0,0.00%
end_station_id_count,end_station_id,0,0.00%
start_lat_count,start_lat,0,0.00%
start_lng_count,start_lng,0,0.00%
end_lat_count,end_lat,0,0.00%


## Joining Both Tables

In [23]:
from pyspark.sql.functions import to_date

# Convert the 'started_at' column to a date data type and store the result in a new column called 'start_date'
capitalbikeshare_clean = capitalbikeshare_clean.withColumn(
    "start_date", to_date("started_at")
)

# Import the unix_timestamp function from the pyspark.sql.functions module
from pyspark.sql.functions import unix_timestamp

# Calculate the duration of each bike ride by finding the difference between the 'ended_at' and 'started_at' columns,
# and store the result in a new column called 'duration'
capitalbikeshare_clean = capitalbikeshare_clean.withColumn(
    "duration", unix_timestamp("ended_at") - unix_timestamp("started_at")
)

In [24]:
# Move the 'start_date' column to the beginning of the 'capitalbikeshare_clean' DataFrame
capitalbikeshare_clean = capitalbikeshare_clean.select('start_date', *capitalbikeshare_clean.drop('start_date').columns)


In [43]:
capitalbikeshare_clean.cache()
capitalbikeshare_clean.limit(5).toPandas()

                                                                                

Unnamed: 0,start_date,Unnamed: 2,ride_id,rideable_type,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,started_at,ended_at,duration
0,2020-04-25,0,77A0F1B26D1597B1,docked_bike,Rhode Island & Connecticut Ave NW,31239.0,12th & L St NW,31251.0,38.905994,-77.039803,38.90382,-77.028397,casual,2020-04-25 17:28:39,2020-04-25 17:35:04,385
1,2020-04-06,1,8698F10128EA4F18,docked_bike,21st & I St NW,31205.0,18th & L St NW,31224.0,38.900711,-77.046448,38.90374,-77.04245,member,2020-04-06 07:54:59,2020-04-06 07:57:24,145
2,2020-04-22,2,AA07819DC0F58872,docked_bike,Connecticut Ave & Tilden St NW,31313.0,Connecticut Ave & Tilden St NW,31313.0,38.941139,-77.061974,38.941139,-77.061974,casual,2020-04-22 17:06:18,2020-04-22 18:08:32,3734
3,2020-04-16,3,DA909BCA92EF85AB,docked_bike,7th & E St SW,31294.0,7th & E St SW,31294.0,38.88345,-77.021744,38.88345,-77.021744,casual,2020-04-16 15:22:40,2020-04-16 15:58:37,2157
4,2020-04-10,4,B36F1E14D8C6757E,docked_bike,Potomac & Pennsylvania Ave SE,31606.0,8th & Eye St SE / Barracks Row,31608.0,38.880299,-76.986198,38.8792,-76.9953,member,2020-04-10 13:19:41,2020-04-10 13:23:05,204
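
The `duration` values are plain differences in seconds between `ended_at` and `started_at`. The first row above can be checked by hand with the standard library; the helper name `ride_duration_seconds` is only illustrative.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

def ride_duration_seconds(started_at: str, ended_at: str) -> int:
    """Trip duration in whole seconds."""
    delta = datetime.strptime(ended_at, FMT) - datetime.strptime(started_at, FMT)
    return int(delta.total_seconds())

# First row: 17:28:39 to 17:35:04 is 6 minutes 25 seconds
print(ride_duration_seconds("2020-04-25 17:28:39", "2020-04-25 17:35:04"))  # 385

# The join key is just the calendar date of the start timestamp
print(datetime.strptime("2020-04-25 17:28:39", FMT).date())  # 2020-04-25
```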


In [44]:
weatherdata_clean.cache()
weatherdata_clean.limit(5).toPandas()   

Unnamed: 0,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,precip,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,sunrise,sunset,moonphase,sunlight_hours
0,2020-07-01,30.0,22.6,25.9,31.6,22.6,26.5,20.0,70.6,3.133,...,1011.5,54.0,16.0,71.3,6.0,4,2020-07-01 05:46:51,2020-07-01 20:37:16,0.37,14.84
1,2020-07-02,33.3,22.0,27.8,33.7,22.0,28.1,18.0,57.9,0.0,...,1012.0,36.9,16.0,73.5,6.2,4,2020-07-02 05:47:21,2020-07-02 20:37:08,0.41,14.83
2,2020-07-03,35.5,24.1,30.3,37.0,24.1,31.0,18.0,49.2,0.0,...,1011.7,20.4,16.0,84.0,7.1,6,2020-07-03 05:47:52,2020-07-03 20:36:58,0.45,14.82
3,2020-07-04,33.6,25.3,29.2,35.6,25.3,30.7,20.0,58.3,0.537,...,1012.7,40.9,15.7,73.0,6.2,4,2020-07-04 05:48:24,2020-07-04 20:36:45,0.48,14.81
4,2020-07-05,33.6,24.2,28.4,38.7,24.2,30.7,22.4,71.3,0.0,...,1014.8,44.3,13.5,69.9,6.1,4,2020-07-05 05:48:58,2020-07-05 20:36:31,0.5,14.79


In [45]:
capitalbikeshare_clean.printSchema()
weatherdata_clean.printSchema()
# Print the final row count for each DataFrame
print(f"capitalbikeshare_clean row count: {capitalbikeshare_clean.count()}")
print(f"weatherdata_clean row count: {weatherdata_clean.count()}")


root
 |-- start_date: date (nullable = true)
 |-- : integer (nullable = true)
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: float (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: float (nullable = true)
 |-- start_lat: float (nullable = true)
 |-- start_lng: float (nullable = true)
 |-- end_lat: float (nullable = true)
 |-- end_lng: float (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- duration: long (nullable = true)

root
 |-- datetime: timestamp (nullable = true)
 |-- tempmax: double (nullable = true)
 |-- tempmin: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslikemax: double (nullable = true)
 |-- feelslikemin: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = 

                                                                                

capitalbikeshare_clean row count: 10214972
weatherdata_clean row count: 2133


In [25]:
from pyspark.sql.functions import col

# Join the DataFrames on the 'start_date' and 'datetime' columns
biketrips_weather_total = capitalbikeshare_clean.join(
    weatherdata_clean,
    capitalbikeshare_clean.start_date == weatherdata_clean.datetime,
    'left'
)

# Drop the 'datetime' column, since it is redundant with 'start_date'
biketrips_weather_total = biketrips_weather_total.drop('datetime')


In [47]:
biketrips_weather_total.limit(5).toPandas()

Unnamed: 0,start_date,Unnamed: 2,ride_id,rideable_type,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,sunrise,sunset,moonphase,sunlight_hours
0,2020-04-25,0,77A0F1B26D1597B1,docked_bike,Rhode Island & Connecticut Ave NW,31239.0,12th & L St NW,31251.0,38.905994,-77.039803,...,1016.0,78.2,15.2,90.0,7.9,4,2020-04-25 06:17:09,2020-04-25 19:55:29,0.09,13.64
1,2020-04-06,1,8698F10128EA4F18,docked_bike,21st & I St NW,31205.0,18th & L St NW,31224.0,38.900711,-77.046448,...,1017.0,46.4,16.0,182.6,15.7,9,2020-04-06 06:44:13,2020-04-06 19:37:08,0.45,12.88
2,2020-04-22,2,AA07819DC0F58872,docked_bike,Connecticut Ave & Tilden St NW,31313.0,Connecticut Ave & Tilden St NW,31313.0,38.941139,-77.061974,...,1015.2,23.5,16.0,174.2,15.0,9,2020-04-22 06:21:06,2020-04-22 19:52:34,0.0,13.52
3,2020-04-16,3,DA909BCA92EF85AB,docked_bike,7th & E St SW,31294.0,7th & E St SW,31294.0,38.88345,-77.021744,...,1021.6,30.2,16.0,178.1,15.2,8,2020-04-16 06:29:26,2020-04-16 19:46:46,0.79,13.29
4,2020-04-10,4,B36F1E14D8C6757E,docked_bike,Potomac & Pennsylvania Ave SE,31606.0,8th & Eye St SE / Barracks Row,31608.0,38.880299,-76.986198,...,1005.7,42.4,16.0,114.3,9.9,6,2020-04-10 06:38:11,2020-04-10 19:40:59,0.59,13.05
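
A left join keeps every ride, even when no weather row exists for its date; in that case the weather columns come back empty. The plain-Python sketch below mimics that behaviour with a dictionary lookup. The `temp` value, the second ride, and the helper `left_join` are made up for illustration.

```python
from datetime import date

weather_by_day = {  # made-up daily weather, keyed by date
    date(2020, 4, 25): {"temp": 12.3},
}
rides = [
    {"ride_id": "77A0F1B26D1597B1", "start_date": date(2020, 4, 25)},
    {"ride_id": "HYPOTHETICAL", "start_date": date(1999, 1, 1)},  # no weather row
]

def left_join(rides, weather_by_day, weather_columns):
    """Attach weather columns to each ride; unmatched days get None."""
    joined = []
    for ride in rides:
        weather = weather_by_day.get(ride["start_date"], {})
        row = dict(ride)
        for c in weather_columns:
            row[c] = weather.get(c)
        joined.append(row)
    return joined

result = left_join(rides, weather_by_day, ["temp"])
print(len(result))                  # 2
print([r["temp"] for r in result])  # [12.3, None]
```

This is why it is worth checking the weather columns for nulls after the join: any null there points to a ride date outside the weather data's coverage.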


In [26]:
capitalbikeshare_clean.unpersist()
weatherdata_clean.unpersist()

DataFrame[datetime: timestamp, tempmax: double, tempmin: double, temp: double, feelslikemax: double, feelslikemin: double, feelslike: double, dew: double, humidity: double, precip: double, precipprob: int, precipcover: double, snow: double, snowdepth: double, windspeed: double, winddir: double, sealevelpressure: double, cloudcover: double, visibility: double, solarradiation: double, solarenergy: double, uvindex: int, sunrise: timestamp, sunset: timestamp, moonphase: double, sunlight_hours: double]

In [49]:
# Cache the joined DataFrame for faster access
biketrips_weather_total.cache()

# Display the first few rows of the joined DataFrame
biketrips_weather_total.limit(5).toPandas()

                                                                                

Unnamed: 0,start_date,Unnamed: 2,ride_id,rideable_type,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,...,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,sunrise,sunset,moonphase,sunlight_hours
0,2020-04-25,0,77A0F1B26D1597B1,docked_bike,Rhode Island & Connecticut Ave NW,31239.0,12th & L St NW,31251.0,38.905994,-77.039803,...,1016.0,78.2,15.2,90.0,7.9,4,2020-04-25 06:17:09,2020-04-25 19:55:29,0.09,13.64
1,2020-04-06,1,8698F10128EA4F18,docked_bike,21st & I St NW,31205.0,18th & L St NW,31224.0,38.900711,-77.046448,...,1017.0,46.4,16.0,182.6,15.7,9,2020-04-06 06:44:13,2020-04-06 19:37:08,0.45,12.88
2,2020-04-22,2,AA07819DC0F58872,docked_bike,Connecticut Ave & Tilden St NW,31313.0,Connecticut Ave & Tilden St NW,31313.0,38.941139,-77.061974,...,1015.2,23.5,16.0,174.2,15.0,9,2020-04-22 06:21:06,2020-04-22 19:52:34,0.0,13.52
3,2020-04-16,3,DA909BCA92EF85AB,docked_bike,7th & E St SW,31294.0,7th & E St SW,31294.0,38.88345,-77.021744,...,1021.6,30.2,16.0,178.1,15.2,8,2020-04-16 06:29:26,2020-04-16 19:46:46,0.79,13.29
4,2020-04-10,4,B36F1E14D8C6757E,docked_bike,Potomac & Pennsylvania Ave SE,31606.0,8th & Eye St SE / Barracks Row,31608.0,38.880299,-76.986198,...,1005.7,42.4,16.0,114.3,9.9,6,2020-04-10 06:38:11,2020-04-10 19:40:59,0.59,13.05


In [50]:
from pyspark.sql.functions import col, sum as spark_sum

# Count the number of missing values in each column of the joined DataFrame
# (importing sum under an alias avoids shadowing Python's built-in sum)
missing_values = biketrips_weather_total.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in biketrips_weather_total.columns])

# Display the results
missing_values.show()


23/11/10 00:33:35 ERROR Executor: Exception in task 4.0 in stage 118.0 (TID 560)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter$$Lambda$3766/0x000000010166a840.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:117)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:87)
	at org.apache.spark.sql.execution.c

Py4JJavaError: An error occurred while calling o1891.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 118.0 failed 1 times, most recent failure: Lost task 4.0 in stage 118.0 (TID 560) (10.0.2.15 executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter$$Lambda$3766/0x000000010166a840.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:117)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:87)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:79)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:783)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:177)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3(BlockManager.scala:1500)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3$adapted(BlockManager.scala:1498)
	at org.apache.spark.storage.BlockManager$$Lambda$3521/0x00000001015b1840.apply(Unknown Source)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
	at org.apache.spark.storage.BlockManager$$Lambda$2054/0x0000000101081840.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1418)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1482)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1305)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter$$Lambda$3766/0x000000010166a840.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:117)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:87)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:79)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:783)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:177)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3(BlockManager.scala:1500)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3$adapted(BlockManager.scala:1498)
	at org.apache.spark.storage.BlockManager$$Lambda$3521/0x00000001015b1840.apply(Unknown Source)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
	at org.apache.spark.storage.BlockManager$$Lambda$2054/0x0000000101081840.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1418)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1482)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1305)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
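
The trace shows the Java heap being exhausted on the local driver (`executor driver`) while the cached DataFrame was being materialized. Two common mitigations, offered here only as a hedged sketch, are to give the driver JVM more heap and to check nulls on a few columns without caching the whole join result. The `4g` value and both helper names are assumptions, not settings taken from this notebook. The memory option generally only takes effect if it is set before the JVM is launched, so the kernel would likely need a restart.

```python
# Assumed value; pick one that fits the machine running the notebook.
MEMORY_CONF = {"spark.driver.memory": "4g"}

def build_session(app_name, conf=MEMORY_CONF):
    """Create (or fetch) a SparkSession with the given config applied."""
    # Imported lazily so this sketch can be loaded without Spark installed
    from pyspark.sql import SparkSession
    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

def null_counts(df, columns):
    """Count nulls for the given columns only, without caching df."""
    from pyspark.sql import functions as F
    exprs = [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in columns]
    return df.select(exprs)
```

For example, `null_counts(biketrips_weather_total, ["temp", "sunlight_hours"]).show()` would be enough to detect rides whose date found no match in the weather data.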


## Create Parquet File

In [27]:
# Write the joined DataFrame to HDFS in Parquet format
biketrips_weather_total.write.parquet("hdfs://localhost:9000/datalake/std/bikes_sharing")
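
By default the Parquet writer raises an error if the target path already exists, which is easy to hit when re-running the notebook. A minimal sketch of a wrapper that makes the save mode explicit follows. The helper name `write_parquet` is hypothetical, and `overwrite` replaces any existing data at that path, so it should be chosen deliberately.

```python
def write_parquet(df, path, mode="overwrite"):
    """Write a Spark DataFrame to `path` as Parquet using an explicit save mode."""
    df.write.mode(mode).parquet(path)

# Usage in the notebook (not executed here):
# write_parquet(biketrips_weather_total, "hdfs://localhost:9000/datalake/std/bikes_sharing")
```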

                                                                                

## End Spark Session

### Unpersist to free up resources

In [28]:
biketrips_weather_total.unpersist()

DataFrame[start_date: date, : int, ride_id: string, rideable_type: string, start_station_name: string, start_station_id: float, end_station_name: string, end_station_id: float, start_lat: float, start_lng: float, end_lat: float, end_lng: float, member_casual: string, started_at: timestamp, ended_at: timestamp, duration: bigint, tempmax: double, tempmin: double, temp: double, feelslikemax: double, feelslikemin: double, feelslike: double, dew: double, humidity: double, precip: double, precipprob: int, precipcover: double, snow: double, snowdepth: double, windspeed: double, winddir: double, sealevelpressure: double, cloudcover: double, visibility: double, solarradiation: double, solarenergy: double, uvindex: int, sunrise: timestamp, sunset: timestamp, moonphase: double, sunlight_hours: double]

### Tear Down

Once we complete the lab, we can stop all the services.

### Stop Hadoop

To stop Hadoop, open a terminal and execute:
```sh
hadoop-stop.sh
```