In [1]:
#Extract

from sqlalchemy import false
from pycoingecko import CoinGeckoAPI
import pandas as pd
import datetime as dt 

cg = CoinGeckoAPI()

class Extract():

    @staticmethod
    def coins():
        try:
            df = pd.DataFrame(cg.get_coins_list(), columns=['id','name','symbol'])
            return df
        except:
            return False
    
    @staticmethod
    def market():
        try:
            df = pd.DataFrame(cg.get_coins_markets(vs_currency='usd'), columns=['id','coinid','current_price','market_cap','price_change_percentage_24h','market_cap_change_percentage_24h','last_updated'])
            return df
        except:
            return False

    @staticmethod
    def trending():
        li=[]
        try:
            trend = cg.get_search_trending()
            for item in trend['coins']:
                li.append(item['item'])
            df = pd.DataFrame(li)
            return df
        except:
            return False

    @staticmethod
    def coins_history(number_of_days):
        try:
            list_of_coins= ['bitcoin','litecoin','ethereum']
            list_his=[]
            start_date = pd.to_datetime("today") - pd.Timedelta(number_of_days, unit='D')
            for coin in list_of_coins:
                day = start_date
                while day.date() <= (pd.to_datetime("today").date()):
                    data = cg.get_coin_history_by_id(id=coin,date=day.date().strftime("%d-%m-%Y"), localization='false')
                    data['date']= day.date().strftime("%d-%m-%Y")
                    list_his.append(data)
                    day = day + pd.Timedelta(1, unit='D')
            
            df = pd.DataFrame(list_his)
            df=df[['id','symbol','name','market_data','date']]
            df['current_price'] = pd.json_normalize(df['market_data'])['current_price.usd']
            df['market_cap'] = pd.json_normalize(df['market_data'])['market_cap.usd']
            df.drop(['market_data'], axis = 1)
            df['ingestion_date'] = pd.to_datetime("today").date().strftime("%d-%m-%Y")
            
            return df
        except:
            return False

In [2]:
#env var

db_user="postgres"
db_password="postgres"
db_server_name="localhost"
db_database_name="crypto"

In [3]:
#database

from sqlalchemy.engine import URL
from sqlalchemy import create_engine


connection_url = URL.create(
    drivername = "postgresql+pg8000", 
    username = db_user,
    password = db_password,
    host = db_server_name, 
    port = 5433, 
    database = db_database_name, 
)

engine = create_engine(connection_url)

In [4]:
#Load

from sqlalchemy import Integer, String, Float, JSON , DateTime, Boolean, BigInteger, Numeric
from sqlalchemy import Table, Column, Integer, String, MetaData, Float, JSON 
from sqlalchemy.dialects import postgresql
import logging


def overwrite_to_database(df: pd.DataFrame,table_name:str,engine)->bool:
    logging.info(f"Writing to table: {table_name}")
    df.to_sql(name=table_name, con=engine, if_exists="replace", index=False)
    logging.info(f"Successful write to table: {table_name}, rows inserted/updated: {len(df)}")
    
# def upsert((df: pd.DataFrame,table_name:str,key_columns: str,,ngine)->bool:
    

In [5]:
#Extrac_Load_pipeline

def run(tablename):
    
    if table_name == 'coins':
#         df = Extract.coins()
#     elif table_name == 'market':
#         df = Extract.market()
#     elif table_name == 'trending':
#         df = Extract.trending()
    elif table_name == 'coins_history':
        df = Extract.coins_history(1)
    
    overwrite_to_database(df,tablename,engine)

In [6]:
#main pipeline

tables = ['bitcoin','litecoin','ethereum','solana' ,'umee','terra-luna','evmos','dejitaru-tsuka','reserve-rights-token','insights-network']
for table_name in tables:
    run(table_name)

In [28]:
from sqlalchemy import Integer, String, Float, JSON , DateTime, Boolean, BigInteger, Numeric


def get_sqlalchemy_column(column_name:str , source_datatype:str, primary_key:bool=False)->Column:
    """
    A helper function that returns a SQLAlchemy column by mapping a pandas dataframe datatypes to sqlalchemy datatypes 
    """
    dtype_map = {
        "int64": BigInteger, 
        "object": String, 
        "datetime64[ns]": DateTime, 
        "float64": Numeric,
        "bool": Boolean
    }
    column = Column(column_name, dtype_map[source_datatype], primary_key=primary_key) 
    return column

