# Data Warehouse Citibike - ETL Process

## Project Info

Group: **Real estate**

Team Members: **Altin Kelmendi, Julian Hoffmann, Daniel Dodmasej, Clemens Hohensinner**




## Setup & Imports

In [192]:
from sqlalchemy import create_engine, Column, Integer, String, TIMESTAMP, Interval, ForeignKey, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker

# Connection string for the Postgres instance running in Docker
# (start it with `docker-compose up` before executing this notebook).
DATABASE_URL = 'postgresql://root:root@localhost:5452/citibike_db'

# Declarative base that the ORM models below inherit from, plus a
# standalone MetaData object.
Base = declarative_base()
metadata = MetaData()

# Engine, session factory and the one session reused by the later cells.
engine = create_engine(DATABASE_URL)
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()



## Table Definitions (SQLAlchemy ORM)
Tables are defined using SQLAlchemy ORM:


In [193]:
class UserType(Base):
    """Dimension table ``usertype``: one row per rider category."""
    __tablename__ = 'usertype'
    # NOTE(review): presumably set so this cell can be re-executed against a
    # Base that already has the table registered - confirm.
    __table_args__ = {'extend_existing': True}

    # Integer surrogate key; fact_trip.usertypeid references this column.
    usertypeid = Column(Integer, primary_key=True, autoincrement=True)
    # Required short label (the loader in this notebook stores 'Member' / 'Casual').
    usertype = Column(String(50), nullable=False)
    # Free-text explanation of the category; no nullable flag is given.
    description = Column(String(250))


class Station(Base):
    """Dimension table ``station``: one row per bike station."""
    __tablename__ = 'station'
    # NOTE(review): presumably set so this cell can be re-executed against a
    # Base that already has the table registered - confirm.
    __table_args__ = {'extend_existing': True}

    # Station identifier stored as a string (the source data uses values such
    # as 'JC022'); referenced by fact_trip.startstation / fact_trip.stopstation.
    id = Column(String(50), primary_key=True)
    # Required station name, limited to 50 characters.
    name = Column(String(50), nullable=False)
    # Optional former name; the station loader in this notebook does not set it.
    oldname = Column(String(50), nullable=True)


class Date(Base):
    """Dimension table ``date``: one row per timestamp, with its calendar parts."""
    __tablename__ = 'date'
    # NOTE(review): presumably set so this cell can be re-executed against a
    # Base that already has the table registered - confirm.
    __table_args__ = {'extend_existing': True}

    # Integer surrogate key; referenced by fact_trip.starttimeid / fact_trip.stoptimeid.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Calendar components; the loader derives all of them from the value
    # stored in `datetime`.
    hour = Column(Integer, nullable=False)
    day = Column(Integer, nullable=False)
    month = Column(Integer, nullable=False)
    year = Column(Integer, nullable=False)
    minute = Column(Integer, nullable=False)
    # Full timestamp the components above belong to.
    datetime = Column(TIMESTAMP, nullable=False)


class FactTrip(Base):
    """Fact table ``fact_trip``: one row per ride, linked to all three dimensions."""
    __tablename__ = 'fact_trip'
    # NOTE(review): presumably set so this cell can be re-executed against a
    # Base that already has the table registered - confirm.
    __table_args__ = {'extend_existing': True}

    # Trip identifier; the loader fills it from the CSV column `ride_id`.
    tripid = Column(String(50), primary_key=True)
    # Start and end station, both pointing at station.id.
    startstation = Column(String(50), ForeignKey('station.id'), nullable=False)
    stopstation = Column(String(50), ForeignKey('station.id'), nullable=False)
    # Start and end time, both pointing at date.id.
    starttimeid = Column(Integer, ForeignKey('date.id'), nullable=False)
    stoptimeid = Column(Integer, ForeignKey('date.id'), nullable=False)
    # Rider category, pointing at usertype.usertypeid.
    usertypeid = Column(Integer, ForeignKey('usertype.usertypeid'), nullable=False)
    # Trip length; the loader stores ended_at - started_at here.
    duration = Column(Interval)


# Create the tables declared on Base in the database behind `engine`.
Base.metadata.create_all(engine)


## Load CSV Files (Extraction Phase)
The three CSV files are loaded into DataFrames and combined into a single DataFrame (they all share the same structure, so this simply yields more rows).


In [194]:
import pandas as pd

