In [1]:
# If you installed Spark on Windows, Python may not find it on its own: install
# findspark and initialize it *before* importing pyspark. You may also need to
# create the SparkContext yourself.
# Uncomment the following lines if you are using Windows!
#import findspark
#findspark.init()
#findspark.find()

import pyspark
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

# Local-mode session ("local[*]"). If a session already exists in this kernel,
# getOrCreate() returns it, and only runtime SQL configurations from this
# builder take effect (master/appName are then ignored).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GenericAppName")
    .getOrCreate()
)

22/10/27 20:50:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
from pyspark.sql.functions import *

In [3]:
path1 = ["train70_reduced.csv"]  # training split
path2 = ["test30_reduced.csv"]   # test split

# Both CSVs start with a header row; Spark infers every column's type.
# df1 -> training dataframe, df2 -> test dataframe (read in that order).
# NOTE(review): each file's schema is inferred independently, so train and test
# could disagree on a column's type; confirm before appending both to one table.
df1, df2 = (
    spark.read.csv(paths, header=True, inferSchema=True)
    for paths in (path1, path2)
)

                                                                                

In [4]:
from pyspark.sql.functions import lit

# Tag every row with the split it came from ('1' = train, '0' = test, stored as
# strings) so the two splits stay distinguishable once they share one table.
df1, df2 = (
    frame.withColumn("istrain", lit(flag))
    for frame, flag in ((df1, '1'), (df2, '0'))
)

In [22]:
import os
from getpass import getpass

# JDBC connection settings for the Postgres database that receives both splits.
# Credentials are NOT hardcoded, so they never end up in the saved notebook:
#   PGUSER      - db username (defaults to "postgres")
#   PGPASSWORD  - db password; if unset you are prompted for it (input not echoed)
#   PG_JDBC_URL - JDBC URL; make sure you got the right port number here
db_properties = {}
db_properties['username'] = os.environ.get("PGUSER", "postgres")
db_properties['password'] = os.environ.get("PGPASSWORD") or getpass("Postgres password: ")
db_properties['url'] = os.environ.get("PG_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres")
# make sure you had the Postgres JAR file in the right location
db_properties['driver'] = "org.postgresql.Driver"
# target table as <schema>.<table>
db_properties['table'] = "mqtt.mqtt_reduced"


In [23]:

def write_to_postgres(df, mode, props):
    """Write ``df`` to the Postgres table described by ``props`` over JDBC.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Rows to write.
    mode : str
        Spark save mode, e.g. "overwrite" or "append".
    props : dict
        Connection settings with the keys 'url', 'table', 'username',
        'password' and 'driver'.
    """
    (
        df.write.format("jdbc")
        .mode(mode)
        .option("url", props['url'])
        .option("dbtable", props['table'])
        .option("user", props['username'])
        .option("password", props['password'])
        .option("Driver", props['driver'])
        .save()
    )


# Training rows replace whatever the table held; test rows are appended after
# them, so re-running this cell rebuilds the table instead of duplicating rows.
write_to_postgres(df1, "overwrite", db_properties)
write_to_postgres(df2, "append", db_properties)


                                                                                

In [24]:
# Load the combined train + test table back from Postgres over JDBC.
df_read = (
    spark.read.format("jdbc")
    .options(
        url=db_properties['url'],
        dbtable=db_properties['table'],
        user=db_properties['username'],
        password=db_properties['password'],
        Driver=db_properties['driver'],
    )
    .load()
)

In [25]:
# Inspect the schema of the table read back from Postgres (column names still contain dots).
df_read.printSchema()

root
 |-- tcp.flags: string (nullable = true)
 |-- tcp.time_delta: double (nullable = true)
 |-- tcp.len: integer (nullable = true)
 |-- mqtt.conack.flags: string (nullable = true)
 |-- mqtt.conack.flags.reserved: double (nullable = true)
 |-- mqtt.conack.flags.sp: double (nullable = true)
 |-- mqtt.conack.val: double (nullable = true)
 |-- mqtt.conflag.cleansess: double (nullable = true)
 |-- mqtt.conflag.passwd: double (nullable = true)
 |-- mqtt.conflag.qos: double (nullable = true)
 |-- mqtt.conflag.reserved: double (nullable = true)
 |-- mqtt.conflag.retain: double (nullable = true)
 |-- mqtt.conflag.uname: double (nullable = true)
 |-- mqtt.conflag.willflag: double (nullable = true)
 |-- mqtt.conflags: string (nullable = true)
 |-- mqtt.dupflag: double (nullable = true)
 |-- mqtt.hdrflags: string (nullable = true)
 |-- mqtt.kalive: double (nullable = true)
 |-- mqtt.len: double (nullable = true)
 |-- mqtt.msg: string (nullable = true)
 |-- mqtt.msgid: double (nullable = true)
 |-

