## Connections and cursor

In [1]:
# %reload_ext autoreload
# %autoreload 2
# from sql_tables import *

In [2]:
import psycopg2
from pyspark.sql import SparkSession

In [3]:
# Spark session shared by the CSV-loading cell further down.
# getOrCreate() hands back an existing session when one is already running.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("load csv into postgres")
    .getOrCreate()
)

## Postgres functions

In [2]:
def cur_for_db(db_name):
    """Open a connection to the PostgreSQL database ``db_name`` and return a cursor.

    Connection settings are read from the ``PGHOST``, ``PGUSER`` and
    ``PGPASSWORD`` environment variables when they are set, so real
    credentials do not have to be written into the notebook. When a variable
    is unset, the notebook's original local-development value is used.

    The connection is switched to autocommit, so every statement executed
    through the returned cursor takes effect immediately. The connection
    itself stays reachable as ``cursor.connection`` (for example to close it).

    Args:
        db_name: Name of the database to connect to.

    Returns:
        A psycopg2 cursor bound to the newly opened connection.
    """
    import os  # local import so this cell does not depend on a later cell's import

    conn = psycopg2.connect(
        host=os.environ.get("PGHOST", "localhost"),
        database=db_name,
        user=os.environ.get("PGUSER", "taha"),
        password=os.environ.get("PGPASSWORD", "1234"),
    )

    # PostgreSQL refuses CREATE DATABASE inside a transaction block, so the
    # cursor handed to create_db() must come from an autocommit connection.
    conn.autocommit = True
    return conn.cursor()


def create_db(cur, db_name):
    """Create the database ``db_name`` unless it already exists.

    The name is never formatted into the SQL text: the existence check binds
    it as a query parameter and the CREATE statement quotes it as an
    identifier. Quoting also makes the created name match the (case-sensitive)
    existence check exactly.

    Args:
        cur: psycopg2 cursor. CREATE DATABASE cannot run inside a transaction
            block, so the cursor is expected to belong to an autocommit
            connection such as the one ``cur_for_db`` opens.
        db_name: Name of the database to create.
    """
    from psycopg2 import sql  # local import: the file only imports the top-level package

    cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (db_name,))
    exists = cur.fetchone()
    if not exists:
        # Identifiers cannot be bound as parameters, so compose the statement
        # with a properly quoted identifier instead.
        cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name)))

def create_schema(cur, schema_name):
    """Create the schema ``schema_name`` in the connected database unless it exists.

    The name is never formatted into the SQL text: the existence check binds
    it as a query parameter and the CREATE statement quotes it as an
    identifier, which also keeps the created name identical to the one the
    (case-sensitive) existence check looks for.

    Args:
        cur: psycopg2 cursor connected to the database that should hold the schema.
        schema_name: Name of the schema to create.
    """
    from psycopg2 import sql  # local import: the file only imports the top-level package

    cur.execute(
        "SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s;",
        (schema_name,),
    )
    exists = cur.fetchone()
    if not exists:
        # Identifiers cannot be bound as parameters; quote the name instead.
        cur.execute(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(schema_name)))


def create_table(cur, table_name):
    """Execute the module-level ``CREATE_<table_name>`` DDL statement on ``cur``.

    The statement is found with a plain dictionary lookup in the module
    namespace. Unlike ``eval``, this can only ever return an existing
    ``CREATE_*`` value and never evaluates ``table_name`` as Python code.

    Args:
        cur: psycopg2 cursor used to run the DDL.
        table_name: Suffix of the ``CREATE_<table_name>`` constant to run.

    Raises:
        NameError: If no ``CREATE_<table_name>`` constant is defined.
    """
    ddl_name = "CREATE_" + table_name
    try:
        ddl = globals()[ddl_name]
    except KeyError:
        # Keep the exception type the previous eval()-based lookup raised.
        raise NameError("name %r is not defined" % ddl_name) from None
    cur.execute(ddl)
    
    
