# Introduction to Spark

Apache Spark is an open-source distributed general-purpose cluster-computing framework. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

During this lecture, we try out some basic Spark concepts and operations. We start by loading data into a Resilient Distributed Dataset (RDD) and then perform some basic transformations and actions on it.

## Creating an RDD from a Python list

When working with Spark on Databricks, a Spark context (`SparkContext`) is created for you automatically when you attach a notebook to a cluster. The Spark context represents the connection to a Spark cluster and is used to create RDDs. In the notebook it is available as `sc`.

In [None]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data, 4)
rdd
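
The second argument of `parallelize` is the number of partitions the data is split into. As a rough illustration, the plain-Python sketch below (no Spark required) splits the same list into four contiguous slices. The helper `split_into_partitions` is hypothetical and only models the idea; the layout Spark actually produces may differ.

```python
# Plain-Python sketch: divide a list into contiguous slices, one per partition.
# This models the idea of partitioning; it is not Spark's implementation.
def split_into_partitions(items, num_partitions):
    """Return num_partitions contiguous slices that together cover items."""
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

data = [1, 2, 3, 4, 5]
partitions = split_into_partitions(data, 4)
print(partitions)  # [[1], [2], [3], [4, 5]]
```

In the notebook itself, `rdd.getNumPartitions()` and `rdd.glom().collect()` can be used to inspect how the RDD is actually partitioned.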

## Some Spark Transformations

### Map

`map(func)` returns a new distributed dataset formed by passing each element of the source through a function *func*.

In [None]:
result_rdd = rdd.map(lambda x: x * 2)
result_rdd.collect()

### Filter

`filter(func)` returns a new dataset formed by selecting those elements of the source on which *func* returns `True`.

In [None]:
rdd_result = rdd.filter(lambda x: x % 2 == 0) 
rdd_result.collect()

### Distinct

`distinct()` returns a new dataset that contains the distinct elements of the source dataset.

In [None]:
rdd2 = sc.parallelize([1, 4, 2, 2, 3]) 
result_rdd = rdd2.distinct()
result_rdd.collect()

### Flat Map

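`flatMap(func)` is similar to `map`, but each input item can be mapped to 0 or more output items, so *func* should return a sequence (in Python, any iterable such as a list) rather than a single item. For comparison, the first cell below uses `map` and the second uses `flatMap` with the same function.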

In [None]:
rdd = sc.parallelize([1, 2, 3])
rdd_result = rdd.map(lambda x: [x, x+5])
rdd_result.collect()

In [None]:
rdd_result = rdd.flatMap(lambda x: [x, x+5]) 
rdd_result.collect()
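
The difference between the two cells can also be reproduced locally. The following is a plain-Python sketch (no Spark required) in which a list comprehension stands in for `map` and `itertools.chain.from_iterable` stands in for `flatMap`; it only mirrors the shape of the results.

```python
from itertools import chain

data = [1, 2, 3]
func = lambda x: [x, x + 5]

# Like rdd.map(func): one output element (here a list) per input element.
mapped = [func(x) for x in data]

# Like rdd.flatMap(func): the per-element lists are flattened into one sequence.
flat_mapped = list(chain.from_iterable(func(x) for x in data))

print(mapped)       # [[1, 6], [2, 7], [3, 8]]
print(flat_mapped)  # [1, 6, 2, 7, 3, 8]
```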

## Some Spark Actions

### Reduce

`reduce(func)` aggregates the dataset's elements using the function *func*. *func* takes two arguments and returns one, and it must be commutative and associative so that the result can be computed correctly in parallel.

In [None]:
rdd = sc.parallelize([1, 2, 3]) 
rdd.reduce(lambda a, b: a * b) 
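
To illustrate why the function has to be commutative and associative, here is a plain-Python sketch (no Spark required). The helper `reduce_in_partitions` is hypothetical: it reduces each partition separately and then combines the partial results, which is a simplified model of a parallel reduce. Multiplication gives the same answer for any partition layout, whereas subtraction does not.

```python
from functools import reduce

def reduce_in_partitions(partitions, func):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

multiply = lambda a, b: a * b
subtract = lambda a, b: a - b

# Two possible ways the same data [1, 2, 3] could be partitioned.
layout_a = [[1, 2], [3]]
layout_b = [[1], [2, 3]]

prod_a = reduce_in_partitions(layout_a, multiply)  # (1 * 2) * 3
prod_b = reduce_in_partitions(layout_b, multiply)  # 1 * (2 * 3)
diff_a = reduce_in_partitions(layout_a, subtract)  # (1 - 2) - 3
diff_b = reduce_in_partitions(layout_b, subtract)  # 1 - (2 - 3)

print(prod_a, prod_b)  # 6 6
print(diff_a, diff_b)  # -4 2
```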

### Take

`take(n)` returns a list with the first *n* elements.

In [None]:
rdd.take(2)

### Collect

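`collect()` returns all the elements as a list. Because the whole dataset is brought to the driver, use it only when the result is small enough to fit in the driver's memory.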

In [None]:
rdd.collect()

### Take Ordered

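`takeOrdered(n, key=func)` returns the first *n* elements in ascending order, or in the order defined by the optional key function. In the example below the key negates each value, so the three largest elements are returned in descending order.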

In [None]:
rdd = sc.parallelize([5,3,1,2])
rdd.takeOrdered(3, lambda s: -1 * s) 
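
A local analogue of this call is Python's `heapq.nsmallest`, which also accepts an optional key function. The sketch below (no Spark required) shows how negating the key turns "smallest first" into "largest first" for the same data.

```python
import heapq

data = [5, 3, 1, 2]

# Without a key: the three smallest elements in ascending order.
smallest = heapq.nsmallest(3, data)

# With a negating key: elements are ranked by -value, so the largest come first.
largest = heapq.nsmallest(3, data, key=lambda s: -1 * s)

print(smallest)  # [1, 2, 3]
print(largest)   # [5, 3, 2]
```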

## Some Key-Value Transformations

### Reduce By Key

`reduceByKey(func)` returns a new distributed dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function *func*, which must be of type (V, V) ➞ V.

In [None]:
rdd = sc.parallelize([(1,2), (3,4), (3,6)]) 
rdd.reduceByKey(lambda a, b: a + b).collect()
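
What `reduceByKey` computes can be modelled with a dictionary. The following plain-Python sketch (no Spark required) uses a hypothetical helper `reduce_by_key`; it reproduces the result of the cell above but says nothing about how Spark distributes the work.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func; a key seen once keeps its value."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())

pairs = [(1, 2), (3, 4), (3, 6)]
summed = reduce_by_key(pairs, lambda a, b: a + b)

print(sorted(summed))  # [(1, 2), (3, 10)]
```

Note that the function is never called for key `1`, which has a single value.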

### Sort By Key

`sortByKey()` returns a new dataset of (K, V) pairs sorted by key, in ascending order by default.

In [None]:
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) 
rdd2.sortByKey().collect()

### Group By Key

`groupByKey()` returns a new dataset of `(K, Iterable<V>)` pairs.

In [None]:
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) 
rdd2.groupByKey().collect()

Output the resulting iterables as lists:

In [None]:
rdd2.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
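
The grouping itself can be sketched locally with a `defaultdict`. This is a plain-Python model with a hypothetical helper `group_by_key`; in Spark, the order of the keys and of the values within each group is not something to rely on.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values that share a key into one list per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

pairs = [(1, 'a'), (2, 'c'), (1, 'b')]
grouped = group_by_key(pairs)

print(sorted(grouped))  # [(1, ['a', 'b']), (2, ['c'])]
```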

# RDDs and Key Value Pairs

Now that we've seen how to work with RDDs and how to aggregate their values, we can look at a more realistic example of working with key-value pairs. To do this, let's create some fake data as a new text file.

This data represents services sold to customers of a SaaS business.

First, let's load our file into Databricks using the Data section.

Spark supports various filesystems (S3, HDFS, etc.), but in our simple case we will load a locally stored file using

`rdd = sc.textFile("/path/to/file")`.

In [None]:
services = sc.textFile('/FileStore/tables/services.txt')
services.take(3)

Let's start by transforming our RDD into a list of rows, where each row is represented by another list consisting of individual cell values. For that we apply Python's `split` method, which by default splits a string on whitespace and returns the parts as a list.

In [None]:
services.map(lambda x: x.split()).take(3)
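
A quick local check of how `split` behaves with and without an explicit separator may help here. The sample lines below are made up for illustration and are not taken from the actual file.

```python
# Hypothetical sample lines, for illustration only.
space_row = "201 10/13/2017 100 NY 131 100.00"
comma_row = "201,10/13/2017,100,NY,131,100.00"

# With no argument, split() separates on runs of whitespace.
space_cells = space_row.split()

# A comma-separated line contains no whitespace, so split() returns one element.
comma_unsplit = comma_row.split()

# Passing the separator explicitly handles the comma-separated case.
comma_cells = comma_row.split(',')

print(space_cells)         # ['201', '10/13/2017', '100', 'NY', '131', '100.00']
print(len(comma_unsplit))  # 1
print(comma_cells == space_cells)  # True
```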

Let's remove the leading hash sign `#` from the first line.

In [None]:
services.map(lambda x: x[1:] if x[0]=='#' else x).take(3)

Now we create a 'recipe' that combines the two transformations described above, and trigger them by applying the `take` action.

In [None]:
services.map(lambda x: x[1:] if x[0]=='#' else x) \
        .map(lambda x: x.split()).take(3)
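
The idea of a recipe that only runs when an action asks for results can be imitated with Python generators. The sketch below is a plain-Python analogy, not Spark itself: the sample lines and the helper `strip_hash` are hypothetical, and `islice` plays the role of `take`.

```python
from itertools import islice

calls = []  # records which lines were actually processed

def strip_hash(line):
    """Drop a leading '#', remembering that this line was touched."""
    calls.append(line)
    return line[1:] if line.startswith('#') else line

lines = ['#a b', 'c d', 'e f', 'g h']  # hypothetical sample lines

# Building the recipe does not process any line yet.
recipe = (strip_hash(line).split() for line in lines)
calls_before_action = len(calls)

# The "action": pulling two results processes only as many lines as needed.
first_two = list(islice(recipe, 2))

print(calls_before_action)  # 0
print(first_two)            # [['a', 'b'], ['c', 'd']]
print(len(calls))           # 2
```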

We have transformed our file into a format that allows us to do useful processing of the data. Usually, when working with tabular data, one would use Spark SQL, which introduces the concept of a DataFrame and provides tools for DataFrame operations. For now, however, we will rely on basic Spark transformations instead.

Let's find out the total sales per state.

First, let's store our previous transformation in a variable:

In [None]:
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x) \
                    .map(lambda x: x.split())

Let's apply a `map` transformation that produces an RDD of `(key, value)` pairs holding the information needed for computing sales per state: (State, Transaction).

In [None]:
cleanServ.map(lambda lst: (lst[3],lst[-1])).take(3)

Let's get rid of the `('State', 'Amount')` header tuple:

In [None]:
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .filter(lambda x: not x[0]=='State')\
         .take(3)

Let's apply the `reduceByKey` transformation, which returns a dataset of (State, Aggregated_Transactions) key-value pairs. The values for each key (State) are aggregated using the given reduce function, which here computes the sum of all the transactions for each state.
Notice how it assumes that the first item of each pair is the key!

In [None]:
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .filter(lambda x: not x[0]=='State')\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .take(3)

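Looks like we forgot that the amounts are still strings, so `+` concatenated them instead of adding them! Let's fix that by converting each amount to a `float` before reducing. Converting inside the reduce function would not be enough, because that function is never called for a state with a single transaction, whose amount would then stay a string:

In [None]:
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .filter(lambda x: not x[0]=='State')\
         .mapValues(float)\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .take(3)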
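
To see why the type of the values matters here, the following plain-Python sketch models `reduceByKey` with a dictionary. The helper `reduce_by_key` and the sample pairs are made up for illustration. It compares three variants: reducing the raw strings, converting inside the reduce function, and converting before reducing.

```python
def reduce_by_key(pairs, func):
    """Dictionary-based model of reduceByKey; a key seen once keeps its value."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Hypothetical (state, amount) pairs; 'CA' has only one transaction.
pairs = [('NY', '100.00'), ('NY', '50.00'), ('CA', '75.00')]

# 1. Raw strings: '+' concatenates.
concatenated = reduce_by_key(pairs, lambda a, b: a + b)

# 2. Converting inside the function: single-transaction keys stay strings.
converted_inside = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

# 3. Converting first: every value is a float before any reduction happens.
float_pairs = [(state, float(amount)) for state, amount in pairs]
converted_first = reduce_by_key(float_pairs, lambda a, b: a + b)

print(concatenated)      # {'NY': '100.0050.00', 'CA': '75.00'}
print(converted_inside)  # {'NY': 150.0, 'CA': '75.00'}
print(converted_first)   # {'NY': 150.0, 'CA': 75.0}
```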

We can continue our analysis by sorting this output:

1. Grab state and amounts.
1. Get rid of ('State','Amount').
1. Convert the amounts from strings to floats and add them up for each state.
1. Sort results by the amount value.

In [None]:
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .filter(lambda x: not x[0]=='State')\
         .mapValues(float)\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
         .collect()
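
The whole pipeline can be traced end to end on a small made-up dataset. The sketch below is plain Python (no Spark required); the column names and rows are hypothetical and only assume what the cells above rely on, namely a header line starting with `#`, the state in the fourth column, and the amount in the last column.

```python
from collections import defaultdict

# Hypothetical sample data, not the contents of the real file.
lines = [
    "#EventId Timestamp Customer State ServiceID Amount",
    "201 10/13/2017 100 NY 131 100.00",
    "204 10/18/2017 700 TX 129 450.00",
    "202 10/15/2017 203 CA 121 200.00",
    "206 10/19/2017 202 CA 131 500.00",
    "203 10/17/2017 101 NY 173 150.00",
]

# Strip the leading '#' and split each line into cells.
rows = [(line[1:] if line.startswith('#') else line).split() for line in lines]

# 1. Grab state and amount. 2. Drop the ('State', 'Amount') header pair.
pairs = [(row[3], row[-1]) for row in rows]
pairs = [pair for pair in pairs if pair[0] != 'State']

# 3. Convert amounts to floats and add them up per state.
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += float(amount)

# 4. Sort by total amount, largest first.
ranked = sorted(totals.items(), key=lambda state_amount: state_amount[1], reverse=True)

print(ranked)  # [('CA', 700.0), ('TX', 450.0), ('NY', 250.0)]
```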