In [1]:
import glob
import pandas as pd
import psycopg2
import psycopg2.extras

In [2]:
# Classes e métodos

class Database:
    def __init__ (self, HOST, DATABASE, USER, PASSWORD):
        print('Connecting to nasdaq...')
        self.HOST=HOST
        self.DATABASE=DATABASE
        self.USER=USER
        self.PASSWORD=PASSWORD
        
    def connect_db(self):
        self.conn = psycopg2.connect(
            host=self.HOST,
            database=self.DATABASE,
            user=self.USER,
            password=self.PASSWORD
        )
        return self.conn
        
    def criate_db(self, sql):
        self.conn = self.connect_db()
        self.cur = self.conn.cursor()
        self.cur.execute(sql)
        self.conn.commit()
        self.conn.close()   
    
    def insert_db(self, sql):
        self.conn = self.connect_db()
        self.cur = self.conn.cursor()
        try:
            self.cur.execute(sql)
            self.conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print(f"Error: {error}")
            self.conn.rollback()
            self.cur.close()
            return 1
        self.cur.close()
    
    def bulk_insert_db(self, sql, data):
        self.conn = self.connect_db()
        self.cur = self.conn.cursor()
        psycopg2.extras.execute_values(self.cur, sql, data)
        self.conn.commit()
        self.cur.close()
        self.conn.close()
        
    def select_db(self, sql):
        self.conn = self.connect_db()
        self.cur = self.conn.cursor()
        self.cur.execute(sql)
        self.recset = self.cur.fetchall()
        self.records = []
        for rec in self.recset:
            self.records.append(rec)
        self.conn.close()
        return self.records

In [3]:
# Logando no banco de dados
HOST='localhost'
DATABASE='nasdaq'
USER='postgres'
PASSWORD='postgres'

# Database Class
db = Database(HOST, DATABASE, USER, PASSWORD)

Connecting to nasdaq...


In [15]:
# Creating tables
sql = 'DROP TABLE IF EXISTS public.nasdaq;'
db.criate_db(sql)

sql = '''
    CREATE TABLE IF NOT EXISTS nasdaq (
        id SERIAL PRIMARY KEY,
        "Date" VARCHAR(50),
        "Low" VARCHAR(50),
        "Open" VARCHAR(50),
        "Volume" VARCHAR(50),
        "High" VARCHAR(50),
        "Close" VARCHAR(50),
        "Adjusted Close" VARCHAR(50)
)'''
db.criate_db(sql)

In [4]:
# Creating tables
sql = 'DROP TABLE IF EXISTS public.normal_insert;'
db.criate_db(sql)

sql = '''
    CREATE TABLE IF NOT EXISTS normal_insert (
        id SERIAL PRIMARY KEY,
        "Date" VARCHAR(50),
        "Low" VARCHAR(50),
        "Open" VARCHAR(50),
        "Volume" VARCHAR(50),
        "High" VARCHAR(50),
        "Close" VARCHAR(50),
        "Adjusted Close" VARCHAR(50)
)'''
db.criate_db(sql)

In [5]:
path = 'data/nasdaq/csv/'
all_files = glob.glob(path + "*.csv")

list_df = []

for filename in all_files:
    try:
        df = pd.read_csv(filename, index_col=None, header=0)
        list_df.append(df)
    except:
        print(filename)
        continue

In [6]:
df = pd.concat(list_df, axis=0, ignore_index=True)
df = df[['Date', 'Low', 'Open', 'Volume', 'High', 'Close', 'Adjusted Close']]

In [7]:
%%time
# Convertendo dataframe em lista de tuplas
list_tuples = [tuple(line) for line in df.to_records(index=False)]

CPU times: user 1min 32s, sys: 1.38 s, total: 1min 33s
Wall time: 1min 33s


## Bulk Insert

In [16]:
%%time
sql = 'INSERT INTO nasdaq ("Date", "Low", "Open", "Volume", "High", "Close", "Adjusted Close") VALUES %s'
db.bulk_insert_db(sql, list_tuples)

CPU times: user 2min 31s, sys: 2.72 s, total: 2min 33s
Wall time: 6min 36s


## Normal Insert

In [None]:
%%time
# Inserting data in database
for i in df.index:
    sql = f"""
        INSERT INTO normal_insert ("Date", "Low", "Open", "Volume", "High", "Close", "Adjusted Close")
        VALUES ($${df['Date'][i]}$$, $${df['Low'][i]}$$, $${df['Open'][i]}$$, $${df['Volume'][i]}$$, $${df['High'][i]}$$, $${df['Close'][i]}$$, $${df['Adjusted Close'][i]}$$);
    """
    db.insert_db(sql)

In [17]:
df.head()

Unnamed: 0,Date,Low,Open,Volume,High,Close,Adjusted Close
0,27-09-2005,19.1,21.049999,961200.0,21.4,19.299999,18.19491
1,28-09-2005,19.200001,19.299999,5747900.0,20.530001,20.5,19.326204
2,29-09-2005,20.1,20.4,1078200.0,20.58,20.209999,19.052805
3,30-09-2005,20.18,20.26,3123300.0,21.049999,21.01,19.807001
4,03-10-2005,20.9,20.9,1057900.0,21.75,21.5,20.26894


In [None]:
def redsfhit_connection_prd():
    username = os.environ.get('DAGSTER_REDSHIFT_USERNAME')
    password = os.environ.get('DAGSTER_REDSHIFT_PASSWORD')
    con = psycopg2.connect(
        dbname=os.environ.get('DAGSTER_REDSHIFT_DATABASE'),
        host=os.environ.get('DAGSTER_REDSHIFT_HOST'),
        port='5439',
        user=username,
        password=password
    )
    return con

In [None]:
def mysql_server_prd(dbname):
    host = os.environ.get('DAGSTER_MYSQL_HOST')
    username = os.environ.get('DAGSTER_MYSQL_USERNAME')
    password = os.environ.get('DAGSTER_MYSQL_PASSWORD')
 
    engine = create_engine(
        f"mysql+pymysql://{username}:{quote_plus(password)}@{host}:3306/{dbname}"
    )
    return engine.connect()