<h2 align="center"> Writing a pandas DataFrame to a Hive Database </h2>

<h2 align="center"> Erinç Koç </h2>

<h2 align="center"> Outline </h2><br>

### Table of Contents

* [1) Import Libraries & Data ](#chapter1)
* [2) Kerberos Authentication](#chapter2)
* [3) Connect to Cluster Through Kerberos Auth](#chapter3)
* [4) Generate Data to Insert DB](#chapter4)
* [5) Generate Table in Database](#chapter5)
* [6) Insert Data to Table](#chapter6)
* [7) Fetch Inserted Data](#chapter7)

### 1) Import Libraries & Data<a class="anchor" id="chapter1"></a>

In [None]:
from pyhive import hive
import time 
import pandas as pd
import os, subprocess
import numpy as np

In [None]:
### display-related options
pd.set_option('display.max_rows', 2000)
pd.set_option('display.max_columns', 2000)
pd.set_option('display.max_seq_items', 2000)
pd.options.display.float_format = '{:.2f}'.format

### 2) Kerberos Authentication <a class="anchor" id="chapter2"></a>

##### The Kerberos ticket must be obtained at the OS level. The keytab file is not included here for privacy reasons.

In [None]:
### get kerberos ticket
stream = os.popen('kinit -kt data.keytab user1@pyd.pym')
output = stream.read()

In [None]:
### check status of ticket
def ticket_check():
    # `klist -s` exits with status 0 when a valid ticket is present
    return subprocess.call(['klist', '-s']) == 0

In [None]:
if ticket_check():
    print('Kerberos ticket is valid')
else:  
    raise RuntimeError('No valid kerberos ticket')
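
The two steps above (request a ticket, then verify it) could also be folded into one helper. This is only a minimal sketch: the name `ensure_ticket` and its `run` parameter are hypothetical and exist so that the command runner can be swapped out. The keytab path and principal are the ones passed to `kinit` earlier.

```python
import subprocess

def ensure_ticket(keytab, principal, run=subprocess.call):
    """Return True if a valid Kerberos ticket exists, requesting one if needed.

    `run` is a hypothetical hook: any callable that takes an argument list
    and returns the command's exit status.
    """
    # `klist -s` exits with 0 when the credential cache holds a valid ticket
    if run(['klist', '-s']) == 0:
        return True
    # otherwise request a ticket from the keytab, then check again
    run(['kinit', '-kt', keytab, principal])
    return run(['klist', '-s']) == 0
```

It would be called as `ensure_ticket('data.keytab', 'user1@pyd.pym')`, raising the same `RuntimeError` as above when it returns `False`.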

### 3) Connect to Cluster Through Kerberos Auth <a class="anchor" id="chapter3"></a>

In [None]:
conn = hive.Connection(host="hivenode1",port=10000,username="user1",auth="KERBEROS",kerberos_service_name="hive")
cur = conn.cursor()

### 4) Generate Data to Insert DB <a class="anchor" id="chapter4"></a>

In [None]:
column_length = 100
row_length = 100
# column names col0 ... col99
column_name = ["col" + str(i) for i in range(column_length)]
df = pd.DataFrame(np.random.randint(0, 100, size=(row_length, column_length)), columns=column_name)

In [None]:
print('Total size of df {:.8f} GB'.format((df.values.nbytes + df.index.nbytes + df.columns.nbytes) / 1024.0**3))

In [None]:
### convert df to tuples in order to use it in sql query 
for r in df.columns.values:
    df[r] = df[r].map(str)
    df[r] = df[r].map(str.strip)   
tuples = [tuple(x) for x in df.values]

Split the rows into chunks of size n so that each INSERT statement writes n rows to the Hive table.

In [None]:
def chunks(l, n):
    n = max(1, n)
    return [l[i:i + n] for i in range(0, len(l), n)]

new_list = chunks(tuples, 5)
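
For larger inputs, the same chunking can be done lazily so that the list of chunks is never held in memory all at once. The generator below is a sketch of that idea. `iter_chunks` is a hypothetical name, and the sample rows are made up for illustration.

```python
from itertools import islice

def iter_chunks(rows, n):
    """Yield successive lists of at most n rows from any iterable."""
    n = max(1, n)
    it = iter(rows)
    while True:
        # take the next n items; an empty list means the input is exhausted
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# made-up rows, one single-value tuple each
sample = [(str(i),) for i in range(7)]
sizes = [len(c) for c in iter_chunks(sample, 3)]
print(sizes)  # [3, 3, 1]
```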

##### There is a better way to write data to a Hive table: a partitioned Hive table can be used. Moreover, with Python's multithreading or multiprocessing, multiple partitions can be written to the table location in parallel.
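
A rough sketch of that idea follows. It is not part of this notebook's workflow. The partitioned table `table_1_part`, the partition column `batch`, and both function names are assumptions made for illustration. The `execute` argument stands for a caller-supplied function that opens its own connection and runs one statement, since sharing a single cursor across threads should not be assumed to be safe. Values are quoted naively, so the sketch assumes they contain no quote characters.

```python
from concurrent.futures import ThreadPoolExecutor

def partition_insert_sql(db_name, table_name, part_col, part_value, rows):
    """Build one INSERT statement that targets a single static partition."""
    # naive quoting: assumes the values contain no quote characters
    rendered = ", ".join(
        "(" + ", ".join(f"'{v}'" for v in row) + ")" for row in rows
    )
    return (
        f"INSERT INTO TABLE {db_name}.{table_name} "
        f"PARTITION ({part_col}='{part_value}') VALUES {rendered}"
    )

def run_parallel(statements, execute, max_workers=4):
    """Run `execute` once per statement on a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() waits for all workers and re-raises any worker exception
        return list(pool.map(execute, statements))

# hypothetical data: rows grouped by the value of the partition column
rows_by_batch = {"a": [("1", "2")], "b": [("3", "4"), ("5", "6")]}
statements = [
    partition_insert_sql("sasl_db", "table_1_part", "batch", key, rows)
    for key, rows in rows_by_batch.items()
]

# `len` is only a stand-in for a function that would run the statement in Hive
results = run_parallel(statements, execute=len)
```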

### 5) Generate Table in Database <a class="anchor" id="chapter5"></a>

In [None]:
### match type of pandas df to hive type
def type_conversion(df, i):
    # elif chain, so a matched float/int column does not fall through to the final else
    if df[i].dtypes == np.float64:
        column_type = "FLOAT"
    elif df[i].dtypes == np.int64:
        column_type = "BIGINT"
    elif df[i].dtypes == object:
        column_type = "STRING"
    else:
        raise RuntimeError("No type")
    return column_type
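
The dtype-to-Hive mapping can also be written as a lookup table, which makes it easier to extend. This is a sketch under assumptions: `HIVE_TYPES` and `hive_type` are hypothetical names, the keys are dtype names as produced by `str(df[col].dtype)`, and mapping `float64` to `DOUBLE` (Hive's 8-byte floating-point type) is a choice made for this sketch.

```python
# dtype name (as given by str(df[col].dtype)) -> Hive column type
HIVE_TYPES = {
    "int8": "TINYINT",
    "int16": "SMALLINT",
    "int32": "INT",
    "int64": "BIGINT",
    "float32": "FLOAT",
    "float64": "DOUBLE",
    "bool": "BOOLEAN",
    "object": "STRING",
    "datetime64[ns]": "TIMESTAMP",
}

def hive_type(dtype_name):
    """Return the Hive type for a pandas dtype name, or raise if unmapped."""
    try:
        return HIVE_TYPES[dtype_name]
    except KeyError:
        raise RuntimeError(f"No Hive type mapped for dtype {dtype_name!r}") from None
```

With a DataFrame at hand it would be called as `hive_type(str(df[i].dtype))`.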

In [None]:
### Table creation script is generated as string
def generate_create_statment(df, db_name, table_name):
    column_list = df.columns.values.tolist()
    # one "name TYPE" definition per column
    column_defs = [i + " " + type_conversion(df, i) for i in column_list]
    return "CREATE TABLE " + db_name + "." + table_name + " (" + ", ".join(column_defs) + ")"

In [None]:
db_name = 'sasl_db'
table_name = 'table_1'
statement_1 = f"DROP TABLE IF EXISTS {db_name}.{table_name}"
statement = generate_create_statment(df, db_name, table_name)

In [None]:
cur.execute(statement_1)
cur.execute(statement)

### 6) Insert Data to Table <a class="anchor" id="chapter6"></a>

In [None]:
### generate insertion script as string and then insert to table in db
start_time = time.time()
for chunk in new_list:
    # render each row tuple as "(v1, v2, ...)" and join the rows with commas;
    # this avoids stripping '[' or ']' characters that belong to the data
    values = ", ".join(str(row) for row in chunk)
    sql = f"INSERT INTO {db_name}.{table_name} VALUES {values}"
    cur.execute(sql)
print("--- execution time %s seconds ---" % (time.time() - start_time))
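
Building the VALUES clause by string formatting breaks as soon as a value contains a quote character. An alternative is to hand the values to the driver as parameters. The sketch below only builds the statement and the flat parameter list. `parameterized_insert` is a hypothetical helper, and it assumes the driver uses `%s`-style placeholders, which is the `pyformat` paramstyle PyHive declares.

```python
def parameterized_insert(db_name, table_name, rows):
    """Return (sql, params) for a multi-row INSERT using %s placeholders."""
    rows = [tuple(r) for r in rows]
    if not rows:
        raise ValueError("rows must not be empty")
    # one "(%s, %s, ...)" group per row, sized from the first row
    row_placeholder = "(" + ", ".join(["%s"] * len(rows[0])) + ")"
    sql = (
        f"INSERT INTO {db_name}.{table_name} VALUES "
        + ", ".join([row_placeholder] * len(rows))
    )
    # flatten the rows in the same order as the placeholders
    params = [value for row in rows for value in row]
    return sql, params

# made-up rows; the second value contains a character the string approach would mangle
sql, params = parameterized_insert("sasl_db", "table_1", [("1", "a]b"), ("2", "c")])
```

The result would then be passed to the cursor as `cur.execute(sql, params)`, leaving quoting and escaping to the driver.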

### 7) Fetch Inserted Data <a class="anchor" id="chapter7"></a>

In [None]:
cur.execute(f"select * from {db_name}.{table_name}")
df_insert = pd.DataFrame(cur.fetchall())
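
The DataFrame built above has integer column labels, because `fetchall()` returns only the row values. The column names are available from the DB-API attribute `cur.description`. The helper below is a sketch: `column_names` is a hypothetical name, the description shown is a made-up stand-in, and the `table.column` prefix it strips depends on server configuration.

```python
def column_names(description):
    """Extract bare column names from a DB-API cursor description."""
    # each entry is a sequence whose first item is the column name;
    # Hive may report it as `table.column`, so keep the part after the last dot
    return [entry[0].rsplit(".", 1)[-1] for entry in description]

# made-up stand-in for cur.description
fake_description = [("table_1.col0", "STRING_TYPE"), ("table_1.col1", "STRING_TYPE")]
names = column_names(fake_description)
print(names)  # ['col0', 'col1']
```

It could be used as `pd.DataFrame(cur.fetchall(), columns=column_names(cur.description))`.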