In [1]:
# Import dependencies
import pandas as pd
import multiprocessing
import psycopg2
import getpass
from itertools import chain

In [2]:
# Pip install fast_map module for concurrent processes
!pip install fast_map



# Link to fast_map Documentation
* https://github.com/michalmonday/fast_map

In [3]:
# Import fast-map module. Also, located in the requirements.txt file
from fast_map import fast_map

# Create Objects for ETL Pipeline

---

In [4]:
# Create Credential functions
class PostSQL(object):
    """
    Build out Postgres credentials / connections.
    """
    @staticmethod
    def create_connection(user:str, pw:str,
                          database:str = "casestdysu22t04",
                          host:str = "pgsql.dsa.lan"):
        """
        | Function to create connection to Postgres.
        
        Arguments:
        :param: user: Username for database.
        :param: pw: Password for database.
        :param: database: Database name (defaults to the case-study database).
        :param: host: Database host (defaults to the original server).
            
        returns: psycopg2 connection; the caller is responsible for closing it.
        """
        # Declared as a staticmethod: the function never used an instance, it was
        # only ever called through the class (PostSQL.create_connection(...)).
        return psycopg2.connect(database=database,
                                host=host,
                                user=user,
                                password=pw)

In [5]:
# Create Object for insert
class PostInsert(object):
    """
    Object to insert data into Postgres Tables.
    """
    @staticmethod
    def clean_data(data:pd.DataFrame) -> list:
        """
        | Function to help clean up data through the ETL pipeline.
        
        Converts a Pandas DataFrame into a list of row tuples, replacing every
        missing value (NaN / NaT / None) with None so psycopg2 sends SQL NULL.
        
        Arguments:
        :param: data: DataFrame whose column order matches the insert statement.
        
        returns: A list of tuples, one per row.
        """
        # Work on an object array: data.where(notnull, None) silently keeps NaN/NaT
        # in float and datetime columns on recent pandas, so those would not become NULL.
        values = data.to_numpy(dtype=object, copy=True)
        values[data.isna().to_numpy()] = None
        
        # Convert data into list of tuples for data insert
        return [tuple(row) for row in values]
    
    @staticmethod
    def insert_data(parameters:tuple) -> str:
        """
        | Function to insert one batch of data into Postgres.
        
        Takes a single tuple so it can be mapped over with fast_map.
        
        Arguments:
        :param 0: data: DataFrame to be inserted into tables.
        :param 1: insert_sql: Insert statement for tables.
        :param 2: start_position: Starting row position for this batch.
        :param 3: batch_size: Number of rows in this batch.
        :param 4: user: Username.
        :param 5: pw: Password to database.
        
        returns: "Insert complete." once the batch has been committed.
        """
        data, insert_sql, start_position, batch_size, user, pw = parameters
        
        # Positional row slice for this batch; clean it before opening a connection
        batch = data.iloc[start_position:start_position + batch_size]
        data_list = PostInsert.clean_data(batch)
        
        connection = PostSQL.create_connection(user, pw)
        try:
            # The cursor context manager closes the cursor even if executemany fails
            with connection.cursor() as cursor:
                cursor.executemany(insert_sql, data_list)
            connection.commit()
        finally:
            # Always release the connection; closing without a commit discards a failed transaction
            connection.close()
            print("PostgreSQL connection is closed")
            
        return "Insert complete."

    # ----------- Start of Fast-Map ----------------------------
    @staticmethod
    def fast_insert_data(data, insert_sql, batch_size, user, pw, threads_limit=25):
        """
        | Function to fast-map insert data into Postgres.
        
        Splits the data into batches of batch_size rows and inserts each batch
        through insert_data, with up to threads_limit batches running concurrently.
        
        Arguments:
        :param: data: Data to be inserted into tables.
        :param: insert_sql: Insert statement for tables.
        :param: batch_size: Integer value to increment data loads (must be > 0).
        :param: user: Username.
        :param: pw: Password to database.
        :param: threads_limit: Maximum number of concurrent insert threads (default 25).
        
        raises: ValueError: if batch_size is not positive.
        """
        if batch_size <= 0:
            # range() would raise for 0 and silently yield nothing for negatives
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        
        # Get length of data to create batching parameters
        data_length = len(data)
        
        print(f"Length of Data: {data_length}")
        
        # One parameter tuple per batch start position
        batches = [(data, insert_sql, skip_n, batch_size, user, pw)
                   for skip_n in range(0, data_length, batch_size)]
        
        # Consume the fast_map results so every batch is processed
        for _ in fast_map(PostInsert.insert_data, batches, threads_limit=threads_limit):
            pass

