# AWS Glue Studio Notebook
##### Glue job with PySpark to clean and transform citibike data and load into S3 data lake


In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, year, month, dayofmonth, hour, date_format, lit, when, count, avg, udf
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import math
import requests
import json

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Citibike Data Processing") \
    .getOrCreate()




In [3]:
# Define Schema for the Dataset
schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", DoubleType(), True),
    StructField("start_lng", DoubleType(), True),
    StructField("end_lat", DoubleType(), True),
    StructField("end_lng", DoubleType(), True),
    StructField("member_casual", StringType(), True)
])




In [4]:
# Load Raw Data from S3
raw_data_path = "s3://citibike-df/inbound-data/*.csv"
df = spark.read.schema(schema).option("header", "true").csv(raw_data_path)





In [24]:
# Define Haversine Formula to Calculate Distance
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in kilometers
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Register Haversine function as a UDF
haversine_udf = udf(haversine, DoubleType())




In [25]:
# Data Cleaning and Transformation
processed_df = df.withColumnRenamed("member_casual", "user_type") \
    .withColumn("trip_duration_seconds", 
                (col("ended_at").cast("long") - col("started_at").cast("long"))) \
    .withColumn("start_year", year(col("started_at"))) \
    .withColumn("start_month", month(col("started_at"))) \
    .withColumn("start_day", dayofmonth(col("started_at"))) \
    .withColumn("start_hour", hour(col("started_at"))) \
    .withColumn("end_year", year(col("ended_at"))) \
    .withColumn("end_month", month(col("ended_at"))) \
    .withColumn("end_day", dayofmonth(col("ended_at"))) \
    .withColumn("end_hour", hour(col("ended_at"))) \
    .withColumn("user_type", 
                when(col("user_type") == "member", "Subscriber")
                .otherwise("Customer")) \
    .withColumn("start_station_id", col("start_station_id").cast(StringType())) \
    .withColumn("end_station_id", col("end_station_id").cast(StringType())) \
    .withColumn("start_lat", col("start_lat").cast(DoubleType())) \
    .withColumn("start_lng", col("start_lng").cast(DoubleType())) \
    .withColumn("end_lat", col("end_lat").cast(DoubleType())) \
    .withColumn("end_lng", col("end_lng").cast(DoubleType())) \
    .dropDuplicates(["ride_id"])  # Remove duplicate rows based on ride_id





In [26]:
# Handle Missing Values
processed_df = processed_df.na.fill({
    "start_station_name": "Unknown",
    "start_station_id": "Unknown",
    "end_station_name": "Unknown",
    "end_station_id": "Unknown",
    "start_lat": 0.0,
    "start_lng": 0.0,
    "end_lat": 0.0,
    "end_lng": 0.0
})




In [27]:
# Calculate Trip Distance using Haversine Formula
processed_df = processed_df.withColumn(
    "trip_distance_km",
    haversine_udf(col("start_lat"), col("start_lng"), col("end_lat"), col("end_lng"))
)




In [28]:
# Create Dimension Tables
# 1. Stations Dimension
stations = processed_df.select(
    col("start_station_id").alias("station_id"),
    col("start_station_name").alias("station_name"),
    col("start_lat").alias("latitude"),
    col("start_lng").alias("longitude")
).distinct()




In [29]:
stations.show()

+----------+--------------------+------------+-------------+
|station_id|        station_name|    latitude|    longitude|
+----------+--------------------+------------+-------------+
|     JC103|      Journal Square|    40.73367|     -74.0625|
|     HB102|Hoboken Terminal ...|40.736116886|-74.029208183|
|     JC063|      Jackson Square|    40.71113|     -74.0789|
|     JC009|       Hamilton Park|40.727452755| -74.04429698|
|     HB506|    Grand St & 14 St|40.753990173|-74.031709194|
|     JC052|  Liberty Light Rail|40.711340785|-74.055756807|
|     JC065|              Dey St|40.737742424|-74.067027569|
|     HB201|12 St & Sinatra Dr N|40.750844717|-74.024120808|
|     HB402|   Madison St & 1 St|40.738869905|-74.039157629|
|     HB603|8 St & Washington St|40.745940685|-74.028149962|
|     HB303|   Clinton St & 7 St|40.745452762|-74.033394217|
|     JC103|      Journal Square|  40.7337327|-74.062520027|
|     JC072|        Morris Canal|40.712231278|-74.038221717|
|     JC103|      Journa

In [30]:
# 2. Users Dimension
users = processed_df.select(
    col("ride_id").alias("user_id"),  # Assuming ride_id can be used as user_id since we don't have the user data
    lit(1990).alias("birth_year"),    # Placeholder for birth year
    lit("Unknown").alias("gender")    # Placeholder for gender
).distinct()





