In [1]:
import pandas as pd
import time
from datetime import datetime, timezone, timedelta
from sqlalchemy import create_engine, Column, Float, String, Integer, DateTime, func
from sqlalchemy.orm import sessionmaker, declarative_base
from polygon import RESTClient
import pymongo

In [2]:
polygon_api = 'beBybSi8daPgsTp5yx5cHtHpYcrjp5Jq'

sql_database_path = 'fx_data.db'

csv_output_path = 'fx_rates.csv'

mongo_db_url = "mongodb://localhost:27017/"
mongo_db_name = "fx_data_mg"
mongo_auxiliary_collection_name = "auxiliary"
mongo_final_collection_name = "final"

fx_list = [("EUR", "USD"), ("GBP", "USD"), ("USD", "CHF")]

In [3]:
polygon = RESTClient(polygon_api)

In [4]:
Base = declarative_base()

class aux_data(Base):
    __tablename__ = 'aux_data'
    id = Column(Integer, primary_key = True)
    fx_timestamp = Column(DateTime)
    pair = Column(String)
    rate = Column(Float)
    entry_timestamp = Column(DateTime, default=datetime.now)
    n = Column(Integer, default=0)

    def __init__(self, pair, rate,
                fx_timestamp,
                n):
        super().__init__()
        self.pair = pair
        self.rate = rate
        self.fx_timestamp = fx_timestamp
        self.n = n
        
class final_data(Base):
    __tablename__ = 'final_data'
    id = Column(Integer, primary_key = True)
    pair = Column(String)
    fx_timestamp = Column(DateTime)
    entry_timestamp = Column(DateTime, default=datetime.now)
    max_rate = Column(Float)
    min_rate = Column(Float)
    mean_rate = Column(Float)
    kb_upper = Column(Float)
    kb_lower = Column(Float)
    vol = Column(Float)
    fd = Column(Float)

    def __init__(self, pair, fx_timestamp,
                max_rate, min_rate, mean_rate,
                kb_upper, kb_lower,
                vol, fd):
        super().__init__()
        self.pair = pair
        self.fx_timestamp = fx_timestamp
        self.max_rate = max_rate
        self.min_rate = min_rate
        self.mean_rate = mean_rate
        self.kb_upper = kb_upper
        self.kb_lower = kb_lower
        self.vol = vol
        self.fd = fd
    

engine = create_engine(f'sqlite:///{sql_database_path}')
Base.metadata.create_all(engine)

Session = sessionmaker(bind= engine)
session = Session()

In [5]:
mongo = pymongo.MongoClient(mongo_db_url)
mongo_db = mongo[mongo_db_name]
mongo_auxiliary_collection = mongo_db[mongo_auxiliary_collection_name]
mongo_final_collection = mongo_db[mongo_final_collection_name]

In [6]:
def get_and_store_6_min_fx_data():
    start_time = datetime.now(timezone.utc)
    end_time = start_time + timedelta(seconds=360)
    log_time = time.time()

    while (datetime.now(timezone.utc)<end_time):
        iteration_start_time = time.time()

        for base_c, quote_c in fx_list:
            raw_data = polygon.get_real_time_currency_conversion(base_c,quote_c)
            if raw_data is None:
                print('failed to fetch data '+str(datetime.now()))
                break

            fx_timestamp = datetime.fromtimestamp(raw_data.last.timestamp / 1000,tz= timezone.utc)
            rate = raw_data.converted
            pair = f'{base_c}:{quote_c}'
            latest_entry = session.query(final_data).filter(final_data.pair == pair).order_by(final_data.entry_timestamp.desc()).first()
            n = 0
            if latest_entry == None:
                n = 0
            else:
                if rate > latest_entry.kb_upper or rate < latest_entry.kb_lower :
                    latest_entry_aux = session.query(aux_data).filter(aux_data.pair == pair).order_by(aux_data.entry_timestamp.desc()).first()
                    if latest_entry_aux == None:
                        n=1
                    elif latest_entry_aux.n == 1:
                        n=0
                    else:
                        n=1
            
            fx_sql_data = aux_data(pair,rate,fx_timestamp,n)
            session.add(fx_sql_data)

            fx_mongo_data = {'pair' : pair, 'fx_timestamp' : fx_timestamp, 'rate' : rate, 'n' : n}
            mongo_auxiliary_collection.insert_one(fx_mongo_data)

            try:
                session.commit()
            except:
                session.rollback()
                print('aux_data commit error')

            iteration_end_time = time.time()
            if(iteration_end_time - iteration_start_time) < 1:
                time.sleep(1-(iteration_end_time - iteration_start_time))
            if((iteration_end_time - log_time)>10):
                log_time = time.time()
                print('program running '+str(datetime.fromtimestamp(log_time)))
            
    store_info_to_final_database(start_time,end_time)

