In [4]:
#!pip install --upgrade pip -q
#!pip install lxml sqlalchemy psycopg2-binary pandas -q
#!pip install google-cloud google-cloud-bigquery -q

In [1]:
# Import necessary libraries
import os
import subprocess
import logging
import zipfile
import tarfile
from urllib.parse import urljoin
from pathlib import Path
from time import time


import requests
import pandas as pd
from bs4 import BeautifulSoup

import sqlalchemy
from sqlalchemy import create_engine, text

from google.cloud import storage, bigquery
#from azure.storage.blob import BlobServiceClient

# Local imports
#from safe_run import safe_run


In [None]:
import boto3

# Connection settings for the S3-compatible MinIO service. Each value can be
# overridden through an environment variable so credentials do not have to be
# edited into the notebook; the fallbacks are the previous hard-coded values.
MINIO_ENDPOINT_URL = os.environ.get("MINIO_ENDPOINT_URL", "http://minio-service:9000")
MINIO_BUCKET = "citibike-data"

s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT_URL,
    aws_access_key_id=os.environ.get("MINIO_ROOT_USER", "admin"),
    aws_secret_access_key=os.environ.get("MINIO_ROOT_PASSWORD", "password"),
)

# Create the bucket only when it is missing so that re-running this cell
# (e.g. Restart Kernel -> Run All) does not fail on an already existing bucket.
existing_buckets = {bucket["Name"] for bucket in s3.list_buckets().get("Buckets", [])}
if MINIO_BUCKET not in existing_buckets:
    s3.create_bucket(Bucket=MINIO_BUCKET)


In [6]:
# Display the version of the pandas package imported above.
pd.__version__

'2.1.1'

In [7]:
# Set up logging for the notebook.
# `force=True` makes basicConfig remove *and close* every handler already
# attached to the root logger before installing the new one, so re-running
# this cell neither duplicates log lines nor leaves detached handlers open.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s',
    force=True,
)

In [None]:

# URL that is requested for the XML file listing and used as the prefix of every download link.
BASE_URL = "https://s3.amazonaws.com/tripdata/"
#DOWNLOAD_DIR = "/home/bonaventure/Documents/data_engineering/data-engineering-zoomcamp-projects/citibike_project/data/citibike_data"
# Working directory for downloaded archives and extracted files (relative to the current working directory).
DOWNLOAD_DIR = "./data/citibike_data"