In [6]:
# Create object to drop columns from tables
class PostDrop(object):
    """
    Object to build and run SQL that drops tables or columns in Postgres.
    
    NOTE(review): table and column names are formatted directly into the SQL
    text, so only pass trusted, hard-coded names.
    """
    @staticmethod
    def create_drop_sql(table_name:str, more_fields:bool, *field_name:str) -> str:
        """
        | Function to create a Drop Column SQL.
        
        Arguments:
        :param: table_name: Table Name.
        :param: more_fields: Boolean value to indicate if there's more fields to drop.
                             When False, only the first field name is used.
        :param: *field_name: Fields to drop from the table.
        
        raises: ValueError: if no field name is given.
        returns: SQL statement.
        """
        if not field_name:
            # Without a field this used to raise IndexError (single-field branch)
            # or build an ALTER TABLE with no action (multi-field branch).
            raise ValueError("create_drop_sql requires at least one field name")
        
        if not more_fields:
            sql = f"""
                ALTER TABLE {table_name}
                DROP COLUMN IF EXISTS {field_name[0]}; 
                """
        else:
            sql = f"""
                    ALTER TABLE {table_name}
                   """
            # One DROP COLUMN clause per field, comma-separated and terminated with ";"
            sql += ",".join(f" DROP COLUMN IF EXISTS {f}" for f in field_name) + ";"
            
        return sql
    
    @staticmethod
    def drop_table_sql(table_name:str) -> str:
        """
        | Function to create Drop Table SQL.
        
        Arguments:
        :param: table_name: Table Name.
        
        returns: SQL statement.
        """
        
        return f"""
        DROP TABLE IF EXISTS {table_name};
        """
    
    @staticmethod
    def drop_execution(user:str, pw:str, sql:str) -> str:
        """
        | Function to run a Drop statement (table or column).
        
        Arguments:
        :param: user: Username for database.
        :param: pw: Password for database.
        :param: sql: SQL statement.
        
        returns: "Drop sequence complete." once the statement is committed.
        """   
        connection = PostSQL.create_connection(user, pw)
        try:
            # The cursor context manager closes the cursor even if execute fails
            with connection.cursor() as cursor:
                cursor.execute(sql)
            connection.commit()
        finally:
            # Always release the connection; closing without a commit discards a failed transaction
            connection.close()
            print("PostgreSQL connection is closed")
            
        return "Drop sequence complete."

        

# Deprecate frax_signatures & frax_blocks
---

In [7]:
# Build DROP TABLE statements for the deprecated frax_signatures and frax_blocks tables
drop_signature = PostDrop.drop_table_sql("frax_signatures")
drop_blocks = PostDrop.drop_table_sql("frax_blocks")

In [8]:
# Show both generated DROP TABLE statements, one after the other
print(drop_signature, drop_blocks, sep="\n")


        DROP TABLE IF EXISTS frax_signatures;
        

        DROP TABLE IF EXISTS frax_blocks;
        


In [9]:
# Build the ALTER TABLE statement that removes the `abi` column from frax_contracts
drop_abi = PostDrop.create_drop_sql(
    "public.frax_contracts",
    False,  # only one field to drop
    "abi",
)

In [10]:
# Inspect the generated ALTER TABLE statement
print(f"{drop_abi}")


                ALTER TABLE public.frax_contracts
                DROP COLUMN IF EXISTS abi; 
                


