Starting a Spark session

In [13]:
# Uncomment the following lines if you are using Windows!
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()

Creating a schema named MQTT

In [38]:
mqtt = StructType([StructField('tcp_flags', StringType(), True), 
                   StructField('tcp_time_delta', DoubleType(), True), 
                   StructField('tcp_len', IntegerType(), True), 
                   StructField('mqtt_conack_flags', StringType(), True), 
                   StructField('mqtt_conack_flags_reserved', DoubleType(), True), 
                   StructField('mqtt_conack_flags_sp', DoubleType(), True), 
                   StructField('mqtt_conack_val', DoubleType(), True), 
                   StructField('mqtt_conflag_cleansess', DoubleType(), True), 
                   StructField('mqtt_conflag_passwd', DoubleType(), True), 
                   StructField('mqtt_conflag_qos', DoubleType(), True), 
                   StructField('mqtt_conflag_reserved', DoubleType(), True), 
                   StructField('mqtt_conflag_retain', DoubleType(), True), 
                   StructField('mqtt_conflag_uname', DoubleType(), True), 
                   StructField('mqtt_conflag_willflag', DoubleType(), True), 
                   StructField('mqtt_conflags', StringType(), True), 
                   StructField('mqtt_dupflag', DoubleType(), True), 
                   StructField('mqtt_hdrflags', StringType(), True), 
                   StructField('mqtt_kalive', DoubleType(), True), 
                   StructField('mqtt_len', DoubleType(), True), 
                   StructField('mqtt_msg', StringType(), True), 
                   StructField('mqtt_msgid', DoubleType(), True), 
                   StructField('mqtt_msgtype', DoubleType(), True), 
                   StructField('mqtt_proto_len', DoubleType(), True), 
                   StructField('mqtt_protoname', StringType(), True), 
                   StructField('mqtt_qos', DoubleType(), True), 
                   StructField('mqtt_retain', DoubleType(), True), 
                   StructField('mqtt_sub_qos', DoubleType(), True), 
                   StructField('mqtt_suback_qos', DoubleType(), True), 
                   StructField('mqtt_ver', DoubleType(), True), 
                   StructField('mqtt_willmsg', DoubleType(), True), 
                   StructField('mqtt_willmsg_len', DoubleType(), True), 
                   StructField('mqtt_willtopic', DoubleType(), True), 
                   StructField('mqtt_willtopic_len', DoubleType(), True), 
                   StructField('target', StringType(), True), 
                   StructField('train_flag', IntegerType(), False)])

In [39]:
df_train = spark.read.csv(r"D:\data\train70_reduced.csv", header = True, inferSchema= False, schema = mqtt)
df_test = spark.read.csv(r"D:\data\test30_reduced.csv", header = True, inferSchema= False, schema = mqtt)
df_train = df_train.withColumn('train_flag', lit(1)) #train flag is 1 if the data is training set or 0 if test set
df_test = df_test.withColumn('train_flag', lit(0))

