In [None]:
! pip install duckdb

In [None]:
# Open (creating it if needed) a file-backed DuckDB database in /tmp, so tables built
# in later cells are stored in that file rather than only in memory.

import duckdb

con = duckdb.connect('/tmp/duckdb-cache-selectedcols.duckdb')

# Limit DuckDB to 4 worker threads, then install and load the httpfs extension,
# which is what lets read_parquet fetch s3:// and https:// locations.
for setup_statement in ('SET threads TO 4;', "INSTALL 'httpfs';", "LOAD 'httpfs';"):
    con.execute(setup_statement)


In [None]:
# Cache NBBO quotes for product 'A' from the 'mt_nbbo_quote' source table (cqs_pillar
# feed) for 3 January 2022 into the local DuckDB table all_a_trades. Only rows whose
# BestAskParticipants equals 'DirectEdgeX' are kept; duplicates of the selected
# columns are removed with SELECT DISTINCT.
# NOTE(review): despite its name, all_a_trades holds NBBO quotes, not trades; the
# name is kept because later cells query it.
# NOTE(review): the exact-match filter assumes BestAskParticipants holds a single
# participant name; rows where DirectEdgeX shares the best ask with others may be
# excluded — confirm against the column's format.

import datetime
import maystreet_data as md

files_list = md.parquet_query("mt_nbbo_quote", ["cqs_pillar"], datetime.date(2022, 1, 3))

# Bare object paths get an s3:// scheme; https:// URLs are used unchanged. Each
# location becomes a single-quoted SQL literal with any embedded quote doubled, so an
# unusual path cannot end the literal early and corrupt the query.
s3_urls = [
    "'" + (f if f.startswith('https://') else f"s3://{f}").replace("'", "''") + "'"
    for f in files_list
]
if not s3_urls:
    # Nothing to read: stop before dropping the previously cached table.
    raise ValueError("md.parquet_query returned no files for mt_nbbo_quote/cqs_pillar on 2022-01-03")

con.execute('DROP TABLE IF EXISTS all_a_trades;')

sql = f"CREATE TABLE all_a_trades AS SELECT DISTINCT AskPrice, AskQuantity, BestAskParticipants, BidPrice, BidQuantity, BestBidParticipants, SequenceNumber FROM read_parquet([{', '.join(s3_urls)}]) WHERE Product = 'A' AND BestAskParticipants = 'DirectEdgeX';"
con.execute(sql)

# How many (distinct) rows were cached?
number_entered = con.execute('SELECT COUNT(*) FROM all_a_trades;').fetchdf()
number_entered

In [None]:
# Count the matching NBBO quotes for cqs_pillar by scanning the remote Parquet files
# directly instead of the cached all_a_trades table. Every run re-reads the files from
# their s3:// / https:// locations, which is why this is impractical and slow; it is
# shown only for comparison with the cached approach.
# NOTE: this counts every matching row, whereas all_a_trades was built with SELECT
# DISTINCT over a subset of columns, so the two counts can differ.

import datetime
import maystreet_data as md

files_list = md.parquet_query("mt_nbbo_quote", ["cqs_pillar"], datetime.date(2022, 1, 3))

# Same location handling as the caching cell: add s3:// to bare paths, keep https://
# URLs, and double embedded single quotes so each path is a safe SQL string literal.
s3_urls = [
    "'" + (f if f.startswith('https://') else f"s3://{f}").replace("'", "''") + "'"
    for f in files_list
]
if not s3_urls:
    raise ValueError("md.parquet_query returned no files for mt_nbbo_quote/cqs_pillar on 2022-01-03")

sql = f"SELECT COUNT(*) FROM read_parquet([{', '.join(s3_urls)}]) WHERE Product = 'A' AND BestAskParticipants = 'DirectEdgeX';"

con.execute(sql).fetchdf()

In [None]:
# Load example_csv_file.csv (resolved relative to the notebook's working directory)
# into the DuckDB table example_csv, replacing any previous copy, and display it.
# Uses the connection `con` opened in the setup cell; opening a second connection
# here would rebind `con` without closing the first one.

con.execute('DROP TABLE IF EXISTS example_csv;')

# In DuckDB SQL, single quotes delimit string literals and double quotes delimit
# identifiers, so the file name is passed as a single-quoted literal.
sql = "CREATE TABLE example_csv AS SELECT * FROM read_csv_auto('example_csv_file.csv');"
con.execute(sql)

sql = 'SELECT * FROM example_csv;'
data_frame = con.execute(sql).fetchdf()
data_frame

In [None]:
# Join the rows we supplied (example_csv) to the cached NBBO quotes from LLG's data
# (all_a_trades), matching SequenceID to SequenceNumber. It is a LEFT JOIN, so every
# supplied row is kept; rows with no matching quote get missing values in the quote
# columns.
# NOTE(review): if several cached rows share a SequenceNumber, a supplied row appears
# once per match — confirm SequenceNumber is unique in all_a_trades.

join_query = 'SELECT ex.*, at.* FROM example_csv ex LEFT JOIN all_a_trades at ON at.SequenceNumber = ex.SequenceID'
data_frame = con.execute(join_query).fetchdf()
data_frame

In [None]:
# Export the cached all_a_trades table to a CSV file with a header row.
# NOTE(review): the destination is the absolute path /home/workbench, not the
# notebook's working directory — adjust it if running elsewhere.

export_sql = "COPY all_a_trades TO '/home/workbench/all-a-trades-DirectEdgeX.csv' WITH (HEADER 1);"
con.execute(export_sql)

In [None]:
# Finally, close the DuckDB connection. Tables created above remain in the
# /tmp database file, but `con` cannot run further queries after this call.

con.close()