# Hive Catalog
So first we test out the Hive Catalog, which is basically using the Hive Metastore as the Iceberg catalog. The Dockerfile and the configurations used to setup the Hive metastore can be found in th `hive-metastore` folder and the Docker Compose file in the repo. This Hive Metastore also connecting to the `hive` database in the postgres instance we have setup.


## Importing Required Libraries
We will be importing `SparkSession` for, well, the Spark session. We also import the Postgress driver `psycopg`, Trino connection libraries, and pandas, to explore the data that we will be writing with Spark.

We also set some styling to display tables better.

In [8]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import psycopg
from trino.dbapi import connect
import pandas as pd

# this is to better display pyspark dataframes
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

## Setting up Spark Session
We set up Spark Session with the configs required to connect to the Hive Metastore. 

It is a single node local spark sessions, setting the driver and executor memories to 4GB, to provide it sufficient memory to load all of the data.
We are setting up `iceberg` as the iceberg catalog, and setting up all the required configs to connect to the Hive Metastore catalog ([details here](https://iceberg.apache.org/docs/latest/configuration/#catalog-properties)).

To connect to our local instance of Minio, we need to set `s3.endpoint` and `s3.path-style-access` configs, and set our warehouse location to be in the folder `iceberg-hive` under the bucket `warehouse` that was created on startup.

In [2]:
iceberg_catalog_name = "iceberg"
spark = SparkSession.builder \
  .appName("iceberg-hive") \
  .config("spark.driver.memory", "4g") \
  .config("spark.executor.memory", "4g") \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.jars", "/opt/extra-jars/iceberg-spark-runtime.jar,/opt/extra-jars/iceberg-aws-bundle.jar") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.type", "hive") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.uri", "thrift://hive-metastore:9083") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.warehouse", "s3://warehouse/iceberg/") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.s3.endpoint", "http://minio:9000") \
  .config(f"spark.sql.catalog.{iceberg_catalog_name}.s3.path-style-access", "true") \
  .getOrCreate()


24/09/09 15:36:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Loading Test Data
Now we load the 2 parquet files downloaded previously, into the Spark memory.

In [3]:
df_2024_01 = spark.read.parquet("file:///home/iceberg/workspace/downloaded-data/yellow_tripdata_2024-01.parquet")
df_2024_02 = spark.read.parquet("file:///home/iceberg/workspace/downloaded-data/yellow_tripdata_2024-02.parquet")

                                                                                

Now we check the data to get an idea of the size, structure and the actual data.

In [4]:
print("file: yellow_tripdata_2024-01.parquet")
print(f"Number of rows: {df_2024_01.count()}")
print("Schema:")
df_2024_01.printSchema()
print("Data:")
df_2024_01.show(5)

file: yellow_tripdata_2024-01.parquet
Number of rows: 2964624
Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

Data:
+--------+--------------------+------------------

In [5]:
print("file: yellow_tripdata_2024-02.parquet")
print(f"Number of rows: {df_2024_02.count()}")
print("Schema:")
df_2024_02.printSchema()
print("Data:")
df_2024_02.show(5)

file: yellow_tripdata_2024-02.parquet
Number of rows: 3007526
Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

Data:
+--------+--------------------+------------------

24/09/09 15:36:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Data look good, so now on to actually writing it to our Iceberg catalog.

## Creating Iceberg namespace under the catalog
First, we need to create a new namespace (schema) under the iceberg catalog. Here we create the namespace `hive` under the catalog, and assign a location in Minio.

In [6]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.hive LOCATION 's3://warehouse/iceberg/hive'")

DataFrame[]

## Writing the data to Iceberg Table
We want to create this table first, based on 2024-01 data, partitioned by the month. We can get the month from the `tpep_pickup_datetime` column.

In [11]:
df_2024_01.writeTo("iceberg.hive.yellow_tripdata").partitionedBy(
    F.months("tpep_pickup_datetime")
).create()

                                                                                

We then check how the data is saved to Minio. 

In [13]:
!mc ls --recursive minio/warehouse/

]11;?\[6n[m[32m[2024-09-09 15:36:46 UTC][0m[33m     0B[0m [34mSTANDARD[0m[36;1m iceberg/hive/[0;22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m 5.9KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2002-12/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00003.parquet[22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m 5.9KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2009-01/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00004.parquet[22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m 6.3KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2023-12/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00001.parquet[22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m  44MiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2024-01/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00002.parquet[22m[m
[m[32m[2024-09-09 15:49

There something interesting here. We are expecting this file to only have data for the month of 2024-01, but there seems to be some data from some other months. Although looking at the size of the partitions, we can see the expected partition month is the biggest, and the rest of the partitions could have some bad data. 

We also check what metadata has been written to the Hive Metastore's attached database. Using the `psycopg` and `pandas` library, can get the data from specific table that the HIve metastore wrote to.

In [29]:
conn = psycopg.connect("postgresql://postgres:postgres@postgres:5432/hive")

The first table is the `DBS` table, which shows that there is a default database with the location pointing to the local file system. This explains why we need to create a new namespace with the location set to our object storage, which is the second row.

In [30]:
pd.read_sql_query('select * from "DBS"', conn)

  pd.read_sql_query('select * from "DBS"', conn)


Unnamed: 0,DB_ID,DESC,DB_LOCATION_URI,NAME,OWNER_NAME,OWNER_TYPE,CTLG_NAME,CREATE_TIME,DB_MANAGED_LOCATION_URI,TYPE,DATACONNECTOR_NAME,REMOTE_DBNAME
0,1,Default Hive database,file:/user/hive/warehouse,default,public,ROLE,hive,1725788794,,NATIVE,,
1,2,,s3://warehouse/iceberg/hive,hive,iceberg,USER,hive,1725896204,,NATIVE,,


Next we can look at the `TBLS` table, which shows the record of our recently created Iceberg table.

In [31]:
pd.read_sql_query('select * from "TBLS"', conn)

  pd.read_sql_query('select * from "TBLS"', conn)


Unnamed: 0,TBL_ID,CREATE_TIME,DB_ID,LAST_ACCESS_TIME,OWNER,OWNER_TYPE,RETENTION,SD_ID,TBL_NAME,TBL_TYPE,VIEW_EXPANDED_TEXT,VIEW_ORIGINAL_TEXT,IS_REWRITE_ENABLED,WRITE_ID
0,1,1725896964,2,-679888,iceberg,USER,2147483647,1,yellow_tripdata,EXTERNAL_TABLE,,,False,0


Finally we look at the `TABLE_PARAMS` table, which has the more information about the created Iceberg table, such as the table statistics, the current snapshot summary and schema, and the location of the Iceberg table metadata in Minio

In [32]:
pd.read_sql_query('select * from "TABLE_PARAMS"', conn)

  pd.read_sql_query('select * from "TABLE_PARAMS"', conn)


Unnamed: 0,TBL_ID,PARAM_KEY,PARAM_VALUE
0,1,default-partition-spec,"{""spec-id"":0,""fields"":[{""name"":""tpep_pickup_da..."
1,1,current-schema,"{""type"":""struct"",""schema-id"":0,""fields"":[{""id""..."
2,1,uuid,c5c14cb9-a698-4d6f-b2d8-a03c1f83953b
3,1,transient_lastDdlTime,1725896964
4,1,write.parquet.compression-codec,zstd
5,1,owner,iceberg
6,1,table_type,ICEBERG
7,1,numFilesErasureCoded,0
8,1,EXTERNAL,TRUE
9,1,numRows,5972150


There are other tables that get updated, but these are the main one. 

## Adding New partition to the table
Now, we will add the file for the month of 2024-02 as a new partition to the table we just created. we can do that by using the `append` option of the write command.

In [18]:
df_2024_02.writeTo("iceberg.hive.yellow_tripdata").append()

                                                                                

And we check the data in Minio again, to see if the new partition has been created.

In [19]:
!mc ls --recursive minio/warehouse/

]11;?\[6n[m[32m[2024-09-09 15:36:46 UTC][0m[33m     0B[0m [34mSTANDARD[0m[36;1m iceberg/hive/[0;22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m 5.9KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2002-12/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00003.parquet[22m[m
[m[32m[2024-09-10 15:34:06 UTC][0m[33m 5.3KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2008-12/00000-47-81691b34-dddc-4409-ad0f-982b7862cc59-0-00003.parquet[22m[m
[m[32m[2024-09-09 15:49:23 UTC][0m[33m 5.9KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2009-01/00000-38-67af578d-850f-4bd2-8503-844b0f3192ba-0-00004.parquet[22m[m
[m[32m[2024-09-10 15:34:06 UTC][0m[33m 5.3KiB[0m [34mSTANDARD[0m[1m iceberg/hive/yellow_tripdata/data/tpep_pickup_datetime_month=2009-01/00000-47-81691b34-dddc-4409-ad0f-982b7862cc59-0-00004.parquet[22m[m
[m[32m[2024-09-09 15:49

Again we see the expected partition created, and some extra partitions with stray data. We also see an new setup of metadata files being created. 

Querying the snapshots for this table, we can see there are 2, one for the creation of the table, and one for addition of the next partition. 

In [20]:
spark.sql("select * from iceberg.hive.yellow_tripdata.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-09-09 15:49:...|8347670030789304497|               NULL|   append|s3://warehouse/ic...|{spark.app.id -> ...|
|2024-09-10 15:34:...|8346019809249799834|8347670030789304497|   append|s3://warehouse/ic...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



We can also run a query to check the stats for all the partitions in the table.

In [26]:
spark.sql("select * from iceberg.hive.yellow_tripdata.partitions").show()

+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
|partition|spec_id|record_count|file_count|total_data_file_size_in_bytes|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|     last_updated_at|last_updated_snapshot_id|
+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
|    {648}|      0|     2964617|         2|                     46495595|                           0|                         0|                           0|                         0|2024-09-10 15:34:...|     8346019809249799834|
|    {649}|      0|     3007514|         2|                     46708043

## Querying with Trino
To start querying the data with Trino, we first need to configure Trino to connect to the [Hive catalog](https://trino.io/docs/current/object-storage/metastores.html#hive-thrift-metastore) using the following catalog properties (which has already been setup in the Trino configuration folder):

```
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.path-style-access=true
s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}
s3.region=${ENV:AWS_REGION}
```

We then use the Trino python client, together with pandas to ready the data back. First we setup the connection:

In [36]:
trino_conn = connect(
    host="trino",
    port=8080,
    user="user"
)

Then we read the data into a pandas dataframe

In [38]:
pd.read_sql_query('select * from "iceberg-hive".hive.yellow_tripdata limit 10', trino_conn)

  pd.read_sql_query('select * from "iceberg-hive".hive.yellow_tripdata limit 10', trino_conn)


Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
2,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
3,1,2024-01-01 00:54:08,2024-01-01 01:26:31,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0
4,2,2024-01-01 00:49:44,2024-01-01 01:15:47,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75
5,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
6,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
7,1,2024-01-01 00:30:40,2024-01-01 00:58:40,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0
8,2,2024-01-01 00:26:01,2024-01-01 00:54:12,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0
9,2,2024-01-01 00:28:08,2024-01-01 00:29:16,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0


We can also use Trino to query the Iceberg metadata, with a slightly different syntax.

In [39]:
pd.read_sql_query('select * from "iceberg-hive".hive."yellow_tripdata$snapshots"', trino_conn)

  pd.read_sql_query('select * from "iceberg-hive".hive."yellow_tripdata$snapshots"', trino_conn)


Unnamed: 0,committed_at,snapshot_id,parent_id,operation,manifest_list,summary
0,2024-09-09 15:49:23.822000+00:00,8347670030789304497,,append,s3://warehouse/iceberg/hive/yellow_tripdata/me...,"{'spark.app.id': 'local-1725896172891', 'chang..."
1,2024-09-10 15:34:06.355000+00:00,8346019809249799834,8.34767e+18,append,s3://warehouse/iceberg/hive/yellow_tripdata/me...,"{'spark.app.id': 'local-1725896172891', 'chang..."


In [40]:
pd.read_sql_query('select * from "iceberg-hive".hive."yellow_tripdata$partitions"', trino_conn)

  pd.read_sql_query('select * from "iceberg-hive".hive."yellow_tripdata$partitions"', trino_conn)


Unnamed: 0,partition,record_count,file_count,total_size,data
0,(tpep_pickup_datetime_month: 467),1,1,5433,"(VendorID: (min: 2, max: 2, null_count: 0, nan..."
1,(tpep_pickup_datetime_month: 468),4,2,11514,"(VendorID: (min: 2, max: 2, null_count: 0, nan..."
2,(tpep_pickup_datetime_month: 647),10,1,6418,"(VendorID: (min: 2, max: 2, null_count: 0, nan..."
3,(tpep_pickup_datetime_month: 648),2964617,2,46495595,"(VendorID: (min: 1, max: 6, null_count: 0, nan..."
4,(tpep_pickup_datetime_month: 395),2,1,6043,"(VendorID: (min: 2, max: 2, null_count: 0, nan..."
5,(tpep_pickup_datetime_month: 649),3007514,2,46708043,"(VendorID: (min: 1, max: 2, null_count: 0, nan..."
6,(tpep_pickup_datetime_month: 650),2,1,5908,"(VendorID: (min: 2, max: 2, null_count: 0, nan..."
