In [1]:
import logging
import os
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from tqdm.std import tqdm

from extract import extract_coindata
#from config import local_engine
logger = logging.Logger('catch_all')

In [2]:
# Get coin information: extract_coindata() returns a pair that is unpacked into
# `coindata` and `columns` (presumably the scraped rows and their column names --
# confirm against extract.py).
coindata, columns = extract_coindata()

In [3]:
def create_readonly(admin_engine, role_name, database, exists=False):
    """
    ----------------------------------------------
    Sets up a read only role on the given database
    ----------------------------------------------

    Runs CREATE ROLE followed by GRANTs for CONNECT on the database,
    USAGE on schema public and SELECT on all tables in schema public.
    Each statement is attempted on its own: a failure is printed and the
    remaining statements still run.

    admin_engine: Admin login database engine
    role_name: The name of the role to create and grant privileges to
    database: The database the role is allowed to connect to
    exists: pass True when the role already exists, so CREATE ROLE is skipped
    -----------------------------------
    Returns None

    """
    grants = [f'''GRANT CONNECT ON DATABASE {database} To {role_name};''',
              f'''GRANT USAGE ON SCHEMA public TO {role_name};''',
              f'''GRANT SELECT ON ALL TABLES IN SCHEMA public TO {role_name};''']

    if exists == True:
        statements = grants
    else:
        statements = [f'''CREATE ROLE {role_name};'''] + grants

    connection = admin_engine.connect()
    for statement in statements:
        # The first two words (e.g. "CREATE ROLE") label the progress messages.
        words = statement.split()
        try:
            connection.execute(statement)
            print(words[0])
        except Exception as error:
            print(f'Could not {words[0]} {words[1]} ', error)

    connection.close()
        

def create_read_write(admin_engine, database, exists=False):

    """
    ---------------------------------------------
    Sets up the "read_write" role on the database
    ---------------------------------------------

    Runs CREATE ROLE read_write followed by GRANTs for CONNECT on the
    database, USAGE and CREATE on schema public, SELECT/INSERT/UPDATE on
    all tables in schema public and USAGE on all its sequences. Each
    statement is attempted on its own: a failure is printed and the
    remaining statements still run.

    admin_engine: Admin login database engine
    database: The database the role is allowed to connect to
    exists: pass True when the role already exists, so CREATE ROLE is skipped
    -----------------------------------
    Returns None

    """

    grants = [
        f'''GRANT CONNECT ON DATABASE {database} To read_write;''',
        '''GRANT USAGE, CREATE ON SCHEMA public TO read_write;''',
        # read / insert / update on every table in schema public
        '''GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO read_write;''',
        '''GRANT USAGE ON ALL SEQUENCES IN SCHEMA public TO read_write;''',
    ]
    statements = grants if exists == True else ['''CREATE ROLE read_write;'''] + grants

    connection = admin_engine.connect()
    for statement in statements:
        # The first two words (e.g. "CREATE ROLE") label the progress messages.
        words = statement.split()
        try:
            connection.execute(statement)
            print(words[0])
        except Exception as error:
            print(f'Could not {words[0]} {words[1]} ', error)

    connection.close()

def create_user(admin_engine, username, password):
    """
    Create a user with login permission to the database
    ---------------------------------------------------
    admin_engine: Admin login database engine
    username: name of user (interpolated as-is into the statement)
    password: raw password of user for login authentication; it is wrapped
              in single quotes here, so pass it WITHOUT surrounding quotes
    ---------------------------------------------------

    A failure is reported with a printed message and is not re-raised.

    Returns None
    """
    # PostgreSQL expects the password as a quoted string literal; embedded
    # single quotes are escaped by doubling them.
    password_literal = "'" + str(password).replace("'", "''") + "'"
    query = f'''CREATE USER {username} WITH PASSWORD {password_literal}'''
    st = query.split()

    conn = admin_engine.connect()
    try:
        conn.execute(query)
        print(st[0])
    except Exception as e:
        # Only the error type is printed: the full error text can echo the
        # statement, which contains the password, into the notebook output.
        cause = getattr(e, 'orig', None) or e
        print(f'Could not {st[0]} {st[1]} ', type(cause).__name__)
    finally:
        conn.close()

