# Large Scale Data Management

This chapter revisits the algorithms and database operations covered so far from the perspective of parallel processing, including the cost factors that become relevant there.

Large-scale data management deals with very large amounts of data. A single database is then no longer enough; we also have to think about distribution, servers, and concurrency.



<img src="pictures/Large-scale-Data-Management.png" alt="Large-scale-Data-Management" width="500" style="background-color: white;"/>

<br>

As a reminder: what is Big Data? Big Data is characterized along several dimensions, the so-called V's: **Volume** (amount of data), **Velocity** (speed of data processing), **Variety** (heterogeneity of the data), **Veracity** (uncertainty about the correctness of the data), and **Value** (the worth of the data).

Big Data comes in two variants: **Operational** and **Analytic**. The first is about operational workloads, i.e., transaction management. The second is about analyzing data, deriving insights from it, and gaining new knowledge.

To illustrate the data volumes we are talking about with Big Data:

Google is a classic example of a company that produces and manages data. It processes 20 PB of data every day. That means trillions of rows and thousands to millions of columns and tables, but also unstructured data such as text, images, and videos. Reading these 20 PB sequentially at 50 MB/s would take more than 12 years. For this reason the data is partitioned and processed in a distributed fashion.
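
The 12-year figure can be checked with a short back-of-the-envelope calculation. The sketch below assumes decimal units (1 PB = 10^15 bytes, 1 MB = 10^6 bytes) and, for the parallel case, an idealized setup in which 1,000 readers (a hypothetical number) scale perfectly.

```python
# Back-of-the-envelope check of the sequential read time.
# Assumption: decimal units (1 PB = 10**15 bytes, 1 MB = 10**6 bytes).
PETABYTE = 10**15
MEGABYTE = 10**6
SECONDS_PER_DAY = 24 * 3600
SECONDS_PER_YEAR = 365 * SECONDS_PER_DAY


def read_seconds(volume_bytes, bytes_per_second, readers=1):
    """Seconds needed to read the volume with `readers` perfectly parallel readers."""
    return volume_bytes / (bytes_per_second * readers)


# One reader at 50 MB/s
years = read_seconds(20 * PETABYTE, 50 * MEGABYTE) / SECONDS_PER_YEAR
# 1,000 readers at 50 MB/s each (hypothetical, ignores all coordination overhead)
days_parallel = read_seconds(20 * PETABYTE, 50 * MEGABYTE, readers=1000) / SECONDS_PER_DAY

print(f"sequential: {years:.1f} years")
print(f"1000 readers: {days_parallel:.1f} days")
```

Under these assumptions a single reader needs between 12 and 13 years, while the idealized parallel read finishes in a few days.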



## Key enabler: Virtualization

Both variants, operational and analytic, can be managed with virtualization. The idea is to map either one logical system onto many physical systems (load balancing) or, conversely, several logical systems onto one physical system (multi-tenancy).


<img src="pictures/Virtualization.png" alt="Virtualization" width="500" style="background-color: white;"/>


## Parallel Data Processing

<img src="pictures/Overview_3.png" alt="Overview_3" width="500" style="background-color: white;"/>

### The story so far: Serial Processing/Single Threaded

So far we have always assumed a single computer with several disks, and in that context touched briefly on parallel disk access. That computer always had only a single core, so for every operation the blocks were processed one after another by that one core. Synchronization and communication played no role either, because queries were handled in a single thread. We now want to extend this model.

<img src="pictures/serial-single-threaded.png" alt="serial-single-threaded" width="500" style="background-color: white;"/>

### What we have left out so far...

The volume of data is growing steadily. Data warehouses with 1 EB are not unusual. Some organizations produce more than 1 PB of new data per day, which corresponds to 1,000,000,000,000,000 bytes (1 quadrillion).
Some systems, for example those of financial institutions, online shops, and social networks, have a very high transaction throughput.
It is therefore important to consider how accesses are distributed across the network.
Analytical queries are also becoming more and more complex. Statistical pattern recognition is expensive and requires multiple passes over the data. A single-CPU or single-node architecture is no longer sufficient, and Moore's Law can no longer be relied on to close the gap. The solution: **parallel data processing**.


### Prominent users of parallel data processing

* Big Tech (Google, Facebook)
    * Estimated 450,000 low-cost commodity servers in 2006
    * In 2005 Google indexed 8 billion web pages
    * Over 200 GFS clusters at Google. 1 cluster: 1,000 – 5,000 machines.
    * Pools of tens of thousands of machines retrieve data from GFS clusters that run as large as 5 peta-bytes of storage.
    * Aggregate read/write throughput up to 40 gigabytes/second across the cluster.
    * 6,000 MapReduce applications

* Manufacturing (Volkswagen)
    * the average connected vehicle will generate 280 petabytes of data annually
    * with four terabytes of data being generated in a day at the very least.
    * The research also states that around 470 million connected vehicles will be deployed by 2025
 
* Gene Sequencing
  
* Sensor data processing

Amazon Aurora – On Prime Day, 5,326 database instances running the PostgreSQL-compatible and MySQL-compatible editions of Amazon Aurora processed 288 billion transactions, stored 1,849 terabytes of data, and transferred 749 terabytes of data.

Amazon DynamoDB – DynamoDB powers multiple high-traffic Amazon properties and systems including Alexa, the Amazon sites, and all Amazon fulfillment centers. Over the course of Prime Day, these sources made trillions of calls to the DynamoDB API. DynamoDB maintained high availability while delivering single-digit millisecond responses and peaking at 105.2 million requests per second.

### Basics of Parallel Processing

* Parallel Speedup
    * Amdahl's Law
* Levels of Parallelism
    * Instruction-level, data-level, task-level
* Modes of Query Parallelism
    * Inter-Query
        * Multi-user ability
    * Intra-Query
        * Pipeline (Inter Operator) / Data (Intra Operator)


### Parallel Speedup – Amdahl's law

* Speedup: $S_p = T_1 / T_p$
    * $T_1$: Sequential runtime (1 processor)
    * $T_p$: Parallel runtime ($p$ processors)
* Amdahl's law: Maximal speedup is determined by the non-parallelizable part of the program
    * $S_p = \dfrac{1}{(1 - f) + \frac{f}{p}}$, where $f$ is the parallelizable fraction of the program
    * Ideal speedup: $S_p = p$ for $f = 1$ (linear speedup)
    * Usually $f < 1$, so $S_p$ is bounded by a constant
        * For $f = 0.9$ and 10 / 20 servers: $S_{10} = \frac{1}{0.1 + 0.09} \approx 5.26$ and $S_{20} = \frac{1}{0.1 + 0.045} \approx 6.90$
        * For $p \to \infty$: $S_p \to \frac{1}{1 - f} = 10$
    * Fixed-size problems can only be parallelized to a certain degree
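
The effect of the non-parallelizable part is easy to explore numerically. The following is a minimal sketch of Amdahl's law; the function names `amdahl_speedup` and `speedup_limit` are chosen for illustration only.

```python
def amdahl_speedup(f, p):
    """Speedup on p processors when a fraction f of the program is parallelizable."""
    if not 0.0 <= f <= 1.0:
        raise ValueError("f must lie in [0, 1]")
    if p < 1:
        raise ValueError("p must be at least 1")
    return 1.0 / ((1.0 - f) + f / p)


def speedup_limit(f):
    """Upper bound of the speedup for p -> infinity."""
    return float("inf") if f == 1.0 else 1.0 / (1.0 - f)


# Speedup for f = 0.9 and a growing number of processors
for p in (10, 20, 100, 1000):
    print(p, round(amdahl_speedup(0.9, p), 2))
print("limit:", round(speedup_limit(0.9), 2))
```

Even with 1,000 processors the speedup for f = 0.9 stays below 10, because the sequential 10% of the runtime cannot be shortened.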


### Parallel Speedup

<img src="pictures/Parallel-Speedup.png" alt="Parallel-Speedup" width="500" style="background-color: white;"/>

### Levels of Parallelism on Hardware

* Instruction-level Parallelism (processor instructions)
    * Single instructions are automatically processed in parallel
    * Example: Modern CPUs with multiple pipelines and instruction units.

* Data Parallelism (data)
    * Different data can be processed independently
    * Each processor executes the same operations on its share of the input data.
    * Example: Distributing loop iterations over multiple processors
    * Example: GPU processing

* Task Parallelism (tasks)
    * Different tasks are distributed among the processors/nodes
    * Each processor executes a different thread/process.
    * Example: Threaded programs

### Modes of Query Parallelism

* Inter-Query Parallelism (multiple concurrent queries)
    * Necessary for efficient resource utilization: While one query waits, e.g., for I/O, another one executes
    * Requires concurrency control (locking mechanisms) to guarantee transactional properties (the "I" in ACID)
    * Important for highly transactional scenarios (OLTP)

* Intra-Query Parallelism (parallel processing of a single query)
    * I/O Parallelism: Concurrent reading from multiple disks
        * Hidden: Hardware RAID
        * Transparent: Spanned tablespaces / partitioning
    * Intra-Operator Parallelism: Multiple threads work on the same operator. Example: Parallel Sort
    * Inter-Operator Parallelism: Multiple parts of the plan run in parallel (pipeline)
    * Important for complex analytical tasks (OLAP)



### Pipeline Parallelism

<img src="pictures/Pipeline-Parallelism.png" alt="Pipeline-Parallelism" width="500" style="background-color: white;"/>


* Pipeline parallelism
    * aka inter-operator parallelism: parallelism is between the operators

* In addition: Execute multiple pipelines simultaneously
    * Limited in its applicability, only if multiple pipelines are present and not dependent on each other

* Problem:
    * High synchronization overhead
    * Mostly limited to lower degree of parallelism (not too many pipelines per query)
    * Only suited for shared-memory architectures

### Data Parallelism

* Pipeline parallelism is not always applicable → data parallelism
* Divide data into several sub-sets
    * Most operations do not need a complete view of the data
        * E.g., selection looks only at a single tuple at a time
    * Subsets can be processed independently and hence in parallel.
* Maximum degree of parallelism as high as the number of possible subsets
    * For selection: As high as the number of tuples

* Some operations possibly need a view of larger portions of the data
    * E.g., grouping/aggregation operation needs all tuples with the same grouping key
    * Are they all in the same subset? Can we guarantee that?
    * Different operators need different sets
 
### Basics of Parallel Query Processing

* Levels of Resource Sharing
    * Shared-Memory, Shared-Disk, Shared-Nothing

* Data Partitioning
    * Round-robin, Hash, Range

* Parallel Operators and Costs
    * Tuple-at-a-time (e.g. Selection)
    * Sorting
    * Projection, Grouping, Aggregation
    * Join

### Parallel Architectures – Shared Memory

* Several CPUs share a single memory and disk (array)
* Communication over a single common bus
* In practice: Some private memory per processor
    * NUMA (non-uniform memory access)

<img src="pictures/shared-memory.png" alt="shared-memory" width="500" style="background-color: white;"/>

### Parallel Architectures – Shared Disk

* Several nodes with multiple CPUs, each node has its private memory
* Single attached disk (array): Often NAS, SAN, etc…


<img src="pictures/shared-disk.png" alt="shared-disk" width="500" style="background-color: white;"/>

### Parallel Architectures – Shared Nothing

* Each node has its own set of CPUs, memory and disks attached
* Most commonly used architecture for large-scale data management
* Data needs to be partitioned over the nodes
* Data is exchanged through direct node-to-node communication
    * Messages with significant overhead


<img src="pictures/shared-nothing.png" alt="shared-nothing" width="500" style="background-color: white;"/>

### Data Partitioning

* Partitioning the data means creating a set of possibly disjoint sub-sets
    * Example: Sales data, every year gets its own partition
* For shared-nothing, data must be partitioned across nodes
    * If it were replicated, it would effectively become a shared-disk with the local disks acting like a cache (must be kept coherent during updates!)

* Partitioning with certain characteristics is beneficial
    * Some queries can be limited to operate on certain parts only
        * If provable that all relevant data (passing the predicates) is in that partition
    * Database administration: Partition can be simply dropped as a whole when it is no longer needed (e.g., discard old sales)

### Data Partitioning Strategies

* Round robin
    * Each partition gets a tuple in a round
    * Partition sizes are guaranteed to differ by at most one tuple
    * No apparent relationship between tuples in one partition
* Hash Partitioned
    * Define a set of partitioning columns.
    * Generate a hash value over those columns to decide the target set.
    * All tuples with equal values in the partitioning columns are in the same partition.
* Range Partitioned
    * Define a set of partitioning columns
    * Split the domain of those columns into ranges
    * Range determines the target set. All tuples in one partition are in the same range.
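
The three strategies can be sketched in a few lines. This is an illustrative single-process sketch, not a DBMS implementation: the helper names, the use of `crc32` as hash function, and the convention that `boundaries` holds the lower bounds of partitions 1..n are assumptions made for the example.

```python
from bisect import bisect_right
from zlib import crc32


def round_robin_partition(tuples, p):
    """Tuple i goes to partition i mod p; sizes differ by at most one."""
    parts = [[] for _ in range(p)]
    for i, t in enumerate(tuples):
        parts[i % p].append(t)
    return parts


def hash_partition(tuples, p, key):
    """Equal values in the partitioning column always land in the same partition."""
    parts = [[] for _ in range(p)]
    for t in tuples:
        h = crc32(repr(key(t)).encode("utf-8"))  # deterministic hash (assumption)
        parts[h % p].append(t)
    return parts


def range_partition(tuples, boundaries, key):
    """boundaries: sorted lower bounds of partitions 1..n; partition 0 takes the rest."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for t in tuples:
        parts[bisect_right(boundaries, key(t))].append(t)
    return parts


def year(t):
    return t[0]


# (year, price) tuples of a hypothetical sales relation
sales = [(2019, 10.0), (2020, 5.0), (2021, 7.5), (2019, 3.0), (2022, 1.0), (2020, 8.0)]
rr = round_robin_partition(sales, 3)
hp = hash_partition(sales, 3, year)
rp = range_partition(sales, [2020, 2022], year)  # <2020 | 2020..2021 | >=2022
print(rr, hp, rp, sep="\n")
```

Round robin balances the partition sizes but gives no guarantee about their content, whereas hash and range partitioning co-locate related tuples at the risk of skewed sizes.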


### Data Parallelism: Example

* Client sends a SQL query to one of the cluster nodes
    * Node becomes the "coordinator"

* Coordinator compiles query
    * Parsing, checking, optimization
    * Parallelization

* Sends partial plans to the other cluster nodes
    * Coordinator also executes the partial plan on its part of the data
* Coordinator collects partial results and finalizes them 


<img src="pictures/Data-Parallelism-example.png" alt="Data-Parallelism-example" width="500" style="background-color: white;"/>

### Data Parallelism – Example

* For shared-nothing & shared-disk
    * Multiple instances of a sub-plan are executed on different computers
    * The instances operate on different splits or partitions of the data
    * At some point, results from the sub-plans are collected
    * For more complex queries, results are not collected but re-distributed, for further parallel processing


<img src="pictures/Data-Parallelism-example_2.png" alt="Data-Parallelism-example_2" width="500" style="background-color: white;"/>

### Parallel Operators

* Ideally: Operate as much as possible on individual partitions of the data
    * Ship operation to data
* Easy for simple “per-tuple” operators
    * Scan, index-scan, selection
* Problem: Some operators need the whole picture (blocking operators)
    * Sort and aggregations can only be preprocessed in parallel and need a final step on a single node.
        * Unless they occur in a correlated subplan known to contain only tuples from one partition.
    * E.g., joins need matching tuples. 
        * Either organize the inputs accordingly,
        * or join at the coordinator after collection of partial results (not parallel any more!)

### Notations and Assumptions

* S	Relation S
* S[i,h]	Partition i of relation S according to partitioning scheme h
* B(S)	Number of blocks of relation S
* p	Number of nodes

* Assume a shared-nothing architecture
    * Most commercial database vendors use shared-nothing approaches.
* Network transfer is at least as expensive as disk access
    * In some cost models network is still far more expensive.
    * Today network bandwidth ≈ disk bandwidth
    * But: Network is shared
        * Switches and routers have a throughput limit
* Assume partitioning schemes (hash/range) produce partitions of roughly equal size.
* Assume B(S[i,h]) > M, i.e., a single partition does not fit into the M blocks of main memory



### Parallel Selection / Projection

* Selection and projection can be parallelized very efficiently
    * “Embarrassingly parallel” problem

* Each node performs the selection on its existing local partition.
    * Selection needs no context
    * Data can be partitioned in an arbitrary way

* Partial results are unioned afterwards.

* Cost:	B(S)/p	+ transfer (depends on selectivity)

### Parallel Grouping & Aggregation

* Two phases
    1. Local grouping & aggregation to each partition
    2. Merge results

* Cost: 3 B(S)/p local algorithm + transfer of (small) results + (fast) merge

* Works only for associative aggregation functions
    * MIN, MAX, SUM, COUNT
    * AVG: Use SUM / COUNT

* To avoid possibly expensive second phase:
    * Use hashing function on group-columns to re-partition relation onto nodes
    * Or: Parallelization of merge phase
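
The two phases can be sketched as follows. The example keeps a partial `(sum, count)` pair per group so that AVG can be derived as SUM / COUNT in the merge step; the function names and the in-memory lists standing in for node-local partitions are assumptions for illustration.

```python
from collections import defaultdict


def local_aggregate(partition):
    """Phase 1: each node computes partial (sum, count) per grouping key."""
    partial = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        partial[key][0] += value
        partial[key][1] += 1
    return {k: tuple(v) for k, v in partial.items()}


def merge_partials(partials):
    """Phase 2: combine the small partial results into the final aggregates."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, c) in partial.items():
            total[key][0] += s
            total[key][1] += c
    # AVG is not merged directly; it is derived from the merged SUM and COUNT
    return {k: {"sum": s, "count": c, "avg": s / c} for k, (s, c) in total.items()}


# Two node-local partitions of (group key, value) tuples
partitions = [
    [("2020", 10.0), ("2021", 4.0)],
    [("2020", 20.0), ("2021", 6.0), ("2021", 2.0)],
]
result = merge_partials(local_aggregate(part) for part in partitions)
print(result)
```

Averaging the local averages instead would give a wrong result as soon as the groups have different sizes per node, which is why SUM and COUNT are shipped separately.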
 
### Parallel Sorting

* Range partitioned sort: partition by range, then sort
    * Range-partition the relation according to the sort column(s)
    * Sort the single partitions locally (e.g., by TPMMS)
    * Cost: B(S) partitioning + B(S) transfer + 3 B(S)/p local sorting
    * Problem: Find a uniform range partitioning scheme
        * Partitions of same/similar size

* Parallel external sort-merge: sort locally, then merge
    * Reuse an existing data partitioning
    * Partitions are sorted locally (e.g. by TPMMS)
    * Sorted partitions need to be merged
    * Pair-wise with cost: 3 B(S)/p local sorting + log2(p)*B(S)/2 transfer + log2(p)*B(S) local merge
    * Or multi-way merge
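
Both variants can be simulated in a single process. In the sketch below, Python's built-in `sorted` stands in for the local TPMMS, `heapq.merge` performs the multi-way merge, and the split points in `boundaries` are assumed to be given (finding balanced ones is exactly the problem noted above).

```python
import heapq
from bisect import bisect_right


def range_partitioned_sort(values, boundaries):
    """Partition by range, then sort each partition locally."""
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for v in values:
        partitions[bisect_right(boundaries, v)].append(v)
    sorted_parts = [sorted(part) for part in partitions]  # local sort per node
    # Partitions are already ordered by range, so concatenation is globally sorted
    return [v for part in sorted_parts for v in part]


def sort_merge(partitions):
    """Sort an existing partitioning locally, then merge the sorted runs."""
    sorted_parts = [sorted(part) for part in partitions]  # local sort per node
    return list(heapq.merge(*sorted_parts))               # multi-way merge


data = [42, 7, 19, 88, 3, 61, 25, 70]
by_range = range_partitioned_sort(data, boundaries=[20, 60])
by_merge = sort_merge([data[0:3], data[3:6], data[6:8]])
print(by_range)
print(by_merge)
```

The range-partitioned variant pays for redistributing the data up front but needs no merge; the sort-merge variant reuses the existing partitioning and pays for the merge afterwards.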
 
### Symmetric Fragment-and-Replicate Join

* Joining two relations R and S requires looking at every tuple of the Cartesian product. 
* Parallel databases need to combine every partition of R with every partition of S.
* Symmetric Fragment-and-Replicate (or Broadcast) Join:
    * Given p = m · n nodes
    * Fragment R into m and S into n partitions
    * Replicate the fragments onto the nodes
        * Each fragment of R is replicated n times
        * Each fragment of S is replicated m times
    * Each node locally joins exactly one fragment pair of R and S.
* Cost:
    * B(R) + B(S): fragmentation cost
    * n · B(R) + m · B(S): transfer cost
    * ???: local join cost
* Only parallel join type that works for all join predicates (Theta-Join).
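
The fragment-pair scheme can be simulated in one process. The sketch below assumes round-robin fragmentation and a nested-loop join per node; the names `fragment` and `fragment_and_replicate_join` are hypothetical, and the loop over `(i, j)` stands in for the m · n nodes working in parallel.

```python
from itertools import product


def fragment(relation, k):
    """Round-robin fragmentation of a relation into k fragments."""
    return [relation[i::k] for i in range(k)]


def fragment_and_replicate_join(R, S, m, n, theta):
    """Join R and S on an arbitrary predicate theta using m * n (simulated) nodes."""
    r_frags = fragment(R, m)
    s_frags = fragment(S, n)
    result = []
    # Node (i, j) receives R fragment i and S fragment j and joins them locally
    for i, j in product(range(m), range(n)):
        result.extend(
            (r, s) for r in r_frags[i] for s in s_frags[j] if theta(r, s)
        )
    return result


R = [1, 5, 8, 12]
S = [3, 6, 9]
joined = fragment_and_replicate_join(R, S, m=2, n=3, theta=lambda r, s: r < s)
print(sorted(joined))
```

Because every fragment of R meets every fragment of S exactly once, each pair of the Cartesian product is examined by exactly one node, which is why the scheme works for any predicate.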

### Symmetric Fragment-and-Replicate Join

<img src="pictures/Symmetric-Fragment-and-Replicate-Join.png" alt="Symmetric-Fragment-and-Replicate-Join" width="500" style="background-color: white;"/>

### Asymmetric Fragment-and-Replicate Join

* We can do better, if relation S is much smaller than R.

* Idea: Reuse the existing partitioning of R and replicate the whole relation S to each node.

* Cost:
    * p * B(S): transport
    * ???: local join

* Asymmetric Fragment-and-replicate join is a special case of the Symmetric Algorithm with m=p and n=1.

<img src="pictures/Asymmetric-Fragment-and-Replicate-Join.png" alt="Asymmetric-Fragment-and-Replicate-Join" width="500" style="background-color: white;"/>

### Parallel Equi-Joins (I)

* A special class of joins that are more suited for parallelization are natural- and equi-joins.

* Idea: Partition relations R and S using the same partition scheme over the join key.
    * All tuples of R and S with the same join key end up at the same node.
    * No further broadcast is needed, all joins can be performed locally.

* Actual implementation depends on how the relations are partitioned:
    * Co-Located Join
    * Directed Join
    * Re-Partitioning Join
 
### Parallel Equi-Joins: Three cases

1. Both R and S are already partitioned over the join key with the same partitioning scheme
    - "Co-Located Join"
    - No re-partitioning is needed!
    - Cost:
        - ???: local join cost
2. Only one relation is partitioned over the join key:
    - "Directed Join"
    - Re-partition the other relation with the same partitioning scheme.
    - Cost (assuming R is already partitioned):
        - B(S): partitioning
        - B(S): transfer
        - ???: local join cost
3. No relation is partitioned over the join key:
    - "Repartition Join"
    - Re-partition both relations over the join key
    - Cost:
        - B(S) + B(R): partitioning
        - B(S) + B(R): transfer
        - ???: local join cost
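
The third case, the repartition join, can be sketched as follows. Both relations are hash-partitioned on the join key with the same function, after which every (simulated) node runs a plain local hash join. The helper names, the `crc32`-based hash, and the sample relations are assumptions for illustration.

```python
from collections import defaultdict
from zlib import crc32


def node_for(key, p):
    """Same partitioning function for both relations (deterministic hash)."""
    return crc32(repr(key).encode("utf-8")) % p


def repartition(relation, p, key):
    nodes = [[] for _ in range(p)]
    for t in relation:
        nodes[node_for(key(t), p)].append(t)
    return nodes


def local_hash_join(r_part, s_part, r_key, s_key):
    """Build a hash table on the R partition, probe it with the S partition."""
    table = defaultdict(list)
    for r in r_part:
        table[r_key(r)].append(r)
    return [(r, s) for s in s_part for r in table.get(s_key(s), [])]


def repartition_join(R, S, p, r_key, s_key):
    r_nodes = repartition(R, p, r_key)
    s_nodes = repartition(S, p, s_key)
    result = []
    for r_part, s_part in zip(r_nodes, s_nodes):  # each pair is one node's work
        result.extend(local_hash_join(r_part, s_part, r_key, s_key))
    return result


customers = [(1, "Ann"), (2, "Bob"), (3, "Cy")]      # (customer_id, name)
orders = [(101, 1), (102, 3), (103, 1), (104, 4)]    # (order_id, customer_id)
joined = repartition_join(customers, orders, 4, lambda c: c[0], lambda o: o[1])
print(sorted(joined))
```

A directed join is the same procedure with `repartition` applied to only one of the two inputs, and a co-located join skips the repartitioning step entirely.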
     
### Limits in Parallel Databases

* Database clusters tend to scale until 64 or 128 nodes
    * Afterwards the speedup curve flattens
    * Communication overhead eats speedup
    * Hard limit example: 1000 nodes for DB2 (2010)

* Shared Disk: Does not scale infinitely; bus and synchronization become overhead
    * For updates: Cache Coherency Problem
    * For reads: I/O Bandwidth Limits

* Shared Nothing: Cannot compensate loss of a node easily
    * In large clusters, failures and outages are common.
    * Loss of a node means loss of data!
    * Unless: Data is replicated. But: Replicated data must be kept consistent! Has a high overhead…

### Where traditional databases are unsuitable

* Analysis over raw (unstructured) data
    * Text processing
    * In general: If relation schema does not fit
* Where cost-effective scalability is required
    * Use commodity hardware
    * Adaptive cluster size (horizontal scaling)
    * Incremental growth: add computers without expensive reorganization that halts the system
* In unreliable (= large) infrastructures
    * Must be able to deal with failures – hardware, software, network
        * Failure is expected rather than the exception
    * Transparent to applications
        * Too expensive to build reliability into each application
     
### Example Use Case: Web Index Creation

* A Search Engine scenario:
    * Have crawled the internet and stored the relevant documents
    * Documents contain words 	(Doc-URL, [list of words])
    * Documents contain links   	(Doc-URL, [Target-URLs])
* Need to build a search index
    * Invert the files 		(word, [list of URLs])
    * Compute a ranking that requires an inverted graph: (Doc-URL, [URLs-pointing-to-it])
* Obvious reasons against relational databases
    * Relational schema is unsuitable/”unnatural”
    * Importing the documents, converting them to the storage format is expensive
* A mismatch between what databases were designed for and what is really needed:
    * Databases come originally from transactional processing. They give hard guarantees about absolute consistencies in the case of concurrent updates.
    * Analytics are added on top of that
    * The documents are never updated, they are read only.
    * Perfect transactional consistency is not always necessary
 
### An ongoing Re-Design…

* Driven by companies like Google, Facebook, Yahoo, Apple, Microsoft
* Use heavily distributed systems
    * Google used 450,000 low-cost commodity servers in 2006, in clusters of 1,000 – 5,000 nodes
* Redesign infrastructure and architectures completely with the key goal to be
    * Highly scalable
    * Tolerant of failures
* Stay generic and schema free in the data model
* Start with: Data Storage
* Next Step: Distributed Analysis

### Storage Requirements

* Extremely large files: Terabytes to Petabytes
* High availability: Data must be kept replicated
* High throughput
    * Read/write operations must not go through other servers
* No single point of failure
    * Any master must be kept redundantly
* Many different distributed file systems exist.
    * Different goals: transparency, updateability, archiving, etc…
* Google Filesystem (GFS)
    * Widely used reference architecture for high-throughput and high-availability DFS

### The Storage Model – Distributed File System

* The file system
    * Distributed across many nodes (DataNodes)
    * Provides a single namespace for the entire cluster
    * Metadata is managed on a dedicated node (NameNode)
    * Write-once-read-many access model
* Files are split into blocks
    * Typically 128 MB block size
    * Each block is replicated on multiple data nodes
* The client
    * can determine the location of blocks
    * can access data directly from the DataNode over the network
* Problem: bandwidth to data
    * Scanning the data from remote storage is expensive (50MB/s remote access vs. 150-200MB/s local access)
    * Moving computation is more efficient than moving data
    * Map/Reduce framework tries to perform computations close to the data
    * Nodes have two purposes: data storage and computation

<img src="pictures/The-Storage-Model.png" alt="The-Storage-Model" width="500" style="background-color: white;"/>

### Retrieving and Analyzing Data

* Data is stored as custom records in files
    * Most generic data model that is possible
    * Key/value model
* Records are read and written with data model specific (de)serializers
* Analysis or transformation tasks must be written directly as a program
    * It is not possible to generate the program from a higher-level statement
    * Unlike a query plan, which is generated automatically from SQL
* Programs must be parallel, highly scalable, fault tolerant
    * Extremely hard to program
    * Need a programming model and framework that takes care of that
    * The map/reduce model has been suggested and successfully adapted on a broad scale

### Scaling Pattern

* Phase 0: Distribute the data (split)
* Phase 1: Compute on subsets of the data (map)
* Phase 2: Combine the partial results (reduce)
    * Data that belongs together is considered jointly

* Example: **Two-Phase Multiway Merge Sort** (TPMMS)
    * Phase 1: Sort parts of the data
    * Phase 2: Merge the sorted sublists
* Example: **Data analysis**
    * Phase 1: Grouping
    * Phase 2: Aggregation
* Example: **Index construction**
    * Phase 1: Index subsets
    * Phase 2: Merge the indexes

<img src="pictures/Skalierungsmuster.png" alt="Skalierungsmuster" width="500" style="background-color: white;"/>

## Map Reduce & Hadoop

### “MapReduce is a programming model and an associated implementation for processing and generating large data sets.”

<img src="pictures/MapReduce-Simplified-data-processing-on-large-clusters.png" alt="MapReduce-Simplified-data-processing-on-large-clusters" width="500" style="background-color: white;"/>

### What is Map/Reduce?

* Programming model
    * Borrows concepts from functional programming
    * Suited for parallel execution
        * Automatic parallelization & distribution of data and computational logic
    * Clean abstraction for programmers
* Functional programming influences
    * Treats computation as the evaluation of mathematical functions
    * No changes of states (no side effects)
    * Output value of a function depends only on its arguments
* Map and Reduce are higher-order functions (2nd order)
    * Take user-defined functions as argument
    * Return a function as result
    * User implements the two functions

### Basic Building Blocks

* Data model
    * Key/value pairs
        * E.g., (int, string) or (string, [string]), …

* MapReduce program
    * Input: list of key/value pairs
    * Output: list of values
* Two challenges
    * Designing the functions
    * Distributed, fault-tolerant, and efficient execution of the program

* Users define two functions
    * Map: (k1, v1) → list(k2, v2)
        * Often only a single pair
    * Reduce: (k2, list(v2)) → list(v3)
        * Usually only a single value
        * Usually the key k2 is also part of the output, which allows MapReduce steps to be chained

### MapReduce workflow

<img src="pictures/MapReduce-workflow.png" alt="MapReduce-workflow" width="500" style="background-color: white;"/>

#### Task: Determine the frequency of each word in the corpus

```
map(filename, line){
	  for each (word in line)
	     emit(word, 1);	 	 
}

reduce(word, numbers){
	  int sum = 0;
	  for each (value in numbers){
	    sum += value;
	  }
	  emit(word, sum);
}

```
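
The pseudocode above can be tried out with a tiny single-process stand-in for the framework. The sketch below is only an illustration of the programming model: `run_mapreduce` is a hypothetical helper that performs map, shuffle (grouping by key), and reduce in memory, without any distribution or fault tolerance.

```python
from collections import defaultdict


def run_mapreduce(records, map_fn, reduce_fn):
    """Single-process stand-in for the map, shuffle, and reduce phases."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):  # map phase
            groups[out_key].append(out_value)          # shuffle: group by key
    results = []
    for out_key in sorted(groups):                     # reduce phase, one call per key
        results.extend(reduce_fn(out_key, groups[out_key]))
    return results


def wc_map(filename, line):
    """Emit (word, 1) for every word in the line."""
    for word in line.split():
        yield word, 1


def wc_reduce(word, numbers):
    """Sum up all counts emitted for one word."""
    yield word, sum(numbers)


corpus = [("a.txt", "to be or not to be"), ("b.txt", "to do")]
counts = dict(run_mapreduce(corpus, wc_map, wc_reduce))
print(counts)
```

The user-defined functions `wc_map` and `wc_reduce` contain no parallelization logic at all; in a real framework only `run_mapreduce` would be replaced by the distributed engine.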
<img src="pictures/Aufgabe-Häufigkeit-bestimmen.png" alt="Aufgabe-Häufigkeit-bestimmen" width="500" style="background-color: white;"/>

### Map Reduce Illustrated (2)

<img src="pictures/MapReduce-Illustrated-2.png" alt="MapReduce-Illustrated-2" width="500" style="background-color: white;"/>

#### Task: Determine the list of common friends for every pair of persons

```
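map(person, friendlist){
	  for each (friend in friendlist)
	      // order the pair so that both persons emit the same key
	      if (friend < person)
	          emit((friend, person), friendlist);
	      else
	          emit((person, friend), friendlist);
}

reduce((person1, person2), friendlists){
	  emit((person1, person2), friendlists[1] ∩ friendlists[2]);
}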

```

* 2016: 1.4 billion Facebook users with 155 friends on average
    * 979,999,999,300,000,000 pairs
 

<img src="pictures/Aufgabe-gemeinsame-Bekannte.png" alt="Aufgabe-gemeinsame-Bekannte" width="500" style="background-color: white;"/>


#### Example

**Friends lists:** <br>
A -> B C D <br>
B -> A C D E <br>
C -> A B D E <br>
D -> A B C E <br>
E -> B C D <br>

**After mapping:**

(A B) -> B C D <br>
(A C) -> B C D <br>
(A D) -> B C D <br>
 <br>
(A B) -> A C D E <br>
(B C) -> A C D E <br>
(B D) -> A C D E <br>
(B E) -> A C D E <br>
 <br>
(A C) -> A B D E <br>
(B C) -> A B D E <br>
(C D) -> A B D E <br>
(C E) -> A B D E <br>
 <br>
(A D) -> A B C E <br>
(B D) -> A B C E <br>
(C D) -> A B C E <br>
(D E) -> A B C E <br>
 <br>
(B E) -> B C D <br>
(C E) -> B C D <br>
(D E) -> B C D <br>

**After shuffling:** <br>
(A B) -> (A C D E) (B C D) <br>
(A C) -> (A B D E) (B C D) <br>
(A D) -> (A B C E) (B C D) <br>
(B C) -> (A B D E) (A C D E) <br>
(B D) -> (A B C E) (A C D E) <br>
(B E) -> (A C D E) (B C D) <br>
(C D) -> (A B C E) (A B D E) <br>
(C E) -> (A B D E) (B C D) <br>
(D E) -> (A B C E) (B C D) <br>

**After reducing:** <br>
(A B) -> (C D) <br>
(A C) -> (B D) <br>
(A D) -> (B C) <br>
(B C) -> (A D E) <br>
(B D) -> (A C E) <br>
(B E) -> (C D) <br>
(C D) -> (A B E) <br>
(C E) -> (B D) <br>
(D E) -> (B C) <br>
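
The walkthrough above can be reproduced with a short script. This is a single-process sketch of the three steps (map, shuffle, reduce) on the same friends lists; it assumes that friendship is symmetric, so that every pair key receives exactly two friend lists.

```python
from collections import defaultdict

friends = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}


def map_friends(person, friendlist):
    """Emit the friend list once per friendship, keyed by the ordered pair."""
    for friend in friendlist:
        pair = (friend, person) if friend < person else (person, friend)
        yield pair, friendlist


def reduce_friends(pair, friendlists):
    """Intersect the two friend lists that arrive for one pair."""
    first, second = friendlists  # exactly two lists if friendship is symmetric
    yield pair, sorted(set(first) & set(second))


# Shuffle: group all mapped values by their pair key
groups = defaultdict(list)
for person, friendlist in friends.items():
    for pair, value in map_friends(person, friendlist):
        groups[pair].append(value)

common = {}
for pair in sorted(groups):
    for key, value in reduce_friends(pair, groups[pair]):
        common[key] = value

for pair, mutual in common.items():
    print(pair, "->", mutual)
```

Pairs that are not friends with each other, such as A and E, never appear as a key, so the job only computes results for existing friendships rather than for all possible pairs.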


### Parallel DBMS vs. Map/Reduce

<img src="pictures/Parallel_DBMS_vs_Map_Reduce.png" alt="Parallel_DBMS_vs_Map_Reduce" width="500" style="background-color: white;"/>


### Relational Operators as Map/Reduce jobs

* **SQL Query** 

```
SELECT year, SUM(price)
FROM   sales
WHERE  area_code = 'US'
GROUP BY year
```
* **Map/Reduce job:**

```
map(key, tuple) {
	  // WHERE becomes a filter in map, the GROUP BY column becomes the key
	  int year = YEAR(tuple.date);
	  if (tuple.area_code == "US")
	    emit(year, {'price' => tuple.price});
}

reduce(key, tuples) {
	  // one call per year: compute SUM(price)
	  double sum_price = 0;
	  foreach (tuple in tuples) {
	    sum_price += tuple.price;
	  }
	  emit(key, sum_price);
}
```

* **Sorting with SQL Query:**

```
SELECT * 
FROM sales 
ORDER BY year
```

* **Map/Reduce job:**

```
map(key, tuple) {
	  // range-partition by decade
	  emit(YEAR(tuple.date) div 10, tuple);
}

reduce(key, tuples) {
	  // sort within each range
	  emit(key, sort(tuples));
}
```
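
As a cross-check, the aggregation query can be simulated in a single process. The sketch below mirrors the first map/reduce job: the `sales` rows, their dictionary layout, and the function names are made up for illustration, and the in-memory grouping stands in for the shuffle phase.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows of the sales relation
sales = [
    {"date": date(2020, 3, 1), "area_code": "US", "price": 10.0},
    {"date": date(2020, 7, 9), "area_code": "DE", "price": 99.0},
    {"date": date(2020, 11, 2), "area_code": "US", "price": 5.5},
    {"date": date(2021, 1, 15), "area_code": "US", "price": 7.0},
]


def map_sale(key, row):
    """WHERE clause as a filter, GROUP BY column as the output key."""
    if row["area_code"] == "US":
        yield row["date"].year, row["price"]


def reduce_year(year, prices):
    """SUM(price) for one group."""
    yield year, sum(prices)


# Shuffle: group the mapped prices by year
groups = defaultdict(list)
for i, row in enumerate(sales):
    for year, price in map_sale(i, row):
        groups[year].append(price)

totals = dict(kv for year in sorted(groups) for kv in reduce_year(year, groups[year]))
print(totals)
```

The mapping from SQL clauses to the two functions is mechanical here, which is what higher-level languages on top of map/reduce exploit when they compile queries into jobs.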

### Hadoop – A map/reduce Framework 

* Hadoop: Apache Top Level Project
    * Open source
    * Written in Java

* Hadoop provides a stack of
    * Distributed file system (HDFS) – modeled after the Google File System
    * Map/Reduce engine
    * Data processing languages (Pig Latin, Hive SQL)
    * Plus very many packages

* Runs on
    * Linux, Mac OS/X, Windows, Solaris
    * Commodity hardware

<img src="pictures/hadoop.png" alt="hadoop" width="500" style="background-color: white;"/>

### Hadoop Distributed File System (HDFS)

* Master-Slave Architecture
    * Based on GFS architecture

* HDFS Master “NameNode”
    * Manages all file system metadata
    * Transactions are logged and merged at startup
    * Controls read/write access to files
    * Manages block replication
    * Can be replicated to avoid single-point-of-failure

* HDFS Slave “DataNode”
    * Communicates with the NameNode periodically via heartbeats
    * Serves read/write requests from clients
    * Performs replication tasks upon instruction by NameNode
        * Default replication factor: 3

<img src="pictures/HDFS.png" alt="HDFS" width="500" style="background-color: white;"/>

### Hadoop Map/Reduce Engine

Jobs are executed like a Unix pipeline: <br>
* cat * | grep | sort | uniq -c | cat    > output <br>
* Input | Map  | Sort & Shuffle | Reduce | Output

Workflow
1. Input phase: generates a number of FileSplits from input files (one per Map task)
2. Map phase: executes a user function to transform input kv-pairs into a new set of kv-pairs
3. Sort & shuffle phase: sort and distribute the kv-pairs to output nodes
4. Reduce phase: combines all kv-pairs with the same key into new kv-pairs
5. Output phase: writes the resulting pairs to files

All phases are distributed with many tasks doing the work
* Framework handles scheduling of tasks on cluster
* Framework handles recovery when a node fails


* Master / Slave architecture

* Map/Reduce Master: JobTracker
    * Accepts jobs submitted by clients
    * Assigns map and reduce tasks to TaskTrackers
    * Monitors execution status, re-executes tasks upon failure

* Map/Reduce Slave: TaskTracker
    * Runs map / reduce tasks upon instruction from the JobTracker
    * Manages storage, sorting and transmission of intermediate output

### Hadoop Map/Reduce Engine

<img src="pictures/Hadoop-Map_Reduce-engine.png" alt="Hadoop-Map_Reduce-engine" width="500" style="background-color: white;"/>

### Fault Tolerance

* Lots of data … in long-running processes … on many machines


* Distributed file system (DFS / HDFS)
    * Stores data in a distributed and fault-tolerant way through replication
    * Input is available redundantly
* Intermediate results are stored in the DFS
    * Expensive, but recovery from failures within a running process is simple and fast
* Crashes
    * Are detected when the periodic signal (heartbeat) is missing
    * Restart of the mapper or reducer
        * On another machine, using a replica of the original input
     
### When to use Hadoop?

* Good fit for batch processing applications that need to touch all your data:
    * Data mining
    * Model tuning
    * Text processing

* Bad fit for applications that need to find/edit one particular record
    * High overhead
    * High latency

* Bad fit for applications that need to communicate between processes
    * Hadoop is oriented around independent units of work

### In Practice: Complex (Optimized) MapReduce Workflows
* New (relational) operators
    * Join, Cross, Union, …
* Plan optimization & re-optimization
* Scheduling & load balancing
* Cross-platform execution

<img src="pictures/Komplexe-optimierte-MapReduce-workflows.png" alt="Komplexe-optimierte-MapReduce-workflows" width="500" style="background-color: white;"/>

### Hadoop vs. Parallel DBMS

* 2012
  
<img src="pictures/Hadoop_vs_Parallel_DBMS.png" alt="Hadoop_vs_Parallel_DBMS" width="500" style="background-color: white;"/>
  
* 2014
  
<img src="pictures/Hadoop_vs_Parallel_DBMS_2.png" alt="Hadoop_vs_Parallel_DBMS_2" width="500" style="background-color: white;"/>


<img src="pictures/Hadoop-vs-Parallel-DBMS.png" alt="Hadoop-vs-Parallel-DBMS" width="500" style="background-color: white;"/>

### In Practice: Many Libraries

* Starting point: Hadoop
    * Java, open source
    * Base libraries: Common, MapReduce
    * Distributed file system: HDFS
    * Scheduling, monitoring: YARN

* Extensions
    * Service and cluster management: ZooKeeper
    * Data store: HBase
    * Databases and query languages: Pig, Hive, Phoenix
    * Libraries for complex algorithms: Mahout, Giraph, Solr
    * Data stream processing: Kafka, Flink, Spark
    * …


<img src="pictures/Viele-Bibliotheken.png" alt="Viele-Bibliotheken" width="500" style="background-color: white;"/>

## Outlook: What about updates/transactions?

* OLTP style applications that are beyond relational databases' capabilities exist as well.
* Some applications still require fast and efficient lookup and retrieval of small amounts of data
    * Web index access, mail accounts, warehouse updates for resellers 
    * Addressed by Key/Value pair based storage systems (e.g. Google BigTable and Megastore)
    * Can access the data only through a key
    * Can apply only an additional filter on columns and timestamps
* Some applications still need updates and certain guarantees about them
    * No hard transactions, especially no multi record transactions.
    * Eventual consistency model
* See next set of slides