In [15]:
from pyarrow.fs import GcsFileSystem
from pyarrow.dataset import dataset
import pyarrow.parquet as pq
import psycopg
import pandas as pd

In [2]:
# --- Notebook configuration ---
# GCP project and region, plus the bucket and object name of the sample Parquet file.
PROJECT_ID = "boxwood-well-386122"
REGION = "us-central1"
BUCKET = "vijay-onchain-bigquery-export"
blob_name = "trx/20230101_20240218/000000000000.pqt"
# Bucket-qualified object path, without any URI scheme.
path = "/".join((BUCKET, blob_name))

In [4]:
# One-time interactive login that creates Application Default Credentials (ADC).
# gcloud saves them to its application_default_credentials.json file; any library
# that requests ADC will use them afterwards.

! gcloud auth application-default login

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2F&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=5nsOBm8wcBO7jjy71umPZcZV7OOXoH&access_type=offline&code_challenge=JX1iF9s_g922YYJ-pH7irv5Ctl__GaoWP8M7bcnoM9M&code_challenge_method=S256


Credentials saved to file: [/Users/vijay/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).

Quota project "boxwood-well-386122" was added to ADC which can be used by Google client libraries for billing and quota. Note that some services may still bill the project owning the resource.


In [3]:
# PyArrow GCS filesystem built with default settings.
# NOTE(review): presumably this resolves the ADC created by the gcloud login cell — confirm.
# Alternative kept for reference: pin the project and default bucket location explicitly.
# gcs_fs = GcsFileSystem(project_id=PROJECT_ID, default_bucket_location=REGION)
gcs_fs = GcsFileSystem()

In [83]:
import gcsfs

# List every object stored in the same folder as the sample file.
# The prefix is derived from `path`, so the bucket and export folder are
# configured in exactly one place instead of being typed again here.
fs = gcsfs.GCSFileSystem(project=PROJECT_ID)
export_prefix = path.rsplit("/", 1)[0] + "/"
pqt_filelist = fs.ls(export_prefix)

In [84]:
# Number of entries returned by the bucket listing.
len(pqt_filelist)

742

In [85]:
import random

# Peek at a handful of listed objects. A dedicated, seeded generator keeps the
# displayed sample stable across "Restart & Run All" without touching the
# global random state.
random.Random(42).sample(pqt_filelist, 5)

['vijay-onchain-bigquery-export/trx/20230101_20240218/000000000356.pqt',
 'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000395.pqt',
 'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000058.pqt',
 'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000011.pqt',
 'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000340.pqt']

In [4]:
# Display the bucket-qualified object path assembled in the configuration cell.
path

'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000000.pqt'

In [5]:
# Look up the object's metadata (type and size) before reading it.
gcs_fs.get_file_info(path)

<FileInfo for 'vijay-onchain-bigquery-export/trx/20230101_20240218/000000000000.pqt': type=FileType.File, size=29460061>

In [6]:
# Open the single Parquet object as a dataset through the explicit GCS filesystem.
ds = dataset(path, filesystem=gcs_fs, format="parquet")

In [7]:
# Same object opened from a gs:// URI with no explicit filesystem argument.
# This rebinds `ds`, replacing the dataset created in the previous cell.
ds = dataset(f"gs://{path}", format="parquet")

In [8]:
%%time
# Time a full read of the object, addressing it through the explicit GCS filesystem.
parquet_file = pq.read_table(path, filesystem=gcs_fs)

CPU times: user 255 ms, sys: 458 ms, total: 714 ms
Wall time: 9.59 s


In [9]:
%%time
# Time the same read addressed by gs:// URI instead; this rebinds `parquet_file`.
# Each variant was timed with a single run, so the recorded wall times are indicative only.
parquet_file = pq.read_table(f"gs://{path}")

CPU times: user 336 ms, sys: 587 ms, total: 923 ms
Wall time: 17.3 s


In [10]:
# Inspect the column names and Arrow types of the dataset.
ds.schema

