# Python Project for Data Engineering
## A Simple ETL Project:

This notebook contains the detailed implementation of the project that is compulsory for the **IBM Data Engineering Professional Certification**.


## Objectives

After completing this lab you will be able to:

*   Read CSV, JSON, and XML file types.
*   Extract data from the above file types.
*   Transform data.
*   Save the **transformed data in a ready-to-load format** which data engineers can use to load into an RDBMS.

<img src="Pics/Simple ETL Project Flow.drawio.png">

**Import the Important Libraries**

In [None]:
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime

# **EXAMPLE:** 1

**Downloading the Source Files which are stored in S3 bucket.**

- We will use **Wget**, a **command-line networking tool** that lets you download files and interact with REST APIs.
  
- It supports the HTTP, HTTPS, FTP, and FTPS protocols and copes with slow or unstable network connections. If a download fails, Wget retries it (by default up to 20 times) instead of giving up after the first error.

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip

--2022-09-07 14:50:21--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2707 (2.6K) [application/zip]
Saving to: ‘source.zip’


2022-09-07 14:50:22 (822 MB/s) - ‘source.zip’ saved [2707/2707]



In the DataLore menu bar, you will see the Attach Data option. Click it to see the downloaded zip file.

**Unzip Files**

In [None]:
# Now unzip the source.zip file
!unzip source.zip

Archive:  source.zip
  inflating: source3.json            
  inflating: source1.csv             
  inflating: source2.csv             
  inflating: source3.csv             
  inflating: source1.json            
  inflating: source2.json            
  inflating: source1.xml             
  inflating: source2.xml             
  inflating: source3.xml             


<img src="Pics/Screenshot 2022-09-07 at 9.17.59 AM.png">

You can see different files with different formats.
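Outside a notebook, the `!wget` and `!unzip` shell commands are not available. The following is a minimal sketch of the same two steps using only the Python standard library. The function name `download_and_extract` and its parameters are assumptions made for illustration, not part of the original lab.

```python
import urllib.request
import zipfile
from pathlib import Path


def download_and_extract(url, dest_dir="."):
    """Download a zip archive from `url` and unpack it into `dest_dir`.

    Returns the sorted list of member names found in the archive.
    (Hypothetical helper; the notebook itself uses !wget and !unzip.)
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)

    # Save the archive under the last path component of the URL,
    # e.g. ".../data/source.zip" becomes "source.zip".
    archive_path = dest / url.rsplit("/", 1)[-1]
    urllib.request.urlretrieve(url, str(archive_path))

    # Unpack every member next to the downloaded archive.
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(dest)
        return sorted(archive.namelist())
```

Called with the `source.zip` URL used above, this should leave the same CSV, JSON, and XML files in the working directory. Unlike Wget, this sketch does not retry a failed download.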

**Set Paths:**

In [None]:
tmpfile = "temp.tmp"
logfile = "logfile.txt"
targetfile = "transformed_data.csv"

# **Extract:**

Let's move to our first step, Extract. In this step, we will extract the data from every source file into a single DataFrame.

In [None]:
# CSV Extract Function
def extracting_from_csv(file):
    dataframe = pd.read_csv(file)
    return dataframe

# JSON Extract Function
def extracting_from_json(file):
    dataframe = pd.read_json(file, lines=True) # Read the file as a json object per line.
    return dataframe

# XML Extract Function
def extracting_from_xml(file):
    rows = []
    tree = ET.parse(file)
    root = tree.getroot()
    for person in root:
        # Each child element of the root holds one record.
        rows.append({
            "name": person.find("name").text,
            "height": person.find("height").text,
            "weight": person.find("weight").text,
        })
    # Build the DataFrame once, after the loop, so every record is kept.
    # (DataFrame.append was removed in pandas 2.0.)
    return pd.DataFrame(rows, columns=["name", "height", "weight"])
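To make the XML traversal concrete, here is a small sketch that parses an in-memory document instead of a file. The element names `<data>` and `<person>` are assumptions about the layout of the source files. Only the `name`, `height`, and `weight` fields come from the notebook.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample; the root and record tag names are assumed.
SAMPLE_XML = """<data>
  <person><name>alex</name><height>65.78</height><weight>112.99</weight></person>
  <person><name>ajay</name><height>71.52</height><weight>136.49</weight></person>
</data>"""


def parse_people(xml_text):
    """Return one dict per child element of the root."""
    root = ET.fromstring(xml_text)
    records = []
    for person in root:
        records.append({
            "name": person.find("name").text,
            "height": person.find("height").text,  # .text is a str, not a number
            "weight": person.find("weight").text,
        })
    return records


records = parse_people(SAMPLE_XML)
print(records)
```

Because `.text` returns strings, the numeric columns still have to be cast to `float` later, which is what the transform step does.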

**Extract Function:**

In [None]:
def extract():
    # Collect one DataFrame per source file, then combine them once at the end.
    frames = []

    # Process all csv files: Use Glob
    for csvfile in glob.glob("*.csv"):
        frames.append(extracting_from_csv(csvfile))

    # Process all json files
    for jsonfile in glob.glob("*.json"):
        frames.append(extracting_from_json(jsonfile))

    # Process all xml files
    for xmlfile in glob.glob("*.xml"):
        frames.append(extracting_from_xml(xmlfile))

    # No source files found: return an empty DataFrame with the expected columns.
    if not frames:
        return pd.DataFrame(columns=['name', 'height', 'weight'])

    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    # ignore_index=True renumbers the rows 0..n-1 in the order they were added.
    return pd.concat(frames, ignore_index=True)
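The sketch below shows how `glob` patterns select files, using a temporary folder with empty placeholder files. The file `notes.txt` and the narrower pattern `source*.csv` are assumptions added for illustration. One point worth noticing is that `*.csv` matches every CSV in the folder, including a `transformed_data.csv` left over from an earlier run.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as workdir:
    # Create empty placeholder files to match against.
    for name in ["source1.csv", "source2.csv", "source1.json",
                 "transformed_data.csv", "notes.txt"]:
        open(os.path.join(workdir, name), "w").close()

    # glob returns matches in arbitrary order, so sort for a stable result.
    all_csv = sorted(
        os.path.basename(p) for p in glob.glob(os.path.join(workdir, "*.csv"))
    )
    # A narrower pattern skips the output file of a previous run.
    source_csv = sorted(
        os.path.basename(p) for p in glob.glob(os.path.join(workdir, "source*.csv"))
    )

print(all_csv)
print(source_csv)
```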

# **Transform:**

In this case, the transform function does the following tasks.

1.  Convert height from inches to meters
2.  Convert weight from pounds to kilograms
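The two conversions can be checked by hand on a single record. This sketch uses plain Python rather than pandas. The helper names are assumptions, while the conversion factors and the sample values (`65.78` inches, `112.99` pounds) are the ones that appear in this notebook.

```python
INCH_TO_METER = 0.0254          # 1 inch is exactly 0.0254 m
POUND_TO_KILOGRAM = 0.45359237  # 1 pound is exactly 0.45359237 kg


def inches_to_meters(inches):
    """Convert a height in inches (number or numeric string) to meters."""
    return round(float(inches) * INCH_TO_METER, 2)


def pounds_to_kilograms(pounds):
    """Convert a weight in pounds (number or numeric string) to kilograms."""
    return round(float(pounds) * POUND_TO_KILOGRAM, 2)


print(inches_to_meters("65.78"))      # 1.67
print(pounds_to_kilograms("112.99"))  # 51.25
```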

In [None]:
def transformation(data):
    # Convert height from inches to meters (1 inch = 0.0254 m)
    data.height = data.height.astype(float)
    data['height'] = round(data.height * 0.0254, 2)

    # Convert weight from pounds to kilograms (1 lb = 0.45359237 kg)
    data.weight = data.weight.astype(float)
    data['weight'] = round(data.weight * 0.45359237, 2)

    return data

# **Loading Data:**

Now that extraction and transformation are complete, we load the data into a CSV file.

In [None]:
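def load(targetfile, data_to_load):
    # index=False leaves out the DataFrame's row index, so the CSV holds only the data columns.
    data_to_load.to_csv(targetfile, index=False)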
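Since the goal is a file that can later be loaded into an RDBMS, here is a rough sketch of that follow-up step using SQLite from the standard library. The choice of SQLite, the table name `people`, and the helper `load_csv_into_sqlite` are all assumptions. The notebook itself stops at the CSV file.

```python
import csv
import sqlite3


def load_csv_into_sqlite(csv_path, connection, table="people"):
    """Create `table` if needed and insert every CSV row into it.

    Returns the number of rows inserted. (Hypothetical helper.)
    """
    connection.execute(
        f"CREATE TABLE IF NOT EXISTS {table} (name TEXT, height REAL, weight REAL)"
    )
    with open(csv_path, newline="") as handle:
        # DictReader looks columns up by header name, so extra columns are ignored.
        rows = [
            (row["name"], float(row["height"]), float(row["weight"]))
            for row in csv.DictReader(handle)
        ]
    connection.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)
    connection.commit()
    return len(rows)
```

A call such as `load_csv_into_sqlite("transformed_data.csv", sqlite3.connect("etl.db"))` would then populate the table. The database file name `etl.db` is again only an example.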

# **Logging:**

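- Each phase of the ETL job is recorded by a small `log` function that appends a timestamped message to the log file.

- The function builds the timestamp with `datetime.now()` and `strftime`, so it needs no imports beyond `datetime`.
- Python's built-in `logging` package offers the same functionality through a logger, which also lets us set the format in which the logs are generated.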

In [None]:
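def log(message):
    # %b is the portable code for the abbreviated month name (%h is not available on every platform).
    timestamp_format = '%Y-%b-%d-%H:%M:%S' # Year-Monthname-Day-Hour:Minute:Second
    now = datetime.now() # Get the current time
    timestamp = now.strftime(timestamp_format)
    # Append to the log file defined in "Set Paths" so earlier entries are kept.
    with open(logfile, 'a') as file:
        file.write(timestamp + ',' + message + '\n')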
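For comparison, the following sketch produces similar `timestamp,message` lines with the standard `logging` package. The function name `make_etl_logger` and the logger name `etl` are assumptions. The timestamp layout copies the one used in the `log` function above.

```python
import logging


def make_etl_logger(path, name="etl"):
    """Return a logger that appends 'timestamp,message' lines to `path`.

    (Hypothetical helper, not part of the original notebook.)
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep messages out of the root logger's output

    # Attach the file handler only once; re-running a notebook cell would
    # otherwise add a second handler and duplicate every line.
    if not logger.handlers:
        handler = logging.FileHandler(path, mode="a")
        handler.setFormatter(
            logging.Formatter("%(asctime)s,%(message)s", datefmt="%Y-%b-%d-%H:%M:%S")
        )
        logger.addHandler(handler)
    return logger
```

With `logger = make_etl_logger("logfile.txt")`, a call like `logger.info("ETL Job Started")` plays the role of `log("ETL Job Started")`.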

# **Running ETL Process:**

In [None]:
log("ETL Job Started")

In [None]:
# Extracting Data

log("Extract Phase Started:")
extracted_data = extract()
log("Extract Phase Ended:")
extracted_data

Unnamed: 0,name,height,weight
0,alex,65.78,112.99
1,ajay,71.52,136.49
2,alice,69.4,153.03
3,ravi,68.22,142.34
4,joe,67.79,144.3
5,alex,65.78,112.99
6,ajay,71.52,136.49
7,alice,69.4,153.03
8,ravi,68.22,142.34
9,joe,67.79,144.3


In [None]:
# Transformation Data

log("Transformation Phase Started:")
transformed_data = transformation(extracted_data)
log("Transformation Phase Ended:")
transformed_data

Unnamed: 0,name,height,weight
0,alex,1.67,51.25
1,ajay,1.82,61.91
2,alice,1.76,69.41
3,ravi,1.73,64.56
4,joe,1.72,65.45
5,alex,1.67,51.25
6,ajay,1.82,61.91
7,alice,1.76,69.41
8,ravi,1.73,64.56
9,joe,1.72,65.45


In [None]:
# Loading Data

log("Loading Phase Started:")
load(targetfile, transformed_data)
log("Loading Phase Ended:")

In [None]:
log("ETL Job Ended")

**This simple ETL implementation shows how data is extracted from a web source, transformed into a usable format, and then loaded into a target (in this case, a CSV file).**

# **EXAMPLE:** 2

# **LET'S PERFORM ETL ON CAR DEALERSHIP DATA:**

**ABOUT THE DATA:**

The `dealership_data` folder contains CSV, JSON, and XML files of used car data with the features `car_model`, `year_of_manufacture`, `price`, and `fuel`.

# Downloading the File:

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip

--2022-09-07 14:50:27--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4249 (4.1K) [application/zip]
Saving to: ‘datasource.zip’


2022-09-07 14:50:27 (1.05 GB/s) - ‘datasource.zip’ saved [4249/4249]



# Unzip the Files:

In [None]:
!unzip datasource.zip -d dealership_data

Archive:  datasource.zip
  inflating: dealership_data/used_car_prices1.csv  
  inflating: dealership_data/used_car_prices2.csv  
  inflating: dealership_data/used_car_prices3.csv  
  inflating: dealership_data/used_car_prices1.json  
  inflating: dealership_data/used_car_prices2.json  
  inflating: dealership_data/used_car_prices3.json  
  inflating: dealership_data/used_car_prices1.xml  
  inflating: dealership_data/used_car_prices2.xml  
  inflating: dealership_data/used_car_prices3.xml  


# Set Paths (Optional):

In [None]:
tmpfile    = "dealership_temp.tmp"               # file used to store all extracted data
logfile    = "dealership_logfile.txt"            # all event logs will be stored in this file
targetfile = "dealership_transformed_data.csv"   # file where transformed data is stored

# Extract

In [None]:
# CSV Extract Function
def extract_csv_data(file):
    """
    Extracts the data from a CSV file and returns a dataframe.
    """
    df = pd.read_csv(file)
    return df

In [None]:
# JSON extract function
def extract_json_data(file):
    """
    Extracts the data from a JSON file and returns a dataframe.
    """
    df = pd.read_json(file, lines=True)
    return df
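The `lines=True` argument tells pandas that the file holds one JSON object per line (the JSON Lines layout) rather than a single JSON document. This stdlib-only sketch illustrates the difference. The two sample records are made up; only the field names come from the dataset description.

```python
import json

# Hypothetical sample in JSON Lines layout: one object per line.
SAMPLE_JSONL = (
    '{"car_model": "ritz", "year_of_manufacture": 2014, "price": 5000.0, "fuel": "Petrol"}\n'
    '{"car_model": "sx4", "year_of_manufacture": 2013, "price": 7089.55, "fuel": "Diesel"}\n'
)


def parse_json_lines(text):
    """Parse each non-empty line as its own JSON object."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


cars = parse_json_lines(SAMPLE_JSONL)
print(cars[0]["car_model"], cars[1]["price"])

# Parsing the whole text as a single JSON document fails, because a second
# object follows the first one.
try:
    json.loads(SAMPLE_JSONL)
    whole_text_is_valid_json = True
except json.JSONDecodeError:
    whole_text_is_valid_json = False
print(whole_text_is_valid_json)
```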

In [None]:
# XML extract function: same as the one in Example 1, but with the car data column names.
def extract_xml_data(file):
    """Extract data from XML file"""
    rows = []
    tree = ET.parse(file)
    root = tree.getroot()
    for car in root:
        # Each child element of the root holds one car record.
        rows.append({
            "car_model": car.find("car_model").text,
            "year_of_manufacture": int(car.find("year_of_manufacture").text),
            "price": car.find("price").text,
            "fuel": car.find("fuel").text,
        })
    # Return after the loop so every record in the file is included.
    # (DataFrame.append was removed in pandas 2.0.)
    return pd.DataFrame(rows, columns=["car_model", "year_of_manufacture", "price", "fuel"])

# **Extract Function:**

It extracts the data from each file and collects it in a single pandas DataFrame:

In [None]:
def extract():
    frames = []  # one DataFrame per source file

    # process all csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        frames.append(extract_csv_data(csvfile))

    # process all json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        frames.append(extract_json_data(jsonfile))

    # process all xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        frames.append(extract_xml_data(xmlfile))

    # No source files found: return an empty DataFrame with the expected columns.
    if not frames:
        return pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])

    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    return pd.concat(frames, ignore_index=True)

# Transform:

In [None]:
def transform(data):
    """Return the data after applying the transformation rule: round price to 2 decimal places."""
    data.price = data.price.astype(float)
    data['price'] = round(data.price, 2)
    return data

# Loading:

It will load the data in the required format:

In [None]:
# Load Function
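def load(targetfile, data_to_load):
    # index=False leaves out the DataFrame's row index, so the CSV holds only the data columns.
    data_to_load.to_csv(targetfile, index=False)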

# Logging:

In [None]:
# Log function
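def log(message):
    # %b is the portable code for the abbreviated month name (%h is not available on every platform).
    timestamp_format = '%H:%M:%S-%b-%d-%Y'  # Hour:Minute:Second-Monthname-Day-Year
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)
    # Append to the log file defined in "Set Paths".
    with open(logfile, 'a') as f:
        f.write(timestamp + ', ' + message + '\n')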
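The two examples in this notebook use different timestamp layouts. The sketch below formats one fixed moment with both, then parses the second layout back with `strptime`. The fixed date and the ISO-style alternative are illustrative choices, not part of the original lab.

```python
from datetime import datetime

# A fixed moment, so the output does not depend on when the code runs.
moment = datetime(2022, 9, 7, 14, 50, 21)

example1_stamp = moment.strftime('%Y-%b-%d-%H:%M:%S')  # date first, as in Example 1
example2_stamp = moment.strftime('%H:%M:%S-%b-%d-%Y')  # time first, as in Example 2
print(example1_stamp)
print(example2_stamp)

# A numeric, year-first layout has the extra property that the text order
# of the stamps matches their chronological order.
iso_stamp = moment.strftime('%Y-%m-%d %H:%M:%S')
print(iso_stamp)

# strptime reverses strftime when given the same format string.
parsed = datetime.strptime(example2_stamp, '%H:%M:%S-%b-%d-%Y')
print(parsed == moment)
```

Month names produced by `%b` follow the active locale, so log files written on differently configured machines may not share the same abbreviations.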

# Running ETL Process:

In [None]:
# Log that you have started the ETL process
log("ETL JOB STARTED:")

# Log that you have started the Extract step
log("Extract Phase Started")

# Call the Extract function
extracted_data = extract()
# Log that you have completed the Extract step
log("Extract Phase is Ended")

# Log that you have started the Transform step
log("Transformation Phase is Started")

# Call the Transform function
transformed_data = transform(extracted_data)
# Log that you have completed the Transform step
log("Transformation Phase is Ended")

# Log that you have started the Load step
log("Loading Phase is Started")
# Call the Load function
load(targetfile, transformed_data)
# Log that you have completed the Load step
log("Loading Phase is Ended")

# Log that you have completed the ETL process
log("ETL JOB IS ENDED")
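Every phase above is wrapped in the same "started" and "ended" log calls. As a possible refinement, that pattern could be captured once in a context manager. This is only a sketch: the name `logged_phase`, the "Failed" message, and the in-memory `messages` list (standing in for the notebook's `log` function) are all assumptions.

```python
from contextlib import contextmanager

messages = []  # stand-in for the log file; collects messages in memory


@contextmanager
def logged_phase(name, log_func):
    """Log the start of a phase, then its end or its failure."""
    log_func(f"{name} Phase Started")
    try:
        yield
    except Exception as error:
        # Record the failure, then let the exception continue upward.
        log_func(f"{name} Phase Failed: {error}")
        raise
    else:
        log_func(f"{name} Phase Ended")


with logged_phase("Extract", messages.append):
    extracted = [1, 2, 3]  # placeholder for extract()

print(messages)
```

In the notebook, passing `log` instead of `messages.append` would write the same messages to the log file.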

## Author:

Umer Farooq