# Gravitino Flink-Hive Example

## Setting Up PyFlink with Hive and Gravitino Connectors

In [None]:
# Refresh the apt package index, then install OpenJDK 17 without prompting (-y).
# The next cell points JAVA_HOME at this JDK's install directory.
!sudo apt-get update && sudo apt-get install -y openjdk-17-jdk

In [None]:
import os

# Point JAVA_HOME at the OpenJDK 17 install from the previous cell. The apt
# package names its directory after the dpkg architecture, so derive the
# suffix from the host CPU instead of assuming arm64.
import platform

_JDK_ARCH_SUFFIX = {
    "x86_64": "amd64",
    "amd64": "amd64",
    "aarch64": "arm64",
    "arm64": "arm64",
}
# Unrecognised machines keep the original arm64 path so existing setups are unaffected.
os.environ["JAVA_HOME"] = (
    "/usr/lib/jvm/java-17-openjdk-"
    + _JDK_ARCH_SUFFIX.get(platform.machine().lower(), "arm64")
)

# Put the JDK's bin directory first on PATH; skip the prepend when it is
# already first so re-running this cell does not keep growing PATH.
_java_bin = f"{os.environ['JAVA_HOME']}/bin"
if os.environ["PATH"].split(":")[0] != _java_bin:
    os.environ["PATH"] = f"{_java_bin}:" + os.environ["PATH"]

In [None]:
!python3 -m pip install apache-flink

In [None]:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.common import Configuration
from pyflink.table.expressions import col
from pyflink.table import DataTypes

# Build the Flink configuration: connector jars plus the Gravitino catalog store.
configuration = Configuration()

# "pipeline.jars" is passed as a single ';'-separated string of jar URLs.
connector_jars = ";".join(
    [
        "file:///tmp/gravitino/packages/gravitino-flink-connector-runtime-1.18_2.12-0.6.1-incubating.jar",
        "file:///tmp/gravitino/packages/flink-sql-connector-hive-2.3.10_2.12-1.20.0.jar",
    ]
)

# Options are applied in this order: jars first, then the catalog-store settings
# (store kind, Gravitino server URI, metalake name).
flink_options = {
    "pipeline.jars": connector_jars,
    "table.catalog-store.kind": "gravitino",
    "table.catalog-store.gravitino.gravitino.uri": "http://gravitino:8090",
    "table.catalog-store.gravitino.gravitino.metalake": "metalake_demo",
}
for option_key, option_value in flink_options.items():
    configuration.set_string(option_key, option_value)

# Create a batch-mode TableEnvironment from the configuration above.
env_settings = EnvironmentSettings.new_instance().with_configuration(configuration)
batch_settings = env_settings.in_batch_mode().build()
table_env = TableEnvironment.create(batch_settings)


## Write Queries 

In [None]:
# Work inside the "catalog_hive" catalog for the rest of the notebook.
table_env.use_catalog("catalog_hive")

# Create the database if it is missing, then make it the current database.
for database_ddl in (
    "CREATE DATABASE IF NOT EXISTS Reading_System",
    "USE Reading_System",
):
    table_env.execute_sql(database_ddl)

# Four-column books table; publish_date is stored as a plain STRING.
books_ddl = """
    CREATE TABLE IF NOT EXISTS books (
        id INT,
        title STRING,
        author STRING,
        publish_date STRING
    ) 
"""
table_env.execute_sql(books_ddl)

In [None]:
# Run SHOW DATABASES and print each returned row; the `with` block closes the
# result iterator once the loop is done.
result = table_env.execute_sql("SHOW DATABASES")
with result.collect() as database_rows:
    for database_row in database_rows:
        print(database_row)


### Write Table API Queries

In [None]:
# Rows to append; each tuple is (id, title, author, publish_date).
book_rows = [
    (4, 'The Great Gatsby', 'F. Scott Fitzgerald', '1925-04-10'),
    (5, 'Moby Dick', 'Herman Melville', '1851-11-14'),
]

# Row schema matching the columns of the books table.
book_fields = [
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("title", DataTypes.STRING()),
    DataTypes.FIELD("author", DataTypes.STRING()),
    DataTypes.FIELD("publish_date", DataTypes.STRING()),
]

new_books = table_env.from_elements(book_rows, DataTypes.ROW(book_fields))

# Submit the insert into 'books' and block until the job has finished.
insert_job = new_books.execute_insert('books')
insert_job.wait()

### Write SQL Queries

In [None]:
# execute_sql() submits an INSERT as an asynchronous job and returns immediately.
# wait() blocks until the job finishes so that the SELECT in the Result section
# reliably sees these rows (the Table API insert above waits the same way).
table_env.execute_sql("""
    INSERT INTO books VALUES 
    (6, 'Pride and Prejudice', 'Jane Austen', '1813-01-28'),
    (7, 'The Catcher in the Rye', 'J.D. Salinger', '1951-07-16')
""").wait()

### Result

In [None]:
# Read back every row of the books table and print them one per line; the
# `with` block closes the result iterator when iteration ends.
result = table_env.execute_sql("SELECT * FROM books")
with result.collect() as book_rows_iter:
    for book_row in book_rows_iter:
        print(book_row)