# Ingesting Historical DeFi Events into the Data Lakehouse

## Insert into Raw Layer

In [1]:
import os
# Change to the root directory of the project so that the relative paths used
# in later cells (data/..., src/...) resolve.
# NOTE(review): the path is relative to the current directory, so re-running
# this cell moves up one more level each time.
os.chdir('..')

# Select the production environment.
# NOTE(review): presumably read by spectral_data_lib when it is imported in a
# later cell — confirm.
os.environ['APP_ENV'] = 'prod'

In [2]:
!pwd

/Users/jhonlucas/Documents/projects/spectral/defi-features-data-pipeline


In [3]:
import numpy as np
import pandas as pd
import awswrangler as wr
import glob
import logging

from spectral_data_lib.config import settings
from spectral_data_lib.data_lakehouse import DataLakehouse

# Do not truncate the column list when displaying dataframes.
pd.set_option('display.max_columns', None)
logger = logging.getLogger(__name__)
# NOTE(review): only the logger level is set here and no handler is configured
# in this notebook, so the DEBUG success messages may not be displayed unless
# logging is configured elsewhere — confirm.
logger.setLevel(logging.DEBUG)

In [4]:
datalake_client = DataLakehouse()

In [9]:
def add_partition_column(data: pd.DataFrame, partition_by_column: str) -> pd.DataFrame:
    """Add ``year`` and ``month`` partition columns derived from an epoch column.

    Args:
        data (pd.DataFrame): Dataframe to add partition columns to. It is
            modified in place.
        partition_by_column (str): Name of the column holding Unix timestamps
            expressed in seconds.

    Returns:
        pd.DataFrame: The same dataframe object, with a four-digit ``year``
            and a zero-padded two-digit ``month`` string column added.
    """

    # Parse the timestamps once and reuse the result for both columns.
    timestamps = pd.to_datetime(data[partition_by_column], unit="s")

    data["year"] = timestamps.dt.strftime("%Y")
    data["month"] = timestamps.dt.strftime("%m")

    return data

In [None]:
def insert_events_into_raw_layer(event_name: str, event_file_path: str) -> None:
    """Append the events stored in a parquet file to a raw-layer table.

    Args:
        event_name (str): Name of the destination table in ``db_raw_prod``.
        event_file_path (str): Path of the parquet file to load.

    Returns:
        None: Exceptions raised inside the ``try`` (the write and the success
            log) are logged instead of propagated. Reading the file and adding
            the partition columns happen before the ``try`` and can still raise.
    """

    partitioned_events = add_partition_column(
        data=pd.read_parquet(event_file_path),
        partition_by_column='timestamp',
    )
    events_to_write = partitioned_events.fillna(value=np.nan)

    # Destination of the write: raw layer, partitioned by year and month.
    write_options = {
        'table_name': event_name,
        'database_name': 'db_raw_prod',
        'partition_columns': ['year', 'month'],
        'source': 'transpose',
        'layer': 'raw',
        'mode_write': 'append',
    }

    try:
        datalake_client.write_parquet_table(data=events_to_write, **write_options)

        logger.debug(f"Dataframe {event_name} has been ingested into the data lakehouse.")

    except Exception as e:
        logger.error(f"Error while ingesting dataframe {event_name} into the data lakehouse. Error: {e}")

In [None]:
# Ingest every parquet file found under data/raw. The table name is the file
# name up to its first dot.
# NOTE(review): files in data/raw without a 'timestamp' column (for example
# historical_market_state.parquet, read in a later cell) also match this glob
# — confirm they are absent or excluded when this loop runs.
events_file_list = glob.glob('data/raw/*.parquet')

for event_file_path in events_file_list:
    event_name = event_file_path.rsplit('/', 1)[-1].partition('.')[0]
    insert_events_into_raw_layer(event_name=event_name, event_file_path=event_file_path)

## Insert into Stage Layer

In [None]:
def read_sql_file(sql_file_path: str) -> str:
    """Read a SQL file and return its full contents.

    Args:
        sql_file_path (str): Path to sql file.

    Returns:
        str: SQL query text exactly as stored in the file.

    Raises:
        OSError: If the file cannot be opened or read.
    """

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding.
    with open(sql_file_path, 'r', encoding='utf-8') as f:
        return f.read()