In [44]:
# 3. Time Dimension
# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
time_dim = processed_df.select(
    col("started_at").alias("time_id"),
    year(col("started_at")).alias("year"),
    month(col("started_at")).alias("month"),
    dayofmonth(col("started_at")).alias("day"),
    hour(col("started_at")).alias("hour"),
    date_format(col("started_at"), "u").alias("day_of_week"),  # 'u' for day of the week (1 = Monday, 7 = Sunday)
    when(date_format(col("started_at"), "u").isin([6, 7]), True).otherwise(False).alias("is_weekend")
).distinct()




In [32]:
# 4. Bikes Dimension (Placeholder)
bikes = processed_df.select(
    col("ride_id").alias("bike_id"),  # Assuming ride_id can be used as bike_id
    lit("classic").alias("bike_type"),  # Placeholder for bike type
    lit("Unknown").alias("manufacturer"),  # Placeholder for manufacturer
    lit("2020-01-01").alias("purchase_date")  # Placeholder for purchase date
).distinct()




In [33]:
# 5. Weather Dimension (Placeholder)
# Fetch weather data using an OpenWeatherMap API 
def fetch_weather(lat, lng, timestamp):
    api_key = "855152b9f44ec9bbf0c3c1f9d2f0b534"
    url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lng}&dt={timestamp}&appid={api_key}"
    response = requests.get(url)
    if response.status_code == 200:
        weather_data = response.json()
        return {
            "temperature_c": weather_data["main"]["temp"] - 273.15,  # Convert Kelvin to Celsius
            "precipitation_mm": weather_data.get("rain", {}).get("1h", 0.0),
            "weather_condition": weather_data["weather"][0]["main"]
        }
    else:
        return {
            "temperature_c": 0.0,
            "precipitation_mm": 0.0,
            "weather_condition": "Unknown"
        }





In [34]:
# Create Weather Dimension
weather_df = processed_df.select(
    col("started_at").alias("time_id"),
    col("start_lat").alias("latitude"),
    col("start_lng").alias("longitude")
).distinct()




In [35]:
weather_df = weather_df.limit(5)
weather_df.show()

+-------------------+-----------------+------------------+
|            time_id|         latitude|         longitude|
+-------------------+-----------------+------------------+
|2023-11-05 09:59:04|       40.7112423|       -74.0557013|
|2023-11-05 01:25:24|     40.719288945|     -74.034209609|
|2023-11-20 06:01:05|     40.713478446|     -74.062786818|
|2023-11-27 20:59:22|40.73721535335381|-74.02886540972803|
|2023-11-03 14:20:25|     40.714554071|     -74.042777896|
+-------------------+-----------------+------------------+


In [36]:
# Fetch weather data for each row (this can be optimized further)
weather_data = []
for row in weather_df.collect():
    weather = fetch_weather(row["latitude"], row["longitude"], int(row["time_id"].timestamp()))
    weather_data.append({
        "time_id": row["time_id"],
        "temperature_c": weather["temperature_c"],
        "precipitation_mm": weather["precipitation_mm"],
        "weather_condition": weather["weather_condition"]
    })
weather_dim = spark.createDataFrame(weather_data)




In [37]:
weather_dim.show()

+----------------+-------------------+-------------------+-----------------+
|precipitation_mm|      temperature_c|            time_id|weather_condition|
+----------------+-------------------+-------------------+-----------------+
|             0.0| -1.589999999999975|2023-11-28 19:10:15|             Snow|
|             0.0| -1.919999999999959|2023-11-16 08:44:32|             Snow|
|             0.0|-1.6899999999999977|2023-11-07 18:25:03|             Snow|
|             0.0|-1.9099999999999682|2023-11-27 14:36:28|             Snow|
|             0.0| -1.849999999999966|2023-11-07 11:52:45|             Snow|
+----------------+-------------------+-------------------+-----------------+


In [38]:
# Prepare Trip Fact Table
trips = processed_df.select(
    col("ride_id").alias("trip_id"),
    col("start_station_id"),
    col("end_station_id"),
    col("started_at").alias("start_time_id"),
    col("ended_at").alias("end_time_id"),
    col("ride_id").alias("user_id"),  # Assuming ride_id can be used as user_id
    col("ride_id").alias("bike_id"),  # Assuming ride_id can be used as bike_id
    col("trip_duration_seconds"),
    col("trip_distance_km"),
    col("user_type")
)




In [None]:
trips.show()

In [46]:
# Save Dimension Tables to S3
dimensions_path = "s3://citibike-df/outbound-data/dimensions/"
stations.write.mode("overwrite").parquet(f"{dimensions_path}stations_dim")
users.write.mode("overwrite").parquet(f"{dimensions_path}users_dim")
time_dim.write.mode("overwrite").parquet(f"{dimensions_path}time_dim")
bikes.write.mode("overwrite").parquet(f"{dimensions_path}bikes_dim")
weather_dim.write.mode("overwrite").parquet(f"{dimensions_path}weather_dim")




In [48]:
# Save Fact Table to S3
fact_path = "s3://citibike-df/outbound-data/fact/"
trips.write.mode("overwrite").parquet(f"{fact_path}trips_fact")




In [None]:
# Stop Spark Session
spark.stop()