main_path = '../data/content/'

# One trip-data export per month (December 2024 to February 2025).
monthly_files = ('202412-tripdata.csv', '202501-tripdata.csv', '202502-tripdata.csv')
df_december, df_january, df_february = (
    pd.read_csv(main_path + file_name) for file_name in monthly_files
)

# Stack the monthly frames on top of each other and renumber the index.
df_all = pd.concat((df_december, df_january, df_february), ignore_index=True)

print(f"Combined data shape: {df_all.shape}")
df_all.head()


Combined data shape: (150699, 13)


Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,28A17ACD224CD80B,electric_bike,2024-12-06 17:50:49.428,2024-12-06 17:54:20.070,Oakland Ave,JC022,Hilltop,JC019,40.737604,-74.052478,40.731169,-74.057574,member
1,3508393A86FBD357,classic_bike,2024-12-14 11:01:00.309,2024-12-14 11:12:01.382,Oakland Ave,JC022,Hoboken Terminal - Hudson St & Hudson Pl,HB101,40.737604,-74.052478,40.735938,-74.030305,member
2,75FA4C03A1447401,electric_bike,2024-12-24 08:07:17.475,2024-12-24 08:14:14.612,Oakland Ave,JC022,Leonard Gordon Park,JC080,40.737604,-74.052478,40.74591,-74.057271,member
3,C7741EF495C597DD,classic_bike,2024-12-19 12:48:05.452,2024-12-19 12:54:15.253,Oakland Ave,JC022,Leonard Gordon Park,JC080,40.737604,-74.052478,40.74591,-74.057271,member
4,07952BB20B46C5B1,electric_bike,2024-12-17 11:19:37.631,2024-12-17 11:28:25.150,Oakland Ave,JC022,Grove St PATH,JC115,40.737604,-74.052478,40.71941,-74.04309,casual


## Data Cleansing & Transformation

Data is being transformed and cleaned:
- duplicates are removed
- null values are removed
- data types are adjusted
- duration is calculated (ended_at - started_at) in a readable format
- each date is split into hour, day, month, year and minute, and is also kept as a cleaned full datetime column


In [195]:
# 1: drop rows that are exact copies of an earlier row
df_all.drop_duplicates(inplace=True)

# 2: drop rows that lack any of the fields the warehouse tables require
required_columns = ['ride_id', 'started_at', 'ended_at', 'start_station_id', 'end_station_id', 'member_casual']
df_all.dropna(subset=required_columns, inplace=True)

# 3: parse the timestamp columns
for ts_column in ('started_at', 'ended_at'):
    df_all[ts_column] = pd.to_datetime(df_all[ts_column])

# 4: trip duration = end time - start time
df_all['duration'] = df_all['ended_at'] - df_all['started_at']

# 5: date dimension
# Every distinct timestamp that occurs as a trip start or a trip end
start_times = df_all[['started_at']].rename(columns={'started_at': 'dateTime'})
end_times = df_all[['ended_at']].rename(columns={'ended_at': 'dateTime'})
combined_dates = pd.concat([start_times, end_times]).drop_duplicates()

# Calendar parts of the start time, added as separate columns
for part in ('hour', 'day', 'month', 'year', 'minute'):
    df_all[part] = getattr(df_all['started_at'].dt, part)

# 6: per-trip date frame, one row per distinct start timestamp
date_df = (
    df_all[['ride_id', 'hour', 'day', 'month', 'year', 'minute', 'started_at']]
    .rename(columns={'started_at': 'dateTime', 'ride_id': 'tripid'})
    .drop_duplicates(subset=['dateTime'])
)

# 7: report the cleaned result
print(f"Cleaned data shape: {df_all.shape}")
df_all.head()


