<div style="
            display: block;
            color: Black;
            border-radius: 5px;
            background-color: #00acc1;
            font-size: 160%;
            font-family: lora;">
    <p style="padding: 10px; color: White; text-align: center;">Python ETL</p> </div>
    
Erickson Figueroa, 3150886 <br><br>
Data Acquisition and Management<br><br>
Submitted to: <br>
Muhammad Shahin<br><br>
UNIVERSITY OF WINNIPEG <br>
Professional Applied and Continuing Education (PACE)<br>

January 2024

### Project Outline
- Step 1: Download the zipped data folder from the URL
- Step 2: Unzip the folder and copy data files under a different folder
- Step 3: Extract and integrate data from these files into a common Pandas DataFrame
- Step 4: Perform a simple data transformation
- Step 5: Log operations with appropriately tagged messages throughout the ETL process

In [66]:
# Importing libraries
import os
import pandas as pd
import requests as rq
from io import BytesIO
from zipfile import ZipFile
from xml.etree import ElementTree as ET
import logging
from datetime import datetime

<div style="
            display: block;
            color: Black;
            border-radius: 5px;
            background-color: #00acc1;
            font-size: 160%;
            font-family: lora;">
    <p style="padding: 10px; color: White; text-align: center;">Getting data from the source</p> </div>

In [80]:
# Set up the log file location
log_folder = "./logs/"
log_file_path = os.path.join(log_folder, "etl_log.txt")

# Make sure the log folder exists (no error if it is already there)
os.makedirs(log_folder, exist_ok=True)

# logging.basicConfig() does nothing when the root logger already has
# handlers, which is the case whenever this cell is re-run in the same kernel.
# Detach and close any existing root handlers first so that filemode='w'
# really starts a fresh log file on every run.
root_logger = logging.getLogger()
for existing_handler in list(root_logger.handlers):
    root_logger.removeHandler(existing_handler)
    existing_handler.close()

# Records are tagged with their level only; the date and time are appended to
# each message by the caller via get_current_datetime().
logging.basicConfig(filename=log_file_path, filemode='w', level=logging.INFO,
                    format="[%(levelname)s]: %(message)s")

# Helper that timestamps log messages with millisecond precision
def get_current_datetime():
    """Return the current local date and time as 'YYYY-MM-DD HH:MM:SS.mmm'.

    The fractional seconds are truncated (not rounded) to three digits.
    """
    return datetime.now().isoformat(sep=' ', timespec='milliseconds')

try:

    # The folder where the files are extracted
    extracted_folder = "./staging/"

    # Ensure the extracted folder exists (no error if it is already there)
    os.makedirs(extracted_folder, exist_ok=True)

    # Log an initial message indicating the process has started
    logging.info(f"ETL process started at: {get_current_datetime()}")

    # Location of the zipped data folder
    url = "https://elasticbeanstalk-us-east-2-340729127361.s3.us-east-2.amazonaws.com/prices.zip"

    # Seconds to wait for the server before giving up. Without a timeout,
    # requests waits indefinitely and an unresponsive server hangs the cell.
    download_timeout_seconds = 60

    # Send a GET request to the specified URL to download the zip file
    response = rq.get(url, timeout=download_timeout_seconds)

    # 200 means the HTTP request was successful
    if response.status_code == 200:

        # Unzip the in-memory content into the staging folder
        with ZipFile(BytesIO(response.content)) as z:
            z.extractall(extracted_folder)
            logging.info(f"Zip file downloaded and extracted successfully to the path:'{extracted_folder}' {get_current_datetime()}")

        # List the extracted folder, ordered by file type and then by name.
        # os.listdir() returns entries in arbitrary order, so the name is used
        # as a tie-breaker to make the listing reproducible.
        file_names = sorted(
            os.listdir(extracted_folder),
            key=lambda name: (name.split('.')[-1], name),
        )

        # Print the file names
        print("File names list:")
        print("----------------")
        for file_name in file_names:
            print(file_name)
    else:
        # Timestamped like every other log entry of the ETL process
        logging.error(f"Failed to download the zip file. Status code: {response.status_code}, {get_current_datetime()}")

except Exception as e:
    # Log an error message if an exception occurs (timeouts included)
    logging.error(f"An error occurred: {str(e)}, {get_current_datetime()}")



File names list:
----------------
car_prices1.csv
car_prices2.csv
car_prices3.csv
car_prices1.json
car_prices2.json
car_prices3.json
car_prices1.xml
car_prices2.xml
car_prices3.xml


<div style="
            display: block;
            color: Black;
            border-radius: 5px;
            background-color: #00acc1;
            font-size: 160%;
            font-family: lora;">
    <p style="padding: 10px; color: White; text-align: center;">Consolidating files into a dataframe</p> </div>

