diff --git a/_posts/2023-03-30-Command_Processing_and_Event_Driven_Workflows_with_Apache_Kafka_and_Airflow.md b/_posts/2023-03-30-Command_Processing_and_Event_Driven_Workflows_with_Apache_Kafka_and_Airflow.md
index 5010bba266..ad203f0537 100644
--- a/_posts/2023-03-30-Command_Processing_and_Event_Driven_Workflows_with_Apache_Kafka_and_Airflow.md
+++ b/_posts/2023-03-30-Command_Processing_and_Event_Driven_Workflows_with_Apache_Kafka_and_Airflow.md
@@ -61,7 +61,7 @@ A general architecture is as follows,
 2. A command dispatcher service (such as a Kafka consumer) handles reading and committing offsets and hands off processing to Airflow by invoking the relevant DAG (via its REST interface). By doing this, we separate consuming from processing.
 3. The Airflow DAG itself is parameterized to process one command/event at a time. This provides granular observability and an auditable record of what happened, enabling retries, re-processing, or routing to a dead-letter queue.
 
-![Dispatcher_Architecture.png](/assets/blog-images/command_processing_blog/Dispatcher_Architecture.png)
+![Dispatcher_Architecture.png](../assets/blog-images/command_processing_blog/Dispatcher_Architecture.png)
 
 This pattern could thus be used for:
 
@@ -311,7 +311,7 @@ send_email_task
 
 In this example, we've defined a DAG called `send_email_alert` that runs once (**`schedule_interval='@once'`**) and has a single task called **`send_email_task`**. The **`EmailOperator`** is used to send the email, and we've defined the recipient, subject, and body of the email as arguments to the operator. The recipient is passed in via the DAG run `conf` parameter. We've also specified some default arguments for the DAG, such as the number of times to retry the task if it fails.
 
-![EmailDAG.png](/assets/blog-images/command_processing_blog/EmailDAG.png)
+![EmailDAG.png](../assets/blog-images/command_processing_blog/EmailDAG.png)
 
 To run this DAG, you'll need a working SMTP server set up to send the email. You can configure the SMTP server settings in your Airflow configuration file.
diff --git a/_posts/2023-06-28-End-To-End-Streaming-Pipeline-using-KSQLDB-and-Kafka-Connect.md b/_posts/2023-06-28-End-To-End-Streaming-Pipeline-using-KSQLDB-and-Kafka-Connect.md
index e98e743121..7f1e53670a 100644
--- a/_posts/2023-06-28-End-To-End-Streaming-Pipeline-using-KSQLDB-and-Kafka-Connect.md
+++ b/_posts/2023-06-28-End-To-End-Streaming-Pipeline-using-KSQLDB-and-Kafka-Connect.md
@@ -85,15 +85,15 @@ Recently, Confluent acquired Immerok which is a contributor to the open source s
 
 We will now build a sample end-to-end streaming pipeline using Kafka Connect and KSQLDB in both Confluent Cloud and Confluent Platform. Both scenarios use a Kafka cluster deployed in Confluent Cloud for Kafka topic storage, so please create a Basic Kafka cluster in Confluent Cloud by registering at this [link](https://confluent.cloud/signup).
 
-![faust_blog_1.png](/assets/blog-images/e2e_ksql_connect_blog/faust_blog_1.png)
+![faust_blog_1.png](../assets/blog-images/e2e_ksql_connect_blog/faust_blog_1.png)
 
 This demo also requires a Schema Registry cluster for storing the Avro schemas of the record values. You can create a Schema Registry cluster in Confluent Cloud by enabling the Stream Governance Essentials package. You can follow this [link](https://docs.confluent.io/cloud/current/stream-governance/packages.html#stream-governance-packages-features-and-limits) for more information.
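Schema Registry is needed here because the demo uses `AVRO` for record values; any KSQLDB statement that declares `VALUE_FORMAT='AVRO'` registers and looks up the value schema in Schema Registry. The snippet below is only an illustrative sketch of that declaration, with a placeholder topic and placeholder columns rather than the exact schema used later in this post:

```sql
-- Illustrative sketch only: placeholder topic and columns, not the demo's actual schema.
-- Declaring VALUE_FORMAT='AVRO' makes KSQLDB store and resolve the value schema
-- via the Schema Registry cluster configured above.
CREATE STREAM example_avro_stream (user_id VARCHAR, product_id VARCHAR, view_time INT)
  WITH (KAFKA_TOPIC='example_avro_topic', VALUE_FORMAT='AVRO', PARTITIONS=1);
```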
-![streamsgovernanceimage.png](/assets/blog-images/e2e_ksql_connect_blog/streamsgovernanceimage.png)
+![streamsgovernanceimage.png](../assets/blog-images/e2e_ksql_connect_blog/streamsgovernanceimage.png)
 
 Create a Kafka cluster API key in your Confluent Cloud account under the Cluster Overview section so that the connectors and KSQLDB can access data in Kafka topics during stream processing.
 
-![CreateKafkaApiKey.png](/assets/blog-images/e2e_ksql_connect_blog/CreateKafkaApiKey.png)
+![CreateKafkaApiKey.png](../assets/blog-images/e2e_ksql_connect_blog/CreateKafkaApiKey.png)
 
 Following are the details of the streaming pipeline,
@@ -104,7 +104,7 @@ Following are the details of the streaming pipeline,
 
 The connectors used in this demo will be deployed using SQL queries in KSQL in both Confluent Cloud and Confluent Platform deployments.
 
-![StreamDesignerPipeline.png](/assets/blog-images/e2e_ksql_connect_blog/StreamDesignerPipeline.png)
+![StreamDesignerPipeline.png](../assets/blog-images/e2e_ksql_connect_blog/StreamDesignerPipeline.png)
 
 ### Confluent Cloud
@@ -112,7 +112,7 @@ We will use the Confluent Cloud console to create KSQL cluster but this can be a
 
 Create a KSQL cluster in your Confluent Cloud account with Global access (only for testing).
 
-![CreateKSQLinCC.png](/assets/blog-images/e2e_ksql_connect_blog/CreateKSQLinCC.png)
+![CreateKSQLinCC.png](../assets/blog-images/e2e_ksql_connect_blog/CreateKSQLinCC.png)
 
 Once the KSQL cluster is provisioned, we will create a Datagen source connector to push data into the input topic `clickstream_cloud_input`. We will use the `SHOE_CLICKSTREAM` quick-start data generator and set the data type to `AVRO` for the record values. The Kafka API key pair created above will be used to create the input topic and write data into it.
@@ -130,7 +130,7 @@ CREATE SOURCE CONNECTOR "ClickstreamDataGenSourceConnector" WITH (
 
 Once the Datagen source connector is successfully deployed, we should be able to see the generated records for the input topic in the Confluent Cloud UI.
 
-![InputTopicMessagesCrop.png](/assets/blog-images/e2e_ksql_connect_blog/InputTopicMessagesCrop.png)
+![InputTopicMessagesCrop.png](../assets/blog-images/e2e_ksql_connect_blog/InputTopicMessagesCrop.png)
 
 Let’s create a Kafka stream called `CLICKSTREAM_CLOUD_STREAM` from the input topic `clickstream_cloud_input` for further processing. We will specify the timestamp field in the message value to be used for windowing via the `timestamp` property in the `WITH` clause.
@@ -158,7 +158,7 @@ GROUP BY USER_ID, PRODUCT_ID;
 
 Following is a sample view of the records received by the output topic `clickstream_cloud_output`.
 
-![OutputTopicMessagesCrop.png](/assets/blog-images/e2e_ksql_connect_blog/OutputTopicMessagesCrop.png)
+![OutputTopicMessagesCrop.png](../assets/blog-images/e2e_ksql_connect_blog/OutputTopicMessagesCrop.png)
 
 Let’s create a Postgres JDBC Sink connector that writes the aggregated values from the output topic to a remote Postgres database.
@@ -192,7 +192,7 @@ CREATE SINK CONNECTOR "PostgresClickstreamSinkConnector" WITH (
 
 Here is a sample view of the `clickstream_user_product_viewtime` table in the Postgres database after the sink connector deployment.
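To double-check the sink from a terminal rather than the screenshot below, a quick query against the target table works too. This is only a sketch; it assumes `psql` access to the same database and table name configured in the sink connector above:

```sql
-- Sanity check of the sink output; run with psql against the database configured
-- in the Postgres sink connector. Adjust the table name if your connector config differs.
SELECT *
FROM clickstream_user_product_viewtime
LIMIT 10;
```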
-![PostgresTableOutputCrop.png](/assets/blog-images/e2e_ksql_connect_blog/PostgresTableOutputCrop.png)
+![PostgresTableOutputCrop.png](../assets/blog-images/e2e_ksql_connect_blog/PostgresTableOutputCrop.png)
 
 ### Confluent Platform
@@ -350,7 +350,7 @@ Connect to the KSQLDB server inside the KSQLDB CLI container,
 ksql http://ksqldb-server:8088
 ```
 
-![KSQLdbCLI.png](/assets/blog-images/e2e_ksql_connect_blog/KSQLdbCLI.png)
+![KSQLdbCLI.png](../assets/blog-images/e2e_ksql_connect_blog/KSQLdbCLI.png)
 
 Deploy the Datagen Source connector,