<h2 align="center"> Writing Pandas df to Hive db with sqlalchemy</h2>

<h2 align="center"> Erinç Koç </h2>

<h2 align="center"> Outline </h2><br>

### Table of Contents

* [1) Import Libraries & Data ](#chapter1)
* [2) Kerberos Authentication](#chapter2)
* [3) Connect to Cluster Through Kerberos Auth](#chapter3)
* [4) Generate Data to Insert DB](#chapter4)
* [5) Generate Table in Database](#chapter5)
* [6) Insert Data to Table](#chapter6)

### 1) Import Libraries & Data<a class="anchor" id="chapter1"></a>

In [None]:
from sqlalchemy import create_engine
import time 
import pandas as pd
import os, subprocess
import numpy as np

In [None]:
### display-related options
pd.set_option('display.max_rows', 2000)
pd.set_option('display.max_columns', 2000)
pd.set_option('display.max_seq_items', 2000)
pd.options.display.float_format = '{:.2f}'.format

### 2) Kerberos Authentication <a class="anchor" id="chapter2"></a>

##### Authentication should be generated at OS level. I WON'T add keytab files due to privacy issues.

In [None]:
### get kerberos ticket
stream = os.popen('kinit -kt data.keytab user1@pyd.pym')
output = stream.read()

In [None]:
###check status of ticket
def ticket_check():
    return True if subprocess.call(['klist', '-s']) == 0 else False

In [None]:
if ticket_check():
    print('Kerberos ticket is valid')
else:  
    raise RuntimeError('No valid kerberos ticket')

### 3) Connect to Cluster Through Kerberos Auth <a class="anchor" id="chapter3"></a>

In [None]:
engine = create_engine('hive://hivenode1:10000/sasl_db',
                       connect_args={'auth': 'KERBEROS', 'kerberos_service_name': 'hive', 'username':'user1'})

### 4) Generate Data to Insert DB <a class="anchor" id="chapter4"></a>

In [None]:
column_name = []
column_length = 100
row_length = 100
for i in range(column_length):
    column_name.append("col" + str(i))
df = pd.DataFrame(np.random.randint(0,100,size=(row_length,column_length)), columns=column_name)

In [None]:
print('Total size of df {:.8f}'.format((df.values.nbytes + df.index.nbytes + df.columns.nbytes ) / 1024.0**3).format() + ' gb')

### 5) Generate Table in Database <a class="anchor" id="chapter5"></a>

In [None]:
### match type of pandas df to hive type
def type_conversion(df, i):
    if df[i].dtypes == np.float64:
        column_type = "FLOAT"
    elif df[i].dtypes == np.int32:
        column_type = "INT"
    elif df[i].dtypes == object:
        column_type = "STRING"
    else:
        raise RuntimeError("No type")
    return column_type

In [None]:
### Table creation script is generated as string
def generate_create_statment(df, db_name, table_name):
    column_list = df.columns.values.tolist()
    create_statement = "CREATE TABLE " + db_name + "." + table_name + " ("
    for i in column_list:
        corrent_type = type_conversion(df, i)
        create_statement += i + " " + corrent_type + ", "
    create_statement = create_statement[:-2]
    create_statement = create_statement + ")"
    return create_statement

In [None]:
db_name = 'sasl_db'
table_name = 'table_1'
statement_drop = f"DROP TABLE IF EXISTS {db_name}.{table_name}"
statement_create = generate_create_statment(df, db_name, table_name)

### 6) Insert Data to Table <a class="anchor" id="chapter6"></a>

In [None]:
engine.execute(statement_drop)

In [None]:
engine.execute(statement_create)

In [None]:
###multi insertion with chuck
df.to_sql(name=table_name, con=engine, if_exists='append', index=False, method='multi', chunksize=10000)