- Project goal
- Project scope
- Project design
- Tech stack
- Local setup
- Future enhancements
- References
- Miscellaneous
## Project goal

Real-time analysis of OS process logs through a data pipeline built using Apache Kafka, Spark, and Airflow.
## Project scope

- The file `/var/log/syslog` captures all process logs
- Of the various logs, the ones of interest are those whose message starts with "Started" or "Successfully activated"
- The message format follows a consistent pattern (a parsing sketch is given after this list):
  `<Month> <Date> <Time> <host> <Process> <Process ID> <Started/Successfully activated> <Service Name>`
- The project scope is to set up the Kafka pipeline and enable Spark analysis, so detailed metrics are not captured from syslog
- The metrics that we will be analyzing:
  - Most frequent service
  - Most frequent daemon/process
  - Number of processes started/activated at a given time
- The fields that we'll capture to enable the above analyses:
  - Date and time
  - Host name
  - Process name
  - Process ID
  - Service name
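A minimal sketch of how a matching syslog line could be parsed into these fields, using the sample entry from the test steps further below; the class name and regex are illustrative, not the project's actual parser:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SyslogLineParser {
    // Matches e.g. "Feb 24 20:29:42 dell-Inspiron-15-3567 systemd[1]: Started Locale Service."
    private static final Pattern LINE = Pattern.compile(
        "^(\\w{3}\\s+\\d{1,2}\\s[\\d:]{8})\\s"    // date and time, e.g. "Feb 24 20:29:42"
        + "(\\S+)\\s"                             // host name
        + "([^\\[\\s]+)\\[(\\d+)\\]:\\s"          // process name and process ID
        + "(?:Started|Successfully activated)\\s" // only the messages of interest
        + "(.+)$");                               // service name

    public static void main(String[] args) {
        String line = "Feb 24 20:29:42 dell-Inspiron-15-3567 systemd[1]: Started Locale Service.";
        Matcher m = LINE.matcher(line);
        if (m.matches()) {
            System.out.println("dateTime = " + m.group(1));
            System.out.println("host     = " + m.group(2));
            System.out.println("process  = " + m.group(3));
            System.out.println("pid      = " + m.group(4));
            System.out.println("service  = " + m.group(5));
        }
    }
}
```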
## Project design

- An Apache Airflow instance periodically triggers the Kafka Streams and Kafka Producer instances
- An Apache Kafka Producer instance reads OS process logs from `/var/log/syslog` and writes them to Kafka Topic 1 (a minimal producer sketch follows this list)
- An Apache Kafka Streams instance pulls data from Kafka Topic 1, converts it into Protobuf format, and writes it to Kafka Topic 2
- An Apache Kafka Consumer instance pulls data from Kafka Topic 2, converts it into a Java POJO, and saves it into a MySQL table via JPA Hibernate
- An Apache Spark instance pulls data from the MySQL table, computes metrics, and produces visualizations
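A minimal sketch of the producer hop described above, assuming the plain `kafka-clients` API; the topic name `kos-raw-os-log` comes from the console commands later in this README, while the class name and configuration are illustrative rather than the project's actual code:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyslogProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             var lines = Files.lines(Path.of("/var/log/syslog"))) {
            // Forward only the entries of interest to Topic 1
            lines.filter(l -> l.contains("Started") || l.contains("Successfully activated"))
                 .forEach(l -> producer.send(new ProducerRecord<>("kos-raw-os-log", l)));
        }
    }
}
```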
## Tech stack

- Backend: Java Spring, Spring Boot
- Storage/Queue: MySQL, Apache Kafka
- Big data processor: Apache Spark
- Scheduler: Apache Airflow
- Data format: Protobuf
- Others:
  - Storage access tool: Spring JPA
  - Data visualization: Pandas, Seaborn, Matplotlib
## Local setup

Prerequisites:

- Apache Kafka
- Apache Spark
- Apache Airflow
- MySQL database
- Python 3.10 and Java 17
- This git repository:

```sh
$ git clone https://github.com/bmsohwinc/kafka-os-logs.git
```

Start Zookeeper and Kafka:

```sh
$ cd /kafka/directory
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
```

Start MySQL:

```sh
$ sudo service mysql start
```

Update `application.properties` with your local values:

```properties
syslog.path=/var/log/syslog # update only if your syslog path differs
spring.datasource.password=your_mysql_password
```
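If the datasource URL and username also need changing, they follow Spring Boot's standard property names; the values below are assumptions for illustration (the `kos` schema name comes from the MySQL commands later in this README):

```properties
# Standard Spring Boot datasource settings; values are illustrative
spring.datasource.url=jdbc:mysql://localhost:3306/kos
spring.datasource.username=your_mysql_user
```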
Run the application:

```sh
$ ./runner.sh
```
Set up Airflow DAGs:

- Create the `~/airflow/dags` directory
- Copy the dag files from the `airflow-dag` directory of this project to the above location
- Run `$ python *.dag` to submit the airflow configs
- Head over to the Airflow dashboard and enable the jobs tagged with `kos`
Verify that parsed records reach the Protobuf topic:

```sh
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kos-streams-proto-log --from-beginning
```
Verify that records land in MySQL:

```sh
mysql> use kos;
mysql> select count(*) from parsedoslog_entry;
```
Run the Spark analysis app:

```sh
$ /home/bms/projects/my-spark/spark-3.5.0-bin-hadoop3/bin/spark-submit --packages "com.mysql:mysql-connector-j:8.3.0" --master local[4] KOSSparkApp.py
```
## Future enhancements

- Read the syslog file with an offset + limit
  - Currently, the entire file is read on every producer call
  - This can be solved by reading from an API instead of a plain file
- Display metrics in a dashboard
  - Currently, metrics captured in Spark are logged to the console and saved as images
  - Setting up a React/Redux dashboard with charts updating in real time would streamline analysis
- Enrich log capture
  - Currently, only the syslog entries are recorded
  - This can be extended to provide a framework that can consume any log and provide visualizations
  - It can also provide an interface that developers can implement to write custom parsing code
## References

- Downloading the Protobuf compiler, protoc
- Populating repeated fields in a Protobuf class
- Saving custom objects in Kafka topics
- Basics about various log files
- What all can be done with system logs
- Providing correct paths for protobuf compilation
- Compiling protobuf files
- Properly importing external or library proto files in the main proto file
- Converting LocalDateTime to Protobuf Timestamp
- Writing custom Serdes
- Writing a Serializer and Deserializer for Protobuf classes
- A Serde is just a combination of a serializer and a deserializer; you have to implement all three (Serializer, Deserializer, and Serde) to use it in the Producer/Consumer/Streams
- You can provide custom Serdes using the `Consumed` and `Produced` classes at the KStreams source and sink, respectively (see the sketch after this list)
- Reading Kafka records from Spark using `writeStream.start()` and `.awaitTermination()`
- Including necessary protobuf and kafka packages in the spark-submit command line
- Creating Protobuf descriptor files
- Good article on Spark streaming with Kafka
- Creating a simple Spring JPA project with MySQL
- Configuring a Kafka consumer
- Starting the MySQL service if it is down or cannot connect
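As an illustration of the Serde notes above, here is a minimal sketch of a custom Protobuf Serde, assuming a Protobuf-generated class named `OSParsed`; all class names are illustrative rather than the project's actual ones:

```java
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// OSParsed stands in for the class generated from osparsed.proto
public class OSParsedSerde implements Serde<OSParsed> {

    public static class OSParsedSerializer implements Serializer<OSParsed> {
        @Override
        public byte[] serialize(String topic, OSParsed data) {
            return data == null ? null : data.toByteArray();
        }
    }

    public static class OSParsedDeserializer implements Deserializer<OSParsed> {
        @Override
        public OSParsed deserialize(String topic, byte[] bytes) {
            if (bytes == null) return null;
            try {
                return OSParsed.parseFrom(bytes);
            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
                throw new SerializationException(e);
            }
        }
    }

    @Override
    public Serializer<OSParsed> serializer() { return new OSParsedSerializer(); }

    @Override
    public Deserializer<OSParsed> deserializer() { return new OSParsedDeserializer(); }
}
```

At the topology level it would then be wired in with `Consumed.with(...)` at the source and `Produced.with(Serdes.String(), new OSParsedSerde())` at the sink.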
## Miscellaneous

Other commands used during dev work:
Build and run the app:

```sh
$ sudo service mysql start
$ mvn package -DskipTests
$ java -jar target/kos-0.0.1-SNAPSHOT.jar
```

or simply:

```sh
$ ./runner.sh
```
Compiling the proto file (a minimal sketch of such a file follows these steps):

- Create the `osparsed.proto` file in the `resources/` directory
- Import the necessary `google/*.proto` files in the `osparsed.proto` file (`import "google/protobuf/timestamp.proto";` in this case)
- Note down the full path to the directory where `osparsed.proto` resides and the full path where protoc's `include` directory resides
- Run the below commands:

```sh
$ cd /path/to/protoc
$ ./protoc -I /home/bms/my-proto/include -I /home/bms/projects/my-kafka/kos/src/main/resources/proto --java_out=/home/bms/projects/my-kafka/kos/src/main/java/ /home/bms/projects/my-kafka/kos/src/main/resources/proto/osparsed.proto
```
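A minimal sketch of what such a proto file might contain; the message and field names are assumptions for illustration (they mirror the fields captured in the project scope), not the project's actual schema:

```proto
// osparsed.proto - hypothetical sketch; names are illustrative
syntax = "proto3";

import "google/protobuf/timestamp.proto";

option java_package = "com.example.kos.proto"; // assumed package
option java_outer_classname = "KOSParsed";

message OSParsed {
  google.protobuf.Timestamp event_time = 1; // date and time of the log entry
  string host = 2;                          // host name
  string process = 3;                       // process/daemon name
  int64 process_id = 4;                     // process ID
  string service = 5;                       // service name
}
```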
Creating the Protobuf descriptor file (do not forget the `--include_imports` flag, else dependencies won't be added to the descriptor):

```sh
$ ./protoc --include_imports -I /home/bms/my-proto/include -I /home/bms/projects/my-kafka/kos/src/main/resources/proto --descriptor_set_out=py-spark/KOSParsed.desc /home/bms/projects/my-kafka/kos/src/main/resources/proto/osparsed.proto
```
Submitting the PySpark app (do not forget the `--packages` flag):

```sh
$ /home/bms/projects/my-spark/spark-3.5.0-bin-hadoop3/bin/spark-submit --packages "org.apache.spark:spark-protobuf_2.12:3.5.0","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0" --master local[4] KStreamApp.py
```
Reading the records written to your Kafka topic:

```sh
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <KAFKA_TOPIC_NAME> --from-beginning
```
Setting up Airflow:

- Install Airflow as given in the official doc
- Create the `~/airflow/dags` directory
- Copy the dag files from the `airflow-dag` directory of this project to the above location
- Run `$ python *.dag` to submit the airflow configs
- Head over to the Airflow dashboard and enable the jobs tagged with `kos`
Running the full pipeline manually:

- Start Zookeeper and Kafka main:

```sh
$ cd /kafka/directory
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
```

- Open a Kafka console producer for topic-1:

```sh
$ cd /kafka/directory
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kos-raw-os-log
```

- Open a Kafka console consumer for topic-1:

```sh
$ cd /kafka/directory
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kos-raw-os-log --from-beginning
```

- Open a Kafka console consumer for topic-2:

```sh
$ cd /kafka/directory
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-test-output-2 --from-beginning
```

- Start KOSApplication:

```sh
$ cd /project/directory
$ ./runner.sh
```

- Start Kafka Streams (a sketch of what this endpoint might look like follows this list):

```sh
$ curl http://localhost:8080/kos/streams/start
```

- Start the PySpark application:

```sh
$ /home/bms/projects/my-spark/spark-3.5.0-bin-hadoop3/bin/spark-submit --packages "org.apache.spark:spark-protobuf_2.12:3.5.0","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0" --master local[4] KStreamApp.py
```

- Write a sample entry to the Kafka producer of topic-1:

```sh
> Feb 24 20:29:42 dell-Inspiron-15-3567 systemd[1]: Started Locale Service.
```

- Check that records are logged onto:
  - Topic-1 console consumer
  - Topic-2 console consumer
  - PySpark console
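For context, the `/kos/streams/start` endpoint hit by the curl command above might look like the following Spring controller; this is a hedged sketch assuming a `KafkaStreams` bean is configured elsewhere, not the project's actual code:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller behind the curl command; the project's
// real endpoint may be wired differently.
@RestController
@RequestMapping("/kos/streams")
public class StreamsController {
    private final KafkaStreams streams;

    public StreamsController(KafkaStreams streams) {
        this.streams = streams; // topology built and injected elsewhere
    }

    @GetMapping("/start")
    public String start() {
        streams.start();
        return "Kafka Streams started";
    }
}
```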
Other references:

- Java Spring
- Spring Kafka