In [68]:
try:

    # Start from an empty DataFrame so the name is defined even if loading
    # fails; a failure part-way through leaves it empty rather than holding a
    # partial, misleading result.
    consolidated_df = pd.DataFrame()

    # One DataFrame per file, concatenated once at the end. Growing a frame
    # with pd.concat inside the loop re-copies all previous rows every time,
    # and concatenating with an empty frame is deprecated in recent pandas.
    frames = []

    # Iterate in sorted order: os.listdir() returns entries in arbitrary
    # order, which would make the row order differ between runs/machines.
    for file_name in sorted(os.listdir(extracted_folder)):
        file_path = os.path.join(extracted_folder, file_name)

        # Check the file format and read accordingly
        if file_name.endswith(".json"):
            df = pd.read_json(file_path, lines=True)
        elif file_name.endswith(".xml"):
            tree = ET.parse(file_path)
            root = tree.getroot()
            # Each <row> element becomes one record: child tag -> child text
            data = [{elem.tag: elem.text for elem in row}
                    for row in root.findall('.//row')]
            df = pd.DataFrame(data)

            # XML text is always read as str, whereas the CSV/JSON readers
            # infer numbers. Convert the numeric columns so the consolidated
            # frame does not end up with mixed str/number columns.
            for numeric_column in ('year_of_manufacture', 'price'):
                if numeric_column in df.columns:
                    try:
                        df[numeric_column] = pd.to_numeric(df[numeric_column])
                    except (ValueError, TypeError):
                        # Keep the raw text rather than silently dropping values
                        logging.warning(f"Column '{numeric_column}' in {file_name} is not fully numeric and was kept as text. {get_current_datetime()}")
        elif file_name.endswith(".csv"):
            df = pd.read_csv(file_path)
        else:
            logging.warning(f"Unsupported file format: {file_name} {get_current_datetime()}")
            continue

        frames.append(df)

    # pd.concat raises on an empty list, so only concatenate when at least
    # one supported file was read; otherwise the empty frame is kept.
    if frames:
        consolidated_df = pd.concat(frames, ignore_index=True)

    # Log a success message if no errors occurred
    logging.info(f"The consolidated dataframe was created successfully. {get_current_datetime()}")

except Exception as e:
    # Log an error message if an exception occurs
    logging.error(f"An error occurred consolidating files into the dataframe: {str(e)} {get_current_datetime()}")

# Display the consolidated DataFrame
consolidated_df.head(15)

Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.552239,Diesel
2,ciaz,2017,10820.895522,Petrol
3,wagon r,2011,4253.731343,Petrol
4,swift,2014,6865.671642,Diesel
5,vitara brezza,2018,13805.970149,Diesel
6,ciaz,2015,10074.626866,Petrol
7,s cross,2015,9701.492537,Diesel
8,ciaz,2016,13059.701493,Diesel
9,ciaz,2015,11119.402985,Diesel


<div style="
            display: block;
            color: Black;
            border-radius: 5px;
            background-color: #00acc1;
            font-size: 160%;
            font-family: lora;">
    <p style="padding: 10px; color: White; text-align: center;">Data transformation</p> </div>

In [83]:
try:
    # Final column layout: 'modified_price' sits right after 'price'.
    # Columns not listed here are dropped by the selection below.
    columns_order = ['car_model', 'year_of_manufacture', 'price', 'modified_price', 'fuel']

    # Coerce 'price' to numeric; values that cannot be parsed become NaN
    numeric_price = pd.to_numeric(consolidated_df['price'], errors='coerce')

    # New column: the price rounded to 2 decimals and then doubled
    consolidated_df['modified_price'] = numeric_price.round(2) * 2

    # Apply the column layout
    consolidated_df = consolidated_df[columns_order]

    # Record in the log file that the transformation finished
    logging.info(f"The data transformation process was successfully executed. {get_current_datetime()}")

except Exception as e:
    # Any failure (e.g. a missing column) is written to the log file
    logging.error(f"An error occurred: {str(e)} {get_current_datetime()}")

# Show the first rows of the transformed DataFrame
consolidated_df.head(15)

Unnamed: 0,car_model,year_of_manufacture,price,modified_price,fuel
0,ritz,2014,5000.0,10000.0,Petrol
1,sx4,2013,7089.552239,14179.1,Diesel
2,ciaz,2017,10820.895522,21641.8,Petrol
3,wagon r,2011,4253.731343,8507.46,Petrol
4,swift,2014,6865.671642,13731.34,Diesel
5,vitara brezza,2018,13805.970149,27611.94,Diesel
6,ciaz,2015,10074.626866,20149.26,Petrol
7,s cross,2015,9701.492537,19402.98,Diesel
8,ciaz,2016,13059.701493,26119.4,Diesel
9,ciaz,2015,11119.402985,22238.8,Diesel


<div style="
            display: block;
            color: Black;
            border-radius: 5px;
            background-color: #00acc1;
            font-size: 160%;
            font-family: lora;">
    <p style="padding: 10px; color: White; text-align: center;">Data loading</p> </div>

In [82]:
try:
    # Specify the folder for the destination repository
    destination_repository = "./destination/"

    # Ensure the destination folder exists; exist_ok makes this a single
    # call instead of a separate check-then-create step
    os.makedirs(destination_repository, exist_ok=True)

    # Save the transformed DataFrame to a new CSV file in the destination
    # repository (the DataFrame index is not written)
    transformed_csv_path = os.path.join(destination_repository, "transformed_prices.csv")
    consolidated_df.to_csv(transformed_csv_path, index=False)

    # Record in the log file where the CSV file was saved
    logging.info(f"Transformed data saved to: '{transformed_csv_path}' {get_current_datetime()}")

    # Number of rows in the consolidated DataFrame, i.e. rows written to the CSV
    num_rows_loaded = len(consolidated_df)

    # Log final message indicating the completion of the ETL process
    logging.info(f"ETL process completed successfully. {num_rows_loaded} rows were loaded. {get_current_datetime()}")

    # Tell the notebook user where to find the result
    print(f"The data upload was successful, check the CSV file in the path: {transformed_csv_path}")

except Exception as e:
    # Log an error message if an exception occurs
    logging.error(f"An error occurred: {str(e)} {get_current_datetime()}")

The data upload was successful, check the CSV file in the path: ./destination/transformed_prices.csv