def grant_user_role(admin_engine, username, rolename):
    """
    Give a database user membership of a role
    -----------------------------------------

    Executes GRANT <rolename> TO <username>. The outcome is printed; a
    failure is reported and not re-raised.

    admin_engine: Admin login database engine
    username: name of user to be granted role
    rolename: name of role to be granted to user
    -----------------------------------------
    Returns None
    """
    statement = f'''GRANT {rolename} TO {username}'''
    connection = admin_engine.connect()
    # The first two words of the statement label the progress messages.
    verb, target = statement.split()[:2]
    try:
        connection.execute(statement)
        print(verb)
    except Exception as error:
        print(f'Could not {verb} {target} ', error)

    connection.close()

In [4]:
import sqlalchemy
sqlalchemy.__version__

'1.4.32'

## NEW EXPERIMENT FOR TRANSACT TB

In [14]:
def create_transact_table(engine, table_name):
    """
    Create the transaction table if it does not already exist
    ---------------------------------------------------------
    The table gets a serial "TransactId" primary key plus the columns
    "CoinName", "Symbol", "Time", "Price", "Change(24h)", "Volume(24h)",
    "Market Cap" and "Website".

    engine: database engine the statement is executed on
    table_name: name of the table (interpolated as-is into the statement)
    ---------------------------------------------------------
    The outcome is printed; a failure is reported and not re-raised.

    Returns None
    """
    ddl = f'''CREATE TABLE IF NOT EXISTS {table_name} ("TransactId" Serial PRIMARY KEY, 
            "CoinName" VARCHAR (15), "Symbol" VARCHAR (10), "Time" Timestamp (40), "Price" VARCHAR (30),
            "Change(24h)" VARCHAR (10), "Volume(24h)" VARCHAR (35), "Market Cap" VARCHAR (40), "Website" VARCHAR(200))
            '''
    connection = engine.connect()
    try:
        connection.execute(ddl)
        print('Table Created successfully')
    except Exception as error:
        print("Could not create table\n", error)
    finally:
        # Release the connection whether or not the statement succeeded.
        connection.close()

# NOTE(review): `engine` is first assigned in a later cell of this notebook, so on a
# fresh top-to-bottom run this call raises NameError -- confirm, then move the
# engine setup above this cell.
create_transact_table(engine, 'Transacttb')

Table Created successfully


In [15]:
def get_common_cols(db_cols, ext_cols):
    """
    Return the extracted column names that also exist in the database.

    An extracted column called '24h' is renamed to 'Change(24h)' first; the
    rename is also written back into `ext_cols`, so the caller's list is
    modified in place.

    db_cols: collection of column names present in the database table
    ext_cols: list of column names coming from the extract

    Returns a list, in `ext_cols` order, of the names found in `db_cols`.
    """
    shared = []
    for position, name in enumerate(ext_cols):
        if name == '24h':
            name = 'Change(24h)'
            ext_cols[position] = name
        if name in db_cols:
            shared.append(name)

    return shared

### ETL to Production DB

In [46]:
from sqlalchemy import create_engine

In [77]:
# The local database password is read from the LOCAL_DB_PASSWORD environment
# variable instead of being stored in the notebook; set it before starting the
# kernel. quote_plus keeps characters such as '@' or '/' from breaking the URL.
# NOTE(review): the password that used to be written here has been committed --
# rotate it.
LOCAL_DB_PASSWORD = quote_plus(os.environ['LOCAL_DB_PASSWORD'])
DATABASE_URI = f'postgresql+psycopg2://postgres:{LOCAL_DB_PASSWORD}@localhost:5432/Cryptocurrency'
engine = create_engine(DATABASE_URI)

In [78]:
from auths import hostname, password
# Connection URLs for the two remote databases; both use the host name and
# password imported from the local `auths` module.
PRD_DATABASE_URI = 'postgresql+psycopg2://omotade:{}@{}:5432/ProductionDB'.format(password, hostname)
TRNS_DATABASE_URI = 'postgresql+psycopg2://omotade:{}@{}:5432/CryptoTransactDB'.format(password, hostname)

In [79]:
# One engine per remote database: ProductionDB and CryptoTransactDB.
product_engine = create_engine(PRD_DATABASE_URI)
transact_engine = create_engine(TRNS_DATABASE_URI)

