Universität Heidelberg
Distributed Systems I (IVS1)
Winter Semester 18/19

- Duc Anh Phi
- Michael Tabachnik
- Edgar Brotzmann

# Solutions to Problem Set 4 for lecture Distributed Systems I (IVS1)
## Due: 20.11.2018, 2pm


### Exercise 1

In [2]:
import gzip
import findspark
import re

findspark.init("/usr/local/spark")

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import unix_timestamp

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Solutions to Problem Set 4 IVS1") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()

sc = spark.sparkContext

In [32]:
def createDataframe(filename):
    with gzip.open(filename, 'rt') as f:
        # use header as row schema
        headerLine = f.readline()
        header = re.split(r'\t+', headerLine.rstrip('\t\n'))[1:]
        RowSchema = Row(*header)
        content = [row for row in f.read().split("\n") if row]
        df = sc.parallelize(content) \
            .map(lambda line: re.split(r'\t+', line.rstrip('\t\n'))[1:]) \
            .map(lambda line: RowSchema(*line)) \
            .toDF()
        
        return df

def convertColumns(df):
    df = df.withColumn('Price', df['Price'].cast(DoubleType()))
    df = df.withColumn('Timestamp', unix_timestamp('Timestamp', "yyyy-MM-dd'T'HH:mm:ssZZZZ"))
    return df

df = createDataframe("prices-ap-northeast-2-2017-11-17.txt.gz")
df = convertColumns(df)
df.groupBy('InstanceType', 'ProductDescription').avg('Price').show()

+------------+------------------+-------------------+
|InstanceType|ProductDescription|         avg(Price)|
+------------+------------------+-------------------+
|  r3.2xlarge|        Linux/UNIX| 0.6557668832731297|
|   c4.xlarge|           Windows|0.22131742268041452|
|   i3.xlarge|           Windows|0.24962053470769482|
|  p2.8xlarge|        Linux/UNIX| 117.20000000000049|
| x1.16xlarge|           Windows|  9.048597719044308|
|  i2.4xlarge|           Windows| 1.5526279912024583|
|    c4.large|           Windows|0.10950416912487713|
|  i3.8xlarge|        Linux/UNIX| 29.279999999999934|
|  r4.2xlarge|        Linux/UNIX|0.21761773442050408|
|  c4.2xlarge|           Windows|   0.42304020895996|
|  d2.4xlarge|           Windows| 1.1192076448974577|
| i3.16xlarge|        Linux/UNIX|   58.5599999999999|
|  r3.4xlarge|        Linux/UNIX| 0.5058647696038822|
|  m4.4xlarge|           Windows| 1.0006524359704119|
|  r4.2xlarge|           Windows| 0.7127052461538115|
|  i3.8xlarge|           Win

### Exercise 2

#### a)

##### What does broadcast provide?

Broadcast variables provide an efficient way to share a read-only variable across multiple machines for use in tasks and operations. Spark sends the variable to each machine only once, and it is cached there.

##### Which other mechanism does it improve and how?

It reduces the amount of data sent over the network and thus keeps communication costs low. The example shown is a join of two datasets. Normally you would join both datasets directly, which shuffles both of them over the network. If one dataset is particularly small, a better approach is to broadcast it as a map to all machines that hold parts of the other dataset and perform the join there. This way, only the smaller dataset is sent over the network.
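The broadcast join described above can be sketched in plain Python without Spark. The data, the partition layout and the helper `join_partition` are made up for illustration; in a real job the lookup table would be a broadcast variable and each partition would be processed by a task.

```python
# Plain-Python sketch of a broadcast (map-side) join; no Spark required.
# All names and values below are hypothetical.

# Small dataset: this is what would be broadcast to every machine.
small_table = {"c4.large": "compute", "r3.2xlarge": "memory"}

# Large dataset: already split into partitions that stay where they are.
large_partitions = [
    [("c4.large", 0.10), ("r3.2xlarge", 0.65)],
    [("c4.large", 0.11), ("p2.8xlarge", 117.2)],
]

def join_partition(partition, lookup):
    """Join one partition locally against the lookup table (inner join)."""
    return [(key, (value, lookup[key])) for key, value in partition if key in lookup]

# Each partition is joined independently; the large dataset is never shuffled.
joined = [row for part in large_partitions for row in join_partition(part, small_table)]
print(joined)
```

Only `small_table` has to travel over the network here, which is the point of the technique.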

##### Which features of the distributed program determine the number of times the variable will be actually transmitted over the network? Explain the role of tasks and nodes in this.

By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function with every task. The number of times the variable is actually transmitted over the network therefore depends on the number of tasks, which in turn depends on the number of partitions. A broadcast variable, however, is sent only once to each node in the cluster and cached there, so the number of transmissions depends on the number of nodes instead.
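As a rough back-of-the-envelope model of this difference, the following sketch counts copies and bytes for a single stage. The function names and the example numbers (a 10 MB variable, 200 tasks, 10 nodes) are assumptions chosen for illustration, and the model ignores details such as compression or tasks that run on the node that already holds the data.

```python
def copies_sent(num_tasks, num_nodes, use_broadcast):
    """Copies of the variable shipped in one stage (simplified model)."""
    # With broadcast: one copy per node. Without: one copy per task.
    return num_nodes if use_broadcast else num_tasks

def bytes_sent(var_size_bytes, num_tasks, num_nodes, use_broadcast):
    """Total bytes transferred for the variable in one stage."""
    return var_size_bytes * copies_sent(num_tasks, num_nodes, use_broadcast)

size = 10 * 10**6  # hypothetical 10 MB lookup table
print(bytes_sent(size, num_tasks=200, num_nodes=10, use_broadcast=False))
print(bytes_sent(size, num_tasks=200, num_nodes=10, use_broadcast=True))
```

With more partitions than nodes, which is the usual case, the broadcast variant transfers proportionally less data.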

#### b)

##### Describe the function of an accumulator.

Often, an application needs to aggregate multiple values as it progresses. 
Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program.

##### What is the alternative implementation without an accumulator and why is an accumulator a preferred option?

You could use the reduce() function alternatively.
The problem with reduce() is that you only get the result after all data has been processed. In contrast, the accumulator mechanism receives real-time updates to its accumulator variable from each worker node. It can also aggregate multiple variables across multiple tasks.

##### Which example application for an accumulator is discussed in the video?

Overall count of bad records and bad bytes after filtering records. Additionally it can be used for computing averages.

##### What has to be implemented in order to define a custom accumulator?

Define an object extending AccumulatorParam\[T\], where T is your data type, to tell the system how to work with the custom data type T. The object must define the default (zero) value used to initialize an accumulator of type T and an addInPlace method that merges two values.
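The answer above refers to the Scala trait. A minimal sketch of the PySpark counterpart could look as follows; the class name `VectorAccumulatorParam` and the list-of-floats data type are illustrative assumptions, and the `try`/`except` only exists so that the sketch also runs on a machine without Spark installed.

```python
try:
    from pyspark.accumulators import AccumulatorParam
except ImportError:
    # Fallback so the sketch can be run without Spark (assumption for testing).
    AccumulatorParam = object

class VectorAccumulatorParam(AccumulatorParam):
    """Accumulates lists of floats element-wise (hypothetical example type)."""

    def zero(self, value):
        # Default value: a vector of zeros with the same length as `value`.
        return [0.0] * len(value)

    def addInPlace(self, v1, v2):
        # Merge v2 into v1 element by element and return the merged vector.
        for i, x in enumerate(v2):
            v1[i] += x
        return v1

param = VectorAccumulatorParam()
print(param.addInPlace(param.zero([1.0, 2.0]), [3.0, 4.0]))
```

With a running SparkContext `sc`, such an object would be passed as the second argument when creating the accumulator, for example `sc.accumulator([0.0, 0.0], VectorAccumulatorParam())`.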

##### Compare the accumulator mechanism to the reduce()-function

The reduce() function combines the data from all partitions of an RDD into a single value. The problem is that you only get the result after all data has been processed. In contrast, the accumulator mechanism receives real-time updates to its accumulator variable from each worker node.
Additionally, reduce() is limited to a single accumulated value, whereas with the accumulator mechanism you can define multiple variables and accumulate into them in parallel, even across multiple tasks.

#### c)

##### Give three examples of RDD operators that result in RDDs with partitioning. 
- join()
- mapValues()
- reduceByKey()

##### Explain the connection between partitioning and network traffic.

If the data is spread arbitrarily across the partitions on different machines and records that belong together have to be brought to the same machine, this causes a lot of network traffic.

##### How does the modification on the pageRank example use partitioning to make the code more efficient?

The links RDD is pre-partitioned so that links for URLs with the same hash code are on the same node. Because links is joined again in every iteration, this saves shuffling it for each of those joins.

##### How does Spark exploit the knowledge about the partitioning to save time in task execution?

It arranges how data is spread across machines so that data with the same key is on the same machine. This reduces the amount of data that needs to be sent over the network, which saves time in task execution because the tasks receive their data more quickly.

##### How can you create a custom Partitioner?

You can define your own subclass of Partitioner to leverage domain-specific knowledge. This subclass must implement three methods:

- numPartitions: returns the number of partitions
- getPartition: returns the partition ID for a given key
- equals: tells whether two partitioners are equal
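The three methods can be mirrored in a plain-Python sketch. The class `DomainPartitioner`, the idea of partitioning URLs by domain and the use of `zlib.crc32` as a stable hash are assumptions for illustration, not the Scala API itself.

```python
import zlib
from urllib.parse import urlparse

class DomainPartitioner:
    """Sends all URLs of the same domain to the same partition (hypothetical)."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def numPartitions(self):
        # Number of partitions this partitioner creates.
        return self.num_partitions

    def getPartition(self, key):
        # Hash only the domain part of the URL, so pages of one site stay together.
        domain = urlparse(key).netloc
        return zlib.crc32(domain.encode("utf-8")) % self.num_partitions

    def __eq__(self, other):
        # Python counterpart of equals(): same type and same partition count.
        return (isinstance(other, DomainPartitioner)
                and self.num_partitions == other.num_partitions)

    def __hash__(self):
        return hash(self.num_partitions)

p = DomainPartitioner(4)
print(p.getPartition("http://www.cnn.com/WORLD") == p.getPartition("http://www.cnn.com/US"))
```

In PySpark there is no Partitioner class to subclass; a function such as `p.getPartition` would instead be passed as `partitionFunc` to `rdd.partitionBy(numPartitions, partitionFunc)`.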

### Exercise 3


#### a)
Any lines typed in the terminal running the netcat server are counted, and the counts are printed on screen every second. The output of the streaming program does not behave as expected, however: it tokenizes the input without removing special characters, so a word followed by punctuation is counted as a separate, new word. In the given example string, it counts 'mankind.' and 'man,' as unique words.
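One possible fix is to tokenize with a regular expression instead of splitting on whitespace. The sketch below is plain Python; the sample sentence is an assumed stand-in for the example string, and lowercasing is an additional design choice that makes the count case-insensitive. In the streaming program the same function could replace the split step.

```python
import re
from collections import Counter

def tokenize(line):
    """Return lowercase words without surrounding punctuation."""
    return re.findall(r"[a-z0-9']+", line.lower())

# Assumed sample input containing the tokens 'man,' and 'mankind.'.
sample = "one small step for man, one giant leap for mankind."

naive_counts = Counter(sample.split())    # whitespace split keeps punctuation
clean_counts = Counter(tokenize(sample))  # regex split drops punctuation

print(sorted(naive_counts))
print(sorted(clean_counts))
```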

#### b)
The input rate is so high that the streaming program cannot keep up with the throughput. The counting results are not printed every second but only every few seconds, and results continue to be printed even after the streaming program has been closed.
Measured with *pv* (pipe viewer), the *yes* command reaches a throughput of around 300 MB/s when printing a single character. Each output line consists of that character plus a newline, so at one byte per character this corresponds to roughly 150 million lines per second.
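The line rate follows from the measured throughput with a short calculation. The sketch assumes that 1 MB means 10^6 bytes and that *yes* prints its default line `y` followed by a newline.

```python
throughput_bytes_per_s = 300 * 10**6  # about 300 MB/s as reported by pv (assumed 10^6 bytes per MB)
bytes_per_line = len("y\n")           # one character plus the newline

lines_per_s = throughput_bytes_per_s // bytes_per_line
print(lines_per_s)
```

Every one of these lines is a record the streaming program has to tokenize and count within a one-second batch.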


### Exercise 4

#### 1. Describe each step of Spark execution model

##### First Step: Create DAG of RDDs to represent computation

##### Second Step: Create logical execution plan for DAG

- pipeline as much as possible
- split into "stages" based on need to reorganize data

##### Third Step: Schedule and execute individual tasks

- split each stage into tasks
- a task is data + computation
- execute all tasks within a stage before moving on
- each task is delegated to corresponding machine

#### 2. In the execution phase, Spark tries to pipeline operations as much as possible. How does pipelining affect performance? Give examples of operations that can be pipelined.

A high level of pipelining combined with multiple threads can result in high performance: operations that need no reorganization of data are applied to each record one after another within the same task, so the threads run independently and do not have to wait for other data. In addition, thread scheduling is much easier: when one task is finished, the processor can simply run the next enqueued task. In the presentation, “map” is pipelined in the first stage, and “groupBy”, “mapValues” and “collect” are pipelined in the second stage.
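Python generators give a rough analogy for pipelining inside one task: each record passes through all chained operators before the next record is read, and no intermediate collection is built. The function names and the two sample names are made up, and this is an analogy rather than Spark's actual implementation.

```python
def read_lines(lines, log):
    """First operator: emits records one at a time."""
    for line in lines:
        log.append(("read", line))
        yield line

def to_pair(names, log):
    """Second operator: maps each name to (first letter, name)."""
    for name in names:
        log.append(("map", name))
        yield (name[0], name)

log = []
pairs = list(to_pair(read_lines(["Smith", "Jones"], log), log))

# The log shows that "read" and "map" alternate per record.
print(log)
print(pairs)
```

A grouping step would break this pattern, because it needs all records with the same key before it can emit anything, which is why it marks a stage boundary.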


#### 3. List the four most common issues described by Aaron. What is the recommended setting and guidelines to deal with the problems described in the talk?

1.	Ensure enough partitions for concurrency
2.	Minimize memory consumption (especially of sorting and large keys in groupBy’s)
3.	Minimize amount of data shuffled
4.	Know the standard library

Points 1 and 2 are about tuning the number of partitions: the recommended lower bound is about 2 x the number of cores in the cluster, and the upper bound depends on the execution time of each task (which should be at least 100 ms). A "reasonable number" of partitions is commonly between 100 and 10,000.
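These two bounds can be turned into a small sanity check. The function `check_partitioning` and its hint texts are hypothetical helpers written for this sketch; only the thresholds (2 x cores, 100 ms per task) come from the guidelines above.

```python
def check_partitioning(num_partitions, total_cores, avg_task_ms):
    """Return a list of hints based on the two rules of thumb."""
    hints = []
    # Lower bound: at least about twice as many partitions as cores.
    if num_partitions < 2 * total_cores:
        hints.append("increase partitions to at least {}".format(2 * total_cores))
    # Upper bound: tasks should still run for at least about 100 ms.
    if avg_task_ms < 100:
        hints.append("tasks run under 100 ms; use fewer partitions")
    return hints

print(check_partitioning(num_partitions=4, total_cores=8, avg_task_ms=500))
```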

To face memory problems, you can increase spark.executor.memory or the number of partitions, or you can re-evaluate your program structure.

#### 4. Transcribe the code given in the talk into the language of your choice. Download the list of last names. Experiment with the number of partitions in the second version: which number of partitions yields the fastest results on your computer?

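As the timings below show, we get the best result with 2 partitions (63.0 seconds). The differences between the partition counts are small, though: all 'optimized' runs take between 63.0 and 65.3 seconds, compared to 74.2 seconds for the naive version.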

In [4]:
import time

In [12]:
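# Naive example: group all names by first letter, then count per group
# Note: len(names) also counts duplicate names; len(set(names)) would
# match the distinct() step used in the 'optimized' versions.
start_time = time.time()
sc.textFile("last-names.csv") \
    .map(lambda name: (name[0], name)) \
    .groupByKey() \
    .mapValues(lambda names: len(names)) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 74.23580408096313 seconds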


In [18]:
# 'Optimized' example with 1 partition
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 1) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 65.30063247680664 seconds



In [None]:
# 'Optimized' example with 2 partitions (fastest)
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 2) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 63.04784870147705 seconds

In [None]:
# 'Optimized' example with 3 partitions
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 3) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 64.92784357070923 seconds

In [None]:
# 'Optimized' example with 4 partitions
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 4) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 63.1761691570282 seconds

In [None]:
# 'Optimized' example with 6 partitions
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 6) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 64.52375602722168 seconds

In [None]:
# 'Optimized' example with 8 partitions
start_time = time.time()
sc.textFile("last-names.csv") \
    .distinct(numPartitions = 8) \
    .map(lambda name: (name[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("took {} seconds".format(time.time() - start_time))
# took 65.26732587814331 seconds
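The repeated cells above differ only in the partition count, so the measurement can be sketched as a small harness. `time_job` and `fastest` are hypothetical helpers; the stand-in job does nothing, and in the notebook it would be replaced by a function that runs the Spark pipeline with `distinct(numPartitions=n)`. The `recorded` dictionary contains the timings from the cells above, rounded to two decimals.

```python
import time

def time_job(job, partition_counts):
    """Run job(n) for each partition count and return {n: seconds}."""
    timings = {}
    for n in partition_counts:
        start = time.perf_counter()
        job(n)
        timings[n] = time.perf_counter() - start
    return timings

def fastest(timings):
    """Return the partition count with the smallest measured time."""
    return min(timings, key=timings.get)

# Stand-in job so the sketch runs without Spark (assumption for illustration).
timings = time_job(lambda n: None, [1, 2, 3, 4, 6, 8])

# Timings recorded in the cells above (seconds, rounded).
recorded = {1: 65.30, 2: 63.05, 3: 64.93, 4: 63.18, 6: 64.52, 8: 65.27}
print(fastest(recorded))
```

Since the recorded runs differ by only about two seconds, repeating each measurement several times would give a more reliable ranking.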