## High-Level ETL (Extract - Transform - Load) Flow
**Goal**: By the end of this tutorial, you will be able to:
- Extract: Download a file from AWS S3 using Python’s boto3.
- Transform: Clean, filter, or manipulate data in Python (often using libraries like pandas).
- Load: Insert the transformed data into a relational database via SQL statements.

## Lab Assignment

1. Implement the following functions
   - `extract_from_csv(file_to_process: str) -> pd.DataFrame`: read the .csv file and return dataframe
   - `extract_from_json(file_to_process: str) -> pd.DataFrame`: read the .json file and return dataframe
   - `extract() -> pd.DataFrame`: extract data of heterogeneous format and combine them into a single dataframe.
   - `transform(df) -> pd.DataFrame`: function for data cleaning and manipulation.
2. Clean the data
   - Round float-type columns to two decimal places.
   - Remove duplicate samples.
   - Save the cleaned data to a Parquet file.
3. Insert the data into SQL
   - Create a PostgreSQL database.
   - Insert the data into the database.
  
Submission requirements:
    1. Jupyter Notebook
    2. Parquet File
    3. SQL file (optional)

In [8]:
# Required Package:
# psycopg2 2.9.10 (A PostgreSQL database adapter)
# pandas 2.0.3 (For data manipulation and analysis)
# sqlalchemy 2.0.37 (A SQL toolkit and Object Relational Mapper)
# pyarrow 14.0.1 (Provides support for efficient in-memory columnar data structures, part from Apache Arrow Objective)
import pandas as pd

#required for reading .xml files
import xml.etree.ElementTree as ET

#required for navigating machine's directory
import glob
import os.path

#required for communicating with SQL database
from sqlalchemy import create_engine

In [10]:
pip install boto3

Collecting boto3
  Downloading boto3-1.36.16-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.37.0,>=1.36.16 (from boto3)
  Downloading botocore-1.36.16-py3-none-any.whl.metadata (5.7 kB)
Collecting s3transfer<0.12.0,>=0.11.0 (from boto3)
  Downloading s3transfer-0.11.2-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.36.16-py3-none-any.whl (139 kB)
   ---------------------------------------- 0.0/139.2 kB ? eta -:--:--
   -- ------------------------------------- 10.2/139.2 kB ? eta -:--:--
   -- ------------------------------------- 10.2/139.2 kB ? eta -:--:--
   ----------- --------------------------- 41.0/139.2 kB 330.3 kB/s eta 0:00:01
   ----------- --------------------------- 41.0/139.2 kB 330.3 kB/s eta 0:00:01
   ----------- --------------------------- 41.0/139.2 kB 330.3 kB/s eta 0:00:01
   --------------------------- ---------- 102.4/139.2 kB 454.0 kB/s eta 0:00:01
   -------------------------------------- 139.2/139.2 kB 458.9 kB/s eta 0:00:00
Downloading botocor

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.12.3 requires botocore<1.34.70,>=1.34.41, but you have botocore 1.36.16 which is incompatible.


# E: Extracting data from multiple sources

In [22]:
import boto3
import os

# AWS credentials are read from the environment instead of being hard-coded,
# so secrets never end up in the notebook or in version control.
# If a variable is unset, the value is None, and boto3 falls back to its
# default credential chain (e.g. ~/.aws/credentials or AWS_* env vars).
# NOTE(review): the keys that were previously committed in this cell must be
# treated as leaked and revoked/rotated.
my_aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
my_aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
my_aws_session_token = os.environ.get("AWS_SESSION_TOKEN")

BUCKET_NAME = 'de300winter2025'   # Replace with your bucket name
S3_FOLDER = 'dinglin_xia/lab4_data/'             # The folder path in S3
LOCAL_DIR = 'C:/Users/nebah/Downloads/DE300_Lab4'      # Local directory to save files

In [24]:
def download_s3_folder(bucket_name, s3_folder, local_dir):
    """Download every object under an S3 key prefix into a local directory.

    The part of each object key after ``s3_folder`` is recreated beneath
    ``local_dir``. "Folder" placeholder keys (ending in ``/``) are skipped.
    Credentials come from the module-level ``my_aws_*`` variables.

    Args:
        bucket_name: Name of the S3 bucket.
        s3_folder: Key prefix to download, e.g. ``'some/folder/'``.
        local_dir: Local destination directory (created if missing).

    Raises:
        ValueError: if an object key would resolve to a path outside
            ``local_dir`` (e.g. a key containing ``..``).
    """
    os.makedirs(local_dir, exist_ok=True)

    s3_resource = boto3.resource('s3',
                                 aws_access_key_id=my_aws_access_key_id,
                                 aws_secret_access_key=my_aws_secret_access_key,
                                 aws_session_token=my_aws_session_token)
    bucket = s3_resource.Bucket(bucket_name)
    root = os.path.abspath(local_dir)

    # List objects within the specified prefix
    for obj in bucket.objects.filter(Prefix=s3_folder):
        if obj.key.endswith('/'):  # Skip folder placeholder objects
            continue

        # Strip the prefix and any leading '/': if s3_folder has no trailing
        # slash, the remainder starts with '/', and os.path.join would then
        # discard local_dir entirely.
        relative_key = obj.key[len(s3_folder):].lstrip('/')
        if not relative_key:
            continue
        local_file_path = os.path.join(local_dir, relative_key)

        # Refuse keys that would escape local_dir.
        if os.path.commonpath([root, os.path.abspath(local_file_path)]) != root:
            raise ValueError(f"Refusing to write outside {local_dir}: {obj.key}")

        # Create nested local directories if the key has sub-folders
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

        bucket.download_file(obj.key, local_file_path)
        print(f"Downloaded {obj.key} to {local_file_path}")

In [26]:
# Mirror the S3 prefix into the local download directory.
download_s3_folder(
    bucket_name=BUCKET_NAME,
    s3_folder=S3_FOLDER,
    local_dir=LOCAL_DIR,
)

Downloaded dinglin_xia/lab4_data/used_car_prices1.csv to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices1.csv
Downloaded dinglin_xia/lab4_data/used_car_prices1.json to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices1.json
Downloaded dinglin_xia/lab4_data/used_car_prices1.xml to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices1.xml
Downloaded dinglin_xia/lab4_data/used_car_prices2.csv to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices2.csv
Downloaded dinglin_xia/lab4_data/used_car_prices2.json to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices2.json
Downloaded dinglin_xia/lab4_data/used_car_prices2.xml to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices2.xml
Downloaded dinglin_xia/lab4_data/used_car_prices3.csv to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices3.csv
Downloaded dinglin_xia/lab4_data/used_car_prices3.json to C:/Users/nebah/Downloads/DE300_Lab4\used_car_prices3.json
Downloaded dinglin_xia/lab4_data/used_car_prices3.xml to C:/Users/nebah/Downloads/

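## Extract data from the ./data/ folder

Note: the download step above saves the files to `LOCAL_DIR`; copy them into `./data` (or point `folder` at `LOCAL_DIR`) before running the extraction below.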

In [76]:
# Collect every file in ./data (any extension) and show one path per line.
all_files = glob.glob('./data/*')
if all_files:
    print("\n".join(all_files))

./data\used_car_prices1.csv
./data\used_car_prices1.json
./data\used_car_prices1.xml
./data\used_car_prices2.csv
./data\used_car_prices2.json
./data\used_car_prices2.xml
./data\used_car_prices3.csv
./data\used_car_prices3.json
./data\used_car_prices3.xml


### Function to extract data from one .csv file

In [80]:
def extract_from_csv(file_to_process: str) -> pd.DataFrame:
    """Read a single .csv file into a DataFrame.

    Args:
        file_to_process: Path to the .csv file.

    Returns:
        The file's rows, using pandas' default header inference (the first
        line supplies the column names).
    """
    frame = pd.read_csv(file_to_process)
    return frame

### Function to extract data from one .json file

In [102]:
def extract_from_json(file_to_process: str) -> pd.DataFrame:
    """Read a single .json file into a DataFrame.

    The file is first parsed as JSON Lines (one object per line). If pandas
    rejects it in that format (ValueError), the reason is printed and the
    file is re-read as a regular JSON document.

    Args:
        file_to_process: Path to the .json file.

    Returns:
        The parsed records as a DataFrame.
    """
    try:
        frame = pd.read_json(file_to_process, lines=True)
    except ValueError as err:
        print(f"Could not read {file_to_process} with lines=True: {err}")
        frame = pd.read_json(file_to_process)
    return frame

### Function to extract data from one .xml file

In [86]:
def extract_from_xml(file_to_process: str) -> pd.DataFrame:
    """Read a single .xml file of car records into a DataFrame.

    Each child element of the XML root is one record and must contain
    ``car_model``, ``year_of_manufacture``, ``price`` and ``fuel``
    sub-elements.

    Args:
        file_to_process: Path to the .xml file.

    Returns:
        DataFrame with columns ``car_model``, ``year_of_manufacture`` (int),
        ``price`` (float) and ``fuel``, one row per record. An XML file
        without records yields an empty frame with those columns.

    Raises:
        ValueError: if a record lacks a required sub-element or a numeric
            field cannot be parsed.
    """
    # Local column list so this function does not depend on the notebook's
    # global ``columns``, which is only defined in a later cell.
    field_names = ["car_model", "year_of_manufacture", "price", "fuel"]
    root = ET.parse(file_to_process).getroot()

    records = []
    for record in root:
        values = {}
        for name in field_names:
            text = record.findtext(name)
            if text is None:
                raise ValueError(
                    f"{file_to_process}: <{record.tag}> has no <{name}> element"
                )
            values[name] = text
        values["year_of_manufacture"] = int(values["year_of_manufacture"])
        values["price"] = float(values["price"])
        records.append(values)

    # Build the frame once: concatenating row by row is quadratic, and
    # concatenating onto an empty frame turns every column into object dtype.
    return pd.DataFrame(records, columns=field_names)

### Function to extract data from the ./data/ folder

In [104]:
def extract(folder_path=None, expected_columns=None) -> pd.DataFrame:
    """Read every .csv, .json and .xml file in a folder into one DataFrame.

    Files that fail to parse are reported and skipped (best effort). Within
    each format, files are processed in sorted name order, so the row order
    is reproducible; raw glob order is platform-dependent.

    Args:
        folder_path: Directory to scan. Defaults to the module-level
            ``folder``.
        expected_columns: Columns to keep, in this order. Defaults to the
            module-level ``columns``.

    Returns:
        The concatenated data restricted to ``expected_columns``. If any of
        those columns is missing, a warning is printed and all columns are
        kept. If no file could be read, an empty DataFrame with
        ``expected_columns`` is returned.
    """
    if folder_path is None:
        folder_path = folder
    if expected_columns is None:
        expected_columns = columns

    # (glob pattern, reader, error-message template) for each supported format.
    # pd.read_xml requires pandas 1.3.0+.
    readers = [
        ("*.csv", extract_from_csv, "Error reading CSV file {path}: {err}"),
        ("*.json", extract_from_json, "Error reading JSON file {path}: {err}"),
        ("*.xml", pd.read_xml, "Error reading XML file '{path}': {err}"),
    ]

    data_frames = []
    for pattern, reader, error_template in readers:
        for path in sorted(glob.glob(os.path.join(folder_path, pattern))):
            try:
                data_frames.append(reader(path))
            except Exception as err:  # best effort: report and skip bad files
                print(error_template.format(path=path, err=err))

    if not data_frames:
        return pd.DataFrame(columns=expected_columns)

    extracted_data = pd.concat(data_frames, ignore_index=True)
    # Keep only the desired columns when they are all present.
    try:
        extracted_data = extracted_data[expected_columns]
    except KeyError as e:
        print(f"Warning: Not all expected columns are present. {e}")
    return extracted_data

### Extract the data

In [107]:
# Pipeline configuration: expected output columns and the raw-data folder.
columns = ['car_model', 'year_of_manufacture', 'price', 'fuel']
folder = "./data"


def main():
    """Run the extract step and return the combined raw data."""
    return extract()


data = main()

In [109]:
# Preview the first five rows of the extracted data.
data.head()

Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.552239,Diesel
2,ciaz,2017,10820.895522,Petrol
3,wagon r,2011,4253.731343,Petrol
4,swift,2014,6865.671642,Diesel


# T: Transform the data and save it to a .parquet file

In [111]:
# transform() writes its Parquet output to <staging_data_dir>/<staging_file>.
staging_data_dir = "staging_data"
staging_file = "cars.parquet"

In [113]:
def transform(df, staging_dir=None, staging_filename=None):
    """Clean the extracted data and stage it as a Parquet file.

    Steps:
    1. Round every float-type column to two decimal places.
    2. Keep only the first row for each ``car_model``.
    3. Write the result to ``<staging_dir>/<staging_filename>``.

    The caller's DataFrame is not modified.

    Args:
        df: Raw data, e.g. the output of ``extract()``.
        staging_dir: Output directory, created if missing. Defaults to the
            module-level ``staging_data_dir``.
        staging_filename: Output file name. Defaults to the module-level
            ``staging_file``.

    Returns:
        The cleaned DataFrame (the same data that was written to Parquet).
    """
    if staging_dir is None:
        staging_dir = staging_data_dir
    if staging_filename is None:
        staging_filename = staging_file

    print(f"Shape of data {df.shape}")

    # Work on a copy so the caller's frame (e.g. the global ``data``) is not
    # rounded in place as a side effect.
    df = df.copy()

    # Round (not truncate) all float-type columns to 2 decimal places.
    for col in df.select_dtypes(include="float").columns:
        df[col] = df[col].round(2)

    # Remove samples with the same car_model, keeping the first occurrence.
    if 'car_model' in df.columns:
        df = df.drop_duplicates(subset=['car_model'], keep='first')

    print(f"Shape of data {df.shape}")

    # Ensure the staging directory exists before writing the Parquet file
    if not os.path.exists(staging_dir):
        os.makedirs(staging_dir)
        print(f"Directory '{staging_dir}' created.")

    df.to_parquet(os.path.join(staging_dir, staging_filename))
    return df

In [115]:
# Clean the extracted data (transform() also writes the staging Parquet
# file) and print the head of the result.
df = transform(data)
df.head()

Shape of data (90, 4)
Shape of data (25, 4)
Directory 'staging_data' created.


Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.55,Diesel
2,ciaz,2017,10820.9,Petrol
3,wagon r,2011,4253.73,Petrol
4,swift,2014,6865.67,Diesel


# L: Loading data for further modeling

### Set Up PostgreSQL Locally
#### Step 1: Install PostgreSQL
- Windows: Download the installer from the official PostgreSQL site: https://www.postgresql.org/download/
- Mac:
  ```{bash}
  brew install postgresql
  brew services start postgresql
  ```
Then access PostgreSQL CLI
```{bash}
psql -U postgres
```
Note: if you don't have the default "postgres" user, create it manually from psql (connect first with `psql postgres` as your own OS user):
```{sql}
CREATE USER postgres SUPERUSER;
```
or
```{bash}
sudo -u $(whoami) createuser postgres -s
```

Then create a database
```{sql}
CREATE DATABASE my_local_db;
\l  -- List all databases
```

#### Step 2: Create a User and Grant Privileges
In PostgreSQL CLI:
```{sql}
CREATE USER myuser WITH ENCRYPTED PASSWORD 'mypassword';
GRANT ALL PRIVILEGES ON DATABASE my_local_db TO myuser;
```

#### Step 3: Install Required Python Libraries
```{bash}
pip install pandas sqlalchemy pymysql psycopg2 mysql-connector-python
```

In [119]:
pip install pandas sqlalchemy pymysql psycopg2 mysql-connector-python

Collecting pymysql
  Downloading PyMySQL-1.1.1-py3-none-any.whl.metadata (4.4 kB)
Collecting psycopg2
  Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Collecting mysql-connector-python
  Downloading mysql_connector_python-9.2.0-cp312-cp312-win_amd64.whl.metadata (6.2 kB)
Downloading PyMySQL-1.1.1-py3-none-any.whl (44 kB)
   ---------------------------------------- 0.0/45.0 kB ? eta -:--:--
   ---------------------------------------- 0.0/45.0 kB ? eta -:--:--
   ------------------------------------ --- 41.0/45.0 kB 653.6 kB/s eta 0:00:01
   ---------------------------------------- 45.0/45.0 kB 560.2 kB/s eta 0:00:00
Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   - -------------------------------------- 0.0/1.2 MB ? eta -:--:--
   --------- ------------------------------ 0.3/1.2 MB 3.5 MB/s eta 0:00:01
   ----------------------------- ---------- 0.9/1.2 MB 6.8 MB/s eta 0:00:01

### Utility function for writing data into the SQL database

In [140]:
# Database credentials.
# The password is read from the PGPASSWORD environment variable instead of
# being hard-coded, so it never lands in the notebook or version control.
# NOTE(review): the password previously committed in this cell should be changed.
import os
from urllib.parse import quote_plus

db_host = "localhost:5432"
db_user = "postgres"
db_password = os.environ.get("PGPASSWORD", "")
db_name = "my_local_db"

# URL-encode user and password: characters such as '@', ':' or '/' would
# otherwise corrupt the connection URL.
conn_string = (
    f"postgresql+psycopg2://{quote_plus(db_user)}:{quote_plus(db_password)}"
    f"@{db_host}/{db_name}"
)

engine = create_engine(conn_string)

In [142]:
# Test connection: list the tables visible through this engine.
# Use a dedicated name so the cleaned ``df`` from the transform step is not
# overwritten by this catalog query.
tables_df = pd.read_sql("SELECT * FROM pg_catalog.pg_tables;", con=engine)
print(tables_df)

            schemaname                tablename tableowner tablespace  \
0           pg_catalog             pg_statistic   postgres       None   
1           pg_catalog                  pg_type   postgres       None   
2           pg_catalog         pg_foreign_table   postgres       None   
3           pg_catalog                pg_authid   postgres  pg_global   
4           pg_catalog    pg_statistic_ext_data   postgres       None   
..                 ...                      ...        ...        ...   
63          pg_catalog           pg_largeobject   postgres       None   
64  information_schema                sql_parts   postgres       None   
65  information_schema             sql_features   postgres       None   
66  information_schema  sql_implementation_info   postgres       None   
67  information_schema               sql_sizing   postgres       None   

    hasindexes  hasrules  hastriggers  rowsecurity  
0         True     False        False        False  
1         True   

In [144]:
def insert_to_table(data: pd.DataFrame, conn_string:str, table_name:str):
    """Write ``data`` to ``table_name``, replacing any existing table.

    Args:
        data: Rows to write; the DataFrame index is not stored.
        conn_string: SQLAlchemy database URL.
        table_name: Destination table; pandas drops and recreates it if it
            already exists (``if_exists="replace"``).
    """
    db = create_engine(conn_string)
    try:
        # engine.begin() commits on success, rolls back on error and always
        # releases the connection. The previous connect()/close() pair leaked
        # the connection whenever to_sql raised.
        with db.begin() as conn:
            data.to_sql(table_name, conn, if_exists="replace", index=False)
    finally:
        # A fresh engine is created on every call, so release its pool here.
        db.dispose()

In [146]:
# Read the staged .parquet file(s) and load them into PostgreSQL.

def load() -> pd.DataFrame:
    """Read all staged Parquet files and write them to the ``ml_car_data`` table.

    Files in ``staging_data_dir`` are read in sorted name order and
    concatenated. If no Parquet file is found, the database is left
    untouched. Writing an empty frame with ``if_exists="replace"`` would
    otherwise replace the existing table with an empty one.

    Returns:
        The combined data that was loaded (an empty DataFrame if no files
        were found).
    """
    parquet_files = sorted(glob.glob(os.path.join(staging_data_dir, "*.parquet")))
    if not parquet_files:
        print(f"No .parquet files found in '{staging_data_dir}'; nothing loaded.")
        return pd.DataFrame()

    # A single concat keeps file order and avoids re-copying the growing frame
    # on every iteration. The old loop prepended, which reversed the order.
    data = pd.concat([pd.read_parquet(path) for path in parquet_files])

    insert_to_table(data = data, conn_string = conn_string, table_name = 'ml_car_data')
    return data

data = load()
print(data.shape)

(25, 4)