In [None]:
def insert_events_into_stage_dl(event_name: str, query_path) -> None:
    """Fill in a stage-layer SQL template and run it on Athena.

    Args:
        event_name (str): Value substituted for the ``table_name`` placeholder.
        query_path: Path of the SQL template file.

    Returns:
        None: Exceptions raised inside the ``try`` (the query execution and
            the success log) are logged instead of propagated. Reading the
            template happens before the ``try`` and can still raise.
    """

    # Plain-text placeholders, substituted one after another in this order.
    # Every occurrence of each placeholder string in the template is replaced.
    substitutions = (
        ('table_name', event_name),
        ('bucket_name', 's3://data-lakehouse-prod'),
        ('source_database', 'db_raw_prod'),
        ('target_database', 'db_stage_prod'),
        ('layer', 'stage'),
        ('data_source', 'transpose'),
    )

    sql = read_sql_file(query_path)
    for placeholder, value in substitutions:
        sql = sql.replace(placeholder, value)

    try:
        wr.athena.start_query_execution(sql=sql, database='db_stage_prod', wait=True)

        logger.debug(f"Table {event_name} has been ingested into the stage data lakehouse.")

    except Exception as e:
        logger.error(f"Error while ingesting table {event_name} into the stage data lakehouse. Error: {e}")

In [None]:
# Run every stage DDL template. The file name up to its first dot is the
# event name substituted into the template.
sql_files = glob.glob('src/pipelines/stage/ddl/*.sql')

for sql_file_path in sql_files:
    event_name = sql_file_path.rsplit('/', 1)[-1].partition('.')[0]
    insert_events_into_stage_dl(event_name=event_name, query_path=sql_file_path)

## Insert historical market data - Raw Layer

In [16]:
# Load the historical market state snapshot and preview its first rows.
df_historical_market_data_raw = pd.read_parquet('data/raw/historical_market_state.parquet')
df_historical_market_data_raw.head()

Unnamed: 0,liquidationThreshold,name,inputTokenPriceUSD,id,inputToken,block_number,protocol
0,50,Compound Augur,19.665723711398268,0x158079ee67fce2f58472a96584a73c7ab9ac95c1,{'decimals': 18},7812702,compound-v2-eth
1,75,Compound USD Coin,1.0,0x39aa39c021dfbae8fac545936693ac917d5e7563,{'decimals': 6},7812702,compound-v2-eth
2,75,Compound Ether,245.63790990843344,0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5,{'decimals': 18},7812702,compound-v2-eth
3,60,Compound Basic Attention Token,0.3556246606519214,0x6c8c6b02e7b2be14d4fa6022dfd6d75921d90e4e,{'decimals': 18},7812702,compound-v2-eth
4,60,Compound 0x,0.3071314363654969,0xb3319f5d18bc0d84dd1b4825dcde5d5f7266d407,{'decimals': 18},7812702,compound-v2-eth


In [18]:
# Look up the timestamp of every block referenced by the market data.
# The block numbers are rendered as plain integer literals: interpolating a
# tuple directly would emit "(n,)" for a single block and, on NumPy >= 2,
# "np.int64(n)" for each element, neither of which is valid SQL.
block_numbers_sql = ', '.join(
    str(int(block_number))
    for block_number in df_historical_market_data_raw["block_number"].unique()
)

blocks_with_timestamp = datalake_client.read_sql_query(
    query=f'select distinct number, timestamp from db_stage_prod.ethereum_blocks where number in ({block_numbers_sql})',
    database_name='db_stage_prod'
)

In [19]:
blocks_with_timestamp.head()

Unnamed: 0,number,timestamp
0,14682282,1651276808
1,9471297,1581552037
2,8132992,1562889614
3,9996331,1588550415
4,8177817,1563494424


In [20]:
# Attach the block timestamp to every market row. The lookup frame is indexed
# by block number, so the join matches 'block_number' against the right-hand
# index. right_on is deliberately not passed: it conflicts with
# right_index=True.
df_historical_market_data_raw_merged = pd.merge(
    left=df_historical_market_data_raw,
    right=blocks_with_timestamp.set_index('number'),
    how='left',
    left_on='block_number',
    right_index=True
)

In [23]:
# Derive the year/month partition columns from the block timestamp.
# NOTE(review): rows whose block was not found by the left join have no
# timestamp — confirm there are none before writing.
df_historical_market_data_raw_merged = add_partition_column(df_historical_market_data_raw_merged, 'timestamp')

In [24]:
df_historical_market_data_raw_merged.head()

