# Loading required packages

In [60]:

import pandas as pd

#required for reading .xml files
import xml.etree.ElementTree as ET

#required for navigating machine's directory
import glob
import os.path

#required for communicating with SQL database
from sqlalchemy import create_engine



# Setup - Global Variable Definitions

In [61]:
# the postgresql address for SQL base coonection
conn_string = 'postgresql://admin:de300SPRING2024@dd300spring2024.549787090008.us-east-2.redshift-serverless.amazonaws.com:5439/dev'

# new connection config for mysql locally in a docker container (due to redshift not working from TA)
# Database connection parameters
user = 'de300_lab04'  # Your database username
password = 'de300u1springaccess'  # Your database password
host = 'lab4_sql_container'  # Database host (Docker container name if on the same Docker network)
database = 'cars_lab4'  # Database name

table_name = 'car_data'  # Name of the table to create/insert data into
pq_table_name = "car_data_pq"  # Name of the table to create/insert data into

# Create the SQLAlchemy engine
engine = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}/{database}')

# Define global variables
columns = ['car_model','year_of_manufacture','price', 'fuel']
folder = "data"


### Utility function for writing data into the SQL database

In [62]:
# def insert_to_table(data: pd.DataFrame, table_name:str):
#     db = create_engine(conn_string)
#     conn = db.connect()
#     data.to_sql(table_name, conn, if_exists="replace", index=False)
#     conn.close()


# Insert data to table using MySQL Docker
def insert_to_table(data: pd.DataFrame, table_name:str):
    data.to_sql(name=table_name, con=engine, if_exists='replace', index=False)


# Step one : Extract data from ./data/ folder

In [63]:
# See all files in the data folder
all_files = glob.glob('./data/*')

# Output the list of files
for file in all_files:
    print(file)
    

./data/used_car_prices1.csv
./data/used_car_prices2.csv
./data/used_car_prices3.csv
./data/used_car_prices1.json
./data/used_car_prices3.xml
./data/used_car_prices2.xml
./data/used_car_prices3.json
./data/used_car_prices1.xml
./data/used_car_prices2.json


### Function to extract data from one .csv file

In [64]:
def extract_from_csv(file_to_process: str) -> pd.DataFrame:
    
    # add you line here to read the .csv file and return dataframe
    # Given the file name, parse in the csv, return the dataframe
    
    return pd.read_csv(file_to_process)

### Function to extract data from one .json file

In [65]:
def extract_from_json(file_to_process: str) -> pd.DataFrame:
    
    # add you line here to read the .json file and return dataframe
    # Given the json file name, parse in the file line by line, as each line is a json object, and return the dataframe
    
    return pd.read_json(file_to_process, lines=True)

### Function to extract data from one  .xml file

In [66]:
def extract_from_xml(file_to_process: str) -> pd.DataFrame:
    dataframe = pd.DataFrame(columns = columns)
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        car_model = person.find("car_model").text
        year_of_manufacture = int(person.find("year_of_manufacture").text)
        price = float(person.find("price").text)
        fuel = person.find("fuel").text
        sample = pd.DataFrame({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, index = [0])
        dataframe = pd.concat([dataframe, sample], ignore_index=True)
    return dataframe


### Function to extract data from the ./data/ folder

In [67]:
# Helper function to extract data from all files in the data folder
def extract() -> pd.DataFrame:
    extracted_data = pd.DataFrame(columns = columns)
    #for csv files
    for csv_file in glob.glob(os.path.join(folder, "*.csv")):
        extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
        print(f"Extracted data from {csv_file}")

    #add lines for json files
    for json_file in glob.glob(os.path.join(folder, "*.json")):
        extracted_data = pd.concat([extracted_data, extract_from_json(json_file)], ignore_index=True)
        print(f"Extracted data from {json_file}")
    
    #add lines for xml files
    for xml_file in glob.glob(os.path.join(folder, "*.xml")):
        extracted_data = pd.concat([extracted_data, extract_from_xml(xml_file)], ignore_index=True)
        print(f"Extracted data from {xml_file}")
    
    return extracted_data

### Extract the data

In [68]:
# run
def main():
    data = extract()
    insert_to_table(data, table_name) # table name is defined as a global variable
    
    return data

data = main()

# Test connecting to the database and querying the first 3 rows
result = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 3", engine)
for row in result:
    print(row)

Extracted data from data/used_car_prices1.csv
Extracted data from data/used_car_prices2.csv
Extracted data from data/used_car_prices3.csv
Extracted data from data/used_car_prices1.json
Extracted data from data/used_car_prices3.json
Extracted data from data/used_car_prices2.json
Extracted data from data/used_car_prices3.xml
Extracted data from data/used_car_prices2.xml
Extracted data from data/used_car_prices1.xml
car_model
year_of_manufacture
price
fuel


  extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)


# Step Two: Transformation of the data

In [69]:
staging_file = "cars.parquet"
staging_data_dir = "staging_data"

def transform(df):
    """_summary_
    
    Notes:
        Engine is a globally defined variable

    Args:
        df (dataframe): dataframe of the data
        
    Returns:
        None
    """

    df = pd.read_sql_query(f'SELECT * FROM {table_name}', engine)
    print(f"Shape of data initially: {df.shape}")

    # truncate price with 2 decimal place (add your code below)
    df['price'] = df['price'].apply(lambda x: round(x, 2))
    print(f"Example of price after truncating: {df['price'].head()}")

    # remove samples with same car_model (add your code below)
    df = df.drop_duplicates(subset='car_model')
    print(f"Shape of data after removing same car model: {df.shape}")
    
    # Check if the directory exists
    if not os.path.exists(staging_data_dir):
        os.makedirs(staging_data_dir)  # Create the directory if it does not exist

    # write to parquet
    df.to_parquet(os.path.join(staging_data_dir, staging_file))

In [70]:
# Test Parquet Transformation
transform(data)

# Read the parquet file
pq_data = pd.read_parquet(os.path.join(staging_data_dir, staging_file))
print(pq_data.head())

Shape of data initially: (90, 4)
Example of price after truncating: 0     5000.00
1     7089.55
2    10820.90
3     4253.73
4     6865.67
Name: price, dtype: float64
Shape of data after removing same car model: (25, 4)
  car_model  year_of_manufacture     price    fuel
0      ritz                 2014   5000.00  Petrol
1       sx4                 2013   7089.55  Diesel
2      ciaz                 2017  10820.90  Petrol
3   wagon r                 2011   4253.73  Petrol
4     swift                 2014   6865.67  Diesel


In [71]:
# Print the head of your data for verification
print(data.head())

  car_model year_of_manufacture         price    fuel
0      ritz                2014   5000.000000  Petrol
1       sx4                2013   7089.552239  Diesel
2      ciaz                2017  10820.895522  Petrol
3   wagon r                2011   4253.731343  Petrol
4     swift                2014   6865.671642  Diesel




# Step Three : Loading data for further modeling


In [72]:
# read from the .parquet file

def load() -> pd.DataFrame:
    data = pd.DataFrame()
    for parquet_file in glob.glob(os.path.join(staging_data_dir, "*.parquet")):
        data = pd.concat([pd.read_parquet(parquet_file),data])

    insert_to_table(data, pq_table_name)

    return data

data = load()
print(data.shape)

(25, 4)
