In [1]:
from pyspark import *
from pyspark.sql import SQLContext, types
import urllib
from pyspark.sql.types import StructType

In [2]:
sc = SparkContext(master="local", appName="test")

In [3]:
sc

In [4]:
data = urllib.request.urlretrieve('http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz', 'data_10_percent.gz')
data_file = './data_10_percent.gz'

In [5]:
lines = sc.textFile(data_file)

In [6]:
lines.take(3)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.']

In [7]:
sqlContext = SQLContext(sc)

In [8]:
dataparse = lines.map(lambda x:x.split(","))

In [9]:
dataparse.take(3)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '239',
  '486',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '19',
  '19',
  '1.00',
  '0.00',
  '0.05',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '235',
  '1337',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '29',
  '29',
  '1.00',
  '0.00',


In [10]:
schema = types.StructType([
    types.StructField("duration", types.IntegerType(), True),
    types.StructField("protocol_type", types.StringType(), True),
    types.StructField("service", types.StringType(), True),
    types.StructField("flag", types.StringType(), True),
    types.StructField("src_bytes", types.IntegerType(), True),
    types.StructField("dst_bytes", types.IntegerType(), True),
    types.StructField("interactions", types.StringType(), True),
    ])
schema

StructType(List(StructField(duration,IntegerType,true),StructField(protocol_type,StringType,true),StructField(service,StringType,true),StructField(flag,StringType,true),StructField(src_bytes,IntegerType,true),StructField(dst_bytes,IntegerType,true),StructField(interactions,StringType,true)))

In [11]:
newData = dataparse.map(lambda key:(int(key[0]),key[1],key[2],key[3],int(key[4]),int(key[5]),key[-1]))

In [12]:
df = sqlContext.createDataFrame(newData, schema)

In [13]:
df.show

<bound method DataFrame.show of DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, interactions: string]>

In [14]:
df.columns

['duration',
 'protocol_type',
 'service',
 'flag',
 'src_bytes',
 'dst_bytes',
 'interactions']

In [15]:
df.show(10)

+--------+-------------+-------+----+---------+---------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|interactions|
+--------+-------------+-------+----+---------+---------+------------+
|       0|          tcp|   http|  SF|      181|     5450|     normal.|
|       0|          tcp|   http|  SF|      239|      486|     normal.|
|       0|          tcp|   http|  SF|      235|     1337|     normal.|
|       0|          tcp|   http|  SF|      219|     1337|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      217|     2032|     normal.|
|       0|          tcp|   http|  SF|      212|     1940|     normal.|
|       0|          tcp|   http|  SF|      159|     4087|     normal.|
|       0|          tcp|   http|  SF|      210|      151|     normal.|
|       0|          tcp|   http|  SF|      212|      786|     normal.|
+--------+-------------+-------+----+---------+---------+------------+
only s

In [18]:
df.createOrReplaceTempView("interactions")

In [19]:
result = df [
    (df['protocol_type'] == 'tcp')
    &
    (df['duration']>1000)
    &
    (df['dst_bytes'] == 0)
].select(df['duration'], df['dst_bytes'])
result.show()
type(result)

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



pyspark.sql.dataframe.DataFrame

In [20]:
result.count()

139