In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory.
# This cell prints the path of every file in the Kitsune "SSL Renegotiation"
# dataset folder (recursing into any sub-folders), not the whole input directory.

import os

dataset_paths = (
    os.path.join(folder, name)
    for folder, _subfolders, names in os.walk('../input/network-attack-dataset-kitsune/SSL Renegotiation/')
    for name in names
)
for path in dataset_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Setup environment
!pip install pyspark
# !pip install sparkmagic

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id


spark = SparkSession.builder.appName("outlier-detection").getOrCreate()

In [2]:
# Load the Kitsune SSL Renegotiation feature matrix. The CSV has no header row,
# so Spark names the columns _c0, _c1, ...; inferSchema costs an extra pass over
# the file to detect column types.
data = "../input/network-attack-dataset-kitsune/SSL Renegotiation/SSL_Renegotiation_dataset.csv"
df_data = spark.read.csv(data, sep=",", header=False, inferSchema=True)

# monotonically_increasing_id() is unique and increasing but NOT consecutive:
# the partition index occupies the upper 31 bits and the record number within
# the partition the lower 33 bits, so only partition 0 starts at 0. The
# following cells convert it into a dense row number.
df_data = df_data.withColumn("row_id", monotonically_increasing_id())



In [4]:
# Analyze the internal partitions.
# monotonically_increasing_id() packs the partition index into the upper 31 bits
# and the record number within that partition into the lower 33 bits; split
# row_id back into (partition_id, row_offset).

from pyspark.sql.functions import *   # later cells use when/expr/lit/sum/broadcast from here
                                      # NOTE: this shadows Python builtins such as sum, min, max, abs
from pyspark.sql import Column

ROW_OFFSET_BITS = 33
# Low 33 bits. The previous mask (2**31 - 1) discarded bits 31-32 of the record number.
ROW_OFFSET_MASK = (1 << ROW_OFFSET_BITS) - 1

df_partition = (
    df_data
    .withColumn("partition_id", shiftRight("row_id", ROW_OFFSET_BITS))
    .withColumn("row_offset", col("row_id").bitwiseAND(ROW_OFFSET_MASK))
)

# Number of rows in each partition (one output row per partition_id).
partitions_size = (
    df_partition
    .groupBy("partition_id")
    .count()
    .withColumnRenamed("count", "partition_size")
)
partitions_size.show()


+------------+--------------+
|partition_id|partition_size|
+------------+--------------+
|          26|         46584|
|          29|         46593|
|          19|         46576|
|           0|         46604|
|          22|         46589|
|           7|         46574|
|          34|         46577|
|          32|         46589|
|          43|         46623|
|          31|         46575|
|          39|         46600|
|          25|         46574|
|           6|         46575|
|           9|         46571|
|          27|         46557|
|          17|         46544|
|          41|         46603|
|          28|         46575|
|          33|         46565|
|           5|         46572|
+------------+--------------+
only showing top 20 rows



In [6]:
# Offset of each partition = sum of the sizes of all partitions that precede it.
from pyspark.sql.window import Window
from pyspark.sql import functions as F   # explicit F.sum avoids confusion with builtin sum

# Running total over the *preceding* rows only (the frame ends at -1). There is no
# partitionBy, so Spark evaluates this window in a single partition. That is
# acceptable here: partitions_size has just one row per input partition.
windowSpec = Window.orderBy("partition_id").rowsBetween(Window.unboundedPreceding, -1)

# The first partition in order has no preceding rows, so its windowed sum is null;
# coalesce it to 0. This also holds when the lowest partition_id present is not 0
# (e.g. an empty partition 0 never appears in partitions_size).
partitions_offset = partitions_size.withColumn(
    "partition_offset",
    F.coalesce(F.sum("partition_size").over(windowSpec), F.lit(0)),
)


In [7]:
# Final step: attach each partition's offset to its rows and build a dense,
# 1-based row number:
#   id = (rows in earlier partitions) + (position within this partition) + 1
# partitions_offset has one row per partition, so it is broadcast rather than
# shuffling the large side of the join. Helper columns are dropped afterwards.
df_data = (
    df_partition
    .join(broadcast(partitions_offset), "partition_id")
    .withColumn("id", partitions_offset.partition_offset + df_partition.row_offset + 1)
    .drop("partition_id", "row_id", "row_offset", "partition_size", "partition_offset")
)
df_data.printSchema()


