## github reference:
https://github.com/jadianes/spark-py-notebooks

In [1]:
%load_ext memory_profiler
%memit

peak memory: 63.32 MiB, increment: 0.06 MiB


# Old and new

## New: For Spark 2.4.3, `sc` is automatically created

In [3]:
data = sc.parallelize(range(10000))

### Passing range(10000) (an iterator-like object) produces a PipelinedRDD instead of a plain RDD

In [4]:
type(data)

pyspark.rdd.PipelinedRDD

### Have to stop this automatic sc before starting a new one

In [5]:
sc.stop()

## Old style

In [3]:
from pyspark import SparkConf, SparkContext

# Only one SparkContext can be active per process: each context must be
# stopped before the next one is created.
sc = SparkContext('local', 'test1')
sc.stop() #Cannot run multiple SparkContexts at once; 
# must stop the open one first to start the second one.
sc = SparkContext('local', 'test2')
sc.stop()

# Same thing, but configured through a SparkConf object.
conf1 = SparkConf().setAppName('testing1').setMaster('local')
# conf2 = SparkConf().setAppName('testing2').setMaster('local')
# getOrCreate is a classmethod: call it on the class, not on a freshly built
# instance. `SparkContext().getOrCreate(conf=conf1)` would first create a
# default context and then return that one, silently ignoring conf1.
sc = SparkContext.getOrCreate(conf=conf1)
sc.stop()

### Start your own new sc session

In [4]:
from pyspark import SparkContext

# getOrCreate() returns the active context if one exists and only builds a new
# one otherwise, so this cell can be re-run without hitting
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [2]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [2]:
type(raw_data)

pyspark.rdd.RDD

In [7]:
%%timeit
raw_data.count()

419 ms ± 11.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
raw_data.getNumPartitions()

1

## Add partitions and compare timing

In [None]:
# Only one SparkContext may be active at a time, so shut the current one down first.
sc.stop()

# Fresh local context with a recognisable application name.
sc = SparkContext(master='local', appName='testPartitions')

# Reload the compressed dataset through the new context.
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [10]:
raw_data.getNumPartitions()

1

#### 'RDD' object is not iterable, must collect first

In [11]:
# parallelize() needs a local collection, not an RDD, so the lines are first
# collected to the driver and then redistributed over 6 slices.
raw_data_ptns = sc.parallelize(
    raw_data.collect(),
    numSlices=6,
)

In [12]:
raw_data.getNumPartitions(),raw_data_ptns.getNumPartitions()

(1, 6)

# More partitions take less time

## 1 partition

In [13]:
from time import time
# Wall-clock timing of count() on raw_data, the RDD loaded with sc.textFile
# (reported above as having 1 partition).
t0 = time()
counts = raw_data.count()
tt = time() - t0
tt*1000  # time() returns seconds; scale to milliseconds

878.0908584594727

## 6 partitions

In [14]:
# Same measurement on raw_data_ptns, the RDD rebuilt with numSlices=6.
# NOTE(review): raw_data_ptns was created from already-collected lines, whereas
# raw_data comes from sc.textFile on the .gz file, so the difference may not be
# due to the partition count alone -- confirm before drawing conclusions.
t0 = time()
counts = raw_data_ptns.count()
tt = time() - t0
tt*1000  # elapsed time in milliseconds

199.90229606628418

In [15]:
sc.appName

'testPartitions'

In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [2]:
%%timeit
counts = raw_data.count()

429 ms ± 8.46 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [3]:
from time import time
t0 = time()
counts = raw_data.count()
tt = time() - t0
tt*1000

432.8787326812744

In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
raw_data_ptns8 = sc.parallelize(raw_data.collect(),numSlices=8)

In [2]:
%%timeit
counts = raw_data_ptns8.count()

The slowest run took 5.40 times longer than the fastest. This could mean that an intermediate result is being cached.
313 ms ± 182 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [4]:
from time import time
t0 = time()
counts = raw_data_ptns8.count()
tt = time() - t0
tt*1000

160.32695770263672

In [5]:
del raw_data_ptns8

In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
raw_data_ptns16 = sc.parallelize(raw_data.collect(),numSlices=16)

In [2]:
%%timeit
counts = raw_data_ptns16.count()

The slowest run took 4.40 times longer than the fastest. This could mean that an intermediate result is being cached.
328 ms ± 185 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
raw_data_ptns16 = sc.parallelize(raw_data.collect(),numSlices=16)

In [2]:
%%timeit
counts = raw_data_ptns16.count()

323 ms ± 172 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
from time import time
t0 = time()
counts = raw_data_ptns16.count()
tt = time() - t0
tt*1000

179.4607639312744

In [14]:
del raw_data_ptns16

In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
raw_data_ptns32 = sc.parallelize(raw_data.collect(),numSlices=32)

In [2]:
%%timeit
counts = raw_data_ptns32.count()

260 ms ± 73.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [1]:
data_file = "kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
raw_data_ptns32 = sc.parallelize(raw_data.collect(),numSlices=32)

In [2]:
%%timeit
counts = raw_data_ptns32.count()

261 ms ± 61.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
from time import time
t0 = time()
counts = raw_data_ptns32.count()
tt = time() - t0
tt*1000

181.4560890197754

In [11]:
# Release the 32-slice RDD reference created above. The original cell tried to
# delete `raw_data_ptns17`, which is never defined and raises NameError.
del raw_data_ptns32

