# Data-Intensive Applications
<p><b>Databases</b> - Store data so that other applications can find it again later</p>
<p><b>Cache</b> - Remember the results of an expensive operation to speed up reads</p>
<p><b>Index</b> - Allow users to search data by keyword or filter</p>
<p><b>Stream Processing</b> - Send a message to another process, to be handled asynchronously</p>
<p><b>Batch Processing</b> - Periodically crunch a large amount of accumulated data</p>

## Tools
<p><b>Redis</b> - Datastore also used as a message queue</p>
<p><b>Kafka</b> - Message queue with database-like durability guarantees</p>
<p><b>Memcached</b> - Distributed, in-memory key-value store. A mature, scalable, open-source solution for delivering sub-millisecond response times, which makes it useful as a cache or session store</p>
<p><b>Hadoop</b> - In a batch processing system such as Hadoop, we usually care about <i>throughput</i>: the number of records we can process per second, or the total time it takes to run a job on a dataset of a certain size. Hadoop is a framework that manages big data storage in a distributed way and processes it in parallel.</p>
<p>Hadoop Distributed File System (HDFS) is capable of storing large and varied datasets on commodity hardware.</p>
<p>Hadoop MapReduce is a programming model in which huge datasets are processed in a parallel and distributed fashion on the worker nodes. The map phase extracts key-value pairs from the input data, and the reduce phase combines all the values that share the same key.</p>
<p>Hadoop YARN (Yet Another Resource Negotiator) acts like an operating system for Hadoop: it manages cluster resources and schedules jobs.</p>
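To make the MapReduce description concrete, the classic word-count job can be sketched in a single process with plain Java streams. This is not the Hadoop API (no `Mapper`/`Reducer` classes, HDFS or YARN are involved), and the class and method names are hypothetical; it only mirrors the map phase (emit `(word, 1)` pairs), the shuffle (group pairs by key) and the reduce phase (sum the values for each key).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Single-process illustration of the MapReduce phases; not the Hadoop MapReduce API.
class WordCountSketch {

    // Map phase: turn one input line into (word, 1) key-value pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        return Arrays.stream(line.toLowerCase().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .map(word -> Map.entry(word, 1))
                .collect(Collectors.toList());
    }

    // Shuffle + reduce phase: group the pairs by key and sum the values of each group.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey, TreeMap::new, Collectors.summingInt(Map.Entry::getValue)));
    }

    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = lines.stream()
                .flatMap(line -> map(line).stream())
                .collect(Collectors.toList());
        return reduce(pairs);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("the cat", "The dog", "a cat")));
        // prints {a=1, cat=2, dog=1, the=2}
    }
}
```

In real Hadoop the map calls run on many machines close to the HDFS blocks they read, and the shuffle moves pairs over the network so that all values for one key reach the same reducer.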
<p><b>HBase</b> - </p>
<p><b>Hive</b> - </p>



<img src="Figure D.png"> <img src="Figure E.png"> <img src="Figure F.png"> <img src="Figure G.png"> <img src="Figure H.png"> <img src="Figure I.png"> <img src="Figure J.png"> <img src="Figure K.png">

## Reliability - Tolerating hardware faults, software faults and human error
- The system should continue to work *correctly* (performing the correct function at the desired level of performance) even in the face of *adversity* (hardware or software faults, and even human error).
- It is usually best to design fault-tolerance mechanisms that prevent faults from causing failures. Many critical bugs are actually due to poor error handling. Deliberately inducing faults ensures that the fault-tolerance machinery is continually exercised and tested, which increases confidence that faults will be handled correctly when they occur naturally.
- Provide guarantees: for example, in a message queue, the number of incoming messages should match the number of outgoing messages. The system can constantly check itself and raise an alert if a discrepancy is found.
- Minimize opportunities for error: well-designed abstractions and APIs make it easy and intuitive to "do the right thing".
- Provide tools to recompute data in case it turns out that the old computation was incorrect.
- *Telemetry* (monitoring) can show early warning signs and allows us to check whether any assumptions or constraints are being violated.
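The message-queue check from the list above can be sketched as an audit that compares what went into a queue with what came out. `MessageAudit` and its methods are hypothetical (not a feature of any particular broker): producers and consumers report message IDs independently, and a periodic check flags IDs that were produced but never consumed.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical self-check: producers and consumers report message IDs independently,
// and the audit compares both sides to detect lost messages.
class MessageAudit {
    private final Set<String> produced = new LinkedHashSet<>();
    private final Set<String> consumed = new LinkedHashSet<>();

    synchronized void recordProduced(String messageId) {
        produced.add(messageId);
    }

    synchronized void recordConsumed(String messageId) {
        consumed.add(messageId);
    }

    // IDs that went in but never came out: candidates for an alert.
    synchronized Set<String> missing() {
        Set<String> result = new LinkedHashSet<>(produced);
        result.removeAll(consumed);
        return result;
    }

    // Incoming count matches outgoing count and no produced ID is unaccounted for.
    synchronized boolean balanced() {
        return produced.size() == consumed.size() && missing().isEmpty();
    }

    public static void main(String[] args) {
        MessageAudit audit = new MessageAudit();
        audit.recordProduced("m1");
        audit.recordProduced("m2");
        audit.recordConsumed("m1");
        System.out.println(audit.balanced() + " " + audit.missing()); // prints false [m2]
    }
}
```

In a live system some messages are legitimately still in flight, so a real check would only alert on IDs that have been missing for longer than some timeout, and would export the counts as telemetry rather than print them.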


## Scalability - Measure load and performance - latency percentiles, throughput
- As the system *grows* (in data volume, traffic volume, or complexity), there should be reasonable ways of dealing with that growth.
- "If the system grows in a particular way, what are our options for coping with the growth?"
- "How can we add computing resources to handle the additional load?"
- Load is described with *load parameters*, for example: requests per second to a web server, the ratio of reads to writes in a database, the number of simultaneously active users in a game, or the hit rate on a cache.

### Twitter Fan-out
A hybrid of two approaches was required:
- Figure 1-2 approach, used for users with millions of followers (celebrities): their tweets are stored once in a global collection, then fetched separately and merged into a follower's home timeline when it is read.
- Figure 1-3 approach, used for regular users: maintain a cache for each user's home timeline, like a mailbox of tweets for each recipient user. When a user posts a tweet, look up all the users who follow that user, and insert the new tweet into each of their home timeline caches. **The request to read the home timeline is then cheap, because its result has been computed ahead of time.**

<img src="Figure 1-2.png"/>

<img src="Figure 1-3.png"/>
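A minimal in-memory sketch of the hybrid: celebrity tweets are merged at read time (Figure 1-2 style), while regular users' tweets are fanned out at write time into each follower's timeline cache (Figure 1-3 style). The class, the `CELEBRITY_THRESHOLD` value and the data structures are hypothetical illustrations, not Twitter's actual implementation (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of hybrid home-timeline fan-out (names and threshold are illustrative).
class HybridTimeline {
    record Tweet(String author, String text, long timestamp) {}

    static final int CELEBRITY_THRESHOLD = 2; // tiny on purpose; a real threshold would be huge

    private final Map<String, Set<String>> followers = new HashMap<>();       // user -> followers
    private final Map<String, Set<String>> following = new HashMap<>();       // user -> followees
    private final Map<String, List<Tweet>> timelineCache = new HashMap<>();   // fan-out on write
    private final Map<String, List<Tweet>> celebrityTweets = new HashMap<>(); // merged on read

    void follow(String follower, String followee) {
        followers.computeIfAbsent(followee, k -> new HashSet<>()).add(follower);
        following.computeIfAbsent(follower, k -> new HashSet<>()).add(followee);
    }

    boolean isCelebrity(String user) {
        return followers.getOrDefault(user, Set.of()).size() >= CELEBRITY_THRESHOLD;
    }

    void post(Tweet tweet) {
        if (isCelebrity(tweet.author())) {
            // Store once; followers pick it up at read time instead of a huge number of cache writes.
            celebrityTweets.computeIfAbsent(tweet.author(), k -> new ArrayList<>()).add(tweet);
        } else {
            // Insert the tweet into every follower's home timeline cache.
            for (String follower : followers.getOrDefault(tweet.author(), Set.of())) {
                timelineCache.computeIfAbsent(follower, k -> new ArrayList<>()).add(tweet);
            }
        }
    }

    // Cheap precomputed part plus a small read-time merge of followed celebrities' tweets.
    List<Tweet> homeTimeline(String user) {
        List<Tweet> result = new ArrayList<>(timelineCache.getOrDefault(user, List.of()));
        for (String followee : following.getOrDefault(user, Set.of())) {
            if (isCelebrity(followee)) {
                result.addAll(celebrityTweets.getOrDefault(followee, List.of()));
            }
        }
        result.sort(Comparator.comparingLong(Tweet::timestamp).reversed()); // newest first
        return result;
    }
}
```

The design trade-off is visible in `post`: a regular user's tweet costs one cache write per follower, which is only affordable while the follower count is small.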

## Latency and Response time
The *response time* is what the client sees: besides the actual time to process the request (the service time), it includes network delays and queueing delays.
*Latency* is the duration that a request is waiting to be handled, during which it is *latent*, awaiting service.

Take a list of response times and sort it from fastest to slowest; the median is the halfway point. If the median is 200 ms, then half of the responses are returned in less than 200 ms and the other half take longer. If the 95th percentile response time is 1.5 s, then 95 out of 100 requests take less than 1.5 seconds and 5 out of 100 take 1.5 seconds or more. High percentiles of response times, known as *tail latencies*, are important because they directly affect users' experience: the customers with the slowest requests are often those with the most data on their accounts, i.e. the most valuable customers. Tracking the median and the higher percentiles over time also helps detect problems: response times far above the usual percentiles, or a shift in the percentiles themselves, can indicate a fault.

<img src="Figure 1-4.png"/>
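The percentile definitions above can be sketched with the nearest-rank method: sort the response times and pick the value at rank ⌈p/100 × n⌉. This is only one of several common definitions (monitoring tools often interpolate, or use approximate streaming algorithms instead of sorting), and the sample data is made up for illustration.

```java
import java.util.Arrays;

// Nearest-rank percentile of a list of response times (one common definition among several).
class Percentiles {
    static long percentile(long[] responseTimesMs, double p) {
        if (responseTimesMs.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need data and 0 < p <= 100");
        }
        long[] sorted = responseTimesMs.clone();
        Arrays.sort(sorted); // fastest to slowest
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        long[] times = new long[100];
        for (int i = 0; i < 100; i++) {
            times[i] = (i + 1) * 10; // made-up data: 10 ms, 20 ms, ..., 1000 ms
        }
        System.out.println("p50 = " + percentile(times, 50) + " ms"); // p50 = 500 ms
        System.out.println("p95 = " + percentile(times, 95) + " ms"); // p95 = 950 ms
        System.out.println("p99 = " + percentile(times, 99) + " ms"); // p99 = 990 ms
    }
}
```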

Queueing delays often account for a large part of the response time at high percentiles: it only takes a small number of slow requests to hold up the processing of subsequent requests (*head-of-line blocking*). Splitting messages by processing time into different message queues on different machines reduces this phenomenon. Further reading: *Percentiles in Practice, p. 16*.

- Scaling up/vertically: moving to a more powerful machine
- Scaling out/horizontally: distributing the load across multiple machines
- Elastic: the system automatically adds computing resources when it detects a load increase

## Maintainability - Operability, simplicity and evolvability
Over time, many different people will work on the system (engineering and operations, both maintaining current behaviour and adapting the system to new use cases), and they should all be able to work on it *productively*.

### Operability
Make it easy for operations teams to keep the system running smoothly, for example by not depending on legacy tools.

### Simplicity
Make it easy for new engineers to understand the system by removing as much complexity as possible. In programming, one of the best tools we have for removing accidental complexity is *abstraction*. Abstraction hides implementation details behind a clean, simple-to-use facade.

### Evolvability
Make it easy for engineers to make changes to the system in the future, adapting it for unanticipated use cases as requirements change (*plasticity*).

# Data Models and Query Languages

In NoSQL document databases, joins are generally discouraged and often poorly supported. If we do need them, some document databases, such as RethinkDB, support joins.

### Relational vs Document Databases
The main arguments in favour of the document data model are schema flexibility, better performance due to locality, and, for some applications, being closer to the data structures used by the application.
The relational model counters by providing better support for joins, many-to-one and many-to-many relationships.

### Which data model?
If your data has a document-like structure, i.e. a tree of one-to-many relationships where typically the entire tree is loaded at once, then it's probably a good idea to use the document model. In the resume example, splitting the document into multiple tables (positions, education, contact info) can lead to a cumbersome schema and unnecessary joins. Limitations include accessing nested data...

For highly interconnected data, the document model is awkward, the relational model is acceptable, and graph models are the most natural.

Data conformity: schema-on-write in a relational database ensures that written data matches the table definition.

# Stream Processing
**Stream** refers to data that is incrementally made available over time. In reality, a lot of data is unbounded because it arrives gradually over time: users produced data yesterday and today, and they will continue to produce more data tomorrow. So the dataset is never "complete" in any meaningful way.
<p>The idea behind <i>stream processing</i> is to process a second's worth of data at the end of every second, or even continuously.</p>

### Transmitting Event Streams
Streaming:
- parse the input into a sequence of records
- in streaming, a record is commonly known as an *event*: a small, self-contained, immutable object containing the details of something that happened at some point in time.
- in a streaming system, related events are usually grouped together into a *topic* or *stream*.
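An event as described above maps naturally onto an immutable Java record, and grouping by topic is a one-liner. The `Event` fields, topic names and timestamps are hypothetical examples (Java 16+ for records); a real system would also serialize events and store each topic durably.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class EventGrouping {
    // Hypothetical event: small, self-contained and immutable (record components are final).
    record Event(String topic, Instant occurredAt, String details) {}

    // Group related events by topic; within a topic, events keep their original order.
    static Map<String, List<Event>> byTopic(List<Event> events) {
        return events.stream().collect(Collectors.groupingBy(Event::topic));
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z"); // made-up timestamps
        List<Event> events = List.of(
                new Event("page-views", t0, "user 1 viewed /home"),
                new Event("purchases", t0.plusSeconds(1), "user 1 bought item 7"),
                new Event("page-views", t0.plusSeconds(2), "user 2 viewed /cart"));
        byTopic(events).forEach((topic, list) -> System.out.println(topic + ": " + list.size()));
    }
}
```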

### Messaging System
A common approach for notifying consumers about new events is to use a *messaging system:* a producer sends a message containing the event, which is then pushed to consumers. 
Unlike TCP, which connects exactly one sender to one recipient, a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
- *What happens if the producers send messages faster than the consumers can process them?* Broadly, there are three options:
  - Drop messages.
  - Buffer messages in a queue. If messages are buffered in a queue, it is important to understand what happens as that queue grows. Does the system crash if the queue no longer fits in memory, or does it write messages to disk? If so, how does the disk access affect the performance of the messaging system?
  - Apply *backpressure* (also known as flow control), i.e. block the producer from sending more messages. For example, Unix pipes and TCP use backpressure: they have a small fixed-size buffer, and if it fills up, the sender is blocked until the recipient takes data out of the buffer.
- *What happens if nodes crash or temporarily go offline: are any messages lost?* As with databases, durability may require some combination of writing to disk and/or replication, which has a cost.
- A nice property of batch processing systems is that they provide a strong reliability guarantee: failed tasks are automatically retried, and partial output from failed tasks is automatically discarded. This means the output is the same as if no failures had occurred.
- On a different note, if a consumer registers with a producer (e.g. by providing a URL), the producer can make a direct HTTP or RPC request to push messages to the consumer. This is the idea behind webhooks: a callback URL of one service is registered with another service, which makes a request to that URL whenever an event occurs.
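Backpressure with a small fixed-size buffer can be sketched with the JDK's `ArrayBlockingQueue`: `put()` blocks the producer while the buffer is full, the same idea as a Unix pipe or TCP flow control, here between two threads in one process. The `BackpressureDemo` class, the capacity and the simulated processing delay are arbitrary assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A fast producer and a slow consumer connected by a bounded buffer.
class BackpressureDemo {

    static List<Integer> run(int messageCount, int capacity) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    Thread.sleep(5);             // simulate slow processing
                    consumed.add(buffer.take()); // frees a slot, unblocking the producer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < messageCount; i++) {
            buffer.put(i); // blocks while `capacity` messages are waiting: this is the backpressure
        }
        consumer.join(); // join() also makes the consumer's writes to `consumed` visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

Replacing `put()` with `offer()`, which returns `false` instead of blocking when the queue is full, would turn this into the "drop messages" option, while an effectively unbounded `LinkedBlockingQueue` would correspond to pure buffering.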

### Message Brokers
<p>Also known as a message queue, a message broker is a kind of database optimized for handling message streams. Producers write messages to the broker, while consumers receive them by reading from the broker. Some brokers only keep messages in memory, deleting a message once it has been successfully delivered to its consumers, while others write messages to disk. Faced with slow consumers, they generally allow unbounded queueing, as opposed to dropping messages or applying backpressure.</p>

<p>A consequence of queueing is also that consumers are generally <i>asynchronous</i>: when a producer sends a message, it normally only waits for the broker to confirm that it has received the message, not for the message to be processed by consumers.</p>

<p>Since they quickly delete messages once delivered, most brokers assume that their working set is fairly small, i.e. the queues are short. Brokers notify consumers when data changes (when a new message becomes available).</p>


### Consumers reading messages in the same topic
<p><b>Load Balancing</b> Each message is delivered to <i>one</i> of the consumers, so the consumers share the work of processing the messages in the topic. The broker may assign messages to consumers arbitrarily, e.g. by having multiple clients consume from the same queue. This pattern is useful when messages are expensive to process and you want to add consumers to parallelize the processing.</p>

<p><b>Fan-out</b> Each message is delivered to <i>all</i> consumers. Several independent consumers receive the same messages.</p>

<p><b>Combination</b> Two separate groups of consumers may each subscribe to a topic, such that each group collectively receives all messages, but within each group, <b>only one of the nodes receives each message.</b></p>

<img src="Figure 11-1.png">
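The three delivery patterns can be contrasted with a small hypothetical in-memory dispatcher. The text only says a broker may assign messages arbitrarily; the round-robin assignment used here is an assumption chosen because it is easy to follow, and `DeliveryPatterns` is not any real broker's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical dispatcher contrasting load balancing, fan-out and their combination.
class DeliveryPatterns {

    // Load balancing: each message goes to exactly one consumer (round-robin here).
    static Map<String, List<String>> loadBalance(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        consumers.forEach(c -> delivered.put(c, new ArrayList<>()));
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            delivered.get(consumer).add(messages.get(i));
        }
        return delivered;
    }

    // Fan-out: every consumer receives every message.
    static Map<String, List<String>> fanOut(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        consumers.forEach(c -> delivered.put(c, new ArrayList<>(messages)));
        return delivered;
    }

    // Combination: fan out to every group, load-balance within each group.
    static Map<String, Map<String, List<String>>> combine(
            List<String> messages, Map<String, List<String>> groups) {
        Map<String, Map<String, List<String>>> result = new LinkedHashMap<>();
        groups.forEach((group, members) -> result.put(group, loadBalance(messages, members)));
        return result;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println(loadBalance(messages, List.of("c1", "c2"))); // prints {c1=[m1, m3], c2=[m2]}
        System.out.println(fanOut(messages, List.of("c1", "c2")));      // prints {c1=[m1, m2, m3], c2=[m1, m2, m3]}
    }
}
```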

### Acknowledgments and redelivery
Consumers may crash at any time: a broker may deliver a message to a consumer, but the consumer never processes it, or only partially processes it before crashing. To ensure that messages are not lost, brokers use *acknowledgments*: a client must explicitly tell the broker when it has finished processing a message so that the broker can *remove* it from the queue.
On the other hand, if a message is processed but the acknowledgment is never sent (e.g. the consumer crashes just before sending it), the broker will redeliver the message, which can lead to duplicate processing.

Combined with redelivery, load balancing means messages may be processed out of order. To avoid this, use a separate queue per consumer (i.e. don't use the load balancing feature).

<img src="Figure 11-2.png">
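The acknowledgment and redelivery behaviour, including the reordering shown in Figure 11-2, can be reproduced with a small hypothetical in-memory broker. The class, its methods, the assumption that message IDs are unique, and the choice to put redelivered messages back at the head of the queue are all illustrative; real brokers differ in the details.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker: a message is removed for good only when a consumer acknowledges it.
class AckingBroker {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<String, String> inFlight = new LinkedHashMap<>(); // message -> consumer
    private final List<String> acknowledged = new ArrayList<>();        // completion order

    void publish(String message) {
        queue.addLast(message);
    }

    // Hands the next message to a consumer; it stays in flight until acknowledged.
    String deliver(String consumer) {
        String message = queue.pollFirst();
        if (message != null) {
            inFlight.put(message, consumer);
        }
        return message;
    }

    // The consumer finished processing, so the broker can forget the message.
    void ack(String message) {
        if (inFlight.remove(message) != null) {
            acknowledged.add(message);
        }
    }

    // The consumer crashed: its unacknowledged messages go back to the head of the queue.
    void consumerCrashed(String consumer) {
        List<String> unacked = new ArrayList<>();
        inFlight.entrySet().removeIf(e -> {
            boolean lost = e.getValue().equals(consumer);
            if (lost) {
                unacked.add(e.getKey());
            }
            return lost;
        });
        for (int i = unacked.size() - 1; i >= 0; i--) {
            queue.addFirst(unacked.get(i)); // preserves their original relative order
        }
    }

    List<String> acknowledgedInOrder() {
        return List.copyOf(acknowledged);
    }

    boolean isDrained() {
        return queue.isEmpty() && inFlight.isEmpty();
    }

    public static void main(String[] args) {
        AckingBroker broker = new AckingBroker();
        broker.publish("m3");
        broker.publish("m4");
        broker.deliver("consumer-2");         // m3 goes to consumer 2
        broker.deliver("consumer-1");         // m4 goes to consumer 1
        broker.ack("m4");                     // consumer 1 finishes m4
        broker.consumerCrashed("consumer-2"); // consumer 2 dies before acknowledging m3
        broker.ack(broker.deliver("consumer-1"));
        System.out.println(broker.acknowledgedInOrder()); // prints [m4, m3]
    }
}
```

If consumer 2 had fully processed m3 before crashing without acknowledging it, m3 would also be processed twice, which is the duplicate case described in the text.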

### Logs For Message storage
- A hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging.
- A log is a simple append-only sequence of records on disk.
- In the same way, a producer sends a message by having the broker append it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended.
- Even though these message brokers write all messages to disk, they are able to achieve throughput of millions of messages per second by partitioning across multiple machines, and fault tolerance by replicating messages.
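A minimal sketch of log-based message storage: the producer appends, and each consumer reads sequentially from its own offset, so reading does not delete anything and consumers can proceed at different rates. `LogPartition` and `LogConsumer` are hypothetical, in-memory classes, not Kafka's API; there is no disk, partitioning or replication here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition, in-memory log. Messages are never deleted on delivery.
class LogPartition {
    private final List<String> records = new ArrayList<>(); // list index == offset

    // Producer side: append to the end of the log and return the new record's offset.
    synchronized long append(String message) {
        records.add(message);
        return records.size() - 1;
    }

    // Consumer side: read up to maxRecords sequentially, starting at fromOffset.
    synchronized List<String> read(long fromOffset, int maxRecords) {
        int start = (int) Math.min(fromOffset, records.size());
        int end = Math.min(start + maxRecords, records.size());
        return new ArrayList<>(records.subList(start, end));
    }
}

class LogConsumer {
    private long nextOffset = 0; // each consumer tracks its own position independently

    List<String> poll(LogPartition log, int maxRecords) {
        List<String> batch = log.read(nextOffset, maxRecords);
        nextOffset += batch.size(); // an empty batch means "caught up": wait for new appends
        return batch;
    }

    long position() {
        return nextOffset;
    }

    public static void main(String[] args) {
        LogPartition log = new LogPartition();
        log.append("a");
        log.append("b");
        log.append("c");
        LogConsumer slow = new LogConsumer();
        System.out.println(slow.poll(log, 2)); // prints [a, b]
        System.out.println(slow.poll(log, 2)); // prints [c]
    }
}
```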

### Dual Writes
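With dual writes, the application code itself writes to each system (e.g. a database and a search index) when data changes, which leads to race conditions when two clients want to update the same item. Unless there is an additional concurrency detection mechanism, nobody will even notice that concurrent writes occurred: one value will simply silently overwrite the other, and the two systems may apply the writes in different orders. Alternatively, one write may succeed while the other fails, which also leaves the systems inconsistent.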
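The race can be replayed deterministically. In this hypothetical sketch two `HashMap`s stand in for a database and a search index, and two clients each write to both systems themselves; one unlucky interleaving is hard-coded.

```java
import java.util.HashMap;
import java.util.Map;

// Deterministic replay of one unlucky interleaving of dual writes (stores are stand-ins).
class DualWriteRace {
    record Outcome(String inDatabase, String inSearchIndex) {}

    static Outcome replay() {
        Map<String, String> database = new HashMap<>();
        Map<String, String> searchIndex = new HashMap<>();

        database.put("X", "A");    // client 1 writes X = A to the database
        database.put("X", "B");    // client 2 writes X = B to the database
        searchIndex.put("X", "B"); // client 2 writes X = B to the search index
        searchIndex.put("X", "A"); // client 1's index write arrives late: X = A

        return new Outcome(database.get("X"), searchIndex.get("X"));
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints Outcome[inDatabase=B, inSearchIndex=A]
    }
}
```

No error is raised anywhere, yet the two systems now permanently disagree about the value of X.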

### Change Data Capture
The process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. CDC is especially interesting if changes are made available as a stream, immediately as they are written.
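A minimal CDC sketch: the database records every write in an ordered change stream, and a derived system (here a map standing in for a search index) catches up by applying the changes in that order. All names are hypothetical; in a real database the change stream is extracted from its replication log (e.g. the MongoDB oplog mentioned below) rather than appended by application code as in this single-threaded sketch (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangeDataCapture {
    record Change(long sequence, String key, String value) {} // value == null means delete

    // Stand-in for a database whose writes are also captured as an ordered change stream.
    static class Database {
        private final Map<String, String> rows = new HashMap<>();
        private final List<Change> changeStream = new ArrayList<>();

        void put(String key, String value) {
            rows.put(key, value);
            changeStream.add(new Change(changeStream.size(), key, value));
        }

        void delete(String key) {
            rows.remove(key);
            changeStream.add(new Change(changeStream.size(), key, null));
        }

        List<Change> changesSince(long sequence) {
            return List.copyOf(changeStream.subList((int) sequence, changeStream.size()));
        }
    }

    // Derived system: applies changes strictly in stream order, so it ends up matching the database.
    static class DerivedIndex {
        final Map<String, String> entries = new HashMap<>();
        long nextSequence = 0;

        void catchUp(Database db) {
            for (Change c : db.changesSince(nextSequence)) {
                if (c.value() == null) {
                    entries.remove(c.key());
                } else {
                    entries.put(c.key(), c.value());
                }
                nextSequence = c.sequence() + 1;
            }
        }
    }

    public static void main(String[] args) {
        Database db = new Database();
        DerivedIndex index = new DerivedIndex();
        db.put("X", "A");
        db.put("X", "B");
        index.catchUp(db);
        System.out.println(index.entries); // prints {X=B}
    }
}
```

Because only the database decides the order of changes and every derived system follows that single order, the interleaving problem of dual writes does not arise.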

### API support for change streams
<p><b>RethinkDB</b> - allows queries to subscribe to notifications when the results of a query change.</p>
<p><b>Firebase and CouchDB</b> - provide data synchronization based on a change feed that is also made available to applications.</p>
<p><b>Meteor</b> - uses the MongoDB oplog to subscribe to data changes and updates the user interface.</p>
<p><b>VoltDB</b> - allows transactions to continuously export data from a database in the form of a stream.</p>
<p><b>Kafka Connect</b> - integrates CDC for a wide range of database systems with Kafka. Once the stream of change events is in Kafka, it can be used to update derived data systems and also feed into stream processing systems.</p>

### Commands and Events - betting
When a request from a user first arrives, it is initially a command: at this point it may still fail, for example because some integrity condition is violated. The application must first validate that it can execute the command. Any validation of a command needs to happen synchronously, before it becomes an event, e.g. by using a serializable transaction that atomically validates the command and publishes the event. If the validation is successful and the command is accepted, it becomes an event, which is durable and immutable.

Ex. If a user tries to register a particular username, or reserve a seat on an airplane, then the application needs to check that the username or seat is not already taken. When that check has succeeded, the application can generate an event.

At this point when the event is generated, it becomes a *fact*. Even if the customer later decides to change or cancel the reservation, the fact remains true that they formerly held a reservation for a particular seat, and the change or cancellation is a separate event that is added later.  
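The username example can be sketched as a command handler: the `RegisterUsername` command is validated synchronously, and only if it succeeds is an immutable `UsernameRegistered` event appended to the log. All names are hypothetical, and the `synchronized` method merely stands in for the serializable transaction that atomically validates and publishes (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical username registration: commands may fail, events are accepted facts.
class UsernameRegistry {
    record RegisterUsername(String userId, String username) {}             // command
    record UsernameRegistered(String userId, String username, long seq) {} // event

    private final Set<String> taken = new HashSet<>();
    private final List<UsernameRegistered> eventLog = new ArrayList<>();

    // Validate and append atomically (stand-in for a serializable transaction).
    synchronized Optional<UsernameRegistered> handle(RegisterUsername command) {
        if (taken.contains(command.username())) {
            return Optional.empty(); // validation failed: the command never becomes an event
        }
        UsernameRegistered event =
                new UsernameRegistered(command.userId(), command.username(), eventLog.size());
        taken.add(command.username());
        eventLog.add(event); // from now on this is an immutable fact
        return Optional.of(event);
    }

    synchronized List<UsernameRegistered> events() {
        return List.copyOf(eventLog);
    }

    public static void main(String[] args) {
        UsernameRegistry registry = new UsernameRegistry();
        System.out.println(registry.handle(new RegisterUsername("alice", "ali")).isPresent()); // true
        System.out.println(registry.handle(new RegisterUsername("bob", "ali")).isPresent());   // false
    }
}
```

A later change or cancellation would be modelled as another event appended to the same log, never as an edit of `UsernameRegistered`.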

### Concurrency control
The biggest downside of event sourcing and CDC is that the consumers of the event log are usually asynchronous, so there is a possibility that a user may make a write to the log, then read from a log-derived view and find that their write has not yet been reflected in the read view.

# What is a Log?
*A log is perhaps the simplest possible storage abstraction. It is an append-only, totally-ordered sequence of records ordered by time*

<img src="Figure A.png">

Records are appended to the end of the log, and reads proceed left to right. Each entry is assigned a unique Log Sequence Number.
The **ordering** of records defines a notion of "time", since entries to the left are defined to be older. This decoupling of ordering from any particular physical clock turns out to be essential in distributed systems.
Logs have a specific purpose: they record what happened and when. For **distributed** systems, this is the very heart of the problem.

### Logs in databases
<p>In databases, the log is used to keep the various data structures and indexes in sync in the presence of crashes.</p>
<p>To make this atomic and durable, a database uses a log to <b>write out</b> information about the records it will be modifying, <b>before</b> applying the changes to all the various data structures it maintains. The log is the record of what happened, and each table or index is a projection of this history into some useful data structure or index. Since the log is immediately persisted, it is used as the authoritative source for restoring all other persistent structures in the event of a crash.</p>
<p>Over time, the usage of the log grew from an implementation detail of ACID to a method for replicating data between databases. It turns out that the sequence of changes that happened on the database is exactly what is needed to keep a remote replica database in sync.</p>
<p>Logs solve 2 problems - ordering changes and distributing data</p>
<p>The log-centric approach to distributed systems relies on a simple principle: <i>If two identical, deterministic processes begin in the same state and get the same inputs in the same order, they will produce the same output and end in the same state.</i></p>
<p><b>Deterministic</b> means that the processing isn't timing dependent and doesn't let any other input influence its results</p>

<p>Hence the purpose of the log here is to squeeze all the non-determinism out of the input stream to ensure that each replica processing this input stays in sync.</p>

<p>Now, the timestamps that index the log act as the clock for the state of the replicas: you can describe each replica by a single number, the timestamp of the maximum log entry it has processed. This timestamp combined with the log uniquely captures the entire state of the replica (how far it is in sync with the leader).</p>
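The determinism principle and the "replica as a single number" idea can be sketched together: replicas apply the same log of deterministic operations in the same order, and each replica's progress is just the count of entries it has applied. `Replica` and its `Entry` operation ("add delta to key") are hypothetical (Java 16+ for records).

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical replica: applies a shared log of deterministic operations in log order.
class Replica {
    record Entry(String key, long delta) {} // "add delta to key": no clocks, no randomness

    final TreeMap<String, Long> state = new TreeMap<>();
    long appliedUpTo = 0; // number of log entries applied so far: this replica's "clock"

    // Apply every log entry this replica has not processed yet, strictly in order.
    void catchUp(List<Entry> log) {
        while (appliedUpTo < log.size()) {
            Entry entry = log.get((int) appliedUpTo);
            state.merge(entry.key(), entry.delta(), Long::sum);
            appliedUpTo++;
        }
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(new Entry("a", 5), new Entry("b", 2), new Entry("a", -1));
        Replica fast = new Replica();
        Replica slow = new Replica();
        fast.catchUp(log);
        slow.catchUp(log.subList(0, 1)); // the lagging replica has only seen the first entry
        System.out.println(slow.appliedUpTo + " " + slow.state); // prints 1 {a=5}
        slow.catchUp(log);
        System.out.println(fast.state.equals(slow.state)); // prints true
    }
}
```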

<p>There are many options for what to put in the log:</p>

- incoming requests to a service
- state changes the service undergoes in response to requests
- the transformation commands it executes

<p><b>Physical logging</b> means logging the contents of each row that is changed.</p>
<p><b>Logical logging</b> means logging the SQL commands that lead to the row changes (the insert, update and delete statements).</p>



<p>State-Machine Model (active-active) = incoming requests are kept in a log and each replica processes each request</p>
<p>Primary-backup model = elect one replica as the leader and allow this leader to process requests in the order they arrive and log out the changes to its state from processing the requests (like log shipping).</p>


<img src="Figure B.png">

### Tables and Events
Duality: the log is like the list of all credits and debits a bank processes, while a table holds all the current account balances, capturing the current (latest) state. This process works in reverse too: if you have a table taking updates, you can record these changes and publish a "changelog" of all the updates. This changelog is exactly what you need to support near-real-time replicas. *Hence, tables support data at rest and logs capture change.*

### Data Integration
**Data integration is making all the data an organization has available in all its services and systems.**

The log acts as a buffer that makes data production asynchronous from data consumption. This is important for a lot of reasons, but particularly when there are multiple subscribers that may consume at different rates (e.g. Hadoop versus a real-time system), or when a subscriber fails and needs to catch up once it comes back. Consumers can be added and removed with no change in the pipeline.

Benefits the author mentions from integrating the organisation's data in one place:
- Making the data available in a new processing system (Hadoop) unlocked a lot of possibilities. 
- New computations were possible
- New analysis came from multiple pieces of data that had previously been locked in specialized systems
- Hadoop and Hive data loads became fully automatic, so no manual effort was needed to add new data sources or handle schema changes
- The approach shifted from trying to build a two-way pipeline between each pair of systems to isolating each consumer from the source of the data, by connecting it to a single data repository that gives access to everything
- Kafka solved the issue. Keep in mind we are talking about logs, not data stores

<img src="Figure C.png">

The concept of a data warehouse is a clean, integrated repository of data to support analysis. It involves periodically extracting data from source databases, transforming it to fit into facts and dimensions (star or snowflake schema, columnar format), and loading it into the data warehouse. It is a batch query infrastructure.
Since a DWH has to accommodate various sources, the central data team should provide a common API for anyone to interface with, as a central point of integration.

Any kind of value-added transformation that can be done in real time should be done as post-processing on the raw log feed produced. This would include things like sessionization of event data, or the addition of other derived fields that are of general interest. The original log is still available, but this real-time processing produces a derived log containing augmented data.

*Sessionization is the **act of turning event-based data into sessions, the ordered list of a user's actions in completing a task.** It is widely used in several domains, such as: Web analytics. This is the most common use, where a session is composed of a user's actions during one particular visit to the website.*
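A common way to sessionize is to sort each user's events by time and start a new session whenever the gap since the previous event exceeds an inactivity timeout. The 30-minute timeout, the `Click` fields and the `Sessionizer` class are assumptions for illustration (Java 16+ for records); a streaming implementation would keep per-user state instead of sorting a finished list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batch sessionizer based on an inactivity timeout.
class Sessionizer {
    record Click(String userId, long timestampSeconds, String page) {}

    static final long TIMEOUT_SECONDS = 30 * 60; // assumed 30-minute inactivity timeout

    static Map<String, List<List<Click>>> sessionize(List<Click> events) {
        List<Click> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing(Click::userId).thenComparingLong(Click::timestampSeconds));

        Map<String, List<List<Click>>> sessionsByUser = new LinkedHashMap<>();
        Click previous = null;
        for (Click click : sorted) {
            List<List<Click>> sessions =
                    sessionsByUser.computeIfAbsent(click.userId(), k -> new ArrayList<>());
            boolean startsNewSession = previous == null
                    || !previous.userId().equals(click.userId())
                    || click.timestampSeconds() - previous.timestampSeconds() > TIMEOUT_SECONDS;
            if (startsNewSession) {
                sessions.add(new ArrayList<>());
            }
            sessions.get(sessions.size() - 1).add(click); // ordered list of the user's actions
            previous = click;
        }
        return sessionsByUser;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("u1", 0, "/home"),
                new Click("u1", 600, "/cart"),
                new Click("u1", 2401, "/checkout"), // more than 30 minutes after /cart
                new Click("u2", 100, "/home"));
        sessionize(clicks).forEach((user, sessions) ->
                System.out.println(user + ": " + sessions.size() + " session(s)"));
    }
}
```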

### Log Files and Events
The typical approach to activity data in the web industry is to log it out to text files, where it can be scraped into a data warehouse or into HDFS for aggregation and querying. The **problem** with this is the same as with all **batch ETL**: it couples the data flow to the data warehouse's capabilities and processing schedule.

One solution is to use Kafka as the central, multi-subscriber event log, with event types that each capture the attributes of a particular type of action. Systems are decoupled, pulling only the data they need. At very large scale, a few Kafka tricks are:
- Partitioning the log allows log appends to occur without coordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size. Plus, partitions are replicated: if a leader fails, one of the replicas takes over.
- Optimizing throughput by batching reads and writes: Kafka guarantees that appends to a particular partition from a single sender will be delivered in the order they are sent ("state replication"). Small reads and writes can be grouped together by Kafka into larger, high-throughput operations. Batching occurs when clients send data to servers, in writes to disk, in replication between servers, in data transfer to consumers, and in acknowledging committed data.
- Avoiding needless data copies: zero-copy data transfer
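Key-based partitioning can be sketched as follows: records with the same key always land in the same partition, so their relative order is preserved, while different partitions can accept appends independently. `PartitionedLog` is hypothetical, and `String.hashCode()` is used only to keep it short; Kafka's Java producer hashes the serialized key with murmur2 by default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical key-partitioned log: same key -> same partition -> per-key ordering.
class PartitionedLog {
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionedLog(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // floorMod keeps the result non-negative even when hashCode() is negative.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Appends within one partition need no coordination with other partitions.
    int append(String key, String value) {
        List<String> partition = partitions.get(partitionFor(key));
        partition.add(key + "=" + value);
        return partition.size() - 1; // offset within that partition
    }

    List<String> partition(int index) {
        return List.copyOf(partitions.get(index));
    }

    public static void main(String[] args) {
        PartitionedLog log = new PartitionedLog(3);
        log.append("user-1", "login");
        log.append("user-2", "login");
        log.append("user-1", "logout");
        System.out.println(log.partition(log.partitionFor("user-1")));
    }
}
```

Ordering is only guaranteed per partition, which is why related events (e.g. all events of one user) should share a key.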

Logs provide buffering between processes. If processing proceeds in an unsynchronized fashion, it is likely that an upstream data-producing job will produce data more quickly than a downstream job can consume it. The log then acts as a very large buffer that allows a process to be restarted, or to fail, without slowing down other parts of the processing graph. A single faulty job cannot cause backpressure that halts the entire flow.

### Stateful Real-Time Processing
### Stream-table duality
Kafka Streams (KStream/KTable) and ksqlDB can do the initial stateful processing of data. A KTable can be persisted to preserve state in case of failure.

Stream data comes in -> events are aggregated into a table by key -> logic combines the previous state with the new event -> if the state changed, publish the update to a changelog stream.
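The pipeline above can be sketched as a tiny stateful operator. This is not the Kafka Streams API: `MaxTable` is hypothetical, and the "logic" (keep the maximum value seen per key) is an assumed example aggregation. The point is that a changelog record is emitted only when a key's stored state actually changes (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stateful operator: table keyed by event key, changelog only on real changes.
class MaxTable {
    record ChangelogRecord(String key, Long oldValue, long newValue) {} // oldValue null = new key

    private final Map<String, Long> table = new HashMap<>();
    private final List<ChangelogRecord> changelog = new ArrayList<>();

    void onEvent(String key, long value) {
        Long previous = table.get(key);                                      // previous state
        long updated = previous == null ? value : Math.max(previous, value); // combine with event
        if (previous == null || updated != previous) {                       // publish only on change
            table.put(key, updated);
            changelog.add(new ChangelogRecord(key, previous, updated));
        }
    }

    List<ChangelogRecord> changelog() {
        return List.copyOf(changelog);
    }

    public static void main(String[] args) {
        MaxTable table = new MaxTable();
        table.onEvent("sensor-1", 10);
        table.onEvent("sensor-1", 7);  // no change: nothing is published
        table.onEvent("sensor-1", 12);
        System.out.println(table.changelog().size()); // prints 2
    }
}
```

Replaying the changelog into an empty map rebuilds the table, which is how such state can be restored after a failure.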

### Log Compaction
Log compaction keeps only the latest value for each key, discarding older updates, to save storage.
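Compaction can be sketched in two passes: find the latest offset of each key, then keep only those records in their original log order. `LogCompaction` is hypothetical; treating a `null` value as a deletion marker (tombstone) is an assumption here, and this sketch drops tombstones immediately, whereas real systems may retain them for a while so consumers can see the deletion (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical log compaction: keep only the latest record per key, in log order.
class LogCompaction {
    record KeyValue(String key, String value) {} // value == null marks a deletion (tombstone)

    static List<KeyValue> compact(List<KeyValue> log) {
        // Pass 1: remember the offset of the latest record for every key.
        Map<String, Integer> latestOffset = new HashMap<>();
        for (int offset = 0; offset < log.size(); offset++) {
            latestOffset.put(log.get(offset).key(), offset);
        }
        // Pass 2: keep only those latest records, preserving their original order.
        List<KeyValue> compacted = new ArrayList<>();
        for (int offset = 0; offset < log.size(); offset++) {
            KeyValue entry = log.get(offset);
            boolean isLatest = latestOffset.get(entry.key()) == offset;
            if (isLatest && entry.value() != null) { // tombstones are dropped immediately here
                compacted.add(entry);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<KeyValue> log = List.of(
                new KeyValue("k1", "a"), new KeyValue("k2", "b"), new KeyValue("k1", "c"),
                new KeyValue("k2", null), new KeyValue("k3", "d"));
        System.out.println(compact(log));
        // prints [KeyValue[key=k1, value=c], KeyValue[key=k3, value=d]]
    }
}
```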


# Map, Filter and Collect methods in Java Stream
<img src="Figure L.png">

The **Stream.map(Function mapper)** is a method in Stream class used to **transform one object into another by applying a function.** 
Ex. Convert a *List of String* into a *List of Integer* by applying the *Integer.valueOf()*

The **Stream.filter(Predicate condition) filters elements based upon a condition.**
Ex. Filter a list of numbers, outputting only even numbers.

The **Stream.collect(Collectors.toList()) accumulates the elements of the stream (in the example, the even numbers) into a List and returns it.**
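The three methods above combine naturally into one pipeline: convert a list of `String`s to `Integer`s with `Integer.valueOf`, keep the even numbers, and collect them into a `List`. The class and method names are only illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

class StreamExample {
    static List<Integer> evenNumbers(List<String> numbers) {
        return numbers.stream()
                .map(Integer::valueOf)         // map: String -> Integer
                .filter(n -> n % 2 == 0)       // filter: keep only even numbers
                .collect(Collectors.toList()); // collect: accumulate into a List
    }

    public static void main(String[] args) {
        System.out.println(evenNumbers(List.of("1", "2", "3", "4", "5", "6"))); // prints [2, 4, 6]
    }
}
```

Nothing runs until the terminal operation `collect` is called; `map` and `filter` are lazy intermediate operations.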

# Indexing Very Large Tables
Whenever we create an index, a copy of the indexed column(s) plus the PK is stored on disk, and the index is kept in memory as much as possible. If the index contains all the data required by a query (a *covering index*), the query never needs to read the actual table. E.g. when filtering on *customer_id*, grouping by *year* and selecting only these two columns, the query won't need to read the table from disk if the index is on *(customer_id, year)*. I think the memory needed is roughly the index size.
- More indexes means slower inserts.
- More indexes mean more disk & memory space occupied.
- Modify an existing index instead of adding more indexes. 
- Partitioning splits tables into smaller sub-tables so that queries don't have to perform a full table scan.

#### The key, the whole key, nothing but the key, so help me Codd. The benefits of indexing foreign keys
A primary key is a constraint in SQL that uniquely identifies each row in a table; when it is defined, a unique index is created. The key can be defined as a single non-NULL column, or a combination of non-NULL columns that together generate a unique value, and is used to enforce entity integrity for a table.
A foreign key is a column or combination of columns that references the PK of a different table. FKs enforce referential integrity between two tables. By default an index is not created for a FK; however, it is not uncommon to add one manually.