# Introduction to Data Mining with Apache Spark

This lecture covers the basics of Spark.

The Spark distribution used in this notebook is version 2.0.2, downloaded from http://spark.apache.org/downloads.html

An appropriate Java installation is also required.

**Note:** if pyspark freezes on basic operations, consider
launching the Jupyter notebook with the following command:
```
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=7777" pyspark --master 'local[4]' --executor-memory 2000M --driver-memory 2000M
```

In [1]:
# Configure the necessary Spark environment
import os
import sys

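# SPARK_HOME must point to the unpacked Spark distribution
SPARK_HOME = os.environ.get('SPARK_HOME', None)
if SPARK_HOME is None:
    raise EnvironmentError('SPARK_HOME is not set')
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))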

# Add the py4j to the path.
# You may need to change the version number to match your install
#sys.path.insert(0, os.path.join(SPARK_HOME, 'python/lib/py4j-0.10.3-src.zip'))

# Initialize PySpark to predefine the SparkContext variable 'sc'
#execfile(os.path.join(SPARK_HOME, 'python/pyspark/shell.py'))

## A little bit of Functional Programming

Functional programming evolved from lambda calculus, a mathematical framework for defining computations that is an alternative to Turing machines.

Generally, functional programming provides a more abstract notation than imperative programming and is remarkably well suited to distributed programming.

### Higher-order functions

In pure functional programming everything is a function. Even constants like 1 can be defined (in a slightly tricky manner) as functions. There are no special constructs like `while` or `for` loops; everything is built around higher-order functions: functions that receive or return other functions.

This section introduces the most common higher-order functions; the vast majority of everyday programs can be written with them.

We will use Scala-like notation for function signatures:

```scala
def f(x: List[Int], f: (Int, String) -> Double): List[Double]
```

denotes a function that receives two arguments:
- `x` of type `List[Int]` (a list of integers)
- `f`, a function that receives two arguments of types `Int` and `String` and returns a `Double`

and returns `List[Double]`.

Most of the time we will operate with some abstract types and will denote them by capital Latin letters, e.g. `A`, `B`:

```scala
def f(x: List[A], f: A -> B): List[B]
```
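For readers more used to Python, the same generic signature can be approximated with type hints from the standard `typing` module. This is only a sketch: the name `apply_each` is hypothetical, and type hints require Python 3, unlike the Python 2 cells of this notebook.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")

def apply_each(x: List[A], f: Callable[[A], B]) -> List[B]:
    """Python counterpart of `def f(x: List[A], f: A -> B): List[B]`."""
    return [f(item) for item in x]

# Here A is str and B is int.
lengths = apply_each(["a", "bb", "ccc"], len)
```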

#### Map

```scala
def map(f: A -> B, xs: List[A]): List[B]
```

applies `f` to each element of `xs` and returns the list of results.

In [2]:
map(lambda x: str(x) + "!", range(10))

['0!', '1!', '2!', '3!', '4!', '5!', '6!', '7!', '8!', '9!']

#### Reduce functions

```scala
def fold(zero: T)(f: (T, T) -> T, xs: List[T]): T
def reduce(f: (T, T) -> T, xs: List[T]): T
```

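performs a reduction: it combines the elements of `xs` one by one with `f`. Equivalent Python code for `fold` (`reduce` does the same, but starts from the first element of `xs` instead of `zero`):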

```python
acc = zero
for x in xs:
    acc = f(acc, x)
```

In [3]:
def min(xs):
    return reduce(lambda a, b: a if a < b else b, xs)

### the Python equivalent of fold is reduce with a third argument (the initial value)
def sum(xs):
    return reduce(lambda a, b: a + b, xs, 0)

In [4]:
min(range(10))

0

In [5]:
sum(range(10))

45

In [6]:
sum([])

0
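The difference between `fold` and `reduce` shows up on an empty list: `fold` returns `zero`, while `reduce` has nothing to start from. A minimal sketch, assuming a hypothetical helper `fold` written as the loop above (`functools.reduce` is the standard-library function; it is a built-in on Python 2 and must be imported on Python 3):

```python
from functools import reduce

def fold(zero, f, xs):
    """Left fold: start from `zero` and combine the elements one by one."""
    acc = zero
    for x in xs:
        acc = f(acc, x)
    return acc

add = lambda a, b: a + b

total = fold(0, add, range(10))   # same as reduce(add, range(10), 0)
empty_total = fold(0, add, [])    # zero is returned for an empty list

try:
    reduce(add, [])               # no initial value and no elements
    reduce_failed = False
except TypeError:
    reduce_failed = True
```

This is why `sum([])` above works: it passes `0` as the initial value.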

#### Filter

```scala
def filter(p: T -> Boolean, xs: List[T]): List[T]
```

selects elements of `xs` that satisfy predicate `p`.

In [7]:
print filter(lambda x: x % 2 == 0, range(10))
print [ x for x in range(10) if x % 2 == 0 ]

[0, 2, 4, 6, 8]
[0, 2, 4, 6, 8]


#### FlatMap (fmap)

```scala
def flatMap(f: T -> List[T], xs: List[T]): List[T]
```

similar to map, but the function may return an arbitrary number of results for each element.

In [8]:
def flatMap(f, xs):
    return [ y for x in xs for y in f(x) ]

In [9]:
flatMap(lambda x: [x - 1, x], range(10))

[-1, 0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]

In [10]:
def filter(p, xs):
    return flatMap(lambda x: [x] if p(x) else [], xs)

def map(f, xs):
    return flatMap(lambda x: [f(x)], xs)
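A quick sanity check of these definitions, written as a self-contained sketch. The names `flat_map`, `map_via_flat_map` and `filter_via_flat_map` are hypothetical and chosen so that they do not shadow the built-ins; the results can be compared against plain list comprehensions.

```python
def flat_map(f, xs):
    return [y for x in xs for y in f(x)]

def map_via_flat_map(f, xs):
    # every element yields exactly one result
    return flat_map(lambda x: [f(x)], xs)

def filter_via_flat_map(p, xs):
    # every element yields either itself or nothing
    return flat_map(lambda x: [x] if p(x) else [], xs)

xs = list(range(10))
squares = map_via_flat_map(lambda x: x * x, xs)
evens = filter_via_flat_map(lambda x: x % 2 == 0, xs)
```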

These operators are enough to easily construct most everyday programs.

## Spark

### RDD

Spark operates on Resilient Distributed Datasets (RDDs): fault-tolerant collections of data partitioned across the machines of a cluster.
An RDD can hold a tremendous amount of data; its size is limited mainly by the total memory of all machines in the cluster.
Every Spark program is a transformation of RDDs.

#### Creating an RDD

There are a number of ways to create an RDD.
We will use only the two simplest methods:
- `textFile` - loads the lines of a text file as an RDD; the resulting type is `RDD[String]`;
- `parallelize` - makes an RDD from a plain collection (e.g. a Python list).

In practice, when working with large amounts of data, it is rarely possible to make an `RDD` from a collection gathered on a single machine. Usually data is loaded from distributed filesystems (such as the Hadoop Distributed File System, HDFS) or from databases (especially distributed ones, like Cassandra).

Such databases usually provide adapters for Spark.

In [11]:
### all functions for creating an RDD are provided by SparkContext (usually denoted by `sc`).

lines = sc.textFile(os.path.join(SPARK_HOME, 'README.md'))
numbers = sc.parallelize(xrange(int(1.0e7)))

small_numbers = sc.parallelize(xrange(int(1.0e6)))

In [12]:
lines

/opt/spark-2.0.2-bin-hadoop2.7/README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [13]:
numbers.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [14]:
lines.take(10)

[u'# Apache Spark',
 u'',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'supports general computation graphs for data analysis. It also supports a',
 u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 u'MLlib for machine learning, GraphX for graph processing,',
 u'and Spark Streaming for stream processing.',
 u'',
 u'<http://spark.apache.org/>']

### Transformations and actions

There are two types of operations on RDDs:
- transformation: an operation that transforms an RDD into another RDD;
- action: an operation that transforms an RDD into some plain value.

Note that nothing (`void` in `C/C++` and `Java`, `None` in `Python` and `Unit` or `()` in Scala) is also a plain value.

The fundamental difference between the two is that only an action triggers actual computation.
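Python generators give a rough single-machine analogy for this laziness. The sketch below is only an analogy, not how Spark is implemented: building the generator pipeline does no work, and the elements are processed only when a consuming call (here the built-in `sum`) asks for them. The `calls` counter and `plus_one` are hypothetical names used for illustration.

```python
calls = {"n": 0}

def plus_one(x):
    calls["n"] += 1          # count how many times the function really runs
    return x + 1

# "Transformation": describes the computation, nothing is evaluated yet.
pipeline = (plus_one(x) for x in range(1000))
calls_before = calls["n"]    # plus_one has not been called so far

# "Action": consuming the pipeline triggers the actual computation.
result = sum(pipeline)
calls_after = calls["n"]     # plus_one has now run once per element
```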

In [15]:
%%time

plusX = []

for x in xrange(100):
    ### map is a transformation
    ### so no actual computations are performed here
    plusX.append(
        numbers.map(lambda x: x + 1)
    )

CPU times: user 0 ns, sys: 3.77 ms, total: 3.77 ms
Wall time: 1.51 ms


In [16]:
%%time

### sum is an action
### invoking sum triggers materialization of map
numbers.map(lambda x: x + 1).sum()

CPU times: user 10.6 ms, sys: 480 µs, total: 11.1 ms
Wall time: 1.12 s


50000005000000

In [17]:
import numpy as np

In [18]:
%%time

### compare with NumPy: note the overhead introduced by network communication and Python-Java conversions.
np.arange(int(1.0e7)).sum()

CPU times: user 50 ms, sys: 16.2 ms, total: 66.3 ms
Wall time: 66.2 ms


49999995000000

#### Actions

The most common actions are:
- saving a dataset to a file (e.g. `saveAsTextFile`);
- reduce and its derivatives (e.g. `sum`, `count`);
- retrieving elements to the driver (`take` for the first few, `collect` for the whole dataset).

In [19]:
small_numbers.sum()

499999500000

In [20]:
small_numbers.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

#### Transformations

Transformations are RDD to RDD operations.
The most common ones are:
- map;
- filter;
- flatMap.

In [21]:
small_numbers.map(lambda x: x + 1).sum()

500000500000

In [22]:
small_numbers.filter(lambda x: x % 2 == 0).sum()

249999500000

In [23]:
words = lines.flatMap(lambda x: x.split())
words.take(10)

[u'#',
 u'Apache',
 u'Spark',
 u'Spark',
 u'is',
 u'a',
 u'fast',
 u'and',
 u'general',
 u'cluster']

### Pair RDD transformations

There is a special type of RDD, `PairRDD`, which is essentially a collection of key-value pairs.

In [24]:
counts = words.map(lambda x: (x, 1))

counts.take(10)

[(u'#', 1),
 (u'Apache', 1),
 (u'Spark', 1),
 (u'Spark', 1),
 (u'is', 1),
 (u'a', 1),
 (u'fast', 1),
 (u'and', 1),
 (u'general', 1),
 (u'cluster', 1)]

The most common operations on a `PairRDD` are `groupByKey` and `reduceByKey`.

In [25]:
counts.groupByKey().map(lambda (k, v): (k, list(v))).take(5)

[(u'when', [1]),
 (u'R,', [1]),
 (u'including', [1, 1, 1]),
 (u'computation', [1]),
 (u'using:', [1])]

In [26]:
freqs = counts.reduceByKey(lambda a, b: a + b)

freqs.take(5)

[(u'when', 1),
 (u'R,', 1),
 (u'including', 3),
 (u'computation', 1),
 (u'using:', 1)]
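On a single machine the semantics of these two operations can be sketched with plain dictionaries. This is an illustration only: `group_by_key` and `reduce_by_key` are hypothetical helpers, while Spark applies the same logic across partitions (and `reduceByKey` already combines values within each partition before the shuffle).

```python
from collections import OrderedDict

def group_by_key(pairs):
    """Collect all values that share a key: [(k, v)] -> [(k, [v, ...])]."""
    groups = OrderedDict()
    for k, v in pairs:
        groups.setdefault(k, []).append(v)
    return list(groups.items())

def reduce_by_key(f, pairs):
    """Combine the values that share a key with f: [(k, v)] -> [(k, v)]."""
    acc = OrderedDict()
    for k, v in pairs:
        acc[k] = f(acc[k], v) if k in acc else v
    return list(acc.items())

# Toy data in the same shape as `counts`.
pairs = [("Spark", 1), ("is", 1), ("Spark", 1), ("fast", 1), ("Spark", 1)]
grouped = group_by_key(pairs)
reduced = reduce_by_key(lambda a, b: a + b, pairs)
```

Note that `reduce_by_key` never materializes the list of values for a key, which is the reason `reduceByKey` is usually preferred over `groupByKey` followed by a reduction.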

Spark allows combining two RDDs. Since the data are distributed, keys are the natural (and almost the only valid) way to tell which value from one dataset corresponds to which values from another.

In [27]:
n_letters = words.map(lambda w: (w, len(w))).distinct()
n_letters.take(10)

[(u'Interactive', 11),
 (u'easiest', 7),
 (u'stream', 6),
 (u'runs.', 5),
 (u'Example', 7),
 (u'Hadoop-supported', 16),
 (u'Python,', 7),
 (u'documentation,', 14),
 (u'web', 3),
 (u'fast', 4)]

In [28]:
n_letters.join(freqs).take(5)

[(u'detailed', (8, 2)),
 (u'engine', (6, 1)),
 (u'storage', (7, 1)),
 (u'project', (7, 1)),
 (u'usage', (5, 1))]

In [29]:
n_letters.leftOuterJoin(freqs).take(5)

[(u'detailed', (8, 2)),
 (u'engine', (6, 1)),
 (u'storage', (7, 1)),
 (u'project', (7, 1)),
 (u'usage', (5, 1))]

In [30]:
n_letters.rightOuterJoin(freqs).take(5)

[(u'detailed', (8, 2)),
 (u'engine', (6, 1)),
 (u'storage', (7, 1)),
 (u'project', (7, 1)),
 (u'usage', (5, 1))]

In [31]:
### the most efficient and general join-like transformation
### for each unique key it produces a pair of iterables with the values from each RDD.
n_letters.cogroup(counts).map(lambda (k, (v1, v2)): (k, list(v1), list(v2))).take(5)

[(u'detailed', [8], [1, 1]),
 (u'engine', [6], [1]),
 (u'storage', [7], [1]),
 (u'project', [7], [1]),
 (u'usage', [5], [1])]
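The three join outputs above coincide because `n_letters` and `freqs` are both derived from `words` and therefore share the same keys. The variants differ only for keys missing on one side, where PySpark fills in `None`. The following single-machine sketch makes this visible; `cogroup_local` and `join_local` are hypothetical helpers and the data is a toy example.

```python
def cogroup_local(left, right):
    """For each key, gather the values from both sides: {k: ([v...], [w...])}."""
    groups = {}
    for k, v in left:
        groups.setdefault(k, ([], []))[0].append(v)
    for k, w in right:
        groups.setdefault(k, ([], []))[1].append(w)
    return groups

def join_local(left, right, how="inner"):
    """Inner / left outer / right outer join built on top of cogroup."""
    result = []
    for k, (vs, ws) in sorted(cogroup_local(left, right).items()):
        if how == "left" and not ws:
            ws = [None]          # keep left keys without a match
        if how == "right" and not vs:
            vs = [None]          # keep right keys without a match
        result.extend((k, (v, w)) for v in vs for w in ws)
    return result

toy_lengths = [("a", 1), ("fast", 4)]
toy_freqs = [("fast", 2), ("Spark", 15)]

inner = join_local(toy_lengths, toy_freqs)
left = join_local(toy_lengths, toy_freqs, how="left")
right = join_local(toy_lengths, toy_freqs, how="right")
```

Here `inner` keeps only `'fast'`, `left` additionally keeps `('a', (1, None))`, and `right` additionally keeps `('Spark', (None, 15))`.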

**Warning**: since operations on data with the same key are very common (e.g. `reduceByKey`), Spark tends to place all values with the same key on one machine. Thus, if a dataset has too many values with the same key, Spark may crash with an out-of-memory error.
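A toy illustration of why a skewed key is a problem, assuming keys are assigned to partitions by hash. The helper `partition_by_key` is hypothetical and far simpler than Spark's actual partitioner; it only shows that adding partitions cannot split the values of a single key.

```python
def partition_by_key(pairs, num_partitions):
    """Assign each pair to a partition chosen from the hash of its key."""
    partitions = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        partitions[hash(k) % num_partitions].append((k, v))
    return partitions

# Skewed toy data: one "hot" key with 1000 values, 10 keys with one value each.
pairs = [("hot", i) for i in range(1000)] + [("key%d" % i, i) for i in range(10)]

sizes = [len(p) for p in partition_by_key(pairs, num_partitions=4)]
largest = max(sizes)   # the partition holding "hot" has at least 1000 pairs
```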

## Example: word count

In [32]:
word_count = sc.textFile(
    os.path.join(SPARK_HOME, 'README.md')
).flatMap(
    lambda x: x.split()
).map(
    lambda x: (x, 1)
).reduceByKey(
    lambda a, b: a + b
).persist()

In [33]:
total_word_count = word_count.map(lambda (k, v): v).sum()

In [34]:
word_freq = word_count.map(lambda (w, n): (w, float(n) / total_word_count))

In [35]:
word_freq.collect()

[(u'when', 0.00211864406779661),
 (u'R,', 0.00211864406779661),
 (u'including', 0.006355932203389831),
 (u'computation', 0.00211864406779661),
 (u'using:', 0.00211864406779661),
 (u'guidance', 0.00423728813559322),
 (u'Scala,', 0.00211864406779661),
 (u'environment', 0.00211864406779661),
 (u'only', 0.00211864406779661),
 (u'rich', 0.00211864406779661),
 (u'Apache', 0.00211864406779661),
 (u'sc.parallelize(range(1000)).count()', 0.00211864406779661),
 (u'Building', 0.00211864406779661),
 (u'And', 0.00211864406779661),
 (u'guide,', 0.00211864406779661),
 (u'return', 0.00423728813559322),
 (u'Please', 0.006355932203389831),
 (u'[Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse)',
  0.00211864406779661),
 (u'Try', 0.00211864406779661),
 (u'not', 0.00211864406779661),
 (u'Spark', 0.03177966101694915),
 (u'scala>', 0.00211864406779661),
 (u'Note', 0.00211864406779661),
 (u'cluster.', 0.00211864406779661),
 (u'./bin/pyspark', 0.00

### MNIST exercise

In [36]:
import os
import os.path as osp

import numpy as np

%matplotlib inline
import matplotlib.pyplot as plt

import sklearn

In [37]:
%%sh

wget -q -nc https://raw.githubusercontent.com/amitgroup/amitgroup/master/amitgroup/io/mnist.py

In [38]:
### http://g.sweyla.com/blog/2012/mnist-numpy/
import mnist

In [39]:
%%sh

mkdir -p mnist && {
    cd mnist;
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz &&
    gunzip *.gz
}

gzip: t10k-images-idx3-ubyte already exists;	not overwritten
gzip: t10k-labels-idx1-ubyte already exists;	not overwritten
gzip: train-images-idx3-ubyte already exists;	not overwritten
gzip: train-labels-idx1-ubyte already exists;	not overwritten


In [40]:
X, y = mnist.load_mnist(dataset='training', path='mnist/')
X = X.reshape(-1, 1, 28, 28)

X_test, y_test = mnist.load_mnist(dataset='testing', path='mnist/')
X_test = X_test.reshape(-1, 1, 28, 28)

In [41]:
X_train_rdd = sc.parallelize(
    (i, y[i], X[i].ravel().copy())
    for i in xrange(X.shape[0])
).persist()

X_test_rdd = sc.parallelize(
    (i, y_test[i], X_test[i].ravel().copy())
    for i in xrange(X_test.shape[0])
).persist()

In [42]:
def softmax(x):
    x_max = np.max(x)
    
    exps = np.exp(x - x_max)
    
    return exps / np.sum(exps)
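Subtracting the maximum before exponentiating does not change the result, since the common factor cancels in the ratio, but it prevents overflow for large inputs. A small pure-Python sketch of the same idea (the names `naive_softmax` and `stable_softmax` are hypothetical; `math.exp` raises `OverflowError` where NumPy would return `inf`):

```python
import math

def naive_softmax(xs):
    exps = [math.exp(x) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

def stable_softmax(xs):
    x_max = max(xs)
    exps = [math.exp(x - x_max) for x in xs]   # largest term is exp(0) = 1
    total = sum(exps)
    return [e / total for e in exps]

small = [1.0, 2.0, 3.0]
same = all(abs(a - b) < 1e-12
           for a, b in zip(naive_softmax(small), stable_softmax(small)))

large = [1000.0, 1001.0, 1002.0]
try:
    naive_softmax(large)
    naive_overflowed = False
except OverflowError:
    naive_overflowed = True

probs = stable_softmax(large)   # works: only the differences to the maximum matter
```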

In [43]:
class LogisticRegression(object):
    def __init__(self, C = 1.0e-3):
        self.W = np.random.uniform(-1, 1, size=(28 * 28, 10))
        self.b = np.random.uniform(-1, 1, size=(10, ))
        
        self.C = C
    
    def predict(self, X):
        ### note that this is not an efficient implementation
        ### of X.dot(W)
        return X.map(
            lambda (i, y, x): softmax(x.dot(self.W) + self.b)
        )

In [44]:
lr = LogisticRegression()

In [45]:
lr.predict(X_test_rdd).collect()[:5]

[array([  2.02476371e-03,   9.60981585e-04,   8.13040289e-03,
          3.17359036e-03,   1.49190365e-02,   4.81511129e-03,
          1.08157937e-02,   9.42059854e-01,   1.30980772e-02,
          2.38827921e-06]),
 array([  7.08920531e-01,   2.44384021e-06,   7.97656774e-05,
          3.13705662e-08,   2.60347080e-08,   1.84057095e-11,
          1.29229970e-01,   7.44069585e-06,   1.61757765e-01,
          2.02621951e-06]),
 array([  9.66105450e-03,   8.79654741e-01,   1.41754681e-03,
          6.57827614e-06,   7.38867021e-02,   1.85144117e-04,
          1.70333515e-02,   3.25249027e-03,   1.15099660e-02,
          3.39242565e-03]),
 array([  3.43460159e-02,   2.63254609e-03,   3.30914975e-01,
          9.00755856e-07,   4.17104012e-03,   7.06484947e-02,
          2.51425230e-01,   7.04200604e-02,   2.35392327e-01,
          4.84103385e-05]),
 array([  2.80198154e-06,   2.71215256e-06,   6.77019650e-04,
          6.12829718e-05,   2.52962946e-06,   2.71294854e-04,
          4.93400426

In [46]:
X_test_rdd.count()

10000

In [47]:
X_train_rdd.count()

60000