In [1]:
from pyspark import *
from pyspark import SQLContext
import urllib

In [2]:
sc = SparkContext(master="local", appName="test")

In [3]:
sc

In [4]:
#download les data 
# --> http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html


In [7]:
data = urllib.request.urlretrieve('http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz','kddcup.data_10_percent.gz')

In [51]:
path ='./kddcup.data_10_percent.gz'
SqlCont = SQLContext(sc)
df = sc.textFile(path)
df = df.map( lambda x: x.split(','))
df.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

In [44]:
from pyspark.sql.types import *
structure = StructType([
        StructField("duration", IntegerType(), True),
        StructField("protocol_type", StringType(), True),
        StructField("service", StringType(), True),
        StructField("flag", StringType(), True),
        StructField("src_bytes",IntegerType(), True),
        StructField("dst_bytes",IntegerType(), True),
        StructField("interactions", StringType(), True),
])
structure

StructType(List(StructField(duration,IntegerType,true),StructField(protocol_type,StringType,true),StructField(service,StringType,true),StructField(flag,StringType,true),StructField(src_bytes,IntegerType,true),StructField(dst_bytes,IntegerType,true),StructField(interactions,StringType,true)))

In [53]:
formatedData = df.map(lambda key: (int(key[0]), key[1], key[2], key[3], int(key[4]), int(key[5]), key[-1]))
formatedData.take(1)

[(0, 'tcp', 'http', 'SF', 181, 5450, 'normal.')]

In [54]:
data = SqlCont.createDataFrame(formatedData, structure)

In [56]:
data.show(10)

+--------+-------------+-------+----+---------+---------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|interactions|
+--------+-------------+-------+----+---------+---------+------------+
|       0|          tcp|   http|  SF|      181|     5450|     normal.|
|       0|          tcp|   http|  SF|      239|      486|     normal.|
|       0|          tcp|   http|  SF|      235|     1337|     normal.|
|       0|          tcp|   http|  SF|      219|     1337|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      212|     1940|     normal.|
|       0|          tcp|   http|  SF|      159|     4087|     normal.|
|       0|          tcp|   http|  SF|      210|      151|     normal.|
|       0|          tcp|   http|  SF|      212|      786|     normal.|
+--------+-------------+-------+----+---------+---------+------------+
only s

In [76]:
data.createOrReplaceTempView("interactions")
# createOrReplaceTempView creates (or replaces if that view name already exists) 
# a lazily evaluated "view" that you can then use like a hive table in Spark SQL.
# Ca permet de faire une 'copie' pour éviter d'endommager les données de bases.

In [77]:
le139 = data[(data['duration']>1000) & (data['dst_bytes'] == 0) & (data['protocol_type'] == "tcp")]
le139.count()

139

In [86]:
import time
protocol = data[['protocol_type']].groupby(['protocol_type']).count()
start_time= time.time()
protocol.show()
print("Temps d execution : %s secondes ---" % (time.time() - start_time))

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

Temps d execution : 6.825480699539185 secondes ---


In [87]:
durabyte = data[(data['duration']>1000) & (data['dst_bytes'] == 0)].groupby(['protocol_type']).count()
start_time= time.time()
durabyte.show()
print("Temps d execution : %s secondes ---" % (time.time() - start_time))

+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Temps d execution : 6.755943298339844 secondes ---


In [94]:
data.rdd.map(lambda x : "Duration: {0}, Dest. bytes: {1}".format(x.dst_bytes,x.duration)).take(5)

['Duration: 5450, Dest. bytes: 0',
 'Duration: 486, Dest. bytes: 0',
 'Duration: 1337, Dest. bytes: 0',
 'Duration: 1337, Dest. bytes: 0',
 'Duration: 2032, Dest. bytes: 0']

In [102]:
from pyspark.sql.functions import udf


def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"


In [113]:
label = udf(attack_or_normal_func, StringType())
data = data.withColumn('label', label(data['interactions']))
start_time= time.time()
data.select('label').groupby('label').count().show()
print("Temps d execution : %s secondes ---" % (time.time() - start_time))

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

Temps d execution : 6.875239372253418 secondes ---


In [114]:
data.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- interactions: string (nullable = true)
 |-- label: string (nullable = true)



In [115]:
start_time= time.time()
data.select('label','protocol_type').groupby('label','protocol_type').count().show()
print("Temps d execution : %s secondes ---" % (time.time() - start_time))

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+

Temps d execution : 6.99531102180481 secondes ---
