In [3]:
#import libraries for the project
import pandas as pd
import requests 
import json
from datetime import datetime, timezone
import sqlalchemy
from sqlalchemy import create_engine
from google.cloud import bigquery
import os
import pyarrow


In [4]:
# Tell the Google client libraries where the service-account key file lives.
# setdefault() keeps a GOOGLE_APPLICATION_CREDENTIALS value that the environment
# already provides; the machine-specific absolute path below is only a fallback.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "D:\\Python\\Financial_data_pipeline\\exchange-rate-pipeline-476115-5100605ea811.json",
)

In [5]:
# Access key sent to exchangerate.host with every request.
# FIXME(security): the fallback literal below is a credential stored in plain
# text in the notebook. Rotate it and provide the replacement through the
# EXCHANGERATE_API_TOKEN environment variable, which takes precedence when set
# to a non-empty value.
api_token = os.environ.get("EXCHANGERATE_API_TOKEN") or "8ed177bf655d362a495251f500f37a35"

In [6]:
#Extracting the currencies rate from exchangerate.host
def extract_curr(base="IDR"):
    """Fetch the live exchange-rate quotes from exchangerate.host.

    Args:
        base: Currency code sent as the ``source`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or when the
            request exceeds the timeout.
        ValueError: If the body is not valid JSON, or if it explicitly
            reports ``"success": false``.
    """
    # Let requests build and URL-encode the query string instead of
    # interpolating the access key and currency code into the URL by hand.
    response = requests.get(
        "https://api.exchangerate.host/live",
        params={"access_key": api_token, "source": base},
        timeout=30,  # without a timeout a stalled connection blocks the cell forever
    )
    response.raise_for_status()
    data = response.json()

    # NOTE(review): assumes the API signals logical failures (bad key, quota,
    # unknown currency) with "success": false plus an "error" object -- confirm
    # against the provider's documentation. Failing here gives a clearer message
    # than a later KeyError on the missing "quotes" entry.
    if isinstance(data, dict) and data.get("success") is False:
        raise ValueError(f"exchangerate.host request failed: {data.get('error')}")

    return data