root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = tru

In [8]:
df_data.select("id", "_c0", "_c114").orderBy(df_data["id"].desc()).show()  # rows with the largest ids

+-------+------------------+--------------------+
|     id|               _c0|               _c114|
+-------+------------------+--------------------+
|2207571|58.622254457239116|0.002818600661436646|
|2207570|57.639826343497425|0.002818828188881594|
|2207569|115.51903996015112|                 0.0|
|2207568|114.55557165531172|                 0.0|
|2207567|113.55641613290923|                 0.0|
|2207566|112.65039028713795|                 0.0|
|2207565|111.65122059649201|                 0.0|
|2207564| 110.7190829343993|                 0.0|
|2207563|109.74728194832748|                 0.0|
|2207562|108.83169081274899|                 0.0|
|2207561|107.85566161244098|                 0.0|
|2207560|107.28270970204764|                 0.0|
|2207559|106.30519424576302|                 0.0|
|2207558|105.90029979728847|                 0.0|
|2207557|104.90099322902226|                 0.0|
|2207556|103.97992206099649|                 0.0|
|2207555| 102.9806027982874|                 0.0|


In [9]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Load the labels. The file has a header row and two columns (id, label); the
# explicit schema means no type-inference pass is needed.
# NOTE(review): Spark file sources generally read every column as nullable, so the
# False nullability flags are likely not enforced — confirm if it matters.
labels = "../input/network-attack-dataset-kitsune/SSL Renegotiation/SSL_Renegotiation_labels.csv"
labels_schema = StructType([
    StructField("id", LongType(), False),
    StructField("label", StringType(), False),
])
df_labels = spark.read.csv(
    labels,
    schema=labels_schema,
    sep=",",
    header=True,
    inferSchema=False,
)

In [10]:
# count() is a full Spark job; display its result instead of silently discarding it.
print("label rows:", df_labels.count())
df_labels.select("id", "label").orderBy(df_labels["id"].desc()).show()


+-------+-----+
|     id|label|
+-------+-----+
|2207571|    1|
|2207570|    1|
|2207569|    0|
|2207568|    0|
|2207567|    0|
|2207566|    0|
|2207565|    0|
|2207564|    0|
|2207563|    0|
|2207562|    0|
|2207561|    0|
|2207560|    0|
|2207559|    0|
|2207558|    0|
|2207557|    0|
|2207556|    0|
|2207555|    0|
|2207554|    0|
|2207553|    0|
|2207552|    0|
+-------+-----+
only showing top 20 rows



In [11]:
# Join features with labels on the derived 1-based id.
# An inner join silently drops ids without a partner. If the id scheme were off
# by one, every label would shift while only a single row was lost, so check
# that no feature rows were dropped (or duplicated by repeated label ids).
df = df_data.join(df_labels, on=["id"], how="inner")

n_joined = df.count()
n_features = df_data.count()
assert n_joined == n_features, (
    f"join kept {n_joined} rows for {n_features} feature rows; check id alignment with labels"
)
n_joined

2207571

In [13]:
df.select("id", "_c0", "_c113", "_c114", "label").orderBy(df["id"].desc()).show()  # largest ids first

+-------+------------------+-----------------+--------------------+-----+
|     id|               _c0|            _c113|               _c114|label|
+-------+------------------+-----------------+--------------------+-----+
|2207571|58.622254457239116|1.057177926145534|0.002818600661436646|    1|
|2207570|57.639826343497425|1.057291029193257|0.002818828188881594|    1|
|2207569|115.51903996015112|              0.0|                 0.0|    0|
|2207568|114.55557165531172|              0.0|                 0.0|    0|
|2207567|113.55641613290923|              0.0|                 0.0|    0|
|2207566|112.65039028713795|              0.0|                 0.0|    0|
|2207565|111.65122059649201|              0.0|                 0.0|    0|
|2207564| 110.7190829343993|              0.0|                 0.0|    0|
|2207563|109.74728194832748|              0.0|                 0.0|    0|
|2207562|108.83169081274899|              0.0|                 0.0|    0|
|2207561|107.85566161244098|          