In [4]:
import os
from urllib.request import urlretrieve

# KDD Cup 1999 "10 percent" sample, distributed as a gzip-compressed file.
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join('data', 'kddcup.data_10_percent.gz')

# Download only when no local copy exists yet.
if not os.path.exists(_fname):
    print(f"{_fname} doesn't exist\nstart download data from {_url}")
    # urlretrieve does not create parent directories; without this the first
    # run fails with FileNotFoundError when data/ is missing.
    os.makedirs(os.path.dirname(_fname) or ".", exist_ok=True)
    # Write to a temporary name and rename only after the transfer completes, so
    # an interrupted download never leaves a truncated file that the existence
    # check above would later accept as complete.
    _tmp = _fname + ".part"
    _f = urlretrieve(_url, _tmp)
    os.replace(_tmp, _fname)

data/kddcup.data_10_percent.gz doesn't exist


### RDD 생성

**RDD** 는 gz와 같은 압축파일에서 데이터를 읽어서 생성할 수 있다.<br>

반면, DataFrame은 구조(schema)를 정의해야 하기 때문에 압축 파일에서 바로 생성하기가 쉽지 않다. 여기서는 오류가 발생한다.<br>
따라서 RDD를 생성하고 난 후, 그로부터 DataFrame을 생성하고, SQL을 사용한다.

In [7]:
import pyspark

# Empty configuration object; the builder settings below (or Spark defaults) apply.
myConf = pyspark.SparkConf()

# Build (or reuse, via getOrCreate) a local SparkSession for this notebook.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local")
    .appName("Week7-prac")
    .config(conf=myConf)
    .getOrCreate()
)

21/10/17 16:25:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
# Read the gzip file directly into an RDD of raw text lines, then count them.
_sc = spark.sparkContext
_rdd = _sc.textFile(_fname)
_rdd.count()

                                                                                

494021

In [9]:
_rdd.take(num=1)  # peek at the first raw comma-separated line

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']

In [10]:
# Turn every raw line into a list of string fields split on commas.
_allRdd = _rdd.map(lambda line: line.split(","))
_allRdd.take(num=1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

### 정상 공격 건수

데이터가 normal인 경우와 아닌 경우로 구분하자.<br>
filter()는 인덱스 41번 열(레이블)의 값에 따라 데이터를 구분한다.<br>
count() 함수로 건수를 계산하면 'normal'은 97,278건, 'attack'은 396,743건이다.

In [16]:
def _is_normal(fields):
    """Return True when the label field (index 41) is exactly 'normal.'."""
    return fields[41] == "normal."


# Partition the records into benign and attack traffic, then count each side.
_normalRdd = _allRdd.filter(_is_normal)
_attackRdd = _allRdd.filter(lambda fields: not _is_normal(fields))
print(_normalRdd.count(), _attackRdd.count())

[Stage 9:>                                                          (0 + 1) / 1]

97278 396743


                                                                                

### attack별 건수

attack 종류는 인덱스 41번 열(마지막 열)에 기록되어 있다. 총 494,021건을 정상 'normal'과 나머지 'attack'으로 구분한다.<br>
attack은 크게 4종류로 나눈다. DOS는 서비스 거부, R2L은 원격 침입, U2R은 루트권한침입, probing은 탐지이다.

In [17]:
# Key each record by its label (index 41) with a count of 1, then sum per key.
_41 = _allRdd.map(lambda fields: (fields[41], 1))
_41.reduceByKey(lambda acc, n: acc + n).collect()

                                                                                

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [20]:
# Same per-label count, via groupByKey: gather each key's 1s and take the group size.
_41 = _allRdd.map(lambda fields: (fields[41], 1))
_41.groupByKey().mapValues(len).collect()

                                                                                

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

### DataFrame 생성

열 0, 1, 2, 3, 4, 5, 41을 선별하여 Row로 스키마를 정한 RDD를 만들고, 이로부터 DataFrame을 생성한다.

In [23]:
from pyspark.sql import Row


def _to_row(fields):
    """Build a Row from split CSV fields: columns 0-5 plus the label at index 41."""
    return Row(
        duration=int(fields[0]),
        protocol=fields[1],
        service=fields[2],
        flag=fields[3],
        src_bytes=int(fields[4]),
        dst_bytes=int(fields[5]),
        attack=fields[41],
    )


_csv = _rdd.map(lambda line: line.split(","))
_csvRdd = _csv.map(_to_row)

In [24]:
# No explicit schema is passed, so Spark infers it from the Row objects.
_df = spark.createDataFrame(_csvRdd)
_df.printSchema()
_df.show(n=5)

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)

