## High-Level ETL (Extract - Transform - Load) Flow
**Goal**: By the end of this tutorial, you will be able to
- Extract: Download a file from AWS S3 using Python’s boto3.
- Transform: Clean, filter, or manipulate data in Python (often using libraries like pandas).
- Load: Insert the transformed data into a relational database via SQL statements.

## Lab Assignment

1. Implement the following functions
   - `extract_from_csv(file_to_process: str) -> pd.DataFrame`: read the .csv file and return dataframe
   - `extract_from_json(file_to_process: str) -> pd.DataFrame`: read the .json file and return dataframe
   - `extract() -> pd.DataFrame`: extract data of heterogeneous format and combine them into a single dataframe.
   - `transform(df) -> pd.DataFrame`: function for data cleaning and manipulation.
2. Clean the data
   - Round float-type columns to two decimal places.
   - remove duplicate samples
   - Save the cleaned data into parquet file
3. Insert the data into SQL
   - Create postgresql database
   - Insert the data into the database
  
Submission requirement:
    1. Jupyter Notebook
    2. Parquet File
    3. SQL file (optional)

In [1]:
# Required Package:
# psycopg2 2.9.10 (A PostgreSQL database adapter)
# pandas 2.0.3 (For data manipulation and analysis)
# sqlalchemy 2.0.37 (A SQL toolkit and Object Relational Mapper)
# pyarrow 14.0.1 (Provides support for efficient in-memory columnar data structures, part from Apache Arrow Objective)
import pandas as pd

#required for reading .xml files
import xml.etree.ElementTree as ET

#required for navigating machine's directory
import glob
import os.path

#required for communicating with SQL database
from sqlalchemy import create_engine

# E: Extracting data from multiple sources