In [11]:
# Prompt for the database password without echoing it to the notebook
pw = getpass.getpass(prompt='Password: ')

········


In [12]:
# Drop the deprecated frax_signatures table
PostDrop.drop_execution(user="jsyp4f", pw=pw, sql=drop_signature)

PostgreSQL connection is closed


'Drop sequence complete.'

In [13]:
# Drop the deprecated frax_blocks table
PostDrop.drop_execution(user="jsyp4f", pw=pw, sql=drop_blocks)

PostgreSQL connection is closed


'Drop sequence complete.'

In [14]:
# Remove the abi column from frax_contracts
PostDrop.drop_execution(user="jsyp4f", pw=pw, sql=drop_abi)

PostgreSQL connection is closed


'Drop sequence complete.'

# Load and Order data
---

## Contract Data

In [15]:
# Read the contracts export and preview its first rows
contracts_df = pd.read_csv(filepath_or_buffer='data/frax-ethereum-contracts.csv')
contracts_df.head(n=5)

Unnamed: 0,namespace,name,abi,address,code,base,dynamic,updated_at,created_at,id,factory
0,frax,FRAXStablecoin,"{""inputs"":[{""internalType"":""string"",""name"":""_n...",\x853d955acef822db058eb8505911ed77f175b99e,\x608060405234801561001057600080fd5b5060043610...,True,False,44299.475,44299.475,1673,


In [16]:
# Character length of the `code` value in the first contracts row
len(contracts_df.loc[0, 'code'])

29790

In [17]:
# Order columns (alphabetically) for data insert
con_order = ["address","base","code","created_at","dynamic","factory",
             "id","name","namespace","updated_at"]

# NOTE(review): the raw timestamps (e.g. 44299.475) look like Excel serial day numbers, not
# Unix seconds -- unit='s' placed every row on 1970-01-01. They are read here as days since
# Excel's 1899-12-30 epoch (44299.475 -> 2021-04-13 11:24). Confirm against the source export.
EXCEL_EPOCH = "1899-12-30"

contracts_df = contracts_df[con_order].assign(
    # Convert to string, but keep missing values as NaN so they load as NULL rather than "nan"
    address=lambda df: df["address"].astype(str).where(df["address"].notna()),
    code=lambda df: df["code"].astype(str).where(df["code"].notna()),
    updated_at=lambda df: pd.to_datetime(df["updated_at"], unit="D", origin=EXCEL_EPOCH),
    created_at=lambda df: pd.to_datetime(df["created_at"], unit="D", origin=EXCEL_EPOCH),
)

In [18]:
# Read insert sql; the with-block closes the file handle once it has been read
with open('insert_sql/contracts_insert.sql', 'r', encoding='utf-8') as sql_file:
    contract_insert = sql_file.read()

In [19]:
# Insert Data
# Disabled because the contracts data has already been loaded; re-running presumably
# duplicates rows or hits key constraints -- confirm before re-enabling.
# PostInsert.fast_insert_data(contracts_df, contract_insert, 50000, "jsyp4f", pw) # Disabled due to being loaded


## Log Data
---

In [22]:
# Read the raw logs export (re-run just this cell if the kernel hits memory issues)
logs_df = pd.read_csv(filepath_or_buffer='data/frax-ethereum-logs.csv')

In [23]:
# Preview the first few log rows
logs_df.head(n=5)