def insert_postgres(df, table_name, schema_name):
    """Build a multi-row INSERT statement and its parameters for ``df``.

    The module-level ``INSERT_<table_name>`` template is filled with the
    schema name, the table name and one ``%s`` placeholder per row, so the
    whole result can be passed to ``cursor.execute(query, values)``.

    The template is found with a dictionary lookup instead of ``eval``, so
    ``table_name`` is never evaluated as Python code, and an unknown table is
    reported before any rows are collected.

    Note: every row of ``df`` is collected into a Python list, and an empty
    ``df`` produces an empty VALUES list in the returned statement.

    Args:
        df: Object whose ``collect()`` returns the rows to insert (the caller
            passes a Spark DataFrame).
        table_name: Target table; also selects the ``INSERT_<table_name>`` template.
        schema_name: Schema that contains the target table.

    Returns:
        Tuple ``(insert_query, insert_values)``: the SQL text and a list with
        one tuple per row.

    Raises:
        NameError: If no ``INSERT_<table_name>`` template is defined.
    """
    template_name = "INSERT_" + table_name
    try:
        insert_template = globals()[template_name]
    except KeyError:
        # Keep the exception type the previous eval()-based lookup raised.
        raise NameError("name %r is not defined" % template_name) from None

    insert_values = [tuple(row) for row in df.collect()]
    records_list_template = ",".join(["%s"] * len(insert_values))
    insert_query = insert_template.format(schema_name, table_name, records_list_template)

    return insert_query, insert_values


    

## SQL queries

In [3]:
# Create Table Queries
# One DDL string per table, named "CREATE_" + <table name>; create_table()
# resolves the statement from that name and executes it as-is.
# Both statements are safe to re-run (IF NOT EXISTS) and hand ownership of the
# table to the role "taha".

# accounts_data.account: one row per account, keyed by account_id.
# The date column is declared as text.
CREATE_account = """CREATE TABLE IF NOT EXISTS accounts_data.account
                        (
                            account_id  integer PRIMARY KEY,
                            district_id integer,
                            frequency   text,
                            date        text
                        );
                        
                    alter table accounts_data.account owner to taha;
        
                """

# accounts_data.transaction: one row per transaction, keyed by trans_id.
# account_id has no foreign-key constraint to accounts_data.account, and date
# is declared as text (TRANSFORM_transaction parses it with 'YYMMDD').
CREATE_transaction = """CREATE TABLE IF NOT EXISTS accounts_data.transaction
                            (
                                trans_id integer PRIMARY KEY,
                                account_id integer,
                                date text,
                                type text,
                                operation text,
                                amount numeric,
                                balance numeric,
                                k_symbol text,
                                bank text,
                                account integer
                            );

                            alter table accounts_data.transaction owner to taha;
                """

# Insert Queries
# str.format() templates named "INSERT_" + <table name>. The three "{}" slots
# are filled, in order, with the schema name, the table name and the
# comma-separated list of row placeholders (see insert_postgres()).
# Rows that hit an existing key are skipped (ON CONFLICT DO NOTHING).
INSERT_account = (
    "INSERT INTO {}.{} "
    "(account_id, district_id, frequency, date) "
    "VALUES {} ON CONFLICT DO NOTHING;"
)

INSERT_transaction = (
    "INSERT INTO {}.{} "
    "(trans_id,account_id,date,type,operation,amount,balance,k_symbol,bank,account) "
    "VALUES {} ON CONFLICT DO NOTHING;"
)


# Transformations
# TODO for later
# Clean-up script for accounts_data.transaction (not executed anywhere in this
# notebook yet). It runs three statements:
#   1. parse the raw 'YYMMDD' text in "date" with to_date();
#   2. translate the "type" codes (PRIJEM / VYDAJ) to credit / debit;
#   3. translate the "operation" codes (VKLAD, VYBER, ...) to English labels.
# Statement 3 must target "operation": those codes are stored in that column,
# and pointing it at "type" would overwrite every value produced by
# statement 2 with 'unknown_type'.
# Statements 2 and 3 leave already-translated values untouched, so they can be
# re-run. NOTE(review): statement 1 re-parses whatever is in "date" and is
# presumably not safe to run twice - confirm before scheduling it.
TRANSFORM_transaction ="""

UPDATE flinks.accounts_data.transaction
SET date =  to_date(cast(date AS text),'YYMMDD');



UPDATE flinks.accounts_data.transaction
SET type =  CASE
                WHEN type='PRIJEM' THEN 'credit'
                WHEN type='VYDAJ' THEN 'debit'
                WHEN type='credit' OR type='debit' THEN type
                ELSE 'unknown_type'
            END
;



UPDATE flinks.accounts_data.transaction
SET operation =  CASE
                WHEN operation='VYBER KARTOU' THEN 'credit_card_withdrawl'
                WHEN operation='VKLAD' THEN 'credit_in_cash'
                WHEN operation='PREVOD Z UCTU' THEN 'collection_from_another_bank'
                WHEN operation='VYBER' THEN 'withdrawl_in_cash'
                WHEN operation='PREVOD NA UCET' THEN 'remittance_to_another_bank'
                WHEN    operation='credit_card_withdrawl'
                     OR operation='credit_in_cash'
                     OR operation='collection_from_another_bank'
                     OR operation='withdrawl_in_cash'
                     OR operation='remittance_to_another_bank'
                THEN operation
                ELSE 'unknown_type'
            END
;

"""