def scrape_citibike_files(timeout=30):
    """Return the download URL of every ``.zip`` entry listed at ``BASE_URL``.

    The response is parsed as XML; the text of each ``<Key>`` element that
    ends in ``.zip`` is joined onto ``BASE_URL``.

    Args:
        timeout: Seconds to wait for the listing request before giving up.

    Returns:
        list[str]: Download URLs, in the order the keys appear in the listing.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    response = requests.get(BASE_URL, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of parsing an error page as an empty listing.
    response.raise_for_status()

    soup = BeautifulSoup(response.text, features="xml")
    # get_text() also copes with an empty <Key/>, where `.contents[0]` would raise IndexError.
    keys = (key_tag.get_text() for key_tag in soup.find_all('Key'))

    # NOTE(review): only a single listing response is read; if the server
    # truncates long listings, later keys would be missed — confirm.
    return [urljoin(BASE_URL, key) for key in keys if key.endswith('.zip')]


def _archive_period(archive_name):
    """Return the period label (e.g. ``"202508"``) embedded in an archive file name.

    The label is the first run of digit groups (4+ digits each, optionally
    joined by ``-``, as in ``"201307-201402"``). When the name holds no such
    run, the name up to its first ``.`` is returned instead.
    """
    import re  # local import: only this helper needs it

    match = re.search(r"\d{4,}(?:-\d{4,})*", archive_name)
    if match:
        return match.group(0)
    return archive_name.split(".", 1)[0]


def download_files(url, download_dir=DOWNLOAD_DIR):
    """Download one archive with ``wget`` and extract it below ``download_dir``.

    The archive is stored in ``<download_dir>/archive_files`` and extracted to
    ``<download_dir>/unzipped_files/<period>``, where ``<period>`` is the
    period label taken from the archive's file name.

    Args:
        url: URL of the archive to download.
        download_dir: Root directory for archives and extracted files.

    Raises:
        Exception: Any error raised while preparing directories or running
            ``wget`` (e.g. ``subprocess.CalledProcessError``) is logged and
            re-raised.
        zipfile.BadZipFile: If a ``.zip`` download is not a valid zip file.
        tarfile.TarError: If a tar-like download cannot be read or extracted.
    """
    # Per-call counter: it starts at 0 on every call, so the final message
    # always reports 1.
    filecount = 0
    logging.info("Starting download: %s", url)
    try:
        os.makedirs(download_dir, exist_ok=True)
        archive_name = os.path.basename(url)
        archive_dir = Path(f"{download_dir}/archive_files")
        file_path = archive_dir / archive_name
        # The previous `str.strip('JC-citibike-tripdata.zip.csv')` stripped a
        # *set of characters* from both ends rather than a prefix/suffix; the
        # period label is now extracted explicitly.
        unzip_dir = Path(f"{download_dir}/unzipped_files/{_archive_period(archive_name)}")

        # -q quiet, -N timestamping (re-download only when needed), -P target directory.
        subprocess.run(["wget", "-q", "-N", "-P", str(archive_dir), url], check=True)

        logging.info("Download complete: %s", file_path)

    except Exception as e:
        logging.error("Download failed: %s", e)
        raise

    # Extract depending on file type
    if file_path.suffix == ".zip":
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(unzip_dir)
            logging.info("zip file extraction complete: %s", file_path)
        except zipfile.BadZipFile:
            logging.error("Invalid zip file: %s", file_path)
            raise
    elif file_path.suffix in (".tar", ".gz", ".bz2"):
        try:
            with tarfile.open(file_path, 'r:*') as tar_ref:
                # The archive comes from the network: use the "data"
                # extraction filter where this Python version provides it, so
                # members cannot be written outside `unzip_dir`.
                if hasattr(tarfile, "data_filter"):
                    tar_ref.extractall(unzip_dir, filter="data")
                else:
                    tar_ref.extractall(unzip_dir)
            logging.info("tar-like file extraction complete: %s", file_path)
        except tarfile.TarError:
            logging.error("Invalid tar file: %s", file_path)
            raise
    else:
        logging.warning("Unknown file type, skipping extraction: %s", file_path)

    filecount += 1
    print(f"Download and extraction of {archive_name} complete. Total files processed: {filecount}")


def find_csv_file(DOWNLOAD_DIR=DOWNLOAD_DIR):
    """Collect the CSV files found one level below ``<DOWNLOAD_DIR>/unzipped_files``.

    Every sub-directory of ``unzipped_files`` is scanned (non-recursively) for
    entries whose name ends in ``.csv``.

    Args:
        DOWNLOAD_DIR: Root directory that contains ``unzipped_files``.

    Returns:
        list[pathlib.Path]: CSV paths, sorted by directory and then by file
        name so that indexing the result (``[0]``, ``[-1:]``) is deterministic.

    Raises:
        FileNotFoundError: If ``<DOWNLOAD_DIR>/unzipped_files`` does not exist.
    """
    unzip_root = Path(f"{DOWNLOAD_DIR}/unzipped_files")
    paths_list = []

    # Directory listing order is arbitrary, hence the explicit sort.
    for folder in sorted(unzip_root.iterdir()):
        # Stray files next to the extraction folders (e.g. OS metadata files)
        # cannot be listed and are skipped instead of raising.
        if not folder.is_dir():
            continue
        for entry in sorted(folder.iterdir()):
            # TODO: handle parquet files later
            if entry.name.endswith(".csv"):
                paths_list.append(entry)
    return paths_list

In [9]:
# Fetch the list of archive URLs, then download and extract only the last
# entry of that list (the `[-1:]` slice yields at most one URL).
files_list = scrape_citibike_files()

for url in files_list[-1:]:
    download_files(url)

2025-09-20 19:08:52,256 | INFO | 1543903068.py:20 | Starting download: https://s3.amazonaws.com/tripdata/JC-202508-citibike-tripdata.csv.zip
2025-09-20 19:08:53,013 | INFO | 1543903068.py:32 | Download complete: /home/bonaventure/Documents/data_engineering/data-engineering-zoomcamp-projects/2-docker_terraform/data/citibike_data/archive_files/JC-202508-citibike-tripdata.csv.zip
2025-09-20 19:08:53,862 | INFO | 1543903068.py:44 | zip file extraction complete: /home/bonaventure/Documents/data_engineering/data-engineering-zoomcamp-projects/2-docker_terraform/data/citibike_data/archive_files/JC-202508-citibike-tripdata.csv.zip


Download and extraction of JC-202508-citibike-tripdata.csv.zip complete. Total files processed: 1


In [10]:
# Load data into cloud storage
def upload_to_gcs(bucket_name, local_path, gcs_path):
    """Upload a local file to a Google Cloud Storage bucket and print the destination.

    Args:
        bucket_name: Name of the target bucket.
        local_path: Path of the local file to upload.
        gcs_path: Object name (path inside the bucket) to upload to.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(gcs_path)
    target_blob.upload_from_filename(local_path)

    destination = f"gs://{bucket_name}/{gcs_path}"
    print(f"Uploaded {local_path} to {destination}")