### EXTRACT

In [72]:
def execute_query(engine, query, returns = False):

    """
        ----------------------------------------
        returns list of rows, or None (see below)
        ----------------------------------------
        This function executes an sql query on a database engine

        engine: a database engine
        query: SQL query to be executed
        returns: when truthy, all result rows are fetched and returned

        With `returns` truthy the fetched rows are returned as a list; if the
        query fails an empty list is returned instead. With `returns` falsy
        the function returns None.

        A failure is logged with its traceback and reported with a printed
        message; it is not re-raised.
    """
    rows = [] if returns else None
    conn = engine.connect()

    try:
        rs = conn.execute(query)
        if returns:
            # Fetch while the connection is still open.
            rows = rs.fetchall()
    except Exception:
        # Logger.log() needs an explicit level as its first argument;
        # exception() logs at ERROR level and records the traceback.
        logger.exception('Query failed')
        print('Unknown error occured')

    finally:
        conn.close()

    return rows
    

In [21]:
import pandas as pd

In [22]:
# Column names of transacttb, used further down to label the values returned by
# SELECT *. ORDER BY ordinal_position returns them in table-definition order;
# without it the row order is unspecified and the labels could be misaligned.
query = """Select column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'transacttb'
            ORDER BY ordinal_position"""

info =  execute_query(transact_engine, query=query, returns=True)
column_names = [row[0] for row in info]

In [23]:
info

[('TransactId',),
 ('CoinName',),
 ('Symbol',),
 ('Time',),
 ('Price',),
 ('Change_24h',),
 ('Volume_24h',),
 ('Market Cap',),
 ('Website',)]

In [24]:
column_names

['TransactId',
 'CoinName',
 'Symbol',
 'Time',
 'Price',
 'Change_24h',
 'Volume_24h',
 'Market Cap',
 'Website']

In [80]:
# Load every Bitcoin row of the transaction table into a DataFrame, labelling
# the columns with the names looked up from the information schema.
query = """SELECT * FROM TRANSACTTB AS TB
            WHERE TB."CoinName" = 'Bitcoin' """
btc = pd.DataFrame(
    data=execute_query(transact_engine, query, returns=True),
    columns=column_names,
)

In [81]:
btc

Unnamed: 0,TransactId,CoinName,Symbol,Time,Price,Change_24h,Volume_24h,Market Cap,Website
0,11009,Bitcoin,BTC,2022-07-23 10:00:00.943190,22687.72,-3.21%,30.023B,433.353B,https://finance.yahoo.com/cryptocurrencies/
1,11014,Bitcoin,BTC,2022-07-23 10:00:00.000000,"$22,676.14",-3.30%,"$30,026,881,522","$433,131,800,058",https://coinmarketcap.com/all/views/all/
2,11019,Bitcoin,BTC,2022-07-23 10:10:00.919365,22680.87,-3.28%,29.928B,433.222B,https://finance.yahoo.com/cryptocurrencies/
3,11024,Bitcoin,BTC,2022-07-23 10:10:00.000000,"$22,679.63",-3.29%,"$29,923,045,261","$433,198,550,259",https://coinmarketcap.com/all/views/all/
4,11029,Bitcoin,BTC,2022-07-23 10:20:01.079226,22694.22,-3.40%,29.92B,433.479B,https://finance.yahoo.com/cryptocurrencies/
...,...,...,...,...,...,...,...,...,...
143,11724,Bitcoin,BTC,2022-07-23 21:50:00.000000,"$22,268.73",-2.04%,"$23,016,398,294","$425,361,573,292",https://coinmarketcap.com/all/views/all/
144,11729,Bitcoin,BTC,2022-07-23 22:00:00.900987,22250.85,-2.25%,23.01B,425.02B,https://finance.yahoo.com/cryptocurrencies/
145,11734,Bitcoin,BTC,2022-07-23 22:00:00.000000,"$22,251.88",-2.23%,"$23,008,377,629","$425,039,652,094",https://coinmarketcap.com/all/views/all/
146,11739,Bitcoin,BTC,2022-07-23 22:10:01.205947,22268.38,-2.11%,22.951B,425.355B,https://finance.yahoo.com/cryptocurrencies/