## Check memory again
* memory increased from ~60M to ~90M

In [16]:
%memit

peak memory: 90.31 MiB, increment: -0.00 MiB


# Lazy and Actions

## 1. The filter transformation
This transformation can be applied to RDDs in order to keep just elements that satisfy a certain condition. More concretely, a function is evaluated on every element in the original RDD. The new resulting RDD will contain just those elements that make the function return True.
For example, imagine we want to count how many normal. interactions we have in our dataset. We can filter our raw_data RDD as follows.

**1.1 From the timing below we can see it only takes about 75 µs, which is just call overhead and not the actual filtering, because `filter` is lazy**

In [41]:
# %%timeit  (left disabled: names assigned under %%timeit are not kept)
# Keep only the lines that contain the substring 'normal'. These are
# transformations, so nothing is evaluated until an action such as count() runs.
normal_raw_data = raw_data.filter(lambda line: 'normal' in line)
normal_raw_data_ptns = raw_data_ptns.filter(lambda line: 'normal' in line)

## 2. Action: count()

##### count partitions 

In [42]:
normal_raw_data_ptns.getNumPartitions()

6

In [43]:
normal_raw_data.getNumPartitions()

1

## 2.1 %%timeit does not memorize the name normal_count
* Do it without %%timeit if you want it to be in memory

## 2.2 Less time if using more partitions

In [48]:
from time import time
t0 = time()
counts = normal_raw_data.count()
tt = time() - t0
tt*1000

549.1409301757812

In [49]:
from time import time
t0 = time()
counts = normal_raw_data_ptns.count()
tt = time() - t0
tt*1000

286.2701416015625

## 3. Counting raw_data takes less time because normal_raw_data needs one more step: filtering

In [21]:
type(raw_data), type(normal_raw_data)

(pyspark.rdd.RDD, pyspark.rdd.PipelinedRDD)

In [22]:
# %%timeit
raw_count = raw_data.count()
print(raw_count)

494021


## 4. Turn an RDD into a PipelinedRDD with a transformation: map, filter, etc.

In [23]:
mp_raw_data = raw_data.map(lambda x: x)
type(mp_raw_data)

pyspark.rdd.PipelinedRDD

In [24]:
type(mp_raw_data)

pyspark.rdd.PipelinedRDD

### time the operations: filter
* multiply by 1000 to get ms
* default is by second

In [25]:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
tt*1000

551.8331527709961

## 5. Turn a PipelinedRDD back into an RDD (collect it to a Python list first, then re-parallelize)

In [26]:
back_raw_data = mp_raw_data.collect()

In [27]:
type(back_raw_data)

list

In [28]:
back_raw_data[:5]

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [30]:
import pandas as pd
df = pd.DataFrame(back_raw_data[:5])
df

Unnamed: 0,0
0,"0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0..."
1,"0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,..."
2,"0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0..."
3,"0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0..."
4,"0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0..."


## Try to partition collected data

* Use numSlices to partition: sc.parallelize(back_raw_data,numSlices=8)
* If using sc.parallelize(back_raw_data).partitionBy(9), then count() fails: partitionBy expects an RDD of (key, value) pairs, but the elements here are plain strings

In [58]:
rdd = sc.parallelize(back_raw_data).partitionBy(9)
rdd.getNumPartitions()

9

In [53]:
type(rdd)

pyspark.rdd.RDD

In [56]:
%%timeit
rdd.collect()

513 ms ± 18.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [51]:
rdd.getNumPartitions()

8

#### Give error to count

rdd.count() # Give error

#### Use numSlices to partition is okay, w/o errors

In [61]:
rdd = sc.parallelize(back_raw_data,numSlices=8)

In [62]:
rdd.getNumPartitions()

8

In [38]:
type(rdd)

pyspark.rdd.RDD

In [39]:
rdd.count()

494021

In [40]:
type(rdd),type(raw_data)

(pyspark.rdd.RDD, pyspark.rdd.RDD)

In [33]:
%%timeit
rdd_data = rdd.collect()

727 ms ± 15.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [34]:
# %%timeit
rdd.count()

494021

In [None]:
from pprint import pprint
from time import time  # imported here so the cell does not depend on an earlier cell

# Split every CSV line into a list of fields. map() is lazy, so the parsing
# only happens when take(5) asks for the first rows.
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
# print is a function in Python 3; the Python 2 statement form is a SyntaxError.
print("Parse completed in {} seconds".format(round(tt, 3)))
pprint(head_rows[0])

In [None]:
def parse_interaction(line):
    """Turn one comma-separated record into a ``(tag, fields)`` pair.

    Args:
        line: A single text line whose values are separated by commas.

    Returns:
        A tuple ``(tag, fields)`` where ``fields`` is the list produced by
        splitting ``line`` on ``","`` and ``tag`` is the field at index 41.

    Raises:
        IndexError: If the line has fewer than 42 comma-separated fields.
    """
    fields = line.split(",")
    return (fields[41], fields)

# Map every raw line to a (tag, fields) pair using parse_interaction.
key_csv_data = raw_data.map(parse_interaction)
# take(5) is the action that actually runs the map on the first rows.
head_rows = key_csv_data.take(5)
# Show the first (tag, fields) pair.
pprint(head_rows[0])