# Adding data from CSVs to a PostgreSQL database

Goal: to add all of the data from the LOBSTER csv files into a postgres database so that it can be more quickly accessed.


Follow something like [this](https://hakibenita.com/fast-load-data-python-postgresql) or [this](https://www.dataquest.io/blog/loading-data-into-postgres/)

For the section on using SQLAlchemy, we follow [this](https://www.compose.com/articles/using-postgresql-through-sqlalchemy/)

In [8]:
import os

from io import BytesIO
from itertools import chain
from pandas import Timestamp
from pathlib import Path  # allows to makes OS independent path strings
from psycopg2.errors import UniqueViolation
from urllib.request import urlopen
from zipfile import ZipFile

import pandas as pd
import psycopg2
import numpy as np
import shutil
import ssl
import sys

from sqlalchemy import create_engine  
from sqlalchemy import Column, String  
from sqlalchemy.ext.declarative import declarative_base  
from sqlalchemy.orm import sessionmaker

sys.path.append("../")

from RL4MM.database.models import Book, Trade
from RL4MM.database.PostgresEngine import PostgresEngine
from RL4MM.tasks.crawling.create_book_table import create_book_table
from RL4MM.tasks.crawling.crawl_lobster_data import crawl_lobster_data
from RL4MM.database.HistoricalDatabase import HistoricalDatabase


In [9]:
# tickers = ['AMZN','AAPL','GOOG','INTC','MSFT']
exchange='NASDAQ'
ticker = 'AMZN'
trading_date = '2012-06-21'
levels = 10

## Create the PostgreSQL table "book"

In [10]:
engine = PostgresEngine().engine

In [11]:
Session = sessionmaker(engine)  
session = Session()
Book.metadata.create_all(engine)

## Download and format the data from LOBSTER

### Select tickers and trading dates

In [12]:
temp_data_path=Path('data')/ticker/trading_date/f"level_{levels}"
try:
    temp_data_path.mkdir(parents=True)
except FileExistsError:
    pass

In [13]:
price_cols = list(chain(*[('ask_price_{0},bid_price_{0}'.format(i)).split(',') for i in range(10)]))
size_cols = list(chain(*[('ask_size_{0},bid_size_{0}'.format(i)).split(',') for i in range(10)]))
cols = list(chain(*zip(price_cols, size_cols)))

In [14]:
cols

['ask_price_0',
 'ask_size_0',
 'bid_price_0',
 'bid_size_0',
 'ask_price_1',
 'ask_size_1',
 'bid_price_1',
 'bid_size_1',
 'ask_price_2',
 'ask_size_2',
 'bid_price_2',
 'bid_size_2',
 'ask_price_3',
 'ask_size_3',
 'bid_price_3',
 'bid_size_3',
 'ask_price_4',
 'ask_size_4',
 'bid_price_4',
 'bid_size_4',
 'ask_price_5',
 'ask_size_5',
 'bid_price_5',
 'bid_size_5',
 'ask_price_6',
 'ask_size_6',
 'bid_price_6',
 'bid_size_6',
 'ask_price_7',
 'ask_size_7',
 'bid_price_7',
 'bid_size_7',
 'ask_price_8',
 'ask_size_8',
 'bid_price_8',
 'bid_size_8',
 'ask_price_9',
 'ask_size_9',
 'bid_price_9',
 'bid_size_9']

In [15]:
zip_url= f'https://lobsterdata.com/info/sample/LOBSTER_SampleFile_{ticker}_{trading_date}_{levels}.zip'
with urlopen(zip_url) as zip_resp:
    with ZipFile(BytesIO(zip_resp.read())) as zfile:
        zfile.extractall(temp_data_path)        
# Note to self: delete files after adding to PG database

In [26]:
orderbook_path = temp_data_path/ f'{ticker}_{trading_date}_34200000_57600000_orderbook_{levels}.csv'
message_path = temp_data_path/ f'{ticker}_{trading_date}_34200000_57600000_message_{levels}.csv'

orders = pd.read_csv(orderbook_path, header=None, names=cols)
messages = pd.read_csv(message_path, header=None, names=['time', 'type', 'external_id', 'size', 'price', 'direction'])

In [27]:
assert len(orders)==len(messages), "Length of the order book csv and message csv differ"

In [28]:
messages.time = pd.to_timedelta(messages.time, unit='s')
messages['trading_date'] = pd.to_datetime(trading_date)
messages["timestamp"] = messages.trading_date.add(messages.time)
messages.drop(['trading_date','time'], axis=1, inplace=True)

In [29]:
messages["exchange"]="NASDAQ"
messages["ticker"] = ticker
types = {1: 'submission',
         2: 'cancellation',
         3: 'deletion',
         4: 'execution_visible',
         5: 'execution_hidden',
         7: 'trading_halt'}

external_types = [x for x in types.keys()]
internal_types = [y for y in types.values()]
messages.type.replace(external_types,internal_types, inplace=True)
messages.direction.replace([-1,1],["sell","buy"],inplace=True)

messages["internal_index"]=f'{exchange}_{ticker}_{trading_date}_'+messages.index.astype("str")
# messages.set_index("index",inplace=True)

In [30]:
orders

Unnamed: 0,ask_price_0,ask_size_0,bid_price_0,bid_size_0,ask_price_1,ask_size_1,bid_price_1,bid_size_1,ask_price_2,ask_size_2,...,bid_price_7,bid_size_7,ask_price_8,ask_size_8,bid_price_8,bid_size_8,ask_price_9,ask_size_9,bid_price_9,bid_size_9
0,2239500,100,2231800,100,2239900,100,2230700,200,2240000,220,...,2202500,5000,2294300,100,2202000,100,2298000,100,2189700,100
1,2239500,100,2238100,21,2239900,100,2231800,100,2240000,220,...,2204000,100,2294300,100,2202500,5000,2298000,100,2202000,100
2,2239500,100,2238100,21,2239600,20,2231800,100,2239900,100,...,2204000,100,2267700,100,2202500,5000,2294300,100,2202000,100
3,2239500,100,2238100,21,2239600,20,2237500,100,2239900,100,...,2213000,4000,2267700,100,2204000,100,2294300,100,2202500,5000
4,2239500,100,2238100,21,2239600,20,2237500,100,2239900,100,...,2213000,4000,2267700,100,2204000,100,2294300,100,2202500,5000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
269743,2206200,100,2205100,249,2206400,100,2205000,71,2206500,1290,...,2204300,2300,2207600,100,2204200,100,2207900,2300,2204100,3300
269744,2206400,100,2205100,249,2206500,1290,2205000,71,2206700,170,...,2204300,2300,2207900,2300,2204200,100,2208000,3100,2204100,3300
269745,2206400,100,2205100,249,2206500,1290,2205000,71,2206700,170,...,2204300,2300,2208000,3100,2204200,100,2208100,1700,2204100,3300
269746,2206300,100,2205100,249,2206400,100,2205000,71,2206500,1290,...,2204300,2300,2207900,2300,2204200,100,2208000,3100,2204100,3300


In [31]:
for order_type in ["ask","bid"]:
    for i in range(10):
        orders[order_type+"_price_"+str(i)]/=10000
messages["price"]/=10000

In [32]:
orders

Unnamed: 0,ask_price_0,ask_size_0,bid_price_0,bid_size_0,ask_price_1,ask_size_1,bid_price_1,bid_size_1,ask_price_2,ask_size_2,...,bid_price_7,bid_size_7,ask_price_8,ask_size_8,bid_price_8,bid_size_8,ask_price_9,ask_size_9,bid_price_9,bid_size_9
0,223.95,100,223.18,100,223.99,100,223.07,200,224.00,220,...,220.25,5000,229.43,100,220.20,100,229.80,100,218.97,100
1,223.95,100,223.81,21,223.99,100,223.18,100,224.00,220,...,220.40,100,229.43,100,220.25,5000,229.80,100,220.20,100
2,223.95,100,223.81,21,223.96,20,223.18,100,223.99,100,...,220.40,100,226.77,100,220.25,5000,229.43,100,220.20,100
3,223.95,100,223.81,21,223.96,20,223.75,100,223.99,100,...,221.30,4000,226.77,100,220.40,100,229.43,100,220.25,5000
4,223.95,100,223.81,21,223.96,20,223.75,100,223.99,100,...,221.30,4000,226.77,100,220.40,100,229.43,100,220.25,5000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
269743,220.62,100,220.51,249,220.64,100,220.50,71,220.65,1290,...,220.43,2300,220.76,100,220.42,100,220.79,2300,220.41,3300
269744,220.64,100,220.51,249,220.65,1290,220.50,71,220.67,170,...,220.43,2300,220.79,2300,220.42,100,220.80,3100,220.41,3300
269745,220.64,100,220.51,249,220.65,1290,220.50,71,220.67,170,...,220.43,2300,220.80,3100,220.42,100,220.81,1700,220.41,3300
269746,220.63,100,220.51,249,220.64,100,220.50,71,220.65,1290,...,220.43,2300,220.79,2300,220.42,100,220.80,3100,220.41,3300


In [33]:
trade_list, book_list = list(), list()

In [34]:
messages.head()

Unnamed: 0,type,external_id,size,price,direction,timestamp,exchange,ticker,internal_index
0,execution_hidden,0,1,223.82,sell,2012-06-21 09:30:00.017459617,NASDAQ,AMZN,NASDAQ_AMZN_2012-06-21_0
1,submission,11885113,21,223.81,buy,2012-06-21 09:30:00.189607670,NASDAQ,AMZN,NASDAQ_AMZN_2012-06-21_1
2,submission,3911376,20,223.96,sell,2012-06-21 09:30:00.189607670,NASDAQ,AMZN,NASDAQ_AMZN_2012-06-21_2
3,submission,11534792,100,223.75,buy,2012-06-21 09:30:00.189607670,NASDAQ,AMZN,NASDAQ_AMZN_2012-06-21_3
4,submission,1365373,13,224.0,sell,2012-06-21 09:30:00.189607670,NASDAQ,AMZN,NASDAQ_AMZN_2012-06-21_4


In [17]:
for message in messages.itertuples():
    trade_list.append(Trade(id=message.internal_index, timestamp=message.timestamp, exchange=exchange, ticker=ticker, direction=message.direction,size=message.size, price = message.price, external_id=message.external_id, order_type=message.type))
    book_list.append(Book(id=message.internal_index, timestamp=message.timestamp, exchange=exchange, ticker=ticker,data=orders.iloc[message.Index].to_json()))

In [18]:
book_list[0]

Book(AMZN, 2012-06-21 09:30:00.017459617)

In [20]:
hist_db = HistoricalDatabase()

In [21]:
hist_db.insert_books(book_list)

In [23]:
hist_db.insert_trades(trade_list)

## Retrieving data

In [3]:
hist_db = HistoricalDatabase()

In [4]:
t1 = Timestamp('2012-06-21 11:42:40')
t2 = Timestamp('2012-06-21 12:42:40')

In [5]:
hist_db.get_trades(start_date=t1, end_date=t2, exchange="NASDAQ", ticker="AMZN")

Unnamed: 0,external_id,timestamp,size,ticker,exchange,id,order_type,price,direction
0,126001825,2012-06-21 11:42:40.102648,5.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_99984,submission,2237900.0,sell
1,126001834,2012-06-21 11:42:40.102789,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_99985,submission,2235200.0,buy
2,125770345,2012-06-21 11:42:40.203232,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_99986,deletion,2238300.0,sell
3,126003009,2012-06-21 11:42:40.203397,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_99987,submission,2238900.0,sell
4,125933876,2012-06-21 11:42:40.270171,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_99988,deletion,2239500.0,sell
...,...,...,...,...,...,...,...,...,...
34942,162328641,2012-06-21 12:42:38.101827,17.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_134926,deletion,2226100.0,sell
34943,162361155,2012-06-21 12:42:38.102954,17.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_134927,submission,2226600.0,sell
34944,162350434,2012-06-21 12:42:38.190001,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_134928,deletion,2226600.0,sell
34945,162361877,2012-06-21 12:42:38.190153,100.0,AMZN,NASDAQ,NASDAQ_AMZN_2012-06-21_134929,submission,2227300.0,sell


In [6]:
hist_db.get_next_snapshot(timestamp=t1, exchange=exchange,ticker=ticker)

Unnamed: 0,ticker,timestamp,data,exchange,id
0,AMZN,2012-06-21 11:42:40.102648,"{""ask_price_0"":2237700,""ask_size_0"":64,""bid_pr...",NASDAQ,NASDAQ_AMZN_2012-06-21_99984


In [7]:
hist_db.get_last_snapshot(timestamp=t1, exchange=exchange,ticker=ticker)

Unnamed: 0,ticker,timestamp,data,exchange,id
0,AMZN,2012-06-21 11:42:39.533339,"{""ask_price_0"":2237700,""ask_size_0"":64,""bid_pr...",NASDAQ,NASDAQ_AMZN_2012-06-21_99983


In [None]:
Film(title="Doctor Strange", director="Scott Derrickson", year="2016")  

In [76]:
orders[["timestamp","exchange","ticker","index"]]=messages[["timestamp","exchange","ticker","index"]]

In [79]:
messages.set_index("index",inplace=True)
orders.set_index("index",inplace=True)

In [85]:
orders.iloc[0].to_json()

'{"ask_price_0":2239500,"ask_size_0":100,"bid_price_0":2231800,"bid_size_0":100,"ask_price_1":2239900,"ask_size_1":100,"bid_price_1":2230700,"bid_size_1":200,"ask_price_2":2240000,"ask_size_2":220,"bid_price_2":2230400,"bid_size_2":100,"ask_price_3":2242500,"ask_size_3":100,"bid_price_3":2230000,"bid_size_3":10,"ask_price_4":2244000,"ask_size_4":547,"bid_price_4":2226200,"bid_size_4":100,"ask_price_5":2245400,"ask_size_5":100,"bid_price_5":2213000,"bid_size_5":4000,"ask_price_6":2248900,"ask_size_6":100,"bid_price_6":2204000,"bid_size_6":100,"ask_price_7":2267700,"ask_size_7":100,"bid_price_7":2202500,"bid_size_7":5000,"ask_price_8":2294300,"ask_size_8":100,"bid_price_8":2202000,"bid_size_8":100,"ask_price_9":2298000,"ask_size_9":100,"bid_price_9":2189700,"bid_size_9":100,"timestamp":1340271000017,"exchange":"NASDAQ","ticker":"AMZN"}'

In [16]:
book_data.head()

Unnamed: 0_level_0,time,type,external_id,size,price,direction,ask_price_0,ask_size_0,bid_price_0,bid_size_0,...,ask_price_8,ask_size_8,bid_price_8,bid_size_8,ask_price_9,ask_size_9,bid_price_9,bid_size_9,exchange,ticker
new_index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
NASDAQ_AMZN_2012-06-21_0,2012-06-21 09:30:00.017459617,execution_hidden,0,1,2238200,sell,2239500,100,2231800,100,...,2294300,100,2202000,100,2298000,100,2189700,100,NASDAQ,AMZN
NASDAQ_AMZN_2012-06-21_1,2012-06-21 09:30:00.189607670,submission,11885113,21,2238100,buy,2239500,100,2238100,21,...,2294300,100,2202500,5000,2298000,100,2202000,100,NASDAQ,AMZN
NASDAQ_AMZN_2012-06-21_2,2012-06-21 09:30:00.189607670,submission,3911376,20,2239600,sell,2239500,100,2238100,21,...,2267700,100,2202500,5000,2294300,100,2202000,100,NASDAQ,AMZN
NASDAQ_AMZN_2012-06-21_3,2012-06-21 09:30:00.189607670,submission,11534792,100,2237500,buy,2239500,100,2238100,21,...,2267700,100,2204000,100,2294300,100,2202500,5000,NASDAQ,AMZN
NASDAQ_AMZN_2012-06-21_4,2012-06-21 09:30:00.189607670,submission,1365373,13,2240000,sell,2239500,100,2238100,21,...,2267700,100,2204000,100,2294300,100,2202500,5000,NASDAQ,AMZN


In [17]:
book_data.columns

Index(['time', 'type', 'external_id', 'size', 'price', 'direction',
       'ask_price_0', 'ask_size_0', 'bid_price_0', 'bid_size_0', 'ask_price_1',
       'ask_size_1', 'bid_price_1', 'bid_size_1', 'ask_price_2', 'ask_size_2',
       'bid_price_2', 'bid_size_2', 'ask_price_3', 'ask_size_3', 'bid_price_3',
       'bid_size_3', 'ask_price_4', 'ask_size_4', 'bid_price_4', 'bid_size_4',
       'ask_price_5', 'ask_size_5', 'bid_price_5', 'bid_size_5', 'ask_price_6',
       'ask_size_6', 'bid_price_6', 'bid_size_6', 'ask_price_7', 'ask_size_7',
       'bid_price_7', 'bid_size_7', 'ask_price_8', 'ask_size_8', 'bid_price_8',
       'bid_size_8', 'ask_price_9', 'ask_size_9', 'bid_price_9', 'bid_size_9',
       'exchange', 'ticker'],
      dtype='object')

In [17]:
book_data.to_csv('temp_book.csv')

In [18]:
try:
    conn = psycopg2.connect("host=localhost dbname=lobster user=joe password=joe")
    cur = conn.cursor()
    with open('temp_book.csv', 'r') as f:
        next(f) # Skip the header row.
        cur.copy_from(f, 'book_L10', sep=',')
except UniqueViolation:
    pass

In [19]:
conn.commit()

In [20]:
try:
    shutil.rmtree(temp_data_path)
except FileNotFoundError:
    pass

# Retrieving data

In [2]:
from RL4MM.database.PostgresEngine import PostgresEngine

In [14]:
db=PostgresEngine().engine

In [11]:
aapl_data = db.engine.execute("SELECT * FROM book_L10 WHERE ticker = 'AAPL'") 

aapl_df = pd.DataFrame(aapl_data.fetchall())

aapl_df.head()

## Different ways of using SQLAlchemy

### Way 1: using SQL queries

In [27]:
# Create 
db.execute("CREATE TABLE IF NOT EXISTS films (title text, director text, year text)")  
db.execute("INSERT INTO films (title, director, year) VALUES ('Doctor Strange', 'Scott Derrickson', '2016')")

# Read
result_set = db.execute("SELECT * FROM films")  
for r in result_set:  
    print(r)

('Doctor Strange', 'Scott Derrickson', '2016')


### Way 2: Using SQL Expression Language

In [16]:
from sqlalchemy import create_engine  
from sqlalchemy import Table, Column, String, MetaData

In [17]:
meta = MetaData(db)  
film_table = Table('films', meta, Column('title', String), Column('director', String), Column('year', String))

In [18]:
conn=db.connect()

In [19]:
film_table.create()

In [23]:
insert_statement = film_table.insert().values(title="Doctor Strange", director="Scott Derrickson", year="2016")
conn.execute(insert_statement)

<sqlalchemy.engine.result.ResultProxy at 0x18c5443efa0>

In [24]:
# Read
select_statement = film_table.select()
result_set = conn.execute(select_statement)
for r in result_set:
    print(r)

('Doctor Strange', 'Scott Derrickson', '2016')


In [25]:
# Update
update_statement = film_table.update().where(film_table.c.year=="2016").values(title = "Some2016Film")
conn.execute(update_statement)

<sqlalchemy.engine.result.ResultProxy at 0x18c5443ebb0>

In [26]:
# Delete
delete_statement = film_table.delete().where(film_table.c.year == "2016")
conn.execute(delete_statement)

<sqlalchemy.engine.result.ResultProxy at 0x18c5b68b2e0>

### Way 3: Using the SQL ORM

In [35]:
from sqlalchemy import create_engine  
from sqlalchemy import Column, String  
from sqlalchemy.ext.declarative import declarative_base  
from sqlalchemy.orm import sessionmaker

In [None]:
db = PostgresEngine().engine

In [None]:
base = declarative_base()

In [71]:
class Film(base):  
    __tablename__ = 'films'

    title = Column(String, primary_key=True)
    director = Column(String)
    year = Column(String)

  util.warn(


InvalidRequestError: Table 'films' is already defined for this MetaData instance.  Specify 'extend_existing=True' to redefine options and columns on an existing Table object.

In [72]:
Session = sessionmaker(db)  
session = Session()

In [None]:
base.metadata.create_all(db)   # this creates the table "films"

In [69]:
back_to_the_future = Film(title="Back to the Future", director="Emilio Aguera", year="3001")  
session.add(back_to_the_future)  
session.commit()

IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "films_pkey"
DETAIL:  Key (title)=(Back to the Future) already exists.

[SQL: INSERT INTO films (title, director, year) VALUES (%(title)s, %(director)s, %(year)s)]
[parameters: {'title': 'Back to the Future', 'director': 'Emilio Aguera', 'year': '3001'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)

In [73]:
doctor_strange = Film(title="Doctor Strange", director="Scott Derrickson", year="2016")  
session.add(doctor_strange)  
session.commit()

In [45]:
films = session.query(Film)  
for film in films:  
    print(film.title)

Doctor Strange


In [46]:
doctor_strange.title = "Some2016Film"  
session.commit()
films = session.query(Film)  
for film in films:  
    print(film.title)

Some2016Film


In [47]:
session.delete(doctor_strange)  
session.commit()  

In [58]:
with Session(db) as session:
    doctor_strange = Film(title="Doctor Strange", director="Scott Derrickson", year="2016")  
    session.add(doctor_strange)  
    session.commit()

TypeError: __call__() takes 1 positional argument but 2 were given

In [53]:
db.engine

Engine(postgresql+psycopg2://joe:***@localhost:5432/lobster)

##  Back to the L10_book database

In [82]:
db = PostgresEngine().engine

In [83]:
base = declarative_base()

In [84]:
class Film(base):  
    __tablename__ = 'films'

    title = Column(String, primary_key=True)
    director = Column(String)
    year = Column(String)

In [85]:
Session = sessionmaker(db)  
session = Session()

In [86]:
base.metadata.create_all(db)   # this creates the table "films"

In [87]:
films = session.query(Film)  
for film in films:  
    print(film.title)

Back to the Future
Doctor Strange


In [88]:
from sqlalchemy import select

In [103]:
session.query(Film).filter(Film.title=="Doctor Strange").first().__dict__

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x18c66d2aa00>,
 'year': '2016',
 'title': 'Doctor Strange',
 'director': 'Scott Derrickson'}

In [96]:
# query from a class
statement = select(Film).filter_by(Film.title=="Doctor Strange")

TypeError: 'DeclarativeMeta' object is not iterable

In [18]:
from RL4MM.database.models import Book

NameError: name 'PostgresEngine' is not defined