1.Imports

In [1]:
!pip install pymongo
import os
import requests
import pandas as pd
from datetime import datetime
from pymongo import MongoClient
from pymongo.server_api import ServerApi
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine, text, MetaData, Table
from dotenv import load_dotenv
load_dotenv()



2. Request to API

In [2]:
#API-request to exchangerate.host and parse
API_KEY= os.environ['exchange_api']
url='http://api.exchangerate.host/live'
params={
    'access_key': API_KEY,
    'currencies':'USD,EUR,UAH',
    'source': 'USD'
}

response= requests.get(url,params=params)
response.raise_for_status()

data=response.json()
timestamp = data.get('timestamp', int(datetime.utcnow().timestamp()))
date = datetime.utcfromtimestamp(timestamp)
ingested_at = datetime.utcnow() #Create column with date & time when it was saved
quotes = data.get('quotes', {})
source_currency = data.get('source', 'USD')

3. Connect to MongoDB

In [3]:
#Connect to MongoDB
mongo_pass = os.environ["Mongo_Pass"]
Atlas_url = f'mongodb+srv://Rubik:{mongo_pass}@datalabed.llfr6ac.mongodb.net/?retryWrites=true&w=majority&appName=DataLabED'

client = MongoClient(Atlas_url, server_api=ServerApi('1'))
db = client['exchange_rate_practic']
collection = db['exchange']

4.Insert Data into MongoDB

In [4]:
#Save data to MongoDB
saved_count=0

for pair,rate in quotes.items():
    target_currency=pair.replace(source_currency,'')

    exists=collection.find_one({
        'source':source_currency,
        'target':target_currency,
        'date': date
    })
    if not exists:
        collection.insert_one({
            'source':source_currency,
            'target':target_currency,
            'rate': rate,
            'date':date,
            'ingested_at': datetime.utcnow()
        })
        saved_count+=1


print(f"✅ Збережено {saved_count} нових курсів на дату {date.date()}")

✅ Збережено 2 нових курсів на дату 2025-07-13


5. Transform data from Mongo to CSV

In [5]:
#Transform to DataFrame
docs=list(collection.find({},{'_id':0})) #without _id to avoid breaking the CSV/table
if docs:
    df=pd.DataFrame(docs)   
    print(df.head())
    #Export to CSV
    df.to_csv('exchange_rates.csv',index=False)
    print('✅ Дані збережено у exchange_rates.csv')

else:
    print('⚠️Колекція пуста')

  source target       rate                date             ingested_at
0    USD    EUR   0.855404 2025-07-13 12:22:14 2025-07-13 12:22:52.287
1    USD    UAH  41.770787 2025-07-13 12:22:14 2025-07-13 12:22:52.342
2    USD    EUR   0.855398 2025-07-13 18:58:04 2025-07-13 18:58:55.084
3    USD    UAH  41.770254 2025-07-13 18:58:04 2025-07-13 18:58:55.147
4    USD    EUR   0.857060 2025-07-13 22:23:04 2025-07-13 22:23:52.050
✅ Дані збережено у exchange_rates.csv


6. Create PostgreSQL table and Index

In [6]:
#AlwaysData connecting params 
ad_name=os.environ['ad_name']
ad_host=os.environ['ad_host']
ad_pass=os.environ['ad_pass']

#Connecting to PosgreSQL
engine=create_engine(f"postgresql://rubi:{ad_pass}@{ad_host}/{ad_name}")

#Create table and unique index
create_table='''
    CREATE TABLE IF NOT EXISTS exchange_rates(
    source TEXT,
    target TEXT,
    rate FLOAT,
    date DATE,
    ingested_at TIMESTAMP
);
'''

create_index='''
CREATE UNIQUE INDEX IF NOT EXISTS unique_exchange_rate ON exchange_rates(source,target,date);
'''

with engine.begin() as conn:
    conn.execute(text(create_table))
    conn.execute(text(create_index))
print("table and index has been created")

table and index has been created


7.Insert or Upsert data into PostgreSQL, using ON CONFLICT logic

In [7]:
#Insert and Upsert data from DataFrame
metadata=MetaData()
exchange_rates_table=Table('exchange_rates',metadata,autoload_with=engine)

with engine.begin() as conn:
    for _, row in df.iterrows():
        stmt=insert(exchange_rates_table).values(
            source=row['source'],
            target=row['target'],
            rate=row['rate'],
            date=row['date'],
            ingested_at=datetime.utcnow()
        )
        stmt=stmt.on_conflict_do_update(
            index_elements=['source','target','date'],
            set_={
                'rate':row['rate'],
                'ingested_at': datetime.utcnow()
            }
        )
        conn.execute(stmt)
print('Дані оновлено або додано в PostgreSQL')

Дані оновлено або додано в PostgreSQL


8. Validate

In [8]:
#Cheking result
with engine.connect() as conn:
    result=conn.execute(text('SELECT * FROM exchange_rates'))
    rows=result.mappings().all()
    for row in rows:
        print(dict(row))

{'source': 'USD', 'target': 'EUR', 'rate': 0.85691, 'date': datetime.date(2025, 7, 13), 'ingested_at': datetime.datetime(2025, 7, 13, 22, 27, 3, 743372)}
{'source': 'USD', 'target': 'UAH', 'rate': 41.770254, 'date': datetime.date(2025, 7, 13), 'ingested_at': datetime.datetime(2025, 7, 13, 22, 27, 3, 765344)}
