In [1]:
import os
import sys

# Point this kernel at the Spark 2.0.0 install under the user's home directory
# and record the location of the Python libraries that ship with it.
os.environ["SPARK_HOME"] = os.path.join(os.path.expanduser("~"), 'spark-2.0.0-bin-hadoop2.7')
os.environ["PYLIB"] = os.path.join(os.environ["SPARK_HOME"], 'python', 'lib')

# Each archive is pushed to the front of sys.path in turn, so the last one
# listed (pyspark.zip) ends up ahead of the Py4J archive.
for _archive in ('py4j-0.10.1-src.zip', 'pyspark.zip'):
    sys.path.insert(0, os.path.join(os.environ["PYLIB"], _archive))

In [2]:
import pyspark
# Build (or reuse) the local SparkSession used by every cell below.
# NOTE(review): myConf is created here but is not passed to the builder.
myConf = pyspark.SparkConf()
# The warehouse directory is a Windows path, so it is written as a raw string:
# backslashes must stay literal instead of being read as escape sequences
# (a non-raw '\U' is a syntax error on Python 3, and '\n' / '\t' style segments
# would silently corrupt the path).
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config('spark.sql.warehouse.dir', r'C:\Users\qorgk\code\spark')\
    .getOrCreate()

In [3]:
from pyspark.sql import Row
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests

In [6]:
# Create the RDD: fetch the KDD Cup 99 10% sample into ./data on first use,
# then read the gzip file as lines of text.

import urllib
try:
    from urllib.request import urlretrieve  # Python 3 location
except ImportError:
    from urllib import urlretrieve  # Python 2 location
url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
name = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz')
if not os.path.exists(name):
    # urlretrieve cannot create missing parent directories, so make sure the
    # target folder exists before downloading.
    data_dir = os.path.dirname(name)
    if not os.path.isdir(data_dir):
        os.makedirs(data_dir)
    # Parenthesised single-argument form prints the same text on Python 2 and 3.
    print("%s data does not exist! retrieving.." % name)
    f = urlretrieve(url, name)
rdd = spark.sparkContext.textFile(name)

C:\Users\qorgk\code\spark\assignment\data\kddcup.data_10_percent.gz data does not exist! retrieving..


In [9]:
# Build the DataFrame source: split each text line on commas and keep a
# typed subset of the fields as a Row.

def _parse_record(fields):
    """Convert one comma-split record into a Row.

    Fields 0, 4 and 5 are cast to int (duration, src_bytes, dst_bytes);
    fields 1, 2, 3 and 41 are kept as strings (protocol, service, flag, attack).
    """
    return Row(
        duration=int(fields[0]),
        protocol=fields[1],
        service=fields[2],
        flag=fields[3],
        src_bytes=int(fields[4]),
        dst_bytes=int(fields[5]),
        attack=fields[41],
    )

csv = rdd.map(lambda line: line.split(","))
csvRdd = csv.map(_parse_record)

In [10]:
# Create a DataFrame from the RDD of Row objects (no explicit schema is passed).
df = spark.createDataFrame(csvRdd)

In [11]:
# Inspect the resulting column types and print a sample of rows.
df.printSchema()
df.show()

root
 |-- attack: string (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)

+-------+---------+--------+----+--------+-------+---------+
| attack|dst_bytes|duration|flag|protocol|service|src_bytes|
+-------+---------+--------+----+--------+-------+---------+
|normal.|     5450|       0|  SF|     tcp|   http|      181|
|normal.|      486|       0|  SF|     tcp|   http|      239|
|normal.|     1337|       0|  SF|     tcp|   http|      235|
|normal.|     1337|       0|  SF|     tcp|   http|      219|
|normal.|     2032|       0|  SF|     tcp|   http|      217|
|normal.|     2032|       0|  SF|     tcp|   http|      217|
|normal.|     1940|       0|  SF|     tcp|   http|      212|
|normal.|     4087|       0|  SF|     tcp|   http|      159|
|normal.|      151|       0|  SF|     tcp|   http|    

