Samples Overview

This repository contains a few samples to help you get started quickly with the Kafka extension.

Getting started with Kafka

To test the sample applications, you need to have access to a Kafka instance. Here are some ways you can get access to one.

Visual Studio Code Remote - Containers

This repository contains several DevContainer samples. If you open Visual Studio Code in a sample directory, it will automatically start a development environment in a Docker container with a local Kafka cluster. This is the easiest way to get a Kafka cluster running. For details, see Developing inside a Container.

Confluent Docker Compose

We provide a Confluent Docker Compose sample to get you started with a local Kafka cluster and a data generator. Follow the guide at https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#cp-quick-start-docker.

Make sure you complete the steps at least until the topics pageviews, users, and pageviews_female are created (including their data generators). The included .NET sample function contains a consumer for each of those three topics.
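
If you want to confirm the topics exist before running the sample, a command along these lines should work (assuming the Confluent quickstart's docker-compose setup, where the Kafka broker service is named broker):

$ docker-compose exec broker kafka-topics --list --bootstrap-server localhost:9092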

Confluent Cloud

Confluent Cloud is a fully managed, cloud-native event streaming platform powered by Apache Kafka. The samples include Confluent Cloud configurations to show a real-world setup.

Language Support

The Azure Functions Kafka extension supports several languages, with a sample for each. For details and getting-started instructions, refer to the links below.

| Language   | Description                              | Link   | DevContainer |
|------------|------------------------------------------|--------|--------------|
| C#         | C# precompiled sample with Visual Studio | Readme | No           |
| Java       | Java 8 sample                            | Readme | Yes          |
| JavaScript | Node 12 sample                           | Readme | Yes          |
| PowerShell | PowerShell 6 sample                      | Readme | No           |
| Python     | Python 3.8 sample                        | Readme | Yes          |
| TypeScript | TypeScript sample (Node 12)              | Readme | Yes          |

Custom Container

Custom containers let you deploy your own container image to a Function App. You can use any language; as an example, we provide a Java sample that shows how to develop one.

| Language | Description             | Link   | DevContainer |
|----------|-------------------------|--------|--------------|
| Java     | Custom container sample | Readme | No           |

Notes

The Kafka extension supports several languages; however, they all run on the same Azure Functions host, so much of the configuration is shared across languages. The notes below apply to all of them.

function.json

All Kafka-related configuration lives in the function.json file. In the case of Java, you specify the configuration as annotations, and the Maven plugin generates function.json from them. If your function doesn't behave as expected, check your code and the generated function.json first.


For Confluent

{
  "scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
  "entryPoint" : "com.contoso.kafka.TriggerFunction.runMany",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvents",
    "password" : "%ConfluentCloudPassword%",
    "protocol" : "SASLSSL",
    "dataType" : "string",
    "topic" : "message",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "cardinality" : "MANY",
    "username" : "%ConfluentCloudUsername%",
    "brokerList" : "%BrokerList%"
  } ]
}

For EventHub

{
  "scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
  "entryPoint" : "com.contoso.kafka.TriggerFunction.runMany",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvents",
    "password" : "%EventHubConnectionString%",
    "protocol" : "SASLSSL",
    "dataType" : "string",
    "topic" : "message",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "cardinality" : "MANY",
    "username" : "$ConnectionString",
    "brokerList" : "%BrokerList%"
  } ]
}

NOTE: For Event Hubs, username must be set to the literal string "$ConnectionString". The password should be the actual connection string value, which can be set in local.settings.json or in App settings (see the local.settings.json section below for details).

local.settings.json

local.settings.json is the configuration file for the local Functions runtime. If you deploy an application that relies on local.settings.json to Azure, you need to add the same settings to the Function App's App settings.

NOTE: It is recommended to keep all passwords and connection strings in App settings rather than in files committed to source control. For more details, refer to Local settings file.

For Confluent

{
    "IsEncrypted": false,
    "Values": {
        "BrokerList": "{YOUR_CONFLUENT_CLOUD_BROKER}",
        "ConfluentCloudUserName": "{YOUR_CONFLUENT_CLOUD_USERNAME}",
        "ConfluentCloudPassword": "{YOUR_CONFLUENT_CLOUD_PASSWORD}",
        "FUNCTIONS_WORKER_RUNTIME": "<runtime>",
        "AzureWebJobsStorage": "",
        "topic": "{YOUR_KAFKA_TOPIC_NAME}"
    }
}

For EventHub

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "<runtime>",
        "BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
        "EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
        "topic": "{YOUR_KAFKA_TOPIC_NAME}"
    }
}

Extension bundle and installing the Kafka extension

In Azure Functions, most triggers and bindings are ordinarily obtained through the extension bundle. The Kafka extension, however, is not yet part of the extension bundle (it will be added in the future), so for now you have to install it manually.

To install the Kafka extension manually:

Create extensions.csproj file

Azure Functions extensions are written in C#, so installing one requires the .NET Core tooling. A .csproj file is a .NET project file; for more information, refer to Understanding the project file.

extensions.csproj

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <WarningsAsErrors></WarningsAsErrors>
    <DefaultItemExcludes>**</DefaultItemExcludes>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.2.1" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.7" />
  </ItemGroup>
  <ItemGroup>
    <None Update="confluent_cloud_cacert.pem">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>
</Project>

Install the Kafka extension

Run the following command. It reads extensions.csproj and installs the referenced extensions.

$ func extensions install

In the case of Java, you need to specify the package name and version explicitly:

$ func extensions install --package Microsoft.Azure.WebJobs.Extensions.Kafka --version ${EXTENSION_VERSION}

Check that the Kafka extension is installed properly

Look under bin/runtimes/; if you find the librdkafka native libraries there, the installation succeeded.
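
For example, on Linux you can check with something like:

$ ls bin/runtimes/linux-x64/native
# expect to see the librdkafka native library here (librdkafka.so on Linux);
# other platforms have their own runtimes/<rid>/native folders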

librdkafka library

The Kafka extension uses the librdkafka native libraries, which are included in the Kafka extension NuGet package. However, on Linux and macOS you need to set LD_LIBRARY_PATH so that the Azure Functions runtime can find the native library.

$ export LD_LIBRARY_PATH=/workspace/bin/runtimes/linux-x64/native

For the devcontainer, you will find this configuration in devcontainer.json. If you deploy your app to the Linux Premium plan, you need to add LD_LIBRARY_PATH to the Function App's App settings. For more details, refer to Linux Premium plan configuration.
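
For reference, the corresponding devcontainer.json entry might look like this minimal sketch (whether you use remoteEnv or containerEnv, and the exact path, depend on where your function app's bin directory lives inside the container):

{
  "remoteEnv": {
    "LD_LIBRARY_PATH": "/workspace/bin/runtimes/linux-x64/native"
  }
}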

Confluent Cloud Configuration

You can find the C# configuration for Confluent Cloud in Connecting to Confluent Cloud in Azure.
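
As a sketch of how the Confluent Cloud pieces fit together, a trigger binding can reference the CA certificate that extensions.csproj copies to the output directory via the sslCaLocation property (a sketch only; verify property names against the linked guide and the Kafka extension binding reference):

{
  "type": "kafkaTrigger",
  "direction": "in",
  "name": "kafkaEvents",
  "brokerList": "%BrokerList%",
  "topic": "message",
  "username": "%ConfluentCloudUsername%",
  "password": "%ConfluentCloudPassword%",
  "protocol": "SASLSSL",
  "authenticationMode": "PLAIN",
  "sslCaLocation": "confluent_cloud_cacert.pem"
}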

Install binding library (Java/Python)

Java and Python each have a binding library. Currently, these libraries reside in this repository; in the near future they will move to the official repositories, after which manual installation will no longer be necessary.

For now, however, you need to install them manually. Please follow the instructions in each sample's README.md.

Batch (Cardinality)

For KafkaTrigger in non-C# languages, if you want the trigger to receive Kafka events in batches, configure cardinality and dataType. For more details, refer to Language support configuration.
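
For example, with cardinality set to MANY and dataType set to string (as in the function.json examples above), a JavaScript handler receives an array of events. A minimal sketch, where kafkaEvents matches the binding's name property:

// index.js
module.exports = async function (context, kafkaEvents) {
    // each element is one Kafka message payload (dataType: string)
    for (const event of kafkaEvents) {
        context.log(`Kafka event: ${event}`);
    }
};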

If you have problems connecting to localhost:9092, try adding an entry for the hostname broker pointing to 127.0.0.1 in your hosts file, and connect to broker instead of localhost.
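
For example, add this line to /etc/hosts (or C:\Windows\System32\drivers\etc\hosts on Windows):

127.0.0.1 broker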

Visual Studio Code Remote Containers

The sample provides a devcontainer profile. Open the folder in VS Code and run the Remote-Containers: Reopen in Container action. VS Code will reopen inside a container, together with Confluent's Kafka starter sample. Then run the function inside the remote container using the following local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureWebJobsStorage": "{AzureWebJobsStorage}",
    "BrokerList":"broker:29092"
  }
}

Headers

Headers are supported for both the Kafka trigger and the Kafka output binding. You can find the header samples in this folder under the names KafkaTriggerWithHeaders and KafkaTriggerManyWithHeaders for trigger functions, and KafkaOutputWithHeaders and KafkaOutputManyWithHeaders for output binding functions.

Output Binding Functions

KafkaOutputWithHeaders is a sample for a single event, while KafkaOutputManyWithHeaders handles batches of events.

To run the KafkaOutputWithHeaders function, send an HTTP GET request with a message to http://localhost:7071/api/KafkaOutputWithHeaders?message=<your_message>. It will create a new Kafka event with <your_message> as the payload and { Key: 'test', Value: '<language>' } as a header.
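
For illustration, a JavaScript handler for such a function might look like this sketch (the binding name outputKafkaMessage and the exact event shape are assumptions; check the Readme of the sample for your language):

module.exports = async function (context, req) {
    const message = req.query.message;
    // the Kafka output binding accepts the event as a JSON string;
    // Value is the payload, Headers is a list of key/value pairs
    context.bindings.outputKafkaMessage = JSON.stringify({
        Value: message,
        Headers: [{ Key: "test", Value: "javascript" }]
    });
    context.res = { body: "Ok" };
};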

Similarly, to run the KafkaOutputManyWithHeaders function, send an HTTP GET request to http://localhost:7071/api/KafkaOutputManyWithHeaders. It will create two messages with headers on the given topic.

Trigger Functions

KafkaTriggerWithHeaders is a sample for a single event, while KafkaTriggerManyWithHeaders handles batches of events.

KafkaTriggerWithHeaders is triggered whenever a Kafka event arrives. It prints the message and the corresponding headers for that message.
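
A JavaScript sketch of such a trigger (assuming a single-event kafkaTrigger binding named event; depending on the language worker, header values may arrive base64-encoded, as they do in the JavaScript samples):

module.exports = async function (context, event) {
    // event.Value is the payload; event.Headers is a list of key/value pairs
    context.log(`Message: ${event.Value}`);
    for (const header of event.Headers || []) {
        context.log(`Header ${header.Key}: ${Buffer.from(header.Value, "base64").toString()}`);
    }
};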

Similarly, KafkaTriggerManyWithHeaders is a trigger function that processes a batch of Kafka events. For each event in the batch, it prints the message and its corresponding headers.