topic: sample
languages: azurecli, json, sql, scala
products: azure, azure-container-instances, azure-cosmos-db, azure-databricks, azure-kubernetes-service
statusNotificationTargets: algattik@microsoft.com

Streaming at Scale with Kafka on Azure Kubernetes Service (AKS), Databricks and Cosmos DB

This sample uses Cosmos DB as the database to store JSON data.

The provided scripts will create an end-to-end solution complete with a load test client.

Running the Scripts

Please note that the scripts have been tested on Ubuntu 18.04 LTS, so make sure to use that environment to run them. You can use Docker, WSL or a VM.

The following tools/languages are also needed:

  • Azure CLI
    • Install: sudo apt install azure-cli
  • jq
    • Install: sudo apt install jq
  • python
    • Install: sudo apt install python python-pip
  • databricks-cli
    • Install: pip install --upgrade databricks-cli
  • helm
    • Install: https://github.com/helm/helm
  • kubectl
    • Install: sudo apt-get install -y kubectl

Setup Solution

Make sure you are logged into your Azure account:

az login

and also make sure you have selected the subscription you want to use:

az account list

if you want to select a specific subscription, use the following command:

az account set --subscription <subscription_name>

once you have selected the subscription you want to use, just execute the following command:

./create-solution.sh -d <solution_name>

The solution_name value will be used to create a resource group that will contain all resources created by the script. It will also be used as a prefix for all created resources, so, to help avoid name collisions that would break the script, you may want to generate a name using a unique prefix. Please also use only lowercase letters and numbers, since solution_name is also used to create a storage account, which has several constraints on allowed characters:

Storage Naming Conventions and Limits

to get an overview of all the supported arguments, just run:

./create-solution.sh

Note: to make name collisions unlikely, you should use a random string for your solution name. The following script will generate a random 7-letter lowercase name for you:

./_common/generate-solution-name.sh

Created resources

The script will create the following resources:

  • Azure Container Instances to host Spark Load Test Clients: by default one client will be created, generating a load of 1000 events/second
  • Azure Kubernetes Service (AKS) Kafka Cluster and Consumer Group: to ingest data incoming from test clients
  • Azure Databricks: to process data incoming from Kafka on Azure Kubernetes Service (AKS) as a stream (see the sketch after this list). Workspace, Job and related cluster will be created
  • Cosmos DB Server, Database and Collection: to store and serve processed data
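
To give an idea of how the Databricks job consumes the topic, here is a minimal sketch using Spark Structured Streaming's Kafka source. The bootstrap server and topic name are placeholders, not the values configured by the scripts, which wire these up automatically.

// Minimal sketch: read the Kafka topic hosted on AKS as a structured stream.
// Requires the spark-sql-kafka package on the cluster; placeholders are marked with <...>.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<kafka-bootstrap-service>:9092") // Kafka bootstrap endpoint inside AKS
  .option("subscribe", "<topic-name>")                                 // topic written by the load test clients
  .option("startingOffsets", "earliest")
  .load()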

Streamed Data

Streamed data simulates an IoT device sending the following JSON data:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 57.739726013343247,
        "moreData1": 52.230732688620829,
        "moreData2": 57.497518587807189,
        "moreData3": 81.32211656749469,
        "moreData4": 54.412361539409427,
        "moreData5": 75.36416309399911,
        "moreData6": 71.53407865773488,
        "moreData7": 45.34076957651598,
        "moreData8": 51.3068118685458,
        "moreData9": 44.44672606436184,
        [...]
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-154",
    "deviceSequenceNumber": 0,
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}
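
Continuing the Kafka sketch above, the JSON payload could be parsed into columns with a schema along these lines. This is only an illustration: the complexData fields are omitted and the actual schema lives in the Databricks job created by the scripts.

// Illustrative schema mirroring the sample event (complexData omitted for brevity).
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val eventSchema = StructType(Seq(
  StructField("eventId", StringType),
  StructField("value", DoubleType),
  StructField("deviceId", StringType),
  StructField("deviceSequenceNumber", LongType),
  StructField("type", StringType),
  StructField("createdAt", TimestampType)
))

// Kafka delivers the payload as bytes in the "value" column: cast it to string and parse it.
val events = kafkaStream
  .select(from_json(col("value").cast("string"), eventSchema).as("event"))
  .select("event.*")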

Duplicate event handling

In case the infrastructure fails and recovers, an event from Kafka on Azure Kubernetes Service (AKS) that has already been stored in Cosmos DB could be processed a second time. The solution uses the Cosmos DB upsert functionality to make this operation idempotent, so that events are not duplicated in Cosmos DB (deduplication is based on the eventId attribute).

In order to illustrate the effect of this, the event simulator is configured to randomly duplicate a small fraction of the messages (0.1% on average). Those duplicates will not be present in Cosmos DB.
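
As an illustration only, an upsert write with the azure-cosmosdb-spark connector could look roughly like the sketch below, continuing from the parsed events above. The connection values are placeholders, and the mapping of eventId to the Cosmos DB document id is an assumption made for this sketch.

// Sketch: write the stream to Cosmos DB with upsert enabled, so a replayed event
// overwrites the document with the same id instead of creating a duplicate.
// All connection values are placeholders.
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider

val cosmosConfig = Map(
  "Endpoint"   -> "https://<cosmosdb-account>.documents.azure.com:443/",
  "Masterkey"  -> "<cosmosdb-key>",
  "Database"   -> "<database-name>",
  "Collection" -> "<collection-name>",
  "Upsert"     -> "true" // makes the write idempotent on the document id
)

val query = events
  .withColumnRenamed("eventId", "id") // assumption: eventId becomes the Cosmos DB document id
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(cosmosConfig)
  .option("checkpointLocation", "dbfs:/streaming-at-scale/checkpoint") // placeholder path
  .start()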

Solution customization

If you want to change some of the solution settings, such as the number of load test clients, Cosmos DB RUs and so on, you can do so right in the create-solution.sh script, by changing any of these values:

export AKS_NODES=3
export KAFKA_BROKERS=4
export KAFKA_PARTITIONS=4
export COSMOSDB_RU=20000
export SIMULATOR_INSTANCES=1
export DATABRICKS_NODETYPE=Standard_DS3_v2
export DATABRICKS_WORKERS=4
export DATABRICKS_MAXEVENTSPERTRIGGER=10000

The above settings have been chosen to sustain a 1,000 msg/s stream. The script also contains settings for 5,000 msg/s and 10,000 msg/s.

Monitor performance

In order to monitor the performance of the created solution, just open the created Application Insights resource and then open "Live Metrics Stream": the "Incoming Requests" chart shows the number of processed requests per second. The number you'll see here is very likely to be lower than the number of messages per second sent by the test clients, since the stream processing job is configured to use batching.

Performance will also be monitored and displayed on the console for 30 minutes. The script queries Log Analytics for the latest metrics from Kafka (note that because of ingestion latency in Log Analytics, the data may be updated at a less frequent interval). If everything is working correctly, the number of reported IncomingBytes and OutgoingBytes should be roughly the same after a few minutes of ramp-up.

        2019-11-13T20:03:07Z               60127                   0                   0
        2019-11-13T20:03:07Z               60127                   0                   0
        2019-11-13T20:03:07Z               60127                   0                   0
        2019-11-13T20:05:05Z               59456            55872597            39192676
        2019-11-13T20:07:05Z               61095            55859006            39454893
        2019-11-13T20:05:05Z               59456            55872597            39192676
        2019-11-13T20:08:06Z               58667            54928162            38560536
        2019-11-13T20:08:06Z               58667            54928162            38560536
        2019-11-13T20:08:06Z               58667            54928162            38560536
        2019-11-13T20:08:06Z               58667            54928162            38560536
        2019-11-13T20:13:06Z               59077            54944942            39192676
        2019-11-13T20:13:06Z               59077            54944942            39192676
        2019-11-13T20:13:06Z               59077            54944942            39192676
        2019-11-13T20:14:05Z               60825            54954506            39845888
        2019-11-13T20:14:05Z               60825            54954506            39845888

Azure Databricks

At the time of writing, the Cosmos DB Spark Connector does not support the timestamp data type. If you try to send a dataframe containing a timestamp to Cosmos DB, you'll get the following error:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.sql.Timestamp

As a workaround, timestamp columns are sent to Cosmos DB as strings.
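
A minimal sketch of that workaround, applied to the createdAt column of the parsed events from the earlier sketch:

// Workaround sketch: cast the timestamp column to a string before writing to Cosmos DB,
// so the connector never has to serialize a java.sql.Timestamp value.
import org.apache.spark.sql.functions.col

val cosmosReadyEvents = events
  .withColumn("createdAt", col("createdAt").cast("string"))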

Cosmos DB

When scaling up, you may have noticed that you need more RUs than you would expect. Assuming that Cosmos DB consumes 7 RU per write, to stream 5,000 msgs/sec you would expect to need up to 35,000 RU. Instead, the sample uses 50,000. There are three main reasons why this happens:

  1. indexing
  2. document size
  3. physical data distribution

Look at the details of the Azure Functions sample for a detailed description of these concepts.

Strimzi Kafka Operator

This sample uses the Strimzi operator to deploy Kafka on top of Kubernetes. To learn more about this operator, you can go to the official website, which has documentation, tutorials and more. There is also the Fabrikate repo, which has an end-to-end example of running a secure stateful workload using Kafka.

Query Data

Data is available in the created Cosmos DB database. You can query it from the portal, for example:

SELECT * FROM c WHERE c.type = 'CO2'
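
Alternatively, you could read the data back from a Databricks notebook. A sketch with the azure-cosmosdb-spark connector is shown below (connection values are placeholders; the query_custom key is based on the connector's documented configuration and should be treated as an assumption):

// Sketch: read the processed events back into a dataframe, pushing the filter
// down to Cosmos DB with a custom query. All connection values are placeholders.
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map(
  "Endpoint"     -> "https://<cosmosdb-account>.documents.azure.com:443/",
  "Masterkey"    -> "<cosmosdb-key>",
  "Database"     -> "<database-name>",
  "Collection"   -> "<collection-name>",
  "query_custom" -> "SELECT * FROM c WHERE c.type = 'CO2'"
))

val co2Events = spark.read.cosmosDB(readConfig)
co2Events.show(10)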

Clean up

To remove all the created resources, you can just delete the related resource group:

az group delete -n <resource-group-name>