def generate_sqlalchemy_schema(df: pd.DataFrame, key_columns:list, table_name, meta): 
    """
    Generates a sqlalchemy table schema that shall be used to create the target table and perform insert/upserts. 
    """
    schema = []
    for column in [{"column_name": col[0], "source_datatype": col[1]} for col in zip(df.columns, [dtype.name for dtype in df.dtypes])]:
        schema.append(get_sqlalchemy_column(**column, primary_key=column["column_name"] in key_columns))
    return Table(table_name, meta, *schema)

In [17]:
list_of_coins= ['bitcoin','litecoin','ethereum']
list_his=[]
start_date = pd.to_datetime("today") - pd.Timedelta(1, unit='D')
for coin in list_of_coins:
    day = start_date
    while day.date() <= (pd.to_datetime("today").date()):
        data = cg.get_coin_history_by_id(id=coin,date=day.date().strftime("%d-%m-%Y"), localization='false')
        data['date']= day.date().strftime("%d-%m-%Y")
        list_his.append(data)
        day = day + pd.Timedelta(1, unit='D')
list_his

[{'id': 'bitcoin',
  'symbol': 'btc',
  'name': 'Bitcoin',
  'image': {'thumb': 'https://assets.coingecko.com/coins/images/1/thumb/bitcoin.png?1547033579',
   'small': 'https://assets.coingecko.com/coins/images/1/small/bitcoin.png?1547033579'},
  'market_data': {'current_price': {'aed': 71539.14092565479,
    'ars': 2868979.3148887656,
    'aud': 30437.450176345785,
    'bch': 162.14897102594807,
    'bdt': 1973982.4008735027,
    'bhd': 7348.138004580836,
    'bmd': 19476.92659600407,
    'bnb': 68.50428772440938,
    'brl': 105418.30037000058,
    'btc': 1.0,
    'cad': 26937.660713236437,
    'chf': 19227.057104703876,
    'clp': 18853859.714197844,
    'cny': 138597.80965716497,
    'czk': 488637.1149636236,
    'dkk': 147784.10208616982,
    'dot': 3080.136426680634,
    'eos': 16472.66840517705,
    'eth': 14.648344115247351,
    'eur': 19870.360513243373,
    'gbp': 17468.095863818824,
    'hkd': 152889.10193161594,
    'huf': 8408038.830144707,
    'idr': 298069041.5472674,
   

In [18]:
import pandas as pd
import datetime

df = pd.DataFrame(list_his)

df['id_date']= df['id'].astype(str)+'_'+df['date'].astype(str)

df=df[['id_date','id','symbol','name','market_data','date']]

df['current_price'] = pd.json_normalize(df['market_data'])['current_price.usd']

df['market_cap'] = pd.json_normalize(df['market_data'])['market_cap.usd']

df = df.drop(['market_data','symbol','name'], axis = 1)

df['ingestion_date'] = pd.to_datetime("today").date().strftime("%d-%m-%Y")

display(df)

Unnamed: 0,id_date,id,date,current_price,market_cap,ingestion_date
0,bitcoin_01-10-2022,bitcoin,01-10-2022,19476.926596,373222100000.0,02-10-2022
1,bitcoin_02-10-2022,bitcoin,02-10-2022,19314.463835,370204700000.0,02-10-2022
2,litecoin_01-10-2022,litecoin,01-10-2022,53.437004,3814321000.0,02-10-2022
3,litecoin_02-10-2022,litecoin,02-10-2022,52.927788,3773067000.0,02-10-2022
4,ethereum_01-10-2022,ethereum,01-10-2022,1329.146032,160463000000.0,02-10-2022
5,ethereum_02-10-2022,ethereum,02-10-2022,1311.263324,158410100000.0,02-10-2022


In [16]:
# overwrite_to_database(df,'coins_history',engine)

In [29]:
if len(df) > 0 :
    key_columns = Load.get_key_columns(table='coins_history', path='D:\DataEngineering Bootcamp\Project1\project-one\src\crypto\models\extract')
    print(key_columns)
    meta = MetaData()
    table_schema = Load.generate_sqlalchemy_schema(df=df, key_columns=key_columns,table_name='coins_history', meta=meta)
    print(table_schema)
    meta.create_all(engine)
    insert_statement = postgresql.insert(table_schema).values(df.to_dict(orient='records'))
#     print(insert_statement)
    upsert_statement = insert_statement.on_conflict_do_update(
                index_elements=key_columns,
                set_={c.key: c for c in insert_statement.excluded if c.key not in key_columns})
    print(upsert_statement)
    result = engine.execute(upsert_statement)
    print(f"Insert/updated rows: {result.rowcount}")

['id_date']
coins_history
INSERT INTO coins_history (id_date, id, date, current_price, market_cap, ingestion_date) VALUES (%(id_date_m0)s, %(id_m0)s, %(date_m0)s, %(current_price_m0)s, %(market_cap_m0)s, %(ingestion_date_m0)s), (%(id_date_m1)s, %(id_m1)s, %(date_m1)s, %(current_price_m1)s, %(market_cap_m1)s, %(ingestion_date_m1)s), (%(id_date_m2)s, %(id_m2)s, %(date_m2)s, %(current_price_m2)s, %(market_cap_m2)s, %(ingestion_date_m2)s), (%(id_date_m3)s, %(id_m3)s, %(date_m3)s, %(current_price_m3)s, %(market_cap_m3)s, %(ingestion_date_m3)s), (%(id_date_m4)s, %(id_m4)s, %(date_m4)s, %(current_price_m4)s, %(market_cap_m4)s, %(ingestion_date_m4)s), (%(id_date_m5)s, %(id_m5)s, %(date_m5)s, %(current_price_m5)s, %(market_cap_m5)s, %(ingestion_date_m5)s) ON CONFLICT (id_date) DO UPDATE SET id = excluded.id, date = excluded.date, current_price = excluded.current_price, market_cap = excluded.market_cap, ingestion_date = excluded.ingestion_date


ProgrammingError: (pg8000.dbapi.ProgrammingError) {'S': 'ERROR', 'V': 'ERROR', 'C': '42P10', 'M': 'there is no unique or exclusion constraint matching the ON CONFLICT specification', 'F': 'd:\\pginstaller.auto\\postgres.windows-x64\\src\\backend\\optimizer\\util\\plancat.c', 'L': '828', 'R': 'infer_arbiter_indexes'}
[SQL: INSERT INTO coins_history (id_date, id, date, current_price, market_cap, ingestion_date) VALUES (%s, %s, %s, %s, %s, %s), (%s, %s, %s, %s, %s, %s), (%s, %s, %s, %s, %s, %s), (%s, %s, %s, %s, %s, %s), (%s, %s, %s, %s, %s, %s), (%s, %s, %s, %s, %s, %s) ON CONFLICT (id_date) DO UPDATE SET id = excluded.id, date = excluded.date, current_price = excluded.current_price, market_cap = excluded.market_cap, ingestion_date = excluded.ingestion_date]
[parameters: ('bitcoin_01-10-2022', 'bitcoin', '01-10-2022', 19476.92659600407, 373222100680.7475, '02-10-2022', 'bitcoin_02-10-2022', 'bitcoin', '02-10-2022', 19314.463835217448, 370204708063.3375, '02-10-2022', 'litecoin_01-10-2022', 'litecoin', '01-10-2022', 53.43700377767957, 3814320769.585342, '02-10-2022', 'litecoin_02-10-2022', 'litecoin', '02-10-2022', 52.92778750708372, 3773067353.184526, '02-10-2022', 'ethereum_01-10-2022', 'ethereum', '01-10-2022', 1329.1460320707963, 160462955554.32846, '02-10-2022', 'ethereum_02-10-2022', 'ethereum', '02-10-2022', 1311.2633244162857, 158410068981.48102, '02-10-2022')]
(Background on this error at: https://sqlalche.me/e/14/f405)

In [21]:
from sqlalchemy import Integer, String, Float, JSON , DateTime, Boolean, BigInteger, Numeric
from sqlalchemy import Table, Column, Integer, String, MetaData, Float, JSON 
import jinja2 as j2 
import pandas as pd
import numpy as np
import logging 
from sqlalchemy.dialects import postgresql

class Load():

    @staticmethod
    def get_key_columns(table:str, path:str="extract_queries")->list: 
        """
        get a list of key columns from the .sql file. 
        - `table`: name of the sql file without .sql 
        - `path`: path to the sql file 
        """
        # read sql contents into a variable 
        with open(f"{path}/{table}.sql") as f: 
            raw_sql = f.read()
        try: 
            key_columns = j2.Template(raw_sql).make_module().config["key_columns"] # get key columns 
            return key_columns
        except:
            return []
    
    @staticmethod
    def get_sqlalchemy_column(column_name:str , source_datatype:str, primary_key:bool=False)->Column:
        """
        A helper function that returns a SQLAlchemy column by mapping a pandas dataframe datatypes to sqlalchemy datatypes 
        """
        dtype_map = {
            "int64": BigInteger, 
            "object": String, 
            "datetime64[ns]": DateTime, 
            "float64": Numeric,
            "bool": Boolean
        }
        column = Column(column_name, dtype_map[source_datatype], primary_key=primary_key) 
        return column

    @staticmethod
    def generate_sqlalchemy_schema(df: pd.DataFrame, key_columns:list, table_name, meta): 
        """
        Generates a sqlalchemy table schema that shall be used to create the target table and perform insert/upserts. 
        """
        schema = []
        for column in [{"column_name": col[0], "source_datatype": col[1]} for col in zip(df.columns, [dtype.name for dtype in df.dtypes])]:
            schema.append(Load.get_sqlalchemy_column(**column, primary_key=column["column_name"] in key_columns))
        return Table(table_name, meta, *schema)

    @staticmethod
    def upsert_in_chunks(df:pd.DataFrame, engine, table_schema:Table, key_columns:list, chunksize:int=1000)->bool:
        """
        performs the upsert with several rows at a time (i.e. a chunk of rows). this is better suited for very large sql statements that need to be broken into several steps. 
        """
        max_length = len(df)
        df = df.replace({np.nan: None})
        for i in range(0, max_length, chunksize):
            if i + chunksize >= max_length: 
                lower_bound = i
                upper_bound = max_length 
            else: 
                lower_bound = i 
                upper_bound = i + chunksize
            insert_statement = postgresql.insert(table_schema).values(df.iloc[lower_bound:upper_bound].to_dict(orient='records'))
            upsert_statement = insert_statement.on_conflict_do_update(
                index_elements=key_columns,
                set_={c.key: c for c in insert_statement.excluded if c.key not in key_columns})
            logging.info(f"Inserting chunk: [{lower_bound}:{upper_bound}] out of index {max_length}")
            result = engine.execute(upsert_statement)
        return True 

    @staticmethod
    def upsert_all(df:pd.DataFrame, engine, table_schema:Table, key_columns:list)->bool:
        """
        performs the upsert with all rows at once. this may cause timeout issues if the sql statement is very large. 
        """
        insert_statement = postgresql.insert(table_schema).values(df.to_dict(orient='records'))
        upsert_statement = insert_statement.on_conflict_do_update(
            index_elements=key_columns,
            set_={c.key: c for c in insert_statement.excluded if c.key not in key_columns})
        result = engine.execute(upsert_statement)
        logging.info(f"Insert/updated rows: {result.rowcount}")
        return True 

    @staticmethod
    def upsert_to_database(df: pd.DataFrame, table_name: str, key_columns: str, engine, chunksize:int=1000)->bool: 
        """
        Upsert dataframe to a database table 
        - `df`: pandas dataframe 
        - `table`: name of the target table 
        - `key_columns`: name of key columns to be used for upserting 
        - `engine`: connection engine to database 
        - `chunksize`: if chunksize greater than 0 is specified, then the rows will be inserted in the specified chunksize. e.g. 1000 rows at a time. 
        """
        meta = MetaData()
        logging.info(f"Generating table schema: {table_name}")
        table_schema = Load.generate_sqlalchemy_schema(df=df, key_columns=key_columns,table_name=table_name, meta=meta)
        meta.create_all(engine)
        logging.info(f"Table schema generated: {table_name}")
        logging.info(f"Writing to table: {table_name}")
        if chunksize > 0:
            Load.upsert_in_chunks(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns, chunksize=chunksize)
        else: 
            Load.upsert_all(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns)
        logging.info(f"Successful write to table: {table_name}")
        return True 

    @staticmethod
    def overwrite_to_database(df: pd.DataFrame, table_name: str, engine)->bool: 
        """
        Upsert dataframe to a database table 
        - `df`: pandas dataframe 
        - `table`: name of the target table 
        - `engine`: connection engine to database 
        """
        logging.info(f"Writing to table: {table_name}")
        df.to_sql(name=table_name, con=engine, if_exists="replace", index=False)
        logging.info(f"Successful write to table: {table_name}, rows inserted/updated: {len(df)}")
        return True 

In [None]:
def get_sqlalchemy_column(column_name:str , source_datatype:str, primary_key:bool=False)->Column:
        """
        A helper function that returns a SQLAlchemy column by mapping a pandas dataframe datatypes to sqlalchemy datatypes 
        """
        dtype_map = {
            "int64": BigInteger, 
            "object": String, 
            "datetime64[ns]": DateTime, 
            "float64": Numeric,
            "bool": Boolean
        }
        column = Column(column_name, dtype_map[source_datatype], primary_key=primary_key) 
        return column

In [None]:
def generate_sqlalchemy_schema(df: pd.DataFrame, key_columns:list, table_name, meta): 
        """
        Generates a sqlalchemy table schema that shall be used to create the target table and perform insert/upserts. 
        """
        schema = []
        for column in [{"column_name": col[0], "source_datatype": col[1]} for col in zip(df.columns, [dtype.name for dtype in df.dtypes])]:
            schema.append(Load.get_sqlalchemy_column(**column, primary_key=column["column_name"] in key_columns))
        return Table(table_name, meta, *schema)

In [None]:
def upsert_in_chunks(df:pd.DataFrame, engine, table_schema:Table, key_columns:list, chunksize:int=1000)->bool:
        """
        performs the upsert with several rows at a time (i.e. a chunk of rows). this is better suited for very large sql statements that need to be broken into several steps. 
        """
        max_length = len(df)
        df = df.replace({np.nan: None})
        for i in range(0, max_length, chunksize):
            if i + chunksize >= max_length: 
                lower_bound = i
                upper_bound = max_length 
            else: 
                lower_bound = i 
                upper_bound = i + chunksize
            insert_statement = postgresql.insert(table_schema).values(df.iloc[lower_bound:upper_bound].to_dict(orient='records'))
            upsert_statement = insert_statement.on_conflict_do_update(
                index_elements=key_columns,
                set_={c.key: c for c in insert_statement.excluded if c.key not in key_columns})
            logging.info(f"Inserting chunk: [{lower_bound}:{upper_bound}] out of index {max_length}")
            result = engine.execute(upsert_statement)
        return True 

In [None]:
def upsert_all(df:pd.DataFrame, engine, table_schema:Table, key_columns:list)->bool:
        """
        performs the upsert with all rows at once. this may cause timeout issues if the sql statement is very large. 
        """
        insert_statement = postgresql.insert(table_schema).values(df.to_dict(orient='records'))
        upsert_statement = insert_statement.on_conflict_do_update(
            index_elements=key_columns,
            set_={c.key: c for c in insert_statement.excluded if c.key not in key_columns})
        result = engine.execute(upsert_statement)
        logging.info(f"Insert/updated rows: {result.rowcount}")
        return True 

In [None]:
def upsert_to_database(df: pd.DataFrame, table_name: str, key_columns: str, engine, chunksize:int=1000)->bool: 
        """
        Upsert dataframe to a database table 
        - `df`: pandas dataframe 
        - `table`: name of the target table 
        - `key_columns`: name of key columns to be used for upserting 
        - `engine`: connection engine to database 
        - `chunksize`: if chunksize greater than 0 is specified, then the rows will be inserted in the specified chunksize. e.g. 1000 rows at a time. 
        """
        meta = MetaData()
        logging.info(f"Generating table schema: {table_name}")
        table_schema = Load.generate_sqlalchemy_schema(df=df, key_columns=key_columns,table_name=table_name, meta=meta)
        meta.create_all(engine)
        logging.info(f"Table schema generated: {table_name}")
        logging.info(f"Writing to table: {table_name}")
        if chunksize > 0:
            Load.upsert_in_chunks(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns, chunksize=chunksize)
        else: 
            Load.upsert_all(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns)
        logging.info(f"Successful write to table: {table_name}")