In [1]:
import urllib.request
from pyspark import SparkContext 

sc = SparkContext()
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)


In [2]:
from pyspark.sql import SQLContext 
sql_context = SQLContext(sc)

In [3]:
from pyspark.sql import Row
csv_data = raw_data.map(lambda line: line.split(","))

In [4]:
def define_schema(li):
    row = Row(
        duration=int(li[0]), 
        protocol_type=li[1],
        service=li[2],
        flag=li[3],
        src_bytes=int(li[4]),
        dst_bytes=int(li[5])
    )
    return row 
rdd_row = csv_data.map(define_schema)

In [5]:
rdd_row.take(1)

[Row(dst_bytes=5450, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=181)]

In [6]:
df = rdd_row.toDF()
df.registerTempTable("connection")

In [17]:
set(df.select(["flag"]).collect())

{Row(flag='OTH'),
 Row(flag='REJ'),
 Row(flag='RSTO'),
 Row(flag='RSTOS0'),
 Row(flag='RSTR'),
 Row(flag='S0'),
 Row(flag='S1'),
 Row(flag='S2'),
 Row(flag='S3'),
 Row(flag='SF'),
 Row(flag='SH')}

In [25]:
flag_li = [
    ["OTH", 1], ["REJ", 2], ["RSTO", 3], ["RSTOS0", 4], ["RSTR", 5], ["S0", 6], ["S1", 7], ["S2", 8], ["SF", 9], ["SH", 10]
]

In [26]:
flag_rdd = sc.parallelize(flag_li)

In [27]:
flag_row_rdd = flag_rdd.map(lambda li: Row(flag_type=li[0], flag_value=li[1]))

In [28]:
flag_df = flag_row_rdd.toDF()

In [29]:
flag_df.show()

+---------+----------+
|flag_type|flag_value|
+---------+----------+
|      OTH|         1|
|      REJ|         2|
|     RSTO|         3|
|   RSTOS0|         4|
|     RSTR|         5|
|       S0|         6|
|       S1|         7|
|       S2|         8|
|       SF|         9|
|       SH|        10|
+---------+----------+



In [30]:
flag_df.registerTempTable("flag_table")

In [31]:
joined_table = sql_context.sql(''' 
    select connection.*, flag_table.* 
      from 
        connection left join 
            flag_table 
            on connection.flag = flag_table.flag_type
''')

In [32]:
joined_table.show()

+---------+--------+------+-------------+--------+---------+---------+----------+
|dst_bytes|duration|  flag|protocol_type| service|src_bytes|flag_type|flag_value|
+---------+--------+------+-------------+--------+---------+---------+----------+
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|       1|RSTOS0|          tcp| private|        0|   RSTOS0|         4|
|        0|     