In [12]:
from pyspark.sql.functions import udf
def _binary_label(label):
    """Return "normal" for the exact label "normal." and "attack" for anything else."""
    if label == "normal.":
        return "normal"
    return "attack"

# Add the two-class label as a new string column named "attackB".
attack_udf = udf(_binary_label, StringType())
myDf = df.withColumn("attackB", attack_udf(df["attack"]))

In [13]:
# Confirm that the attackB column was appended.
myDf.printSchema()

root
 |-- attack: string (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- attackB: string (nullable = true)



In [22]:
# Attack classification: map raw labels onto five traffic categories.

def classifyAttack(s):
    """Return the category name for a raw attack label such as "smurf.".

    Labels are compared exactly, including the trailing period. The result is
    one of "normal", "DOS", "R2L", "U2R" or "probing"; any value that is not
    listed below yields the empty string.
    """
    groups = (
        ("normal", ("normal.",)),
        ("DOS", ("back.", "land.", "neptune.", "pod.", "smurf.", "teardrop.")),
        ("R2L", ("ftp_write.", "guess_passwd.", "imap.", "multihop.",
                 "phf.", "spy.", "warezclient.", "warezmaster.")),
        ("U2R", ("buffer_overflow.", "loadmodule.", "perl.", "rootkit.")),
        ("probing", ("ipsweep.", "nmap.", "portsweep.", "satan.")),
    )
    for category, labels in groups:
        if s in labels:
            return category
    return ""

In [23]:
# Wrap the five-way classifier as a string-returning UDF and store its result
# in a column named "attack5".
attackUdf = udf(classifyAttack, returnType=StringType())
myDf = myDf.withColumn("attack5", attackUdf(df["attack"]))

In [24]:
# Confirm that the attack5 column was appended and print a sample of rows.
myDf.printSchema()
myDf.show()

root
 |-- attack: string (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- attackB: string (nullable = true)
 |-- attack5: string (nullable = true)

+-------+---------+--------+----+--------+-------+---------+-------+-------+
| attack|dst_bytes|duration|flag|protocol|service|src_bytes|attackB|attack5|
+-------+---------+--------+----+--------+-------+---------+-------+-------+
|normal.|     5450|       0|  SF|     tcp|   http|      181| normal| normal|
|normal.|      486|       0|  SF|     tcp|   http|      239| normal| normal|
|normal.|     1337|       0|  SF|     tcp|   http|      235| normal| normal|
|normal.|     1337|       0|  SF|     tcp|   http|      219| normal| normal|
|normal.|     2032|       0|  SF|     tcp|   http|      217| normal| normal|
|normal.|     2032|       0|  S

### 분석

In [26]:
# Count records per attack category
(myDf
    .groupBy('attack5')
    .count()
    .show())

+-------+------+
|attack5| count|
+-------+------+
|probing|  4107|
|    R2L|  1126|
| normal| 97278|
|    DOS|391458|
|    U2R|    52|
+-------+------+



In [27]:
# Count records per protocol
(myDf
    .groupBy('protocol')
    .count()
    .show())

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [28]:
# Count records per attack/normal label and protocol
(myDf
    .groupBy('attackB', 'protocol')
    .count()
    .show())

+-------+--------+------+
|attackB|protocol| count|
+-------+--------+------+
| normal|     udp| 19177|
| normal|    icmp|  1288|
| normal|     tcp| 76813|
| attack|    icmp|282314|
| attack|     tcp|113252|
| attack|     udp|  1177|
+-------+--------+------+



In [29]:
# Count records per attack category and protocol
(myDf
    .groupBy('attack5', 'protocol')
    .count()
    .show())

+-------+--------+------+
|attack5|protocol| count|
+-------+--------+------+
| normal|     udp| 19177|
|probing|    icmp|  1260|
| normal|    icmp|  1288|
|    R2L|     tcp|  1126|
|    DOS|     udp|   979|
| normal|     tcp| 76813|
|    DOS|    icmp|281054|
|    U2R|     udp|     3|
|probing|     udp|   195|
|    U2R|     tcp|    49|
|probing|     tcp|  2652|
|    DOS|     tcp|109425|
+-------+--------+------+



In [30]:
# Same counts as the previous query, but pivoted on protocol so each protocol
# becomes its own column, which is easier to read
(myDf
    .groupBy('attack5')
    .pivot('protocol')
    .count()
    .show())

+-------+------+------+-----+
|attack5|  icmp|   tcp|  udp|
+-------+------+------+-----+
|probing|  1260|  2652|  195|
|    R2L|  null|  1126| null|
| normal|  1288| 76813|19177|
|    DOS|281054|109425|  979|
|    U2R|  null|    49|    3|
+-------+------+------+-----+



In [31]:
# Average duration per attack category
(myDf
    .groupBy('attack5')
    .avg('duration')
    .show())

+-------+--------------------+
|attack5|       avg(duration)|
+-------+--------------------+
|probing|   485.0299488677867|
|    R2L|   559.7522202486679|
| normal|  216.65732231336992|
|    DOS|7.254929008986916E-4|
|    U2R|    80.9423076923077|
+-------+--------------------+



In [32]:
# Maximum dst_bytes for each attack/normal label, one column per protocol
from pyspark.sql import functions as F
(myDf
    .groupBy('attackB')
    .pivot('protocol')
    .agg(F.max('dst_bytes'))
    .show())

+-------+----+-------+---+
|attackB|icmp|    tcp|udp|
+-------+----+-------+---+
| normal|   0|5134218|516|
| attack|   0|5155468| 74|
+-------+----+-------+---+



In [33]:
# Number of records with duration > 500, per attack category
(myDf
    .filter(df.duration > 500)
    .groupBy('attack5')
    .count()
    .show())

+-------+-----+
|attack5|count|
+-------+-----+
|probing|   84|
|    R2L|   93|
| normal| 4414|
|    U2R|    1|
+-------+-----+



In [40]:
# Number of records with duration > 500, per attack/normal label and protocol
(myDf
    .filter(df.duration > 500)
    .groupBy('attackB', 'protocol')
    .count()
    .show())

+-------+--------+-----+
|attackB|protocol|count|
+-------+--------+-----+
| normal|     udp| 4326|
| normal|     tcp|   88|
| attack|     tcp|  178|
+-------+--------+-----+



In [36]:
# Same counts as the previous query, pivoted on protocol for readability
(myDf
    .filter(df.duration > 500)
    .groupBy('attackB')
    .pivot('protocol')
    .count()
    .show())

+-------+---+----+
|attackB|tcp| udp|
+-------+---+----+
| normal| 88|4326|
| attack|178|null|
+-------+---+----+



In [35]:
# Average src_bytes per attack category, one column per protocol
(myDf
    .groupBy('attack5')
    .pivot('protocol')
    .avg('src_bytes')
    .show())

+-------+------------------+------------------+------------------+
|attack5|              icmp|               tcp|               udp|
+-------+------------------+------------------+------------------+
|probing|10.700793650793651| 261454.6003016591|25.235897435897435|
|    R2L|              null|271972.57460035523|              null|
| normal| 91.47049689440993|1439.3120305156679| 98.01220211711947|
|    DOS| 936.2672084368129| 1090.303422435458|              28.0|
|    U2R|              null| 960.8979591836735|13.333333333333334|
+-------+------------------+------------------+------------------+



In [37]:
# Average duration per attack category
(myDf
    .groupBy('attack5')
    .avg('duration')
    .show())

+-------+--------------------+
|attack5|       avg(duration)|
+-------+--------------------+
|probing|   485.0299488677867|
|    R2L|   559.7522202486679|
| normal|  216.65732231336992|
|    DOS|7.254929008986916E-4|
|    U2R|    80.9423076923077|
+-------+--------------------+