+--------+--------+-------+----+---------+---------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|
+--------+--------+-------+----+---------+---------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.|
|       0|     tcp|   http|  SF|      239|      486|normal.|
|       0|     tcp|   http|  SF|      235|     1337|normal.|
|       0|     tcp|   http|  SF|      219|     1337|normal.|
|       0|     tcp|   http|  SF|      217|     2032|normal.|
+--------+--------+-------+----+---------+---------+-------+
only showing top 5 rows



### Attack 분류

각 네트워크 연결을 'attack' 또는 'normal'로 구분하여 attackB 컬럼을 생성한다.

In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Binary label: anything other than the exact string "normal." counts as an attack.
attackUdfd = udf(lambda x: "attack" if x != "normal." else "normal", StringType())
myDf = _df.withColumn("attackB", attackUdfd(_df["attack"]))

In [26]:
# Confirm that the attackB column was appended after the original columns.
myDf.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)



In [29]:
# Raw KDD labels (with trailing '.') grouped into five coarse classes, checked in order.
_ATTACK5_GROUPS = (
    ("normal", ("normal.",)),
    ("dos", ("back.", "land.", "neptune.", "pod.", "smurf.", "teardrop.")),
    ("r2l", ("ftp_write.", "guess_passwd.", "imap.", "multihop.",
             "phf.", "spy.", "warezclient.", "warezmaster.")),
    ("u2r", ("buffer_overflow.", "loadmodule.", "perl.", "rootkit.")),
    ("probing", ("ipsweep.", "nmap.", "portsweep.", "satan.")),
)


def classufy41(s):
    """Map a raw attack label to 'normal', 'dos', 'r2l', 'u2r' or 'probing'.

    Returns None for any label not listed in _ATTACK5_GROUPS.
    NOTE(review): the name is a typo of "classify41"; it is kept because the UDF below uses it.
    """
    for group, labels in _ATTACK5_GROUPS:
        if s in labels:
            return group
    return None


attack5_udf = udf(classufy41, StringType())

In [30]:
# Add the five-way attack class. Resolve "attack" against myDf itself instead of
# borrowing a Column object from the parent DataFrame _df.
myDf = myDf.withColumn("attack5", attack5_udf(myDf["attack"]))

In [33]:
# Check that attack5 now appears in the schema, and preview the first rows.
myDf.printSchema()
myDf.show(n=5)

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)
 |-- attack5: string (nullable = true)

+--------+--------+-------+----+---------+---------+-------+-------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|attackB|attack5|
+--------+--------+-------+----+---------+---------+-------+-------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.| normal| normal|
|       0|     tcp|   http|  SF|      239|      486|normal.| normal| normal|
|       0|     tcp|   http|  SF|      235|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      219|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      217|     2032|normal.| normal| normal|
+--------+--------+-------+----

Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [35]:
# Record count per five-way attack class.
(myDf
    .groupBy("attack5")
    .count()
    .show())

                                                                                

+-------+------+
|attack5| count|
+-------+------+
|probing|  4107|
|    u2r|    52|
| normal| 97278|
|    r2l|  1126|
|    dos|391458|
+-------+------+





In [36]:
# Record count per transport protocol.
(myDf
    .groupBy("protocol")
    .count()
    .show())

                                                                                

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [38]:
# Record count for each (normal/attack, protocol) combination, in long format.
(myDf
    .groupBy("attackB", "protocol")
    .count()
    .show())

                                                                                

+-------+--------+------+
|attackB|protocol| count|
+-------+--------+------+
| normal|     udp| 19177|
| normal|    icmp|  1288|
| normal|     tcp| 76813|
| attack|    icmp|282314|
| attack|     tcp|113252|
| attack|     udp|  1177|
+-------+--------+------+



In [39]:
# Same counts as the previous cell, pivoted so each protocol becomes its own column.
(myDf
    .groupBy("attackB")
    .pivot("protocol")
    .count()
    .show())

                                                                                

+-------+------+------+-----+
|attackB|  icmp|   tcp|  udp|
+-------+------+------+-----+
| normal|  1288| 76813|19177|
| attack|282314|113252| 1177|
+-------+------+------+-----+



## SQL

SQL을 사용해보자. 위에 사용했던 _df에서 임시 테이블 _tab을 생성한다.

In [41]:
# Expose _df (not myDf, so attackB/attack5 are absent) to Spark SQL as view "_tab".
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is
# its supported replacement with the same replace-if-exists behavior.
_df.createOrReplaceTempView("_tab")

In [42]:
# TCP connections with duration > 1000 and dst_bytes == 0.
# NOTE(review): "interations" is a typo of "interactions"; the name is kept because
# later cells refer to it.
_tcp_sql = """
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol='tcp' AND duration > 1000 AND dst_bytes = 0
"""
tcp_interations = spark.sql(_tcp_sql)

In [43]:
tcp_interations.show(n=5)  # first five matching rows

[Stage 71:>                                                         (0 + 1) / 1]

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
+--------+---------+
only showing top 5 rows



                                                                                