
## Objectives

After completing this lab you will be able to:

-   Read CSV, JSON, and XML file types.
-   Extract data from the above file types.
-   Transform data.
-   Save the transformed data in a ready-to-load format which data engineers can use to load into an RDBMS.


Import the required modules and functions


In [35]:
import glob                         # finds files whose names match a pattern
import pandas as pd                 # reads CSV and JSON files and holds the data as DataFrames
import xml.etree.ElementTree as ET  # parses XML files
from datetime import datetime       # timestamps for the log entries

## Download Files


In [36]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip

--2021-06-08 05:26:45--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2707 (2.6K) [application/zip]
Saving to: ‘source.zip.2’


2021-06-08 05:26:45 (737 MB/s) - ‘source.zip.2’ saved [2707/2707]



## Unzip Files







In [37]:
!unzip source.zip

Archive:  source.zip
replace source3.json? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: source3.json            
  inflating: source1.csv             
  inflating: source2.csv             
  inflating: source3.csv             
  inflating: source1.json            
  inflating: source2.json            
  inflating: source1.xml             
  inflating: source2.xml             
  inflating: source3.xml             



## Set Paths


In [54]:
tmpfile    = "temp.tmp"               # file used to store all extracted data
logfile    = "logfile.txt"            # all event logs will be stored in this file
targetfile = "transformed_data.csv"   # file where transformed data is stored

## Extract


### CSV Extract Function


In [55]:
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe

### JSON Extract Function


In [56]:
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe

In [78]:
pd.read_json('source1.json',lines=True)

Unnamed: 0,name,height,weight
0,jack,68.7,123.3
1,tom,69.8,141.49
2,tracy,70.01,136.46
3,john,67.9,112.37
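
The `lines=True` argument tells pandas to treat every line of the file as a separate JSON object (the JSON Lines layout). The sketch below shows that layout using only the standard library. The sample text is an assumption about what the file looks like, built from the first two rows of the output above.

```python
import json

# Assumed file content in JSON Lines form: one JSON object per line.
sample = (
    '{"name": "jack", "height": 68.7, "weight": 123.3}\n'
    '{"name": "tom", "height": 69.8, "weight": 141.49}\n'
)

# Parse each non-empty line on its own, which is what lines=True asks pandas to do.
records = [json.loads(line) for line in sample.splitlines() if line.strip()]

print(records[0]["name"], records[1]["weight"])  # jack 141.49
```

Each parsed line becomes one row of the resulting DataFrame, and the keys become the column names.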


### XML Extract Function


In [57]:
def extract_from_xml(file_to_process):
    # Collect one dict per child element, then build the DataFrame once.
    # (DataFrame.append was removed in pandas 2.0.)
    rows = []
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        name = person.find("name").text
        height = float(person.find("height").text)
        weight = float(person.find("weight").text)
        rows.append({"name": name, "height": height, "weight": weight})
    return pd.DataFrame(rows, columns=["name", "height", "weight"])

In [80]:
extract_from_xml('source1.xml')

Unnamed: 0,name,height,weight
0,simon,67.9,112.37
1,jacob,66.78,120.67
2,cindy,66.49,127.45
3,ivan,67.62,114.14
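
The XML extract step walks the children of the root element and reads three sub-elements from each. Here is a minimal sketch of that traversal on an in-memory string. The tag names `data` and `person` are assumptions, since the function only relies on the `name`, `height`, and `weight` sub-elements.

```python
import xml.etree.ElementTree as ET

# Assumed document shape; only name/height/weight are confirmed by the function above.
xml_text = """
<data>
  <person><name>simon</name><height>67.9</height><weight>112.37</weight></person>
  <person><name>jacob</name><height>66.78</height><weight>120.67</weight></person>
</data>
""".strip()

root = ET.fromstring(xml_text)

# Iterating over an element yields its direct children, here one per person.
rows = [
    {
        "name": person.find("name").text,
        "height": float(person.find("height").text),
        "weight": float(person.find("weight").text),
    }
    for person in root
]

print(rows[0])  # {'name': 'simon', 'height': 67.9, 'weight': 112.37}
```

Element text is always a string, which is why the numeric fields are converted with `float` before they reach the DataFrame.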


### Extract Function


In [58]:
def extract():
    frames = []  # one DataFrame per source file

    #process all csv files, skipping the output file written by load()
    for csvfile in glob.glob("*.csv"):
        if csvfile != targetfile:
            frames.append(extract_from_csv(csvfile))

    #process all json files
    for jsonfile in glob.glob("*.json"):
        frames.append(extract_from_json(jsonfile))

    #process all xml files
    for xmlfile in glob.glob("*.xml"):
        frames.append(extract_from_xml(xmlfile))

    # DataFrame.append was removed in pandas 2.0, so combine the pieces with a single concat
    if not frames:
        return pd.DataFrame(columns=['name','height','weight'])
    return pd.concat(frames, ignore_index=True)

In [59]:
glob.glob("*.csv")

['source3.csv', 'source1.csv', 'source2.csv']

In [60]:
glob.glob("*.xml")

['source2.xml', 'source3.xml', 'source1.xml']
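
Two details of `glob.glob` matter here. The order of the returned names is arbitrary, as the outputs above show. A pattern such as `*.csv` will also match `transformed_data.csv` once the load step has written it into the same directory. The sketch below sorts the matches and filters out the target file. It runs in a temporary directory with empty placeholder files, so it does not touch the lab data.

```python
import glob
import os
import tempfile

targetfile = "transformed_data.csv"  # same name as in "Set Paths"

with tempfile.TemporaryDirectory() as workdir:
    # Create empty placeholder files, including a leftover output file.
    for fname in ("source2.csv", "source1.csv", targetfile):
        open(os.path.join(workdir, fname), "w").close()

    # Sort for a stable order and skip the file that load() writes.
    sources = sorted(
        path
        for path in glob.glob(os.path.join(workdir, "*.csv"))
        if os.path.basename(path) != targetfile
    )
    source_names = [os.path.basename(path) for path in sources]

print(source_names)  # ['source1.csv', 'source2.csv']
```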

## Transform


The transform function does the following tasks.

1.  Convert height from inches to meters
2.  Convert weight from pounds to kilograms
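
As a worked example of the two conversions, the sketch below applies them to the first extracted record (height 65.78 in, weight 112.99 lb). The helper name `to_metric` is hypothetical and is not part of the lab.

```python
INCH_TO_METER = 0.0254        # one inch is 0.0254 meters
POUND_TO_KG = 0.45359237      # one pound is 0.45359237 kilograms

def to_metric(height_in, weight_lb):
    """Hypothetical helper: return (height in m, weight in kg), rounded to two decimals."""
    return round(height_in * INCH_TO_METER, 2), round(weight_lb * POUND_TO_KG, 2)

print(to_metric(65.78, 112.99))  # (1.67, 51.25)
```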


In [61]:
def transform(data):
    # Work on a copy so that re-running this cell does not convert the same values twice
    data = data.copy()

    # Convert height from inches to meters, rounded to two decimals (one inch is 0.0254 meters)
    data['height'] = (data['height'].astype(float) * 0.0254).round(2)

    # Convert weight from pounds to kilograms, rounded to two decimals (one pound is 0.45359237 kilograms)
    data['weight'] = (data['weight'].astype(float) * 0.45359237).round(2)
    return data

## Loading


In [62]:
def load(targetfile,data_to_load):
    data_to_load.to_csv(targetfile)
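
By default `to_csv` also writes the row index as an unnamed first column. Passing `index=False` omits it, which may suit a file that will later be loaded into a database table. The sketch below compares both forms by writing to an in-memory buffer. The one-row DataFrame is illustrative.

```python
import io

import pandas as pd

df = pd.DataFrame({"name": ["alex"], "height": [1.67], "weight": [51.25]})

# Default behaviour: the index is written as an unnamed leading column.
with_index = io.StringIO()
df.to_csv(with_index)

# index=False writes only the data columns.
without_index = io.StringIO()
df.to_csv(without_index, index=False)

print(with_index.getvalue().splitlines())     # [',name,height,weight', '0,alex,1.67,51.25']
print(without_index.getvalue().splitlines())  # ['name,height,weight', 'alex,1.67,51.25']
```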

## Logging


In [63]:
def log(message):
    timestamp_format = '%Y-%b-%d-%H:%M:%S' # Year-Monthname-Day-Hour:Minute:Second
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open(logfile,"a") as f: # append to the log file named under "Set Paths"
        f.write(timestamp + ',' + message + '\n')
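
A timestamp format string is easiest to check against a fixed datetime. The sketch below uses the time shown in the download log earlier and the `%b` directive, which is the documented code for the abbreviated month name (`%h` is a platform-specific alias for it). The month name depends on the locale; the output shown assumes the default C/English locale.

```python
from datetime import datetime

timestamp_format = "%Y-%b-%d-%H:%M:%S"  # Year-Monthname-Day-Hour:Minute:Second

fixed = datetime(2021, 6, 8, 5, 26, 45)
stamp = fixed.strftime(timestamp_format)
print(stamp)  # 2021-Jun-08-05:26:45

# The same format string parses the text back into an equal datetime.
parsed = datetime.strptime(stamp, timestamp_format)

# One log line has the shape "timestamp,message".
log_line = stamp + "," + "ETL Job Started"
```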

## Running ETL Process


In [64]:
log("ETL Job Started")

In [65]:
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
extracted_data

Unnamed: 0,name,height,weight
0,alex,65.78,112.99
1,ajay,71.52,136.49
2,alice,69.4,153.03
3,ravi,68.22,142.34
4,joe,67.79,144.3
5,alex,65.78,112.99
6,ajay,71.52,136.49
7,alice,69.4,153.03
8,ravi,68.22,142.34
9,joe,67.79,144.3


In [67]:
log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")
transformed_data 

Unnamed: 0,name,height,weight
0,alex,1.67,51.25
1,ajay,1.82,61.91
2,alice,1.76,69.41
3,ravi,1.73,64.56
4,joe,1.72,65.45
5,alex,1.67,51.25
6,ajay,1.82,61.91
7,alice,1.76,69.41
8,ravi,1.73,64.56
9,joe,1.72,65.45


In [68]:
log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")

In [69]:
log("ETL Job Ended")

# Exercise


Using the example above, complete the exercise below.


## Download Files


In [70]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip

--2021-06-08 05:34:09--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4249 (4.1K) [application/zip]
Saving to: ‘datasource.zip’


2021-06-08 05:34:09 (732 MB/s) - ‘datasource.zip’ saved [4249/4249]



## Unzip Files


In [71]:
!unzip datasource.zip -d dealership_data

Archive:  datasource.zip
  inflating: dealership_data/used_car_prices1.csv  
  inflating: dealership_data/used_car_prices2.csv  
  inflating: dealership_data/used_car_prices3.csv  
  inflating: dealership_data/used_car_prices1.json  
  inflating: dealership_data/used_car_prices2.json  
  inflating: dealership_data/used_car_prices3.json  
  inflating: dealership_data/used_car_prices1.xml  
  inflating: dealership_data/used_car_prices2.xml  
  inflating: dealership_data/used_car_prices3.xml  


## About the Data


The folder `dealership_data` contains CSV, JSON, and XML files with used car data. Each record has the features `car_model`, `year_of_manufacture`, `price`, and `fuel`.


## Set Paths


In [72]:
tmpfile    = "dealership_temp.tmp"               # file used to store all extracted data
logfile    = "dealership_logfile.txt"            # all event logs will be stored in this file
targetfile = "dealership_transformed_data.csv"   # file where transformed data is stored

## Extract


### CSV Extract Function


In [73]:
# Add the CSV extract function below
def csv_files(data):
  dataframe= pd.read_csv(data)
  return dataframe

<details><summary>Click here for the solution</summary>
    
```
    
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe
```
</details>


### JSON Extract Function


In [77]:
def json_file(data):
  dataframe = pd.read_json(data,lines=True)
  return dataframe

<details><summary>Click here for the solution</summary>
    
```
    
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe
```
</details>


### XML Extract Function


In [103]:
tree = ET.parse('dealership_data/used_car_prices1.xml')
root = tree.getroot()
for i in root:
  print(i.find('car_model').text)
  print(i.find('fuel').text)


corolla altis
Petrol
etios cross
Petrol
fortuner
Diesel
fortuner
Diesel
fortuner
Diesel
etios liva
Diesel
innova
Petrol
fortuner
Diesel
corolla altis
Petrol
corolla altis
Petrol


In [115]:
# Add the XML extract function below. It is the same as the XML extract function above, but with different column names.
def xml_file(data):
  # Collect one dict per car element, then build the DataFrame once
  # (DataFrame.append was removed in pandas 2.0).
  rows = []
  tree = ET.parse(data)
  root = tree.getroot()
  for car in root:
    rows.append({
      'car_model': car.find('car_model').text,
      'year_of_manufacture': int(car.find('year_of_manufacture').text),
      'price': float(car.find('price').text),
      'fuel': car.find('fuel').text,
    })
  return pd.DataFrame(rows, columns=['car_model','year_of_manufacture','price','fuel'])

In [116]:
xml_file('dealership_data/used_car_prices1.xml')

Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,corolla altis,2013,10373.134328,Petrol
1,etios cross,2015,6716.41791,Petrol
2,fortuner,2014,27985.074627,Diesel
3,fortuner,2015,35074.626866,Diesel
4,fortuner,2017,49253.731343,Diesel
5,etios liva,2014,7089.552239,Diesel
6,innova,2017,29477.61194,Petrol
7,fortuner,2010,13805.970149,Diesel
8,corolla altis,2011,6492.537313,Petrol
9,corolla altis,2016,21268.656716,Petrol


In [117]:
def extract():
    frames = []  # one DataFrame per source file

    #process all csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        frames.append(csv_files(csvfile))

    #process all json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        frames.append(json_file(jsonfile))

    #process all xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        frames.append(xml_file(xmlfile))

    # DataFrame.append was removed in pandas 2.0, so combine the pieces with a single concat
    if not frames:
        return pd.DataFrame(columns=['car_model','year_of_manufacture','price','fuel'])
    return pd.concat(frames, ignore_index=True)

## Transform


In [121]:
# Add the transform function below
def transform(data):
  data['price'] = round(data['price'],2)
  return data
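
To see what this transform does to individual values, the sketch below rounds the first three prices from the XML output above to two decimal places using plain Python floats.

```python
# First three prices from the used_car_prices1.xml output shown earlier.
prices = [10373.134328, 6716.41791, 27985.074627]

rounded = [round(price, 2) for price in prices]
print(rounded)  # [10373.13, 6716.42, 27985.07]
```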

## Loading


In [None]:
# Add the load function below
def load(targetfile,transformed_data):
  transformed_data.to_csv(targetfile)
  

## Logging


In [127]:
# Add the log function below
def log(message):
  now = datetime.now()  # get current timestamp
  time_stamp = now.strftime('%Y-%b-%d-%H:%M:%S')  # Year-Monthname-Day-Hour:Minute:Second
  with open(logfile,'a') as f:
    f.write(time_stamp+ ',' + message + '\n')

## Running ETL Process


In [128]:
# Log that you have started the ETL process
log('ETL begin')

# Log that you have started the Extract step
log('Extraction Started')
# Call the Extract function
extract_data = extract()
# Log that you have completed the Extract step
log('Extraction Ended')

# Log that you have started the Transform step

log('transform started')
# Call the Transform function
transformed = transform(extract_data)
# Log that you have completed the Transform step

log('transformation ended')

# Log that you have started the Load step
log('loading started')

# Call the Load function
load(targetfile,transformed)
# Log that you have completed the Load step
log('loading ended')

# Log that you have completed the ETL process
log('ETL process ended')
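
Each phase above is bracketed by a "started" and an "ended" log call. One possible way to keep those pairs together is a small context manager, sketched below. The names `logged_phase` and `log_to_list` are hypothetical. The sketch records messages in a list instead of the log file so that it has no side effects.

```python
from contextlib import contextmanager

messages = []  # stand-in for the log file

def log_to_list(message):
    """Stand-in for log(): remember the message instead of writing it to disk."""
    messages.append(message)

@contextmanager
def logged_phase(name):
    """Hypothetical helper: log the start of a phase and always log its end."""
    log_to_list(f"{name} phase Started")
    try:
        yield
    finally:
        log_to_list(f"{name} phase Ended")

with logged_phase("Extract"):
    extracted = [1, 2, 3]  # placeholder for the call to extract()

print(messages)  # ['Extract phase Started', 'Extract phase Ended']
```

Because the end message is written in a `finally` block, it is logged even if the phase raises an exception.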