This is a Flink-based project that uses Kafka to simulate a real incoming stream of Twitter data into Flink. The data is parsed and processed, then fed through a pipeline of logical subdivisions that Flink calls windows. The result of the pipeline is the set of users that contribute the most to the biggest star topologies present in the network graph. This is achieved by calculating the out-degree centrality of each user; users with an out-degree higher than 1000 then pass through a filter, so that persistent stars can be detected across the period of a month.
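Conceptually, the core of the pipeline looks like the following minimal sketch, assuming a DataStream<Tuple2<String, String>> of (source, target) edges called edges; the variable names and the one-day window here are illustrative assumptions, not the project's exact code.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Count outgoing edges per user within each window (out-degree centrality),
// then keep only users whose out-degree exceeds 1000.
DataStream<Tuple2<String, Integer>> starCenters = edges
        .map(edge -> Tuple2.of(edge.f0, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)                                        // group by source user
        .window(TumblingProcessingTimeWindows.of(Time.days(1)))  // one logical subdivision
        .sum(1)                                                  // out-degree per window
        .filter(t -> t.f1 > 1000);                               // large-star threshold
```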
To feed the data into Flink, I use a bash script that reads a month's worth of data and writes it to the Kafka stream. Path variables need to be changed to run locally on another machine; I have marked them with @pathChange.
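For illustration, here is a hypothetical Java equivalent of what the script does (the real script is bash; the class name, file path, and producer settings below are assumptions):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Read a month's worth of parsed data line by line and publish each line
// as one record on the quickstart-events topic.
public class StreamFeeder {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Path data = Path.of("/path/to/month.csv"); // @pathChange: set to the local data file
            Files.lines(data).forEach(line ->
                    producer.send(new ProducerRecord<>("quickstart-events", line)));
        }
    }
}
```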
- Install Kafka locally from https://kafka.apache.org/downloads. Kafka 2.8.0 was used in the making of this project.
- On a new terminal run
  bin/zookeeper-server-start.sh config/zookeeper.properties
- On another terminal run
  bin/kafka-server-start.sh config/server.properties
- On another terminal, create a topic named quickstart-events by running
  bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
  This step (topic creation) has to be performed only once.
- Run the Maven project StreamEnv_lessStrictStarDetectionTest.java in IntelliJ (a sketch of how the job consumes the topic follows these steps).
- On a terminal run the script resources/startStream.sh
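The snippet below is a minimal sketch of how the streaming job could be wired to the quickstart-events topic; the actual source setup lives in StreamEnv_lessStrictStarDetectionTest.java, so the group id and property names here are assumptions.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Connect to the local broker started in the steps above.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "star-detection"); // illustrative group id

// Each Kafka record is one line of the simulated Twitter stream.
DataStream<String> tweets = env.addSource(
        new FlinkKafkaConsumer<>("quickstart-events", new SimpleStringSchema(), props));
```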
A Maven assembly plug-in included in this project can produce a jar file by running the install command in the Maven lifecycle.
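For example, with standard Maven usage, from the project root:
mvn clean install
The assembled jar is then placed under target/.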
The project also contains an algorithm that works on static graphs, using Flink's DataSet API and a simple execution environment, to detect strict star topologies; it is located in starDetectionTest.java. In contrast, StreamEnv_lessStrictStarDetectionTest.java uses a stream environment and streaming solutions. The main class of this project also contains a flatMap implementation called ExistingStar that performs the static persistent-user star search (an illustrative sketch follows below). The parsing script used is also included in the resources/parseAllToCSV.py file.
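To make the idea concrete, here is an illustrative flatMap in the spirit of ExistingStar: it emits a star center only if the same user was already seen in an earlier window. This is a sketch, not the project's implementation; it keeps state in a plain in-memory set, which assumes parallelism 1 (the real code may use Flink's managed state instead).

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Passes through (user, outDegree) pairs only when the user was already a
// star center in a previous window, i.e. the star persists over time.
public class ExistingStarSketch
        implements FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final Set<String> knownCenters = new HashSet<>();

    @Override
    public void flatMap(Tuple2<String, Integer> center,
                        Collector<Tuple2<String, Integer>> out) {
        if (knownCenters.contains(center.f0)) {
            out.collect(center);      // seen before: the star persists
        }
        knownCenters.add(center.f0);  // remember this center for later windows
    }
}
```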
- If you are having issues related to Kafka logs, you can delete them without breaking anything by running
  rm -rf /tmp/kafka-logs
- If you are having issues with the address Kafka is listening on, try changing the listeners entry in the server.properties config file to localhost.
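  For example, the relevant line in config/server.properties would then look like:
  listeners=PLAINTEXT://localhost:9092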
Distributed under the MIT License. See LICENSE for more information.