# 0X Notebook-Template

# Imports

In [11]:
import sys

sys.path.append("../../")

import pandas as pd
import connection as conn

from pyspark.sql import functions as f

from etl.clients import HiveClient
from etl.datamodels import HostDataClass, TableDataClass

# Settings & Utils

In [12]:
hive_host = HostDataClass(host=conn.HIVE_HOST, port=conn.HIVE_PORT)
hdfs_host = HostDataClass(host=conn.HDFS_HOST, port=conn.HDFS_PORT)

In [13]:
hive = HiveClient(host=hive_host, thrift_port=conn.HIVE_THRIFT_PORT)

# Hive Tests

## Hive retrieve data functions

In [14]:
tbl_employee_pay_test = TableDataClass(
    database="default",
    table_name="employee_pay_test"
)
tbl_employee_pay_test

TableDataClass(table_name='employee_pay_test', database='default', schema=None)

In [15]:
hive.get_databases()

['default', 'psa']

In [16]:
display(
    hive.get_tables(),
    hive.get_tables(database="psa")
)

['bigdata',
 'employee_pay_test',
 'firstviewonhive',
 'hiveexternaltable',
 'hiveexternaltable_parquet',
 'my_table',
 'my_table_test1113',
 'new_table',
 'new_table_2',
 'parquetfile',
 'parquetfile2']

['dbo_table', 'hiveexternaltableview', 'newtable']

In [17]:
hive.describe_table(
    table=tbl_employee_pay_test
)

Unnamed: 0,col_name,data_type,comment
0,transaction_date,string,
1,employee_id,bigint,
2,client_id,bigint,
3,costcenter_id,bigint,
4,paytype_id,bigint,
5,amount,double,


In [18]:
hive.read_table(tbl_employee_pay_test, limit=10)

Unnamed: 0,employee_pay_test.transaction_date,employee_pay_test.employee_id,employee_pay_test.client_id,employee_pay_test.costcenter_id,employee_pay_test.paytype_id,employee_pay_test.amount
0,2000-01-31,3800,50,17,1,5298.23
1,2000-01-31,3800,50,17,2,927.19
2,2000-01-31,3800,50,17,3,794.73
3,2000-01-31,3800,50,17,4,662.28
4,2000-01-31,3800,50,17,5,529.82
5,2000-01-31,3800,50,17,6,397.37
6,2000-01-31,3800,50,17,7,264.91
7,2000-01-31,3800,50,17,8,132.46
8,2000-01-31,2091,83,2,1,4795.34
9,2000-01-31,2091,83,2,2,839.18


In [19]:
hive.read_table(
    table=tbl_employee_pay_test,
    columns=["employee_id", "amount"],
    limit=15)

Unnamed: 0,employee_id,amount
0,3800,5298.23
1,3800,927.19
2,3800,794.73
3,3800,662.28
4,3800,529.82
5,3800,397.37
6,3800,264.91
7,3800,132.46
8,2091,4795.34
9,2091,839.18


In [22]:
data = {
    'col-int': pd.Series([1, 2, 3, 4, 5], dtype=int),
    'col-float': pd.Series([1.1, 2.2, 3.3, 4.4, 5.5], dtype=float),
    'col-string': pd.Series(['a', 'b', 'c', 'd', 'e'], dtype=str),
    'col-bool': pd.Series([True, False, True, False, True], dtype=bool),
    'col-datetime': pd.to_datetime(['2022-01-01', '2022-02-01', '2022-03-01', '2022-04-01', '2022-05-01']),
}

df = pd.DataFrame(data)

