Extract Data

In [None]:
import requests
import polars as pl
import pandas as pd
from zipfile import ZipFile

In [None]:
# Zip archive downloaded by the cells below (presumably the December 2022
# Capital Bikeshare trip data, judging by the file name).
DATA_URL = "https://s3.amazonaws.com/capitalbikeshare-data/202212-capitalbikeshare-tripdata.zip"

# Abandoned attempt to read the CSV straight from the URL, kept for reference.
# NOTE(review): ZipFile is given the URL string here, not a local path or
# file object - confirm before reviving this snippet.
# pl.read_csv(
#     ZipFile(DATA_URL ).open("my_file.csv", method='r').read()
# )

In [None]:
# pd.read_csv(DATA_URL).head()


In [None]:
from datetime import datetime

# Date stamp (DD-MM-YYYY) used to name the downloaded archive and, in later
# cells, the extraction folder and the Parquet output.
time_stamp = datetime.now().strftime("%d-%m-%Y")

# A timeout keeps a stalled connection from hanging the kernel forever.
response = requests.get(DATA_URL, timeout=60)
# Fail loudly on 4xx/5xx instead of saving an error page under a .zip name.
response.raise_for_status()

with open(f"{time_stamp}_capitalbikeshare-tripdata.zip", 'wb') as f:
    f.write(response.content)

In [None]:
from zipfile import ZipFile
import os

# Extraction folder; the downloaded archive has the same name plus ".zip".
folder_name = f"{time_stamp}_capitalbikeshare-tripdata"

# os.makedirs returns None, so there is nothing useful to bind from it.
os.makedirs(folder_name, exist_ok=True)

# Unpack every member of the archive into the extraction folder.
with ZipFile(f"{folder_name}.zip", "r") as zip_ref:
    zip_ref.extractall(folder_name)

In [None]:
def extract_file(folder=None):
    """Return the path of the CSV file inside the extraction folder.

    Args:
        folder: Directory to search (top level only). Defaults to the
            notebook-level ``folder_name``.

    Returns:
        The joined path of the alphabetically first ``*.csv`` entry, or
        ``None`` when the folder contains no CSV file.
    """
    if folder is None:
        folder = folder_name
    # os.listdir order is arbitrary; sorting makes the choice reproducible
    # when more than one CSV is present.
    csv_files = sorted(name for name in os.listdir(folder) if name.endswith(".csv"))
    if not csv_files:
        return None
    return os.path.join(folder, csv_files[0])

Transform

In [None]:
# Locate the extracted CSV; extract_file() returns None when the folder
# contains no .csv file, so the printed value doubles as a sanity check.
DATA_PATH = extract_file()
print(DATA_PATH)

In [None]:
# Load the extracted CSV into a Polars DataFrame and preview the first rows.
data = pl.read_csv(DATA_PATH)
data.head()

In [None]:
# Column names of the loaded frame.
data.columns

In [None]:
# Number of rows in the loaded frame.
len(data)

In [None]:
# Column dtypes of the frame produced by pl.read_csv.
data.schema

In [None]:
# Expected dtype of every raw column. Timestamps stay strings here and are
# parsed into datetimes below.
schema = {
    'ride_id': pl.String,
    'rideable_type': pl.String,
    'started_at': pl.String,
    'ended_at': pl.String,
    'start_station_name': pl.String,
    'start_station_id': pl.Int64,
    'end_station_name': pl.String,
    'end_station_id': pl.Int64,
    'start_lat': pl.Float64,
    'start_lng': pl.Float64,
    'end_lat': pl.Float64,
    'end_lng': pl.Float64,
    'member_casual': pl.String
}

# Build each expression once and reuse it, rather than repeating the same
# string -> datetime parse in every derived column.
started_expr = pl.col('started_at').str.to_datetime()
ended_expr = pl.col('ended_at').str.to_datetime()
duration_expr = ended_expr - started_expr

# Later cells filter on `duration_seconds` and partition `data` by `week`,
# so the derived columns must live on `data` itself, not only on `df`.
data = data.cast(schema).with_columns([
    started_expr,
    ended_expr,
    duration_expr.alias('duration'),
    duration_expr.dt.total_seconds().alias('duration_seconds'),
    ended_expr.dt.week().alias('week'),
])

# `df` is kept as an alias because the following cells refer to it.
df = data

df


In [None]:
# Deduplicate the transformed frame held in `df`, so the derived columns
# (duration, duration_seconds, week) are kept. keep='first' together with
# maintain_order=True makes the surviving row and the row order reproducible.
df = df.unique(subset='ride_id', keep='first', maintain_order=True)
df

In [None]:
# Number of rides per bike type.
data.get_column('rideable_type').value_counts()

In [None]:
# Number of rides per start station.
data.get_column('start_station_name').value_counts()

In [None]:
# Number of rides per rider category.
data.get_column('member_casual').value_counts()

In [None]:
# 3. Flag rides for review and log how many were flagged:
# - rides that take place late at night (23:30 or later)
# - rides that last longer than 45 minutes (45 min = 2,700 s)
import logging
from functools import wraps

LONG_RIDE_SECONDS = 45 * 60  # 45 minutes expressed in seconds
LATE_RIDE_HOUR = 23          # late-night cutoff is 23:30
LATE_RIDE_MINUTE = 30

