# Data Talks Data Engineer Course Home Work
### Source of Data to be inserted in Postgress for homework
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz  
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv  
  
    TIP:- As we can see the source file is zip in gz format so we have to extract it before loading it into PostgreSQL

# Section A
## Writing the ingest.py file to be used in Data ingestor container
    STEP 1: Import necessary python library for the script

In [100]:
%%writefile ingest1.py


import os
import argparse
import gzip
import shutil
import pandas as pd
from sqlalchemy import create_engine
from time import time

Overwriting ingest1.py


    Step 2: Defining the parameter given by user

In [101]:
%%writefile -a ingest1.py

def main(param):
    user = param.user
    password = param.password
    db = param.db
    host = param.host
    port = param.port
    table_name = param.table_name
    file_url = param.file_url


Appending to ingest1.py


    Step 3: Download data from git hub as instructed and extract CSV file

In [102]:
%%writefile -a ingest1.py


 # ‚úÖ Auto-generate output filename from URL
    output_file = file_url.split('/')[-1]

    # Download the file
    print(f"üì• Downloading file from {file_url}")
    os.system(f"wget {file_url} -O {output_file}")
    print(f"‚¨áÔ∏è  Downloading: {file_url}")


    # If file is compressed (.gz), extract it
    if output_file.endswith(".gz"):
        extracted_file = output_file.replace(".gz", "")
        print(f"üóúÔ∏è Extracting compressed file: {output_file} ‚Üí {extracted_file}")
        with gzip.open(output_file, 'rb') as f_in:
            with open(extracted_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        print("‚úÖ Extraction complete.")
    else:
        print("‚úÖ File is not compressed, proceeding normally.")
        csv_file = output_file
        

Appending to ingest1.py


    Step 4: Connect with pg-database docker container and create our table inside our ny-taxi DB   

In [103]:
%%writefile -a ingest1.py 

    print(f"üöÄ Connecting to Postgres: {host}:{port}/{db}")
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
    print("Connected to ",engine.connect(), " postgress server")

    print(f"üìÑ Reading data from {csv_file}")
    df_iter = pd.read_csv(csv_file, iterator=True, chunksize=100000)

    # Create table schema
    df = next(df_iter)
    df.head(0).to_sql(name=table_name, con=engine, if_exists='replace')

Appending to ingest1.py


    Step 5: Ingesting data in chunks of managmanageable size of postgress

In [104]:
%%writefile -a ingest1.py


    while True:
        try:
            t_start = time()
            df = next(df_iter)

            # ‚úÖ Handle both green & yellow taxi schemas
            datetime_cols = [col for col in df.columns if 'pickup_datetime' in col or 'dropoff_datetime' in col]
            for col in datetime_cols:
                df[col] = pd.to_datetime(df[col])

            df.to_sql(name=table_name, con=engine, if_exists='append', index=False)

            t_end = time()
            print(f"‚úÖ Inserted chunk in {t_end - t_start:.3f} seconds")

        except StopIteration:
            print("üèÅ Ingestion complete ‚Äî no more chunks to process.")
            break
        except Exception as e:
            print(f"‚ùå Error processing chunk: {e}")
            break


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV or GZ data to Postgres')

    parser.add_argument('--user', required=True, help='username for postgres')
    parser.add_argument('--password', required=True, help='password for postgres')
    parser.add_argument('--host', required=True, help='host for postgres')
    parser.add_argument('--port', required=True, help='port for postgres')
    parser.add_argument('--db', required=True, help='database name for postgres')
    parser.add_argument('--table_name', required=True, help='table name for postgres')
    parser.add_argument('--file_url', required=True, help='URL of the CSV or GZ file')

    args = parser.parse_args()
    main(args)

Appending to ingest1.py


# Section B 

    Step1: Creating Docker-Compose.yml File 

In [105]:
%%writefile docker-compose.yml

services:
  postgres:
    image: postgres:13
    container_name: pg-database
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    ports:
      - "5432:5432"
    volumes:
      - ./ny_taxi_postgres_data:/var/lib/postgresql/data
    networks:
      - pg-network

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "8080:80"
    depends_on:
      - postgres
    networks:
      - pg-network

networks:
  pg-network:
    driver: bridge


Overwriting docker-compose.yml


    Step 2: Creating a docker file for our Ingestor container(using Ingest1.py).

In [None]:
%%writefile dockerfile

FROM python:3.12.8
WORKDIR /app

# Install wget
RUN apt-get update && apt-get install -y wget && apt-get clean

# Install Python dependencies
RUN pip install --no-cache-dir pandas sqlalchemy psycopg2-binary

# Create archive directory inside container
RUN mkdir -p /app/archive

# Copy local files into container
COPY ingest.py /app/ingest.py
COPY archive/ /app/archive/

# Run the Python script
ENTRYPOINT ["python", "ingest.py"]


Overwriting dockerfile


## Section C 
    Launching the docker-compose continer using terminal
step 1:$docker-compose up -d  
step 2:$docker build -t [container-name]  
step 3: Checking  network name used by compose container   
          $docker network ls [look for your docker-compose network]  
step 4: Run the continer create in step2  
          $docker run -it --network=[your compose network] [continer-name] \
                    --user=root \
                    --password=root \
                    --db=ny_taxi \
                    --host=pg-database \
                    --port=5432 \
                    --table_name=hw-green \
                    --file_url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz  


In [110]:
!docker compose up -d

18603.34s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


[1A[1B[0G[?25l[+] Running 0/2
 [33m‚†ô[0m postgres Pulling                                                        [34m0.1s [0m
 [33m‚†ô[0m pgadmin Pulling                                                         [34m0.1s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m‚†π[0m postgres Pulling                                                        [34m0.2s [0m
 [33m‚†π[0m pgadmin Pulling                                                         [34m0.2s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m‚†∏[0m postgres Pulling                                                        [34m0.3s [0m
 [33m‚†∏[0m pgadmin Pulling                                                         [34m0.3s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m‚†º[0m postgres Pulling                                                        [34m0.4s [0m
 [33m‚†º[0m pgadmin Pulling                                                         [34m0.4s [0m
[?25h[1A[1A[1A[0G[?2

In [120]:
!docker build -t zomm-hw:v1 .

19101.23s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


[1A[1B[0G[?25l
[?25h[1A[0G[?25l[+] Building 0.0s (0/1)                                          docker:default
[?25h[1A[0G[?25l[+] Building 0.2s (1/2)                                          docker:default
[34m => [internal] load build definition from dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 474B                                       0.0s
[0m => [internal] load metadata for docker.io/library/python:3.12.8           0.2s
[?25h[1A[1A[1A[1A[0G[?25l[+] Building 0.3s (1/2)                                          docker:default
[34m => [internal] load build definition from dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 474B                                       0.0s
[0m => [internal] load metadata for docker.io/library/python:3.12.8           0.3s
[?25h[1A[1A[1A[1A[0G[?25l[+] Building 0.5s (1/2)                                          docker:default
[34m => [internal] load build definition

In [121]:
!docker run -it --network=datatalkas_pg-network zomm-hw:v1  \
                    --user=root \
                    --password=root \
                    --db=ny_taxi \
                    --host=pg-database \
                    --port=5432 \
                    --table_name=hw-green \
                    --file_url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz  

19123.14s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


üì• Downloading file from https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
--2025-11-11 12:30:48--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://release-assets.githubusercontent.com/github-production-release-asset/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?sp=r&sv=2018-11-09&sr=b&spr=https&se=2025-11-11T13%3A14%3A35Z&rscd=attachment%3B+filename%3Dgreen_tripdata_2019-10.csv.gz&rsct=application%2Foctet-stream&skoid=96c2d410-5711-43a1-aedd-ab1947aa7ab0&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skt=2025-11-11T12%3A13%3A39Z&ske=2025-11-11T13%3A14%3A35Z&sks=b&skv=2018-11-09&sig=akyHGvhTdVFjkRJESIJGEThq3DEi%2BPZ%2FLU69f6y3iX8%3D&jwt=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiY

In [129]:
!docker run -it --network=datatalkas_pg-network zomm-hw:v1  \
                    --user=root \
                    --password=root \
                    --db=ny_taxi \
                    --host=pg-database \
                    --port=5432 \
                    --table_name=taxi_zone_lookup \
                    --file_url=https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv  

19900.59s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


üì• Downloading file from https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
--2025-11-11 12:43:46--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://release-assets.githubusercontent.com/github-production-release-asset/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?sp=r&sv=2018-11-09&sr=b&spr=https&se=2025-11-11T13%3A39%3A49Z&rscd=attachment%3B+filename%3Dtaxi_zone_lookup.csv&rsct=application%2Foctet-stream&skoid=96c2d410-5711-43a1-aedd-ab1947aa7ab0&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skt=2025-11-11T12%3A39%3A05Z&ske=2025-11-11T13%3A39%3A49Z&sks=b&skv=2018-11-09&sig=UloaOhiRChN%2B4oYIzfdB85ML8OqmvQycmBlXrrfxGiU%3D&jwt=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmVsZWFzZS1hc3NldHMuZ2l0

In [135]:
import pandas as pd 

query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public';
"""
table_names = pd.read_sql(query, con=engine)
table_names


Unnamed: 0,table_name
0,hw_green
1,taxi_zone_lookup
2,hw-green


In [None]:
query = "SELECT count(1) FROM hw_green;"
tables = pd.read_sql(query, con=engine)
tables

Unnamed: 0,count
0,376386


In [None]:
query = "SELECT * FROM hw_green LIMIT 100;"
tables = pd.read_sql(query, con=engine)
tables

Unnamed: 0,index,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,...,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,,2,2019-10-08 15:30:23,2019-10-08 15:37:34,N,1,65,66,1,0.95,...,0.0,0.5,2.19,0.0,,0.3,9.49,1,1.0,0.00
1,,2,2019-10-08 15:17:48,2019-10-08 15:26:33,N,1,226,260,1,1.69,...,0.0,0.5,1.20,0.0,,0.3,10.00,1,1.0,0.00
2,,2,2019-10-08 16:01:41,2019-10-08 16:29:08,N,1,226,161,1,3.41,...,0.0,0.5,4.31,0.0,,0.3,25.86,1,1.0,2.75
3,,1,2019-10-08 15:54:49,2019-10-08 15:59:37,N,1,181,181,1,0.60,...,1.0,0.5,0.00,0.0,,0.3,6.80,2,1.0,0.00
4,,2,2019-10-08 15:14:39,2019-10-08 15:30:24,N,1,179,223,1,1.20,...,0.0,0.5,0.00,0.0,,0.3,11.30,2,1.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,,2,2019-10-08 15:52:59,2019-10-08 16:02:00,N,1,75,41,1,1.06,...,0.0,0.5,1.00,0.0,,0.3,9.30,1,1.0,0.00
96,,2,2019-10-08 15:09:29,2019-10-08 15:18:55,N,1,75,238,1,0.98,...,0.0,0.5,1.50,0.0,,0.3,10.30,1,1.0,0.00
97,,2,2019-10-08 15:30:30,2019-10-08 15:35:33,N,1,75,43,1,0.99,...,0.0,0.5,0.00,0.0,,0.3,6.30,2,1.0,0.00
98,,1,2019-10-08 15:55:01,2019-10-08 16:11:09,N,1,189,17,1,2.50,...,1.0,0.5,0.00,0.0,,0.3,14.30,1,1.0,0.00


In [128]:
query = """
SELECT 
    column_name, 
    data_type, 
    is_nullable
FROM information_schema.columns
WHERE table_name = 'hw_green';
"""
columns_info = pd.read_sql(query, con=engine)
columns_info


Unnamed: 0,column_name,data_type,is_nullable
0,index,bigint,YES
1,VendorID,bigint,YES
2,lpep_pickup_datetime,text,YES
3,lpep_dropoff_datetime,text,YES
4,store_and_fwd_flag,text,YES
5,RatecodeID,bigint,YES
6,PULocationID,bigint,YES
7,DOLocationID,bigint,YES
8,passenger_count,bigint,YES
9,trip_distance,double precision,YES