## Create db, schema, tables

In [7]:
# The target database has to be created while connected to another database,
# so bootstrap through "postgres" first.
posgres_cur = cur_for_db("postgres")
create_db(posgres_cur, "flinks")

# Everything else happens inside the "flinks" database.
flinks_cur = cur_for_db("flinks")
create_schema(flinks_cur, "accounts_data")

# Each entry needs matching CREATE_<name> / INSERT_<name> constants; the same
# list drives the CSV load further down.
tables = ["account", "transaction"]
for table in tables:
    create_table(flinks_cur, table)
    

## Read CSV files with Spark and write them to Postgres

In [12]:
import os

# For every table: read data/<table>.csv (resolved against the current working
# directory) with Spark, then insert the rows into accounts_data.<table>.
for table in tables:
    csv_file = os.path.join(os.path.abspath(''), "data/{}.csv".format(table))

    # NOTE(review): limit(10) keeps at most 10 rows per file - looks like a
    # development-time cap; confirm before relying on a full load.
    df = (
        spark.read.format("csv")
        .options(header='true', inferschema='true')
        .load(csv_file)
        .limit(10)
    )

    insert_query, insert_values = insert_postgres(df, table, "accounts_data")
    flinks_cur.execute(insert_query, insert_values)

## Transformations and cleaning
TODO for later: load from Postgres and do the cleaning here, instead of in the enrichment notebook.

In [18]:
import pandas as pd


# Reuse cur_for_db() instead of repeating the connection settings here; the
# cursor it returns exposes its connection as `.connection`.
conn = cur_for_db("flinks").connection

try:
    # Pull both tables into pandas DataFrames for the cleaning steps.
    account_df = pd.read_sql_query('select * from accounts_data.account', con=conn)
    transaction_df = pd.read_sql_query('select * from accounts_data.transaction', con=conn)
finally:
    # The DataFrames hold all the data from here on, so release the connection
    # even if one of the reads fails.
    conn.close()

## df to JSON
TODO for later: do the JSON transformations here before implementing them for Flask.

In [33]:
import simplejson as json
import psycopg2


# Reuse cur_for_db() instead of repeating the connection settings; it returns
# a cursor on an autocommit connection, reachable as `cur.connection`.
cur = cur_for_db("flinks")
conn = cur.connection


def query_db(query, args=(), one=True):
    """Run ``query`` on the module-level cursor and return rows as dicts.

    Each row becomes a ``{column_name: value}`` dict, with column names taken
    from ``cur.description``. The connection is left open so the function can
    be called more than once; closing it is the caller's job.

    Args:
        query: SQL text, using ``%s`` placeholders for parameters.
        args: Sequence of parameter values bound to the placeholders.
        one: When true (the default, matching the function's earlier
            behaviour) return only the first row, or ``None`` if the query
            produced no rows. When false return the list of all rows.

    Returns:
        A dict, ``None``, or a list of dicts, depending on ``one``.
    """
    cur.execute(query, args)
    columns = [column[0] for column in cur.description]
    rows = [dict(zip(columns, row)) for row in cur.fetchall()]
    if one:
        return rows[0] if rows else None
    return rows


try:
    # The query asks for three rows, so request the full list rather than
    # only the first row.
    my_query = query_db("select * from accounts_data.transaction limit %s", (3,), one=False)
finally:
    # Closed here rather than inside query_db(), which stays reusable.
    conn.close()

# use_decimal=True lets simplejson serialise decimal.Decimal values
# (presumably what the numeric columns come back as - confirm).
json_output = json.dumps(my_query, use_decimal=True)
json_output


'[{"trans_id": 695247, "account_id": 2378, "date": "930101", "type": "PRIJEM", "operation": "VKLAD", "amount": 700.0, "balance": 700.0, "k_symbol": null, "bank": null, "account": null}, {"trans_id": 171812, "account_id": 576, "date": "930101", "type": "PRIJEM", "operation": "VKLAD", "amount": 900.0, "balance": 900.0, "k_symbol": null, "bank": null, "account": null}, {"trans_id": 207264, "account_id": 704, "date": "930101", "type": "PRIJEM", "operation": "VKLAD", "amount": 1000.0, "balance": 1000.0, "k_symbol": null, "bank": null, "account": null}]'