@@ -61,7 +61,7 @@ A general architecture is as follows,
2. A command dispatcher service (such as a Kafka consumer) handles reading and committing offsets and hands each command off to Airflow by invoking the relevant DAG via its REST interface (a minimal dispatcher is sketched below). This separates consuming from processing.
3. The Airflow DAG itself is parametrized to process one command/event at a time. This provides granular observability and an auditable record of what happened, enabling retries, re-processing, or routing to a dead-letter queue.
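For illustration, here is a minimal sketch of such a dispatcher loop. It assumes a `commands` topic, a `process_command` DAG, and Airflow's 2.x stable REST API reachable at `http://localhost:8080` with basic auth; all of these names and credentials are placeholders.

```
import json

import requests
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "command-dispatcher",
    "enable.auto.commit": False,  # commit manually, only after a successful hand-off
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["commands"])  # placeholder topic name

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    command = json.loads(msg.value())
    # Hand off to Airflow: trigger one DAG run per command via the REST API,
    # passing the command payload as the DAG run conf.
    resp = requests.post(
        "http://localhost:8080/api/v1/dags/process_command/dagRuns",
        json={"conf": command},
        auth=("airflow", "airflow"),  # placeholder credentials
        timeout=10,
    )
    resp.raise_for_status()
    consumer.commit(message=msg)  # offset committed only after Airflow accepts the run
```

Since auto-commit is disabled and the offset is committed only after Airflow accepts the run, an unprocessed command is picked up again once the dispatcher restarts.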

-![Dispatcher_Architecture.png](/assets/blog-images/command_processing_blog/Dispatcher_Architecture.png)
+![Dispatcher_Architecture.png](../assets/blog-images/command_processing_blog/Dispatcher_Architecture.png)

This pattern could thus be used for:

@@ -311,7 +311,7 @@ send_email_task

In this example, we've defined a DAG called `send_email_alert` that runs once (**`schedule_interval='@once'`**) and has a single task called **`send_email_task`**. The **`EmailOperator`** is used to send the email, and the recipient, subject, and body are passed as arguments to the operator. The recipient is read from the DAG run `conf` parameter. We've also specified some default arguments for the DAG, such as the number of times to retry the task if it fails.
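Because the recipient is read from the DAG run conf, a run can be triggered ad hoc with a payload, for example via the Airflow CLI (the address below is a placeholder):

```
airflow dags trigger send_email_alert --conf '{"recipient": "user@example.com"}'
```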

-![EmailDAG.png](/assets/blog-images/command_processing_blog/EmailDAG.png)
+![EmailDAG.png](../assets/blog-images/command_processing_blog/EmailDAG.png)

To run this DAG, you'll need to have a working SMTP server set up to send the email. You can configure the SMTP server settings in your Airflow configuration file.
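For reference, the relevant section of `airflow.cfg` looks roughly like the following; the host, port, credentials, and sender address are placeholders for your own SMTP setup.

```
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = alerts@example.com
smtp_password = ********
smtp_port = 587
smtp_mail_from = alerts@example.com
```

The same settings can also be supplied as environment variables of the form `AIRFLOW__SMTP__SMTP_HOST`.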

@@ -85,15 +85,15 @@ Recently, Confluent acquired Immerok which is a contributor to the open source s

We will now build a sample end-to-end streaming pipeline using Kafka Connect and KSQLDB in both Confluent Cloud and Confluent Platform. Both scenarios will use a Kafka cluster deployed in Confluent Cloud for Kafka topic storage. So, please create a Basic Kafka cluster in Confluent Cloud by registering at this [link](https://confluent.cloud/signup).

-![faust_blog_1.png](/assets/blog-images/e2e_ksql_connect_blog/faust_blog_1.png)
+![faust_blog_1.png](../assets/blog-images/e2e_ksql_connect_blog/faust_blog_1.png)

This demo will also require a Schema Registry cluster for storing the Avro schemas of the record values. You can create a Schema Registry cluster in Confluent Cloud by enabling the Stream Governance Essentials package. You can follow this [link](https://docs.confluent.io/cloud/current/stream-governance/packages.html#stream-governance-packages-features-and-limits) for more information.

-![streamsgovernanceimage.png](/assets/blog-images/e2e_ksql_connect_blog/streamsgovernanceimage.png)
+![streamsgovernanceimage.png](../assets/blog-images/e2e_ksql_connect_blog/streamsgovernanceimage.png)

Create a Kafka cluster API key in your Confluent Cloud account under the Cluster Overview section, so that the connectors and KSQLDB can access data in Kafka topics during stream processing.
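If you prefer the Confluent CLI over the console, an API key can also be created with a command along these lines (the cluster ID is a placeholder):

```
confluent api-key create --resource lkc-abc123 --description "e2e-ksql-connect-demo"
```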

-![CreateKafkaApiKey.png](/assets/blog-images/e2e_ksql_connect_blog/CreateKafkaApiKey.png)
+![CreateKafkaApiKey.png](../assets/blog-images/e2e_ksql_connect_blog/CreateKafkaApiKey.png)

Following are the details of the streaming pipeline,

@@ -104,15 +104,15 @@ Following are the details of the streaming pipeline,

The connectors used in this demo will be deployed using SQL queries in KSQL in both Confluent Cloud and Confluent Platform deployments.

-![StreamDesignerPipeline.png](/assets/blog-images/e2e_ksql_connect_blog/StreamDesignerPipeline.png)
+![StreamDesignerPipeline.png](../assets/blog-images/e2e_ksql_connect_blog/StreamDesignerPipeline.png)

### Confluent Cloud

We will use the Confluent Cloud console to create the KSQL cluster, but this can be achieved through any of the above-mentioned deployment options for Confluent Cloud.

Create a KSQL cluster in your Confluent Cloud account with Global access (only for testing).

-![CreateKSQLinCC.png](/assets/blog-images/e2e_ksql_connect_blog/CreateKSQLinCC.png)
+![CreateKSQLinCC.png](../assets/blog-images/e2e_ksql_connect_blog/CreateKSQLinCC.png)

Once the KSQL cluster is provisioned, we will create a Datagen source connector to push data into the input topic `clickstream_cloud_input`. We will use the `SHOE_CLICKSTREAM` quick start data generator and set the data type to `AVRO` for the record values. The Kafka API key pair created above will be used to create the input topic and write data into it.

Expand All @@ -130,7 +130,7 @@ CREATE SOURCE CONNECTOR "ClickstreamDataGenSourceConnector" WITH (

Once the Datagen source connector is successfully deployed, we should be able to see the generated records for the input topic in the Confluent Cloud UI.

-![InputTopicMessagesCrop.png](/assets/blog-images/e2e_ksql_connect_blog/InputTopicMessagesCrop.png)
+![InputTopicMessagesCrop.png](../assets/blog-images/e2e_ksql_connect_blog/InputTopicMessagesCrop.png)

Let’s create a Kafka stream called `CLICKSTREAM_CLOUD_STREAM` from the input topic `clickstream_cloud_input` for further processing. We will specify the timestamp field in the message value to be used for windowing via the `TIMESTAMP` property in the `WITH` clause.
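A stream definition along the following lines achieves this; the columns are inferred from the Avro schema in Schema Registry, and the `TS` timestamp field name is an assumption based on the `SHOE_CLICKSTREAM` quick start schema.

```
CREATE STREAM CLICKSTREAM_CLOUD_STREAM WITH (
    KAFKA_TOPIC = 'clickstream_cloud_input',
    VALUE_FORMAT = 'AVRO',
    TIMESTAMP = 'TS'
);
```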

@@ -158,7 +158,7 @@ GROUP BY USER_ID, PRODUCT_ID;

Following is a sample view of the records received by the output topic `clickstream_cloud_output`.

-![OutputTopicMessagesCrop.png](/assets/blog-images/e2e_ksql_connect_blog/OutputTopicMessagesCrop.png)
+![OutputTopicMessagesCrop.png](../assets/blog-images/e2e_ksql_connect_blog/OutputTopicMessagesCrop.png)

Let’s create a Postgres JDBC Sink connector which writes the aggregated values from the output topic to a remote Postgres database.

@@ -192,7 +192,7 @@ CREATE SINK CONNECTOR "PostgresClickstreamSinkConnector" WITH (

Here is a sample view of the `clickstream_user_product_viewtime` table in the Postgres database after the sink connector deployment.

-![PostgresTableOutputCrop.png](/assets/blog-images/e2e_ksql_connect_blog/PostgresTableOutputCrop.png)
+![PostgresTableOutputCrop.png](../assets/blog-images/e2e_ksql_connect_blog/PostgresTableOutputCrop.png)

### Confluent Platform

@@ -350,7 +350,7 @@ Connect to the KSQLDB server inside the KSQLDB CLI container,
ksql http://ksqldb-server:8088
```
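If you are running the stack with Docker Compose, the CLI can be opened with a command like the one below; the `ksqldb-cli` container name is an assumption and should match your Compose service name.

```
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```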

-![KSQLdbCLI.png](/assets/blog-images/e2e_ksql_connect_blog/KSQLdbCLI.png)
+![KSQLdbCLI.png](../assets/blog-images/e2e_ksql_connect_blog/KSQLdbCLI.png)

Deploy the Datagen Source connector,
