# Car Data ETL (Extract, Transform, Load) Project Using Python

---

## Introduction

Hello, I'm João Henrique. In this project, we'll explore a practical application of Python for an ETL (Extract, Transform, Load) process, focusing on car data. We'll extract information from various file formats, transform it, and load it into a database, showcasing Python's utility in data processing and database operations.

## Project Overview

1. **Data Download and Extraction**:
   - Download a zip file containing our data from a remote server.
   - Extract the contents, which include CSV, JSON, and XML files.

2. **Data Extraction**:
   - Use Python's Pandas library and other tools to read data from each file format and create a uniform data structure.

3. **Data Transformation**:
   - Perform currency conversion from USD to EUR on the extracted data.

4. **Data Load**:
   - Connect to a MySQL database and load the transformed data.

5. **Data Query**:
   - Query the loaded data from the database.

## Technical Aspects

- This project utilizes Python libraries like `requests`, `pandas`, `xml.etree.ElementTree`, and `mysql.connector`.
- For currency conversion, the latest USD to EUR conversion rate is retrieved with a request to the CoinGecko API.

## Execution Guide

- **Setting Up**:
  - Ensure required Python libraries are installed.
  - Replace the placeholder MySQL credentials (host, user, password, and port) in the code with your own.

- **Running the Code**:
  - The script is divided into functions, each handling a part of the ETL process.
  - Run the entire script to see the ETL process or step through each function individually.

- **Logging**:
  - Logs are written to `log_file.txt`, useful for debugging or tracking.

---

**Let's dive into Python's versatility in data processing, from extraction and transformation to database management!**


## Requirements

- Python (version 3.11.3): The Python programming language itself.
- MySQL (version 8.0.34): The database server that stores the loaded data.

To run this notebook, you need the following Python libraries:

- pandas (version 2.0.3): A popular data manipulation and analysis library.
- requests (version 2.31.0): A library for making HTTP requests, commonly used for web scraping and API interactions.
- xml.etree.ElementTree (version 1.3.0): A module for parsing and manipulating XML documents.
- mysql.connector (version 8.2.0): A library for connecting to MySQL databases and performing database operations.
- glob: A module for searching and working with file paths using wildcard patterns.
- datetime: A module for working with date and time data.
- zipfile: A module for creating and extracting zip archives.
- json: A module for encoding and decoding JSON, used here to format the result returned by the load step.


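You can install the third-party libraries using pip (`xml.etree.ElementTree`, `glob`, `datetime`, `zipfile`, and `json` ship with Python's standard library and need no installation):

    pip install requests pandas mysql-connector-python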


In [1]:
import requests
import zipfile
import pandas as pd
import xml.etree.ElementTree as ET 
import glob 
import mysql.connector
from datetime import datetime
import json

# Constants Definition

In this notebook, we define a set of constants that are crucial for the smooth execution of our script. These constants primarily involve paths and file names required for the data extraction, transformation, and loading process.

## Description of Constants

- `URL`: The URL of the data source. This is the link to the zip file containing our datasets in various formats (CSV, JSON, XML).
- `FILE_NAME`: The name of the file obtained from the URL. This helps us to dynamically assign the file name based on the URL.
- `ZIP_FILE_PATH`: The path to the zip file that will be downloaded. We name the zip file as 'datasource.zip'.
- `EXTRACTION_PATH`: The directory path where the contents of the zip file will be extracted. It is set to the current directory denoted by '.'.
- `LOG_FILE`: The name of the file where we will log the operations and errors during the script's execution. It is essential for tracking the progress and debugging.
- `TARGET_FILE`: The name of the final output file where the transformed data will be stored. In this case, it is named 'car_data.csv'.

By defining these constants at the beginning, we ensure that our script remains organized and easier to manage. It also allows for easier adjustments should there be changes in file paths or names.

In [2]:
URL = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip"
FILE_NAME = URL.split('/')[-1]
ZIP_FILE_PATH = 'datasource.zip'  # The name of the zip file
EXTRACTION_PATH = '.'  # Current directory
LOG_FILE = "log_file.txt"
TARGET_FILE = "car_data.csv"

In [3]:
def log_progress(message): 
    timestamp_format = '%Y-%b-%d-%H:%M:%S' # Year-Monthname-Day-Hour:Minute:Second (%b is the portable abbreviated month name)
    now = datetime.now() # get current timestamp 
    timestamp = now.strftime(timestamp_format) 
    with open(LOG_FILE,"a") as f: 
        f.write(timestamp + ',' + message + '\n') 
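
As a minimal sketch of the log line layout, the helper below builds one entry without touching the file system. The name `format_log_line` and the fixed timestamp are assumptions made for illustration; `%b` is the standard directive for the abbreviated month name.

```python
from datetime import datetime


def format_log_line(message, now):
    """Return one log entry in the form 'timestamp,message'."""
    timestamp = now.strftime("%Y-%b-%d-%H:%M:%S")
    return timestamp + "," + message


# A fixed datetime keeps the example reproducible.
line = format_log_line("Data downloaded successfully", datetime(2024, 1, 5, 13, 4, 9))
print(line)
```

Each entry is a single comma-separated line, so the log file can later be read back like a two-column CSV file.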

# Downloading and Extracting Data

The `download` function is designed to perform two main tasks: downloading the dataset from a provided URL and extracting the contents of the downloaded zip file.

## Function Description

1. **Downloading the Dataset:**
   - The function makes a GET request to the URL specified in the `URL` constant.
   - If the request is successful (status code `200`), the content of the response, which is the dataset in zip format, is written to the file named in `FILE_NAME`.
   - If the request fails, it prints an error message along with the failed status code.

2. **Extracting the Zip File:**
   - After downloading, the function unzips the file located at `ZIP_FILE_PATH`.
   - The contents of the zip file are extracted to the directory specified in `EXTRACTION_PATH`.
   - A confirmation message is printed indicating successful extraction and the location of the extracted files.

This function is an integral part of the data pipeline, ensuring that the required data is available and accessible for the subsequent extraction, transformation, and loading processes.
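
The extraction step can be tried without network access. The sketch below is an illustration only: it builds a small archive in memory and extracts it to a temporary directory. The helper name `extract_zip_bytes` and the sample file are assumptions; in the pipeline, the bytes would come from `response.content`.

```python
import io
import tempfile
import zipfile
from pathlib import Path


def extract_zip_bytes(content, destination):
    """Extract a zip archive held in memory and return its member names."""
    with zipfile.ZipFile(io.BytesIO(content)) as archive:
        archive.extractall(destination)
        return archive.namelist()


# Build a tiny archive in memory so the example runs offline.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("sample.csv", "car_model,price\nritz,5000\n")

with tempfile.TemporaryDirectory() as tmp_dir:
    names = extract_zip_bytes(buffer.getvalue(), tmp_dir)
    extracted_text = (Path(tmp_dir) / names[0]).read_text()

print(names)
```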


In [4]:
# Download the zip file, then extract its contents
def download():
    response = requests.get(URL)
    if response.status_code == 200:
        with open(FILE_NAME, 'wb') as file:
            file.write(response.content)
        print(f"File downloaded successfully and saved as {FILE_NAME}")
    else:
        print(f"Failed to download file. Status code: {response.status_code}")
        # Stop here so a missing or stale zip file is not extracted
        raise RuntimeError(f"Download failed with status code {response.status_code}")
    
    # Unzipping the file
    with zipfile.ZipFile(ZIP_FILE_PATH, 'r') as zip_ref:
        zip_ref.extractall(EXTRACTION_PATH)
        print("Files extracted to project path")

In [5]:
# Download the data
try:
    download()
    log_progress("Data downloaded successfully")
except Exception as e:
    log_progress(f"Error in download: {e}")

File downloaded successfully and saved as datasource.zip
Files extracted to project path


# Data Extraction Functions

In this section, we define functions to extract data from different file formats: CSV, JSON, and XML. These functions are crucial for retrieving and formatting the data needed for further processing.

## `extract_from_csv` Function
- **Purpose:** This function reads data from a CSV file and converts it into a Pandas DataFrame.
- **Input Parameter:** `file_to_process` - The path to the CSV file.
- **Process:** The function utilizes `pandas.read_csv` to read the CSV file into a DataFrame.
- **Return:** A DataFrame containing data from the CSV file.




In [6]:
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe

## `extract_from_json` Function
- **Purpose:** Designed to extract data from a JSON file.
- **Input Parameter:** `file_to_process` - The path to the JSON file.
- **Process:** It uses `pandas.read_json` with `lines=True` to handle the JSON file correctly.
- **Return:** A DataFrame with data from the JSON file.

In [7]:
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe
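
The `lines=True` option tells pandas to expect one complete JSON object per line (the JSON Lines layout). As a rough illustration using only the standard library, the sketch below parses such text line by line. The sample records are illustrative values, not the contents of the project's files.

```python
import json

# Two records in JSON Lines form: one JSON object per line.
sample = (
    '{"car_model": "ritz", "year_of_manufacture": 2014, "price": 5000.0, "fuel": "Petrol"}\n'
    '{"car_model": "sx4", "year_of_manufacture": 2013, "price": 7089.55, "fuel": "Diesel"}\n'
)

# Parse each non-empty line independently.
records = [json.loads(line) for line in sample.splitlines() if line.strip()]
print(records[0]["car_model"], len(records))
```

A file holding a single JSON array would not fit this layout and would be read without `lines=True`.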

## `extract_from_xml` Function
- **Purpose:** Extracts data from an XML file.
- **Input Parameter:** `file_to_process` - The path to the XML file.
- **Process:**
  - Initializes an empty DataFrame with predefined columns: `["car_model","year_of_manufacture","price","fuel"]`.
  - Parses the XML file and iterates over each element (car) to extract relevant information.
  - For each car, it finds and converts `car_model`, `year_of_manufacture`, `price`, and `fuel` to their respective data types.
  - Appends this data as a new row to the DataFrame.
- **Return:** A DataFrame with the aggregated data from the XML file.
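
The parsing steps above can be sketched on an inline XML string. The element names `cars` and `car` are assumptions about the file layout; the four field names come from the column list above. This variant collects rows in a list, which could then be passed once to `pd.DataFrame(rows)` instead of concatenating inside the loop.

```python
import xml.etree.ElementTree as ET

# Hypothetical layout: a root element with one child element per car.
sample_xml = """
<cars>
  <car>
    <car_model>ritz</car_model>
    <year_of_manufacture>2014</year_of_manufacture>
    <price>5000.0</price>
    <fuel>Petrol</fuel>
  </car>
  <car>
    <car_model>sx4</car_model>
    <year_of_manufacture>2013</year_of_manufacture>
    <price>7089.55</price>
    <fuel>Diesel</fuel>
  </car>
</cars>
""".strip()

root = ET.fromstring(sample_xml)
rows = []
for car in root:
    # Text values are converted to the types used in the DataFrame.
    rows.append({
        "car_model": car.find("car_model").text,
        "year_of_manufacture": int(car.find("year_of_manufacture").text),
        "price": float(car.find("price").text),
        "fuel": car.find("fuel").text,
    })

print(rows[0])
```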

In [8]:
def extract_from_xml(file_to_process):
    dataframe = pd.DataFrame(columns=["car_model","year_of_manufacture","price","fuel"])
    tree = ET.parse(file_to_process)
    root = tree.getroot() 
    for car in root:
        car_model = car.find("car_model").text
        year_of_manufacture = int(car.find("year_of_manufacture").text)
        price = float(car.find("price").text)
        fuel = car.find("fuel").text
        dataframe = pd.concat([dataframe, pd.DataFrame([{"car_model":car_model,
                                                        "year_of_manufacture":year_of_manufacture,
                                                        "price":price,
                                                        "fuel":fuel}])], ignore_index=True)
    return dataframe

Together, these functions give each supported format (CSV, JSON, XML) its own reader that returns a DataFrame, so the results can be concatenated directly in the next step.

# Main Data Extraction Function: `extract`

This function is the centerpiece for the data extraction process in our ETL (Extract, Transform, Load) pipeline. Its role is to orchestrate the extraction of data from various file formats and consolidate it into a single DataFrame.

## Function Overview

- **Purpose:** To aggregate data from multiple files of different formats (CSV, JSON, XML) into a unified Pandas DataFrame.
- **Process:**
  1. **Initialization:**
     - Creates an empty DataFrame `extracted_data` with predefined columns: `['car_model', 'year_of_manufacture', 'price', 'fuel']`.
  2. **Processing CSV Files:**
     - Iterates over all CSV files in the current directory (identified by the `*.csv` pattern).
     - For each CSV file, it calls `extract_from_csv` and concatenates the returned DataFrame to `extracted_data`.
  3. **Processing JSON Files:**
     - Iterates over all JSON files (identified by the `*.json` pattern).
     - Invokes `extract_from_json` for each JSON file and appends the result to `extracted_data`.
  4. **Processing XML Files:**
     - Goes through all XML files (marked by `*.xml`).
     - Each XML file's data, extracted through `extract_from_xml`, is merged into `extracted_data`.
- **Return:** The function returns the `extracted_data` DataFrame, which contains combined data from all processed files.

## Usage

Call this function to execute the initial step of the ETL pipeline, ensuring all necessary data from various sources is gathered in a structured and consistent format. This aggregated data is then ready for the subsequent 'Transform' and 'Load' stages.
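
The pattern of matching files by wildcard and dispatching each one to a reader can be sketched with the standard library alone. Everything here is illustrative: `READERS`, `collect_rows`, and the two sample files are assumptions, and plain dictionaries stand in for DataFrame rows.

```python
import csv
import glob
import json
import os
import tempfile


def read_csv_rows(path):
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def read_json_rows(path):
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]


# Map each glob pattern to the reader that handles it.
READERS = {"*.csv": read_csv_rows, "*.json": read_json_rows}


def collect_rows(directory):
    rows = []
    for pattern, reader in READERS.items():
        for path in sorted(glob.glob(os.path.join(directory, pattern))):
            rows.extend(reader(path))
    return rows


with tempfile.TemporaryDirectory() as tmp_dir:
    with open(os.path.join(tmp_dir, "a.csv"), "w", newline="") as f:
        f.write("car_model,fuel\nritz,Petrol\n")
    with open(os.path.join(tmp_dir, "b.json"), "w") as f:
        f.write('{"car_model": "sx4", "fuel": "Diesel"}\n')
    all_rows = collect_rows(tmp_dir)

print(all_rows)
```

Keeping the pattern-to-reader mapping in one dictionary means a new format only needs one extra entry.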



In [9]:
def extract():
    # Create an empty DataFrame with the expected columns
    extracted_data = pd.DataFrame(columns=['car_model','year_of_manufacture','price','fuel'])
    
    # process all csv files, skipping the output file written by a previous run
    for csv_file in glob.glob('*.csv'):
        if csv_file != TARGET_FILE:
            extracted_data = pd.concat([extracted_data, extract_from_csv(csv_file)], ignore_index=True)
        
    # process all json files 
    for json_file in glob.glob('*.json'):
        extracted_data = pd.concat([extracted_data, extract_from_json(json_file)], ignore_index=True)
        
    # process all xml files 
    for xml_file in glob.glob('*.xml'):
        extracted_data = pd.concat([extracted_data, extract_from_xml(xml_file)], ignore_index=True)

    return extracted_data

In [10]:
# Extract data
extracted_data = extract()

In [11]:
print(extracted_data)

        car_model year_of_manufacture         price    fuel
0            ritz                2014   5000.000000  Petrol
1             sx4                2013   7089.552239  Diesel
2            ciaz                2017  10820.895522  Petrol
3         wagon r                2011   4253.731343  Petrol
4           swift                2014   6865.671642  Diesel
..            ...                 ...           ...     ...
85          camry                2006   3731.343284  Petrol
86   land cruiser                2010  52238.805970  Diesel
87  corolla altis                2012   8805.970149  Petrol
88     etios liva                2013   5149.253731  Petrol
89        etios g                2014   7089.552239  Petrol

[90 rows x 4 columns]


# Currency Conversion Function: `get_EUR`

This function is responsible for fetching the current exchange rate between the US Dollar (USD) and the Euro (EUR). It plays a crucial role in the 'Transform' stage of our ETL pipeline, ensuring that the pricing data is standardized in a single currency for consistency and ease of analysis.

## Function Overview

- **Purpose:** To obtain the latest USD to EUR exchange rate from the CoinGecko API.
- **Process:**
  1. **API Request:**
     - Sends a GET request to the CoinGecko API's `/simple/price` endpoint.
     - Requests the price of 1 USD in EUR (`'ids': 'usd'` and `'vs_currencies': 'eur'`).
  2. **Response Handling:**
     - Checks if the API response status is 200 (OK).
     - Parses the JSON response to extract the exchange rate.
  3. **Data Extraction:**
     - Retrieves the USD to EUR exchange rate from the response data.
     - Returns the exchange rate if found.
     - Prints an error message if the USD data is not found or if the API request fails.

## Usage

Use this function to dynamically fetch the current exchange rate between USD and EUR. This rate can then be applied to convert any USD values to EUR in the dataset, ensuring currency consistency in the transformed data.



In [12]:
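def get_EUR():

    url = 'https://api.coingecko.com/api/v3/simple/price'
    params = {  
        'ids': 'usd',  # Request the price of 1 USD in EUR
        'vs_currencies': 'eur'
    }

    response = requests.get(url, params=params)

    # Check that the API request succeeded
    if response.status_code != 200:
        print(f"API request failed. Status code: {response.status_code}")
        return None

    data = response.json()

    # Check that the USD data is present in the response
    if 'usd' not in data:
        print("USD data not found in the API response")
        return None

    USD_to_EUR_rate = data['usd']['eur']  # Access the USD/EUR exchange rate
    return USD_to_EUR_rate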
 

In [13]:
get_EUR()

0.92178
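
The response handling can be isolated from the network call, which makes it easy to check. The sketch below assumes a payload shaped like `{'usd': {'eur': ...}}`, as read by `get_EUR`; the helper name `parse_usd_to_eur` is hypothetical.

```python
def parse_usd_to_eur(payload):
    """Return the USD to EUR rate from a parsed JSON payload, or None if absent."""
    rate = payload.get("usd", {}).get("eur")
    return float(rate) if rate is not None else None


found = parse_usd_to_eur({"usd": {"eur": 0.92178}})
missing = parse_usd_to_eur({})
print(found, missing)
```

Returning `None` for a missing rate lets the caller decide whether to stop the pipeline or fall back to another source.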

# Data Transformation Function: `transform`

The `transform` function is a key part of the data processing pipeline. It takes the extracted data and applies necessary transformations to make it suitable for analysis and storage. A primary task in this function is the conversion of price data from USD to EUR.

## Function Overview

- **Purpose:** To transform the extracted data for consistency and readiness for the loading stage.
- **Process:**
  1. **Currency Conversion:**
     - Fetches the current USD to EUR exchange rate using the `get_EUR` function.
     - Rounds off the price in USD to two decimal places for uniformity.
     - Converts the price from USD to EUR using the fetched exchange rate and rounds the result to two decimal places.
     - Adds this converted price as a new column `price_EUR` in the DataFrame.
  2. **Returning Transformed Data:**
     - The function returns the transformed DataFrame with the newly added `price_EUR` column.

## Usage

This function is used in the 'Transform' phase of the ETL process. After extracting data from various sources, use `transform` to standardize the data format, particularly the pricing information, ensuring that all financial values are represented in a single currency (EUR). This makes the dataset uniform and easier to analyze and store in the subsequent stages.



In [14]:
def transform(data):

    USD_to_EUR = get_EUR()
    #round to 2 decimals
    data['price'] = round(data['price'],2)
    
    #transform USD to EUR and round
    data['price_EUR'] = round(data['price']* float(USD_to_EUR),2)
    
    return data
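
The arithmetic behind the conversion is small enough to check by hand. This sketch applies the same two rounding steps to single values, using the rate `0.92178` returned in the earlier cell; the helper name `usd_to_eur` is an assumption for illustration.

```python
def usd_to_eur(price_usd, rate):
    """Round the USD price to 2 decimals, convert it, and round the result."""
    price_usd = round(price_usd, 2)
    return round(price_usd * rate, 2)


rate = 0.92178  # rate shown in the output of get_EUR() above
converted = [usd_to_eur(price, rate) for price in (5000.0, 7089.552239)]
print(converted)
```

The second price is first rounded to 7089.55 and then converted, which is why the order of the two rounding steps matters for the final cent.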

In [15]:
transformed_data = transform(extracted_data)
transformed_data

        car_model  year_of_manufacture     price    fuel  price_EUR
0            ritz                 2014   5000.00  Petrol    4608.90
1             sx4                 2013   7089.55  Diesel    6535.01
2            ciaz                 2017  10820.90  Petrol    9974.49
3         wagon r                 2011   4253.73  Petrol    3921.00
4           swift                 2014   6865.67  Diesel    6328.64
..            ...                  ...       ...     ...        ...
85          camry                 2006   3731.34  Petrol    3439.47
86   land cruiser                 2010  52238.81  Diesel   48152.69
87  corolla altis                 2012   8805.97  Petrol    8117.17
88     etios liva                 2013   5149.25  Petrol    4746.48


# Database Connection Function: `connect_database`

The `connect_database` function is a crucial part of the data handling process, particularly in establishing a connection to a MySQL database. This function is essential for both extracting data from and loading data into the database.

## Function Overview

- **Purpose:** To establish a connection with a MySQL database using provided credentials and port information.
- **Inputs:**
  1. `host`: The hostname or IP address of the database server.
  2. `user`: The username used for authentication with the MySQL server.
  3. `password`: The password used for authentication with the MySQL server.
  4. `port`: The port number through which to connect to the MySQL server.
- **Process:**
  - Utilizes the `mysql.connector` library to establish a connection to the MySQL database.
  - The function returns a connection object (`mydb`), which can be used for executing queries, inserting data, and other database operations.

## Usage

This function is vital for any operation that requires interaction with a MySQL database. It is commonly used at the beginning of data extraction and loading processes, allowing for seamless and secure access to the database. The flexibility to specify different host addresses and port numbers makes it versatile for various environments, from local testing to deployment in production systems.



In [16]:
# Connect to database
def connect_database(host, user, password, port):
    mydb = mysql.connector.connect(
        host=host,
        user=user,
        password=password,
        port=port
    )
    return mydb

In [17]:
mydb = connect_database('127.0.0.1', 'root', 'password', '6666')
mydb

<mysql.connector.connection_cext.CMySQLConnection at 0x1fab8e2d650>
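
To avoid hard-coding credentials in the notebook, the connection settings could be read from environment variables. This is a sketch under assumptions: the variable names `CAR_DB_HOST`, `CAR_DB_USER`, `CAR_DB_PASSWORD`, and `CAR_DB_PORT` and the helper `load_db_config` are hypothetical, and 3306 is MySQL's default port.

```python
import os


def load_db_config(env=os.environ):
    """Build connection keyword arguments from environment variables."""
    return {
        "host": env.get("CAR_DB_HOST", "127.0.0.1"),
        "user": env.get("CAR_DB_USER", "root"),
        "password": env.get("CAR_DB_PASSWORD", ""),
        "port": int(env.get("CAR_DB_PORT", "3306")),
    }


# A plain dict stands in for the real environment in this example.
config = load_db_config({"CAR_DB_PASSWORD": "secret", "CAR_DB_PORT": "6666"})
print(config["host"], config["port"])
```

Because the keys match the parameter names, the result can be passed on as `connect_database(**config)`.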

# Data Loading Function: `load`

The `load` function plays a crucial role in the final stage of the ETL (Extract, Transform, Load) process. Its primary responsibility is to load the transformed data into a CSV file and a MySQL database.

## Function Overview

- **Purpose:** To save the transformed data into a persistable format, ensuring that it is stored for future analysis and retrieval.
- **Process:**
  1. **Save to CSV:**
     - The function first saves the transformed data into a CSV file specified by `target_file`.
  2. **Database Interaction:**
     - Establishes a cursor for database interaction.
     - Converts the pandas DataFrame to a structured array for ease of data insertion.
     - Creates a new schema named `car` in the MySQL database if it doesn't already exist.
     - Selects the `car` schema for subsequent operations.
     - Creates a new table named `car` with appropriate columns (`id`, `car_model`, `year_of_manufacture`, `price`, `price_EUR`, `fuel`) if it doesn't exist.
     - Prepares the SQL insert statement (`DML`).
  3. **Data Insertion:**
     - Iterates over each record in the structured array and executes the insert statement to load data into the `car` table.
  4. **Commit and Close:**
     - Commits the transaction to save changes to the database.
     - Closes the database connection.

## Usage

This function is essential in the 'Load' phase of the ETL process. After transforming the data, `load` facilitates storing the data both in a CSV file for easy access and in a MySQL database for more structured and complex queries. This dual approach to data storage maximizes the utility and accessibility of the processed data.



In [28]:
def load(target_file,data,mydb):
    #load to a csv file
    data.to_csv(target_file)
    mycursor = mydb.cursor()
    
    # Convert the DataFrame to a structured array
    data_records = data.to_records(index=False)
    
    # Create schema
    mycursor.execute("CREATE SCHEMA IF NOT EXISTS `car`")
    
    mycursor.execute("USE car")
    
    #Create a table car if not exists
    mycursor.execute("""
                     CREATE TABLE IF NOT EXISTS car 
                        (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        car_model VARCHAR(255),
                        year_of_manufacture INT, 
                        price FLOAT,
                        price_EUR FLOAT, 
                        fuel VARCHAR(45)
                        )
                    """)
    # Prepare the parameterized INSERT statement (DML)
    DML = """INSERT INTO car 
        (car_model, year_of_manufacture, price, price_EUR, fuel)
        VALUES (%s, %s, %s, %s, %s)
        """
    for record in data_records:
        VAL = (record.car_model, record.year_of_manufacture, record.price, record.price_EUR, record.fuel)
        mycursor.execute(DML, VAL)
            
    mydb.commit()
    mydb.close()
    return json.dumps({'rows_inserted': len(data_records)})
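
The create, insert, and commit sequence can be tried without a MySQL server. The sketch below swaps in Python's built-in `sqlite3` module as a stand-in, so the column types and the `?` placeholders differ from the MySQL version, which uses `%s`. The two sample rows are illustrative.

```python
import sqlite3

rows = [
    ("ritz", 2014, 5000.00, 4608.90, "Petrol"),
    ("sx4", 2013, 7089.55, 6535.01, "Diesel"),
]

conn = sqlite3.connect(":memory:")  # in-memory stand-in for the MySQL connection
conn.execute("""
    CREATE TABLE IF NOT EXISTS car (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        car_model TEXT,
        year_of_manufacture INTEGER,
        price REAL,
        price_EUR REAL,
        fuel TEXT
    )
""")

# executemany sends every row through the same parameterized statement.
conn.executemany(
    "INSERT INTO car (car_model, year_of_manufacture, price, price_EUR, fuel) "
    "VALUES (?, ?, ?, ?, ?)",
    rows,
)
conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM car").fetchone()[0]
conn.close()
print(row_count)
```

`mysql.connector` cursors also provide `executemany`, so the row-by-row loop in `load` could be replaced by a single call if preferred.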

In [29]:
load(TARGET_FILE, transformed_data, mydb)

'{"rows_inserted": 90}'

# Database Query Function: `query`

The `query` function is designed to retrieve data from the 'car' table within a MySQL database. This function is instrumental in data retrieval operations, especially for analyzing and processing stored data.

## Function Overview

- **Purpose:** To execute a query and return the result as a DataFrame.
- **Inputs:**
  1. `mydb`: A MySQL database connection object, established through the `connect_database` function.
  2. `query`: The SQL query as a string, so any custom query can be passed in.
- **Process:**
  1. **Create Cursor:** Initialize a cursor object using the database connection.
  2. **Select Schema:** Execute `USE car` so the query runs against the `car` schema.
  3. **Execute Query:** Run the custom SQL query with `pandas.read_sql_query`, which loads the result directly into a DataFrame.
  4. **Error Handling:** Print an error message if the query fails.
- **Output:** Returns the query result as a pandas DataFrame. The connection is left open, so close it with `mydb.close()` when you are done.

## Usage

This function is primarily used for data analysis and reporting purposes, where data from the 'car' table needs to be extracted for further processing or visualization. It ensures a streamlined approach to data retrieval from the database, abstracting the complexities of SQL queries and cursor management.



In [35]:
def query(mydb, query):

    dataframe = None  # returned unchanged if the query fails
    try:
        mycursor = mydb.cursor()
        mycursor.execute("USE car")
        mycursor.close()
        # Execute the query and convert directly to a DataFrame
        dataframe = pd.read_sql_query(query, mydb)
    except Exception as e:
        print(f"Error retrieving data: {e}")

    return dataframe


# Make the query yourself!

You can learn more about queries in my <a href="https://balenciagaa.notion.site/SQL-1b15be8f10d74695a8de7bc860292f1b?pvs=73">SQL tutorial</a> (it's in Portuguese for now, but I will translate it soon):


## Query examples:

**Retrieve Records with Specific Conditions**

`SELECT * FROM car WHERE year_of_manufacture > 2010`

**Calculate Average Price**

`SELECT AVG(price) AS average_price FROM car`

**Group by Fuel Type and AVG price for each type**

`SELECT fuel, AVG(price) AS average_price FROM car GROUP BY fuel`
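
To see what these three queries return, they can be run against a small in-memory table. The sketch uses Python's built-in `sqlite3` as a stand-in for MySQL, and the three rows are made-up values chosen so the averages are easy to verify.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE car (car_model TEXT, year_of_manufacture INTEGER, price REAL, fuel TEXT)"
)
conn.executemany(
    "INSERT INTO car VALUES (?, ?, ?, ?)",
    [
        ("ritz", 2014, 5000.0, "Petrol"),
        ("sx4", 2013, 7000.0, "Diesel"),
        ("camry", 2006, 3000.0, "Petrol"),
    ],
)

# Records with a specific condition (ORDER BY added only to make the output stable)
newer = conn.execute(
    "SELECT car_model FROM car WHERE year_of_manufacture > 2010 ORDER BY car_model"
).fetchall()

# Average price over all rows
average_price = conn.execute("SELECT AVG(price) AS average_price FROM car").fetchone()[0]

# Average price per fuel type
by_fuel = dict(
    conn.execute("SELECT fuel, AVG(price) AS average_price FROM car GROUP BY fuel").fetchall()
)
conn.close()

print(newer, average_price, by_fuel)
```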

- Connect to database:

In [39]:
mydb = connect_database('127.0.0.1', 'root', 'password', '6666')

- Make the query here:

In [40]:
query(mydb,"SELECT * FROM car")


      id      car_model  year_of_manufacture     price  price_EUR    fuel
0      1           ritz                 2014   5000.00    4608.90  Petrol
1      2            sx4                 2013   7089.55    6535.01  Diesel
2      3           ciaz                 2017  10820.90    9974.49  Petrol
3      4        wagon r                 2011   4253.73    3921.00  Petrol
4      5          swift                 2014   6865.67    6328.64  Diesel
..   ...            ...                  ...       ...        ...     ...
265  266          camry                 2006   3731.34    3439.47  Petrol
266  267   land cruiser                 2010  52238.80   48152.70  Diesel
267  268  corolla altis                 2012   8805.97    8117.17  Petrol
268  269     etios liva                 2013   5149.25    4746.48  Petrol


- Close the database:

In [None]:
mydb.close()