In [0]:
import time
import requests
import json
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import boto3
import io
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Initialize Spark and Supabase Storage stuff

ENDPOINT = None # Replace
ACCESS_KEY =  None # Replace
SECRET_KEY =  None # Replace

spark = SparkSession.builder \
    .appName("BlueBikes Batch Reader") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT) \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Define schema
station_schema = StructType([
    StructField("station_id", StringType(), True),
    StructField("num_bikes_available", IntegerType(), True),
    StructField("num_ebikes_available", IntegerType(), True),
    StructField("num_docks_available", IntegerType(), True),
    StructField("last_reported", LongType(), True),
    StructField("num_bikes_disabled", IntegerType(), True),
    StructField("num_docks_disabled", IntegerType(), True),
])



In [0]:
def fetch_and_process_gbfs():
    """Fetch GBFS data and process with Spark"""
    try:
        # Fetch data
        response = requests.get(
            "https://gbfs.lyft.com/gbfs/1.1/bos/en/station_status.json",
            timeout=10
        )
    
        data = response.json() 
        
        # Add ingestion timestamp
        ingestion_time = datetime.utcnow()
        
        # Extract stations data
        stations_data = data['data']['stations']

        # Add ingestion timestamp to each station record
        for station in stations_data:
            station['ingestion_timestamp'] = ingestion_time.isoformat()
            station['gbfs_last_updated'] = data['last_updated']
        
        # Create Spark DataFrame
        df = spark.createDataFrame(stations_data, station_schema)

        # Fetch station information data
        station_info_response = requests.get(
            "https://gbfs.lyft.com/gbfs/1.1/bos/en/station_information.json",
            timeout=10
        )
        station_info = station_info_response.json()
        # Extract station info
        station_data = station_info['data']['stations']

        # Select just id and name columns (need to create a new list of dictionaries for this)       
        updated_station_data = []
        for station in station_data:
            updated_station_data.append({
                "station_id": station["station_id"],
                "name": station["name"]
            })
        info_df = spark.createDataFrame(updated_station_data).select(["station_id", "name"])


      
        
        # Add computed columns
        processed_df = df \
            .withColumn("total_capacity", 
                       col("num_bikes_available") + col("num_docks_available")) \
            .withColumn(
                    "utilization_rate",
                    when(col("total_capacity") > 0,
                        col("num_bikes_available") / col("total_capacity"))
                    .otherwise(0) 
                ) \
            .withColumn("data_timestamp", 
                       from_unixtime(col("last_reported"))) \
            .withColumn("ingestion_timestamp", current_timestamp())
        
        # Show results
        print(f"\n=== Data fetched at {ingestion_time} ===")
        # processed_df.show(10, truncate=False)
        
        # Calculate some quick stats using the agg function
        stats = processed_df.agg(
            count("*").alias("total_stations"),
            sum("num_bikes_available").alias("total_bikes"),
            sum("num_ebikes_available").alias("total_ebikes"),
            (
                (sum("num_bikes_available") + sum("num_ebikes_available")) 
                / sum("total_capacity")
                ).alias("utilization_rate")
            ).withColumn("ingestionTime", lit(ingestion_time.isoformat()))

        # Convert to Pandas DataFrame for Parquet conversion
        pd_df = processed_df.toPandas()
        # Convert to PyArrow Table
        table = pa.Table.from_pandas(pd_df)

        # Write Parquet to an in-memory buffer
        buffer = io.BytesIO()
        pq.write_table(table, buffer)
        buffer.seek(0)


        s3 = boto3.resource(
            's3',
            endpoint_url=(f"https://{ENDPOINT}/storage/v1/s3"),
            aws_access_key_id=ACCESS_KEY,  # or anon key
            aws_secret_access_key=SECRET_KEY
        )

        bucket_name = 'bluebikes'
        bucket = s3.Bucket(bucket_name)
        s3_key = 'raw_data/data.parquet'
        s3.Bucket(bucket_name).put_object(Key=s3_key, Body=buffer.getvalue())

        print("Parquet uploaded successfully.")

        # Write stats to in-memory delta table
        stats.write \
            .format("delta") \
            .mode("append") \
            .option("overwriteSchema", "true") \
            .saveAsTable("bluebikes_stats")
        # Bonus: Join processed data with station info to get names

        worst = processed_df.sort(asc("utilization_rate")).select(["station_id", "utilization_rate"]) \
            .limit(50) \
            .join(info_df, processed_df.station_id == info_df.station_id) \
            .select(['name', 'utilization_rate'])
        worst.show(truncate=False)

    except Exception as e:
        print(str(e))

fetch_and_process_gbfs()



=== Data fetched at 2025-08-14 17:43:48.256669 ===
Parquet uploaded successfully.
+----------------------------------+--------------------+
|name                              |utilization_rate    |
+----------------------------------+--------------------+
|O'Brien Highway at First Street   |0.0                 |
|Whittier St Health Center         |0.0                 |
|Tremont St at Northampton St      |0.05263157894736842 |
|The Dimock Center                 |0.0                 |
|Aquarium T Stop - 200 Atlantic Ave|0.043478260869565216|
|Parker St at Huntington Ave       |0.0                 |
|Broadway at Central St            |0.05555555555555555 |
|Shawmut Ave at Lenox St           |0.0                 |
|Bartlett St at John Elliot Sq     |0.06666666666666667 |
|Massachusetts Ave at Columbus Ave |0.0                 |
|Cross St at Stillman St           |0.0                 |
|W Broadway at Dorchester St       |0.0                 |
|Lake St. at Minuteman Bikeway     |0.0        