In [8]:
import os
import boto3
import json
import pandas as pd

In [9]:
os.environ["AWS_ACCESS_KEY_ID"] = "dummy"
os.environ["AWS_SECRET_ACCESS_KEY"] = "dummy"
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566"
os.environ["AWS_REGION"] = "ap-south-1"

In [13]:
# Import necessary libraries
from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# 1. Create a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Define configuration properties for the Flink job
props = [
    {
        "PropertyGroupId": "kinesis.analytics.flink.run.options",
        "PropertyMap": {
            "python": "GettingStarted/getting-started.py",  # Python script to be executed
            "jarfile": "flink-sql-connector-kinesis-1.16.1.jar"  # Flink connector JAR file
        }
    },
    {
        "PropertyGroupId": "consumer.config.0",
        "PropertyMap": {
            "input.stream.name": "input-stream",        # Name of the Kinesis Data Stream
            "flink.stream.initpos": "TRIM_HORIZON",     # Initial position to start reading the stream
            "aws.region": "ap-south-1"                   # AWS region where the stream is located
        }
    }
]

# Function to extract property map from the configuration properties
def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

# Function to create a source table definition for Kinesis stream
# TODO windowing on this create table statement to store the states
def create_source_table(table_name, stream_name, region, stream_initpos):
    return f""" 
    CREATE TABLE {table_name} (
        event_uuid VARCHAR,
        event_name VARCHAR,
        event_data VARCHAR,
        event_type VARCHAR,
        event_timestamp INTEGER,
        shard_id INTEGER,
        event_source VARCHAR,
        event_version VARCHAR
    )
    PARTITIONED BY (event_name)
    WITH (
        'connector' = 'kinesis',
        'stream' = '{stream_name}',
        'aws.region' = '{region}',
        'aws.endpoint' = 'http://localhost:4566',
        'aws.credentials.basic.accesskeyid' = 'dummy',
        'aws.credentials.basic.secretkey' = 'dummy',
        'scan.stream.initpos' = '{stream_initpos}',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    ) 
    """

# Main function
def main():
    # Define keys for accessing properties
    input_property_group_key = "consumer.config.0"
    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    # Table name for the source data
    input_table_name = "input_table"

    # Get application properties from the props list
    input_property_map = property_map(props, input_property_group_key)
    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    # Check if running in a local environment and set pipeline.jars configuration
    CURRENT_DIR = os.getcwd()  # Get the current working directory
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/flink-sql-connector-kinesis-1.16.1.jar",
    )

    # 2. Create a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    print("Successfully created source tables ")

    # 3. Read data from the source table and print it to the console
    query = f"SELECT * FROM {input_table_name}"
    print(f"Executing query: {query}")
    
    # table_env.execute_sql(query).print()
    


# Entry point for the script
if __name__ == "__main__":
    main()

Successfully created source tables 
Executing query: SELECT * FROM input_table


In [14]:
# table_env.execute_sql("show tables " ).print()

In [15]:
# table_env.execute_sql("select * from  input_table " ).print()

In [16]:
# add required jar files

CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.13-bundle-0.13.1.jar",
    "flink-sql-connector-kinesis-1.16.1.jar"
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
      
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

<pyflink.common.configuration.Configuration at 0x11793e310>

In [17]:
# write to hudi 
s3_bucket = 'south-test-1'
hudi_output_path = f's3a://{s3_bucket}/tmp1'

hudi_sink = f"""
DROP TABLE IF EXISTS hand_data;

CREATE TABLE hand_data(
    event_uuid VARCHAR,
    event_name VARCHAR,
    event_data VARCHAR,
    event_type VARCHAR,
    event_timestamp INTEGER,
    shard_id INTEGER,
    event_source VARCHAR,
    event_version VARCHAR
)
PARTITIONED BY (`event_name`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}' ,
    'table.type' = 'MERGE_ON_READ' ,
    'read.streaming.enabled' = 'true'
);
"""

print(f"{hudi_sink}")

# Execute the SQL to create the Hudi table
# table_env.execute_sql(hudi_sink)


# Execute the DDL statements
for statement in hudi_sink.strip().split(';'):
    if statement.strip():
        table_env.execute_sql(statement.strip())

# Define the data to be inserted into the Hudi table
table_env.execute_sql("""
    INSERT INTO hand_data
        SELECT * FROM input_table
""").print()


DROP TABLE IF EXISTS hand_data;

CREATE TABLE hand_data(
    event_uuid VARCHAR,
    event_name VARCHAR,
    event_data VARCHAR,
    event_type VARCHAR,
    event_timestamp INTEGER,
    shard_id INTEGER,
    event_source VARCHAR,
    event_version VARCHAR
)
PARTITIONED BY (`event_name`)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://south-test-1/tmp1' ,
    'table.type' = 'MERGE_ON_READ' ,
    'read.streaming.enabled' = 'true'
);



Py4JJavaError: An error occurred while calling o332.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hand_data'.

Table options are:

'connector'='hudi'
'path'='s3a://south-test-1/tmp1'
'read.streaming.enabled'='true'
'table.type'='MERGE_ON_READ'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:331)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:451)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:177)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:177)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieIOException: Get table config error
	at org.apache.hudi.util.StreamerUtil.getTableConfig(StreamerUtil.java:299)
	at org.apache.hudi.table.HoodieTableFactory.setupTableOptions(HoodieTableFactory.java:104)
	at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:93)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:328)
	... 29 more
Caused by: java.nio.file.AccessDeniedException: s3a://south-test-1/tmp1/.hoodie: getFileStatus on s3a://south-test-1/tmp1/.hoodie: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7KM7ZE33ESGE7H9B; S3 Extended Request ID: H9shpbgMzSVOEdgAXr06E6x9UZBIcKjdDBX0zjMhufoHIG6sNzBI23Zcg1JeUzac29AGntmQln0=; Proxy: null), S3 Extended Request ID: H9shpbgMzSVOEdgAXr06E6x9UZBIcKjdDBX0zjMhufoHIG6sNzBI23Zcg1JeUzac29AGntmQln0=:403 Forbidden
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
	at org.apache.hudi.util.StreamerUtil.getTableConfig(StreamerUtil.java:295)
	... 32 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7KM7ZE33ESGE7H9B; S3 Extended Request ID: H9shpbgMzSVOEdgAXr06E6x9UZBIcKjdDBX0zjMhufoHIG6sNzBI23Zcg1JeUzac29AGntmQln0=; Proxy: null), S3 Extended Request ID: H9shpbgMzSVOEdgAXr06E6x9UZBIcKjdDBX0zjMhufoHIG6sNzBI23Zcg1JeUzac29AGntmQln0=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
	... 40 more


In [43]:
table_env.execute_sql("""
    show tables
""").print()

+-------------+
|  table name |
+-------------+
|   hand_data |
| input_table |
+-------------+
2 rows in set


In [7]:
table_env.execute_sql("""
    drop table input_table
""").print()

OK