Cleaned data shape: (150274, 19)


Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,duration,hour,day,month,year,minute
0,28A17ACD224CD80B,electric_bike,2024-12-06 17:50:49.428,2024-12-06 17:54:20.070,Oakland Ave,JC022,Hilltop,JC019,40.737604,-74.052478,40.731169,-74.057574,member,0 days 00:03:30.642000,17,6,12,2024,50
1,3508393A86FBD357,classic_bike,2024-12-14 11:01:00.309,2024-12-14 11:12:01.382,Oakland Ave,JC022,Hoboken Terminal - Hudson St & Hudson Pl,HB101,40.737604,-74.052478,40.735938,-74.030305,member,0 days 00:11:01.073000,11,14,12,2024,1
2,75FA4C03A1447401,electric_bike,2024-12-24 08:07:17.475,2024-12-24 08:14:14.612,Oakland Ave,JC022,Leonard Gordon Park,JC080,40.737604,-74.052478,40.74591,-74.057271,member,0 days 00:06:57.137000,8,24,12,2024,7
3,C7741EF495C597DD,classic_bike,2024-12-19 12:48:05.452,2024-12-19 12:54:15.253,Oakland Ave,JC022,Leonard Gordon Park,JC080,40.737604,-74.052478,40.74591,-74.057271,member,0 days 00:06:09.801000,12,19,12,2024,48
4,07952BB20B46C5B1,electric_bike,2024-12-17 11:19:37.631,2024-12-17 11:28:25.150,Oakland Ave,JC022,Grove St PATH,JC115,40.737604,-74.052478,40.71941,-74.04309,casual,0 days 00:08:47.519000,11,17,12,2024,19


## Load UserType Dimension

Data is loaded into the UserType dimension table:

In [196]:
# Discard any transaction left open by an earlier cell run.
session.rollback()

# Raw `member_casual` value -> (label stored in usertype.usertype, description)
usertype_map = {
    'member': ('Member', 'Registered user'),
    'casual': ('Casual', 'Unregistered user')
}

user_types_df = pd.DataFrame(
    [(raw_value, label, description) for raw_value, (label, description) in usertype_map.items()],
    columns=['member_casual', 'usertype', 'description'],
)

# Insert only labels that are not stored yet. UserType uses an autoincrement
# key, so session.merge() on an object without a primary key always INSERTs;
# re-running the cell would otherwise add duplicate user types every time.
existing_labels = {ut.usertype for ut in session.query(UserType).all()}
for _, row in user_types_df.iterrows():
    if row['usertype'] not in existing_labels:
        session.add(UserType(usertype=row['usertype'], description=row['description']))

session.commit()

# Lower-cased label -> generated usertypeid, read back from the database.
user_type_lookup = session.query(UserType).all()
user_type_dict = {ut.usertype.lower(): ut.usertypeid for ut in user_type_lookup}

# Map through a plain dict: an unexpected member_casual value becomes NaN
# (which the fact-table load reports and skips) instead of raising KeyError.
member_casual_to_id = {
    raw_value: user_type_dict.get(label.lower())
    for raw_value, (label, _description) in usertype_map.items()
}
df_all['userTypeId'] = df_all['member_casual'].map(member_casual_to_id)


## Load Station Dimension
Data is loaded into the Station dimension table:


In [197]:
# Distinct (id, name) pairs seen at the start and at the end of trips
start_stations = (
    df_all[['start_station_id', 'start_station_name']]
    .drop_duplicates()
    .rename(columns={'start_station_id': 'id', 'start_station_name': 'name'})
)
stop_stations = (
    df_all[['end_station_id', 'end_station_name']]
    .drop_duplicates()
    .rename(columns={'end_station_id': 'id', 'end_station_name': 'name'})
)

# One row per station id; the first occurrence of an id is the one kept
stations = pd.concat([start_stations, stop_stations]).drop_duplicates(subset='id')

# merge() inserts a station or updates the row that already has this id
for station_id, station_name in zip(stations['id'], stations['name']):
    session.merge(Station(id=str(station_id), name=station_name))
session.commit()


## Load Date Dimension
Data is loaded into the Date dimension table:

In [198]:
# Every distinct timestamp that occurs as a trip start or a trip end becomes
# one row of the date dimension.
combined_dates = pd.concat([
    df_all[['started_at']].rename(columns={'started_at': 'dateTime'}),
    df_all[['ended_at']].rename(columns={'ended_at': 'dateTime'})
]).drop_duplicates()

combined_dates['hour'] = combined_dates['dateTime'].dt.hour
combined_dates['day'] = combined_dates['dateTime'].dt.day
combined_dates['month'] = combined_dates['dateTime'].dt.month
combined_dates['year'] = combined_dates['dateTime'].dt.year
combined_dates['minute'] = combined_dates['dateTime'].dt.minute