In [40]:
df_comb = df_train.union(df_test)
df_comb.printSchema()
df_comb.show(vertical = True)

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: string (nullable = true)
 |-- mqtt_conack_flags_reserved: double (nullable = true)
 |-- mqtt_conack_flags_sp: double (nullable = true)
 |-- mqtt_conack_val: double (nullable = true)
 |-- mqtt_conflag_cleansess: double (nullable = true)
 |-- mqtt_conflag_passwd: double (nullable = true)
 |-- mqtt_conflag_qos: double (nullable = true)
 |-- mqtt_conflag_reserved: double (nullable = true)
 |-- mqtt_conflag_retain: double (nullable = true)
 |-- mqtt_conflag_uname: double (nullable = true)
 |-- mqtt_conflag_willflag: double (nullable = true)
 |-- mqtt_conflags: string (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_kalive: double (nullable = true)
 |-- mqtt_len: double (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-

In [41]:
db_properties={}
#update your db username
db_properties['username']="postgres"
#update your db password
db_properties['password']="asdfghjkl@23456"
#make sure you got the right port number here
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
#make sure you had the Postgres JAR file in the right location
db_properties['driver']="org.postgresql.Driver"
db_properties['table']= "MQTT_Reduced"


df_comb.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()


In [42]:
from pyspark.sql.functions import *

In [43]:
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df_read.show(1, vertical=True)

-RECORD 0--------------------------------
 tcp_flags                  | 0x00000018 
 tcp_time_delta             | 0.998867   
 tcp_len                    | 10         
 mqtt_conack_flags          | 0          
 mqtt_conack_flags_reserved | 0.0        
 mqtt_conack_flags_sp       | 0.0        
 mqtt_conack_val            | 0.0        
 mqtt_conflag_cleansess     | 0.0        
 mqtt_conflag_passwd        | 0.0        
 mqtt_conflag_qos           | 0.0        
 mqtt_conflag_reserved      | 0.0        
 mqtt_conflag_retain        | 0.0        
 mqtt_conflag_uname         | 0.0        
 mqtt_conflag_willflag      | 0.0        
 mqtt_conflags              | 0          
 mqtt_dupflag               | 0.0        
 mqtt_hdrflags              | 0x00000030 
 mqtt_kalive                | 0.0        
 mqtt_len                   | 8.0        
 mqtt_msg                   | 32         
 mqtt_msgid                 | 0.0        
 mqtt_msgtype               | 3.0        
 mqtt_proto_len             | 0.0 

Mean Length of MQTT message in Training set

In [51]:
train_lens = df_read.select('mqtt_len').where(df_read.train_flag == 1)
train_lens.summary('mean').show()

+-------+------------------+
|summary|          mqtt_len|
+-------+------------------+
|   mean|31.435725201384873|
+-------+------------------+



Average length of TCP message for each value in target

In [13]:
df_read.groupby('target').mean('tcp_len').show()

+----------+------------------+
|    target|      avg(tcp_len)|
+----------+------------------+
|   slowite| 3.463592609843192|
|bruteforce| 3.260886699507389|
|     flood|13479.687645687645|
| malformed|20.892362019356526|
|       dos| 305.3959037254816|
|legitimate| 7.785061817930654|
+----------+------------------+



An interesting insight from the above analysis, the flood type as the highest average TCP message length. Since flood attack is flooding the server with a ton of messages, this is expected. 

Return top X tcp flags

In [55]:
import pandas as pd
def freq_tcp(x):
    freq_tcps_df = df_read.groupby('tcp_flags').count().sort(col('count').desc())
    return pd.DataFrame(freq_tcps_df.head(x), columns = ['TCP FLAG', 'COUNT']) #displaying a Pandas dataframe since it looked better

In [56]:
#testing the function
freq_tcp(3)

Unnamed: 0,TCP FLAG,COUNT
0,0x00000018,183076
1,0x00000010,134547
2,0x00000011,4198


Using Google News feed to find most popular targets

In [57]:
from confluent_kafka import Producer
import socket
#Initialize Your Parameters here - Keep the variable values as is for the ones you can't find on the Confluent-Kafka connection 
KAFKA_CONFIG = {
    "bootstrap.servers":"pkc-lzvrd.us-west4.gcp.confluent.cloud:9092",
    "security.protocol":"SASL_SSL",
    "sasl.mechanisms":"PLAIN",
    "sasl.username":"7EOPJNPKEKGPG2ZV",
    "sasl.password":"NzHUaLrZRxGaxfrN8qeCuTG/wI7iXP+MjWBN19Lc2rf/4xEp4VSgQY7/8VLCKLaF",
    "session.timeout.ms":"45000",
    "group.id":"python-group-1",
    'auto.offset.reset': 'smallest',
    'client.id': socket.gethostname()
}

# Update your topic name
topic_name = "topic_1"
producer = Producer(KAFKA_CONFIG)


In [58]:
import feedparser
import time
# We are searching for Popular Cyber Attacks in the news
feed_url = "https://news.google.com/rss/search?q=popular+cyber+attacks"
def extract_news_feed(feed_url):
    start_time = time.time()
    print('Feed started at ' + str(start_time))
    feed = feedparser.parse(feed_url)
    articles = []
    extracted_articles = set()
    while time.time() < 300 + start_time: #running five minutes feed
        for entry in feed.entries:
            link = entry.link
            title = entry.title.encode('ascii', 'ignore').decode()
            unique_id = f'{link}-{title}'
            if unique_id in extracted_articles:
                continue
            extracted_articles.add(unique_id)
            article_data = {"title": title, "link":link}
            if article_data is not None:
                producer.produce(topic_name, key=article_data["title"], value=article_data["link"])
    print('Feed ended at ' + str(time.time()))
    print('Total time elapsed ' + str(time.time() - start_time))
    producer.flush()

    
extract_news_feed(feed_url)

Feed started at 1697159993.4363306
Feed ended at 1697160293.4371393
Total time elapsed 300.0008087158203


In [59]:
from confluent_kafka import Consumer
from pyspark.sql.types import *
import string


# Clean the punctation by making a translation table that maps punctations to empty strings
translator = str.maketrans("", "", string.punctuation)


emp_RDD = spark.sparkContext.emptyRDD()
# Defining the schema of the DataFrame
columns = StructType([StructField('key', StringType(), False),
                      StructField('value', StringType(), False)])

# Creating an empty DataFrame
df = spark.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()

consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe([topic_name])

try:
    i = 0
    while i < 5:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            i = i + 1
            print("Waiting...")
            continue
        if msg is not None:
            key = msg.key().decode('utf-8').lower().translate(translator)
            cleaned_key = " ".join(key.split())
            value = msg.value().decode('utf-8')
            added_row = [[cleaned_key,value]]
            added_df = spark.createDataFrame(added_row, columns)
            df = df.union(added_df)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
    df.show()


+---+-----+
|key|value|
+---+-----+
+---+-----+

Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|are you making yo...|https://news.goog...|
|what is threat in...|https://news.goog...|
|us smbs urge for ...|https://news.goog...|
|cyber algorithm n...|https://news.goog...|
|home affairs and ...|https://news.goog...|
|the undeniable be...|https://news.goog...|
|isro fights over ...|https://news.goog...|
|the 6 most common...|https://news.goog...|
|the role of gover...|https://news.goog...|
|cyberattacks in t...|https://news.goog...|
|newsom signs bill...|https://news.goog...|
|utsas cybersecuri...|https://news.goog...|
|cyber insurance l...|https://news.goog...|
|whatsapp says vir...|https://news.goog...|
|in an era of esca...|https://news.goog...|
|canadian organiza...|https://news.goog...|
|cisos watch out t...|https://news.goog...|
|generative ai i

In [60]:
from pyspark.sql.functions import *
import nltk 
nltk.download('stopwords')

stop_words = nltk.corpus.stopwords.words('english')
streamed_data = df.withColumn('word', explode(split(col('key'), ' '))) \
                .filter(~col('word').isin(stop_words)) \
                .groupBy('word') \
                .count() \
                .sort('count', ascending=False)
    
streamed_data.show()

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\aayus\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


+-------------+-----+
|         word|count|
+-------------+-----+
|        cyber|   61|
|      attacks|   24|
|     security|   23|
|      threats|   13|
|cybersecurity|   12|
| cyberattacks|   12|
|       attack|   10|
|         news|    9|
|     magazine|    9|
|  cyberattack|    8|
|       forbes|    6|
|         2023|    6|
|          hit|    5|
| infosecurity|    5|
|     industry|    5|
|           ai|    5|
|        point|    5|
| intelligence|    4|
|           us|    4|
|       online|    4|
+-------------+-----+
only showing top 20 rows



In [61]:
attack_types = ['slowite', 'dos', 'legitimate', 'malformed', 'brute', 'ddos', 'denial-of-service']

In [62]:
attack_count = streamed_data.filter(col('word').isin(attack_types))
attack_count.show()

+----+-----+
|word|count|
+----+-----+
+----+-----+

