In [1]:
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes, StreamTableEnvironment

In [2]:
from pyflink.common.typeinfo import RowTypeInfo

In [3]:
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, DeliveryGuarantee, KafkaRecordSerializationSchema

In [4]:
from pyflink.common.serialization import SimpleStringSchema

In [5]:
from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema

In [6]:
from pyflink.datastream import StreamExecutionEnvironment

In [7]:
from pyflink.table.expressions import col as col_sp

In [8]:
from pyflink.common import Types

In [9]:
from pyflink.common.watermark_strategy import WatermarkStrategy

In [10]:
from pyflink.common.configuration import Configuration

In [11]:
from pyflink.common.restart_strategy import RestartStrategies

In [12]:
from pyflink.table import CsvTableSink

In [13]:
# DataStream execution environment for this notebook: it backs the table
# environment, reads the Kafka source, and is what finally launches the job.
stream_env = StreamExecutionEnvironment.get_execution_environment()

In [14]:
# Table API settings: run in streaming mode.
stream_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .build()
)

In [15]:
# Table environment built on top of the DataStream environment, so the
# notebook can convert between DataStreams and Tables.
stream_tbl_env = StreamTableEnvironment.create(
    stream_execution_environment=stream_env,
    environment_settings=stream_settings,
)

In [16]:
# Set the table environment's default parallelism to 1.
# The parenthesised chain is still the cell's last expression, so the
# Configuration object it returns is displayed as before.
(
    stream_tbl_env.get_config()
    .get_configuration()
    .set_string("parallelism.default", "1")
)

<pyflink.common.configuration.Configuration at 0x27ea289e790>

In [17]:
# Run the DataStream operators with a parallelism of 1 as well.
stream_env.set_parallelism(parallelism=1)

<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment at 0x27eff863ac0>

In [18]:
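# Put the required connector JARs (e.g. the Kafka SQL connector) in the PyFlink install directory
# (normally its lib\ subfolder - confirm for your install): D:\Users\kritagya\anaconda3\envs\PyFlink\Lib\site-packages\pyflink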

In [19]:
# Alternative to copying the JAR: register it through the table config's 'pipeline.jars' option.
# stream_tbl_env.get_config().set('pipeline.jars','file:///C:/Users/krita/Udemy/PyFlink/flink-sql-connector-kafka-1.17.1.jar')

In [20]:
# Alternative for the DataStream environment: add_jars takes only jar URLs (no config key).
# stream_env.add_jars('file:///C:/Users/krita/Udemy/PyFlink/flink-sql-connector-kafka-1.17.1.jar')

In [21]:
# Field types of the incoming JSON records; order matches the source field names.
json_types = [
    Types.STRING(),    # seller_id
    Types.STRING(),    # product
    Types.INT(),       # quantity
    Types.DOUBLE(),    # product_price
    Types.SQL_DATE(),  # sales_date
]

In [22]:
# Field names of the incoming JSON records; order matches the source field types.
json_names = [
    'seller_id',
    'product',
    'quantity',
    'product_price',
    'sales_date',
]

In [23]:
# Named row type describing one source record (names paired with types).
json_schema = Types.ROW_NAMED(
    field_names=json_names,
    field_types=json_types,
)

In [24]:
# JSON -> Row deserializer for the Kafka record values, driven by the source row type.
# NOTE(review): ignore_parse_errors() is switched on, so malformed records are
# presumably skipped instead of failing the job - confirm this is intended.
json_deserialization_schema = (
    JsonRowDeserializationSchema.Builder()
    .type_info(type_info=json_schema)
    .ignore_parse_errors()
    .build()
)

In [25]:
# Kafka source: reads the "productsales" topic from the earliest offsets and
# decodes each record value with the JSON deserializer.
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers('localhost:9092')
    .set_topics("productsales")
    .set_group_id("source-demo")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(deserialization_schema=json_deserialization_schema)
    .build()
)

In [26]:
# Attach the Kafka source to the DataStream environment; no watermarks are generated.
ds_source = stream_env.from_source(
    source=kafka_source,
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name='kafka_tbl_source',
)

In [27]:
# Expose the Kafka DataStream as a Table so the Table API can be used to transform it.
kafka_stream_tbl = stream_tbl_env.from_data_stream(data_stream=ds_source)

In [28]:
# Derive total_cost = quantity * product_price as an extra column.
# add_or_replace_columns is used instead of add_columns because this cell
# reassigns kafka_stream_tbl from itself: on the first run the column is simply
# appended (same result as add_columns), and on a re-run the existing
# total_cost is replaced instead of the cell failing on a duplicate column.
kafka_stream_tbl = kafka_stream_tbl.add_or_replace_columns(
    (col_sp('quantity') * col_sp('product_price')).alias('total_cost')
)

In [29]:
# Convert the transformed Table back into a DataStream so it can be handed to the Kafka sink.
ds_sink = stream_tbl_env.to_data_stream(table=kafka_stream_tbl)

In [30]:
json_types = [Types.STRING(),Types.STRING(),Types.INT(),Types.DOUBLE(),Types.SQL_DATE(), Types.DOUBLE()]

In [31]:
json_names = ['seller_id','product','quantity','product_price','sales_date', 'total_cost']

In [32]:
json_schema = Types.ROW_NAMED(field_names=json_names,field_types=json_types)

In [33]:
json_serialization_schema = JsonRowSerializationSchema.Builder()\
                                                    .with_type_info(type_info=json_schema)\
                                                    .build()

In [34]:
# Kafka record serializer: targets the 'productsales1' topic and encodes each
# record value with the JSON serializer.
kafka_serialization_schema = (
    KafkaRecordSerializationSchema.builder()
    .set_topic('productsales1')
    .set_value_serialization_schema(value_serialization_schema=json_serialization_schema)
    .build()
)

In [35]:
# Kafka sink on the local broker, configured for at-least-once delivery.
# NOTE(review): no checkpointing is enabled in the visible cells - confirm
# whether AT_LEAST_ONCE is actually enforced without it.
kafka_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers('localhost:9092')
    .set_record_serializer(record_serializer=kafka_serialization_schema)
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
)

In [36]:
# Wire the output stream to the Kafka sink. Nothing runs yet - the job is only
# started by the execute() call in the final cell.
ds_sink.sink_to(sink=kafka_sink)

<pyflink.datastream.data_stream.DataStreamSink at 0x27ea29bbcd0>

In [None]:
# Launch the pipeline assembled above (Kafka source -> total_cost column -> Kafka sink).
# NOTE(review): the source has no visible stopping condition and this cell shows
# In [None], so the call presumably blocks until the job is cancelled - confirm.
stream_env.execute()