# Hive Catalog

This notebook shows how to register external tables in a Hive Metastore and how to interact with Delta tables through it.

You must provide a `.env` file containing the MinIO credentials and the storage paths of two example tables, one in Delta Lake format and one in Parquet format:

```
MINIO_SECRET_KEY
MINIO_ACCESS_KEY
MINIO_ENDPOINT
ROBOTS_PATH # Delta Table
LEMOM_AREAS # Parquet Table
```

In [None]:
# Install the notebook's dependencies (PySpark and python-dotenv) into the kernel's environment.
%pip install pyspark==3.5 python-dotenv

In [None]:
# Hive Metastore address written as host:port, without the thrift:// scheme.
# Edit this URI for your environment
HIVE_URI = "172.16.203.10:9083"

In [None]:
import os

from dotenv import load_dotenv
from pyspark.sql import SparkSession

# Load MinIO credentials and table locations from the local .env file.
load_dotenv(".env")

# Extra packages (S3A filesystem + Delta Lake) that PySpark resolves when it
# launches the JVM, so this must be set before the session is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.3.0 pyspark-shell"
)

# Fail fast with a readable message instead of handing None to the S3A connector.
required_env = ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY", "MINIO_ENDPOINT")
missing_env = [name for name in required_env if not os.getenv(name)]
if missing_env:
    raise EnvironmentError(
        f"Missing required settings in .env: {', '.join(missing_env)}"
    )

app_name = "Data Backend"

print("Initializing spark...")
# Only the endpoint is echoed: credentials must not end up in saved cell output.
print(os.getenv("MINIO_ENDPOINT"))
spark = (
    SparkSession.builder.appName(app_name)
    # --- MinIO access through the S3A connector ---
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("MINIO_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("MINIO_SECRET_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("MINIO_ENDPOINT"))
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # --- Delta Lake SQL extension ---
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # --- Hive Metastore; the address comes from HIVE_URI in the configuration cell ---
    .config("spark.hive.metastore.uris", f"thrift://{HIVE_URI}")
    .config("spark.hive.metastore.schema.verification", "false")
    .config("spark.sql.hive.thriftServer.singleSession", "false")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", "s3a://warehouse/delta/")
    .config("spark.sql.catalogImplementation", "hive")
    # NOTE(review): these two keys look like Delta table properties rather than
    # Spark session settings - confirm they actually take effect when set here.
    .config("delta.autoOptimize.optimizeWrite", "true")
    .config("delta.autoOptimize.autoCompact", "true")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    # --- Resources ---
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("WARN")

In [None]:
# Registry of the example tables: table name -> storage path and format.
# Paths are read from environment variables.
datasets = dict(
    robots=dict(path=os.getenv("ROBOTS_PATH"), type="delta"),
    lemom_areas=dict(path=os.getenv("LEMOM_AREAS"), type="parquet"),
)


def load_spark_tables(spark_session: SparkSession, tables: "dict | None" = None) -> None:
    """
    Register external tables in the session catalog.

    For every entry in ``tables`` a database named after the table type is
    created if it does not exist yet, and then an external table
    ``<type>.<name>`` pointing at the entry's path is created if it does not
    exist yet. Existing databases and tables are left untouched, so the
    function can be called repeatedly.

    Args:
        spark_session: Session used to run the SQL statements.
        tables: Mapping of table name to a settings dict with the keys
            ``"path"`` (storage location) and ``"type"`` (data source name,
            e.g. ``"delta"`` or ``"parquet"``). Defaults to the notebook-level
            ``datasets`` dictionary.

    Returns:
        None

    Raises:
        ValueError: If any entry has a missing or empty ``"path"`` (typically
            an environment variable that was not set). Nothing is registered
            in that case.

    Example Usage:
        ```python
        load_spark_tables(spark)
        ```
    """
    # Fall back to the notebook-level registry so existing calls keep working.
    if tables is None:
        tables = datasets

    # Validate everything up front: an unset path would otherwise be
    # interpolated as the literal string 'None' in the LOCATION clause.
    missing = [name for name, settings in tables.items() if not settings.get("path")]
    if missing:
        raise ValueError(
            f"No storage path configured for table(s): {', '.join(missing)}"
        )

    # One database per distinct table type, created once instead of on every
    # loop iteration. dict.fromkeys de-duplicates while keeping first-seen order.
    for database in dict.fromkeys(settings["type"] for settings in tables.values()):
        spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database};")

    for dataset_name, settings in tables.items():
        print(f"Registering table {dataset_name} of type {settings['type']} in path {settings['path']}")
        spark_session.sql(
            f"""
            CREATE EXTERNAL TABLE IF NOT EXISTS {settings['type']}.{dataset_name}
            USING {settings["type"].upper()}
            LOCATION '{settings['path']}';
        """
        )


In [None]:
# List the databases known to the session catalog.
databases_df = spark.sql("SHOW DATABASES;")
databases_df.show()

In [None]:
# List the tables registered in the `delta` database.
delta_tables_df = spark.sql("SHOW TABLES IN delta")
delta_tables_df.show()

In [None]:
# List the tables registered in the `parquet` database.
parquet_tables_df = spark.sql("SHOW TABLES IN parquet")
parquet_tables_df.show()

In [None]:
# Register the example tables using the active session.
load_spark_tables(spark_session=spark)

In [None]:
# Show the columns of the Delta table. The table name must match the "robots"
# key in `datasets`, since the notebook registers that table as delta.robots.
spark.sql("DESCRIBE delta.robots").show(truncate=False)

In [None]:
# Show the detail record for the Parquet table without truncating columns.
lemom_areas_detail_df = spark.sql("DESCRIBE DETAIL parquet.lemom_areas")
lemom_areas_detail_df.show(truncate=False)