Unnamed: 0,contract_address,topic1,topic2,topic3,topic4,data,tx_hash,block_hash,block_number,block_time,index,tx_index
0,\x853d955acef822db058eb8505911ed77f175b99e,\x2f8788117e7eff1d82e926ec794901d17c78024a5027...,\x00000000000000000000000000000000000000000000...,\x000000000000000000000000a448833bece66fd8803a...,\x000000000000000000000000a448833bece66fd8803a...,,\x1c3339f6e08324aea59d21f2bb68f184f5a5fd1da760...,\x8701fa59631d4b073ef8944275e2a74af7376a5b6461...,11465581,2020-12-16T17:42:00+00:00,43,28
1,\x853d955acef822db058eb8505911ed77f175b99e,\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4...,\x00000000000000000000000000000000000000000000...,\x000000000000000000000000234d953a9404bf9dbc3b...,,\x00000000000000000000000000000000000000000001...,\x1c3339f6e08324aea59d21f2bb68f184f5a5fd1da760...,\x8701fa59631d4b073ef8944275e2a74af7376a5b6461...,11465581,2020-12-16T17:42:00+00:00,44,28
2,\x853d955acef822db058eb8505911ed77f175b99e,\x2f8788117e7eff1d82e926ec794901d17c78024a5027...,\xb25402418ad555013210365a422f9f1206b2dd007199...,\x000000000000000000000000234d953a9404bf9dbc3b...,\x000000000000000000000000a448833bece66fd8803a...,,\x1c3339f6e08324aea59d21f2bb68f184f5a5fd1da760...,\x8701fa59631d4b073ef8944275e2a74af7376a5b6461...,11465581,2020-12-16T17:42:00+00:00,45,28
3,\x853d955acef822db058eb8505911ed77f175b99e,\x2f8788117e7eff1d82e926ec794901d17c78024a5027...,\xb25402418ad555013210365a422f9f1206b2dd007199...,\x0000000000000000000000008412ebf45bac1b340bbe...,\x000000000000000000000000a448833bece66fd8803a...,,\x1c3339f6e08324aea59d21f2bb68f184f5a5fd1da760...,\x8701fa59631d4b073ef8944275e2a74af7376a5b6461...,11465581,2020-12-16T17:42:00+00:00,46,28
4,\x853d955acef822db058eb8505911ed77f175b99e,\x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2...,\x000000000000000000000000234d953a9404bf9dbc3b...,\x0000000000000000000000007a250d5630b4cf539739...,,\x00000000000000000000000000000000000000000000...,\x5a1f723189fc09bcd770516bc3b6603ea81ba7ca1d3d...,\xc53061bd8c81a3560d5f0b418c51740a466baaa8b5a2...,11465745,2020-12-16T18:16:51+00:00,10,47


In [24]:
# Reorder the log columns alphabetically for the data insert
log_order = [
    "block_hash", "block_time", "contract_address", "data", "index",
    "topic1", "topic2", "topic3", "topic4", "tx_hash", "tx_index",
]

logs_df = logs_df.loc[:, log_order]

In [25]:
# Update columns to string type
logs_string = ["block_hash","contract_address","data","topic1","topic2","topic3","topic4","tx_hash"]

# astype(str) alone turns missing values (e.g. the empty topic4 / data cells) into the literal
# text "nan"; .where(notna) puts NaN back so the insert step can send NULL instead.
# assign() builds a new frame rather than assigning into a column selection.
logs_df = logs_df.assign(**{
    col: logs_df[col].astype(str).where(logs_df[col].notna())
    for col in logs_string
})

In [26]:
# Read in logs insert SQL; the with-block closes the file handle once it has been read
with open('insert_sql/logs_insert.sql', 'r', encoding='utf-8') as sql_file:
    logs_sql = sql_file.read()

In [28]:
# Insert logs data -- disabled: ran into issues where block_hash is not unique,
# so the logs schema is being updated before the data is reloaded.
# PostInsert.fast_insert_data(logs_df, logs_sql, 50000, "jsyp4f", pw) # Disabled for schema update

## Traces Data
---

### Sprint 3: Update the Schema to Match the Pulled Data

In [30]:
# The trace exports are read one file per cell because of their size (v1 first)
traces_v1_df = pd.read_csv(filepath_or_buffer='data/frax-ethereum-traces.csv')

In [31]:
# Second trace export (v2)
traces_v2_df = pd.read_csv(filepath_or_buffer='data/frax-ethereum-traces-v2.csv')

In [32]:
# Preview the first V1 trace row
traces_v1_df.head(n=1)