### Separate and Transform

In [73]:
def get_websites(df):
    """
    -------------
    returns tuple
    -------------
    Returns the unique values of the 'Website' column, in order of first
    appearance. When there is at most one unique value and it is not
    already 'END', the placeholder 'END' is appended.

    df: a pandas dataframe object with a 'Website' column
    """
    unique_sites = list(df['Website'].unique())
    needs_placeholder = len(unique_sites) <= 1 and 'END' not in unique_sites
    if needs_placeholder:
        unique_sites.append('END')
    return tuple(unique_sites)

In [28]:
websites = get_websites(btc)

In [29]:

websites

('https://finance.yahoo.com/cryptocurrencies/',
 'https://coinmarketcap.com/all/views/all/')

In [88]:
# get WebId
def get_web_id(engine=engine):

    """
    ---------------------------
    returns dictionary of WebId
    ---------------------------
    Looks up the rows of the `website` table whose url is one of the
    module-level `websites` values and returns them as {url: id}.

    engine: database engine holding the `website` table; the default is the
            module-level `engine` as it was when this function was defined

    Returns an empty dict when `websites` is empty or nothing matches.
    Each returned row is expected to be an (id, url) pair.
    """
    # Render every URL as a quoted SQL string literal (single quotes doubled).
    # Interpolating the Python tuple directly yields invalid SQL for a
    # one-element tuple ("('x',)") and breaks on URLs containing quotes.
    literals = ", ".join("'" + str(url).replace("'", "''") + "'" for url in websites)
    if not literals:
        return {}

    query = f"""SELECT * FROM website
                Where website.url in ({literals})"""
    webs = execute_query(engine, query=query, returns=True)
    return {url: web_id for web_id, url in (webs or [])}

In [89]:
get_web_id()

{'https://coinmarketcap.com/all/views/all/': 1,
 'https://finance.yahoo.com/cryptocurrencies/': 2}

In [83]:
# get CoinId
def get_coin_id(coinname, engine=engine):
    """
    ----------
    return int
    ----------

    Looks up the CoinId of a coin in the `coin` table.

    coinname: name of the coin
    engine: A database engine; the default is the module-level `engine` as
            it was when this function was defined

    Returns the CoinId of the first matching row, or 0 when no row matches.
    """
    # Coin names come from extracted data, so single quotes are doubled to
    # keep the value inside its SQL string literal.
    safe_name = str(coinname).replace("'", "''")
    query = f"""SELECT CoinId FROM coin
                Where coin.CoinName = '{safe_name}'"""
    rs = execute_query(engine, query=query, returns=True)
    # `rs` may be empty (no match); 0 is the "not found" value callers receive.
    return rs[0][0] if rs else 0

In [84]:
get_coin_id(engine=engine, coinname='Ethereum')

3

In [36]:
query = """Select distinct "Website" from transacttb"""
rs = execute_query(transact_engine, query=query, returns=True)

In [37]:
rs

[('https://coinmarketcap.com/all/views/all/',),
 ('https://finance.yahoo.com/cryptocurrencies/',)]

In [76]:
def update_web_id(ext_engine=transact_engine, ld_engine=engine):
    """
    ------------
    returns None
    ------------
    Inserts into the `website` table every distinct "Website" value of
    transacttb that is not already stored there.

    ext_engine: engine of the database holding transacttb
    ld_engine: engine of the database holding the `website` table
    """
    query = """SELECT DISTINCT "Website" from transacttb """
    rs = execute_query(ext_engine, query, returns=True)
    for row in rs or []:
        # URLs come from extracted data, so single quotes are doubled to keep
        # the value inside its SQL string literal.
        web = str(row[0]).replace("'", "''")
        query = f"""INSERT INTO website (url)
                    select '{web}'
                    where not exists (select WebId from Website where
                    url = '{web}' ) """

        execute_query(ld_engine, query)


In [53]:
update_web_id(transact_engine, ld_engine=engine)

