# Python Event Systems

Event systems are often proposed as a *universal* solution for modern big data problems.

We track realtime events in a highly efficient append-only log, and we derive the rest of our data system from this log.

We can process the log very quickly, and integrate insight and analytics easily.

The log can be archived to a data lake (i.e., a distributed file system), which makes backing up the entire system trivial.

### Case Study: Insurance System

* customers
    * CALL
    * APPLY
    * CLAIM
* assessment team
    * ASSESS
    * REJECT_CLAIM
    * ACCEPT_CLAIM
* police
    * ...


## How do I represent events?

Events are dictionaries ("documents"), i.e., collections of tagged values,

In [94]:
def event(subject, verb, object, context):
    return {
        'subject': subject,
        'verb': verb,
        'object': object,
        'context': context
    }

In [95]:
event("CUSTOMER", "APPLY", "FORM", '1800-01-01 12pm')

{'subject': 'CUSTOMER',
 'verb': 'APPLY',
 'object': 'FORM',
 'context': '1800-01-01 12pm'}

## How do I track events?

Events are kept in an append-only log,

In [2]:
from datetime import datetime

In [25]:
elog = []

In [26]:
elog.append(event("CUSTOMER", "APPLY", "FORM", str(datetime.now())))
elog.append(event("ASSESSOR", "ACCEPT", "FORM", str(datetime.now())))
elog.append(event("CUSTOMER", "CLAIM", "FORM", str(datetime.now())))
elog.append(event("ACCESSOR", "ACCEPT", "CLAIM", str(datetime.now())))
elog.append(event("PAYMENTS", "PAY", "CUSTOMER", str(datetime.now())))

We only ever append to the log: we do not delete or update existing events.

In [27]:
elog

[{'subject': 'CUSTOMER',
  'verb': 'APPLY',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'ASSESSOR',
  'verb': 'ACCEPT',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'CUSTOMER',
  'verb': 'CLAIM',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'ACCESSOR',
  'verb': 'ACCEPT',
  'object': 'CLAIM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'PAYMENTS',
  'verb': 'PAY',
  'object': 'CUSTOMER',
  'context': '2021-05-21 13:14:43.553497'}]
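
One way to make the append-only rule explicit is to wrap the list in a small class that only exposes appending and reading. The sketch below is an illustration under assumptions: the `EventLog` class and its methods are hypothetical names, not part of the notebook above, and the copies it makes are shallow (enough for flat events like these).

```python
class EventLog:
    """Hypothetical append-only wrapper around a list of event dictionaries."""

    def __init__(self):
        self._events = []

    def append(self, e):
        # Store a shallow copy so later changes to the caller's dict do not alter the log
        self._events.append(dict(e))

    def __iter__(self):
        # Hand out shallow copies so readers cannot modify the stored events
        return (dict(e) for e in self._events)

    def __len__(self):
        return len(self._events)


log = EventLog()
log.append({'subject': 'CUSTOMER', 'verb': 'APPLY', 'object': 'FORM', 'context': '1800-01-01 12pm'})
log.append({'subject': 'ASSESSOR', 'verb': 'ACCEPT', 'object': 'FORM', 'context': '1800-01-01 1pm'})

for e in log:
    print(e['subject'], e['verb'])
```

There is deliberately no method to delete or update, so the only way to change the state of the system is to record a new event.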

## How do we store the log?

Note that the events above are *dictionaries*, not text. We need to convert them to text before we can save them to a file.

In [28]:
elog[0]['subject']

'CUSTOMER'

In [29]:
import json

In [30]:
json.dumps(elog[0])

'{"subject": "CUSTOMER", "verb": "APPLY", "object": "FORM", "context": "2021-05-21 13:14:43.553497"}'

In [31]:
# the with-block closes the file automatically, even if a write fails
with open('elog.txt', 'w') as file:
    for e in elog:
        file.write(json.dumps(e) + "\n") # dictionary -> text

We can see these events have been saved to the file,

In [32]:
print(open('elog.txt').read())

{"subject": "CUSTOMER", "verb": "APPLY", "object": "FORM", "context": "2021-05-21 13:14:43.553497"}
{"subject": "ASSESSOR", "verb": "ACCEPT", "object": "FORM", "context": "2021-05-21 13:14:43.553497"}
{"subject": "CUSTOMER", "verb": "CLAIM", "object": "FORM", "context": "2021-05-21 13:14:43.553497"}
{"subject": "ACCESSOR", "verb": "ACCEPT", "object": "CLAIM", "context": "2021-05-21 13:14:43.553497"}
{"subject": "PAYMENTS", "verb": "PAY", "object": "CUSTOMER", "context": "2021-05-21 13:14:43.553497"}
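
In a running system events arrive one at a time, so rather than rewriting the whole file we would probably open it in append mode (`'a'`) for each new event. A minimal sketch, assuming a hypothetical helper `append_event` and a throwaway file in a temporary directory (so that `elog.txt` is not touched):

```python
import json
import os
import tempfile

def append_event(path, e):
    # 'a' opens the file for appending: existing lines are left untouched
    with open(path, 'a') as f:
        f.write(json.dumps(e) + "\n")

# Illustrative path in a temporary directory
path = os.path.join(tempfile.mkdtemp(), 'elog_demo.txt')

append_event(path, {'subject': 'CUSTOMER', 'verb': 'APPLY', 'object': 'FORM', 'context': 't0'})
append_event(path, {'subject': 'ASSESSOR', 'verb': 'ACCEPT', 'object': 'FORM', 'context': 't1'})

with open(path) as f:
    lines = f.read().splitlines()

print(len(lines))  # one line per event
```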



## How do we process (ETL) the event log?

One type of ETL process involving logs is simply to archive them for analysis,

In [33]:
open('day1_log_copy.txt', 'w').write( open('elog.txt', 'r').read() )

505

We typically also clear the original, so it is fast for new events,

In [34]:
open('elog.txt', 'w').close() # writing mode blanks the file 

We may then work from this copy and ETL into a relational db...
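
As a rough illustration of that last step, the sketch below loads a couple of JSON lines into an in-memory SQLite table using Python's built-in `sqlite3` module. The table name `events`, its columns and the two sample lines are assumptions made for this example; a real ETL job would read from the archived copy and target whichever database is in use.

```python
import json
import sqlite3

# Two events in the same one-JSON-document-per-line format as the log file
raw_lines = [
    '{"subject": "CUSTOMER", "verb": "APPLY", "object": "FORM", "context": "2021-05-21 13:14:43"}',
    '{"subject": "PAYMENTS", "verb": "PAY", "object": "CUSTOMER", "context": "2021-05-21 13:14:44"}',
]

db = sqlite3.connect(':memory:')  # in-memory database, nothing is written to disk
db.execute('CREATE TABLE events (subject TEXT, verb TEXT, object TEXT, context TEXT)')

for line in raw_lines:
    e = json.loads(line)  # text -> dictionary
    db.execute(
        'INSERT INTO events VALUES (?, ?, ?, ?)',
        (e['subject'], e['verb'], e['object'], e['context']),
    )
db.commit()

# Once the events are tabular we can query them with SQL
rows = db.execute("SELECT subject, verb FROM events WHERE verb = 'PAY'").fetchall()
print(rows)
```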

## How do we process event logs for analysis?

Recreate the event stream (i.e., the list of events)...

In [91]:
day1_log = []

# the with-block closes the file once every line has been read
with open('day1_log_copy.txt', 'r') as day1:
    for line in day1:
        e = json.loads(line) # text -> dictionary
        day1_log.append(e)
    

In [92]:
day1_log

[{'subject': 'CUSTOMER',
  'verb': 'APPLY',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'ASSESSOR',
  'verb': 'ACCEPT',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'CUSTOMER',
  'verb': 'CLAIM',
  'object': 'FORM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'ACCESSOR',
  'verb': 'ACCEPT',
  'object': 'CLAIM',
  'context': '2021-05-21 13:14:43.553497'},
 {'subject': 'PAYMENTS',
  'verb': 'PAY',
  'object': 'CUSTOMER',
  'context': '2021-05-21 13:14:43.553497'}]

In [89]:
def process_customer_apply(e):
    print("APPLYING")
    
def process_payments_pay(e):
    print("PAYING")

In [90]:
for e in day1_log:
    if (e['subject'] == 'CUSTOMER') and (e['verb'] == 'APPLY'):
        process_customer_apply(e)
        
    elif (e['subject'] == 'PAYMENTS') and (e['verb'] == 'PAY'):
        process_payments_pay(e)

APPLYING
PAYING
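
As the number of event types grows, the `if`/`elif` chain gets long. One common alternative, sketched here as an assumption rather than as the notebook's method, is a dictionary that maps `(subject, verb)` pairs to handler functions. The handlers below return strings instead of printing, and the `events` list is a small stand-in for `day1_log`.

```python
def process_customer_apply(e):
    return "APPLYING"

def process_payments_pay(e):
    return "PAYING"

# Map (subject, verb) pairs to the function that handles them
handlers = {
    ('CUSTOMER', 'APPLY'): process_customer_apply,
    ('PAYMENTS', 'PAY'): process_payments_pay,
}

events = [
    {'subject': 'CUSTOMER', 'verb': 'APPLY', 'object': 'FORM', 'context': 't0'},
    {'subject': 'ASSESSOR', 'verb': 'ACCEPT', 'object': 'FORM', 'context': 't1'},
    {'subject': 'PAYMENTS', 'verb': 'PAY', 'object': 'CUSTOMER', 'context': 't2'},
]

results = []
for e in events:
    handler = handlers.get((e['subject'], e['verb']))
    if handler is not None:  # events with no registered handler are skipped
        results.append(handler(e))

print(results)
```

Adding a new event type then means adding one entry to `handlers`, without touching the loop.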


## What is Stream Processing?

Whenever we are processing one data element at a time we are *stream processing*. 

Stream processing algorithms, approaches and techniques form a field of their own, and many libraries and systems exist to support this type of analysis/processing. 

In [41]:
stream = [1, 2, 3, 1, 2, 3, 5, 1, 2]

We can only process this *one* element at a time...

There are two possible reasons for this restriction:
1. the data is realtime, so we only see one event at a time (i.e., there is no future)
2. we don't have a lot of memory/compute/etc. 
    * e.g., 1000s of machines

Either of these restrictions means we cannot extract "common data elements" (e.g., via SELECT) and process them together. 

In [42]:
for e in stream:
    print(e)

1
2
3
1
2
3
5
1
2


Suppose I want to "watch" for 2s, 3s, 5s...

In [45]:
for e in stream:
    if e == 2:
        print("SEEN 2")
        
    elif e == 5:
        print("SEEN 5")
        
    elif e == 3:
        print("SEEN 3")
        
    else:
        continue # skip

SEEN 2
SEEN 3
SEEN 2
SEEN 3
SEEN 5
SEEN 2


In practice, if we had a list like this we may not write this type of loop.

But what about a 1TB file...

There we cannot load the whole file into memory (or split it up, etc.); we **have** to process it line by line. 

This is non-tabular: there is no "saved table". By analogy, it is as if the database could only see one row at a time. 
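
A minimal sketch of line-by-line processing, assuming a log file with one JSON document per line. Iterating over a Python file object yields one line at a time, so only the current line needs to be in memory. The generator name `stream_events` and the tiny demo file are hypothetical; the file stands in for a much larger one.

```python
import json
import os
import tempfile

def stream_events(path):
    # Iterating over a file object reads one line at a time
    with open(path) as f:
        for line in f:
            if line.strip():           # ignore blank lines
                yield json.loads(line) # text -> dictionary

# Small stand-in file; a real log could be far larger than memory
path = os.path.join(tempfile.mkdtemp(), 'big_log_demo.txt')
with open(path, 'w') as f:
    for verb in ['APPLY', 'CLAIM', 'APPLY']:
        f.write(json.dumps({'subject': 'CUSTOMER', 'verb': verb}) + "\n")

applications = 0
for e in stream_events(path):
    if e['verb'] == 'APPLY':
        applications += 1

print(applications)
```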

---

## Case Study 2: Enriched Events

In practice events will be complex hierarchical documents with enriched information about the nature of the event,

In [75]:
def event_detail(subject, sid, verb, object, oid, data):
    return {
        'subject': {"type" : subject, "id": sid },
        'verb': verb,
        'object': { "type" : object, "id": oid },
        'context': {
            "time": str(datetime.now()),
            "data": data
        }
    }

In [76]:
event_detail("CUSTOMER", 1001, "APPLY", "FORM", 3081, "Medical Insurance")

{'subject': {'type': 'CUSTOMER', 'id': 1001},
 'verb': 'APPLY',
 'object': {'type': 'FORM', 'id': 3081},
 'context': {'time': '2021-05-21 13:43:05.121000',
  'data': 'Medical Insurance'}}

This information, in a machine learning system, will likely provide the $X$ values we use to make predictions.


Consider the event log below: two customers, one is accepted, makes a claim, and the claim is paid; the other customer is rejected.

In [65]:
detail = []
detail.append(
    event_detail("CUSTOMER", 1001, "APPLY", "FORM", 3081, "Medical Insurance")
)

detail.append(
    event_detail("CUSTOMER", 1002, "APPLY", "FORM", 3082, "Car Insurance")
)

detail.append(
    event_detail("ASSESSOR", 32, "ACCEPT", "CUSTOMER", 1001, "GOOD")
)

detail.append(
    event_detail("ASSESSOR", 32, "REJECT", "CUSTOMER", 1002, "GOOD")
)

detail.append(
    event_detail("CUSTOMER", 1001, "CLAIM", "FORM", 3087, 10_000)
)

detail.append(
    event_detail("ASSESSOR", 32, "ACCEPT", "CLAIM", 3087, 10_000)
)

detail.append(
    event_detail("PAYMENTS", 1, "PAYS", "CUSTOMER", 1001, 7_500)
)

Assuming we have ETL'd this log, we can then stream process it.

### How do I add machine learning to an event system?

For now consider a dummy prediction function,

In [71]:
def predict_claim(amount):
    return 0.9 * amount

Below we perform simple reporting analysis (counting the number of applicants, and claim volume),

In [78]:
applications = 0
claims = 0

for e in detail:
    if (e['subject']['type'] == 'CUSTOMER' and e['verb'] == 'APPLY'):
        applications += 1
    # e['object'] is a dictionary, so we compare its 'type' field
    elif e['subject']['type'] == 'ASSESSOR' and e['verb'] == 'ACCEPT' and e['object']['type'] == 'CLAIM': 
        claims += e['context']['data']
    elif e['subject']['type'] == 'PAYMENTS' and e['verb'] == 'PAYS':
        claims -= e['context']['data']

In [77]:
print(applications, claims)

2 2500


We could revise this to add our prediction function, which is triggered whenever we see a claim being received,

In [83]:

for e in detail:
    if (e['subject']['type'] == 'CUSTOMER' and e['verb'] == 'APPLY'):
        ...
        
        
    elif e['subject']['type'] == 'CUSTOMER' and e['verb'] == 'CLAIM':
        prediction = predict_claim(e['context']['data'])
        
        if prediction > 5_000:
            print("CLAIM RECEIVED")
            print("WE PREDICT A PAYOUT OF:", prediction) 
            print("...EMAILING INTERNAL DEPARTMENTS..")
            
            
    elif e['subject']['type'] == 'ASSESSOR' and e['verb'] == 'ACCEPT' and e['object']['type'] == 'CLAIM': 
        ...
        
        
    elif e['subject']['type'] == 'PAYMENTS' and e['verb'] == 'PAYS':
        ...
        

CLAIM RECEIVED
WE PREDICT A PAYOUT OF: 9000.0
...EMAILING INTERNAL DEPARTMENTS..


## What is a realistic prediction function?

We can use the event archive with sklearn to create a predictive model,

In [96]:
from sklearn.linear_model import LinearRegression

X = [[10_000], [20_000], [30_000]] # claim amounts
y = [8_300, 18_200, 19_500] # actual payouts

model = LinearRegression().fit(X, y)

The data above is simulated for convenience.
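
With a real archive, the training data could be derived from the events themselves. The sketch below is one possible approach under stated assumptions: events follow the shape produced by `event_detail`, each customer has at most one open claim, and each `PAYS` event settles that customer's claim. The `archive` list and its amounts are made up for illustration.

```python
# Made-up archive in the same shape as the events built by event_detail
archive = [
    {'subject': {'type': 'CUSTOMER', 'id': 1001}, 'verb': 'CLAIM',
     'object': {'type': 'FORM', 'id': 3087}, 'context': {'data': 10_000}},
    {'subject': {'type': 'CUSTOMER', 'id': 1003}, 'verb': 'CLAIM',
     'object': {'type': 'FORM', 'id': 3090}, 'context': {'data': 20_000}},
    {'subject': {'type': 'PAYMENTS', 'id': 1}, 'verb': 'PAYS',
     'object': {'type': 'CUSTOMER', 'id': 1001}, 'context': {'data': 7_500}},
    {'subject': {'type': 'PAYMENTS', 'id': 1}, 'verb': 'PAYS',
     'object': {'type': 'CUSTOMER', 'id': 1003}, 'context': {'data': 18_000}},
]

claimed = {}   # customer id -> claimed amount still awaiting payment
X, y = [], []  # claim amounts and the payouts that followed

for e in archive:
    if e['subject']['type'] == 'CUSTOMER' and e['verb'] == 'CLAIM':
        claimed[e['subject']['id']] = e['context']['data']
    elif e['subject']['type'] == 'PAYMENTS' and e['verb'] == 'PAYS':
        customer_id = e['object']['id']
        if customer_id in claimed:
            X.append([claimed.pop(customer_id)])  # one feature per row
            y.append(e['context']['data'])

print(X, y)
```

The resulting `X` and `y` have the same shape as the hand-written lists passed to `LinearRegression().fit`.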

In [102]:

for e in detail:
    if e['subject']['type'] == 'CUSTOMER' and e['verb'] == 'CLAIM':
        
        x_claim_amount = e['context']['data']
        
        prediction = model.predict([ [x_claim_amount] ]).round(2)
        
        if prediction > 5_000:
            print("CLAIM RECEIVED")
            print("WE PREDICT A PAYOUT OF:", prediction) 
            print("...EMAILING INTERNAL DEPARTMENTS..")
            
            

CLAIM RECEIVED
WE PREDICT A PAYOUT OF: [9733.33]
...EMAILING INTERNAL DEPARTMENTS..


## Exercise

Consider the IoT health data for a single individual in their home,

In [106]:
def health_event(subject, verb, object, context, time):
    return {
        'subject': subject,
        'verb': verb,
        'object': object,
        'context': context,
        'time': time
    }

We measure time from 0 = 12 noon. We have watches, kettles, ... which track their events,

In [111]:
healthlog = []

healthlog.append(health_event("WATCH", "MEASURES", "HR", 60, 0))
healthlog.append(health_event("WATCH", "MEASURES", "HR", 78, 1))
healthlog.append(health_event("WATCH", "MEASURES", "spO2", 98, 3))
healthlog.append(health_event("WATCH", "MEASURES", "spO2", 99, 7))
healthlog.append(health_event("WATCH", "MEASURES", "HR", 78, 8))
healthlog.append(health_event("KETTLE", "PERCOLATES", "COFFEE", 500, 60))
healthlog.append(health_event("WATCH", "MEASURES", "HR", 100, 200))
healthlog.append(health_event("WATCH", "MEASURES", "HR", 60, 1100))
healthlog.append(health_event("WATCH", "MEASURES", "HR", 78, 1200))


To analyse this log we process the event stream in a loop,

In [113]:
total = 0
count = 0

for e in healthlog:    
    if e['object'] == 'HR':
        count += 1
        total += e['context']
        
    print(e['verb'])
    
total / count

MEASURES
MEASURES
MEASURES
MEASURES
MEASURES
PERCOLATES
MEASURES
MEASURES
MEASURES


75.66666666666667

### Questions (20 min)

* modify the loop to record a running total for the HR
    * and then report an *average heart rate*
    
* HINTS:
    * count = 0 -- before
    * total = 0 -- before the loop
    * total += e['context'] -- add the data to the total
    * count += 1
    * total/count
    
* EXTRA:
    * detect a HR anomaly -- i.e., a HR spike
    * advise the person on their health
        * i.e., come up with a prediction function which maps HR,O2,Coffee->Quality
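
As a starting point for the EXTRA task, here is one possible approach (an assumption, not the only answer): flag any HR reading that exceeds the running average of the readings seen so far by more than a chosen threshold. The `threshold` value of 20 is hypothetical and chosen only for illustration; the readings are the HR values from the `healthlog` above, in order.

```python
readings = [60, 78, 78, 100, 60, 78]  # HR values from the healthlog, in order

threshold = 20   # hypothetical cut-off, for illustration only
total = 0
count = 0
spikes = []

for hr in readings:
    # Compare against the average of the readings seen *before* this one
    if count > 0 and hr - (total / count) > threshold:
        spikes.append(hr)
    total += hr
    count += 1

print(spikes)
```

Because it only keeps `total` and `count`, this check works one event at a time and fits the stream processing restriction described earlier.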