In [7]:
def transform_curr(data):
    """Convert an exchange-rate API payload into a typed, ordered DataFrame.

    Args:
        data: Mapping with the keys ``"quotes"`` (pair code -> rate),
            ``"source"`` (base currency code) and ``"timestamp"`` (Unix
            epoch seconds).

    Returns:
        A DataFrame with the columns Date, Target_Currency, Description,
        Exchange_Rate, Rate_in_IDR, Base_Currency and Created_at, in that
        order. Description comes from ``currency_list.json`` in the current
        working directory and is missing for codes not listed there.

    Raises:
        KeyError: If ``data`` lacks one of the required keys, or the JSON
            file has no ``"currencies"`` entry.
        FileNotFoundError: If ``currency_list.json`` cannot be found.
    """
    rates = data["quotes"]
    source = data["source"]

    # Transform the JSON mapping into a two-column DataFrame
    df = pd.DataFrame(list(rates.items()), columns=["Target_Currency", "Exchange_Rate"])

    # Quote keys are "<source><target>" pair codes (e.g. "IDRAED"). Strip the
    # source code only where it is a leading prefix, so the function works for
    # any base currency and never removes a match from the middle of a code.
    pair_codes = df["Target_Currency"]
    has_prefix = pair_codes.str.startswith(source)
    df["Target_Currency"] = pair_codes.mask(has_prefix, pair_codes.str[len(source):])

    # Add and format columns for better data insight
    df["Base_Currency"] = source
    df["Date"] = datetime.fromtimestamp(data["timestamp"], tz=timezone.utc).strftime('%Y-%m-%d')
    # Inverse rate: units of the base currency per one unit of the target.
    df["Rate_in_IDR"] = 1 / df["Exchange_Rate"]

    # Load the code -> description lookup. JSON text is UTF-8, so state the
    # encoding explicitly instead of relying on the platform default.
    with open("currency_list.json", "r", encoding="utf-8") as file:
        curr = json.load(file)
    currency_df = pd.DataFrame(list(curr["currencies"].items()), columns=["Target_Currency", "Description"])

    # Left merge keeps every quote even when no description is available
    df = df.merge(currency_df, on="Target_Currency", how="left")

    # Record when the data was extracted.
    # NOTE(review): this is the naive local clock, whereas Date above is
    # derived in UTC -- confirm which timezone the destination table expects.
    df["Created_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Set the columns' data types
    df = df.astype({
        "Date": "datetime64[ns]",
        "Target_Currency": "string",
        "Exchange_Rate": "float64",
        "Rate_in_IDR": "float64",
        "Base_Currency": "string",
        "Description": "string",
        "Created_at": "datetime64[ns]",
    })

    # Reorder the columns
    column_order = ['Date', 'Target_Currency', 'Description', 'Exchange_Rate', 'Rate_in_IDR', 'Base_Currency', 'Created_at']
    return df[column_order]

In [8]:
# Request the latest quotes with IDR as the source currency.
idr_rate = extract_curr(base="IDR")

In [9]:
# Shape the raw payload into the typed rates table, then preview the first rows.
df = transform_curr(idr_rate)

df.head(5)

Unnamed: 0,Date,Target_Currency,Description,Exchange_Rate,Rate_in_IDR,Base_Currency,Created_at
0,2025-10-26,AED,United Arab Emirates Dirham,0.000221,4524.886878,IDR,2025-10-26 22:07:18
1,2025-10-26,AFN,Afghan Afghani,0.003991,250.563768,IDR,2025-10-26 22:07:18
2,2025-10-26,ALL,Albanian Lek,0.005004,199.840128,IDR,2025-10-26 22:07:18
3,2025-10-26,AMD,Armenian Dram,0.023005,43.468811,IDR,2025-10-26 22:07:18
4,2025-10-26,ANG,Netherlands Antillean Guilder,0.000108,9259.259259,IDR,2025-10-26 22:07:18


In [10]:
# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() only appends a stray "None" line to the output.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 171 entries, 0 to 170
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   Date             171 non-null    datetime64[ns]
 1   Target_Currency  171 non-null    string        
 2   Description      171 non-null    string        
 3   Exchange_Rate    171 non-null    float64       
 4   Rate_in_IDR      171 non-null    float64       
 5   Base_Currency    171 non-null    string        
 6   Created_at       171 non-null    datetime64[ns]
dtypes: datetime64[ns](2), float64(2), string(3)
memory usage: 9.5 KB
None


In [11]:
def load_to_bigquery(df, table_name, project_id="exchange-rate-pipeline-476115", dataset_id="currency_data"):
    """Append a DataFrame to a BigQuery table.

    Args:
        df: DataFrame whose rows are appended to the destination table.
        table_name: Name of the destination table inside ``dataset_id``.
        project_id: Project that owns the dataset. Defaults to the project
            this notebook was written for.
        dataset_id: Dataset that contains the table.

    Returns:
        None. The outcome is reported on stdout only, so do not assign the
        result of this call back to the DataFrame variable.

    Note:
        Errors raised while loading are caught and printed rather than
        propagated; errors raised while constructing the client are not caught.
    """
    client = bigquery.Client()

    # Fully qualified destination: <project>.<dataset>.<table>
    table_ref = f"{project_id}.{dataset_id}.{table_name}"

    try:
        # WRITE_APPEND adds rows to the table instead of replacing its contents
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )

        # Start the load job and block until it finishes
        job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
        job.result()

        table = client.get_table(table_ref)
        print("Data berhasil dimuat", table)

    except Exception as e:
        print("Data gagal dimuat", e)

In [14]:
table_exchange_rate = "exchange_rate"

# load_to_bigquery() has no return statement, so assigning its result to df
# would replace the DataFrame with None and break any later cell (or a re-run
# of this one). Call it for its side effect only.
load_to_bigquery(df, table_exchange_rate)



Data berhasil dimuat exchange-rate-pipeline-476115.currency_data.exchange_rate


In [12]:
# def load_to_mysql(df):
#   #connect the MySQL database
#   usr = 'root'
#   pwd = 'root'
#   host = '127.0.0.1'
#   port = 3306
#   dbName = 'currency'
#   tableName = 'exchange_rate'
#   try:
#     engine = create_engine(f"mysql+mysqldb://{usr}:{pwd}@{host}:{port}/{dbName}",
#                            echo=True,
#                            future=True)
#     #Send the dataframes to MySQL table
#     df.to_sql(name=tableName, con=engine, if_exists="append", index=False)

#     print("Data berhasil ditambahkan ke tabel 'exchange_rate' di MySQL")
#   except Exception as e:
#     print("Gagal memuat data ke mysql:",e)

In [13]:
# df = load_to_mysql(df)