In [52]:

import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import lit

appName = "Project_2"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .set('spark.driver.memory', '15g')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [53]:
# Load data from csv to a dataframe on a local machine. 
# header=False means the first row is not a header 
# sep=',' means the column are seperated using ','


df_train = spark.read.csv("/Users/kiranprasadjp/Downloads/archive (1)/Data/FINAL_CSV/train70.csv",header=True, inferSchema= True)
df_test = spark.read.csv("/Users/kiranprasadjp/Downloads/archive (1)/Data/FINAL_CSV/test30_reduced.csv",header=True, inferSchema= True)
df_train = df_train.withColumn("dataset", lit("train"))
df_test = df_test.withColumn("dataset", lit("test"))
df = df_train.union(df_test)


                                                                                

In [54]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']="1412"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"

# I have kept the table name as intrusion2
db_properties['table']="MQTT" 
db_properties['driver']="org.postgresql.Driver"

df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()


                                                                                

In [55]:
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df_read.show(1, vertical=True)
df_read.printSchema()

[Stage 92:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------------
 tcp.flags                  | 0x00000018 
 tcp.time_delta             | 1.41E-4    
 tcp.len                    | 13         
 mqtt.conack.flags          | 0          
 mqtt.conack.flags.reserved | 0.0        
 mqtt.conack.flags.sp       | 0.0        
 mqtt.conack.val            | 0.0        
 mqtt.conflag.cleansess     | 0.0        
 mqtt.conflag.passwd        | 0.0        
 mqtt.conflag.qos           | 0.0        
 mqtt.conflag.reserved      | 0.0        
 mqtt.conflag.retain        | 0.0        
 mqtt.conflag.uname         | 0.0        
 mqtt.conflag.willflag      | 0.0        
 mqtt.conflags              | 0          
 mqtt.dupflag               | 0.0        
 mqtt.hdrflags              | 0x00000030 
 mqtt.kalive                | 0.0        
 mqtt.len                   | 11.0       
 mqtt.msg                   | 32         
 mqtt.msgid                 | 0.0        
 mqtt.msgtype               | 3.0        
 mqtt.proto_len             | 0.0 

                                                                                

In [56]:
from pyspark.sql.functions import col

def replace_dot_with_underscore(data_df):
    # Replace '.' with '_' in column names
    for col_name in data_df.columns:
        new_col_name = col_name.replace('.', '_')
        data_df = data_df.withColumnRenamed(col_name, new_col_name)
    return data_df
renamed_columns_df = replace_dot_with_underscore(df)

In [57]:
renamed_columns_df.printSchema()

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: string (nullable = true)
 |-- mqtt_conack_flags_reserved: double (nullable = true)
 |-- mqtt_conack_flags_sp: double (nullable = true)
 |-- mqtt_conack_val: double (nullable = true)
 |-- mqtt_conflag_cleansess: double (nullable = true)
 |-- mqtt_conflag_passwd: double (nullable = true)
 |-- mqtt_conflag_qos: double (nullable = true)
 |-- mqtt_conflag_reserved: double (nullable = true)
 |-- mqtt_conflag_retain: double (nullable = true)
 |-- mqtt_conflag_uname: double (nullable = true)
 |-- mqtt_conflag_willflag: double (nullable = true)
 |-- mqtt_conflags: string (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_kalive: double (nullable = true)
 |-- mqtt_len: double (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-

In [58]:
from pyspark.sql.functions import avg


# Filter the DataFrame to select only rows where 'dataset' is 'train'
train_df = renamed_columns_df.filter(renamed_columns_df['dataset'] == 'train')

# Calculate the average of the 'len' column for the 'train' dataset
average_length = train_df.agg(avg('mqtt_len')).collect()[0][0]

# Print the average length
print("Average length for 'train' dataset:", average_length)



Average length for 'train' dataset: 7.2058089663222225


                                                                                

In [59]:
average_lengths = renamed_columns_df.groupBy('target').agg(avg('tcp_len').alias('average_tcp_length'))

average_lengths.show()




+----------+------------------+
|    target|average_tcp_length|
+----------+------------------+
|   slowite|3.9993479678330797|
|bruteforce|3.9871043376318873|
|     flood|13313.415986949429|
| malformed| 20.97491761259612|
|       dos|312.65759830457716|
|legitimate| 7.784230642716169|
+----------+------------------+



                                                                                

In [61]:
def list_most_frequent_tcp_flags(data_df, top_n):
    """
    List the most frequent X TCP flags.

    Parameters:
    data_df (pyspark.sql.DataFrame): PySpark DataFrame containing the data.
    top_n (int): Number of most frequent TCP flags to list.

    Returns:
    list: List of dictionaries containing TCP flags and their counts.
    """
    # Group by TCP flags and count their occurrences
    tcp_flags_counts = data_df.groupBy("TCP_flags").count().orderBy(col("count").desc())

    # Check if top_n exceeds the total number of distinct TCP flags
    total_distinct_tcp_flags = tcp_flags_counts.count()
    if top_n > total_distinct_tcp_flags:
        print(f"Error: There are only {total_distinct_tcp_flags} distinct TCP flags.")
        return []

    # Take the top n TCP flags
    top_tcp_flags = tcp_flags_counts.limit(top_n)

    # Collect the results and return as a list of dictionaries
    results = top_tcp_flags.select("TCP_flags", "count").collect()
    return [{"TCP_flags": row["TCP_flags"], "count": row["count"]} for row in results]

# Example usage to list the most frequent TCP flags
while True:
    try:
        top_n = int(input("Enter the number of most frequent TCP flags you want: "))
        if top_n > 0:
            most_frequent_flags = list_most_frequent_tcp_flags(renamed_columns_df, top_n)

            if most_frequent_flags:
                # Display the result
                for flag in most_frequent_flags:
                    print(f"TCP Flag: {flag['TCP_flags']}, Count: {flag['count']}")
                break
        else:
            print("Please enter a value greater than 0.")
    except ValueError:
        print("Please enter a valid integer.")

Enter the number of most frequent TCP flags you want:  -1


Please enter a value greater than 0.


Enter the number of most frequent TCP flags you want:  2.2


Please enter a valid integer.


Enter the number of most frequent TCP flags you want:  111


                                                                                

Error: There are only 8 distinct TCP flags.


Enter the number of most frequent TCP flags you want:  6




TCP Flag: 0x00000018, Count: 5366977
TCP Flag: 0x00000010, Count: 3175803
TCP Flag: 0x00000011, Count: 4198
TCP Flag: 0x00000002, Count: 3382
TCP Flag: 0x00000012, Count: 3382
TCP Flag: 0x00000004, Count: 1592


                                                                                

In [69]:
updated_target_types_list = renamed_columns_df.select('target').distinct()

# Collect the unique values and convert them to a list
updated_target_types_list = [row.target for row in unique_target_types.collect()]
unique_target_types_list = [target.replace('dos', 'denial-of-service') if target == 'dos' else target for target in unique_target_types_list]

# Print the unique protocol types
print("Unique Targets we have from the dataset:")
for target in unique_target_types_list:
    print(target)

Unique Targets we have from the dataset:
slowite
bruteforce
flood
malformed
denial-of-service
legitimate


                                                                                

In [70]:
from confluent_kafka import Producer
import socket
#Initialize Your Parameters here - Keep the variable values as is for the ones you can't find on the Confluent-Kafka connection 
KAFKA_CONFIG = {
    "bootstrap.servers":"pkc-6ojv2.us-west4.gcp.confluent.cloud:9092",
    "security.protocol":"SASL_SSL",
    "sasl.mechanisms":"PLAIN",
    "sasl.username":"EEXTANNTVXA6WCL2",
    "sasl.password":"BisHtfVsPcyqELq9x1Tt2U8o5/g3oTxTIFK6Zn7LUdyKcvK+ST3GHf+CCni2lku8",
    "session.timeout.ms":"45000",
    "group.id":"python-group-1",
    'auto.offset.reset': 'smallest',
    'client.id': socket.gethostname()
}

# Update your topic name
topic_name = "topic_0"
producer = Producer(KAFKA_CONFIG)

%4|1697155351.428|CONFWARN|Kirans-MacBook-Pro.local#producer-20| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1697155351.428|CONFWARN|Kirans-MacBook-Pro.local#producer-20| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1697155351.428|CONFWARN|Kirans-MacBook-Pro.local#producer-20| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance


In [74]:
import feedparser
import time

# We are searching for Analytics in the news
feed_url = "https://news.google.com/rss/search?q=popular+cyber+attacks"
def extract_news_feed(feed_url, runtime_minutes=2):
    feed = feedparser.parse(feed_url)
    articles = []
    extracted_articles = set()
    start_time = time.time()
    end_time = start_time + (runtime_minutes * 60)  # Convert minutes to seconds
    while time.time() < end_time:
        for entry in feed.entries:
            link = entry.link
            title = entry.title.encode('ascii', 'ignore').decode()
            unique_id = f'{link}-{title}'
            if unique_id in extracted_articles:
                continue
            extracted_articles.add(unique_id)
            article_data = {"title": title, "link": link}
            if article_data is not None:
                producer.produce(topic_name, key=article_data["title"], value=article_data["link"])
                
        producer.flush()

extract_news_feed(feed_url, runtime_minutes=.1)

In [75]:
from confluent_kafka import Consumer
from pyspark.sql.types import *
import string


# Clean the punctation by making a translation table that maps punctations to empty strings
translator = str.maketrans("", "", string.punctuation.replace('-', ''))


emp_RDD = spark.sparkContext.emptyRDD()
# Defining the schema of the DataFrame
columns = StructType([StructField('key', StringType(), False),
                      StructField('value', StringType(), False)])

# Creating an empty DataFrame
df = spark.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()

consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe([topic_name])

try:
    i = 0
    while i < 5:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            i = i + 1
            print("Waiting...")
            continue
        if msg is not None:
            print(msg.key())
            key = msg.key().decode('utf-8').lower().translate(translator)
            cleaned_key = " ".join(key.split())
            print('decoded', cleaned_key)

            value = msg.value().decode('utf-8')
            added_row = [[cleaned_key,value]]
            added_df = spark.createDataFrame(added_row, columns)
            df = df.union(added_df)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
    df.show()


+---+-----+
|key|value|
+---+-----+
+---+-----+

Waiting...
Waiting...
b'What small businesses can do to protect against cyber threats - Daily Journal'
decoded what small businesses can do to protect against cyber threats - daily journal
b'Florida schools latest target in escalating cyber attacks - FOX 35 Orlando'
decoded florida schools latest target in escalating cyber attacks - fox 35 orlando
b'US Smashes Annual Data Breach Record With Three Months Left - Infosecurity Magazine'
decoded us smashes annual data breach record with three months left - infosecurity magazine
b'India emerges top-5 victim of cyber attacks in H1 CY23 - BusinessLine'
decoded india emerges top-5 victim of cyber attacks in h1 cy23 - businessline
b'23andMe suffers credential stuffing cyber attack | Cyber Security Hub - Cyber Security Hub'
decoded 23andme suffers credential stuffing cyber attack cyber security hub - cyber security hub
b'Ukraine cyber-conflict: Hacking gangs vow to de-escalate - BBC.com'
decoded uk



+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|what small busine...|https://news.goog...|
|florida schools l...|https://news.goog...|
|us smashes annual...|https://news.goog...|
|india emerges top...|https://news.goog...|
|23andme suffers c...|https://news.goog...|
|ukraine cyber-con...|https://news.goog...|
|cyber attack - ph...|https://news.goog...|
|will quantum comp...|https://news.goog...|
|mgm faces 100m lo...|https://news.goog...|
|6 actions ceos mu...|https://news.goog...|
|iocta spotlight r...|https://news.goog...|
|cybersecurity awa...|https://news.goog...|
|box and crowdstri...|https://news.goog...|
|how i got started...|https://news.goog...|
|casino giant caes...|https://news.goog...|
|scada the invisib...|https://news.goog...|
|well-informed emp...|https://news.goog...|
|experts canvass a...|https://news.goog...|
|2022s 4 most comm...|https://news.goog...|
|emerging cyber th...|https://ne

                                                                                

In [76]:
from pyspark.sql.functions import *
# import nltk 
# nltk.download('stopwords')

# stop_words = nltk.corpus.stopwords.words('english')
streamed_data = df.withColumn('word', explode(split(col('key'), ' '))) \
                .filter(col('word').isin(unique_target_types_list)) \
                .groupBy('word') \
                .count() \
                .sort('count', ascending=False)
    
    
streamed_data.show()



+-----------------+-----+
|             word|count|
+-----------------+-----+
|denial-of-service|    1|
+-----------------+-----+



                                                                                