# Introduction to <font color=blue>pySpark</font> https://spark.apache.org/

![Apache Spark logo](https://spark.apache.org/images/spark-logo-trademark.png)



## RDD creation

The core abstraction in Spark is the **Resilient Distributed Dataset**, or **RDD**: a distributed collection of elements.

* All work in Spark is expressed as creating new RDDs, **transforming** existing RDDs, or calling **actions** on RDDs to compute a result.

* Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

### FIRST, check that a SparkContext is available

In [None]:
sc

### GET the SparkContext configuration: it will be "local" (see `spark.master`)

* ### <font color=red>local[*]</font> means running Spark locally with as many worker threads as logical cores on your machine.
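
As a rough cross-check of what `local[*]` would resolve to, the number of logical cores can be read with Python's standard library. This is only a minimal sketch that does not touch Spark; the name `logical_cores` is illustrative and not part of the notebook.

```python
import os

# os.cpu_count() reports the number of logical cores.
# It can return None when the count cannot be determined, so fall back to 1.
logical_cores = os.cpu_count() or 1
print("logical cores:", logical_cores)
```

The value printed here can be compared with the `spark.master` entry returned by the configuration cell.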

In [None]:
sc._conf.getAll()

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### how many cores do we have?

In [None]:
import .....


## Creating an RDD from a file: we will use <font color=orange>churn-bigml-80.csv</font>

The most common way of creating an RDD is to load it from a file.

(Spark's `textFile` can read compressed files, such as `.gz`, directly.)

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### load churn-bigml-80.csv as a text file

In [None]:
data_file = ....
raw_data = sc.textFile(data_file)

Now we have our data file loaded into the `raw_data` RDD.

Without getting into Spark *transformations* and *actions*, the most basic thing we can do to check that we got our RDD contents right is to `count()` the number of lines loaded from the file into the RDD.  

In [None]:
raw_data.count()

We can also check the first few entries in our data.  

In [None]:
raw_data.take(5)

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### take first line: first_line
* ### create a `list` with features names: features
* ### print...

In [None]:
first_line=

# create a list with features
features = 
print("number of elements =",.....)
print("Features: \n",.....)

### Check: pySpark "structures" RDD vs python ... lists, etc.

In [None]:
type(raw_data)

In [None]:
type(features)

In the following notebooks, we will use this raw data to learn about the different Spark transformations and actions.  

## Creating an RDD using `parallelize`

Another way of creating an RDD is to parallelize an already existing list.  

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### Create  range 0,1,2,.... 1000

... discuss range and list(range)
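
To support that discussion, here is a minimal plain-Python sketch of the difference between `range` and `list(range)`. The small bound of 5 and the names `r` and `a_small` are chosen only for readability and are not the values asked for in the exercise.

```python
# range is a lazy, immutable sequence: it stores only start, stop and step
r = range(5)
print(type(r))         # <class 'range'>
print(len(r), r[2])    # 5 2 (length and indexing work without building a list)

# list(range(...)) materializes every element in memory
a_small = list(r)
print(a_small)         # [0, 1, 2, 3, 4]

# note that the stop value itself is excluded
print(list(range(3)))  # [0, 1, 2]
```

Because the stop value is excluded, it is worth checking whether the upper bound of the range you build should be included or not.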

In [None]:
a = 

data = sc.parallelize(a)

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* As we did before, we can count the number of elements in the RDD.
* and show the first five elements...

In [None]:
data.....

As before, we can access the first few elements of our RDD.  

In [None]:
data.....

## <font color=brown>Let's see now TRANSFORMATIONS and ACTIONS</font>

## The `filter` transformation

This transformation can be applied to RDDs in order to keep just elements that satisfy a certain condition. More concretely, a function is evaluated on every element in the original RDD. The new resulting RDD will contain just those elements that make the function return `True`.
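
The same predicate-based idea can be sketched in plain Python with the built-in `filter`. The rows in `sample_lines` and the helper `has_yes` are made up for illustration; they are not taken from the dataset, and Python's `filter` only mirrors the concept, not Spark's distributed execution.

```python
# made-up rows standing in for lines of a text file (not the real dataset)
sample_lines = ["KS,128,No,False", "OH,107,Yes,True", "NJ,137,No,False"]

# the predicate returns True for the elements we want to keep
def has_yes(line):
    return "Yes" in line

# only the elements for which the predicate is True survive
kept = list(filter(has_yes, sample_lines))
print(kept)  # ['OH,107,Yes,True']
```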

For example, imagine we want to count how many lines contain the word `True` in our dataset.

We will filter our `raw_data` RDD as described below:

## ... We are going to use a `lambda` function

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### Define a function `containsTrue` that receives a list and returns True or False depending on whether the string True is present in the list

* ### Use a lambda function for the same task...

In [None]:
def ...

....

containsTrue(['1','True'])

In [None]:
# Now use a lambda function for the same task

f = ....


f(['1','True'])

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### FILTER RDD raw_data depending on whether the string True is present in its rows, using a `lambda` function

In [None]:
True_raw_data = raw_data.filter(........)

### ... `count` is the ACTION

Now we can count how many elements we have in the new RDD.

In [None]:
from time import time
t0 = time()
True_count = True_raw_data.count()
tt = time() - t0
print("There are {} lines containing 'True' word".format(True_count))
print("Count completed in {} seconds".format(round(tt,3)))

Notice that we have measured the time taken to count the elements in the RDD. We did this to point out that actual (distributed) computations in Spark take place when we execute *actions*, not *transformations*. In this case `count` is the action we execute on the RDD. We can apply as many transformations as we want to our RDD, and no computation will take place until we call the first action, which in this case takes a few seconds to complete.
<font color='green' size=4>And this is the "Lazy Evaluation"!!</font>

### **RDD Operations and Lazy Evaluation** ###

### <font color='green'>Lazy Evaluation</font> ###
Lazy evaluation reduces the number of passes Spark has to take over the data by grouping operations together.

* In MapReduce systems like Hadoop, developers often have to spend a lot of time considering how to group operations together to minimize the number of MapReduce passes.
* In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.
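
The deferral described above can be imitated in plain Python, where `map` is also lazy. This is only an analogy under stated assumptions: the helper `noisy_upper` and the `log` list are hypothetical, and Spark's own mechanism (recording transformations and running them when an action is called) is distributed and more elaborate.

```python
log = []  # records each time the function is actually called

def noisy_upper(s):
    log.append(s)
    return s.upper()

words = ["a", "b", "c"]

# map() in Python 3 is lazy: building the pipeline calls nothing yet
pipeline = map(noisy_upper, words)
calls_before = len(log)

# consuming the pipeline plays the role of an "action" and triggers the work
result = list(pipeline)
calls_after = len(log)

print(calls_before, calls_after, result)  # 0 3 ['A', 'B', 'C']
```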

In [None]:
# In this simple example the name of the churn file is wrong (800 instead of 80)
#
# select the lines containing 'True'

data_file_err = '/resources/data/MSTC/churn-bigml-800.csv'
raw_data_err = sc.textFile(data_file_err)

True_raw_data_err = raw_data_err.filter(lambda x: 'True' in x)

# You will get NO error when executing this cell, as only a transformation (i.e. filter()) is applied

In [None]:
# The error shows up NOW, when performing an action (in this case .count()), because of lazy evaluation

True_count = True_raw_data_err.count()

# see the error trace below:
# Input path does not exist: file: .... churn-bigml-800.csv

## The `map` transformation

By using the `map` transformation in Spark, we can apply a function to every element in our RDD. Python's lambdas are especially expressive for this purpose.

In this case we want to read our data file as CSV. We can do this by applying a lambda function to each element in the RDD as follows.
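
Before working on the RDD, the element-wise idea can be tried on an ordinary list. This is a minimal sketch on made-up rows (`sample_lines` is not the real dataset), using Python's built-in `map` as a stand-in for the Spark transformation.

```python
# made-up rows standing in for lines of a CSV file (not the real dataset)
sample_lines = ["KS,128,No", "OH,107,Yes"]

# apply the same function to every element: split each line on commas
parsed = list(map(lambda line: line.split(","), sample_lines))

print(parsed[0])       # ['KS', '128', 'No']
print(len(parsed[1]))  # 3
```

A plain `split(",")` assumes that no field contains a quoted comma. If the data could contain such fields, Python's `csv` module would be a safer parser.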

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### MAP RDD raw_data `splitting` each row by commas using a `lambda` function

In [None]:
csv_data = raw_data......

# see or print some csv_data rows...
....

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### Now insert the MAP that `splits` the RDD raw_data rows, then print some rows

In [None]:
from time import time
from pprint import pprint
csv_data = ....
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt,3)))
pprint(head_rows[0])

## The `collect` action

So far we have used the actions `count` and `take`. Another basic action we need to learn is `collect`. It brings all the elements of the RDD into the driver's memory for us to work with. For this reason it has to be used with care, especially when working with large RDDs.
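
The difference between taking a few elements and collecting everything can be illustrated without Spark. In this hedged sketch an unbounded iterator (`stream`, a hypothetical stand-in for a very large dataset) is sampled with `itertools.islice`, which plays a role loosely similar to `take`.

```python
from itertools import count, islice

# an unbounded stream stands in for a very large dataset
stream = count(0)

# "take"-like behaviour: pull only the first five elements
first_five = list(islice(stream, 5))
print(first_five)  # [0, 1, 2, 3, 4]

# calling list(stream) here would try to materialize everything
# and would never finish, which is the risk of collecting blindly
```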

## <font color='magenta'>--------------------------To Do:---------------------------</font>

* ### Collect all RDD raw_data into all_raw_data 
<br>


<font color='red' size=4>NOTE: (This may be dangerous with large datasets!!!) </font>

In [None]:
t0 = time()
all_raw_data = ......
tt = time() - t0
print("Data collected in {} seconds".format(round(tt,3)))

That COULD take longer than any other action we used before, of course. Every Spark worker node that holds a fragment of the RDD has to be coordinated in order to retrieve its part, and then everything is *reduced* together.

## As a last example: try to do some TASK combining all the previous steps....

To be defined by 4th December 2017

In [None]:
# get data from file
