# Loading required packages #

In [67]:
import pandas as pd

#required for reading .xml files
import xml.etree.ElementTree as ET



#required for navigating machine's directory
import glob
import os.path

#required for communicating with SQL database
from sqlalchemy import create_engine

In [68]:
# the postgresql address for SQL base coonection
conn_string = 'postgresql://admin:de300SPRING2024@dd300spring2024.549787090008.us-east-2.redshift-serverless.amazonaws.com:5439/dev'

## Utility function for writing data into the SQL database


In [69]:
def insert_to_table(data: pd.DataFrame, table_name:str):
    db = create_engine(conn_string)
    conn = db.connect()
    data.to_sql(table_name, conn, if_exists="replace", index=False)
    conn.close()

## Step one : Extract data from ./data/ folder

In [70]:
all_files = glob.glob('./data/*')

# Output the list of files
for file in all_files:
    print(file)
    

./data/used_car_prices2.xml
./data/used_car_prices3.json
./data/used_car_prices2.csv
./data/used_car_prices3.xml
./data/used_car_prices1.xml
./data/used_car_prices2.json
./data/used_car_prices3.csv
./data/used_car_prices1.json
./data/used_car_prices1.csv


## Function to extract data from one .csv file

In [71]:
def extract_from_csv(file_to_process: str) -> pd.DataFrame:
    # add you line here to read the .csv file and return dataframe
    df = pd.read_csv(file_to_process)
    return df

## Function to extract data from one .json file

In [72]:
def extract_from_json(file_to_process: str) -> pd.DataFrame:
    
    # add you line here to read the .json file and return dataframe
    df = pd.read_json(file_to_process,lines=True)
    return df

## Function to extract data from one .xml file

In [73]:
def extract_from_xml(file_to_process: str) -> pd.DataFrame:
    dataframe = pd.DataFrame(columns = columns)
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        car_model = person.find("car_model").text
        year_of_manufacture = int(person.find("year_of_manufacture").text)
        price = float(person.find("price").text)
        fuel = person.find("fuel").text
        sample = pd.DataFrame({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, index = [0])
        dataframe = pd.concat([dataframe, sample], ignore_index=True)
    return dataframe

## Function to extract data from the ./data/ folder

In [74]:
def extract() -> pd.DataFrame:
    extracted_data = pd.DataFrame(columns = columns)
    #for csv files
    for csv_file in glob.glob(os.path.join(folder, "*.csv")):
        extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
    
   # For JSON files
    for json_file in glob.glob(os.path.join(folder, "*.json")):
       extracted_data = pd.concat([extracted_data, extract_from_json(json_file)], ignore_index=True)
         
    # For XML files
    for xml_file in glob.glob(os.path.join(folder, "*.xml")):
        xml_data = extract_from_xml(xml_file)
        extracted_data = pd.concat([extracted_data, xml_data], ignore_index=True)
    
    return extracted_data

## Extract the data


In [75]:
columns = ['car_model','year_of_manufacture','price', 'fuel']
folder = "data"
#table_name = "car_data"

# run
def main():
    data = extract()
    #insert_to_table(data, "car_data")
    
    return data

data = main()

  extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)
  dataframe = pd.concat([dataframe, sample], ignore_index=True)


## Step Two: Transformation of the data


In [76]:
staging_file = "cars.parquet"
staging_data_dir = "staging_data"

def transform(df):
    #db = create_engine(conn_string)

    #df = pd.read_sql_query(f'SELECT * FROM {table_name}',con=db)

    print(f"Shape of data {df.shape}")

    # truncate price with 2 decimal place (add your code below)
    df['price'] = df['price'].round(2)

    # remove samples with same car_model (add your code below)
    df = df.drop_duplicates(subset='car_model', keep='first')
    
    print(f"Shape of data {df.shape}")

    if not os.path.exists(staging_data_dir):
        os.makedirs(staging_data_dir)

    # write to parquet
    df.to_parquet(os.path.join(staging_data_dir, staging_file))

In [77]:
# print the head of your data
print(data.head())

  car_model year_of_manufacture         price    fuel
0  alto 800                2017   4253.731343  Petrol
1      ciaz                2015  10223.880597  Diesel
2      ciaz                2015  11194.029851  Petrol
3    ertiga                2015   9104.477612  Petrol
4     dzire                2009   3358.208955  Petrol


In [78]:
def transform2(df):
# truncate price with 2 decimal place (add your code below)
    df['price'] = df['price'].round(2)

    # remove samples with same car_model (add your code below)
    df = df.drop_duplicates(subset='car_model', keep='first')
    return df
    
# Print the head of the data
print(transform2(data).head())

  car_model year_of_manufacture     price    fuel
0  alto 800                2017   4253.73  Petrol
1      ciaz                2015  10223.88  Diesel
3    ertiga                2015   9104.48  Petrol
4     dzire                2009   3358.21  Petrol
8   wagon r                2015   4850.75     CNG


## Step Three : Loading data for further modeling

In [79]:
# read from the .parquet file

def load() -> pd.DataFrame:
    data = pd.DataFrame()
    for parquet_file in glob.glob(os.path.join(staging_data_dir, "*.parquet")):
        data = pd.concat([pd.read_parquet(parquet_file),data])

    #insert_to_table(data, table_name)

    return data

data = load()
print(data.shape)

(25, 4)