In [3]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.38.22-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.39.0,>=1.38.22 (from boto3)
  Downloading botocore-1.38.22-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.14.0,>=0.13.0 (from boto3)
  Downloading s3transfer-0.13.0-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.38.22-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.9/139.9 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.38.22-py3-none-any.whl (13.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.6/13.6 MB[0m [31m79.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.13.0-py3-none-any.whl (85 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.2/85.2 kB[0m [31m6.2 MB/s[0m eta [36m0:0

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import boto3
import os

my_aws_access_key_id="ASIAYAAO5HRMG6QPQ5KG"
my_aws_secret_access_key="gBsOag9e2B+37+r4FpEhCDEiyOPot4UtlFUL41ky"
my_aws_session_token='IQoJb3JpZ2luX2VjECwaCXVzLWVhc3QtMiJGMEQCIALWOCoKogFx4i2tTnqPTU9J3ULtY6WP2rpKSd/9jJ84AiBFJAXY3IXomZaQmrtBSylEjaSAX93Z1WA5FXb569WcMSr0Agjl//////////8BEAAaDDU0OTc4NzA5MDAwOCIM12qjpdC5WuNhJkTHKsgCIDmeIZCSeEET4xk0apKAN9GfUk4h4FV57OHFiahURgWUWEvJDF0vu2zEBU0yYtHgAFjDs/4ncLOMNQofBDWGWzMyc9fha7+Vispqqu7R1mc2+x3KkcUsp1MwYH+mhLUOZZG7XPgn/8o3Xqkzgy+k4Jl9g3CgufM2UDsw40V00xYMPrF109sMjU+fm1fb8MyZ2S/I+SCIKXi4IFGSR66+T5o+tcow2cbYv94ELaNsyDneRi2SywdY/uwYBBVxs0Aw0ViWVdJsprvxJe/bB6KYmfxNpMsvF8PxY9FZSRpizxuZv22CrfZQ8PY8UVn9cOyXiKwJ8VXdvmbob5k0QZf0m6FSdPi9nyU4cvZ2+TNgmEp92BaUfhJo/4IcfVGYAwxYdvVuCLAmEbVN4J4duwI2TLvfZ6bSb0zuZzzHh4hqOTrAtJcZawQXJjC/2L/BBjqoAeJNgnGqaPGMRdSPzKP0tJ0NNcByXRjCze1uan3iOV4K1O100D+/2FoU9GUgN8Dbso3HtkUEyaDyQRsoRxtOGTkekrF/dS/mTkXWbD6K+Xgt3OR9w2fYWYgYOuLq8DDh4PA5x2vN4gjxnH/5QBhp8wp6c0pUWYXnEME0sFNvZdJ9P8+ZzRyZVMZobaR6foL40YG16z3Eg73wvKsY0ZCHrxNT7fRbW89s7w=='

BUCKET_NAME = 'de300spring2025'   # Replace with your bucket name
S3_FOLDER = 'dinglin_xia/lab4_data/'             # The folder path in S3
LOCAL_DIR = '/content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5'      # Local directory to save files

In [6]:
def download_s3_folder(bucket_name, s3_folder, local_dir):
    """Download a folder from S3."""
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)

    # List objects within the specified folder
    s3_resource = boto3.resource('s3',
                                aws_access_key_id=my_aws_access_key_id,
                                aws_secret_access_key=my_aws_secret_access_key,
                                aws_session_token=my_aws_session_token)
    bucket = s3_resource.Bucket(bucket_name)

    for obj in bucket.objects.filter(Prefix=s3_folder):
        # Define local file path
        local_file_path = os.path.join(local_dir, obj.key[len(s3_folder):])

        if obj.key.endswith('/'):  # Skip folders
            continue

        # Create local directory if needed
        local_file_dir = os.path.dirname(local_file_path)
        if not os.path.exists(local_file_dir):
            os.makedirs(local_file_dir)

        # Download the file
        bucket.download_file(obj.key, local_file_path)
        print(f"Downloaded {obj.key} to {local_file_path}")

In [7]:
download_s3_folder(BUCKET_NAME, S3_FOLDER, LOCAL_DIR)

Downloaded dinglin_xia/lab4_data/used_car_prices1.csv to /content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5/used_car_prices1.csv
Downloaded dinglin_xia/lab4_data/used_car_prices1.json to /content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5/used_car_prices1.json
Downloaded dinglin_xia/lab4_data/used_car_prices1.xml to /content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5/used_car_prices1.xml
Downloaded dinglin_xia/lab4_data/used_car_prices2.csv to /content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5/used_car_prices2.csv
Downloaded dinglin_xia/lab4_data/used_car_prices2.json to /content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5/used_car_prices2.json
Downloaded 

## Extract data from ./data/ folder

In [9]:
all_files = glob.glob('/content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5')

# Output the list of files
for file in all_files:
    print(file)

/content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5


### Function to extract data from one .csv file

In [10]:
import pandas as pd

def extract_from_csv(file_to_process: str) -> pd.DataFrame:
    return pd.read_csv(file_to_process)

### Function to extract data from one .json file

In [11]:
def extract_from_json(file_to_process: str) -> pd.DataFrame:
    return pd.read_json(file_to_process, lines=True)

### Function to extract data from one  .xml file

In [12]:
def extract_from_xml(file_to_process: str) -> pd.DataFrame:
    dataframe = pd.DataFrame(columns = columns)
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        car_model = person.find("car_model").text
        year_of_manufacture = int(person.find("year_of_manufacture").text)
        price = float(person.find("price").text)
        fuel = person.find("fuel").text
        sample = pd.DataFrame({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, index = [0])
        dataframe = pd.concat([dataframe, sample], ignore_index=True)
    return dataframe

### Function to extract data from the ./data/ folder

In [13]:
def extract() -> pd.DataFrame:
    extracted_data = pd.DataFrame(columns=columns)

    # for csv files
    for csv_file in glob.glob(os.path.join(folder, "*.csv")):
        extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)

    # for json files
    for json_file in glob.glob(os.path.join(folder, "*.json")):
        extracted_data = pd.concat([extracted_data, extract_from_json(json_file)], ignore_index=True)

    # for xml files
    for xml_file in glob.glob(os.path.join(folder, "*.xml")):
        extracted_data = pd.concat([extracted_data, extract_from_xml(xml_file)], ignore_index=True)

    return extracted_data

### Extract the data

In [16]:
columns = ['car_model','year_of_manufacture','price', 'fuel']
folder = "/content/drive/MyDrive/Northwestern Data/Sophomore/Spring/DATA300/Marcus_VanMieghem_DE300/Marcus_VanMieghem_DE300/lab5"
#table_name = "car_data"

# run
def main():
    data = extract()
    #insert_to_table(data, "car_data")

    return data

data = main()

  extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)


In [17]:
data.head()

Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.552239,Diesel
2,ciaz,2017,10820.895522,Petrol
3,wagon r,2011,4253.731343,Petrol
4,swift,2014,6865.671642,Diesel


# T: Transformation data and save organized data to .parquet file

In [18]:
staging_file = "cars.parquet"
staging_data_dir = "staging_data"

In [19]:
def transform(df):
    print(f"Shape of data before transform: {df.shape}")

    df['price'] = df['price'].round(2)

    df = df.drop_duplicates(subset='car_model')

    print(f"Shape of data after transform: {df.shape}")

    if not os.path.exists(staging_data_dir):
        os.makedirs(staging_data_dir)
        print(f"Directory '{staging_data_dir}' created.")

    df.to_parquet(os.path.join(staging_data_dir, staging_file), index=False)
    return df

In [20]:
# print the head of your data
df = transform(data)
df.head()

Shape of data before transform: (90, 4)
Shape of data after transform: (25, 4)
Directory 'staging_data' created.


Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.55,Diesel
2,ciaz,2017,10820.9,Petrol
3,wagon r,2011,4253.73,Petrol
4,swift,2014,6865.67,Diesel


# L: Loading data for further modeling

### Set Up PostgreSQL Locally
#### Step 1: Install PostgreSQL
- Windows: Download from MySQL Official Site {https://www.postgresql.org/download/}
- Mac:
  ```{bash}
  brew install postgresql
  brew services start postgresql
  ```
Then access PostgreSQL CLI
```{bash}
psql -U postgres
```
Note: if you don't have default "postgres" user, then create it manually by
```{bash}
default "postgres" user
```
or
```{bash}
sudo -u $(whoami) createuser postgres -s
```

Then create a database
```{sql}
CREATE DATABASE my_local_db;
\l  -- List all databases
```

#### Step 2: Create a User and Grant Privileges
In PostgreSQL CLI:
```{sql}
CREATE USER myuser WITH ENCRYPTED PASSWORD 'mypassword';
GRANT ALL PRIVILEGES ON DATABASE my_local_db TO myuser;
```

#### Step 3: Install Required Python Libraries
```{bash}
pip install pandas sqlalchemy pymysql psycopg2 mysql-connector-python
```

### Utility function for writing data into the SQL database

In [None]:
# Database credentials
db_host = "localhost"
db_user = "your_user_name"
db_password = "your_password"
db_name = "your_database_name"

conn_string = f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}/{db_name}"

engine = create_engine(conn_string)

In [None]:
# Test connection
df = pd.read_sql("SELECT * FROM pg_catalog.pg_tables;", con=engine)
print(df)

In [None]:
def insert_to_table(data: pd.DataFrame, conn_string:str, table_name:str):
    db = create_engine(conn_string) # creates a connection to the database using SQLAlchemy
    conn = db.connect() # Establishes a database connection
    data.to_sql(table_name, conn, if_exists="replace", index=False)
    conn.close()

In [None]:
# read from the .parquet file

def load() -> pd.DataFrame:
    data = pd.DataFrame()
    for parquet_file in glob.glob(os.path.join(staging_data_dir, "*.parquet")):
        data = pd.concat([pd.read_parquet(parquet_file),data])

    #insert_to_table(data, table_name)
    insert_to_table(data = data, conn_string = conn_string, table_name = 'ml_car_data')

    return data

data = load()
print(data.shape)