In [1]:
import pycef
import pandas as pd

In [2]:
from pyspark.sql.streaming import *
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [3]:
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
os.environ["SPARK_HOME"] = "/home/emre/spark-3.0.1-bin-hadoop2.7"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 pyspark-shell'

In [4]:
spark_sql_kafka = "/home/emre/spark-3.0.1-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar"
kafka_clients = "/home/emre/spark-3.0.1-bin-hadoop2.7/jars/kafka-clients-2.6.0.jar"

In [5]:
kafka_topic_name = "cef-topic-kafka"
kafka_bootstrap_servers = 'localhost:9092'

In [7]:
spark = SparkSession \
        .builder \
        .appName("spark-kafka2") \
        .master("local[*]") \
        .config("spark.jars", spark_sql_kafka) \
        .config("spark.jars", kafka_clients) \
        .config("spark.driver.extraClassPath","/home/emre/spark-3.0.1-bin-hadoop2.7/jars/*.jar") \
        .config("spark.executor.extraClassPath","/home/emre/spark-3.0.1-bin-hadoop2.7/jars/*.jar") \
        .config("spark.jars", "/home/emre/spark-3.0.1-bin-hadoop2.7/jars/postgresql-42.3.1.jar") \
        .getOrCreate()

In [8]:
 data_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "earliest") \
        .load()

In [9]:
data_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
data_df.selectExpr("CAST(key as STRING)","CAST(value as STRING)") \
.writeStream \
.format("memory") \
.queryName("ceftopic") \
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f02d3ef2c50>

In [11]:
df=spark.sql("SELECT value FROM ceftopic")

In [12]:
df2=df.select([col(value).cast("string") for value in df.columns])

In [13]:
df2.show()

+--------------------+
|               value|
+--------------------+
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
|CEF:0|Citrix|NetS...|
+--------------------+



In [14]:
b=[]

In [15]:
 for f in df2.collect(): 
        #print (pycef.parse(f.value))
        b.append(pycef.parse(f.value))

In [16]:
sparkDF=spark.createDataFrame(Row(**x) for x in b)

In [17]:
sparkDF.printSchema

<bound method DataFrame.printSchema of DataFrame[DeviceVendor: string, DeviceProduct: string, DeviceVersion: string, DeviceEventClassID: string, Name: string, DeviceName: string, Severity: string, DeviceSeverity: string, CEFVersion: string, src: string, spt: string, method: string, request: string, msg: string, cn1: string, cn2: string, cs1: string, cs2: string, cs3: string, cs4: string, cs5: string, act: string]>

In [21]:
sparkDF_LST=sparkDF \
.withColumn("Severity",sparkDF["Severity"].cast("integer")) \
.withColumn("DeviceSeverity",sparkDF["DeviceSeverity"].cast("integer")) \
.withColumn("CEFVersion",sparkDF["CEFVersion"].cast("integer")) \
.withColumn("spt",sparkDF["spt"].cast("integer")) \
.withColumn("cn1",sparkDF["cn1"].cast("integer")) \
.withColumn("cn2",sparkDF["cn2"].cast("integer")) \
.withColumn("cs5",sparkDF["cs5"].cast("integer")) 


In [22]:
sparkDF_LST.printSchema

<bound method DataFrame.printSchema of DataFrame[DeviceVendor: string, DeviceProduct: string, DeviceVersion: string, DeviceEventClassID: string, Name: string, DeviceName: string, Severity: int, DeviceSeverity: int, CEFVersion: int, src: string, spt: int, method: string, request: string, msg: string, cn1: int, cn2: int, cs1: string, cs2: string, cs3: string, cs4: string, cs5: int, act: string]>

In [25]:
sparkDF_LST.show()

+------------+-------------+-------------+------------------+--------------+--------------+--------+--------------+----------+-------------+-----+------+--------------------+--------------------+---+---+--------+----+--------------------+-----+----+-------+
|DeviceVendor|DeviceProduct|DeviceVersion|DeviceEventClassID|          Name|    DeviceName|Severity|DeviceSeverity|CEFVersion|          src|  spt|method|             request|                 msg|cn1|cn2|     cs1| cs2|                 cs3|  cs4| cs5|    act|
+------------+-------------+-------------+------------------+--------------+--------------+--------+--------------+----------+-------------+-----+------+--------------------+--------------------+---+---+--------+----+--------------------+-----+----+-------+
|      Citrix|    NetScaler|       NS10.0|             APPFW|APPFW_STARTURL|APPFW_STARTURL|       6|             6|         0|10.217.253.78|53743|   GET|http://vpx247.exa...|Disallow Illegal ...|233|205|profile1|PPE0|AjSZM26h2

In [28]:
mode = "overwrite"
url = "jdbc:postgresql://localhost:5432/ceftopic"
properties = {"user": "postgres","password": "*****","driver": "org.postgresql.Driver"}

In [None]:
sparkDF_LST \
.write.\
jdbc(url=url, table="log_", mode=mode, properties=properties)