# Log flagged-ride summaries to a file
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
# Re-running this cell must not attach a second file handler, otherwise
# every message would be written to ride_flags.log twice.
if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
    file_handler = logging.FileHandler('ride_flags.log')
    formatter = logging.Formatter('%(asctime)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)


def log_flagged_data(func):
    """Decorate a flagging generator so each yielded frame is logged.

    Args:
        func: Generator function that yields flagged frames (anything that
            supports ``len``).

    Returns:
        A generator function with the same signature. It iterates over
        ``func``'s output, logs the row count of each item and re-yields it.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Iterate the wrapped generator; yielding the generator object
        # itself would mean its body never runs.
        for flagged_data in func(*args, **kwargs):
            logger.warning(f"Logged flagged data: {len(flagged_data)} rows")
            yield flagged_data
    return wrapper


@log_flagged_data
def flag_longer_ride(df, col_name):
    """Yield the rides lasting longer than 45 minutes (2,700 seconds).

    Args:
        df: Polars DataFrame of rides.
        col_name: Name of the column holding the ride duration in seconds.

    Yields:
        One DataFrame containing only the flagged rides.
    """
    flagged_df = df.filter(pl.col(col_name) > LONG_RIDE_SECONDS)
    logger.warning(f"Flagged rides with duration greater than 45 minutes: {len(flagged_df)} rows")
    yield flagged_df


@log_flagged_data
def flag_later_hour_ride(df, col_name):
    """Yield the rides whose timestamp falls at 23:30:00 or later.

    Args:
        df: Polars DataFrame of rides.
        col_name: Name of a datetime column to test.

    Yields:
        One DataFrame containing only the flagged rides.
    """
    timestamp = pl.col(col_name)
    # Each comparison is parenthesised because `&` binds tighter than `>=`.
    is_late = (timestamp.dt.hour() >= LATE_RIDE_HOUR) & (timestamp.dt.minute() >= LATE_RIDE_MINUTE)
    flagged_df = df.filter(is_late)
    logger.warning(f"Flagged rides later than 23:30:00: {len(flagged_df)} rows")
    yield flagged_df


# Use a dedicated loop variable per call: reusing `df` as the loop variable
# would replace the frame that the second call is meant to receive.
for late_rides in flag_later_hour_ride(data, "ended_at"):
    display(late_rides)

for long_rides in flag_longer_ride(df, "duration_seconds"):
    display(long_rides)

Write partitioned Parquet data with Polars

In [None]:
# Write the rides as Parquet, partitioned by rider category and week.
parquet_path = f'capitalbikeshare-tripdata_{time_stamp}.parquet'
partition_columns = ['member_casual', 'week']
data.write_parquet(parquet_path, partition_by=partition_columns)

In [None]:
def flag_longer_ride(df, col_name):
    """Yield the rides lasting longer than 45 minutes (2,700 seconds).

    NOTE(review): this redefinition replaces the decorated
    ``flag_longer_ride`` defined earlier in the notebook, so calls made
    after this cell are no longer logged.

    Args:
        df: Polars DataFrame of rides.
        col_name: Name of the column holding the ride duration in seconds.

    Yields:
        One DataFrame containing only the flagged rides.
    """
    flagged_df = df.filter(pl.col(col_name) > 2700)
    # Yield the frame itself so the caller decides how to display it.
    yield flagged_df

# The flag functions are generators; take the first yielded item so the cell
# shows a result rather than an un-run generator object.
next(flag_later_hour_ride(data, "ended_at"))

In [None]:
# Preview the first rows of `data`.
data.head()

In [None]:
# Rides longer than 45 minutes: compare the duration in seconds (not the
# end timestamp) against 45 * 60 = 2,700.
data.filter(pl.col("duration_seconds") > 2700)

SIMULATE STREAMS

In [None]:
import pandas as pd
import logging

# Set up logger: WARNING and above go to bike_rides.log.
# NOTE(review): logging.basicConfig presumably has no effect if the root
# logger already has handlers in this kernel - confirm the file is written.
logging.basicConfig(
    level=logging.WARNING, 
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bike_rides.log'),
    ]
)
logger = logging.getLogger(__name__)

# NOTE(review): this rebinds `data` (a Polars DataFrame in earlier cells) to a
# path string, and the path is an absolute, machine-specific location;
# consider a configurable relative path.
data = r"C:\Users\Admin\Desktop\Data-engineering-internship\Task\Task6\docker_mapped_output\airflow\data\01-05-2025_capitalbikeshare-tripdata.parquet"

def stream_data(file_path, late_after_hour=21):
    """
    Generator that yields the late-night rides from a Parquet file.

    Args:
        file_path: Path passed to ``pd.read_parquet``.
        late_after_hour: A ride is yielded when the hour of its
            ``started_at`` value is strictly greater than this (default 21,
            i.e. rides starting at 22:00 or later).

    Yields:
        One pandas Series per matching row, in file order.

    Any exception raised while reading or iterating is logged with
    ``logger.error`` and ends the stream instead of propagating.
    """
    try:
        df = pd.read_parquet(file_path)

        for _, row in df.iterrows():
            if row['started_at'].hour > late_after_hour:
                yield row

    except Exception as e:
        # Best-effort stream: report the failure and stop quietly.
        logger.error(f"Failed to stream data: {e}")

# Consume the stream and write one warning per late-night ride
for late_ride in stream_data(data):
    logger.warning(f"Late night ride: {late_ride}")

rideable_type                          classic_bike
started_at                      2022-12-29 22:51:00
ended_at                        2022-12-29 22:57:30
start_station_name             Van Ness Metro / UDC
start_station_id                              31300
end_station_name      Connecticut & Nebraska Ave NW
end_station_id                                31310
start_lat                                 38.944551
start_lng                                -77.063896
end_lat                                   38.955016
end_lng                                  -77.069956
member_casual                                member
duration                            0 days 00:06:30
duration_seconds                                390
week                                             52
Name: 4, dtype: object
rideable_type                classic_bike
started_at            2022-12-04 22:14:40
ended_at              2022-12-04 22:19:14
start_station_name          3rd & H St NE
start_station_id             