# Notes

- Resilient Distributed Datasets (RDDs)
    - a big collection of objects
    - split into slices (partitions) that are spread across many different computers
    - immutable: you don't read/write them in place, you transform them into new RDDs
    - operations are lazy

### Shared Spark Variables

- Broadcast variables
    - a read-only copy is kept at each node
- Accumulators
    - tasks can only add to them; only the main node (driver) can read the value

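### Functional programming in Python

- Functional tools in Python
    - `map` (applies a function to each element of a list; returns the results lazily, as an iterator)
    - `filter`
    - `reduce` (from `functools`)
    - `lambda`
    - `itertools` (e.g. `chain`, which can flatten a list of lists; Python has no built-in flatmap)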

### Map in Python

- Apply an operation to each element of a list and get the results back (in Python 3, `map` returns a lazy iterator, so wrap it in `list()` to get a list)

```python
# example of how map works
def add1(x):
    return x + 1

list(map(add1, [1, 2, 3]))
# Out: [2, 3, 4]
```

### Filter in Python

- keep only the elements of a list for which a function returns `True`

```python
# example of filter
a = [1, 2, 3, 4]

def isOdd(x):
    return x % 2 == 1

list(filter(isOdd, a))
# Out: [1, 3]
```

### reduce in Python

- combines the elements of a list pairwise from left to right, so `reduce(f, [a, b, c, d])` computes `f(f(f(a, b), c), d)`; returns one value, not a list

```python
from functools import reduce

a = range(1, 5)  # 1, 2, 3, 4

def add(x, y):
    return x + y

reduce(add, a)  # ((1 + 2) + 3) + 4
# Out: 10
```

### Lambda in Python

- when using map/reduce/filter, we end up writing many tiny functions
- lambdas let us define a function as a value, inline, without giving it a name

```python
# example of lambda
nums = range(0, 10)

plus_one = map(lambda x: x + 1, nums)

list(plus_one)
# Out: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

### More exercises

```python
from functools import reduce

# given
a = [(1, 2), (3, 4), (5, 6)]

# write an expression to get only the second elements of each tuple
list(map(lambda t: t[1], a))
# Out: [2, 4, 6]

# write an expression that adds up the second elements
reduce(lambda x, y: x + y, map(lambda t: t[1], a))
# Out: 12

# write an expression that keeps the odd first elements and adds them up
# (uses isOdd from the filter example)
reduce(lambda x, y: x + y, filter(isOdd, map(lambda t: t[0], a)))
# Out: 9
```

### Flatmap

- sometimes we end up with a list of lists, and we want a 'flat' list
- many functional programming languages (and Spark) provide `flatMap`: map each element to a list, then concatenate all those lists into one
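Plain Python has no built-in `flatMap`. A minimal sketch, assuming we write the helper ourselves (`flat_map` is a hypothetical name, not a standard function), combines `map` with `itertools.chain.from_iterable`:

```python
from itertools import chain

def flat_map(f, items):
    # apply f to every element (each call returns an iterable),
    # then concatenate all of those iterables into one flat list
    return list(chain.from_iterable(map(f, items)))

lines = ["hello world", "map and filter"]

# map keeps the nesting: one list of words per line
nested = list(map(str.split, lines))
# nested == [['hello', 'world'], ['map', 'and', 'filter']]

# flat_map removes it: one list with all the words
words = flat_map(str.split, lines)
# words == ['hello', 'world', 'map', 'and', 'filter']
```

Splitting lines into words is the classic use case; Spark's `flatMap` on an RDD of lines behaves the same way.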

### Creating RDDs in Spark

- All the Spark commands here operate on RDDs (think of a big distributed list)
- You can use `sc.parallelize` to go from a list to an RDD
- Many commands are lazy (they don't actually compute the results until you need them)
- In PySpark, `sc` represents your SparkContext

- `sc.parallelize(range(1,10)).first()` returns 1

### Simple Example(s)

- `list1 = sc.parallelize(range(1, 1000))`
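- `list2 = list1.map(lambda x: x*10)` (notice: lazy, nothing is computed yet)
- `list2.reduce(lambda x,y: x + y)` returns 4995000
- `list2.filter(lambda x: x%100 == 0).collect()` returns `[100, 200, ..., 9900]`

- transformations are super fast because Spark doesn't actually compute anything until an action asks for a result
    - e.g. `.first()`, `.reduce()`, `.collect()`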
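Spark's laziness can be loosely compared to Python's own lazy `map`, which also does no work until something asks for a value. This is only a plain-Python analogy (no Spark involved); the `calls` list and `times10` function are hypothetical helpers that record when the work actually happens:

```python
calls = []  # records every element the function is actually applied to

def times10(x):
    calls.append(x)
    return x * 10

nums = range(1, 1000)

# like list1.map(...): builds a lazy iterator, computes nothing yet
lazy = map(times10, nums)
print(len(calls))  # 0

# like .first(): only the first element is computed
first = next(lazy)
print(first, len(calls))  # 10 1
```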

### Some RDD methods

- Transformations (lazy; return a new RDD)
    - `.map(f)`: returns a new RDD by applying f to each element
    - `.filter(f)`: returns a new RDD containing the elements that satisfy f
    - `.flatMap(f)`: applies f (which returns a list) to each element and flattens the results into one RDD
- Actions (trigger the computation; return a value)
    - `.reduce(f)`: combines the RDD's elements into one value with f
    - `.take(n)`: returns the first n items as a list
    - `.collect()`: returns all elements as a list
    - `.sum()`: sum of the (numeric) elements of an RDD
        - similarly `.max()`, `.min()`, `.mean()`...
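To make the transformation/action split concrete, here is a toy single-machine stand-in. `LocalRDD` is a hypothetical class written for these notes, not part of PySpark; it only mimics a few of the method names above. Transformations just record a step and return a new object; actions run the recorded steps.

```python
from functools import reduce as _reduce
from itertools import chain, islice


class LocalRDD:
    """Toy local imitation of an RDD: transformations are lazy, actions compute."""

    def __init__(self, source, steps=()):
        self._source = source  # the original data (never modified)
        self._steps = steps    # recorded transformations, applied in order

    def _with(self, step):
        # transformations return a NEW object; the original stays unchanged
        return LocalRDD(self._source, self._steps + (step,))

    # --- transformations (lazy) ---
    # inside these bodies, map/filter/sum refer to Python's built-ins
    def map(self, f):
        return self._with(lambda it: map(f, it))

    def filter(self, f):
        return self._with(lambda it: filter(f, it))

    def flatMap(self, f):
        return self._with(lambda it: chain.from_iterable(map(f, it)))

    # --- actions (run the recorded steps) ---
    def _run(self):
        it = iter(self._source)
        for step in self._steps:
            it = step(it)
        return it

    def collect(self):
        return list(self._run())

    def take(self, n):
        return list(islice(self._run(), n))

    def reduce(self, f):
        return _reduce(f, self._run())

    def sum(self):
        return sum(self._run())


rdd = LocalRDD(range(1, 100))
squares = rdd.map(lambda x: x * x)                # nothing computed yet
print(squares.sum())                              # 328350
print(rdd.filter(lambda x: x % 2 == 0).take(5))   # [2, 4, 6, 8, 10]
```

Real RDDs additionally split the data into partitions and run the steps on executors; this sketch keeps only the lazy-pipeline idea.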

### More examples

- `rdd1 = sc.parallelize(range(1,100))`
- `rdd1.map(lambda x: x*x).sum()`
- `rdd1.filter(lambda x: x%2==0).take(5)`


### Exercises

- `sc.parallelize(range(1,10)).filter(lambda x: x%3==0).reduce(lambda x,y: x*y)`
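To check the answer without a cluster, the same pipeline can be written with the plain-Python tools from earlier (a local equivalent, not Spark): the filter keeps 3, 6 and 9, and multiplying them gives 162.

```python
from functools import reduce

multiples_of_3 = filter(lambda x: x % 3 == 0, range(1, 10))  # 3, 6, 9
product = reduce(lambda x, y: x * y, multiples_of_3)
print(product)  # 162
```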

### Reading files

- `sc.textFile(name, minPartitions=None, use_unicode=True)`
    - `name` is a path or URL; returns an RDD of strings (one per line)
    - can read from many files, using wildcards (*)
    - can read from HDFS
    - normally followed by a `map` that splits/parses the lines
- Example:
    - `people = sc.textFile('../data/people.txt')`

### Tuples and ReduceByKey

- Many times we want to group elements first, and then calculate values for each group
- In Spark, we operate on (key, value) tuples and normally use `reduceByKey` to perform a reduce on the values of each key (group)
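The semantics of `reduceByKey` can be sketched locally: group the values by key, then `reduce` each group with the given function. `reduce_by_key` is a hypothetical helper and the gender list is made-up data; Spark does this in parallel (combining values inside each partition before shuffling), while this version works on a single list.

```python
from functools import reduce

def reduce_by_key(f, pairs):
    # group values by key, keeping keys in first-seen order
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # reduce each group's values to a single value with f
    return [(key, reduce(f, values)) for key, values in groups.items()]

# counting by key: emit (key, 1) for each element, then add up the 1s
genders = ["M", "F", "F", "M", "F"]
counts = reduce_by_key(lambda x, y: x + y, [(g, 1) for g in genders])
print(counts)  # [('M', 2), ('F', 3)]
```

Spark does not guarantee the order of the keys in the result; the ordering here is just a side effect of using a Python dict.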

### People example/exercises

- We have a `people.txt` file with following schema:
    - Name | Gender | Age | Favorite Language
- We can load with:
    - `people = sc.textFile('../data/people.txt').map(lambda x: x.split('\t'))`
- Find number of people by gender
    - first get tuples like: ('M', 1), ('F',1) then reduce by key
    - `people.map(lambda t: (t[1],1)).reduceByKey(lambda x,y: x+y).collect()`

### Sending programs to the shell

- `pyspark` accepts extra options to ship your own code and files to the shell
    - `--py-files` followed by a comma-separated list of files
        - can use `.py`, `.zip`, `.egg`
    - `--jars` to include Java jars
    - `--packages` to include Maven packages (Java), `--repositories` for extra repositories to search for them
    - `--files` to place arbitrary files in the working directory of each executor
- Get out of pyspark
    - Ctrl-D
- Run it again, including `person.py` in your `--py-files` (e.g. `pyspark --py-files person.py`)
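The contents of `person.py` are not shown in these notes. A minimal sketch of what it might contain, assuming a namedtuple with the hypothetical field names `name`, `gender`, `age` and `language` (matching the `t.gender` and `t.age` used below) and a hypothetical `parse_person` function for the tab-separated schema above:

```python
# person.py (hypothetical sketch; the real file is not shown in these notes)
from collections import namedtuple

# one record per line of people.txt: Name | Gender | Age | Favorite Language
Person = namedtuple("Person", ["name", "gender", "age", "language"])

def parse_person(line):
    # split a tab-separated line and convert the age to an int
    name, gender, age, language = line.split("\t")
    return Person(name, gender, int(age), language)
```

In the shell it could then be used as `from person import parse_person` followed by `people = sc.textFile('../data/people.txt').map(parse_person)`. Keeping `Person` in a module shipped with `--py-files` is presumably what lets the executors rebuild `Person` objects.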

### Person with Objects

- Number of people by gender
    - `people.map(lambda t: (t.gender,1)).reduceByKey(lambda x,y: x+y).collect()`
- Let's do number of people by programming language
- Youngest person by gender
    - `people.map(lambda t: (t.gender, t.age)).reduceByKey(lambda x,y: min(x,y)).collect()`

### Sales example

- Sales: `Day | StoreId | ProductId | QtySold`
- Load:
    - `sales = sc.textFile('sales-data/sales_*.txt').map(lambda x: x.split('\t'))`
- now sales is an RDD of lists, one per line, holding the fields
    - but each field is a string, so numbers must be converted with `int()`
- total quantity of products sold:
    - `sales.map(lambda x: int(x[3])).sum()`
    
### Example: sales by store

- `sales_by_store = sales.map(lambda t: (t[1], int(t[3])))`
- `sales_by_store.reduceByKey(lambda t1, t2: t1 + t2).collect()`


### Joins

- Joins allow us to combine different RDDs
    - each RDD holds pairs of the form (K, V) (key and value)
    - the result holds pairs of the form (K, (V1, V2)) (notice the nesting)
    - joins match only on equal keys (an equi-join, as in databases)
    - there are also `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin`
    - and `cartesian`, which gives the cartesian product (and can be filtered to express other kinds of join), but it is potentially very slow
    
### Simple join example

- `states_rdd = sc.parallelize(states)`
- `populations_rdd = sc.parallelize(populations)`
- `states_rdd.join(populations_rdd).collect()`
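`states` and `populations` are not defined in these notes. A local sketch of what the join computes, assuming both are lists of `(state_code, value)` pairs with placeholder values; `join_pairs` is a hypothetical helper doing a simple nested-loop equi-join, not how Spark executes joins:

```python
def join_pairs(left, right):
    # equi-join: pair every left value with every right value sharing its key,
    # producing (key, (left_value, right_value)) like RDD.join
    return [(k1, (v1, v2)) for k1, v1 in left for k2, v2 in right if k1 == k2]

# placeholder data: (state code, name) and (state code, some number)
states = [("CA", "California"), ("NY", "New York"), ("TX", "Texas")]
populations = [("CA", 100), ("TX", 200), ("FL", 300)]

print(join_pairs(states, populations))
# [('CA', ('California', 100)), ('TX', ('Texas', 200))]
```

Keys present on only one side (NY, FL) are dropped; `leftOuterJoin` would instead keep `('NY', ('New York', None))`.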

### New DataFrame functionality

- a DataFrame is like an RDD but with schema information
    - like a table in SQL, or a DataFrame in pandas
    - rows are generic objects that know their fields
    - the DataFrame knows all its columns
    - all rows have the same schema (though values can be null, arrays, etc.)
- we need to either read from sources that carry a schema, or add the schema info ourselves
- we specify queries on them (similar to RDDs, or through SQL), and a query optimizer plans the execution
    - general (custom) aggregates are slightly harder to express
- much smaller Python tax!

### DataFrames

- `.select` - like map; can use column names (strings) or column expressions
    - `people.select('name', people.age+1).show()`
- `.filter` - keep only certain rows
    - `people.filter(people.age>30)`
- `.show` - display the rows nicely
- Pandas-style syntax for filter
    - `people[people.gender=='F']`
- `groupBy` returns a `GroupedData` object that you then aggregate
    - `people.groupBy(people.gender).count()`
- `.join` - combine two DataFrames on matching columns, like a SQL join

### Performance Considerations

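- RDD code in Python is slower than in Scala due to translation
    - Spark processes run in the JVM
    - Python functions run in separate Python processes, so objects must be serialized back and forth between the JVM and Python
- DataFrames avoid this translation: everything lives in the JVM (as long as you use built-in operations)
    - until the last step, when results are sent to the client
- DataFrames can be optimized better
    - but you lose some control
- Shuffling (moving data between nodes, e.g. for joins and `reduceByKey`) is expensive
    - partitioning can help some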

### RDD Performance

- An RDD has:
    - Lineage
        - a set of partitions/splits
        - a list of dependencies on parent RDDs
        - a function to compute each partition given its parents
    - Optimized execution
        - a partitioner: which objects go on which partitions
            - partitioning can help when shuffling
        - a preferred location for each partition
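A rough plain-Python sketch of what a hash partitioner does: each key goes to partition `hash(key) % num_partitions`, so all pairs with the same key land in the same partition. `hash_partition` is a hypothetical helper and the pairs are made-up data; PySpark uses its own portable hash function, but the idea is the same. Integer keys are used because Python randomizes string hashes per process.

```python
def hash_partition(pairs, num_partitions):
    # one bucket per partition
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # same key -> same hash -> same partition
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

# made-up (store id, quantity) pairs
store_sales = [(1, 5), (2, 3), (1, 7), (3, 2), (2, 1)]
parts = hash_partition(store_sales, 2)
print(parts)
# [[(2, 3), (2, 1)], [(1, 5), (1, 7), (3, 2)]]
```

If two RDDs are partitioned the same way, matching keys already sit in matching partitions, which is why partitioning can reduce the data moved during a shuffle.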

# _CLICK [HERE](https://youtu.be/9xYfNznjClE) FOR VIDEO_