Unnamed: 0,liquidationThreshold,name,inputTokenPriceUSD,id,inputToken,block_number,protocol,timestamp,year,month
0,50,Compound Augur,19.665723711398268,0x158079ee67fce2f58472a96584a73c7ab9ac95c1,{'decimals': 18},7812702,compound-v2-eth,1558569616,2019,5
1,75,Compound USD Coin,1.0,0x39aa39c021dfbae8fac545936693ac917d5e7563,{'decimals': 6},7812702,compound-v2-eth,1558569616,2019,5
2,75,Compound Ether,245.63790990843344,0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5,{'decimals': 18},7812702,compound-v2-eth,1558569616,2019,5
3,60,Compound Basic Attention Token,0.3556246606519214,0x6c8c6b02e7b2be14d4fa6022dfd6d75921d90e4e,{'decimals': 18},7812702,compound-v2-eth,1558569616,2019,5
4,60,Compound 0x,0.3071314363654969,0xb3319f5d18bc0d84dd1b4825dcde5d5f7266d407,{'decimals': 18},7812702,compound-v2-eth,1558569616,2019,5


In [None]:
# Write the merged market data to db_raw_prod.the_graph_historical_market_data,
# partitioned by year and month.
# NOTE(review): mode_write='append' — presumably re-running this cell writes
# the same rows again; confirm before re-executing.
datalake_client.write_parquet_table(
    data=df_historical_market_data_raw_merged,
    table_name='the_graph_historical_market_data',
    database_name='db_raw_prod',
    partition_columns=['year', 'month'],
    source='the_graph',
    layer='raw',
    mode_write='append'
)

## Insert historical market data - Stage Layer

In [30]:
# CTAS statement that builds the stage table from the raw table: the numeric
# fields are cast to double, columns are renamed to snake_case, and the nested
# inputtoken.decimals field becomes a top-level column. The result is stored
# as Snappy-compressed Parquet partitioned by year and month.
query = """
CREATE TABLE db_stage_prod.the_graph_historical_market_data WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    partitioned_by = ARRAY['year', 'month'],
    external_location = 's3://data-lakehouse-prod/stage/the_graph/the_graph_historical_market_data'
) AS
SELECT
    cast(liquidationthreshold as double) AS liquidation_threshold,
    name,
    cast(inputtokenpriceusd as Double) AS input_token_price_usd,
    id,
    inputtoken.decimals AS decimals,
    protocol,
    block_number,
    timestamp AS block_timestamp,
    year,
    month
FROM db_raw_prod.the_graph_historical_market_data
"""

In [31]:
# Submit the CTAS statement to Athena against db_stage_prod.
# NOTE(review): wait=True presumably blocks until the query finishes — confirm
# against the awswrangler documentation.
wr.athena.start_query_execution(
    sql=query,
    database='db_stage_prod',
    wait=True
)

