Fix: Inconsistent table names in Flink Kafka Sinks. #287
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The
start_job.py
file creates a Kafka sink namedprocess_events_kafka
, whileaggregation_job.py
creates a Kafka source with the same nameprocess_events_kafka
. This can cause confusion and potentially lead to the aggregation job reading from the sink it is writing to, which would be incorrect.Fix:
--- a/bootcamp\materials\4-apache-flink-training\src\job\start_job.py
+++ b/bootcamp\materials\4-apache-flink-training\src\job\start_job.py
@@ -5,7 +5,7 @@
from pyflink.table import EnvironmentSettings, DataTypes, TableEnvironment, StreamTableEnvironment
def create_processed_events_sink_kafka(t_env):
kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
sasl_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_key}" password="{kafka_secret}";'