In [10]:
# Locate the local Spark installation and make PySpark importable from this
# kernel; must run before any `pyspark` import below.
import findspark
findspark.init()

In [11]:
# Creating spark context
from pyspark import SparkContext

In [12]:
# Create (or reuse) a SparkContext with master "local" and app name
# "hands on PySpark".
# Constructing SparkContext(...) directly raises "Cannot run multiple
# SparkContexts at once" if this cell is re-executed in the same kernel;
# getOrCreate() returns the already-active context instead, so the cell is
# safe to re-run. NOTE: when a context already exists its original
# configuration is kept and `spark_conf` is ignored.
from pyspark import SparkConf

spark_conf = SparkConf().setMaster('local').setAppName('hands on PySpark')
sc = SparkContext.getOrCreate(conf=spark_conf)

##### Testing with a small sample

In [5]:
# Small in-memory sample, treated as daily visitor counts (scaled by 365 below).
visitors = [10, 3, 35, 25, 41, 9, 29]
# Distribute the Python list as an RDD (despite the `df_` prefix this is an
# RDD, not a DataFrame).
df_visitors = sc.parallelize(visitors)
# map() is lazy; collect() runs the job and returns a Python list to the driver.
yearly_rdd = df_visitors.map(lambda daily_count: daily_count * 365)
df_visitors_yearly = yearly_rdd.collect()
# NOTE: this prints one yearly figure per entry, not a single grand total.
print("Total visitors in a year {}".format(df_visitors_yearly))

Total visitors in a year [3650, 1095, 12775, 9125, 14965, 3285, 10585]


### Load data into RDDs

##### Getting the data file (KDD Cup 1999, 10% subset)

In [1]:
import urllib

In [7]:
# Download the KDD Cup 1999 10% subset into the current working directory.
# `import urllib` alone does not guarantee the `urllib.request` submodule is
# loaded, so it is imported explicitly here.
import os
import urllib.request

KDD_URL = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
KDD_FILE = "kddcup.data_10_percent.gz"

# Skip the download when the archive is already present, so "Restart & Run All"
# does not re-fetch it every time. Download to a temporary name first and
# rename afterwards, so an interrupted transfer never leaves a truncated file
# under the final name (which the existence check would then trust).
if not os.path.exists(KDD_FILE):
    partial_path = KDD_FILE + ".part"
    urllib.request.urlretrieve(KDD_URL, partial_path)
    os.replace(partial_path, KDD_FILE)

##### Creating an RDD from a file

In [15]:
# Path of the downloaded archive; textFile() reads the compressed file directly
# and yields one string element per line.
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(name=data_file)

In [16]:
# Sanity check: count() is an action, so it forces Spark to read the file.
# The result is the number of lines loaded into the RDD.
raw_line_count = raw_data.count()
raw_line_count

494021

In [17]:
# Peek at the first five raw lines (comma-separated records).
raw_data.take(num=5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

### Parallelization with Spark RDDs

In [23]:
# A Python range over 0..99 — a lazy sequence object, not a materialised list.
a = range(0, 100)
a

range(0, 100)

In [24]:
# Distribute the range `a` across Spark as an RDD.
list_rdd = sc.parallelize(a)

In [25]:
# Displaying the RDD shows only its description; no data is computed until an
# action (count, take, reduce, ...) is called.
list_rdd

PythonRDD[10] at RDD at PythonRDD.scala:53

In [26]:
# Number of elements held by the RDD.
list_rdd_size = list_rdd.count()
list_rdd_size

100

In [27]:
# Bring the first ten elements back to the driver.
list_rdd.take(num=10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [28]:
# reduce() folds the RDD pairwise with the given binary function; summing
# adjacent values yields the total of all elements.
# (Parameter names avoid shadowing the global `a` defined above.)
list_rdd.reduce(lambda left, right: left + right)

4950

### RDD Operation Basics

In [29]:
# Keep only the lines containing the "normal." label. filter() takes a predicate
# that receives each line and keeps it when the predicate returns True.
contains_normal = raw_data.filter(lambda record: "normal." in record)

In [30]:
# How many lines survived the "normal." filter.
normal_line_count = contains_normal.count()
normal_line_count

97278

In [32]:
# Parse each raw line into a list of string fields by splitting on commas.
# map() is lazy: nothing is computed until an action is called.
split_file = raw_data.map(lambda record: record.split(","))

In [35]:
# Materialise the first parsed record (a list of string fields) to inspect it.
split_file.take(num=1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

In [None]:
# collect() moves the *entire* RDD to the driver as a Python list (494,021
# lines for this file, per the count above). Keep the result in a variable and
# display only its size: echoing the list as the cell output would dump every
# line into the notebook.
all_raw_data = raw_data.collect()
len(all_raw_data)