{'QueryExecutionId': '89a5e2fe-6d77-408f-b922-8acba6862b6a',
 'Query': "CREATE TABLE db_stage_prod.the_graph_historical_market_data WITH (\n    format = 'PARQUET',\n    parquet_compression = 'SNAPPY',\n    partitioned_by = ARRAY['year', 'month'],\n    external_location = 's3://data-lakehouse-prod/stage/the_graph/the_graph_historical_market_data'\n) AS\nSELECT\n    cast(liquidationthreshold as double) AS liquidation_threshold,\n    name,\n    cast(inputtokenpriceusd as Double) AS input_token_price_usd,\n    id,\n    inputtoken.decimals AS decimals,\n    protocol,\n    block_number,\n    timestamp AS block_timestamp,\n    year,\n    month\nFROM db_raw_prod.the_graph_historical_market_data",
 'StatementType': 'DDL',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-362197681756-us-east-2/tables/89a5e2fe-6d77-408f-b922-8acba6862b6a'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'db_stage_

## Insert current positions into Analytics layer

In [5]:
# Load the processed current positions and preview the first rows.
df_current_positions = pd.read_parquet('data/processed/current_positions.parquet')
df_current_positions.head()

Unnamed: 0,balance,id,isCollateral,market,side,account,protocol,market_id
0,709860.0,0x05c381218c2454bf2f2cb2df567789e8b539465f-0x2...,True,Aave interest bearing WBTC,LENDER,0x05c381218c2454bf2f2cb2df567789e8b539465f,aave-v2-eth,0x2260fac5e5542a773aa44fbcfedf7c193bc2c599
1,477206100.0,0x05c381218c2454bf2f2cb2df567789e8b539465f-0xa...,False,Aave interest bearing USDC,BORROWER,0x05c381218c2454bf2f2cb2df567789e8b539465f,aave-v2-eth,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48
2,4.910523e+17,0x05c381218c2454bf2f2cb2df567789e8b539465f-0xc...,True,Aave interest bearing WETH,LENDER,0x05c381218c2454bf2f2cb2df567789e8b539465f,aave-v2-eth,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2
3,2.458383e+18,0x0790aef97341b55f7e24d978b963a793d4af7618-0x6...,False,Aave interest bearing DAI,BORROWER,0x0790aef97341b55f7e24d978b963a793d4af7618,aave-v2-eth,0x6b175474e89094c44da98b954eedeac495271d0f
4,3.99737e+18,0x0790aef97341b55f7e24d978b963a793d4af7618-0x6...,True,Aave interest bearing DAI,LENDER,0x0790aef97341b55f7e24d978b963a793d4af7618,aave-v2-eth,0x6b175474e89094c44da98b954eedeac495271d0f


In [7]:
# Rename the camelCase column to snake_case; the frame is modified in place.
df_current_positions.rename(columns={'isCollateral': 'is_collateral'}, inplace=True)

In [14]:
# Write the current positions, unpartitioned, to
# db_analytics_prod.the_graph_current_collateral_positions.
# NOTE(review): mode_write='append' — presumably re-running this cell writes
# the same rows again; confirm before re-executing.
datalake_client.write_parquet_table(
    data=df_current_positions,
    table_name='the_graph_current_collateral_positions',
    database_name='db_analytics_prod',
    partition_columns=None,
    source='the_graph',
    layer='analytics',
    mode_write='append'
)

{'paths': ['s3://data-lakehouse-prod/analytics/the_graph/the_graph_current_collateral_positions/41aea2ae25424ddc82c0a57957dbf4e8.snappy.parquet'],
 'partitions_values': {}}

## Insert Historical Collateral Account Positions - Raw Layer

In [7]:
# Load the historical account positions and preview the first rows.
df_historical_account_positions_raw = pd.read_parquet('data/raw/historical_account_positions.parquet')
df_historical_account_positions_raw.head()

Unnamed: 0,balance,id,isCollateral,market,side,account,block_number,protocol
0,48361613735,0x0006e4548aed4502ec8c844567840ce6ef1013f5-0x3...,True,{'id': '0x39aa39c021dfbae8fac545936693ac917d5e...,LENDER,{'id': '0x0006e4548aed4502ec8c844567840ce6ef10...,7883177,compound-v2-eth
1,3475536915539527857521,0x000be263b3ceb55d22ed9b40d7619f62c4862c80-0xf...,False,{'id': '0xf5dce57282a584d2746faf1593d3121fcac4...,LENDER,{'id': '0x000be263b3ceb55d22ed9b40d7619f62c486...,7883177,compound-v2-eth
2,999999879947788,0x0029ab135b5be72168bf80f140d60a9264dbd0c5-0x4...,False,{'id': '0x4ddc2d193948926d02f9b1fe9e1daa071827...,LENDER,{'id': '0x0029ab135b5be72168bf80f140d60a9264db...,7883177,compound-v2-eth
3,6999999999999932567630,0x002f9caf40a444f20813da783d152bdfaf42852f-0xf...,False,{'id': '0xf5dce57282a584d2746faf1593d3121fcac4...,LENDER,{'id': '0x002f9caf40a444f20813da783d152bdfaf42...,7883177,compound-v2-eth
4,1990502180,0x003c52a71c887461087154eccced08cb1c5384a5-0x3...,,{'id': '0x39aa39c021dfbae8fac545936693ac917d5e...,BORROWER,{'id': '0x003c52a71c887461087154eccced08cb1c53...,7883177,compound-v2-eth


In [8]:
# Look up the timestamp of every block referenced by the account positions.
# The block numbers are rendered as plain integer literals: interpolating a
# tuple directly would emit "(n,)" for a single block and, on NumPy >= 2,
# "np.int64(n)" for each element, neither of which is valid SQL.
block_numbers_sql = ', '.join(
    str(int(block_number))
    for block_number in df_historical_account_positions_raw["block_number"].unique()
)

blocks_with_timestamp = datalake_client.read_sql_query(
    query=f'select distinct number, timestamp from db_stage_prod.ethereum_blocks where number in ({block_numbers_sql})',
    database_name='db_stage_prod'
)

In [10]:
blocks_with_timestamp.head()

Unnamed: 0,number,timestamp
0,11818910,1612828818
1,8377522,1566172803
2,11877476,1613606407
3,7844770,1559001608
4,11825395,1612915215


In [11]:
# Attach the block timestamp to every position row. The lookup frame is
# indexed by block number, so the join matches 'block_number' against the
# right-hand index. right_on is deliberately not passed: it conflicts with
# right_index=True.
df_historical_account_positions_raw_merged = pd.merge(
    left=df_historical_account_positions_raw,
    right=blocks_with_timestamp.set_index('number'),
    how='left',
    left_on='block_number',
    right_index=True
)

In [12]:
# Derive the year/month partition columns from the block timestamp.
# NOTE(review): rows whose block was not found by the left join have no
# timestamp — confirm there are none before writing.
df_historical_account_positions_raw_merged = add_partition_column(df_historical_account_positions_raw_merged, 'timestamp')

In [13]:
df_historical_account_positions_raw_merged.head()

Unnamed: 0,balance,id,isCollateral,market,side,account,block_number,protocol,timestamp,year,month
0,48361613735,0x0006e4548aed4502ec8c844567840ce6ef1013f5-0x3...,True,{'id': '0x39aa39c021dfbae8fac545936693ac917d5e...,LENDER,{'id': '0x0006e4548aed4502ec8c844567840ce6ef10...,7883177,compound-v2-eth,1559520036,2019,6
1,3475536915539527857521,0x000be263b3ceb55d22ed9b40d7619f62c4862c80-0xf...,False,{'id': '0xf5dce57282a584d2746faf1593d3121fcac4...,LENDER,{'id': '0x000be263b3ceb55d22ed9b40d7619f62c486...,7883177,compound-v2-eth,1559520036,2019,6
2,999999879947788,0x0029ab135b5be72168bf80f140d60a9264dbd0c5-0x4...,False,{'id': '0x4ddc2d193948926d02f9b1fe9e1daa071827...,LENDER,{'id': '0x0029ab135b5be72168bf80f140d60a9264db...,7883177,compound-v2-eth,1559520036,2019,6
3,6999999999999932567630,0x002f9caf40a444f20813da783d152bdfaf42852f-0xf...,False,{'id': '0xf5dce57282a584d2746faf1593d3121fcac4...,LENDER,{'id': '0x002f9caf40a444f20813da783d152bdfaf42...,7883177,compound-v2-eth,1559520036,2019,6
4,1990502180,0x003c52a71c887461087154eccced08cb1c5384a5-0x3...,,{'id': '0x39aa39c021dfbae8fac545936693ac917d5e...,BORROWER,{'id': '0x003c52a71c887461087154eccced08cb1c53...,7883177,compound-v2-eth,1559520036,2019,6


In [14]:
# Write the merged account positions to
# db_raw_prod.the_graph_historical_account_positions, partitioned by year and
# month.
# NOTE(review): mode_write='append' — presumably re-running this cell writes
# the same rows again; confirm before re-executing.
datalake_client.write_parquet_table(
    data=df_historical_account_positions_raw_merged,
    table_name='the_graph_historical_account_positions',
    database_name='db_raw_prod',
    partition_columns=['year', 'month'],
    source='the_graph',
    layer='raw',
    mode_write='append'
)

{'paths': ['s3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2019/month=05/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2019/month=06/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2019/month=07/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2019/month=08/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2020/month=12/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2021/month=01/76a197f155f14c76a148b8e9987c0880.snappy.parquet',
  's3://data-lakehouse-prod/raw/the_graph/the_graph_historical_account_positions/year=2021/

## Insert tokens to drop from DeFi events

In [5]:
# Load the list of tokens to drop, rename its 'lower' column to
# 'contract_address' (in place), and preview the first rows.
tokens_to_drop = pd.read_csv('data/processed/tokens_to_drop.csv')
tokens_to_drop.rename(columns={'lower': 'contract_address'}, inplace=True)
tokens_to_drop.head()

Unnamed: 0,contract_address
0,0x04fa0d235c4abf4bcf4787af4cf447de572ef828
1,0x05d3606d5c81eb9b7b18530995ec9b29da05faba
2,0x06af07097c9eeb7fd685c692751d5c66db49c215
3,0x0954906da0bf32d5479e25f46056d22f08464cab
4,0x09a3ecafa817268f77be1283176b946c4ff2e608


In [6]:
# Write the token list, unpartitioned, to
# db_sandbox_prod.defi_events_tokens_to_drop. Unlike the other writes in this
# notebook, this one uses mode_write='overwrite'.
datalake_client.write_parquet_table(
    data=tokens_to_drop,
    table_name='defi_events_tokens_to_drop',
    database_name='db_sandbox_prod',
    partition_columns=None,
    source='web3_credit_scoring',
    layer='sandbox',
    mode_write='overwrite'
)

{'paths': ['s3://data-lakehouse-prod/sandbox/web3_credit_scoring/defi_events_tokens_to_drop/31dd09dbdb424a97b15d5ff871f6bd01.snappy.parquet'],
 'partitions_values': {}}