In [1]:
import psycopg2
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone, date
from decouple import config
import string

In [2]:
# Data lake (source Postgres) credentials, read through python-decouple's
# `config`. Used by conn_to_dl() to open the psycopg2 connection.
dluname = config('DBUNAME')
dlpwd = config('DBPWD')
dlhost = config('HOST')
dldbname = config('DBNAME')
# NOTE(review): in the code visible here `port` is only used when building the
# *warehouse* connection URL in fetch_load_data(); the data lake connection
# string does not include it. Confirm both databases really share this port.
port = config('PORT')

# Data warehouse (target Postgres) credentials, used by fetch_load_data() to
# build the SQLAlchemy connection URL.
dwhuname = config('DWHUNAME')
dwhpwd = config('DWHPWD')
dwhhost = config('DWHHOST')
dwhdbname = config('DWHDBNAME')

In [3]:
def fetch_load_data(cur, asset, method):
    """Copy the newest tweet-count rows of one asset from the data lake to the warehouse.

    Reads every row of ``table_tw_<asset>`` whose ``date`` equals the table's
    maximum ``date``, reshapes them into a frame with the columns ``date``,
    ``asset`` (taken from ``label``) and ``count`` (taken from ``tweet_count``),
    and writes that frame to the warehouse table ``social_media``.

    Parameters
    ----------
    cur : DB-API cursor
        Open cursor on the data lake connection.
    asset : str
        Suffix of the source table name (``table_tw_<asset>``). It is
        concatenated into the SQL text, so it must come from a trusted source.
    method : str
        Passed to ``DataFrame.to_sql`` as ``if_exists``
        (``'fail'``, ``'replace'`` or ``'append'``).

    Returns
    -------
    None
        Every failure is reported with ``print`` and ends the function early;
        nothing is raised to the caller.
    """
    table_name = 'table_tw_' + asset
    dwhtblname = 'social_media'

    # fetch data in data lake
    # A table name cannot be sent as a bound query parameter, hence the
    # concatenation (see the `asset` note in the docstring).
    try:
        cur.execute("SELECT * FROM "+table_name+" WHERE date = (SELECT MAX(date) FROM "+table_name+");")
        result = cur.fetchall()
    except psycopg2.Error as e:
        print("Error: select *")
        print(e)
        return

    # Column positions are read from the description of the result set that
    # was actually returned, so they always match the row layout (an unordered
    # information_schema lookup gives no such guarantee).
    try:
        column_names = [col[0] for col in cur.description]
        col_ind_date = column_names.index('date')
        col_ind_tc = column_names.index('tweet_count')
        col_ind_asset = column_names.index('label')
    except Exception as e:
        print("Column indexes could no be defined")
        print(e)
        return

    # An empty source table yields no rows; there is nothing to load.
    if not result:
        print("No rows found: " + table_name)
        return

    #build dataframe
    try:
        # One constructor call instead of growing the frame row by row.
        df = pd.DataFrame([{'date': row[col_ind_date],
                            'asset': row[col_ind_asset],
                            'count': row[col_ind_tc]} for row in result])
        # Normalise to timezone-naive UTC timestamps: tz-aware values are
        # converted to UTC, naive values are taken as already being UTC.
        df['date'] = pd.to_datetime(df['date'], utc=True).dt.tz_localize(None)
    except Exception as e:
        print("Dataframe could not builded")
        print(e)
        return

    #load data
    engine = None
    try:
        # NOTE(review): `port` is the value read from the 'PORT' setting next
        # to the data lake credentials — confirm the warehouse uses the same.
        conn_string = 'postgresql://'+dwhuname+':'+dwhpwd+'@'+dwhhost+':'+port+'/'+dwhdbname
        engine = create_engine(conn_string)
        df.to_sql(dwhtblname, engine, if_exists=method, index=False)
        print(table_name+" loaded")
    except Exception as e:
        print(e)
        print("Data load failed: " + table_name)
    finally:
        # Release the pooled warehouse connections opened for this load.
        if engine is not None:
            engine.dispose()
    

In [4]:
def close_conn_to_dl(cur, conn):
    """Close the data lake cursor and then its connection.

    The connection is closed even when closing the cursor fails, so a cursor
    error cannot leave the connection open. A ``psycopg2.Error`` from either
    call is printed rather than raised.

    Parameters
    ----------
    cur : cursor returned by conn_to_dl()
    conn : connection returned by conn_to_dl()
    """
    try:
        try:
            cur.close()
        finally:
            # Runs whether or not cur.close() succeeded.
            conn.close()
        print("connection closed")
    except psycopg2.Error as e:
        print("Error: Could not close")
        print(e)

In [5]:
def conn_to_dl():
    """Open an autocommit psycopg2 connection to the data lake.

    Connection parameters come from the module-level ``dlhost``, ``dldbname``,
    ``dluname`` and ``dlpwd`` values.

    Returns
    -------
    tuple
        ``(cur, conn)`` — an open cursor and the connection it belongs to.

    Raises
    ------
    psycopg2.Error
        If the connection or the cursor cannot be created. The error is
        printed first and then re-raised, because there is nothing usable to
        return; a connection that was already opened is closed before the
        error propagates.
    """
    try:
        # Keyword arguments avoid hand-building a DSN string, which breaks on
        # values containing spaces or quotes.
        conn = psycopg2.connect(host=dlhost, dbname=dldbname, user=dluname, password=dlpwd)
    except psycopg2.Error as e:
        print("Error: Could not make connection to the Postgres database")
        print(e)
        raise

    try:
        try:
            cur = conn.cursor()
        except psycopg2.Error as e:
            print("Error: Could not get curser to the Database")
            print(e)
            raise

        # Auto commit
        conn.set_session(autocommit=True)
    except Exception:
        # Do not leak the connection when setup fails half-way.
        conn.close()
        raise

    print("connected")
    return cur, conn

In [6]:
# Assets processed by main(). Each entry is appended to 'table_tw_' by
# fetch_load_data() to form the name of the data lake table to read.
assets = ['bitcoin', 'ethereum', 'binance', 'ripple', 'terra', 'cardano', 'solana', 'avalanche', 'polkadot', 'dogecoin', 'msci_world', 'euro_stoxx', 'smi', 'nasdaq', 'gold', 'silver']

In [7]:
def main():
    """Load the latest rows of every asset in ``assets`` into the warehouse.

    Opens one data lake connection, calls ``fetch_load_data`` with the
    ``'append'`` mode for each asset, and always closes the connection
    afterwards — also when one of the loads raises.
    """
    #open connection
    cur, conn = conn_to_dl()

    try:
        # process data
        for asset in assets:
            fetch_load_data(cur, asset, 'append')
    finally:
        #close connection
        close_conn_to_dl(cur, conn)

if __name__ == "__main__":
    main()

connected
table_tw_bitcoin loaded
table_tw_ethereum loaded
table_tw_binance loaded
table_tw_ripple loaded
table_tw_terra loaded
table_tw_cardano loaded
table_tw_solana loaded
table_tw_avalanche loaded
table_tw_polkadot loaded
table_tw_dogecoin loaded
table_tw_msci_world loaded
table_tw_euro_stoxx loaded
table_tw_smi loaded
table_tw_nasdaq loaded
table_tw_gold loaded
table_tw_silver loaded
connection closed
