
Stateful & Reactive Stream Processing Applications without a Database


Overview

This repository contains a working example of how to build a modern data-centric application that tracks emoji occurrences found in publicly available tweets in near real-time. The main technologies it uses are Apache Kafka, Kafka Connect, Kafka Streams, and Spring Boot.


Usage example:

The following paragraphs give a detailed, step-by-step explanation of how to set up and run the application on your local machine.

1 Launch your Kafka environment:

The example application needs a fully working Kafka environment, ideally on your local machine. If you are into containers and know how to use Docker, feel free to make use of pre-built Docker images for Apache Kafka of your choice (e.g. the ones provided by Confluent). For simplicity, it is probably a good idea to launch all Kafka-related processes with the convenient CLI that ships with the open-source version of Confluent Platform - currently version 5.0.0.

Change to your installation folder (e.g. /usr/local/confluent-5.0.0/) and run

bin/confluent start

This should successfully launch all Kafka-related processes, namely zookeeper, kafka, schema-registry, kafka-rest, connect and ksql-server. It may take a few moments before each of them reports an [UP] status:

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: ...
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]

In case you face any issues while bringing up the Confluent Platform, read through its excellent documentation, which should help you fix them :)

2 Create a Kafka topic to store tweets:

Before live tweets can be ingested, a Kafka topic needs to be created. This is easily done with the command-line tools that ship with Kafka. The following command creates a topic called live-tweets with 4 partitions and a replication factor of 1.

bin/kafka-topics --zookeeper localhost:2181 --topic live-tweets --create --replication-factor 1 --partitions 4
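
To verify that the topic was created as expected, you can describe it:

bin/kafka-topics --zookeeper localhost:2181 --describe --topic live-tweets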

3 Run a Twitter source connector to harvest public live tweets:

There is a plethora of Kafka connectors available for reading data from a variety of sources and writing data to different sinks. This application uses a Twitter source connector from the community. To make this connector available in your local installation, you have to copy a folder containing the build artefacts, or a pre-built version together with its dependencies, to a specific folder in your Confluent Platform installation. After unzipping the connector artefact, copy the contained folder

kafka-connect-twitter-0.2.26/usr/share/kafka-connect/kafka-connect-twitter

to

/usr/local/confluent-5.0.0/share/java/

In order for Kafka Connect to detect the newly installed connector, simply restart the connect process with the CLI by first running

bin/confluent stop connect

followed by

bin/confluent start connect

Now the Twitter source connector is ready to use. It can be easily configured and managed by means of the Kafka Connect REST API. First, check that the connector is indeed available by sending the following GET request, e.g. using curl, Postman, or another tool of your choice:

curl http://localhost:8083/connector-plugins

This should return a JSON array containing all connectors currently available to your Kafka Connect installation. Somewhere in the list you should see the Twitter source connector:

[
  ...,
  {
    "class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "type": "source",
    "version": "0.2.26"
  },
  ...
]

We can run the connector to track a subset of live tweets related to a few keywords of our choice (see the filter.keywords entry below) based on the following JSON configuration. Simply insert your OAuth tokens/secrets, which you obtain by creating a Twitter application in your account; this must be done first in order to get access to the Twitter API. Send the JSON configuration as a POST request to the Kafka Connect REST endpoint (http://localhost:8083/connectors), e.g. using curl or Postman:

{ "name": "twitter_source_01",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "twitter.oauth.accessToken": "...",
    "twitter.oauth.consumerSecret": "...",
    "twitter.oauth.consumerKey": "...",
    "twitter.oauth.accessTokenSecret": "...",
    "kafka.status.topic": "live-tweets",
    "process.deletes": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "filter.keywords": "meltdown,spectre,intel,amd,arm,sparc,exploit,cpu,vulnerability,attack,security"
  }
}
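
For example, assuming the configuration above is saved to a file called twitter-source.json (the filename is just an assumption), it can be submitted with curl like so:

curl -X POST -H "Content-Type: application/json" --data @twitter-source.json http://localhost:8083/connectors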

This should result in an HTTP 201 (Created) response.
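
You can also check at any time that the connector is up and running by querying its status:

curl http://localhost:8083/connectors/twitter_source_01/status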

4 Check data ingestion

Using the Kafka command-line tools it's easy to check whether tweets are flowing into the topic. Running the following in your Confluent Platform folder

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic live-tweets --from-beginning

should consume all the tweets in the live-tweets topic and write them directly to stdout as they come in. The JSON structure of the tweets produced by the source connector is pretty verbose. The example application deserializes only the following 4 fields, and actually makes use only of the Text field to extract emojis during stream processing:

{
    "CreatedAt": 1515751050000,
    "Id": 951755003055820800,
    "Text": "Google details how it protected services like Gmail from Spectre https://t.co/jyuEixDaQq #metabloks",
    "Lang": "en"
}
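
Conceptually, the deserialization target can be thought of as a simple Jackson-annotated POJO like the following minimal sketch (the class name is an assumption, not the actual code from this repository):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// minimal sketch of a tweet POJO holding only the 4 fields above;
// all other fields of the verbose tweet JSON are ignored on deserialization
@JsonIgnoreProperties(ignoreUnknown = true)
public class Tweet {
    public long CreatedAt;
    public long Id;
    public String Text;
    public String Lang;
}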

5 Launch Spring Boot application

Everything is now set up to start the stream processing application. Just build the Maven project by running:

mvn clean package

then run the application from the command line using:

java -jar -Dserver.port=8881 -Dspring.kafka.streams.properties.tweets-topic=live-tweets -Dspring.kafka.streams.application-id=emojitracker-01 -Dspring.kafka.streams.properties.state-store-persistence-type=in-memory target/kstreams-emojitracker-0.6-SNAPSHOT.jar
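
Under the hood, counting emojis with Kafka Streams boils down to a topology conceptually similar to the following minimal sketch. This is not the actual code from this repository; the class name, the store name emoji-counts, and the naive emoji detection are assumptions:

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class EmojiTopologySketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "emojitracker-01");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // read the raw tweet records from the live-tweets topic; for brevity this
        // sketch scans the whole JSON value for emoji code points instead of first
        // deserializing the Text field
        KStream<String, String> tweets = builder.stream("live-tweets");

        // split each tweet into its individual emojis, then count per emoji into a
        // locally materialized, queryable state store - no external database involved
        KTable<String, Long> emojiCounts = tweets
                .flatMapValues(EmojiTopologySketch::extractEmojis)
                .groupBy((key, emoji) -> emoji)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("emoji-counts")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }

    // naive emoji detection based on a few common Unicode emoji ranges
    private static List<String> extractEmojis(String text) {
        return text.codePoints()
                .filter(cp -> (cp >= 0x1F300 && cp <= 0x1FAFF)
                        || (cp >= 0x2600 && cp <= 0x27BF))
                .mapToObj(cp -> new String(Character.toChars(cp)))
                .collect(Collectors.toList());
    }
}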

6 Interactively querying state stores

After the application has started successfully, you can perform REST calls against it to query the current emoji counts:


query for all emojis tracked so far:
curl -X GET http://localhost:8881/interactive/queries/emojis/

The result is in no particular order and might look like the following based on a sample run:

[
    ...,
    {
        "emoji": "🐾",
        "count": 4
    },
    {
        "emoji": "👇",
        "count": 113
    },
    {
        "emoji": "👉",
        "count": 16
    },
    {
        "emoji": "💀",
        "count": 29
    },
    {
        "emoji": "💋",
        "count": 1
    },
    {
        "emoji": "💖",
        "count": 1
    },
    {
        "emoji": "💥",
        "count": 2
    },
    ...
]

NOTE: the numbers you get will obviously vary!

query for a specific emoji tracked so far:

When using curl you need to specify the emoji by means of its URL escape code. It is therefore more convenient to query with Postman or your browser, as these allow you to put the emoji directly into the URL:

http://localhost:8881/interactive/queries/emojis/👇

curl -X GET http://localhost:8881/interactive/queries/emojis/%F0%9F%91%87

{ "emoji": "👇", "count": 113 }

NOTE: the numbers you get will obviously vary!

query for the top N emojis tracked so far:
curl -X GET http://localhost:8881/interactive/queries/emojis/stats/topN
[
    {
        "emoji": "👇",
        "count": 113
    },
    {
        "emoji": "😭",
        "count": 100
    },
    {
        "emoji": "➡",
        "count": 81
    },
    {
        "emoji": "✨",
        "count": 80
    },
    {
        "emoji": "⚡",
        "count": 79
    },
    {
        "emoji": "🌎",
        "count": 77
    },
    {
        "emoji": "😂",
        "count": 64
    },
    {
        "emoji": "💀",
        "count": 29
    },
    {
        "emoji": "❤",
        "count": 21
    },
    {
        "emoji": "🔥",
        "count": 17
    },
    ...
]

NOTE: the numbers you get will obviously vary!
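
These REST endpoints are backed by Kafka Streams interactive queries: the HTTP layer simply reads from the locally materialized state store instead of hitting a database. A minimal sketch of such a lookup, assuming the store name emoji-counts from the topology sketch above and a running KafkaStreams instance called streams (both names are assumptions):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// open a read-only view onto the materialized count store
ReadOnlyKeyValueStore<String, Long> store =
        streams.store("emoji-counts", QueryableStoreTypes.keyValueStore());

// single emoji lookup, e.g. backing /interactive/queries/emojis/{emoji}
Long count = store.get("👇");

// full range scan, e.g. backing /interactive/queries/emojis/
store.all().forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));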

SSE change stream of emoji count updates


Client applications can subscribe to a reactive change stream of emoji count updates while the kstreams application is processing new data. The updates are continuously streamed to clients as server-sent events (SSE), which can be consumed with your JS framework of choice to build a nice HTML dashboard.

curl -X GET http://localhost:8881/interactive/queries/emojis/updates/notify
...

data: {"emoji": "🌎","count": 77}

data: {"emoji": "💀","count": 29}

data: {"emoji": "😂","count": 64}

data: {"emoji": "👇","count": 113}

data: {"emoji": "🔥","count": 17}

...
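
On the server side, such a change stream can be exposed with Spring WebFlux. The following is a minimal sketch using Reactor's current Sinks API; the controller name and the EmojiCount DTO are assumptions, and the actual repository code may bridge from the Kafka Streams topology into the Flux differently:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@RestController
public class EmojiUpdatesControllerSketch {

    // fed with EmojiCount updates by the stream processing side (hypothetical DTO)
    private final Sinks.Many<EmojiCount> changes =
            Sinks.many().multicast().onBackpressureBuffer();

    // text/event-stream => each emitted element becomes one SSE "data:" record
    @GetMapping(path = "/interactive/queries/emojis/updates/notify",
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<EmojiCount> emojiCountUpdates() {
        return changes.asFlux();
    }
}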

7 Optional: Run multiple instances of the kstreams application


In case you want to run multiple instances to experiment with the scalability and fault tolerance of kstreams, just launch the application multiple times. Be sure to use different server.port and live.demo.instance.id settings for each additional instance,

e.g. start a 2nd instance like so:

java -jar -Dserver.port=8882 -Dlive.demo.instance.id=2 -Dspring.kafka.streams.properties.tweets-topic=live-tweets -Dspring.kafka.streams.application-id=emojitracker-01 -Dspring.kafka.streams.properties.state-store-persistence-type=in-memory target/kstreams-emojitracker-0.6-SNAPSHOT.jar

Now you can query either of the two instances to get the emoji count results!
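
This works because each instance only hosts the state for its share of the topic partitions. When an instance receives a query for a key it doesn't own locally, it can look up which instance does by means of the Kafka Streams metadata API and forward the request there. A minimal sketch, reusing the store name and the streams instance from the sketches above:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StreamsMetadata;

// determine which application instance hosts the store partition for this emoji
StreamsMetadata meta = streams.metadataForKey(
        "emoji-counts", "👇", Serdes.String().serializer());

// if it is not this instance, forward the HTTP query to meta.host():meta.port()
System.out.println("key is hosted at " + meta.host() + ":" + meta.port());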

Have fun tracking emojis in near real-time based on public tweets... 😊

About

Code for my talk "Stateful & Reactive Streaming Applications Without a Database" at VoxxedDays Ticino 2018