def upload_to_aws(bucket_name, local_path, gcs_path):
    """Placeholder for the AWS upload; currently does nothing and returns None.

    TODO: Implement AWS upload
    """

def upload_to_azure(bucket_name, local_path, gcs_path):
    """Placeholder for the Azure upload; currently does nothing and returns None.

    TODO: Implement Azure upload
    """




In [11]:
# Explore a sample file
# Takes the first path returned by find_csv_file() and loads the whole CSV,
# parsing the `started_at` / `ended_at` columns as datetimes.
path = find_csv_file()[0]
citibike_data_202508 = pd.read_csv(filepath_or_buffer=path, 
                                   #nrows=100,
                                   parse_dates=["started_at", "ended_at"])
# Last expression of the cell: renders the first five rows.
citibike_data_202508.head()
#citibike_data_202508.count()




Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,AABD1C039D2D622D,electric_bike,2025-08-18 09:02:16.100,2025-08-18 09:07:45.510,City Hall - Washington St & 1 St,HB105,14 St Ferry - 14 St & Shipyard Ln,HB202,40.73736,-74.03097,40.752961,-74.024353,member
1,3420D743AD40EDC8,classic_bike,2025-08-15 14:04:18.806,2025-08-15 14:29:45.333,Newark St & Washington St,HB612,JC Medical Center,JC110,40.73681,-74.0309,40.715391,-74.049692,casual
2,6957013BE2AF6B52,classic_bike,2025-08-23 13:56:08.239,2025-08-23 13:59:58.649,Exchange Pl,JC116,York St & Marin Blvd,JC097,40.716366,-74.034344,40.716615,-74.042412,member
3,B005E728E67ED43F,electric_bike,2025-08-19 17:41:53.820,2025-08-19 17:50:22.265,Riverview Park,JC057,14 St Ferry - 14 St & Shipyard Ln,HB202,40.744319,-74.043991,40.752961,-74.024353,member
4,2BF0B8E6F05BC2AF,classic_bike,2025-08-27 09:26:19.273,2025-08-27 09:31:24.381,Manila & 1st,JC082,JC Medical Center,JC110,40.721651,-74.042884,40.715391,-74.049692,member


In [12]:
# Connect to the Postgres database inside the container

# NOTE(review): user and password are embedded in the connection URL —
# consider reading them from the environment before sharing the notebook.
engine = create_engine("postgresql://postgres:postgres@localhost:5432/citibike")
# The connection is opened here and not closed in this cell.
db_connection = engine.connect()
db_connection


<sqlalchemy.engine.base.Connection at 0x7bdc1f80f400>

