[SUPPORT] How to use Flink with External Hive MetaStore (HiveSync) For Community Video #11105

@soumilshah1995

Description

Hello,

I'm currently working on creating comprehensive content for our community, focusing on integrating Apache Flink (version 1.17.1) with Python (3.8.19). While building the setup, I've encountered an issue that requires assistance from the community.

Setup:

Apache Flink version: 1.17.1
Python version: 3.8.19
Steps to Reproduce:

I've provided a Docker Compose file along with the necessary configurations to replicate the setup. Below are the essential components included in the Docker Compose file:

version: "3"

services:
  mysql:
    image: quay.io/debezium/example-mysql:2.1
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    restart: always

  fast-data-dev:
    image: dougdonohoe/fast-data-dev
    ports:
      - "3181:3181"
      - "3040:3040"
      - "7081:7081"
      - "7082:7082"
      - "7083:7083"
      - "7092:7092"
      - "8081:8081"
    environment:
      - ZK_PORT=3181
      - WEB_PORT=3040
      - REGISTRY_PORT=8081
      - REST_PORT=7082
      - CONNECT_PORT=7083
      - BROKER_PORT=7092
      - ADV_HOST=127.0.0.1

  trino-coordinator:
    image: 'trinodb/trino:latest'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-d", "metastore", "-c", "SELECT 1"] # -d needed: no database named "hive" exists
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - ./postgresscripts:/docker-entrypoint-initdb.d

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "



volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
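
Before wiring Flink to the metastore, it can help to confirm that the Thrift port published by the `hive-metastore` service (9083 in the compose file above) is actually reachable from wherever the Flink job runs. A minimal sketch using plain Python sockets (no Thrift client; host and port are the values from this compose file):

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: check the metastore Thrift port published by the compose file
# is_port_open("localhost", 9083)
```

If this returns False from the machine running PyFlink, the `hive_sync.metastore.uris` value cannot work regardless of the Hudi configuration.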

Issue:

The problem arises when I include the following three parameters related to Hive sync in my Flink setup:

'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://localhost:9083'

With these three parameters included, the INSERT into the Hudi table fails. When I remove them, the same insert runs successfully.
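
Two things worth checking (assumptions, not a confirmed diagnosis): the metastore URI must be resolvable from the process running the Flink job, so `localhost:9083` only works when PyFlink runs on the docker host itself; and Hudi's Hive sync also accepts `hive_sync.db` / `hive_sync.table` to control the target database and table name. A sketch that assembles the connector options as a dict, so the sync settings are easy to toggle while debugging (the db/table values below are illustrative):

```python
# Hypothetical option set for illustration; 'hive_sync.db' and
# 'hive_sync.table' values are assumptions, not taken from the issue.
hudi_options = {
    "connector": "hudi",
    "path": "file:///tmp/hudi/hudi_table",
    "table.type": "COPY_ON_WRITE",
    "hive_sync.enable": "true",
    "hive_sync.mode": "hms",
    # Must resolve from wherever the Flink job actually runs:
    "hive_sync.metastore.uris": "thrift://localhost:9083",
    "hive_sync.db": "default",
    "hive_sync.table": "hudi_table",
}

# Render the dict as a Flink SQL WITH (...) clause
with_clause = ",\n    ".join(f"'{k}' = '{v}'" for k, v in hudi_options.items())
print(f"WITH (\n    {with_clause}\n)")
```

Commenting out `hive_sync.enable` in the dict and re-running the insert is a quick way to bisect whether the failure is really the sync step or something else in the table definition.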

Additional Context:

This setup has been previously tested for Delta Streamer, as documented here. The goal is to achieve similar functionality with Apache Flink instead of Spark.

Code Snippet:

I've provided a full code snippet demonstrating the setup and execution process. This includes defining the Hudi table, executing SQL queries, and attempting to insert records into the Hudi table.

import os

# Point PyFlink at a local JDK 11 install (path is machine-specific)
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "jar/flink-s3-fs-hadoop-1.17.1.jar",
    "jar/hudi-flink1.17-bundle-0.14.0.jar"
]

# CURRENT_DIR is already absolute, so prefix with file:// (not file:///)
jar_urls = [f"file://{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

hudi_output_path = 'file:///Users/soumilshah/Desktop/my-flink-environment/hudi/'

hudi_sink = f"""
CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}' ,
    'table.type' = 'COPY_ON_WRITE' ,
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://localhost:9083'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink).wait()

# Define the SQL query to select data from the Hudi table source
query = """
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
"""

# Wait for the batch INSERT job to finish before reading the table back
table_env.execute_sql(query).wait()

# Verify the write by reading one record back from the Hudi table
query = """
SELECT * FROM hudi_table WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
"""

table_env.execute_sql(query).print()
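
One failure mode worth ruling out in the script above: if a jar path in `pipeline.jars` is wrong, the entry is simply a dead URL and the Hudi connector (or its Hive sync classes) never loads. A small helper that fails fast on missing jars before they are handed to the table environment (a sketch; the function name is my own, not a PyFlink API):

```python
import os

def build_jar_urls(base_dir: str, jar_files: list) -> list:
    """Turn relative jar paths into file:// URLs, raising if any jar is missing."""
    urls = []
    for jar in jar_files:
        path = os.path.abspath(os.path.join(base_dir, jar))
        if not os.path.isfile(path):
            raise FileNotFoundError(f"jar not found: {path}")
        urls.append("file://" + path)
    return urls
```

Usage would then be `table_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(build_jar_urls(CURRENT_DIR, jar_files)))`.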

Request for Assistance:

I'm seeking assistance from the community to understand how to properly configure Hive sync parameters in my Flink setup. Any guidance or insights would be greatly appreciated.