In [7]:
def store_info_to_final_database(start_time, end_time):
    n=10
    for base, quote in fx_list:
        pair = f'{base}:{quote}'
        max_rate = session.query(func.max(aux_data.rate))\
            .filter(aux_data.fx_timestamp>=start_time, 
                    aux_data.fx_timestamp<=end_time,
                    aux_data.pair == f'{base}:{quote}')\
            .scalar()
        min_rate = session.query(func.min(aux_data.rate))\
            .filter(aux_data.fx_timestamp>=start_time, 
                    aux_data.fx_timestamp<=end_time,
                    aux_data.pair == f'{base}:{quote}')\
            .scalar()
        sum_rate = session.query(func.sum(aux_data.rate))\
            .filter(aux_data.fx_timestamp>=start_time, 
                    aux_data.fx_timestamp<=end_time,
                    aux_data.pair == f'{base}:{quote}')\
            .scalar()
        count_rate = session.query(func.count(aux_data.rate))\
            .filter(aux_data.fx_timestamp>=start_time, 
                    aux_data.fx_timestamp<=end_time,
                    aux_data.pair == f'{base}:{quote}')\
            .scalar()
        timestamp = end_time
        mean_rate = sum_rate/count_rate
        kb_upper = mean_rate + n*0.00001*mean_rate
        kb_lower = mean_rate - n*0.00001*mean_rate
        vol = (max_rate-min_rate)/mean_rate
        fd = session.query(func.count(aux_data.n))\
            .filter(aux_data.fx_timestamp>=start_time, 
                    aux_data.fx_timestamp<=end_time,
                    aux_data.pair == f'{base}:{quote}')\
            .scalar()
        
        fx_sql_data = final_data(pair,timestamp,max_rate,min_rate,mean_rate,kb_upper,kb_lower,vol,fd)
        session.add(fx_sql_data)

        fx_mongo_data = {'pair' : pair, 'fx_timestamp' : timestamp, 'max_rate' : max_rate,'min_rate': min_rate,'mean_rate':mean_rate,'kb_upper':kb_upper,'kb_lower':kb_lower,'vol':vol,'fd':fd}
        mongo_final_collection.insert_one(fx_mongo_data)

        try:
            session.commit()
        except:
            session.rollback()
    normalize_final_data(start_time,end_time)

In [8]:
def normalize_final_data(start_time, end_time):
    for base,quote in fx_list:
        max_fd_value = session.query(func.max(final_data.fd))\
            .filter(final_data.fx_timestamp>=start_time, 
                    final_data.fx_timestamp<=end_time,
                    final_data.pair == f'{base}:{quote}')\
            .scalar()
        if max_fd_value is not None:  # 确保最大值非空
            session.query(final_data)\
            .filter(final_data.fx_timestamp>=start_time, 
                    final_data.fx_timestamp<=end_time,
                    final_data.pair == f'{base}:{quote}')\
            .update({final_data.fd: final_data.fd / max_fd_value})
        else:
            print('fd update failed')
        
        max_vol_value = session.query(func.max(final_data.vol))\
            .filter(final_data.fx_timestamp>=start_time, 
                    final_data.fx_timestamp<=end_time,
                    final_data.pair == f'{base}:{quote}')\
            .scalar()
        if max_vol_value is not None:  # 确保最大值非空
            session.query(final_data)\
            .filter(final_data.fx_timestamp>=start_time, 
                    final_data.fx_timestamp<=end_time,
                    final_data.pair == f'{base}:{quote}')\
            .update({final_data.vol: final_data.vol / max_vol_value})
        else:
            print('vol update failed')

    session.commit()

In [None]:
for i in range(50):
    get_and_store_6_min_fx_data()

In [10]:
pd.read_sql('final_data', engine).to_csv('final_data.csv')

In [None]:
#(pd.read_sql('aux_data', engine)[pd.read_sql('aux_data', engine).pair == 'EUR:USD'].rate != 1.11).sum()

0

In [None]:
#pd.read_sql('final_data', engine)

Unnamed: 0,id,pair,fx_timestamp,entry_timestamp,max_rate,min_rate,mean_rate,kb_upper,kb_lower,vol,fd
0,1,EUR:USD,2025-04-04 02:20:55.570120,2025-04-03 22:20:56.191760,1.11,1.11,1.11,1.110111,1.109889,,1.0
1,2,GBP:USD,2025-04-04 02:20:55.570120,2025-04-03 22:20:56.197797,1.31,1.31,1.31,1.310131,1.309869,,1.0
2,3,USD:CHF,2025-04-04 02:20:55.570120,2025-04-03 22:20:56.202606,0.86,0.86,0.86,0.860086,0.859914,,1.0
3,4,EUR:USD,2025-04-04 02:26:56.211492,2025-04-03 22:26:57.266420,1.11,1.11,1.11,1.110111,1.109889,,1.0
4,5,GBP:USD,2025-04-04 02:26:56.211492,2025-04-03 22:26:57.272520,1.31,1.31,1.31,1.310131,1.309869,,1.0
5,6,USD:CHF,2025-04-04 02:26:56.211492,2025-04-03 22:26:57.277941,0.86,0.86,0.86,0.860086,0.859914,,1.0
6,7,EUR:USD,2025-04-04 02:32:57.284854,2025-04-03 22:32:57.729757,1.11,1.11,1.11,1.110111,1.109889,,1.0
7,8,GBP:USD,2025-04-04 02:32:57.284854,2025-04-03 22:32:57.759445,1.31,1.31,1.31,1.310131,1.309869,,1.0
8,9,USD:CHF,2025-04-04 02:32:57.284854,2025-04-03 22:32:57.765861,0.86,0.86,0.86,0.860086,0.859914,,1.0
9,10,EUR:USD,2025-04-04 02:38:57.775972,2025-04-03 22:50:10.414436,1.11,1.11,1.11,1.110111,1.109889,,1.0


In [None]:
# attributes = dir(raw_data)
# get dict-like obj's val
# obj_dict = {attr: getattr(raw_data, attr) for attr in attributes if not attr.startswith('__')}
# print(obj_dict)

{'converted': 1.1, 'from_': None, 'from_dict': <function RealTimeCurrencyConversion.from_dict at 0x115101000>, 'initial_amount': 1, 'last': ForexQuote(ask=1.10337, bid=1.10329, exchange=48, timestamp=1743707327000), 'to': 'USD'}
