In [140]:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
import urllib.request
from time import time
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

In [2]:
sc = SparkContext(master="local", appName="test")
sc

In [3]:
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x7fc6b0355780>

In [4]:
data = urllib.request.urlretrieve('http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz','kddcup.data_10_percent.gz')

In [5]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [6]:
data_split = raw_data.map(lambda x: x.split(","))

In [7]:
schema = StructType(
[
    StructField('duration',IntegerType(),True),
    StructField('protocol_type',StringType(),True),
    StructField('service',StringType(),True),
    StructField('flag',StringType(),True),
    StructField('src_bytes',IntegerType(),True),
    StructField('dst_bytes',IntegerType(),True),
    StructField('interactions',StringType(),True)
])
schema

StructType(List(StructField(duration,IntegerType,true),StructField(protocol_type,StringType,true),StructField(service,StringType,true),StructField(flag,StringType,true),StructField(src_bytes,IntegerType,true),StructField(dst_bytes,IntegerType,true),StructField(interactions,StringType,true)))

In [9]:
dataFormatted = data_split.map(lambda p: (int(p[0]),p[1],p[2],p[3],int(p[4]),int(p[5]),p[-1]))

In [10]:
df = sqlContext.createDataFrame(dataFormatted,schema)
df

DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, interactions: string]

In [11]:
df.show(n=10)

+--------+-------------+-------+----+---------+---------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|interactions|
+--------+-------------+-------+----+---------+---------+------------+
|       0|          tcp|   http|  SF|      181|     5450|     normal.|
|       0|          tcp|   http|  SF|      239|      486|     normal.|
|       0|          tcp|   http|  SF|      235|     1337|     normal.|
|       0|          tcp|   http|  SF|      219|     1337|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      212|     1940|     normal.|
|       0|          tcp|   http|  SF|      159|     4087|     normal.|
|       0|          tcp|   http|  SF|      210|      151|     normal.|
|       0|          tcp|   http|  SF|      212|      786|     normal.|
+--------+-------------+-------+----+---------+---------+------------+
only s

In [24]:
df.createOrReplaceTempView("interractions")
interractionView = sqlContext.sql("SELECT * FROM interractions")

In [25]:
interractionView.show()

+--------+-------------+-------+----+---------+---------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|interactions|
+--------+-------------+-------+----+---------+---------+------------+
|       0|          tcp|   http|  SF|      181|     5450|     normal.|
|       0|          tcp|   http|  SF|      239|      486|     normal.|
|       0|          tcp|   http|  SF|      235|     1337|     normal.|
|       0|          tcp|   http|  SF|      219|     1337|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      212|     1940|     normal.|
|       0|          tcp|   http|  SF|      159|     4087|     normal.|
|       0|          tcp|   http|  SF|      210|      151|     normal.|
|       0|          tcp|   http|  SF|      212|      786|     normal.|
|       0|          tcp|   http|  SF|      210|      624|     normal.|
|     

Les TempView servent à créer une vue temporaire du DataFrame dans notre Spark Session.

In [112]:
dataFilteredSql = sqlContext.sql("SELECT duration,dst_bytes FROM interractions WHERE duration > 1000 AND dst_bytes == 0 ")
dataFilteredSql.count()

139

In [126]:
dataFilteredSql.rdd.map(lambda x : "Duration: {0}, Dest. bytes: {1}".format(x.dst_bytes,x.duration)).take(10)

['Duration: 0, Dest. bytes: 5057',
 'Duration: 0, Dest. bytes: 5059',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5039',
 'Duration: 0, Dest. bytes: 5062',
 'Duration: 0, Dest. bytes: 5041',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5064']

In [150]:
t = time()

dataFilteredSql = sqlContext.sql("SELECT protocol_type,count(protocol_type) FROM interractions GROUP BY protocol_type")
dataFilteredSql.show()


elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

+-------------+--------------------+
|protocol_type|count(protocol_type)|
+-------------+--------------------+
|          tcp|              190065|
|          udp|               20354|
|         icmp|              283602|
+-------------+--------------------+

Times to execute the request : 19.03532361984253


In [151]:
t = time()

dataFiltered = interractionView.filter((interractionView.duration > 1000) & (interractionView.dst_bytes == 0))
dataFiltered.groupby("protocol_type").count().show()

elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Times to execute the request : 19.92646288871765


In [152]:
t = time()

dataFiltered.rdd.map(lambda p: "Duration: {0}, Dest. bytes: {1}".format(p.dst_bytes,p.duration)).take(5)

elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

Times to execute the request : 17.510112285614014


In [135]:
def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"


In [155]:
t = time()

udf_label = F.udf(attack_or_normal_func, StringType())
df = df.withColumn('label',udf_label(df.interactions))
df.printSchema()

elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- interactions: string (nullable = true)
 |-- label: string (nullable = true)

Times to execute the request : 0.0690300464630127


In [157]:
t = time()

df.groupby("label").count().show()

elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

Times to execute the request : 20.233797311782837


In [158]:
t = time()

df.groupby("label","protocol_type").count().show()

elapsed_time = time() - t
print("Times to execute the request : {0}".format(elapsed_time))

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+

Times to execute the request : 23.962221384048462