In [13]:
# Build the CREATE TABLE statement that pandas derives from the sample
# DataFrame for a table named "citibike_data", and print it for inspection.
schema = pd.io.sql.get_schema(citibike_data_202508, name="citibike_data", con=db_connection)
print(schema)


CREATE TABLE citibike_data (
	ride_id TEXT, 
	rideable_type TEXT, 
	started_at TIMESTAMP WITHOUT TIME ZONE, 
	ended_at TIMESTAMP WITHOUT TIME ZONE, 
	start_station_name TEXT, 
	start_station_id TEXT, 
	end_station_name TEXT, 
	end_station_id TEXT, 
	start_lat FLOAT(53), 
	start_lng FLOAT(53), 
	end_lat FLOAT(53), 
	end_lng FLOAT(53), 
	member_casual TEXT
)




In [14]:

def load_data_to_postgres(paths_list=None, engine=engine, data_path=None, chunksize=500_000):
    """Create a table in the database for a CSV file and load the file in chunks.

    Only the last entry of ``paths_list`` is ingested (``paths_list[-1:]``).
    The table is named ``citibike_<parent directory name>`` and is replaced
    if it already exists.

    Args:
        paths_list: CSV paths to choose from. When ``None`` (the default) the
            paths are looked up with ``find_csv_file()`` at call time, so
            files extracted after this function was defined are seen.
        engine: SQLAlchemy engine (or connection) handed to ``DataFrame.to_sql``.
        data_path: Unused; kept so existing calls that pass it keep working.
        chunksize: Number of CSV rows read and inserted per chunk.

    Raises:
        Exception: Any error raised while creating the table or inserting
            rows is logged and re-raised.
    """
    if paths_list is None:
        paths_list = find_csv_file()

    for path in paths_list[-1:]:
        # Name of the directory that contains the CSV, independent of the
        # path separator.
        df_name = "_".join(["citibike", Path(path).parent.name.strip()])
        # The whole file is read once so the (empty) header frame carries the
        # column types inferred from all rows.
        df_header = pd.read_csv(filepath_or_buffer=path,
                                parse_dates=["started_at", "ended_at"]).head(n=0)

        try:
            # Load the header of the df as schema
            df_header.to_sql(name=df_name, con=engine, if_exists="replace")
            # Create an iterator from the large dataset
            df_iter = pd.read_csv(filepath_or_buffer=path,
                                  chunksize=chunksize,
                                  parse_dates=["started_at", "ended_at"])

            # Counter and timer live outside the loop so they describe the
            # whole file rather than being reset for every chunk.
            chunk_num = 0
            start_time = time()
            for chunk_num, df in enumerate(df_iter, start=1):
                df.to_sql(name=df_name, con=engine, if_exists="append")
            end_time = time()

            logging.info(f"Finished ingesting chunck {chunk_num} into postgres; just check if last")
            print(f'Insertion of {df_name} complete, I/O osp time {(end_time-start_time):.2f}')
            logging.info("Insertion into postgres db complete: %s", df_name)
        except Exception as e:
            logging.error("Data insertion failed: %s", e)
            raise

#load_data_to_postgres()

In [None]:
# Connection settings used as the default `params` argument of
# ingest_from_bigquery_to_postgres (all values are strings).
# NOTE(review): user and password are hard-coded here — consider loading
# them from the environment before sharing the notebook.
params = {
            'user': 'postgres',
            'password': 'postgres',
            'host': 'localhost',
            'port': '5432'
     }