In [26]:
# Replace every '.' in the column names with '_': Spark treats a dot in an
# unquoted column reference as nested-field access, so dotted names would need
# backtick quoting everywhere. toDF() renames all columns in one projection,
# instead of stacking one withColumnRenamed() projection per column.
df_read = df_read.toDF(*[name.replace('.', '_') for name in df_read.columns])

In [27]:
# Confirm the rename: column names now use '_' where they previously had '.'.
df_read.printSchema()

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: string (nullable = true)
 |-- mqtt_conack_flags_reserved: double (nullable = true)
 |-- mqtt_conack_flags_sp: double (nullable = true)
 |-- mqtt_conack_val: double (nullable = true)
 |-- mqtt_conflag_cleansess: double (nullable = true)
 |-- mqtt_conflag_passwd: double (nullable = true)
 |-- mqtt_conflag_qos: double (nullable = true)
 |-- mqtt_conflag_reserved: double (nullable = true)
 |-- mqtt_conflag_retain: double (nullable = true)
 |-- mqtt_conflag_uname: double (nullable = true)
 |-- mqtt_conflag_willflag: double (nullable = true)
 |-- mqtt_conflags: string (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_kalive: double (nullable = true)
 |-- mqtt_len: double (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-

### Task 2

#### Task 2 part 1: mean `mqtt_len` over the training rows

In [28]:
# Mean mqtt_len over the training rows only (istrain == '1').
# Keep the DataFrame in q1 and display it separately: .show() returns None,
# so assigning its result (as before) left q1 empty.
q1 = df_read.where(df_read.istrain == '1').select(mean('mqtt_len'))
q1.show()

+------------------+
|     avg(mqtt_len)|
+------------------+
|31.435725201384873|
+------------------+



#### Task 2 part 2: mean `tcp_len` per `target` class

In [29]:
# Mean tcp_len for each value of `target` (result column: avg(tcp_len)).
B = df_read.groupBy('target').avg('tcp_len')

In [30]:
# Display the per-target means held in B.
B.show()

+----------+------------------+
|    target|      avg(tcp_len)|
+----------+------------------+
|   slowite|3.9993479678330797|
|bruteforce|3.9871043376318873|
|     flood|13313.415986949429|
| malformed| 20.97491761259612|
|       dos|312.65759830457716|
|legitimate| 7.776101001432345|
+----------+------------------+



#### Task 2 part 3: the X most frequent `tcp_flags` values

In [31]:
def freqX(df : pyspark.sql.dataframe.DataFrame, X: int):
    """Return the X most frequent ``tcp_flags`` values together with their row counts.

    Parameters
    ----------
    df : pyspark.sql.dataframe.DataFrame
        Input data; must contain a ``tcp_flags`` column.
    X : int
        How many flag values to keep; must satisfy
        0 <= X <= number of ``tcp_flags`` groups in ``df``.

    Returns
    -------
    pyspark.sql.dataframe.DataFrame
        Columns ``tcp_flags`` and ``count``, sorted by ``count`` descending.
        Ties are broken by ``tcp_flags`` ascending, so the same X rows are
        returned on every run.

    Raises
    ------
    AssertionError
        If X is negative or larger than the number of ``tcp_flags`` groups.
        (These are ``assert`` checks, so ``python -O`` skips them.)
    """
    # The secondary key matters: without it, flags with equal counts come back in
    # arbitrary order and limit(X) may keep a different one on each run.
    count_class = (
        df.groupby('tcp_flags')
        .count()
        .sort('count', 'tcp_flags', ascending=[False, True])
    )

    # count() launches a Spark job; run it once and reuse the result.
    n_flags = count_class.count()
    assert X >= 0, f" Given {X} parameter must not be negative"
    assert X <= n_flags, f" Given {X} parameter is greater than number of TCP Flags {n_flags}"

    return count_class.limit(X)

In [35]:
# The six most frequent tcp_flags values in the combined table.
counted = freqX(df=df_read, X=6)

In [36]:
# Display the top-X tcp_flags table returned by freqX.
counted.show()

+----------+------+
| tcp_flags| count|
+----------+------+
|0x00000018|183076|
|0x00000010|134547|
|0x00000011|  4198|
|0x00000002|  3372|
|0x00000012|  3372|
|0x00000004|  1592|
+----------+------+

