In [1]:
## import libraries
from google.cloud import bigquery
from google.api_core.exceptions import Conflict
import os
import pandas as pd

In [2]:
## load data
# Read the link_transactions CSV from the data directory into a dataframe.
link_transactions = pd.read_csv(
    filepath_or_buffer='../data/link_transactions.csv',
)

In [3]:
## drop the integer-valued columns and let their *Decimal counterparts take over the names
# The whole clean-up is one chain that yields a new frame:
#   1. drop the three columns whose names are reused by the rename below,
#   2. cast TransactionIndex to int and divide Value by 10**18 as float64,
#   3. rename the *Decimal columns to the plain names.
link_transactions = (
    link_transactions
    .drop(columns=['TransactionFeeEth', 'SignedValue', 'RunningBalance'])
    .assign(
        TransactionIndex=lambda frame: frame['TransactionIndex'].astype(int),
        Value=lambda frame: frame['Value'].astype('float64') / 10**18,
    )
    .rename(columns={
        'TransactionFeeEthDecimal': 'TransactionFeeEth',
        'SignedValueDecimal': 'SignedValue',
        'RunningBalanceDecimal': 'RunningBalance',
    })
)

In [4]:
## set up bigquery client
# Point the GOOGLE_APPLICATION_CREDENTIALS environment variable at the
# service-account key file before the client is constructed.
os.environ.update(
    GOOGLE_APPLICATION_CREDENTIALS='../gcloud_service_creds.json',
)

# BigQuery client object used by every later cell
client = bigquery.Client()

In [5]:
## create bq dataset and table

# define schema
# The field order here is also the column order the dataframe is arranged in
# before it is exported for loading, and every field is REQUIRED.
schema = [
    bigquery.SchemaField("Name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("Address", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("From", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("To", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ContractAddress", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("TransactionHash", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("TransactionIndex", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("TokenName", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("TokenSymbol", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("Inbound", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("Outbound", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("Timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("Value", "BIGNUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("SignedValue", "BIGNUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("RunningBalance", "BIGNUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("TransactionFeeEth", "BIGNUMERIC", mode="REQUIRED"),
]

# first create a project (banded-charmer-324215) in the console
project_name = 'banded-charmer-324215'

# create bigquery dataset
dataset_name = 'transactions'
# Qualify the dataset with project_name so it is created in the same project
# that the messages below and table_id refer to, instead of whichever default
# project the client resolved from its credentials.
dataset_id = "{}.{}".format(project_name, dataset_name)
try:
    client.create_dataset(dataset_id)
    print("Created dataset {}.{}".format(project_name, dataset_name))
except Conflict:
    # Conflict means the dataset is already there; that is fine for re-runs.
    print("Dataset {}.{} already exists.".format(project_name, dataset_name))

# create table
table_id = "{}.{}.link_transactions".format(project_name, dataset_name)
table = bigquery.Table(table_id, schema=schema)
try:
    table = client.create_table(table)
    print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
except Conflict:
    # On Conflict `table` is still the locally built Table object, which
    # carries the project / dataset / table ids used by later cells.
    print("Table {}.{}.{} already exists.".format(table.project, table.dataset_id, table.table_id))

Dataset banded-charmer-324215.transactions already exists.
Table banded-charmer-324215.transactions.link_transactions already exists.


In [6]:
## push data to bigquery table

# set dataframe columns to match order of defined schema
# (CSV loading is positional, so the column order has to follow the schema)
link_transactions = link_transactions[[x.name for x in schema]]

# create bigquery load job
# The CSV below is written with a header row, so skip it explicitly rather
# than relying on autodetect's header heuristics. Autodetect is dropped
# because the schema is supplied explicitly.
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition="WRITE_APPEND",
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

# export the dataframe to the file that the load job will upload
bq_source_file = '../data/link_transactions_bq.csv'
link_transactions.to_csv(bq_source_file, index=False, header=True)

In [7]:
## load the csv into bigquery

# Hand the exported CSV to BigQuery as a load job targeting table_id.
with open(bq_source_file, "rb") as csv_handle:
    job = client.load_table_from_file(csv_handle, table_id, job_config=job_config)

# Wait for the load job to finish before reading its row count.
job.result()
loaded_message = "Loaded {} rows into {}.{}.{}.".format(
    job.output_rows, table.project, table.dataset_id, table.table_id)
print(loaded_message)

Loaded 3206 rows into banded-charmer-324215.transactions.link_transactions.


In [None]:
## confirm data in bigquery
query = """
    SELECT * 
    FROM {}.{}.{}
    LIMIT 50000
    """.format(table.project, table.dataset_id, table.table_id)
query_results = client.query(query)

# Collect each row as a plain dict and build the dataframe once.
# DataFrame.append was removed in pandas 2.0, and calling it per row
# re-copied the whole frame on every iteration.
records = [dict(row.items()) for row in query_results.result()]
results_df = pd.DataFrame(records)

# NOTE: the query is capped at 50000 rows, so this count stops being the
# true table size once the table grows past that limit.
print(f"There are a total of {len(results_df)} rows in the table.")

In [None]:
## query to deduplicate (necessary for ETL cycles until block height limits are set in etherscan API call)

# SQL template: rebuild the table keeping only the first row (rn = 1) of each
# (Address, From, To, Value, TransactionHash, TransactionIndex) partition and
# selecting just the schema columns, so the helper `rn` column is dropped.
dedupe_template = """
    CREATE OR REPLACE TABLE `{table_project}.{table_dataset_id}.{table_id}` AS
    SELECT 
        {columns_in_schema}
    FROM (
      SELECT
          *,
         ROW_NUMBER()
              OVER (PARTITION BY Address, `From`, `To`, Value, TransactionHash, TransactionIndex)
              AS rn
      FROM {table_project}.{table_dataset_id}.{table_id}
    )
    WHERE rn = 1
"""

# Backtick-quote every schema column name and join them with commas.
quoted_columns = ",".join(f"`{field.name}`" for field in schema)

query = dedupe_template.format(
    table_project=table.project,
    table_dataset_id=table.dataset_id,
    table_id=table.table_id,
    columns_in_schema=quoted_columns,
)

# Submit the query; the next cell waits on it via .result().
query_results = client.query(query)

In [None]:
# Wait for the previous query and print the key/value pairs of its first row,
# if it returned any rows at all.
first_row = next(iter(query_results.result()), None)
if first_row is not None:
    for key, value in first_row.items():
        print(key, value)

In [None]:
## confirm data in bigquery (after deduplication)
query = """
    SELECT * 
    FROM {}.{}.{}
    LIMIT 50000
    """.format(table.project, table.dataset_id, table.table_id)
query_results = client.query(query)

# Collect each row as a plain dict and build the dataframe once.
# DataFrame.append was removed in pandas 2.0, and calling it per row
# re-copied the whole frame on every iteration.
records = [dict(row.items()) for row in query_results.result()]
results_df = pd.DataFrame(records)

# NOTE: the query is capped at 50000 rows, so this count stops being the
# true table size once the table grows past that limit.
print(f"There are a total of {len(results_df)} rows in the table.")