In [1]:
import glob
import polars as pl
from datetime import datetime
import json

In [2]:
# Download the three input files into 03_ETL-data/source/ (wget's -P option sets
# the directory the files are saved to): two bank market-cap JSON files and the
# exchange-rate CSV.
!wget -P 03_ETL-data/source/ https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/bank_market_cap_1.json
!wget -P 03_ETL-data/source/ https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/bank_market_cap_2.json
!wget -P 03_ETL-data/source/ https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Final%20Assignment/exchange_rates.csv

--2023-06-15 10:09:04--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/bank_market_cap_1.json
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... 

connected.
HTTP request sent, awaiting response... 200 OK
Length: 2815 (2,7K) [application/json]
Saving to: ‘03_ETL-data/source/bank_market_cap_1.json’


2023-06-15 10:09:05 (751 MB/s) - ‘03_ETL-data/source/bank_market_cap_1.json’ saved [2815/2815]

--2023-06-15 10:09:05--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/bank_market_cap_2.json
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1429 (1,4K) [application/json]
Saving to: ‘03_ETL-data/source/bank_market_cap_2.json’


2023-06-15 10:09:06 (436 MB/s) - ‘03_ETL-data/source/bank_market_cap_

In [3]:
# Root directory for everything the ETL job reads and writes.
folder_path = "03_ETL-data"

# Directory scanned by extract() for the *.csv and *.json input files.
sourcefolder = folder_path + "/source/"
# Text file that log() appends its timestamped lines to.
logfile = folder_path + "/logfile.txt"
# Destination CSV written by the load step.
target_file = folder_path + "/bank_market_cap_gbp.csv"

#### Extract

In [4]:
def extract_from_json(file_to_process, columns):
    """Load selected columns of a column-oriented JSON file into a DataFrame.

    The file must contain a JSON object whose top-level keys are column names
    and whose values are objects mapping a row label to a cell value, e.g.
    ``{"Name": {"0": "A", "1": "B"}, "Cap": {"0": 1.0, "1": 2.0}}``.
    Row labels are discarded; cell values keep the order in which they appear
    in the file.

    Args:
        file_to_process: Path of the JSON file to read.
        columns: Names of the top-level keys to extract. Any number of
            columns is accepted.

    Returns:
        A ``pl.DataFrame`` with one column per entry in ``columns``.

    Raises:
        KeyError: If a requested column is not present in the file.
    """
    # JSON interchange files are UTF-8; do not depend on the platform default.
    with open(file_to_process, encoding="utf-8") as file:
        data = json.load(file)

    # Look every column up by name instead of by position, so the function is
    # not limited to exactly two columns.
    unwrapped_data = {column: list(data[column].values()) for column in columns}

    return pl.DataFrame(unwrapped_data)

In [5]:
def extract_from_csv(file_to_process):
    """Read the CSV file at ``file_to_process`` with ``pl.read_csv`` and return the result."""
    return pl.read_csv(file_to_process)

In [6]:
def extract(path):
    """Read the exchange rates (CSV) and bank market caps (JSON) under ``path``.

    Each ``*.csv`` file is reduced to two columns: ``Symbol``, taken from the
    file's first column whatever its header is, and ``Rates``. Each ``*.json``
    file is reduced to ``Name`` (cast to Utf8) and ``Market Cap (US$ Billion)``
    (cast to Float64).

    Files of one kind are NOT merged: when several match, the one visited
    last supplies the result. Files are visited in sorted order so that the
    outcome is the same on every machine.

    Args:
        path: Directory to scan for ``*.csv`` and ``*.json`` files.

    Returns:
        A tuple ``(exchange_rates, bank_market)``. ``exchange_rates`` is
        ``None`` when no CSV file was found; ``bank_market`` is ``None`` when
        no JSON file was found or none could be cast to the expected types.
    """
    print(path)

    csv_columns = ['Symbol', 'Rates']
    json_columns = ['Name', 'Market Cap (US$ Billion)']

    exchange_rates = None
    bank_market = None

    # glob.glob() yields matches in arbitrary order; sort them so that
    # "last file wins" does not depend on the filesystem.
    for csvfile in sorted(glob.glob(path + "/*.csv")):
        extract_csv = extract_from_csv(csvfile)
        exchange_rates = pl.DataFrame({
            # The first CSV column holds the currency symbols.
            csv_columns[0]: extract_csv[extract_csv.columns[0]],
            csv_columns[1]: extract_csv[csv_columns[1]],
        })

    for jsonfile in sorted(glob.glob(path + "/*.json")):
        extract_json = extract_from_json(jsonfile, json_columns)
        try:
            bank_market = extract_json.select(
                pl.col(json_columns[0]).cast(pl.Utf8),
                pl.col(json_columns[1]).cast(pl.Float64),
            )
        except Exception as e:
            # Best effort: report the file that could not be cast and keep
            # whatever an earlier file produced.
            print(e)

    return exchange_rates, bank_market