In [75]:
def update_coin(ext_engine=transact_engine, ld_engine=engine):

    """
        ------------
        returns None
        ------------

        Inserts into the `coin` table every distinct ("CoinName", "Symbol")
        pair of transacttb whose coin name is not already stored there.

        ext_engine: engine of the database holding transacttb
        ld_engine: engine of the database holding the `coin` table
    """
    query = """SELECT DISTINCT "CoinName", "Symbol" from transacttb """
    rs = execute_query(ext_engine, query, returns=True)
    for coin, symbol in rs or []:
        # Values come from extracted data, so single quotes are doubled to
        # keep them inside their SQL string literals.
        coin = str(coin).replace("'", "''")
        symbol = str(symbol).replace("'", "''")

        query = f"""INSERT INTO coin (coinname, symbol)
                    select '{coin}', '{symbol}'
                    where not exists (select CoinId from coin where
                    coinname = '{coin}' ) """

        execute_query(ld_engine, query)

In [70]:
update_coin()

In [91]:
btc['Website'].map(get_web_id())

0      2
1      1
2      2
3      1
4      2
      ..
143    1
144    2
145    1
146    2
147    1
Name: Website, Length: 148, dtype: int64

In [87]:
# Look up the CoinId for every row's CoinName. get_coin_id runs one database
# query per call, so this issues a query for each row of btc.
btc['CoinId'] = btc['CoinName'].apply(get_coin_id)

In [93]:
# Translate each row's Website URL into its id using the {url: id} dict returned
# by get_web_id(); URLs missing from that dict map to NaN.
btc['WebId'] = btc['Website'].map(get_web_id())

In [111]:
def adjust_data(df):
    """
    Keep only the columns of `df` that exist in the `bitcoin` table.

    The column names are read from INFORMATION_SCHEMA.COLUMNS through the
    module-level `engine`; 'ProductId' is left out.

    df: a pandas dataframe containing every remaining `bitcoin` column

    Returns a new dataframe (a copy) restricted to those columns.
    Raises KeyError if `df` lacks one of them.
    """
    query = """Select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = 'bitcoin' """
    # Engine.execute() is deprecated in SQLAlchemy 1.4; an explicit connection
    # used as a context manager is closed as soon as the rows are fetched.
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()

    # Filtering tolerates a table without 'ProductId' (list.remove would raise).
    columns = [row[0] for row in rows if row[0] != 'ProductId']

    # .copy() lets callers assign columns on the result without pandas'
    # SettingWithCopyWarning.
    return df[columns].copy()

### Transformation
* Cleaning the data

In [112]:
data = adjust_data(btc)

##### Price

In [None]:
def clean(df):
    """
    Convert the text 'Price' and 'Volume_24h' columns of `df` to floats.

    Accepted forms include plain numbers ("22687.72"), dollar amounts with
    thousands separators ("$22,676.14") and abbreviated figures with a
    K / M / B / T suffix ("30.023B"). Values that are not strings are passed
    through float().

    df: a pandas dataframe with 'Price' and 'Volume_24h' columns; it is
        modified in place

    Returns the same dataframe, for convenience.
    Raises ValueError for text that is not a number in one of these forms.
    """
    # Multipliers for abbreviated figures such as "30.023B".
    multipliers = {'K': 1e3, 'M': 1e6, 'B': 1e9, 'T': 1e12}

    def to_number(value):
        if not isinstance(value, str):
            return float(value)
        # Drop thousands separators and the currency sign before parsing.
        text = value.replace(',', '').replace('$', '').strip()
        suffix = text[-1:].upper()
        if suffix in multipliers:
            return float(text[:-1]) * multipliers[suffix]
        return float(text)

    # clean price
    df['Price'] = df['Price'].apply(to_number)

    # clean volume
    df['Volume_24h'] = df['Volume_24h'].apply(to_number)

    return df

# TODO: clean Market Cap


In [115]:

data['Volume_24h']

0              30.023B
1      $30,026,881,522
2              29.928B
3      $29,923,045,261
4               29.92B
            ...       
143    $23,016,398,294
144             23.01B
145    $23,008,377,629
146            22.951B
147    $22,947,341,714
Name: Volume_24h, Length: 148, dtype: object

In [116]:
# Opens a connection on the local engine for the load step, which is currently
# commented out below.
# NOTE(review): nothing in the visible notebook closes this connection -- close
# it (or use `with engine.connect() as conn:`) once the load step is enabled.
conn = engine.connect()
#btc[columns].to_sql('bitcoin', conn, if_exists='append', index=False)