from_address: string
to_address: string
min_timestamp: timestamp[us]
max_timestamp: timestamp[us]
total_gas_value: decimal128(38, 9)
total_txn_value: decimal128(38, 9)
txn_count: int64
to_address_is_contract: bool
to_address_is_erc20: bool
to_address_is_erc721: bool

In [26]:
%%time
# Time loading the object into a pandas DataFrame with the pyarrow engine.
pq_df = pd.read_parquet(f"gs://{path}", engine='pyarrow')

CPU times: user 582 ms, sys: 394 ms, total: 976 ms
Wall time: 10.3 s


In [46]:
# Column dtypes and memory footprint of the frame read with the pyarrow engine.
pq_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 219185 entries, 0 to 219184
Data columns (total 10 columns):
 #   Column                  Non-Null Count   Dtype         
---  ------                  --------------   -----         
 0   from_address            219185 non-null  object        
 1   to_address              219185 non-null  object        
 2   min_timestamp           219185 non-null  datetime64[us]
 3   max_timestamp           219185 non-null  datetime64[us]
 4   total_gas_value         219185 non-null  object        
 5   total_txn_value         219185 non-null  object        
 6   txn_count               219185 non-null  int64         
 7   to_address_is_contract  219185 non-null  bool          
 8   to_address_is_erc20     219185 non-null  bool          
 9   to_address_is_erc721    219185 non-null  bool          
dtypes: bool(3), datetime64[us](2), int64(1), object(4)
memory usage: 12.3+ MB


In [47]:
%%time
# Same load with the fastparquet engine, for comparison with the pyarrow run.
# NOTE(review): this rebinds `pq_df`. Per the .info() outputs, total_gas_value and
# total_txn_value arrive as float64 here (object with pyarrow) although the Arrow
# schema declares decimal128(38, 9). The later to_sql cell loads this float64 frame
# into Postgres — confirm the precision loss is acceptable.
pq_df = pd.read_parquet(f"gs://{path}", engine='fastparquet')

CPU times: user 1.53 s, sys: 468 ms, total: 2 s
Wall time: 9.88 s


In [48]:
# Column dtypes and memory footprint of the frame read with the fastparquet engine.
pq_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 219185 entries, 0 to 219184
Data columns (total 10 columns):
 #   Column                  Non-Null Count   Dtype         
---  ------                  --------------   -----         
 0   from_address            219185 non-null  object        
 1   to_address              219185 non-null  object        
 2   min_timestamp           219185 non-null  datetime64[us]
 3   max_timestamp           219185 non-null  datetime64[us]
 4   total_gas_value         219185 non-null  float64       
 5   total_txn_value         219185 non-null  float64       
 6   txn_count               219185 non-null  int64         
 7   to_address_is_contract  219185 non-null  bool          
 8   to_address_is_erc20     219185 non-null  bool          
 9   to_address_is_erc721    219185 non-null  bool          
dtypes: bool(3), datetime64[us](2), float64(2), int64(1), object(2)
memory usage: 12.3+ MB


In [12]:
import getpass
# Prompt for the Postgres password interactively so it is never written into the notebook.
pg_password = getpass.getpass()

 ········