# Build all Date objects first and flush once. Flushing after every single
# add() costs one database round trip per timestamp. drop_duplicates() above
# already guarantees each timestamp appears once, so no membership check is
# needed while building the list.
date_entries = [
    Date(
        hour=int(row.hour),
        day=int(row.day),
        month=int(row.month),
        year=int(row.year),
        minute=int(row.minute),
        datetime=row.dateTime
    )
    for row in combined_dates.itertuples(index=False)
]
session.add_all(date_entries)
session.flush()  # sends the INSERTs and populates the generated ids

# Timestamp -> date.id, read before commit() expires the loaded objects.
date_id_map = {
    dt: date_entry.id
    for dt, date_entry in zip(combined_dates['dateTime'], date_entries)
}

session.commit()


## Load Fact Table (FactTrip)
Data is loaded into the FactTrip fact table:

In [199]:
# Translate both trip timestamps into their date-dimension keys
for id_column, ts_column in (('startdateid', 'started_at'), ('stopdateid', 'ended_at')):
    df_all[id_column] = df_all[ts_column].map(date_id_map)

try:
    for _, row in df_all.iterrows():
        # A missing date key means an earlier step went wrong ( DEBUGGING )
        has_both_date_ids = pd.notnull(row['startdateid']) and pd.notnull(row['stopdateid'])
        if not has_both_date_ids:
            print(f"Skipping row with missing dateid: {row['ride_id']} - {row['startdateid']} - {row['stopdateid']}")
            continue

        # Likewise for the user type: it should have been resolved earlier ( DEBUGGING )
        if pd.isnull(row['userTypeId']):
            print(f"Skipping row with missing userTypeId: {row['ride_id']}")
            continue

        trip_fields = {
            'tripid': row['ride_id'],
            'startstation': str(row['start_station_id']),
            'stopstation': str(row['end_station_id']),
            'usertypeid': int(row['userTypeId']),
            'duration': row['duration'],
            'starttimeid': int(row['startdateid']),
            'stoptimeid': int(row['stopdateid']),
        }
        trip_entry = FactTrip(**trip_fields)
        # merge() inserts the trip or updates the row that already has this tripid
        session.merge(trip_entry)

    # Commit stays inside the try so a failing commit is rolled back as well
    session.commit()

except Exception as e:
    session.rollback()
    print(f"Error occurred: {e}")


## Development & Debugging:

### hard reset of database

In [200]:
# Be aware: if you click "Run All" while the lines below are uncommented, the created database is wiped :)
#session.rollback()
#from sqlalchemy import text

#with engine.connect() as conn:
#    conn.execute(text("DROP SCHEMA public CASCADE;"))
#    conn.execute(text("CREATE SCHEMA public;"))
#    conn.commit()
#
#Base.metadata.create_all(engine)


### util methods for debugging

In [201]:
##print(df_all.columns.tolist())
##print(df_all.head())


## Querying the Database


In [215]:
from sqlalchemy import select
from tabulate import tabulate

# Only ten trips are displayed, so let the database apply the limit instead
# of fetching the whole fact table and slicing the list in Python.
query0 = select(FactTrip).limit(10)
result = session.execute(query0).fetchall()

# Each result row is a one-element tuple holding the FactTrip entity.
data = [
    [
        fact_trip.tripid,
        fact_trip.startstation,
        fact_trip.stopstation,
        fact_trip.starttimeid,
        fact_trip.stoptimeid,
        fact_trip.usertypeid,
        fact_trip.duration
    ]
    for (fact_trip,) in result
]

headers = ["Trip ID", "Start Station", "Stop Station", "Start Time ID", "Stop Time ID", "User Type ID", "Duration"]

print(tabulate(data, headers=headers, tablefmt="grid"))


+------------------+-----------------+----------------+-----------------+----------------+----------------+----------------+
| Trip ID          | Start Station   | Stop Station   |   Start Time ID |   Stop Time ID |   User Type ID | Duration       |
| 28A17ACD224CD80B | JC022           | JC019          |               1 |         150272 |              1 | 0:03:30.642000 |
+------------------+-----------------+----------------+-----------------+----------------+----------------+----------------+
| 3508393A86FBD357 | JC022           | HB101          |               2 |         150273 |              1 | 0:11:01.073000 |
+------------------+-----------------+----------------+-----------------+----------------+----------------+----------------+
| 75FA4C03A1447401 | JC022           | JC080          |               3 |         150274 |              1 | 0:06:57.137000 |
+------------------+-----------------+----------------+-----------------+----------------+----------------+----------------+
