## Perspective (recap)
- How can we parallelize this task?
- How can we make this component more modular and reusable?
- Can we profile this application and apply optimizations to improve training, classification, or response time?

## Hive Tips and Best Practices



### Query Analysis

- Hive provides the EXPLAIN and ANALYZE statements as utilities for checking query performance: EXPLAIN shows the execution plan of a query, and ANALYZE collects statistics.
- Hive statistics are metadata that describe objects in the Hive database in more detail, such as the number of rows, the number of files, and the raw data size. Hive supports statistics at the table, partition, and column level. These statistics serve as input to the Hive Cost-Based Optimizer (CBO), which picks the query plan with the lowest cost.




### Partitions
Hive partitioning is one of the most effective methods to improve query performance on large tables. A query with partition filtering loads only the data in the specified partitions (subdirectories), so it can execute much faster than a normal query that filters by a non-partitioning field.

**Partitions by date and time:**
Use date and time, such as year, month, and day (even hours), as partition keys when data is associated with the time dimension.

**Partitions by location:**
Use country, territory, state, and city as partition keys when data is location related.

**Partitions by business logic:**
Use department, sales region, applications, customers, and so on as partition keys when data can be separated evenly by some business logic.
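
To illustrate why partition filtering is fast, here is a minimal Python sketch (not Hive code) that mimics how a partitioned table is laid out as `key=value` subdirectories and how a filter on the partition keys narrows the directories that need to be read. The table root `/warehouse/logs` and the helper names `partition_paths` and `prune` are hypothetical, chosen only for illustration.

```python
from itertools import product


def partition_paths(table_root, years, months):
    """List partition directories in the key=value naming style."""
    return [
        f"{table_root}/year={y}/month={m:02d}"
        for y, m in product(years, months)
    ]


def prune(paths, **filters):
    """Keep only the directories whose partition values match every filter."""
    wanted = [f"{key}={value}" for key, value in filters.items()]
    return [p for p in paths if all(w in p.split("/") for w in wanted)]


# Hypothetical table partitioned by year and month: 2 years x 12 months.
all_parts = partition_paths("/warehouse/logs", [2022, 2023], range(1, 13))
selected = prune(all_parts, year=2023, month="03")

print(len(all_parts), "partitions in total")
print(selected)
```

In this sketch only one of the 24 directories survives the filter, which is the effect a `WHERE` clause on the partition keys has; a filter on a non-partitioning field would have to scan all of them.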



### Parallel execution

By default, Hadoop launches a new JVM for each map or reduce task and runs the tasks in parallel. When a task is lightweight and runs for only a few seconds, the JVM startup process can be a significant overhead.

JVM reuse avoids this overhead by sharing one JVM among mappers/reducers, which then run serially instead of in parallel. JVM reuse applies to map or reduce tasks in the same job. Tasks from different jobs always run in separate JVMs. To enable reuse, set the maximum number of tasks of a single job that may share a JVM using the mapred.job.reuse.jvm.num.tasks property. Its default value is 1:

jdbc:hive2://> SET mapred.job.reuse.jvm.num.tasks=5;

We can also set the value to -1 to indicate that all the tasks for a job will run in the same JVM.


Hive queries are commonly translated into a number of stages that are executed sequentially by default. These stages are not always dependent on each other, so independent stages can run in parallel to reduce the overall job running time. We can enable this feature with the following settings:

jdbc:hive2://> SET hive.exec.parallel=true; -- default false
jdbc:hive2://> SET hive.exec.parallel.thread.number=16; -- default 8; defines the maximum number of stages running in parallel

Parallel execution will increase the cluster utilization. If the utilization of a cluster is already very high, parallel execution will not help much in terms of overall performance.
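
The idea of running independent stages side by side can be sketched in plain Python. This is only an analogy, not how Hive schedules stages internally: the three stage functions are hypothetical, and `max_parallel` plays a role similar to `hive.exec.parallel.thread.number`.

```python
from concurrent.futures import ThreadPoolExecutor


# Two hypothetical stages that do not depend on each other.
def stage_count_orders():
    return sum(1 for _ in range(1000))


def stage_count_customers():
    return sum(1 for _ in range(250))


# A third stage that needs both results, so it must run afterwards.
def stage_join(orders, customers):
    return {"orders": orders, "customers": customers}


def run_plan(max_parallel):
    """Run the independent stages concurrently, then the dependent stage."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        orders = pool.submit(stage_count_orders)
        customers = pool.submit(stage_count_customers)
        return stage_join(orders.result(), customers.result())


print(run_plan(max_parallel=2))
```

With `max_parallel=1` the two independent stages run one after the other and the result is the same; only the elapsed time differs.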




### User Defined Functions

Hive defines the following three types of UDF:

**UDFs:** 
These are regular user-defined functions that operate row-wise and output one result for one row, such as most built-in mathematical and string functions.

**UDAFs:** 
These are user-defined aggregating functions that operate row-wise or group-wise and output one row or one row for each group as a result, such as the MAX and COUNT built-in functions.

**UDTFs:**
These are user-defined table-generating functions that also operate row-wise, but they can produce multiple rows (a table) as a result, such as the EXPLODE function. A UDTF can be used either after SELECT or after the LATERAL VIEW statement.
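
The difference between the three types can be sketched in plain Python. These are not Hive UDFs; the sample rows and the function names `upper_key`, `count_per_key`, and `explode_values` are made up to show the input/output shape of each type.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical input rows: (key, comma-separated values).
rows = [("a", "x,y"), ("a", "z"), ("b", "w")]


# UDF-style: one output value per input row.
def upper_key(row):
    return row[0].upper()


# UDAF-style: one output row per group.
def count_per_key(rows):
    ordered = sorted(rows, key=itemgetter(0))
    return [
        (key, sum(1 for _ in group))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]


# UDTF-style: one input row can produce several output rows.
def explode_values(row):
    key, values = row
    return [(key, v) for v in values.split(",")]


print([upper_key(r) for r in rows])
print(count_per_key(rows))
print([out for r in rows for out in explode_values(r)])
```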


### See assignments at the bottom

## Spark Tips and Best Practices

#### General Tips
- Don’t collect large RDDs
- Don't use count() when you don't need to return the exact number of rows
- Use coalesce to decrease the number of partitions (unlike repartition, it avoids a full shuffle)

#### When to use Broadcast variable
- Joining a large and a small RDD
- Joining a large and a medium size RDD
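
The reason a broadcast helps in these joins can be sketched without Spark. In this plain-Python sketch the small side is a dictionary that every partition of the large side can read locally, so no rows of the large side need to be moved. In PySpark the dictionary would typically be wrapped with `SparkContext.broadcast` and read through its `.value` attribute; the data and the name `join_partition` are hypothetical.

```python
# Small lookup table: in Spark this is the side you would broadcast.
country_names = {"DE": "Germany", "FR": "France"}

# Large side, split into partitions as it would be across executors.
order_partitions = [
    [("o1", "DE"), ("o2", "FR")],
    [("o3", "DE"), ("o4", "US")],
]


def join_partition(partition, lookup):
    """Join each row against the local copy of the lookup table."""
    return [
        (order_id, code, lookup[code])
        for order_id, code in partition
        if code in lookup  # inner-join semantics: drop unmatched keys
    ]


joined = [
    row
    for part in order_partitions
    for row in join_partition(part, country_names)
]
print(joined)
```

Each partition is processed independently, which is what lets a broadcast join skip the shuffle that a regular join of two RDDs would require.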


#### Which storage level to choose
- Avoid shuffles: "fewer stages, run faster"
- Use the right level of parallelism


#### Serialization

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or that consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. The default Java serializer performs poorly in terms of both runtime and the size of its output. Therefore the Spark team recommends using the Kryo serializer instead.
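
Switching to Kryo is a configuration change. The sketch below keeps the setting as a plain Python dictionary and renders it as `spark-submit --conf` flags; the property `spark.serializer` and the class name are Spark's own, while the helper `to_submit_args` is a hypothetical convenience, not part of Spark.

```python
# Spark property that selects the Kryo serializer.
kryo_conf = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}


def to_submit_args(conf):
    """Render properties as a list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(["spark-submit"] + to_submit_args(kryo_conf)))
```

The same key/value pair can also be set on the Spark configuration object in application code before the context is created.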

## Assignments (Hive)

### Creating and uploading a virtual environment -- Hive

In [None]:
import sys
import logging
from itertools import groupby
from operator import itemgetter
import numpy as np
import pandas as pd

SEP = '\t'
NULL = '\\N'

_logger = logging.getLogger(__name__)


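def read_input(input_data):
    """Yield each input line as a list of tab-separated fields."""
    for line in input_data:
        # Strip only the line ending: a plain strip() would also remove
        # leading/trailing tabs and lose empty fields at the row edges.
        yield line.rstrip('\r\n').split(SEP)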


def main():
    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
    data = read_input(sys.stdin)
    ## TODO: Your function invocation goes here
    ## Print the output; don't return it


if __name__ == '__main__':
    main()
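
As one possible way to fill in the TODO, the sketch below counts the non-NULL values of the second column per key and prints tab-separated lines. The function names `count_non_null` and `emit` are hypothetical, and the code assumes that rows with the same key arrive consecutively (for example because the query clusters them by that column), since `groupby` only merges adjacent rows.

```python
import sys
from itertools import groupby
from operator import itemgetter

SEP = '\t'
NULL = '\\N'


def count_non_null(rows):
    """Yield (key, number of non-NULL values in column 2) per consecutive key."""
    for key, group in groupby(rows, key=itemgetter(0)):
        yield key, sum(1 for row in group if row[1] != NULL)


def emit(results, out=sys.stdout):
    """Print one tab-separated line per result so Hive can read it back."""
    for fields in results:
        print(SEP.join(str(f) for f in fields), file=out)


# Stand-in for sys.stdin: three rows, already grouped by the first column.
sample = ["a\t1\n", "a\t\\N\n", "b\t2\n"]
parsed = (line.rstrip('\n').split(SEP) for line in sample)
emit(count_non_null(parsed))
```

In the real script, `parsed` would come from `read_input(sys.stdin)` and the call to `emit` would sit inside `main()`.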

#### Create a bash script to load the virtual environment and the python script

#### Move scripts and UDFs to HDFS

#### Execution

### Spark assignments

In [None]:
### Assignment 1
Take any data set, shuffle it, and split it into two parts (90/10 split for training/testing).

### Solution
Use the randomSplit function (PySpark):

weights = [.9, .1]
seed = 42
# Use randomSplit with weights and seed
rawTrainData, rawTestData = data.randomSplit(weights, seed)

or, in Scala:

val Array(training, test) = data.randomSplit(Array(0.9, 0.1))
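
Note that a weighted random split assigns each row independently, so the resulting sizes are only approximately 90/10. The plain-Python sketch below illustrates that per-row sampling idea; it is not Spark's implementation, and the function name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative boundaries, e.g. roughly [0.9, 1.0] for weights [0.9, 0.1].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if x < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.9, 0.1], seed=42)
print(len(train), len(test))  # close to 900 and 100, but not guaranteed exact
```

Fixing the seed makes the split reproducible, which is why the solution above passes one explicitly.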