def generate_hive_table_statement(df, table_name, hdfs_location):
    columns = []
    
    for column_name, dtype in df.dtypes.items():
        # Ignoriere die 'Category'-Spalte
        if dtype.name != 'category':
            # Mappe Pandas-Datentypen zu HiveQL-Datentypen
            hive_dtype = 'BIGINT' if dtype == 'int64' else \
                         'DOUBLE' if dtype == 'float64' else \
                         'STRING' if dtype == 'object' else \
                         'BOOLEAN' if dtype == 'bool' else \
                         'TIMESTAMP' if dtype.name == 'datetime64[ns]' else None

            if hive_dtype:
                columns.append(f"`{column_name}` {hive_dtype}")

    # Erstelle das CREATE EXTERNAL TABLE Statement
    hiveql_statement = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (\n"
        f"    {', '.join(columns)}\n"
        ")\n"
        "ROW FORMAT DELIMITED\n"
        "FIELDS TERMINATED BY ','\n"
        f"STORED AS TEXTFILE\n"
        f"LOCATION '{hdfs_location}'"
    )

    return hiveql_statement

# Verwendung der Funktion
table_name = 'your_table_name'
hdfs_location = '/your/hdfs/location'
hiveql_statement = generate_hive_table_statement(df, table_name, hdfs_location)

print(hiveql_statement)


CREATE EXTERNAL TABLE IF NOT EXISTS your_table_name (
    `col-int` BIGINT, `col-float` DOUBLE, `col-string` STRING, `col-bool` BOOLEAN, `col-datetime` TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/your/hdfs/location'


In [23]:
hive.execute_hive_query(hiveql_statement)

ProgrammingError: No result set

In [None]:
hive.delete_external_table("TestCreateDeleteTable")

## Spark on Hive

In [None]:
raise Exception("Please be sure you want to execute big queries with spark!")

In [10]:
spark = hive.create_spark_session()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/22 22:29:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
df = spark.sql(f"SELECT * FROM default.employee_pay_test")
df.show()

+----------------+-----------+---------+-------------+----------+-------+
|transaction_date|employee_id|client_id|costcenter_id|paytype_id| amount|
+----------------+-----------+---------+-------------+----------+-------+
|      2000-11-30|       7119|       83|            1|         1|5579.23|
|      2000-11-30|       7119|       83|            1|         2| 976.37|
|      2000-11-30|       7119|       83|            1|         3| 836.88|
|      2000-11-30|       7119|       83|            1|         4|  697.4|
|      2000-11-30|       7119|       83|            1|         5| 557.92|
|      2000-11-30|       7119|       83|            1|         6| 418.44|
|      2000-11-30|       7119|       83|            1|         7| 278.96|
|      2000-11-30|       7119|       83|            1|         8| 139.48|
|      2000-11-30|        612|        3|           17|         1|6263.01|
|      2000-11-30|        612|        3|           17|         2|1096.03|
|      2000-11-30|        612|        

In [14]:
df.printSchema()

root
 |-- transaction_date: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- client_id: long (nullable = true)
 |-- costcenter_id: long (nullable = true)
 |-- paytype_id: long (nullable = true)
 |-- amount: double (nullable = true)



In [19]:
df_agg = df.groupBy("client_id", "costcenter_id").agg(f.sum("amount").alias("total_amount"))
df_agg.show()



+---------+-------------+--------------------+
|client_id|costcenter_id|        total_amount|
+---------+-------------+--------------------+
|       92|           10|      3.5432563698E8|
|       50|            1| 3.483273433499998E8|
|       69|            2|3.3774057639000005E8|
|       50|           10|3.6544836635999995E8|
|       67|            2| 3.321358519500001E8|
|       69|           17| 3.547921280399999E8|
|       92|           17| 3.605586988500001E8|
|       63|            1|3.5656114536000025E8|
|       69|           10|3.5766427473000026E8|
|        4|           17| 3.614978070900001E8|
|       50|            2|3.6106176014999956E8|
|       27|            2|3.5264925234000015E8|
|       92|            2|3.5391423122999984E8|
|        3|           15| 3.473282307599999E8|
|       87|            1| 3.418949929499999E8|
|        4|           10| 3.398013108000001E8|
|       50|           17|      3.3833523402E8|
|       63|           17|3.3902211924000007E8|
|       27|  

                                                                                

In [20]:
spark.stop()