In [1]:
from pyspark.sql import SparkSession

import os

# Connection settings. Host, credentials and broker address can be overridden
# through environment variables so secrets need not live in the notebook; the
# defaults reproduce the previous hard-coded values.
cassandra_host = os.environ.get("CASSANDRA_HOST", "192.168.1.66")
cassandra_user = os.environ.get("CASSANDRA_USER", "cassandra")
cassandra_pwd  = os.environ.get("CASSANDRA_PASSWORD", "cassandra")
cassandra_port = int(os.environ.get("CASSANDRA_PORT", "9042"))
key_space      = "loganalysis"
table_name     = "bgllogs"
kafka_server   = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "192.168.1.66:9092")
kafka_topic    = "logs"

# Maven coordinates resolved by Ivy when the session starts:
# the Kafka structured-streaming source and the DataStax Cassandra connector.
spark_packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
    "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0",
    "com.datastax.spark:spark-cassandra-connector-driver_2.12:3.0.0",
])

# Spark session configured to read from Kafka and write to Cassandra.
spark = (
    SparkSession.builder.appName("pyspark-notebook")
    .config("spark.jars.packages", spark_packages)
    .config("spark.cassandra.connection.host", cassandra_host)
    # Previously defined but never applied; now passed to the connector.
    .config("spark.cassandra.connection.port", cassandra_port)
    .config("spark.cassandra.auth.username", cassandra_user)
    .config("spark.cassandra.auth.password", cassandra_pwd)
    .getOrCreate()
)

23/08/21 00:04:20 WARN Utils: Your hostname, Dino-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using fd01:db8:1111:0:0:0:0:3 instead (on interface lo0)
23/08/21 00:04:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/ngohongthai/.ivy2/cache
The jars for the packages stored in: /Users/ngohongthai/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector-driver_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10feff6a-5155-43bc-b237-06bfa0bb095b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/ngohongthai/miniconda3/envs/data-engineering/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found com.datastax.spark#spark-cassandra-connector_2.12;3.0.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.0.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.7.2 in central
	found com.datastax.oss#native-protocol;1.4.10 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.3.4 in central
	found io.dropwizard.metrics#metrics-core;4.0.5 in central
	found org.hdrhistogram#HdrHisto

### Set up the Kafka stream

In [2]:
from pyspark.sql.types import StructType, StructField, Row, StringType, IntegerType

# Kafka source options. With "latest", a query that starts without a
# checkpoint reads only records produced after it starts (no backfill).
kafka_params = {
    "kafka.bootstrap.servers": kafka_server,
    "subscribe": kafka_topic,
    "startingOffsets": "latest",
}

# Raw Kafka records, reduced to the payload decoded as a string column.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_params)
    .load()
    .selectExpr("CAST(value AS STRING) as kafka_message")
)

# The payload is expected to be JSON carrying the raw log line under "message";
# get_json_object yields NULL when the payload is not JSON or lacks that key.
log_stream = kafka_stream.selectExpr(
    "get_json_object(kafka_message, '$.message') as message"
)

### Prediction

In [3]:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
from drain3.file_persistence import FilePersistence
from pyspark.sql.functions import udf, unix_timestamp
import datetime

# Drain3 configuration is read from drain3.ini, with profiling switched off.
config = TemplateMinerConfig()
config.load("drain3.ini")
config.profiling_enabled = False

# Template-miner state is kept in drain3_state_bgl.bin through a file
# persistence handler (presumably the state mined from the BGL training logs).
persistence = FilePersistence("drain3_state_bgl.bin")
template_miner = TemplateMiner(persistence, config)

def inference_line(log_line):
    """Parse one BGL log line and classify it with the Drain3 template miner.

    The line is split on single spaces. Field 0 is taken as the label, field 2
    as the date, field 4 as the timestamp, and fields 8 onward (re-joined with
    single spaces) as the content matched against the mined templates.

    Parameters
    ----------
    log_line : str or None
        Raw log line. ``None`` (e.g. the Kafka payload was not JSON or had no
        ``message`` key) is treated as an empty, malformed line instead of
        raising and failing the streaming query.

    Returns
    -------
    Row
        Fields ``timestamp, cluster_id, date, content, cluster_template,
        label, prediction`` in the same order as the UDF output ``schema``.
        Lines with fewer than 9 fields, and lines whose content matches no
        known template, get ``cluster_id=-1`` and ``prediction='Abnormal'``.
    """
    make_row = Row('timestamp', 'cluster_id', 'date', 'content',
                   'cluster_template', 'label', 'prediction')

    # Guard against NULL input from get_json_object.
    log_line = (log_line or "").rstrip()
    line_in_arr = log_line.split(' ')

    if len(line_in_arr) < 9:  # wrong format
        # No timestamp can be parsed, so the processing time is used instead.
        # Microseconds are kept because `timestamp` is the Cassandra primary
        # key: a seconds-only value made malformed lines that arrived within
        # the same second overwrite one another.
        timestamp_string = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        return make_row(timestamp_string, -1, "NA", log_line, "NA", "NA", 'Abnormal')

    label = line_in_arr[0]
    date = line_in_arr[2]
    timestamp = line_in_arr[4]
    content = ' '.join(line_in_arr[8:])

    # Look the content up among the already-mined templates.
    cluster = template_miner.match(content)

    if cluster is None:
        # Content fits no known template -> flagged as an anomaly.
        cluster_id = -1
        cluster_template = "NA"
        prediction = "Abnormal"
    else:
        cluster_id = cluster.cluster_id
        cluster_template = cluster.get_template()
        prediction = "Normal"

    return make_row(timestamp, cluster_id, date, content, cluster_template, label, prediction)

# Output schema of `inference_line`; field order must match the Row it builds.
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("cluster_id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("content", StringType(), True),
    StructField("cluster_template", StringType(), True),
    StructField("label", StringType(), True),
    StructField("prediction", StringType(), True)
])

# Marked non-deterministic for two reasons. First, `inference_line` reads the
# wall clock for malformed lines. Second, it stops the optimizer from collapsing
# the projection below and re-evaluating the UDF (and its Drain3 match) once for
# each field extracted by `select("parsed.*")`, which some Spark versions do for
# deterministic UDFs.
udf_split = udf(inference_line, schema).asNondeterministic()

processed_df = (
    log_stream
    .withColumn("parsed", udf_split(log_stream["message"]))
    .select("parsed.*")
    # unix_timestamp() with no argument gives the current time in epoch seconds.
    .withColumn("time_added", unix_timestamp())
)


In [4]:
# # write stream to console
# query = processed_df.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .start()

# query.awaitTermination()


### Write results to Cassandra

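**Setup guide.** Run these commands once, before starting the stream, to create the keyspace and table the results are written to: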

```bash

docker exec -i -t cassandra bash

cqlsh -u cassandra -p cassandra

CREATE KEYSPACE IF NOT EXISTS loganalysis WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

CREATE TABLE IF NOT EXISTS loganalysis.bgllogs (
    timestamp text,
    time_added text,
    cluster_id int,
    date text,
    content text,
    cluster_template text,
    label text,
    prediction text,
    PRIMARY KEY (timestamp)
);

-- Optional: deletes all rows stored by previous runs (irreversible)
truncate table loganalysis.bgllogs;

```

To list the tables (run inside `cqlsh`):

```bash
describe tables;
```

In [5]:
def process_row(df, epoch_id):
    """Append one streaming micro-batch to the Cassandra results table.

    Used as the ``foreachBatch`` sink of the streaming query. The target
    keyspace and table are read from ``key_space`` / ``table_name`` in the
    configuration cell, so the Cassandra location is defined in one place.

    Parameters
    ----------
    df : DataFrame
        Rows of the current micro-batch.
    epoch_id : int
        Unique id of the micro-batch (unused here).
    """
    (
        df.write
        .format("org.apache.spark.sql.cassandra")
        .mode('append')
        .options(table=table_name, keyspace=key_space)
        .save()
    )

# Start the streaming query: each micro-batch of `processed_df` is handed to
# `process_row`, which appends it to Cassandra. This call blocks until the
# query stops.
# NOTE(review): no checkpointLocation is set, so Spark uses a temporary
# checkpoint directory (see the warning below); progress is not kept across
# restarts.
streaming_query = (
    processed_df.writeStream
    .foreachBatch(process_row)
    .start()
)
streaming_query.awaitTermination()

23/08/21 00:04:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/66/6rbnzyf91v9d5v8nb46hmn0h0000gn/T/temporary-7f676aa3-cfe9-4e80-afd3-e76a0cea6e95. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/08/21 00:04:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/08/21 00:04:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/08/21 00:04:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/08/21 00:04:47 WARN KafkaDataConsumer: KafkaData