Unnamed: 0,block_time,sub_traces,tx_success,success,block_hash,block_number,tx_hash,from,value,gas,gas_used,tx_index,type,call_type,input,output,to,error
0,2022-04-24T04:45:16+00:00,0,True,True,\x04047a735db304c3b3190e7c3701a9d1e66896b10d52...,14645412,\x7b09f8e76cd803990133351b9d7863804babb5dff2c4...,\x5b81da82ecf1b7abd880f9e989aac32ddcebe165,0,34068,24733.0,191,call,call,\x095ea7b3000000000000000000000000d9e1ce17f264...,\x00000000000000000000000000000000000000000000...,\x853d955acef822db058eb8505911ed77f175b99e,


In [33]:
# Preview the first V2 trace row
traces_v2_df.head(n=1)

Unnamed: 0,block_time,sub_traces,tx_success,success,block_hash,block_number,tx_hash,from,value,gas,gas_used,tx_index,type,call_type,input,output,to,error
0,2022-01-27T05:21:21+00:00,0,True,True,\x5a0f461bc79856993991a9a8b8d84049437ca21f185d...,14085768,\x81087944db08a07983f48c3f07cfcdaf7b3914b1a190...,\xd632f22692fac7611d2aa1c0d552930d43caed3b,0,103915,30223.0,0,call,call,\xa9059cbb000000000000000000000000e9b05bc1fa86...,\x00000000000000000000000000000000000000000000...,\x853d955acef822db058eb8505911ed77f175b99e,


In [34]:
# Combine dataframes row-wise: both exports share the same columns, so the v2 rows are
# appended below the v1 rows (axis=1 would glue them side by side with duplicated columns).
assert list(traces_v1_df.columns) == list(traces_v2_df.columns), "trace exports have different columns"
traces_df = pd.concat([traces_v1_df, traces_v2_df], ignore_index=True)

## Transaction Data
---

In [36]:
# Read the raw transactions export
transactions_df = pd.read_csv(filepath_or_buffer='data/frax-ethereum-transactions.csv')

In [37]:
# Preview the first raw transaction row
transactions_df.head(n=1)

Unnamed: 0,block_time,nonce,index,success,from,to,value,block_number,block_hash,gas_limit,gas_price,gas_used,data,hash,type,access_list,max_fee_per_gas,max_priority_fee_per_gas,priority_fee_per_gas
0,2020-12-16T17:47:31+00:00,8,6,True,\x234d953a9404bf9dbc3b526271d440cd2870bcd2,\x853d955acef822db058eb8505911ed77f175b99e,0.0,11465606,\x0e8fa84abd5cb4a38af1bd42ae9b013a753006da2d31...,8000000,90000000000.0,43498,\xc3bc89100000000000000000000000003432b6a60d23...,\xd0f6162b84e20b29afed59ac5067c4f30cdf7d1c79be...,,,,,


In [41]:
# Reorder the transaction columns alphabetically for the data insert
transaction_order = [
    "access_list", "block_hash", "block_number", "block_time", "data",
    "from", "gas_limit", "gas_price", "gas_used", "hash", "index",
    "max_fee_per_gas", "max_priority_fee_per_gas", "nonce",
    "priority_fee_per_gas", "success", "to", "type", "value",
]

transactions_df = transactions_df.loc[:, transaction_order]

In [43]:
# Confirm the new column order on the first row
transactions_df.head(n=1)

Unnamed: 0,access_list,block_hash,block_number,block_time,data,from,gas_limit,gas_price,gas_used,hash,index,max_fee_per_gas,max_priority_fee_per_gas,nonce,priority_fee_per_gas,success,to,type,value
0,,\x0e8fa84abd5cb4a38af1bd42ae9b013a753006da2d31...,11465606,2020-12-16T17:47:31+00:00,\xc3bc89100000000000000000000000003432b6a60d23...,\x234d953a9404bf9dbc3b526271d440cd2870bcd2,8000000,90000000000.0,43498,\xd0f6162b84e20b29afed59ac5067c4f30cdf7d1c79be...,6,,,8,,True,\x853d955acef822db058eb8505911ed77f175b99e,,0.0