In [67]:
pg_url = f"postgres://postgres:{pg_password}@127.0.0.1:55432/postgres"
# Rebuild the empty staging table from scratch so this cell can be re-run.
# A bare CREATE TABLE fails once tmp_data exists, and rows left over from an
# earlier load would otherwise be inserted a second time.
drop_ddl = "DROP TABLE IF EXISTS tmp_data"
# `limit 0` copies only the column layout of eth_transactions_sums, no rows.
tmp_ddl = "CREATE TABLE tmp_data AS select * from eth_transactions_sums limit 0"
# Both statements run on the same connection, inside one `with` block.
with psycopg.connect(pg_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute(drop_ddl)
        cursor.execute(tmp_ddl)

In [71]:
# #### WARNING - Brittle approach. Too many errors.
# # Credit - https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/
# from io import StringIO
# def copy_from_stringio(cursor, df, table):
#     """
#     Here we are going to save the dataframe in memory
#     and use copy_from() to copy it to the table
#     """
#     # save dataframe to an in memory buffer
#     buffer = StringIO()
#     df.to_csv(buffer, index_label='id', header=False)
#     buffer.seek(0)
#     try:
#         cursor.copy_from(buffer, table, sep=",")
#     except (Exception, psycopg.DatabaseError) as error:
#         print("Error: %s" % error)
#         return False
#     print("copy_from_stringio() done")
#     return True

In [73]:
# #### WARNING - Brittle approach. Too many errors.
# %%time
# import psycopg2
# pg_url = f"postgres://postgres:{pg_password}@127.0.0.1:55432/postgres"
# with psycopg2.connect(pg_url) as conn:
#     with conn.cursor() as cursor:
#         if copy_from_stringio(cursor, pq_df, 'tmp_data'):
#             conn.commit()
#         else:
#             conn.rollback()

In [74]:
import csv
from io import StringIO  # previously only imported in a commented-out cell -> NameError on a fresh kernel


def psql_insert_copy(table, conn, keys, data_iter):
    """
    Bulk-insert rows with ``COPY ... FROM STDIN WITH CSV``.

    Intended to be passed as the ``method`` argument of ``DataFrame.to_sql``.
    It calls ``copy_expert`` on the DBAPI cursor, so the SQLAlchemy connection
    must expose a cursor that provides that method (the notebook builds its
    engine with the ``postgresql+psycopg2`` driver).

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
        Target table; ``table.schema`` and ``table.name`` are used verbatim
        (unquoted) in the COPY statement.
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        Its ``.connection`` attribute is used to obtain the DBAPI cursor.
    keys : list of str
        Column names; each one is wrapped in double quotes in the COPY statement.
    data_iter : Iterable that iterates the values to be inserted
        Every item is written as one CSV row.

    Returns
    -------
    None
    """
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialise all rows into an in-memory CSV buffer, then rewind it so
        # COPY reads from the beginning.
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

In [75]:
%%time
from sqlalchemy import create_engine
# SQLAlchemy engine on the psycopg2 driver; psql_insert_copy calls the
# DBAPI cursor's copy_expert method.
pg_url = f"postgresql+psycopg2://postgres:{pg_password}@127.0.0.1:55432/postgres"
engine = create_engine(pg_url)
# Append the DataFrame to the tmp_data staging table, with
# psql_insert_copy as the insert method and the index left out.
pq_df.to_sql(
    name="tmp_data",
    con=engine,
    if_exists="append",
    index=False,
    method=psql_insert_copy
)

CPU times: user 2.33 s, sys: 192 ms, total: 2.53 s
Wall time: 2min 6s


In [80]:
# Copy every staged row from tmp_data into eth_transactions_sums, casting the two
# value columns to numeric on the way.
# The backslash after the table name is a Python line continuation inside the
# string literal, so that line and the column list are joined into one line of SQL.
# NOTE(review): a plain INSERT ... SELECT adds the rows again on every run — confirm
# the target table tolerates (or rejects) duplicates before re-running.
insert_sql = """
    INSERT INTO eth_transactions_sums \
            (from_address, to_address, min_timestamp, max_timestamp, 
            total_gas_value, total_txn_value, txn_count,
            to_address_is_contract,
            to_address_is_erc20,
            to_address_is_erc721)
    SELECT from_address, to_address, min_timestamp, max_timestamp, 
            total_gas_value::numeric, total_txn_value::numeric, txn_count,
            to_address_is_contract,
            to_address_is_erc20,
            to_address_is_erc721 
    FROM tmp_data
    """

In [81]:
pg_url = f"postgres://postgres:{pg_password}@127.0.0.1:55432/postgres"
# Use psycopg, the driver imported at the top of the notebook and already used
# for the staging-table DDL. `psycopg2` is only imported in a commented-out
# cell, so calling it here raises NameError on a fresh kernel.
# NOTE(review): psycopg 3's connection context manager is documented to commit
# on clean exit and then close the connection — confirm for the installed version.
with psycopg.connect(pg_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute(insert_sql)