## Transform

Using <code>exchange_rate</code> and the `exchange_rates.csv` file, find the exchange rate of USD to GBP. Write a transform function that:

1. Converts the `Market Cap (US$ Billion)` column from USD to GBP
2. Rounds the converted values to 3 decimal places
3. Returns both the `Market Cap (US$ Billion)` and `Market Cap (GBP$ Billion)` columns


In [7]:
def transform(exchange_rates, bank_market, currency='GBP'):
    """Add a market-cap column converted from USD into ``currency``.

    The rate is read from the ``Rates`` column of the ``exchange_rates`` row
    whose first column equals ``currency``. The converted values are rounded
    to 3 decimal places and stored in a new column named
    ``Market Cap (<currency>$ Billion)``; the original
    ``Market Cap (US$ Billion)`` column is kept.

    Args:
        exchange_rates: Frame whose first column holds currency symbols and
            which has a ``Rates`` column.
        bank_market: Frame with a ``Market Cap (US$ Billion)`` column.
        currency: Symbol of the target currency. Defaults to ``'GBP'``.

    Returns:
        ``bank_market`` with the converted column appended.

    Raises:
        ValueError: If ``exchange_rates`` has no row for ``currency``.
    """
    symbol_column = exchange_rates.columns[0]
    matching_rates = exchange_rates.filter(pl.col(symbol_column) == currency)

    # Fail with a clear message instead of an out-of-bounds error on [0].
    if matching_rates.height == 0:
        raise ValueError(f"No exchange rate found for currency {currency!r}")

    exchange_rate_value = matching_rates['Rates'][0]
    print(f"{currency} exchange rate from USD: {exchange_rate_value}")

    converted_cap = (
        (pl.col('Market Cap (US$ Billion)') * exchange_rate_value)
        .round(3)
        .alias(f'Market Cap ({currency}$ Billion)')
    )

    return bank_market.with_columns(converted_cap)

#### Load

In [8]:
def load(targetfile, data_to_load):
    """Save the transformed data as a CSV file.

    Args:
        targetfile: Destination passed to ``write_csv``.
        data_to_load: Object whose ``write_csv`` method performs the write.
    """
    destination = targetfile
    data_to_load.write_csv(destination)

#### Logging Function

In [9]:
def log(message):
    """Append one timestamped line to the ETL log file.

    The line has the form ``<timestamp>,<message>`` and is appended to the
    file named by the module-level ``logfile`` variable. The timestamp looks
    like ``2023-Jun-15-10:09:04``.

    Args:
        message: Text to record; non-string values are formatted with
            ``str()``.
    """
    # %b (abbreviated month name) replaces %h: %h is a platform-specific
    # alias that is not among Python's documented portable directives.
    timestamp_format = '%Y-%b-%d-%H:%M:%S'
    timestamp = datetime.now().strftime(timestamp_format)
    with open(logfile, "a") as f:
        f.write(f"{timestamp},{message}\n")

#### Running ETL

In [10]:
# Record the start of the whole ETL run in the log file.
log("ETL Job Started")

In [11]:
# Extract phase: read the exchange rates and the bank market caps from the
# source folder, logging when the phase starts and ends.
log("Extract phase Started")
exchange_rates, bank_market = extract(sourcefolder)
log("Extract phase Ended")

03_ETL-data/source/


In [12]:
# Transform phase: add the converted market-cap column, logging when the phase
# starts and ends.
log("Transform phase Started")
transformed_data = transform(exchange_rates, bank_market)
log("Transform phase Ended")
# Show the first five rows as the cell output.
transformed_data.head(5)

GBP exchange rate from USD: 0.7323984208000001


Name,Market Cap (US$ Billion),Market Cap (GBP$ Billion)
str,f64,f64
"""JPMorgan Chase…",390.934,286.319
"""Industrial and…",345.214,252.834
"""Bank of Americ…",325.331,238.272
"""Wells Fargo""",308.013,225.588
"""China Construc…",257.399,188.519


In [13]:
# Load phase: write the transformed data to the target CSV file, logging when
# the phase starts and ends.
log("Load phase Started")
load(target_file, transformed_data)
log("Load phase Ended")

In [14]:
# Record the end of the whole ETL run in the log file.
log("ETL Job Ended")