# Big Data Concepts in Python

### Lambda Functions
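* Defined inline and limited to a single expression
* When passed as the `key` argument of `sorted()`, as in the example below, `sorted()`:
1. Takes an iterable
2. Calls the lambda on each item to get the value to sort by
3. Sorts the items by that value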

In [6]:
x = [('Science', 90), ('English', 88), ('Maths', 97), ('Social sciences', 82)]
print('Sorted by Topic: %s' % sorted(x, key=lambda x: x[0]))
print('Sorted by Score: %s' % sorted(x, key=lambda x: x[1]))

Sorted by Topic: [('English', 88), ('Maths', 97), ('Science', 90), ('Social sciences', 82)]
Sorted by Score: [('Social sciences', 82), ('English', 88), ('Science', 90), ('Maths', 97)]
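
Since a lambda is just an anonymous single-expression function, it can always be swapped for a named `def`. The sketch below illustrates this and also shows a key function that lowercases each item before sorting. The names `scores`, `by_score`, and `words` are illustrative only.

```python
scores = [('Science', 90), ('English', 88), ('Maths', 97), ('Social sciences', 82)]

# A named function equivalent to `lambda item: item[1]`
def by_score(item):
    return item[1]

sorted_with_lambda = sorted(scores, key=lambda item: item[1])
sorted_with_def = sorted(scores, key=by_score)
print(sorted_with_lambda == sorted_with_def)  # True

# A key function can also normalise values, e.g. a case-insensitive sort
words = ['banana', 'apple', 'Cherry']
case_insensitive = sorted(words, key=lambda word: word.lower())
print(case_insensitive)  # ['apple', 'banana', 'Cherry']
```

Without the `key`, `'Cherry'` would sort first because uppercase letters compare lower than lowercase ones.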


#### Filter( )

* Function: Keeps only the items that satisfy a condition, typically given as a lambda function
1. Takes an iterable
2. Calls the lambda function on each item
3. Returns the items for which the lambda returns a truthy value

* `filter()` in Method 1 returns a lazy iterator rather than a list of the actual items
    * Useful for Big Data sets, as it avoids holding datasets of up to terabytes in size in memory all at once

In [8]:
x = ['Python', 'programming', 'is', 'dank!']

# Method 1
print(list(filter(lambda arg: len(arg) < 8, x)))

# Method 2
def is_less_than_8_characters(item):
    return len(item) < 8

results = []
for item in x:
    if is_less_than_8_characters(item):
        results.append(item)

print(results)

['Python', 'is', 'dank!']
['Python', 'is', 'dank!']
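
The laziness of `filter()` can be observed directly. This is a minimal sketch using the same list; the variable names are illustrative.

```python
words = ['Python', 'programming', 'is', 'dank!']

lazy = filter(lambda arg: len(arg) < 8, words)  # nothing is evaluated yet
first = next(lazy)     # evaluates only until the first match
rest = list(lazy)      # consumes the remaining matches
print(first, rest)     # Python ['is', 'dank!']

leftover = list(lazy)  # the iterator is now exhausted
print(leftover)        # []
```

Because the iterator can only be consumed once, wrap it in `list()` when the results are needed more than once and fit in memory.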


#### Map( )

* Function: Applies a function to every item, producing exactly one output per input (a 1:1 mapping)

1. Takes an iterable
2. Calls the lambda function on each item
3. Returns the mapped output for each item

* Note: Unlike filter(), map() always returns the same number of items that were passed in

In [4]:
x = ['Python', 'programming', 'is', 'dank!']

# Method 1
print(list(map(lambda arg: arg.upper(), x)))

# Method 2
results = []
for item in x:
    results.append(item.upper())

print(results)

['PYTHON', 'PROGRAMMING', 'IS', 'DANK!']
['PYTHON', 'PROGRAMMING', 'IS', 'DANK!']


#### Reduce( )
* Function: Applies a two-argument function cumulatively to the elements of an iterable to reduce them to a single value

1. Takes an iterable
2. Calls the function on the running result and the next element
3. Returns the final combined value once every element has been consumed

* Note: The items in the iterable are combined from left to right into a single item

In [None]:
from functools import reduce
x = ['Python', 'programming', 'is', 'awesome!']
print(reduce(lambda val1, val2: val1 + val2, x))
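
To see the left-to-right combination step by step, `itertools.accumulate` yields every intermediate value that `reduce` would carry forward. This is an illustrative sketch; `join_pair`, `steps`, and `total_length` are names chosen for the example.

```python
from functools import reduce
from itertools import accumulate

words = ['Python', 'programming', 'is', 'awesome!']
join_pair = lambda val1, val2: val1 + val2

# accumulate() yields each running result that reduce() passes along
steps = list(accumulate(words, join_pair))
for step in steps:
    print(step)

# The last intermediate value equals the reduce() result
total = reduce(join_pair, words)
print(total == steps[-1])  # True

# An optional third argument seeds the running value
total_length = reduce(lambda running, word: running + len(word), words, 0)
print(total_length)  # 27
```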

---
### Spark and PySpark

#### What is Spark?
* Apache Spark is a general-purpose engine for processing large amounts of data
* Written primarily in Scala and runs on the JVM

#### What is PySpark?
* Python-based wrapper on top of the Scala API
    * A library that allows large amounts of data to be processed on a single machine or a cluster
    * Gives parallelism similar to the `threading` / `multiprocessing` modules without using those modules directly
* PySpark is possible because of the following:
    * Scala is functional-based
    * Functional code is easier to parallelize

#### PySpark API and Data Structures
* RDDs: ***R***esilient ***D***istributed ***D***atasets
    * Specialized data structures used within Spark
    * Loosely comparable to a pandas df, although an RDD is a distributed collection of records rather than a table with named columns
    * Hides the complexity of transforming and distributing data across multiple nodes via a scheduler
    * RDDs are immutable
        * Once created, an RDD cannot be changed
    * RDDs are fault tolerant
        * Upon failure, lost partitions of the RDD are recomputed automatically
    * Types of Operations:
        * Transformation
            * When applied on an RDD, these create a new RDD
            * EX: `filter`, `groupBy`, `map`
        * Action
            * Instructs Spark to perform a computation and send the result back to the driver
            * EX: `count`, `collect`, `take`
        
* SparkContext:
    * Entry point of any PySpark program; it connects to a Spark cluster and creates RDDs
* RDDs can be created from common data structures like lists and tuples
    * Done via the parallelize( ) function
    * Because the data isn't computed until an action runs, functions like take( ) let you inspect a small sample without pulling the whole dataset onto your machine!
    
``` 
      ------<------------ Worker Node: {Executor: [task, task], Cache: []}
      |                 /
Spark Context --> Cluster Manager 
      |                 \
      -------<----------- Worker Node: {Executor: [task, task], Cache: []}

```

##### Example
```python
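# `sc` is an existing SparkContext (created automatically in the pyspark shell)
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)  # split the data into 2 partitions
odds = rdd.filter(lambda x: x % 2 != 0)  # transformation: nothing runs yet
odds.take(5)  # action: triggers the computation
# [1, 3, 5, 7, 9]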
```
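
The split between lazy transformations and eager actions can be imitated in plain Python. The `ToyDataset` class below is a hypothetical teaching aid, not part of the Spark API. It only records `filter`/`map` steps and runs them when `take` is called.

```python
from itertools import islice

class ToyDataset:
    """Toy stand-in for an RDD (not Spark): records steps, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = steps  # tuple of (kind, function) pairs

    # "Transformations": return a new object and compute nothing
    def map(self, fn):
        return ToyDataset(self._source, self._steps + (('map', fn),))

    def filter(self, fn):
        return ToyDataset(self._source, self._steps + (('filter', fn),))

    def _iterate(self):
        for item in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == 'map':
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                yield item

    # "Action": runs the recorded steps, stopping after n results
    def take(self, n):
        return list(islice(self._iterate(), n))

evaluated = []  # records every element the predicate actually sees

def is_odd(x):
    evaluated.append(x)
    return x % 2 != 0

odds = ToyDataset(range(10000)).filter(is_odd)
evaluated_before = len(evaluated)  # 0: the filter has not run yet
first_odds = odds.take(5)
print(first_odds)       # [1, 3, 5, 7, 9]
print(len(evaluated))   # 10: only the elements 0..9 were examined
```

Each transformation returns a new object and leaves the original untouched, which mirrors the immutability of RDDs described above.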

#### MapReduce

* Map: Filters and sorts the data, emitting intermediate key-value pairs
* Reduce: Aggregates the mapped output and reduces its size

```
            |Block 1 <-> Map|-
         /                    \               Shuffle
Input ----> |Block 2 <-> Map|----> Combine ->    &    -- > Reduce -> Output
         \                    /                Sort
            |Block 3 <-> Map|-
```
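
The flow in the diagram can be sketched on a single machine with a word count. This is a simplified illustration in plain Python rather than Hadoop or Spark code; the `blocks` data is made up, and the optional Combine step is left out.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical input, already split into three blocks
blocks = [
    'spark hadoop spark',
    'hadoop hive',
    'spark pig hive',
]

# Map: each block independently emits (word, 1) pairs
mapped = [[(word, 1) for word in block.split()] for block in blocks]

# Shuffle and sort: group all values by key across blocks
grouped = defaultdict(list)
for pairs in mapped:
    for word, count in pairs:
        grouped[word].append(count)

# Reduce: collapse each key's values into a single number
word_counts = {
    word: reduce(lambda a, b: a + b, counts)
    for word, counts in sorted(grouped.items())
}
print(word_counts)  # {'hadoop': 2, 'hive': 2, 'pig': 1, 'spark': 3}
```

In a real cluster the map step runs on the node holding each block, and the shuffle moves the pairs over the network so that all values for one key reach the same reducer.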


#### Spark vs. MapReduce

* Spark Pros:
  * Spark is great when you can hold all of your data in memory
      * It leverages RAM to be up to ~100x faster than MapReduce, which writes intermediate results to disk
  * Spark offers APIs in several languages, which makes it easy to adopt
      * Python, Scala, Java, SQL
      * MapReduce does have Hive and Pig, which allow for more access than just pure Java
* MapReduce Pros:
  * MapReduce is great for data that can't fit in memory

---
### PySpark Code Examples

#### File Read In Example

* Example program to count the number of lines containing 'a' and the number of lines containing 'b' in a file
```python
from pyspark import SparkContext

ex_file = "file:///home/hadoop/spark-2.1.0-bin/README.md"
sc = SparkContext("local", "first app")  # Create the SparkContext in local mode
logData = sc.textFile(ex_file).cache()  # Read the file into an RDD and mark it for caching
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print('Lines with a: %i, lines with b: %i' % (numAs, numBs))
```

### RDD Operations

#### count()
* Returns the number of elements in the RDD

```python
from pyspark import SparkContext
sc = SparkContext('local', 'count app')
words = sc.parallelize(['scala', 'java', 'hadoop', 'spark'])
counts = words.count()
print('Number of counts: %i' % counts)
# > Number of counts: 4
```

#### collect()
* Returns all of the elements in an RDD to the driver as a list
```python
from pyspark import SparkContext
sc = SparkContext('local', 'collect app')
words = sc.parallelize (['scala', 'java', 'hadoop', 'spark'])
coll = words.collect()
print('Elements: %s' % coll)
# > Elements: ['scala', 'java', 'hadoop', 'spark']
```

#### foreach()
* Applies a function to every element of the RDD; it is run for its side effects and returns nothing
```python
from pyspark import SparkContext
sc = SparkContext('local', 'foreach app')
words = sc.parallelize(['scala', 'java', 'hadoop', 'spark'])
def f(x): print(x)
words.foreach(f)  # foreach() returns None, so there is nothing to assign
# > scala
#   java
#   hadoop
#   spark
```

#### filter(f)
* Returns a new RDD containing only the elements that satisfy the filter condition
```python
from pyspark import SparkContext
sc = SparkContext('local', 'filter app')
words = sc.parallelize(['scala', 'java', 'hadoop', 'spark'])
words_filter = words.filter(lambda x: 's' in x)  # define the filter (lazy)
filtered = words_filter.collect()  # run it and gather the results
print('Filtered RDD: %s' % filtered)
# > Filtered RDD: ['scala', 'spark']
```

#### map(f, preservesPartitioning = False)
* Returns a new RDD containing the result of applying the function to each element

```python
from pyspark import SparkContext
sc = SparkContext('local', 'map app')
words = sc.parallelize(['scala', 'java', 'hadoop', 'spark'])
words_map = words.map(lambda x: (x, x.upper()))  # pair each word with its uppercase form
mapping = words_map.collect()
print('Key Val Pair: %s' % mapping)
# > Key Val Pair: [('scala', 'SCALA'), ('java', 'JAVA'), ('hadoop', 'HADOOP'), ('spark', 'SPARK')]
```

#### reduce(f)
* Combines the elements of the RDD with the specified commutative and associative binary operation and returns the single resulting value

```python
from operator import add
from pyspark import SparkContext
sc = SparkContext('local', 'reduce app')
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print('Sum of all elements: %i' % adding)
# > Sum of all elements: 15
```
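
The binary operation should be commutative and associative because each partition is reduced on its own before the partial results are combined. The plain-Python sketch below imitates that. The two-way split and the `partitioned_reduce` helper are hypothetical and are not Spark's implementation.

```python
from functools import reduce
from operator import add, sub

nums = [1, 2, 3, 4, 5]
partitions = [nums[:2], nums[2:]]  # hypothetical split across two workers

def partitioned_reduce(fn, parts):
    # Reduce each partition on its own, then reduce the partial results
    partials = [reduce(fn, part) for part in parts]
    return reduce(fn, partials)

# Addition is associative, so the split does not change the answer
add_matches = partitioned_reduce(add, partitions) == reduce(add, nums)
print(add_matches)  # True

# Subtraction is not, so the answer depends on how the data was split
sub_matches = partitioned_reduce(sub, partitions) == reduce(sub, nums)
print(sub_matches)  # False
```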

#### join(other, numPartitions=None)
* Returns an RDD of `(key, (value_from_self, value_from_other))` pairs for every key present in both RDDs

```python
from pyspark import SparkContext
sc = SparkContext('local', 'join app')
x = sc.parallelize([('spark', 1), ('hadoop', 4)])
y = sc.parallelize([('spark', 2), ('hadoop', 5)])
joined = x.join(y)
final = joined.collect()
print('Joined RDD: %s' % final)
# > Joined RDD: [('spark', (1, 2)), ('hadoop', (4, 5))]
```
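
What `join` computes can be sketched in plain Python as an inner join on the first element of each pair. The `inner_join` helper and the extra `'hive'` and `'pig'` rows are illustrative additions, included to show that unmatched keys are dropped.

```python
x = [('spark', 1), ('hadoop', 4), ('hive', 7)]
y = [('spark', 2), ('hadoop', 5), ('pig', 9)]

def inner_join(left, right):
    # Index the right side by key so each left pair can look up its matches
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

joined = inner_join(x, y)
print(joined)  # [('spark', (1, 2)), ('hadoop', (4, 5))]
```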

#### cache()
* Persists the RDD at the default storage level; the `is_cached` attribute reports whether the RDD has been marked for caching

```python
from pyspark import SparkContext
sc = SparkContext('local', 'cache app')
words = sc.parallelize(['scala', 'java', 'hadoop', 'spark'])
words.cache()  # mark the RDD for caching at the default storage level
caching = words.is_cached
print('Cached: %s' % caching)
# > Cached: True
```
---

### PySpark Broadcast & Accumulator

* Spark uses shared variables for parallel processing
    * A copy is typically sent along whenever the driver sends a task to an executor on the cluster
    
* Shared Variable Types:
    * Broadcast
        * Saves a read-only copy of the data on all nodes
        * Cached once on every machine rather than shipped along with each task
        ```python
        from pyspark import SparkContext 
        sc = SparkContext("local", "Broadcast app") 
        words_new = sc.broadcast(['scala', 'java', 'hadoop', 'spark'])  # broadcast a read-only copy to every node
        data = words_new.value
        print("Stored data -> %s" % data)
        # > Stored data -> ['scala', 'java', 'hadoop', 'spark']
        ```
    * Accumulator
        * Used for aggregating information through associative and commutative operations
        * Tasks can only add to it; the accumulated value can be read ONLY within the driver program
        ```python
        from pyspark import SparkContext 
        sc = SparkContext("local", "Accumulator app") 
        num = sc.accumulator(10) 
        def f(x):
            global num
            num += x  # tasks may only add to the accumulator
        rdd = sc.parallelize([20, 30, 40, 50])
        rdd.foreach(f)
        final = num.value  # the value is readable only on the driver
        print("Accumulated value is -> %i" % final)
        # > Accumulated value is -> 150
        ```
        
