Welcome to the Confluent for VS Code Extension Workshop! In this hands-on session, you'll simulate building a real-time data pipeline using Apache Kafka and Flink, all from within your IDE.
You are a developer at Llama Electronics, a quirky yet fast-growing electronic goods retailer, who’s tasked with integrating and processing e-commerce data for downstream analysis. By the end of this workshop, you’ll know how to:
- Generate and run Kafka applications using VS Code
- Simulate data pipelines without production access
- Author and run Flink SQL queries locally
- Use GitHub Copilot to accelerate development
Before you begin, please make sure you have the following installed and configured:
- Confluent Cloud Account: You can use any Kafka cluster to complete the first two steps of the workshop, but Confluent Cloud and Confluent Cloud for Apache Flink are required for the later steps.
- Visual Studio Code: Your development environment for this workshop.
- Confluent for VS Code Extension: Install it from the VS Code Marketplace.
- Docker Desktop (Optional): Needed only if you prefer to run Kafka locally while debugging.
You're tasked with building a near real-time reporting system for Llama Electronics. Sales data comes from Salesforce, but direct access is restricted; you only have a sample JSON dataset to work with. Luckily, with the Confluent for VS Code Extension and GitHub Copilot, you can simulate the pipeline locally and prepare everything before production is live.
- If you don't have a Confluent Cloud account, sign up here.
- Create a new environment (doc)
- Create a new Kafka cluster (doc).
- Create a new API key for the Kafka cluster and save it for later use (doc).
- Create a new API key for Schema Registry and save it for later use (doc).
- Create a new Flink compute pool (doc).
- Navigate to the Confluent tab in VS Code.
- Click on the "Sign in" icon next to Confluent Cloud and sign in.
Tip
If you cannot sign up for Confluent Cloud, or prefer debugging your code against a local Kafka cluster first, you can start a local Kafka cluster using the extension instead.
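If you go the local route, it can help to confirm the broker is reachable before scaffolding the project. Below is a minimal sketch using the confluent-kafka Python client; the `localhost:9092` bootstrap address is an assumption and may differ depending on how your local cluster is configured.

```python
# check_local_kafka.py -- quick connectivity check for a local Kafka broker.
# Assumes the broker listens on localhost:9092; adjust if your setup differs.
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

try:
    metadata = admin.list_topics(timeout=5)  # fetch cluster metadata
    print(f"Connected. Broker(s): {list(metadata.brokers.keys())}")
    print(f"Existing topics: {sorted(metadata.topics.keys())}")
except Exception as exc:
    print(f"Could not reach the broker: {exc}")
```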
You can scaffold a producer project in multiple ways.
- In VS Code, open the Command Palette (`Cmd+Shift+P` or `Ctrl+Shift+P`).
- Type and select `Confluent: Generate Project`.
- Open the Confluent sidebar in VS Code.
- Scroll to Support > Generate Project from Template.
- Follow the prompts to scaffold the project.
- Choose a Kafka producer template in the language of your preference (e.g., Python).
- Enter `Kafka Bootstrap Server`, `Kafka Cluster API Key`, `Kafka Cluster API Secret`, `Topic Name`, `Schema Registry URL`, `Schema Registry API Key`, and `Schema Registry API Secret`, then select `Generate & Save`.
  - To create an API key and secret for the cluster, navigate to the cluster overview page, select API Keys, then Add Key, and follow the on-screen instructions.
  - To create an API key and secret for Schema Registry, navigate to Stream Governance, Schema Registry, API Keys, then Add Key, and follow the on-screen instructions.
- We will name the topic `sales_orders`.
- Set your project name and destination folder.
- Open the generated folder in VS Code.
Follow the instructions in the `README.md` file of the generated project if you've selected a language other than Python.

- Create a Python virtual environment:

  ```
  virtualenv env
  source env/bin/activate
  ```

- Install the dependencies of this application:

  ```
  pip install -r requirements.txt
  ```

- Make the scripts executable:

  ```
  chmod u+x producer.py consumer.py
  ```
- You can execute the producer script by running:

  ```
  ./producer.py
  ```

- You will see an error in the console output saying the topic doesn't exist. To fix this, create a new topic named `sales_orders` (or create it from code, as sketched after the verification steps below).
- Re-run the producer script to confirm it's now working.
- Go to the Confluent extension tab in the sidebar.
- Select the environment and cluster you used when generating the project.
- Locate the topic you configured the project to produce to, and click on the icon to view messages.
- Confirm the 10 sample messages are produced to the topic.
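If you prefer to create the topic from code rather than through the Confluent Cloud UI or the extension, here is a hedged sketch using the confluent-kafka AdminClient. The placeholder connection values correspond to what you entered when generating the project; the partition and replication settings are illustrative assumptions.

```python
# create_topic.py -- create the sales_orders topic programmatically.
# Fill in the same bootstrap server and API key/secret you used when
# generating the project; partition/replication counts are assumptions.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({
    "bootstrap.servers": "<KAFKA_BOOTSTRAP_SERVER>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<KAFKA_CLUSTER_API_KEY>",
    "sasl.password": "<KAFKA_CLUSTER_API_SECRET>",
})

futures = admin.create_topics(
    [NewTopic("sales_orders", num_partitions=1, replication_factor=3)]
)
for topic, future in futures.items():
    try:
        future.result()  # block until the broker confirms creation
        print(f"Created topic {topic}")
    except Exception as exc:
        print(f"Failed to create topic {topic}: {exc}")
```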
Tip
Can’t get the project working? You can find pre-generated projects in the 1-project-setup folder. Go to the folder for your preferred language, and continue with step 2 of the workshop.
Let’s update the producer so that it produces messages based on the sample data provided.
- Locate the producer code file (e.g., `producer.py`) in your project folder.
- Download and copy `sample_data.json` into the project's root folder.
- Toggle on the GitHub Copilot chat window.
  - Make sure you are in Agent or Edit mode.
  - Copilot should automatically select the currently opened file, e.g. `producer.py`, as context.
  - Select "Add Context...", and choose the `sample_data.json` file in the same folder to include it as additional context.
- Use Copilot to rewrite the logic so it reads from the JSON file and produces records that match the sample data format.
  - Prompt Copilot to update the code and use the content of `sample_data.json` to generate messages.
Note
Suggested prompt: Update producer.py so that, instead of generating hardcoded messages, it reads the content of sample_data.json and generates and sends messages based on its content. Update sample-schema.avsc to match the new data.
- Review Copilot's suggestions and accept or refine as needed.
- Save your changes to `producer.py`.
- Before running the producer code, delete the `sales_orders` topic and the `sales_orders-value` schema created previously.
- Run the producer code again and confirm that it now reads from the JSON file and produces the correct records (a simplified sketch of the updated produce loop follows the verification steps below):

  ```
  ./producer.py
  ```
- In the Confluent for VS Code extension sidebar, find your Kafka cluster and topic.
- Right-click the topic and select "View Messages".
- Confirm that the produced messages match the sample data.
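For reference, here is a simplified sketch of what the Copilot-updated produce loop might look like. It assumes `sample_data.json` holds a JSON array of order objects (with fields such as `itemid` and `orderAmount`, the names used later in the Flink queries) and serializes values as plain JSON; the actual generated project serializes with Avro via Schema Registry, so your code will differ in those details.

```python
# Sketch of the core produce loop after the Copilot rewrite (simplified).
# Assumes sample_data.json is a JSON array of order records; the real
# template uses Avro + Schema Registry rather than plain JSON values.
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "<KAFKA_BOOTSTRAP_SERVER>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<KAFKA_CLUSTER_API_KEY>",
    "sasl.password": "<KAFKA_CLUSTER_API_SECRET>",
})

with open("sample_data.json") as f:
    orders = json.load(f)  # e.g. [{"itemid": "...", "orderAmount": 42.0, ...}, ...]

for order in orders:
    producer.produce(
        topic="sales_orders",
        key=str(order.get("itemid", "")),
        value=json.dumps(order),
    )

producer.flush()  # wait for all messages to be delivered
print(f"Produced {len(orders)} records to sales_orders")
```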
Tip
If you have trouble updating the project code using Copilot, you can find pre-generated projects in the 2-project-modify folder. Go to the folder for your preferred language and use it to proceed with the workshop.
- Create a new file by selecting `File`, `New File...`.
- Change the language mode to `Flink SQL` by selecting the `Select a language` watermark, or `Plain Text` on the status bar.
- Select `Set compute pool`, then select the Flink compute pool you created in step 0.
- Select `Set catalog & database`, then select the environment and cluster you created in step 0.
- Enter a simple Flink SQL query to make sure it works:

  ```sql
  SELECT * FROM `sales_orders`;
  ```
- Submit the query.
- Use the query result viewer to confirm data is returned as expected.
- Update your Flink SQL script to aggregate sales orders in a time window. We want to know the item IDs and the total number of orders within each 1-minute interval (a plain-Python illustration of this windowing follows these steps):

  ```sql
  SELECT
    window_start,
    window_end,
    ARRAY_AGG(DISTINCT itemid) AS item_ids,
    COUNT(*) AS total_orders
  FROM TABLE (
    TUMBLE(TABLE `sales_orders`, DESCRIPTOR(`$rowtime`), INTERVAL '1' MINUTE)
  )
  GROUP BY window_start, window_end
  ```
- Submit the query.
- Review the results in the query result viewer.
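To build intuition for what the tumbling window computes, here is a plain-Python illustration that groups a few made-up order records into 1-minute buckets and derives the same item_ids and total_orders columns. It is only an illustration of the windowing semantics, not part of the workshop pipeline.

```python
# Plain-Python illustration of a 1-minute tumbling-window aggregation.
# The records below are made-up examples, not workshop data.
from collections import defaultdict
from datetime import datetime, timedelta

orders = [
    {"itemid": "LL-100", "rowtime": datetime(2024, 1, 1, 12, 0, 15)},
    {"itemid": "LL-200", "rowtime": datetime(2024, 1, 1, 12, 0, 45)},
    {"itemid": "LL-100", "rowtime": datetime(2024, 1, 1, 12, 1, 5)},
]

windows = defaultdict(list)
for order in orders:
    # Truncate the event time down to the start of its 1-minute window.
    window_start = order["rowtime"].replace(second=0, microsecond=0)
    windows[window_start].append(order["itemid"])

for window_start, item_ids in sorted(windows.items()):
    window_end = window_start + timedelta(minutes=1)
    print(window_start, window_end, sorted(set(item_ids)), len(item_ids))
# The 12:00 window has 2 orders across two items; the 12:01 window has 1 order.
```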
- You're being asked to add a new column for the total order amount per time window.
- Let's use GitHub Copilot to modify the query.
Note
Suggested prompt: Update the query to include a new column, total_amount, that will calculate the sum of all orders within the time window.
```sql
SELECT
  window_start,
  window_end,
  ARRAY_AGG(DISTINCT itemid) AS item_ids,
  COUNT(*) AS total_orders,
  SUM(orderAmount) AS total_amount
FROM TABLE (
  TUMBLE(TABLE `sales_orders`, DESCRIPTOR(`$rowtime`), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end
```

- Review Copilot's suggestion, accept it if correct, and resubmit the query.
- Confirm the new column appears and values are as expected.
- Now that we know the query is working as expected, we can turn it into a persistent query that runs in the background.
- Modify the query and change it from a `SELECT` statement to a `CREATE TABLE ... AS SELECT` statement.
Note
Suggested prompt: Update the query to a persistent query and write the output to a table named processed_orders
```sql
CREATE TABLE processed_orders AS
SELECT
  window_start,
  window_end,
  ARRAY_AGG(DISTINCT itemid) AS item_ids,
  COUNT(*) AS total_orders,
  SUM(orderAmount) AS total_amount
FROM TABLE (
  TUMBLE(TABLE `sales_orders`, DESCRIPTOR(`$rowtime`), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end
```

- Review Copilot's suggestion and accept it if correct.
- Submit the query, and confirm the table is created and contains the data as expected.
- Make sure to enable Auto Update for the extension to receive the latest improvements.
- Add a connector to produce data from an external system.
- Use Mockstream to produce more sophisticated mock data.
- Create a Kafka consumer app from the project template to consume the processed data after Flink SQL (a starter sketch follows below).
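If you want a head start on that last item, here is a minimal sketch of a consumer for the processed_orders topic using the confluent-kafka Python client. The generated consumer template also handles deserialization via Schema Registry, which this sketch omits; it simply prints the raw message payloads.

```python
# Sketch of a minimal consumer for the processed_orders topic.
# The generated consumer template also deserializes via Schema Registry;
# this sketch just prints raw message bytes.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<KAFKA_BOOTSTRAP_SERVER>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<KAFKA_CLUSTER_API_KEY>",
    "sasl.password": "<KAFKA_CLUSTER_API_SECRET>",
    "group.id": "processed-orders-reader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["processed_orders"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"{msg.topic()} [{msg.partition()}] @ {msg.offset()}: {msg.value()}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```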