# Scalable Data Science Fundamentals

#### Storage Options

**SQL** - well-established open standard, fast index access, high data normalization, costly, difficult to scale, schema changes require DDL

**noSQL** - dynamic schema, linearly scalable, low storage cost, no data normalization/integrity constraints, less established, slower than SQL

**ObjectStorage** - schema-less, linearly scalable, cheap

In general, SQL is suitable for small amounts of data that require a stable schema. For larger amounts of data with high ingestion rates or frequent schema changes, noSQL or ObjectStorage are more appropriate.
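To make the schema trade-off concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for a SQL database and plain JSON documents as a stand-in for a schema-less store. Both are illustrative substitutes, not the systems discussed above, and the `sensor` table and its fields are hypothetical. Storing a new attribute in the SQL table first requires an `ALTER TABLE` statement (DDL), while a new document can simply carry the extra field.

```python
import json
import sqlite3

# SQL: the schema is declared up front
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor (id INTEGER PRIMARY KEY, voltage REAL)")
conn.execute("INSERT INTO sensor (voltage) VALUES (230.1)")

# Storing a new attribute requires a DDL schema change first
conn.execute("ALTER TABLE sensor ADD COLUMN temperature REAL")
conn.execute("INSERT INTO sensor (voltage, temperature) VALUES (229.8, 21.5)")
sql_rows = conn.execute("SELECT voltage, temperature FROM sensor ORDER BY id").fetchall()
conn.close()

# Schema-less documents: each record carries whatever fields it has
documents = [
    json.dumps({"voltage": 230.1}),
    json.dumps({"voltage": 229.8, "temperature": 21.5}),
]
parsed = [json.loads(d) for d in documents]
```

The older SQL row gets `NULL` (`None`) for the new column, whereas the first document simply has no `temperature` key.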

#### ApacheSpark

ApacheSpark handles the parallelization of distributed data and processing across many compute (“worker”) nodes. While the underlying execution engine in ApacheSpark is implemented in Scala on top of a Java Virtual Machine (JVM), it provides APIs for multiple programming languages, including Python, R, Java and Scala. The various languages come with their own advantages and disadvantages: Python and R fall on the easier-to-learn side of the spectrum, at the cost of performance, partly because functions written in them (such as lambdas passed to RDD operations) run outside the JVM, so data has to be serialized between the two.

Multiple JVM instances can work in tandem on a single worker node, with the general rule of one JVM per CPU core. For example, a cluster with 100 nodes, 4 CPUs per node, 16 cores per CPU and 4 hyperthreads per core could run 100 × 4 × 16 × 4 = 25,600 threads in parallel.

Storage can either be attached via a fast network connection (the off-node storage approach) or as hard drives connected directly to the worker nodes (the Just a Bunch Of Disks, or JBOD, approach). The second approach requires an additional software component, the Hadoop Distributed File System (HDFS), to combine the disparate disks into one virtual file system.

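Central to Spark's abstraction is the **Resilient Distributed Dataset (RDD)**, a distributed, immutable collection that resides in the main memory of the worker nodes. RDDs are lazy: transformations such as `map` only record what should be computed, and no data is read from the underlying storage system until an action such as `count()` or `collect()` needs the result. RDDs are fault tolerant because Spark remembers the lineage of transformations that produced each partition and can recompute lost partitions from it, and they can spill over to disk if there is not enough RAM to hold the data.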
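Laziness can be illustrated without a cluster, because Python 3's built-in `map` is lazy in a similar way: it returns an iterator and calls the function only when the values are consumed. This is an analogy in plain Python, not Spark code; `add_one` and the `calls` list are illustrative names.

```python
calls = []

def add_one(x):
    # Record every call so we can observe when work actually happens
    calls.append(x)
    return x + 1

# Like an RDD transformation: map() only builds a recipe, nothing is computed yet
lazy = map(add_one, range(5))
calls_before = len(calls)

# Like an action such as collect(): consuming the iterator forces the computation
result = list(lazy)
calls_after = len(calls)
```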

Distribute data across Spark nodes: `rdd = sc.parallelize(range(100))`

Trigger the execution of a Spark job which is divided into individual tasks that are executed in parallel across the cluster: `rdd.count()`

View the first ten elements of the RDD: `rdd.take(10)`

Copy the entire contents of the RDD to the local ApacheSpark driver: `rdd.collect()` (use this with care on large datasets: if the data exceed the driver's memory, the driver JVM can crash)
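Spark runs one task per partition. The way `count()` is split into tasks can be sketched locally in plain Python: the data are cut into partitions, each task counts its own partition, and the driver sums the partial results. Threads from `concurrent.futures` stand in for worker nodes, and the `split_into_partitions` helper and the choice of 4 partitions are illustrative assumptions, not Spark internals.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    # Hypothetical helper: cut the data into contiguous, roughly equal slices
    data = list(data)
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def count_partition(partition):
    # One "task": count the elements of a single partition
    return sum(1 for _ in partition)

partitions = split_into_partitions(range(100), 4)

# Each worker thread runs one task; map() returns results in partition order
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_counts = list(pool.map(count_partition, partitions))

# The "driver" combines the per-task results
total = sum(partial_counts)
```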

**Creating a DataFrame from External Data in Cloud Object Storage**

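```python
import ibmos2spark
from pyspark.sql import SparkSession

# Reuse the session (and its SparkContext) provided by the notebook, or create one
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# credentials, configuration_name, file and bucket are placeholders that must be
# supplied for your IBM Cloud Object Storage instance
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

# Read a Parquet object into a DataFrame and register it for SQL queries
df = spark.read.parquet(cos.url(file, bucket))
df.createOrReplaceTempView('table')
df.show()
```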

#### Functional Programming (FP)

The central concept of FP is lambda calculus, which allows computations to be expressed as (possibly anonymous) functions. Scala is a comparatively recent FP language, joining older ones such as Haskell, while also supporting procedural and object-oriented programming (OOP). ApacheSpark builds on these ideas: computations are expressed as functions, often anonymous lambdas, that Spark applies in parallel to the partitions of an RDD.

**Add 1 to each element of a list:**

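```python
# Create an RDD and add 1 to every element with an anonymous function
rdd = sc.parallelize(range(100))
rdd.map(lambda x: x + 1).take(10)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```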

**Sum elements of a list:**

```python
# Sum the numbers 1..100 by repeatedly combining pairs of elements
sc.parallelize(range(1, 101)).reduce(lambda a, b: a + b)  # 5050
```
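Spark's documentation notes that the function passed to `reduce` should be commutative and associative, because each partition is reduced separately before the partial results are combined. The plain-Python sketch below mimics that two-stage pattern with `functools.reduce` on hypothetical partitions (the partition boundaries are an assumption) and shows that addition gives the same answer however the data are split, while subtraction does not.

```python
import operator
from functools import reduce

def partitioned_reduce(func, partitions):
    # Reduce each partition locally, then reduce the partial results,
    # mimicking the two-stage pattern of a distributed reduce
    partials = [reduce(func, p) for p in partitions]
    return reduce(func, partials)

data = list(range(1, 101))
two_parts = [data[:50], data[50:]]
four_parts = [data[i:i + 25] for i in range(0, 100, 25)]

# Addition is associative and commutative: partitioning does not matter
sum_two = partitioned_reduce(operator.add, two_parts)
sum_four = partitioned_reduce(operator.add, four_parts)

# Subtraction is not: the answer depends on how the data were split
diff_two = partitioned_reduce(operator.sub, two_parts)
diff_four = partitioned_reduce(operator.sub, four_parts)
```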

#### ApacheSparkSQL

ApacheSparkSQL wraps the RDD in a DataFrame object, abstracting the RDD API into a more familiar relational interface. A SQL query or chain of DataFrame operations is parsed into an abstract syntax tree, which the Catalyst optimizer turns into an optimized logical query plan and then a physical execution plan. The result is typically faster than equivalent hand-written RDD code, and more intuitive to write.

### Statistical Moments

Statistical moments measure characteristics of distributions:

**Average**

The average is a measure of central tendency of a distribution. 
* Mean: The average of all values

$\bar{x} = \frac{1}{n}\sum_{i=1}^{n}x_i$

* Median: The midpoint of a sorted distribution; more resilient to outliers

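```python
import random

# Two RDDs holding random permutations of the numbers 0..99
rddX = sc.parallelize(random.sample(range(100), 100))
rddY = sc.parallelize(random.sample(range(100), 100))

# float() avoids integer division under Python 2
meanX = rddX.sum() / float(rddX.count())
meanY = rddY.sum() / float(rddY.count())
```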
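The sketch below uses Python's `statistics` module on a small made-up list (illustrative data, not from the course) to show why the median is more resilient to outliers: a single extreme value shifts the mean far more than the median.

```python
import statistics

values = [10, 11, 12, 13, 14]
with_outlier = values + [1000]  # one extreme reading added

mean_clean = statistics.mean(values)
median_clean = statistics.median(values)

# The mean jumps to about 176.7, while the median only moves from 12 to 12.5
mean_outlier = statistics.mean(with_outlier)
median_outlier = statistics.median(with_outlier)
```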

**Standard Deviation**

The standard deviation measures the spread of data around the mean. Data that is condensed around the mean will have a lower standard deviation.

$\sigma = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(x_i - \bar{x})^2}$

```python
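from math import sqrt
# Population standard deviation (divides by n), as in the formula above
stdX = sqrt(rddX.map(lambda x: pow(x - meanX, 2)).sum() / rddX.count())
stdY = rddY.stdev()  # RDD.stdev() also divides by n; stdY is needed for correlation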
```
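A quick local check of the formula, in plain Python with illustrative values: both lists have a mean of 5, but the one whose values sit closer to the mean has the smaller standard deviation. The `population_std` helper is hypothetical; its result is compared with `statistics.pstdev`, which also divides by n.

```python
import math
import statistics

def population_std(xs):
    # Square root of the mean squared deviation from the mean (divides by n)
    mean = sum(xs) / len(xs)
    return math.sqrt(sum((x - mean) ** 2 for x in xs) / len(xs))

tight = [4, 5, 6]
spread = [0, 5, 10]

std_tight = population_std(tight)    # sqrt(2/3), about 0.82
std_spread = population_std(spread)  # sqrt(50/3), about 4.08
```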

**Skewness**

Skewness measures the asymmetry of a distribution, i.e. whether its longer tail lies on the right (positive skew) or on the left (negative skew). However, it does not capture the shape of the tail (whether thick and short or thin and long).

$skewness = \frac{1}{n}\frac{\sum_{i=1}^{n}(x_i - \bar{x})^3}{\sigma^3}$

```python
# Divide by count() at the end; float() avoids integer division (1 / n == 0) in Python 2
skewness = rddX.map(lambda x: pow(x - meanX, 3) / pow(stdX, 3)).sum() / float(rddX.count())
```
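A plain-Python version of the formula above (the `skewness` helper and the sample lists are illustrative) confirms the sign convention: a long right tail gives a positive value, its mirror image a negative one, and symmetric data zero.

```python
import math

def skewness(xs):
    # Third standardized moment, matching the formula above (divides by n)
    n = len(xs)
    mean = sum(xs) / n
    std = math.sqrt(sum((x - mean) ** 2 for x in xs) / n)
    return sum((x - mean) ** 3 for x in xs) / (n * std ** 3)

right_tailed = [1, 2, 2, 3, 3, 3, 10]    # long tail towards large values
left_tailed = [-x for x in right_tailed]  # mirror image: long tail towards small values
symmetric = [1, 2, 3, 4, 5]

print(skewness(right_tailed), skewness(left_tailed), skewness(symmetric))
```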

**Kurtosis**

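Kurtosis measures the outlier content of a distribution, i.e. the weight of its tails. The higher the kurtosis, the more outliers are present in the data and the heavier the tails. With the formula below, a normal distribution has a kurtosis of 3 (subtracting 3 gives the so-called excess kurtosis).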

$kurtosis = \frac{1}{n}\frac{\sum_{i=1}^{n}(x_i - \bar{x})^4}{\sigma^4}$

```python
# Divide by count() at the end; float() avoids integer division (1 / n == 0) in Python 2
kurtosis = rddX.map(lambda x: pow(x - meanX, 4) / pow(stdX, 4)).sum() / float(rddX.count())
```
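The formula can be checked in plain Python on two illustrative lists that share a mean of 5: the list with two extreme values has a higher kurtosis than the evenly spread one. The `kurtosis` function here is a local helper, not a library call.

```python
def kurtosis(xs):
    # Fourth standardized moment (divides by n); this is not excess kurtosis
    n = len(xs)
    mean = sum(xs) / n
    var = sum((x - mean) ** 2 for x in xs) / n
    return sum((x - mean) ** 4 for x in xs) / (n * var ** 2)

evenly_spread = [1, 2, 3, 4, 5, 6, 7, 8, 9]     # no outliers: kurtosis 1.77
with_outliers = [5, 5, 5, 5, 5, 5, 5, -20, 30]  # two extreme values: kurtosis 4.5
```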

**Covariance**

Covariance measures how two columns of data vary together: it is positive when they tend to increase together and negative when one tends to decrease as the other increases.

$cov(x,y) = \frac{1}{n}\sum_{i=1}^{n}(x_i - \bar{x})(y_i - \bar{y})$


```python
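# zip pairs up the i-th elements of both RDDs; it assumes both have the same
# number of partitions and the same number of elements in each partition
rddXY = rddX.zip(rddY)

covXY = rddXY.map(lambda x_y: (x_y[0] - meanX) * (x_y[1] - meanY)).sum() / float(rddXY.count())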
```
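A plain-Python version of the same computation, using the built-in `zip` in place of `rddX.zip(rddY)` and made-up columns, shows the sign convention. The `covariance` helper is illustrative.

```python
def covariance(xs, ys):
    # Population covariance: mean of the products of paired deviations (divides by n)
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n

x = [1, 2, 3, 4, 5]
y_up = [2, 4, 6, 8, 10]    # rises with x: covariance 4.0
y_down = [10, 8, 6, 4, 2]  # falls as x rises: covariance -4.0
```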

**Correlation**

Correlation (here, the Pearson correlation coefficient) is a normalized measure of linear dependence that ranges from -1 to +1. A perfect positive/negative linear relationship results in a correlation of +1/-1, while data with no linear relationship has a correlation of 0. When dealing with data in multiple columns, it is useful to arrange the pairwise correlations in a correlation matrix.

$corr(x,y) = \frac{cov(x,y)}{\sigma_x\sigma_y}$

```python
corrXY = covXY / (stdX * stdY)
```
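A plain-Python sketch with made-up columns builds a small correlation matrix as a dict of dicts (an illustrative structure, not a Spark API). The `square` column is fully determined by `x` but not linearly, and still gets a correlation of 0, which is why correlation should be read as a measure of linear dependence.

```python
import math

def pearson(xs, ys):
    # Covariance divided by the product of the standard deviations (all divide by n)
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / n
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs) / n)
    sy = math.sqrt(sum((y - my) ** 2 for y in ys) / n)
    return cov / (sx * sy)

x = [1, 2, 3, 4, 5]
columns = {
    "x": x,
    "double": [2 * v for v in x],        # perfect positive linear relationship
    "negated": [-v for v in x],          # perfect negative linear relationship
    "square": [(v - 3) ** 2 for v in x], # perfect but non-linear relationship
}

# Pairwise correlations arranged as a correlation matrix
matrix = {a: {b: pearson(columns[a], columns[b]) for b in columns} for a in columns}
```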

### Plotting

Sampling is critical when working with big data. It involves selecting a subset of the entire dataset to feed into plotting libraries, which expect small inputs in the form of vectors or matrices. Even if these libraries could handle massive quantities of data, we would run into memory and performance issues if we tried to plot 100% of it. A random sample usually preserves the properties of the original data well enough for plotting.
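The plotting examples below use `rdd.sample(False, 0.1)`. PySpark documents `fraction` as the expected size of the sample when sampling without replacement, and the plain-Python sketch here mimics that with Bernoulli sampling: each element is kept independently with probability 0.1, so the sample size is only approximately 10%. The data are simulated, not real measurements, and the seed is fixed only to make the illustration reproducible.

```python
import random
import statistics

rng = random.Random(42)  # fixed seed so the illustration is reproducible

# Illustrative dataset: 20,000 simulated voltage readings (made-up values)
population = [rng.gauss(230.0, 5.0) for _ in range(20_000)]

# Bernoulli sampling without replacement: keep each element independently
# with probability `fraction`
fraction = 0.1
sample = [x for x in population if rng.random() < fraction]

pop_mean, sample_mean = statistics.mean(population), statistics.mean(sample)
pop_std, sample_std = statistics.pstdev(population), statistics.pstdev(sample)
print(len(sample), round(pop_mean, 2), round(sample_mean, 2), round(pop_std, 2), round(sample_std, 2))
```

The sample's mean and standard deviation land close to those of the full dataset, which is what makes plotting a sample meaningful.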

Matplotlib can be used with Spark to visualize data:

* Box plots: distribution of data, including the median, quartiles, skew and outliers
* Histograms: distribution of data within a particular dimension
* Run charts: time series data visualization
* Scatter plots: correlation between variables

**Boxplot**

```python
# Jupyter magic: render plots inside the notebook
%matplotlib inline
import matplotlib.pyplot as plt

# Extract the desired column, skipping missing values
data = spark.sql("select voltage from table where voltage is not null")

# Sample 10% without replacement and collect the values into a Python list
voltage_array = data.rdd.sample(False, 0.1).map(lambda row: row.voltage).collect()

plt.boxplot(voltage_array)
plt.show()
```

**Run chart**

```python
import matplotlib.pyplot as plt

data = spark.sql("select voltage, time from table where voltage is not null order by time asc")

# Sample 10% without replacement, keep (voltage, time) pairs and collect them once
data_sampled = data.rdd.sample(False, 0.1).map(lambda row: (row.voltage, row.time)).collect()
voltage_array = [pair[0] for pair in data_sampled]
time_array = [pair[1] for pair in data_sampled]

plt.plot(time_array, voltage_array)
plt.xlabel("time")
plt.ylabel("voltage")
plt.show()
```

**3D Plotting**

```python
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the '3d' projection on older Matplotlib versions

result_df = spark.sql("""
select hardness, temperature, flowrate from washing
    where hardness is not null and
    temperature is not null and
    flowrate is not null
""")

# Sample 10% without replacement and collect the rows once
rows = result_df.rdd.sample(False, 0.1).map(lambda row: (row.hardness, row.temperature, row.flowrate)).collect()
result_array_hardness = [r[0] for r in rows]
result_array_temperature = [r[1] for r in rows]
result_array_flowrate = [r[2] for r in rows]

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

ax.scatter(result_array_hardness, result_array_temperature, result_array_flowrate, c='b', marker='o')

ax.set_xlabel('Hardness')
ax.set_ylabel('Temperature')
ax.set_zlabel('Flowrate')

plt.show()
```

**Histogram**

```python
plt.hist(result_array_hardness)
plt.show()
```

### Principal Component Analysis (PCA)

Dimensionality reduction transforms data in an n-dimensional Euclidean vector space into a new dataset in a lower dimension, while preserving the distances between points as well as possible. PCA does this by projecting the data onto artificial dimensions, called principal components (PCs), which are ordered by the amount of variation in the original dataset that each PC explains. **Using PCA to reduce the dimensionality to 3 enables us to use 3D plotting libraries**.

While powerful, PCA has its trade-offs. One drawback is that principal components are usually less interpretable than the original features they are derived from. In addition, the algorithm is lossy. However, the amount of information lost during dimensionality reduction can be calculated, as the share of the total variance carried by the discarded components, and managed by choosing how many components to keep.
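In PCA, the principal components are the eigenvectors of the covariance matrix and each eigenvalue is the variance of the data along its component, so the share of variance lost by keeping k components is the sum of the discarded eigenvalues divided by the sum of all eigenvalues. For two columns this can be done by hand with the closed-form eigenvalues of a symmetric 2×2 matrix, as in the plain-Python sketch below (illustrative data; the helper names are hypothetical). In Spark ML, a fitted `PCAModel` reports the corresponding proportions through its `explainedVariance` attribute.

```python
import math

def covariance_matrix_2d(xs, ys):
    # Population covariance matrix [[var_x, cov], [cov, var_y]] as (a, b, c)
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    var_x = sum((x - mx) ** 2 for x in xs) / n
    var_y = sum((y - my) ** 2 for y in ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / n
    return var_x, cov, var_y

def explained_variance_ratios(xs, ys):
    # Closed-form eigenvalues of a symmetric 2x2 matrix [[a, b], [b, c]]
    a, b, c = covariance_matrix_2d(xs, ys)
    centre = (a + c) / 2
    radius = math.sqrt(((a - c) / 2) ** 2 + b ** 2)
    largest, smallest = centre + radius, centre - radius
    total = largest + smallest
    return largest / total, smallest / total

xs = [1, 2, 3, 4, 5]
nearly_linear = [2.1, 3.9, 6.2, 7.8, 10.1]
pc1_share, pc2_share = explained_variance_ratios(xs, nearly_linear)

# Fraction of the variance discarded by projecting onto PC1 only
lost_if_keep_one = pc2_share
```

Because the two columns are almost collinear, the first component explains well over 99% of the variance, so dropping the second loses very little information.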

```python
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler

# `result` is assumed to be a DataFrame containing only numeric, non-null feature columns.
# VectorAssembler is a transformer that combines all input columns (result.columns)
# into a single vector column named "features"
assembler = VectorAssembler(inputCols=result.columns, outputCol="features")

# Transform the data
features = assembler.transform(result)

# Configure PCA to keep k=3 principal components, then fit it to the data
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features)

# Project the data onto the principal components
result_pca = model.transform(features).select("pcaFeatures")

# Proportion of the variance explained by each principal component
print(model.explainedVariance)
```