In [1]:
%%javascript
$.getScript('http://asimjalis.github.io/ipyn-ext/js/ipyn-present.js')

<IPython.core.display.Javascript object>

<h1 class="tocheading">Lambda Architecture, Part 2</h1>
<div id="toc"></div>

<i>N.B. blockquotes (and nearly all images) in this notebook are excerpts from [Big Data](https://www.manning.com/books/big-data) by Nathan Marz.</i>

Serving Layer
===============================

By the end of this lesson, you should be able to:

* Tailor batch views to the queries they serve
* Provide a new answer to the data-normalization versus denormalization debate
* Discuss the advantages of batch-writable, random-read, and no random-write databases
* Contrast a Lambda Architecture solution with a fully incremental solution

Computing on the batch layer
---------------------------------------------------------------------
**The serving layer provides low-latency access to the results of calculations performed on the master dataset.**  
![Lambda Architecture diagram](http://fr.talend.com/sites/default/files/hadoop_summit_2015_takeaway_the_lambda_architecture-picture_1.png)

> While investigating the serving layer, you’ll learn the following:
>
* Indexing strategies to minimize latency, resource usage, and variance
* The requirements for the serving layer in the Lambda Architecture
* How the serving layer solves the long-debated normalization versus denormalization problem

## Performance metrics for the serving layer

* _latency_ - the time required to answer a single query
* _throughput_ - the number of queries that can be served within a given period of time

### Scatter/Gather

> Suppose a query requires fetching data from three servers...
>
![Figure 10.3](images/10fig03_alt.jpg)
**When distributing a task over multiple servers, the overall latency is determined by the slowest server response time.**

> In general, the more servers a query touches, the higher the overall latency of the query.  
>
![Figure 10.4](images/10fig04_alt.jpg)
**If you increase the number of servers involved in a distributed task, you also increase the likelihood that at least one will respond slowly.**

### A different indexing strategy

> Collocate the pageview information for a single URL on the same partition and store it sequentially. Fetching the pageviews will then only require a single seek and scan rather than numerous seeks.  
>
![Figure 10.5](images/10fig05_alt.jpg)
**A sorted index promotes scans and limits disk seeks to improve both latency and throughput.**

## The serving layer solution to the normalization/denormalization problem
### A normalized schema uses multiple independent datasets with little or no redundant data.

| User ID | Name | Location ID |
|:-:| ------ |:-:|
| 1 | Sally  | 3 |
| 2 | George | 1 |
| 3 | Bob    | 3 |

| Location ID | City | State | Population |
|:------:| ---- | -- | ----:|
| 1 | New York  | NY | 8.2M |
| 2 | San Diego | CA | 1.3M |
| 3 | Chicago   | IL | 2.7M |

### Denormalized tables store data redundantly to improve query performance.

| User ID | Name | Location ID | City | State |
|:-----:| ------ |:--:| ---- | -- |
| 1 | Sally  | 3 | Chicago   | IL | 
| 2 | George | 1 | New York  | NY |
| 3 | Bob    | 3 | Chicago   | IL | 

| Location ID | City | State | Population |
|:------:| ---- | -- | ----:|
| 1 | New York  | NY | 8.2M |
| 2 | San Diego | CA | 1.3M |
| 3 | Chicago   | IL | 2.7M |

The master dataset should be normalized to maintain consistency but the batch views may be denormalized for query efficiency. Since the batch views are _views_, this does not violate the principles of normaliziation.

## Requirements for a serving layer database
> 
* *Batch writable*—The batch views for a serving layer are produced from scratch. When a new version of a view becomes available, it must be possible to completely swap out the older version with the updated view.
* *Scalable*—A serving layer database must be capable of handling views of arbitrary size. As with the distributed filesystems and batch computation framework previously discussed, this requires it to be distributed across multiple machines.
* *Random reads*—A serving layer database must support random reads, with indexes providing direct access to small portions of the view. This requirement is necessary to have low latency on queries.
* *Fault-tolerant*—Because a serving layer database is distributed, it must be tolerant of machine failures.

## Designing a serving layer for SuperWebAnalytics.com

In [2]:
import pyspark
import simplejson as json

from pyspark.sql.types import *
from pyspark.sql.functions import approxCountDistinct  # a.k.a. HyperLogLog

try:  # only one SparkContext can exist at a time
    if not sc:
        sc = pyspark.SparkContext()
except NameError:
    sc = pyspark.SparkContext()

sqlContext = pyspark.HiveContext(sc)

In [3]:
# Preprocessed batch view pickled for your convenience:

distinct_normalized_page_views = sc.pickleFile("data/distinct_normalized_page_views")

### Pageviews over time

In [4]:
granularity = {'h': 60 * 60}
granularity['d'] = granularity['h'] * 24 # days in UTC (GMT) timezone
granularity['w'] = granularity['d'] * 7
granularity['m'] = granularity['w'] * 4  # 1 month = 28 days, not 1 calendar month
granularity

{'d': 86400, 'h': 3600, 'm': 2419200, 'w': 604800}

In [5]:
def gen_granular(granual):
    def func(datum): 
        url = datum['dataunit']['page_view']['page']['url']
        true_as_of_secs = datum['pedigree']['true_as_of_secs']
        return (url, granual, true_as_of_secs - true_as_of_secs % granularity[granual])
    return func

In [6]:
page_view_schema = StructType([
    StructField('url', StringType() ,True),
    StructField('granularity', StringType(), True),
    StructField('ts', IntegerType(), True)
    ])

In [7]:
page_view_by_hour = sqlContext.createDataFrame(distinct_normalized_page_views.map(gen_granular('h')), 
                                               page_view_schema)

In [8]:
hourly_page_views = page_view_by_hour.groupBy(['url', 'granularity', 'ts']).count()

In [9]:
hourly_page_views.registerTempTable('hourly_page_views')
sqlContext.cacheTable('hourly_page_views')

# N.B. This won't work if you want your days to be in any timezone other than UTC
daily_page_views = sqlContext.sql("""SELECT url, 'd' AS granularity, ts - ts % {} AS ts, sum(count) AS count 
                                FROM hourly_page_views 
                                GROUP BY url, ts - ts % {}""".format(granularity['d'], granularity['d']))

#### Exercise 1

How would you alter the above code to adjust for the Pacific timezone? (Hint: due to daylight savings time, you will probably want to use a timezone aware library like `pytz`.)

In [None]:
# write code here







In [10]:
page_view_union = hourly_page_views.unionAll(daily_page_views).orderBy('url')
page_view_union.show()

+---------------+-----------+----------+------+
|            url|granularity|        ts| count|
+---------------+-----------+----------+------+
|    mysite.com/|          h|1438912800|300054|
|    mysite.com/|          h|1438624800|101212|
|    mysite.com/|          d|1438905600|300054|
|    mysite.com/|          d|1438560000|101212|
|mysite.com/blog|          h|1438912800|899778|
|mysite.com/blog|          h|1438624800|101732|
|mysite.com/blog|          d|1438560000|101732|
|mysite.com/blog|          d|1438905600|899778|
+---------------+-----------+----------+------+



In [11]:
page_view_union.write.parquet("data/page_views", mode='overwrite')

### Uniques over time
<blockquote><p><b><i>Index design for uniques over time. Although the index keys are a compound of URL and granularity, indexes are partitioned
         between servers solely by the URL.
      </i></b></p>
      
      <p class="center1"><img src="images/10fig08_alt.jpg" alt="" class="calibre2"/></p>
      
      
      <p class="noind"><a id="iddle1159" class="calibre4"></a><a id="iddle1327" class="calibre4"></a><a id="iddle1603" class="calibre4"></a><a id="iddle1609" class="calibre4"></a><a id="iddle1718" class="calibre4"></a>In this case, an index like that represented [above] seems optimal. It’s the same key-to-sorted-map index as was used for pageviews over time, but with two differences:
      </p>
      
      <p class="calibre16"></p>
      <ul class="calibre17">
         
         <li class="calibre18">The key is a compound key of URL and granularity.
            
         </li>
         
         <li class="calibre18">The indexes are partitioned solely by the URL, not by both the URL and granularity. To retrieve a range of values for a URL
            and granularity, you’d use the URL to find the server containing the information you need, and then use both the URL and granularity
            to look up the values you’re interested in. Partitioning by just the URL ensures that all buckets for a URL are collocated
            on the same server and avoids any variance issues from having to interact with many servers for a single query.
            
         </li>
         
      </ul></blockquote>

In [12]:
def user_row(granual):
    import simplejson as json
    granularity = {'h': 60 * 60}
    granularity['d'] = granularity['h'] * 24 # days in UTC (GMT) timezone
    granularity['w'] = granularity['d'] * 7
    granularity['m'] = granularity['w'] * 4  # 1 month = 28 days, not 1 calendar month
    def func(datum):
        url = datum['dataunit']['page_view']['page']['url']
        true_as_of_secs = datum['pedigree']['true_as_of_secs']
        person = json.dumps(datum['dataunit']['page_view']['person'])
        return (url, granual, true_as_of_secs - true_as_of_secs % granularity[granual], person)
    return func

visit_schema = StructType([
    StructField('url', StringType() ,True),
    StructField('granularity', StringType(), True),
    StructField('ts', IntegerType(), True),
    StructField('person', StringType(), True)
    ])

In [13]:
hourly_visits = sqlContext.createDataFrame(distinct_normalized_page_views.map(user_row('h')),
                                           visit_schema)
hourly_visits.head(5)

[Row(url=u'mysite.com/blog', granularity=u'h', ts=1438912800, person=u'{"cookie": "PQRST"}'),
 Row(url=u'mysite.com/blog', granularity=u'h', ts=1438912800, person=u'{"user_id": 5693}'),
 Row(url=u'mysite.com/blog', granularity=u'h', ts=1438912800, person=u'{"user_id": 6257}'),
 Row(url=u'mysite.com/', granularity=u'h', ts=1438912800, person=u'{"user_id": 765}'),
 Row(url=u'mysite.com/blog', granularity=u'h', ts=1438912800, person=u'{"user_id": 7132}')]

In [14]:
hourly_visits.write.parquet('data/hourly_visits/', mode='overwrite')

In [15]:
visits_by_hour = hourly_visits.groupBy(['url', 'granularity', 'ts'])\
                              .agg(approxCountDistinct('person').alias('approx_uniques'))

#### Exercise 2

Create view of visits by day. 

Bonus: Adjust for the Pacific timezone.

In [22]:
# write code here







#### Exercise 3

1. Merge results with those above and write the results to parquet files in `data/visits`.
2. Group by `['url', 'granularity', 'ts']` and run `approxCountDistinct`. The results should look like:  

```
    +---------------+-----------+----------+--------------+
    |            url|granularity|        ts|approx_uniques|
    +---------------+-----------+----------+--------------+
    |    mysite.com/|          d|1438560000|         10874|
    |    mysite.com/|          d|1438905600|         10893|
    |    mysite.com/|          h|1438624800|         10874|
    |    mysite.com/|          h|1438912800|         10893|
    |mysite.com/blog|          h|1438912800|         10893|
    |mysite.com/blog|          h|1438624800|         10776|
    |mysite.com/blog|          d|1438560000|         10776|
    |mysite.com/blog|          d|1438905600|         10893|
    +---------------+-----------+----------+--------------+
```

Bonus: Use this to query from August 3, 3am until August 6, 3am.

Hint: ![Figure 8.3](images/08fig03_alt.jpg)  
You may want to fill in more hours to make sure your code works correctly. These data are just a small sample to get you started and only include 2 hours, so be careful.

In [24]:
# write code here







#### User-identifier normalization

> User IDs are marked as belonging to the same person via equiv edges. If you were to visualize these edges from a dataset, you’d see numerous independent subgraphs 
. . . 
> Each subgraph represents a unique user.
>  
![Figure 9.3](images/09fig03_alt.jpg)  

Such graphs can be obtained using [GraphX](http://spark.apache.org/graphx/) (not yet included in PySpark)

#### Handling equivs on the read-side workflow
> 
> ![Figure 10.11](images/10fig11_alt.jpg)
> 
1. First, retrieve every UserID set for every hour in the range, and merge them.
2. Convert the set of UserIDs to a set of PersonIDs by using the UserID-to-PersonID index.
3. Return the count of the PersonID set.

Speed Layer
===============================

By the end of this lesson, you should be able to:

* Enqueue page-views in Kafka
* Dedupe and normalize using Spark Streaming
* Store Pageviews over time in HBase
* Expire data in HBase as appropriate

> This section covers:
>  
* The theoretical model of the speed layer
* How the batch layer eases the responsibilities of the speed layer
* Using random-write databases for realtime views
* The CAP theorem and its implications
* The challenges of incremental computation
* Expiring data from the speed layer

> **The speed layer allows the Lambda Architecture to serve low-latency queries over up-to-date data.**

Computing realtime views
---------------------------------------------------------------------

### Strategy: realtime view = function(recent data)

![Figure 12.2](images/12fig02_alt.jpg)

### Incremental strategy: realtime view = function(new data, previous realtime view)

![Figure 12.3](images/12fig03_alt.jpg)


Storing realtime views
---------------------------------------------------------------------
> 
* *Random reads*—A realtime view should support fast random reads to answer queries quickly. This means the data it contains must be indexed.
* *Random writes*—To support incremental algorithms, it must also be possible to modify a realtime view with low latency.
* *Scalability*—As with the serving layer views, the realtime views should scale with the amount of data they store and the read/write rates required by the applica- tion. Typically this implies that realtime views can be distributed across many machines.
* *Fault tolerance*—If a disk or a machine crashes, a realtime view should continue to function normally. Fault tolerance is accomplished by replicating data across machines so there are backups should a single machine fail.

### Eventual accuracy

> Because all data is eventually represented in the batch and serving layer views, any approximations you make in the speed layer are continually corrected.

### Amount of state stored in the speed layer
> 
* *Online compaction*—As a read/write database receives updates, parts of the disk index become unused, wasted space. Periodically the database must perform compaction to reclaim space. Compaction is a resource-intensive process and could potentially starve the machine of resources needed to rapidly serve queries. Improper management of compaction can cause a cascading failure of the entire cluster.
* *Concurrency*—A read/write database can potentially receive many reads or writes for the same value at the same time. It therefore needs to coordinate these reads and writes to prevent returning stale or inconsistent values. Sharing mutable state across threads is a notoriously complex problem, and control strategies such as locking are notoriously bug-prone.

Challenges of incremental computation
---------------------------------------------------------------------

### Validity of the CAP theorem

![Figure 12.4](images/12fig04_alt.jpg)

**Replicas can diverge if updates are allowed under partitions.**

### The complex interaction between the CAP theorem and incremental algorithms

**conflict-free replicated data types (*CRDT*s)**

#### A G-Counter is a grow-only counter where a replica only increments its assigned counter. 

![Figure 12.5](images/12fig05_alt.jpg)

_N.B._ G-Counter is implemented as an [`accumulator`](http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka) in Spark

**The overall value of the accumulator is the sum of the replica counts.**

#### Merging G-Counters

![Figure 12.6](images/12fig06_alt.jpg)

Asynchronous versus synchronous updates
---------------------------------------------------------------------

### A simple speed layer architecture using synchronous updates

![Figure 12.7](images/12fig07.jpg)

### Asynchronous updates provide higher throughput and readily handle variable loads.

![Figure 12.8](images/12fig08_alt.jpg)

Expiring realtime views
---------------------------------------------------------------------

***The state of the serving and speed layer views at the end of the first batch computation run:***

![Figure 12.9](images/12fig09.jpg)

***A portion of the realtime views can be expired after the second run completes:***

![Figure 12.10](images/12fig10.jpg)

***The serving and speed layer views immediately before the completion of the third batch computation run:***

![Figure 12.11](images/12fig11_alt.jpg)

***Alternating clearing between two different sets of realtime views guarantees one set always contains the appropriate data for the speed layer:***

![Figure 12.12](images/12fig12_alt.jpg)

Queueing and stream processing
---------------------------------------------------------------------

**To implement asynchronous processing without queues, a client submits an event without monitoring whether its processing is successful.**

![Figure 14.1](images/14fig01.jpg)

Queuing
---------------------------------------------------------------------
---------------------------------------------------------------------

### Single-consumer queue servers

![Figure 14.2](images/14fig02.jpg)

**Multiple applications sharing a single queue consumer**

---------------------------------------------------------------------

### Multi-consumer queues

![Figure 14.3](images/14fig03_alt.jpg)

**With a multi-consumer queue, applications request specific items from the queue and are responsible for tracking the successful processing of each event.**

Stream processing
---------------------------------------------------------------------

![Figure 14.4](images/14fig04_alt.jpg)

---------------------------------------------------------------------

**Comparison of stream-processing paradigms**  
(*i.e.* Storm vs. Spark Streaming)
![Figure 14.5](images/14fig05.jpg)

### Queues and workers

---------------------------------------------------------------------

#### A representative system using a queues-and-workers architecture. 

![Figure 14.6](images/14fig06_alt.jpg)

**The queues in the diagram could potentially be distributed queues as well.**

---------------------------------------------------------------------

#### Computing pageviews over time with a queues-and-workers architecture

![Figure 14.7](images/14fig07_alt.jpg)


Micro-batch stream processing
===============================================================
> This section covers:
>  
* Exactly-once processing semantics
* Micro-batch processing and its trade-offs
* Extending pipe diagrams for micro-batch stream processing

Achieving exactly-once semantics
---------------------------------------------------------------------

### Strongly ordered processing
>  
* The stored ID is the same as the current tuple ID. In this case, you know that the count already reflects the current tuple, so you do nothing.
* The stored ID is different from the current tuple ID. In this case, you know that the count doesn’t reflect the current tuple. So you increment the counter and update the stored ID. This works because tuples are processed in order, and the count and ID are updated atomically.

### Micro-batch stream processing

**Tuple stream divided into batches**

![Figure 16.1](images/16fig01.jpg)

### Micro-batch processing topologies

#### Each batch includes tuples from all partitions of the incoming stream.

![Figure 16.4](images/16fig04.jpg)

**Word-count topology:**

![Figure 16.5](images/16fig05.jpg)

**Storing word counts with batch IDs:**

![Figure 16.6](images/16fig06.jpg)


Core concepts of micro-batch stream processing
---------------------------------------------------------------------
>  
* *Batch-local computation*—There’s computation that occurs solely within the batch, not dependent on any state being kept. This includes things like reparti- tioning the word stream by the word field and computing the count of all the tuples in a batch.
* *Stateful computation*—Then there’s computation that keeps state across all batches, such as updating a global count, updating word counts, or storing a top-three list of most frequently used words. This is where you have to be really careful about how you do state updates so that processing is idempotent under failures and retries. The trick of storing the batch ID with the state is particu- larly useful here to add idempotence to non-idempotent operations.


Realtime views
---------------------------------------------------------------------

Three separate queries one might implement for a website:

>  
* Number of pageviews over a range of hours
* Unique number of visitors over a range of hours
* Bounce rate for a domain  
    
---------------------------------------------------------------------

> Consider the following sequence of events:
>  
1. IP address `11.11.11.111` visits `foo.com/about` at 1:30 pm.
2. User `sally` visits `foo.com/about` at 1:40 pm.
3. An equiv edge between `11.11.11.111` and `sally` is discovered at 2:00 pm.

### Topology structure

1. Consume a stream of pageview events that contains a user identifier, a URL, and a timestamp.
2. Normalize URLs.
3. Update a database containing a nested map from URL to hour to a HyperLogLog (*i.e.* `approxCountDistinct`) set.

## Defining data systems

$$
\text{query} = function(\text{all data})
$$  

* *Latency*—The time it takes to run a query. May be milliseconds to hours.
* *Timeliness*—How up-to-date the query results are. 
* *Accuracy*—In many cases, approximations may be necessary

> ...mutability—and associated concepts like CRUD—are fundamentally not human-fault tolerant...  
> ...solution is to make your core data *immutable*...  


## Batch and serving layers

** Basic birthday-inference algorithm **

![Figure 18.1](images/18fig01_alt.jpg)

### Partial Recomputation
>  
1. For the new batch of data, find all people who have a new age sample.
2. Retrieve all age samples from the master dataset for all people in step 1.
3. Recompute the birthdays for all people in step 1 using the age samples from step 2 and the age samples in the new batch.
4. Merge the newly computed birthdays into the existing serving layer views.

** Bloom join **

![Figure 18.2](images/18fig02.jpg)

### Measuring and optimizing batch layer resource usage
> Consider these examples, which are based on real-world cases:
* After doubling the cluster size, the latency of a batch layer went down from 30 hours to 6 hours, an 80% improvement.
* An improper reconfiguration caused a Hadoop cluster to have 10% more task failure rates than before. This caused a batch workflow’s runtime to go from 8 hours to 72 hours, a 9x degradation in performance.


* $T$—The runtime of the workflow in hours.
* $O$—The overhead of the workflow in hours (_things like setting up processes, copying code, etc._)
* $H$—The amount of data being processed (_it’s assumed that the rate of incoming data is fairly constant._)
* $P$—The dynamic processing time. (_i.e. number of hours each unit of $H$ adds_)

$$T = O + P \times H $$