def ingest_from_bigquery_to_postgres(params=params, years=(2013,), chunk_size=500_000):
    """Ingest data from Big Query to Postgres in chunks.

    Creates the ``citibikebq`` database when it does not exist yet, then, for
    every requested year, copies the rows of
    ``bigquery-public-data.new_york_citibike.citibike_trips`` whose
    ``starttime`` falls in that year into the table ``citibike_trips_<year>``.
    A year whose table already exists is skipped.

    Args:
        params: Mapping with the keys ``'user'``, ``'password'``, ``'host'``
            and ``'port'`` for the Postgres server.
        years: Years to ingest.
            TODO: Extend range until year 2019 in production or cloud environment
        chunk_size: Number of rows requested from BigQuery per query.

    Raises:
        Exception: Any error during database creation or ingestion is logged
            and re-raised.
    """
    DB_USER = params['user']
    DB_PASS = params['password']
    DB_HOST = params['host']
    DB_PORT = params['port']
    DB_NAME = 'citibikebq'  # The database you want to create

    # Connect to default database and create new database if not exists.
    # AUTOCOMMIT because CREATE DATABASE cannot run inside a transaction block.
    default_engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/postgres',
                                   isolation_level='AUTOCOMMIT')
    try:
        with default_engine.connect() as default_conn:
            # Check if the database exists
            result = default_conn.execute(text("SELECT 1 FROM pg_database WHERE datname = :dbname"), {"dbname": DB_NAME})
            exists = result.scalar()  # Returns None if no rows found

            if not exists:
                default_conn.execute(text(f"CREATE DATABASE {DB_NAME}"))
                logging.info(f"Database '{DB_NAME}' created successfully!")
            else:
                logging.info(f"Database '{DB_NAME}' already exists.")
    except Exception as e:
        # This step creates the database; no data has been inserted yet.
        logging.error("Database check/creation failed: %s", e)
        raise
    finally:
        # Release the pooled connections of the bootstrap engine.
        default_engine.dispose()

    # Connect to the newly created database and ingest data into postgres container
    citibikebq_engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}',
                                      isolation_level='AUTOCOMMIT')

    try:
        with citibikebq_engine.connect() as citibikebq_conn:
            # Initialize BigQuery client. setdefault keeps a credentials path
            # that is already configured in the environment.
            os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/home/bonaventure/gcp-keys.json")
            client = bigquery.Client()

            for year in years:
                year = int(year)  # the value is interpolated into SQL below
                table_name = f"citibike_trips_{year}"

                # Skip the year when its table already exists.
                # NOTE(review): a table left behind by an interrupted run is
                # also treated as complete — confirm that this is acceptable.
                check_table_query = f"SELECT to_regclass('public.{table_name}')"
                table_exists = citibikebq_conn.execute(text(check_table_query)).scalar()
                if table_exists:
                    print(f"Table '{table_name}' already exists in {DB_NAME}")
                    continue

                offset = 0
                while True:
                    # NOTE(review): LIMIT/OFFSET without ORDER BY relies on the
                    # row order being stable between queries — confirm or add
                    # an ORDER BY on a unique key.
                    query = f"""
                    SELECT *
                    FROM `bigquery-public-data.new_york_citibike.citibike_trips`
                    WHERE EXTRACT(YEAR FROM starttime) = {year}
                    LIMIT {chunk_size} OFFSET {offset}
                    """

                    df_chunk = client.query(query).to_dataframe()
                    if not df_chunk.empty:
                        # 'append' creates the table on the first chunk and
                        # adds to it afterwards; 'replace' would drop the
                        # previous chunks and keep only the last one.
                        df_chunk.to_sql(table_name,
                                        citibikebq_engine,
                                        if_exists='append', index=False)
                        logging.info(f"Loaded rows {offset} to {offset + len(df_chunk)} into Postgres")

                    # A short (or empty) chunk means there is no more data,
                    # so no further query is needed.
                    if len(df_chunk) < chunk_size:
                        logging.info(f"Insertion into postgres db '{DB_NAME}' complete: %s", table_name)
                        break

                    offset += chunk_size
    except Exception as e:
        logging.error("Data insertion from Big Query failed: %s", e)
        raise
    finally:
        citibikebq_engine.dispose()



In [35]:
# Run the BigQuery -> Postgres ingestion with the default arguments.
ingest_from_bigquery_to_postgres()

2025-09-20 20:57:38,209 | INFO | 3505170162.py:41 | Database 'citibikebq' already exists.


Table 'citibike_trips_2013' already exists in citibikebq
