<a href="https://colab.research.google.com/github/deepanshuMeteor/QA-Big-Data-Fundamentals/blob/main/LIVE_23_EventSystems.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Intro to Event Systems
## Big Data for Data Scientists

---

# Part 0: Responsive Systems

## How do I design a data system to be *responsive*? 

# Part 1: Event & Streaming Basics

## What is an Event?

Any "row" (observation, case, example)... which has the following properties:

* subject
* verb
* object
* context

In [1]:
from time import time
time()

1650549808.5777333

In [2]:
customer_reviews_film = {
    'subject': {
        'id': 101,
        'type': 'CUSTOMER'
    },
    'verb': 'REVIEW',
    'object' : {
        'id': 3001,
        'type': 'FILM'
    },
    'context': {
        'at': time()
    }
}

customer_reviews_film

{'context': {'at': 1650549815.5420847},
 'object': {'id': 3001, 'type': 'FILM'},
 'subject': {'id': 101, 'type': 'CUSTOMER'},
 'verb': 'REVIEW'}

Pretty much any piece of data can be represented as an event. 

### Aside: Events with schemaless contents

In [3]:
client_submits_taxform = {
    'subject': {
        'id': 'UK'
    },
    'verb': 'SUBMIT_TAX',
    'object': {
        'id': 901,
        'body': "SCHEMALESS-SERIALIZED-DATA"
    },
    'context': {
        'at': time()
    }
}

## What is an Event System?

Any system for which the primary data element is an event (as defined above). 

## What is Streaming and Stream Processing?

Streaming is a technical term which means using a constant amount of memory to process a large amount of data.

Here, "streaming" denotes that we process one event at a time... across a whole "stream" of events. 
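As a minimal sketch of the "constant memory" idea, the generator below yields one decoded event at a time, and the loop keeps only running aggregates. The `stream_events` helper and the `fake_log` data are hypothetical, introduced here only for illustration.

```python
import io
from json import dumps, loads

def stream_events(lines):
    """Yield one decoded event at a time; only the current event is held in memory."""
    for line in lines:
        line = line.strip()
        if line:
            yield loads(line)

# A small in-memory stand-in for a much larger log file (hypothetical data).
fake_log = io.StringIO("\n".join(
    dumps({'subject': {'id': i}, 'verb': 'REVIEW',
           'object': {'id': 3001}, 'context': {'at': 1000 + i}})
    for i in range(5)
))

# Running aggregates are updated per event, so memory use does not grow with the stream.
count = 0
latest_at = None
for event in stream_events(fake_log):
    count += 1
    latest_at = event['context']['at']

print(count, latest_at)
```

The same loop would work unchanged on an open file handle, since iterating over a file also reads one line at a time.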

## What is an append-only log?

An append-only log is just *one* possible system to record an event stream. 

Typically an event stream is only "live" over some window of time; e.g., your "live" event stream holds only one day of events, and the log is cleared daily.

An append-only log is a system that accepts raw, unprocessed events, typically over a window of time, and records them verbatim, without editing or deleting.

This log is then replicated to serve for later or offline processing (& archiving). 

## How is an append-only log structured?

Events are sequenced in time, so that events later in the log occurred later.

"Archived logs" have *the same structure*, so you can use the same tools to process a live and an archived log.

Reprocessing an archived log is known as "replaying", and it is an extremely convenient way of fixing mistakes (and reprocessing data, etc.).
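To illustrate replaying, here is a small sketch in which the same processor function would run over either a live or an archived log, because both are just one JSON event per line. The `count_by_verb` function and the `archived_log` contents are assumptions made up for this example.

```python
from json import dumps, loads

def count_by_verb(lines):
    """Process a log (live or archived) line by line and count events per verb."""
    counts = {}
    for line in lines:
        event = loads(line)
        counts[event['verb']] = counts.get(event['verb'], 0) + 1
    return counts

# Hypothetical archived log: same one-event-per-line structure as a live log.
archived_log = [
    dumps({'subject': {'id': 101}, 'verb': 'REVIEW',
           'object': {'id': 3001}, 'context': {'at': 1.0}}),
    dumps({'subject': {'id': 102}, 'verb': 'REVIEW',
           'object': {'id': 3002}, 'context': {'at': 2.0}}),
    dumps({'subject': {'id': 'UK'}, 'verb': 'SUBMIT_TAX',
           'object': {'id': 901}, 'context': {'at': 3.0}}),
]

# "Replaying": run the (possibly corrected) processor over the archive from the start.
replayed = count_by_verb(archived_log)
print(replayed)
```

Because the raw events are never edited, a bug in the processor can be fixed and the derived result rebuilt by running it again over the archive.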

## Why does this seem simple?

This is deceptively simple. Compared with a database, you *CANNOT*:

* edit
* delete


And, with a relational db, "rows" are thought of as *highly structured* and *not ordered*.


**Event logs are ordered in time, uneditable, weakly structured (subject, verb, object, at)**. 

The limitations are vital! 

## How do I persist events?

Replicate them from the log.

## Aside: What's the difference between an event bus and event log?

An event bus is an internal signalling and coordination system for managing internal services. The data in an event bus is generated by *internal systems* and is mostly formatted as "call this function with these arguments".


An event log is **a database**: it's where the data is. It's not, primarily, a coordination mechanism. The data in an event log is generated by *customers* (or sensors, or any external systems). An event log can be used a bit like an event bus to coordinate, but this is a secondary use: most of the data in it is generated externally.

# Part 2: Applications

## What is the most basic implementation of an event stream?

In [4]:
client_submits_taxform

{'context': {'at': 1650549820.7373793},
 'object': {'body': 'SCHEMALESS-SERIALIZED-DATA', 'id': 901},
 'subject': {'id': 'UK'},
 'verb': 'SUBMIT_TAX'}

In [5]:
open('log', 'w')

<_io.TextIOWrapper name='log' mode='w' encoding='UTF-8'>

In [6]:
from json import dumps as encode

client_submits_taxform = { 
    'subject': { 'id': 'UK' },
    'verb': 'SUBMIT_TAX',
    'object': { 'id': 901, 'body': "SCHEMALESS-SERIALIZED-DATA" },
    'context': { 'at': time() }
}

open('log', 'a').write(encode(client_submits_taxform) + "\n")

148

In [7]:
print(open('log').read())

{"subject": {"id": "UK"}, "verb": "SUBMIT_TAX", "object": {"id": 901, "body": "SCHEMALESS-SERIALIZED-DATA"}, "context": {"at": 1650549837.1697195}}



## How do I process a log?

In [8]:
from json import loads as decode

def process(event):
    print(event['subject']['id'], event['context']['at'])
    
# streaming
for raw_event in open('log'):
    event = decode(raw_event)
    process(event)

UK 1650549837.1697195


## How does a log processor know where to start processing?

In [9]:
from json import loads as decode

def process(event):
    print(event['subject']['id'], event['context']['at'])
    
processed_timestamp = 0  # 0 processes the whole log; use time() to process only new events

for raw_event in open('log'):
    event = decode(raw_event)
    
    if event['context']['at'] > processed_timestamp:
        process(event)
        processed_timestamp = event['context']['at']
    else:
        # skip events we've processed
        continue

UK 1650549837.1697195
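In the cell above the processor forgets its position when it stops. One possible way to remember it, sketched below, is to save the last processed timestamp to a small checkpoint file. The `load_checkpoint`, `save_checkpoint`, `run_processor` and `append` helpers, and the temporary file paths, are hypothetical names for illustration; they are not part of the notebook.

```python
import os
import tempfile
from json import dumps, loads

def load_checkpoint(path):
    """Return the last processed timestamp, or 0 if no checkpoint exists yet."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return float(f.read())

def save_checkpoint(path, timestamp):
    with open(path, 'w') as f:
        f.write(repr(timestamp))

def run_processor(log_path, checkpoint_path):
    """Process only events newer than the checkpoint; return the object ids processed."""
    processed_timestamp = load_checkpoint(checkpoint_path)
    seen = []
    with open(log_path) as log:
        for raw_event in log:
            event = loads(raw_event)
            if event['context']['at'] > processed_timestamp:
                seen.append(event['object']['id'])
                processed_timestamp = event['context']['at']
    save_checkpoint(checkpoint_path, processed_timestamp)
    return seen

# Work in a temporary directory so the notebook's own 'log' file is untouched.
workdir = tempfile.mkdtemp()
log_path = os.path.join(workdir, 'log')
checkpoint_path = os.path.join(workdir, 'checkpoint')

def append(event_id, at):
    with open(log_path, 'a') as log:
        log.write(dumps({'object': {'id': event_id}, 'context': {'at': at}}) + "\n")

append(901, 100.0)
append(902, 200.0)
first_run = run_processor(log_path, checkpoint_path)   # processes both events

append(903, 300.0)
second_run = run_processor(log_path, checkpoint_path)  # processes only the new event

print(first_run, second_run)
```

A timestamp checkpoint skips any event that shares a timestamp with the last processed one, so a line or byte offset may be a more robust position marker in practice.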


## What do processors typically do with events?

* Normalize and insert into database
* Insert *derived* event into a different event stream
* (Also, possibly, email user)

Typically there is a "Raw" or primary stream whose events are always generated externally. This stream is *just processed* into new streams or derived data. 

And typically, "actions" such as emailing a user are executed on a derived stream, so that the "raw" stream is just a "verbatim record".
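The "normalize and insert into database" step could look roughly like the sketch below, which flattens each nested event into one row of an in-memory SQLite table. The table schema, the `normalize` function and the sample `raw_lines` are assumptions chosen for illustration.

```python
import sqlite3
from json import dumps, loads

db = sqlite3.connect(':memory:')
db.execute("""
    CREATE TABLE events (
        subject_id TEXT, verb TEXT, object_id TEXT, at REAL
    )
""")

def normalize(event):
    """Flatten a nested event into one row (a hypothetical schema)."""
    return (
        str(event['subject']['id']),
        event['verb'],
        str(event['object']['id']),
        event['context']['at'],
    )

# Stand-in for lines read from the raw log.
raw_lines = [
    dumps({'subject': {'id': 'UK'}, 'verb': 'SUBMIT_TAX',
           'object': {'id': 901, 'body': 'SCHEMALESS-SERIALIZED-DATA'},
           'context': {'at': 1650549837.0}}),
    dumps({'subject': {'id': 101, 'type': 'CUSTOMER'}, 'verb': 'REVIEW',
           'object': {'id': 3001, 'type': 'FILM'},
           'context': {'at': 1650549840.0}}),
]

for raw_event in raw_lines:
    db.execute("INSERT INTO events VALUES (?, ?, ?, ?)", normalize(loads(raw_event)))
db.commit()

rows = db.execute(
    "SELECT subject_id, verb, object_id FROM events ORDER BY at"
).fetchall()
print(rows)
```

The raw log stays untouched; the table is derived data and can be dropped and rebuilt by replaying the log.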

## How do you respond to events on a derived stream?

In [10]:
from json import loads as decode

def process(event):
    new_event = {
        'subject': {'id': 'EMAIL_SYSTEM'}, 
        'verb': 'EMAIL', 
        'object': {'id': event['object']['id'], 'type': 'CLIENT'}, 
        'context': {
            'at': time(),
            'caused_by': event
        }
    }
    
    open('derived', 'a').write(encode(new_event) + "\n")
    
    
    
processed_timestamp = 0  # 0 processes the whole log; use time() to process only new events
for raw_event in open('log'):
    event = decode(raw_event)
    
    if event['context']['at'] > processed_timestamp:
        process(event)
        processed_timestamp = event['context']['at']
    else:
        # skip events we've processed
        continue

In [11]:
processed_timestamp = 0  # 0 processes the whole log; use time() to process only new events
for raw_event in open('derived'):
    event = decode(raw_event)
    
    if event['context']['at'] > processed_timestamp:
        print(event)
        processed_timestamp = event['context']['at']

{'subject': {'id': 'EMAIL_SYSTEM'}, 'verb': 'EMAIL', 'object': {'id': 901, 'type': 'CLIENT'}, 'context': {'at': 1650549854.3334212, 'caused_by': {'subject': {'id': 'UK'}, 'verb': 'SUBMIT_TAX', 'object': {'id': 901, 'body': 'SCHEMALESS-SERIALIZED-DATA'}, 'context': {'at': 1650549837.1697195}}}}


## How do I detect anomalies in an event stream?

In [12]:
avg_body_length = 10 

for raw_event in open('log'):
    event = decode(raw_event)
    
    if len(event['object']['body']) > avg_body_length:
        print("EVENT: UNEXPECTED DATA") # add to a different event log

EVENT: UNEXPECTED DATA


* (unstructured) text anomalies
    * whitespace frequency
    * length
    * word/symbol frequency
    * ...
    
* structure:
    * check some internal measure for variation

In [13]:
avg_measure = 10
avg_std = 5

for raw_event in open('log'):
    event = decode(raw_event)
    
    is_anom = len(event['object']['body']) > (avg_measure + avg_std * 2)
    
    if is_anom:
        print("EVENT: UNEXPECTED DATA") # add to a different event log

EVENT: UNEXPECTED DATA
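The cells above hard-code the average and standard deviation. As one possible way to obtain them while streaming, the sketch below keeps running statistics with Welford's online algorithm, which needs only constant memory. This technique is swapped in here for illustration and is not from the notebook; the `RunningStats` class, the warm-up of 5 events and the sample `bodies` are assumptions.

```python
from json import dumps, loads
from math import sqrt

class RunningStats:
    """Welford's online algorithm: mean and variance in constant memory."""
    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    @property
    def std(self):
        # population standard deviation; 0.0 until there are at least two values
        return sqrt(self.m2 / self.n) if self.n > 1 else 0.0

# Hypothetical stream: five short bodies followed by one unusually long body.
bodies = ["aaaa", "aaaaaa", "aaaaa", "aaaa", "aaaaaa", "a" * 40]
raw_lines = [dumps({'object': {'id': i, 'body': b}}) for i, b in enumerate(bodies)]

stats = RunningStats()
anomalies = []
for raw_event in raw_lines:
    event = loads(raw_event)
    length = len(event['object']['body'])
    # compare against statistics of the events seen so far, then update them
    if stats.n >= 5 and length > stats.mean + 2 * stats.std:
        anomalies.append(event['object']['id'])
    stats.update(length)

print(anomalies)
```

In a real processor the flagged event would be written to a different event log rather than collected in a list.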


# Part 3: Tools

## What is the leading tool for recording event streams?

Kafka

## What tools are there for processing event streams?

* Confluent (by the Kafka people)
* Spark
* Kinesis

## Why are event log systems useful?

* ready for analysis
* *forces* you to record history of:
    * who caused the data ingest
    * where it came from
    * when
    * context 
* this makes your life so much easier, even if you don't need this right now!
* esp. for debugging as *every record of data* has a subject!
    * eg., the email system records itself as what caused an INSERT

# Part 4: Use Case: Cart Abandonment

### Customer Adds Items & Orders

```

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "STARTED_BROWSING"}, "object": {"type": "NONE"}, "context": {"at": 1598616196536}, "event": {"inserted_at": 1598616196567}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 1, "name": "Orange", "price": 0.5, "count": 1}}, "context": {"at": 1598616199042}, "event": {"inserted_at": 1598616199056}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 2, "name": "Banana", "price": 1.22, "count": 1}}, "context": {"at": 1598616199769}, "event": {"inserted_at": 1598616199787}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 3, "name": "Lemon", "price": 5, "count": 1}}, "context": {"at": 1598616200827}, "event": {"inserted_at": 1598616200844}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 3, "name": "Lemon", "price": 5, "count": 2}}, "context": {"at": 1598616201410}, "event": {"inserted_at": 1598616201421}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "CHANGED_ITEM_COUNT_IN_BASKET"}, "object": {"type": "ITEM", "item": {"id": 1, "name": "Orange", "price": 0.5, "count": 2}}, "context": {"at": 1598616204107, "by": 1}, "event": {"inserted_at": 1598616204132}}

{"subject": {"id": 1, "type": "CUSTOMER"}, "verb": {"type": "ORDERED"}, "object": {"type": "BASKET", "basket": [{"id": 1, "name": "Orange", "price": 0.5, "count": 2}, {"id": 2, "name": "Banana", "price": 1.22, "count": 1}, {"id": 3, "name": "Lemon", "price": 5, "count": 2}]}, "context": {"at": 1598616206681}, "event": {"inserted_at": 1598616206694}}

{"subject": {"id": 1, "type": "INTERNAL_ORDER_EVENT"}, "verb": {"type": "ORDER_COMPLETED"}, "object": {"type": "NONE"}, "context": {"at": 1598616206696}, "event": {"inserted_at": 1598616206696}}

```

### Customer Adds Items & Abandons Basket

A log processor watches the event stream to see when a customer first started browsing, and when their last interaction was.

If the event processor finds a last interaction *more than 30 minutes* before the time at which it is looking, it generates an "ABANDONED_BASKET" event.

```
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "STARTED_BROWSING"}, "object": {"type": "NONE"}, "context": {"at": 1598616196536}, "event": {"inserted_at": 1598616196567}}
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 1, "name": "Orange", "price": 0.5, "count": 1}}, "context": {"at": 1598616199042}, "event": {"inserted_at": 1598616199056}}
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 2, "name": "Banana", "price": 1.22, "count": 1}}, "context": {"at": 1598616199769}, "event": {"inserted_at": 1598616199787}}
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 3, "name": "Lemon", "price": 5, "count": 1}}, "context": {"at": 1598616200827}, "event": {"inserted_at": 1598616200844}}
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "ADDED_ITEM_TO_BASKET"}, "object": {"type": "ITEM", "item": {"id": 3, "name": "Lemon", "price": 5, "count": 2}}, "context": {"at": 1598616201410}, "event": {"inserted_at": 1598616201421}}
{"subject": {"id": 15, "type": "CUSTOMER"}, "verb": {"type": "CHANGED_ITEM_COUNT_IN_BASKET"}, "object": {"type": "ITEM", "item": {"id": 1, "name": "Orange", "price": 0.5, "count": 2}}, "context": {"at": 1598616204107, "by": 1}, "event": {"inserted_at": 1598616204132}}
```

In a derived event stream (i.e., another log):

```
{"subject": {"id": 15, "type": "INTERNAL_ABDANDON_SYSTEM"}, "verb": {"type": "ABANDONED_BASKET"}, "object": {"type": "NONE"}, "context": {"at": 1598616204107}, "event": {"inserted_at": 1598622166810}}
```

A second system monitors for abandonment events and triggers emails to remind customers they have products in their basket (i.e., to increase sales).
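The abandonment processor described above could be sketched as follows. It tracks each customer's last interaction, skips customers who ordered, and emits a derived event for anyone idle for more than 30 minutes. The `find_abandoned` and `ev` helpers and the shortened sample log are hypothetical; the derived event copies the field layout and type string of the example above as-is.

```python
from json import dumps, loads

ABANDON_AFTER_MS = 30 * 60 * 1000  # 30 minutes; timestamps are in milliseconds

def find_abandoned(lines, now_ms):
    """Return ABANDONED_BASKET events for customers idle for more than 30 minutes."""
    last_seen = {}   # customer id -> timestamp of last interaction
    ordered = set()  # customers who placed an order
    for line in lines:
        event = loads(line)
        if event['subject']['type'] != 'CUSTOMER':
            continue
        customer = event['subject']['id']
        last_seen[customer] = event['context']['at']
        if event['verb']['type'] == 'ORDERED':
            ordered.add(customer)

    derived = []
    for customer, at in last_seen.items():
        if customer not in ordered and now_ms - at > ABANDON_AFTER_MS:
            derived.append({
                'subject': {'id': customer, 'type': 'INTERNAL_ABDANDON_SYSTEM'},
                'verb': {'type': 'ABANDONED_BASKET'},
                'object': {'type': 'NONE'},
                'context': {'at': at},
                'event': {'inserted_at': now_ms},
            })
    return derived

def ev(customer, verb, at):
    """Build one raw log line (hypothetical helper, object details omitted)."""
    return dumps({'subject': {'id': customer, 'type': 'CUSTOMER'},
                  'verb': {'type': verb}, 'object': {'type': 'NONE'},
                  'context': {'at': at}})

log = [
    ev(1, 'STARTED_BROWSING', 1598616196536),
    ev(15, 'STARTED_BROWSING', 1598616196536),
    ev(15, 'ADDED_ITEM_TO_BASKET', 1598616199042),
    ev(15, 'CHANGED_ITEM_COUNT_IN_BASKET', 1598616204107),
    ev(1, 'ORDERED', 1598616206681),
]

abandoned = find_abandoned(log, now_ms=1598622166810)
for event in abandoned:
    print(dumps(event))
```

This sketch keeps one dictionary entry per customer, so its memory grows with the number of active customers rather than with the number of events.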