The goal of this project is to showcase how Kubernetes, Docker, Kafka, and PySpark can be used to analyse the sentiment of tweets in real time using machine learning. The Spark app uses a logistic regression model to predict the sentiment of tweets streamed via Kafka; the results of the Spark jobs can be viewed via the Express.js app.
Resources:
- The final project presentation can be found here.
- The screencast is available in 4K on YouTube.
- The local screencast can be found here.
- André Anan Gilbert (3465546)
- David Hoffmann (2571020)
- Jan Henrik Bertrand (8556462)
- Marc Grün (9603221)
- Felix Noll (9467152)
The business motivation of this project is to create a big data application that enables users to get an overview of trending Twitter posts and the sentiment they reflect. This functionality makes it useful for personal, academic, and corporate use cases.
It allows personal users to better understand what, for some, is their primary source of information. Further, the application could serve as an information basis for researchers studying the sentiment of Twitter users towards certain topics and analysing their interactions. In addition, commercial users could analyse how users engage with a brand or a new product, gaining valuable marketing insights.
Moreover, the big data application offers an easy-to-use front end that makes it accessible to a wide range of user groups.
The application is built as a kappa architecture composed of a data ingestion layer, a stream processing system, and a serving layer. First, the web app acts as a data producer that sends tweets and application events to the Kafka broker. The Spark app consumes the tweets and events, aggregates them, predicts the sentiment of the tweets in the current batch, and saves the results to MariaDB. To avoid pulling data from the database on every request, memcached stores query results in memory.
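To illustrate the caching step, here is a minimal cache-aside sketch in Python. It assumes the pymemcache client; the service name, cache key, query, and expiry are hypothetical, not taken from the project code:

```python
import json

from pymemcache.client.base import Client

# Hypothetical service name and the default memcached port.
cache = Client(("my-memcached-service", 11211))

def get_popular_tweets(db):
    """Return query results from memcached, falling back to the database."""
    key = "popular_tweets"  # hypothetical cache key
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    rows = db.query("SELECT ... FROM tweets ...")  # placeholder query
    cache.set(key, json.dumps(rows), expire=30)  # re-query at most every 30s
    return rows
```

This cache-aside pattern keeps the database as the source of truth while serving repeated requests from memory.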
In the following, some of the components are explained in more detail.
The individual components seen in the big data application architecture above are containerized and orchestrated using Kubernetes. The individual resources and their functional relationships are shown in the following diagram (a small sketch of label-based pod selection follows the resource list):
- Service: Acts as a communication gateway and load balancer for the individual pods of a component. It keeps track of the list of IPs of active pods and updates it as pods die and are dynamically restarted. This is done using labels, which not only ensures that traffic is forwarded to the right pods, but also enables seamless rolling deployments by changing the label assigned to pods running the new version of a container image.
- Deployment: Manages the pods and their lifecycle.
- Ingress: Routes and manages external access to the different components.
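As a small illustration of how label selectors tie Services to pods, the following sketch uses the official Kubernetes Python client; the namespace and the app=web-app label are assumptions for illustration:

```python
from kubernetes import client, config

# Load credentials from ~/.kube/config (works with minikube).
config.load_kube_config()
v1 = client.CoreV1Api()

# A Service forwards traffic to every pod whose labels match its selector;
# listing pods with the same selector shows which pods would receive traffic.
pods = v1.list_namespaced_pod("default", label_selector="app=web-app")
for pod in pods.items:
    print(pod.metadata.name, pod.status.pod_ip)
```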
The application has two distinct Kafka topics: one for ingesting tweets into the streaming layer (Spark) and one for tracking events occurring in the frontend, such as user interaction and engagement.
- The first topic, used for ingesting tweets, is structured as shown in the following exemplary message:
{ "tweet_id": 0, "tweet": "content", "timestamp": 1604325221 }
- The second topic, used to track application events, is structured as shown in the following exemplary message:
{ "event_type": "streamed", "timestamp": 1604326237 }
Alternative event types are "clicked" or "fetched".
To produce to multiple topics at the same time, ingestion into Kafka is done in batches, as sketched below.
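A minimal producer sketch in Python, assuming the kafka-python library; the topic names and the bootstrap server address are assumptions for illustration:

```python
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="my-cluster-kafka-bootstrap:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

now = int(time.time())
# send() only appends messages to an in-memory batch; messages for both
# topics are transmitted together once flush() is called.
producer.send("tweets", {"tweet_id": 0, "tweet": "content", "timestamp": now})
producer.send("events", {"event_type": "streamed", "timestamp": now})
producer.flush()
```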
The sentiment analysis of Twitter posts is done using a logistic regression algorithm. As a data source, we used the public Sentiment140 dataset, a CSV file with the emoticons removed. The data file format has 6 fields (a loading sketch follows the list):
- The sentiment (polarity) of the tweet (0 = negative, 2 = neutral, 4 = positive)
- The id of the tweet (e.g. 2087)
- The date of the tweet (e.g. Sat May 16 23:58:44 UTC 2009)
- The query (e.g. lyx). If there is no query, this value is NO_QUERY.
- The user that tweeted (e.g. robotickilldozr)
- The text of the tweet (e.g. "Lyx is cool")
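A sketch of loading the dataset in PySpark; the column names are illustrative, since the Sentiment140 CSV ships without a header row:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (IntegerType, LongType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.appName("sentiment-training").getOrCreate()

# One field per column described above; the names are illustrative.
schema = StructType([
    StructField("sentiment", IntegerType()),
    StructField("tweet_id", LongType()),
    StructField("date", StringType()),
    StructField("query", StringType()),
    StructField("user", StringType()),
    StructField("text", StringType()),
])

df = spark.read.csv("training.1600000.processed.noemoticon.csv", schema=schema)
```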
Before this data can be used for training, four preprocessing steps are applied. First, the text field is cleaned using regular expressions, replacing special characters such as HTML codes and removing @mentions and #tags. Then the data is split into train and test partitions using a 90:10 split. Next, the tweets are tokenized and vectorized based on the frequency (count) of each word occurring in the entire text, using the respective Spark ML native methods. As a last step, IDF (inverse document frequency) weighting is applied.
The train partition is then used to fit the logistic regression model for sentiment classification, which is then evaluated on the test partition.
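A condensed sketch of these steps with Spark ML, continuing from the loading sketch above; the cleaning regex and column names are illustrative, not the project's exact code:

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import IDF, CountVectorizer, Tokenizer
from pyspark.sql.functions import col, regexp_replace

# 1. Clean the text: strip @mentions, #tags and HTML entities (illustrative regex).
clean = df.withColumn("text", regexp_replace(col("text"), r"(@\w+|#\w+|&\w+;)", ""))
# Map the polarity (0 = negative, 4 = positive in the training file) to a 0/1 label.
clean = clean.withColumn("label", (col("sentiment") / 4).cast("double"))

# 2. 90:10 train/test split.
train, test = clean.randomSplit([0.9, 0.1], seed=42)

# 3./4. Tokenize, count-vectorize, weight by IDF, then fit the logistic regression.
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="text", outputCol="words"),
    CountVectorizer(inputCol="words", outputCol="tf"),
    IDF(inputCol="tf", outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])

model = pipeline.fit(train)
predictions = model.transform(test)
```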
Once Docker and minikube are installed, the following steps set up the necessary prerequisites and get the application up and running:
- Start Docker (open Docker Desktop).
- Next, start minikube. To do this, run:
minikube start --addons=ingress
- Set up the Strimzi.io Kafka operator:
helm repo add strimzi http://strimzi.io/charts/
helm upgrade --install my-kafka-operator strimzi/strimzi-kafka-operator
kubectl apply -f https://farberg.de/talks/big-data/code/helm-kafka-operator/kafka-cluster-def.yaml
- Create a Hadoop cluster with YARN (for checkpointing):
helm repo add pfisterer-hadoop https://pfisterer.github.io/apache-hadoop-helm/
helm upgrade --install my-hadoop-cluster pfisterer-hadoop/hadoop --namespace=default --set hdfs.dataNode.replicas=1 --set yarn.nodeManager.replicas=1 --set hdfs.webhdfs.enabled=true
To develop the application using Skaffold, run the following command from the src folder:
skaffold dev
There are two ways to connect to the application.
- To connect to LoadBalancer services, run:
minikube tunnel
Once this is done, the application can be accessed at http://localhost.
- Alternatively, you can generate a URL using:
minikube service popular-slides-service --url
In case an installation command fails, try to update the respective helm repo using one of the commands below:
helm repo update strimzi
helm repo update pfisterer-hadoop
If issues arise, try to redeploy k8s resources.
After the initial startup, skaffold dev might fail if it is executed too quickly after starting Kafka. Simply re-run skaffold dev after waiting a short period.
If deployment still doesn't work, here are some commands to completely redeploy Hadoop and Kafka.
To delete strimzi resources:
# Get the resources managed by Strimzi
kubectl get strimzi -o name
# Pass those resources to delete
kubectl delete <name>
To delete a helm chart:
helm list
helm delete <chartname>
To delete a helm repo:
helm repo list
helm repo remove <repo_name>
Now you can execute the commands from the prerequisites again.