# Corona-Warn-App Principle Explained

Copyright Jens Dittrich, [Big Data Analytics Group](https://bigdata.uni-saarland.de/), [CC-BY-SA](https://creativecommons.org/licenses/by-sa/4.0/legalcode)

This notebook demonstrates the data handling part of the [Corona-Warn-App](https://github.com/corona-warn-app) (CWA). CWA is a great example of how to perform data analytics **while** preserving the privacy of the users.

This notebook was created for my undergrad lecture Big Data Engineering at Saarland University.

In [1]:
## Finding day of year
from datetime import datetime, date, timedelta
import random as rnd
import hashlib
from getmac import get_mac_address as gma # pip install getmac

def currentDate(daysToSubstract=0):
    return (datetime.today()- timedelta(days=daysToSubstract)).strftime('%Y-%m-%d')

In [2]:
# keeps a set of keys grouped by day, keys older than two weeks are automatically removed
class KeySet:
    def __init__(self):
        # set of IDs encountered and to be considered when checking for infections
        # TODO: needs to discard entries after two weeks
        self.keys = {} 
        self.keysCount = 0

    # add a key to this key set
    def addKey(self,key):
        _currentDate = currentDate()
        if _currentDate not in self.keys:
            self.keys[_currentDate] = set()
        if key not in self.keys[_currentDate]:
            self.keysCount += 1
            self.keys[_currentDate].add(key)
        
        twoWeeksAgo = currentDate(14)
        # NOTE: for performance reason this should rather be run ONCE A DAY rather than for every call to addEncounter()
        # However: in this case we favor functionality (and privacy) over performance, 
        # i.e. we will fix performance when it becomes an issue
        
        # (1.) loop over all keys older than two weeks ago and collect them:
        keysToDelete = set()
        for key in self.keys:
            if key < twoWeeksAgo:
                keysToDelete.add(key)

        # (2.) delete those keys (days):
        for key in keysToDelete:
            del self.keys[key]
    
    # add a set of keys to this keyset
    def addKeySet(self,keySet):
        for key in keySet:
            self.addKey(key)
    
    def print(self):
        print(self.keys)
        
    # returns all keys in a single set, removes the information on which day we observed a particular key
    def asSet(self):
        ret = set()
        for subset in self.keys.values():
            ret.update(subset)
        return ret
    
    def getKeysCount(self):
        return self.keysCount
    
# logs all encounters of a person, this is a specialized KeySet
class Encounters(KeySet):
    def __init__(self):
        super().__init__()
        
    # input: list of positive keys (i.e. keys assciated with users who tested positive)
    # return: list of those keys which are available in self.keys, i.e. people I met "long and close enough"
    def determineMyPositiveKeys(self, positiveKeys):
        myPositiveKeys = set()
        totalNumberOfEncounters = 0
        totalNumberOfPositiveEncounters = 0
        for key in self.keys: 
            # add any key found in the intersection to the return list, i.e. we met these people:            
            _encounters = self.keys[key]
            _intersection = positiveKeys.intersection(_encounters)
            
            myPositiveKeys.update(_intersection)
            totalNumberOfEncounters += len(_encounters)
            totalNumberOfPositiveEncounters += len(_intersection)
            
        return myPositiveKeys, totalNumberOfPositiveEncounters, totalNumberOfEncounters             

# models a user, i.e. the information and functionality available in the
# corona warn app on is/her device
class User:
    def __init__(self, name, notificationServer):
        self.name = name
        self.ownKeys = KeySet()
        self.encounters = Encounters()
        self.keyCounter = 0
        self.notificationServer = notificationServer

        # be carefull when initializing the random engine
        # we need a unique value that is hard to guess, we take the MAC address, serves basically as a seed:
        self.salt = hashlib.md5( (gma() +self.name+ str(rnd.randint(0,420000))).encode() ).digest()
        # currently used key, we will ignore the two types of keys (day- and 15-minute keys) used in the real app for the moment
        self.generateNewKey()

    # generate a new temporary key
    def generateNewKey(self):
        int_val = int.from_bytes(bytes( hashlib.md5( self.salt +bytes(self.keyCounter) ).digest()), "big")
        self.keyCounter += 1

        self.currentKey = int_val % 1000 # for didactic purposes, remove this in a real app
        # track own key, required if we test positive:
        self.ownKeys.addKey(self.currentKey)

    # returns the currently used temporary key
    def getCurrentKey(self):
        return self.currentKey

    # logs the encounter (recording) of a foreign key
    def logEncounter(self,foreignKey):
        self.encounters.addKey(foreignKey)
        
    def print(self):
        print("\nname       : ", self.name)
        print("currentKey : ", self.currentKey)
        print("ownKeys    : ",end="")
        self.ownKeys.print()
        print("encounters : ",end="")
        self.encounters.print()

    # called in case of a positive infection; sends all keys to the notification server
    def sendKeysToNotificationServer(self):
        self.notificationServer.receiveKeysOfAUserTestedPositive(self.ownKeys.asSet())
        
    # download keys belonging to users tested positive from some server:
    # Details, see https://github.com/corona-warn-app/cwa-documentation/blob/master/cwa-risk-assessment.md
    def checkRiskStatus(self):
        positiveKeys = self.notificationServer.getPositiveKeys()
        myPositiveKeys, totalNumberOfPositiveEncounters, totalNumberOfEncounters  = self.encounters.determineMyPositiveKeys(positiveKeys)

        # based on this information determine a risk score for this user:
        # one example for this:
        riskStatus = (0,"low, anyway: you need to follow the AHA-rules")
        if totalNumberOfPositiveEncounters > 2:
            riskStatus = (2,"high, self-isolate NOW! Get tested NOW!")
        elif totalNumberOfPositiveEncounters > 0:
            riskStatus = (1,"medium, you better get tested to be on the safe side")
        
        return riskStatus, totalNumberOfPositiveEncounters

In [3]:
# models the central server of the warn app for users tested positive send their keys
# then those keys are send to ALL other warn app users and can then be compared locally
class TestNotificationServer:
    def __init__(self):
        self.positiveKeys = KeySet()
    
    def receiveKeysOfAUserTestedPositive(self, keySet):
        self.positiveKeys.addKeySet(keySet)

    # called by any user of the app to get the current positive keys
    def getPositiveKeys(self):
        return self.positiveKeys.asSet()

    def print(self):        
        self.positiveKeys.print()
        
testNotificationServer = TestNotificationServer()

In [4]:
# some example calls to user:
u = User("Alice",testNotificationServer)
u.logEncounter(25)
u.logEncounter(42)
u.logEncounter(35)
u.logEncounter(65)
u.print()

threeWeeksAgo = currentDate(21)
u.encounters.keys[threeWeeksAgo] = set()
u.print()

u.logEncounter(23245)
u.print()

print(u.checkRiskStatus())


name       :  Alice
currentKey :  775
ownKeys    : {'2023-05-11': {775}}
encounters : {'2023-05-11': {65, 25, 42, 35}}

name       :  Alice
currentKey :  775
ownKeys    : {'2023-05-11': {775}}
encounters : {'2023-05-11': {65, 25, 42, 35}, '2023-04-20': set()}

name       :  Alice
currentKey :  775
ownKeys    : {'2023-05-11': {775}}
encounters : {'2023-05-11': {65, 35, 42, 23245, 25}}
((0, 'low, anyway: you need to follow the AHA-rules'), 0)


In [5]:
import os
from time import sleep

from ipycanvas import Canvas, hold_canvas
from ipywidgets import Image, HBox

size = 600
canvas = Canvas(width=size, height=size)
bob_sprite = Image.from_file(os.path.join('pics','cwa','bob.png'))
alice_sprite = Image.from_file(os.path.join('pics','cwa','alice.png'))
bob_sprite_red = Image.from_file(os.path.join('pics','cwa','bob_red.png'))
alice_sprite_red = Image.from_file(os.path.join('pics','cwa','alice_red.png'))

canvas_bob_sprite = Canvas(width=40,height=70)
canvas_alice_sprite = Canvas(width=40,height=70)
canvas_bob_sprite_red = Canvas(width=40,height=70)
canvas_alice_sprite_red = Canvas(width=40,height=70)


canvas_alice_sprite_red.fill_style = '#a9cafc'

canvas_bob_sprite.draw_image(bob_sprite,0,0,width=40,height=70)
canvas_alice_sprite.draw_image(alice_sprite,0,0,width=40,height=70)
canvas_bob_sprite_red.draw_image(bob_sprite_red,0,0,width=40,height=70)
canvas_alice_sprite_red.draw_image(alice_sprite_red,0,0,width=40,height=70)

HBox([canvas_bob_sprite,canvas_alice_sprite,canvas_bob_sprite_red,canvas_alice_sprite_red])

HBox(children=(Canvas(height=70, width=40), Canvas(height=70, width=40), Canvas(height=70, width=40), Canvas(h…

In [6]:
# everything below this distance will be considered critical
# of the criticalDistance= 0, this implies that two rectangles have to touch to be counted as critical
criticalDistance = 0

# models the graphical element shown in the animation
# TODO: cleanup rectangle code
class Person:
    def __init__(self, name, testNotificationServer):
        self.x = 0 # assumed to be the center of the rectangle in x-dimension
        self.y = 0 # assumed to be the center of the rectangle in y-dimension
        self.width = 40
        self.height = 70
        self.user = User(name, testNotificationServer)
        self.gender = rnd.randint(0,1)
        self.xDirection = rnd.randint(-3,3) # for continuous movement
        self.yDirection = rnd.randint(-3,3)
    
    def print(self):
        self.user.print()
    
    # sets the posiiton of this Person
    def setPos(self,x,y):
        self.x = min(max(x,self.width/2), size-self.width/2)
        self.y = min(max(y,self.height/2), size-self.height/2)

    # advances the position given the current direction (i.e. xDirection and yDirection)
    def makeAStep(self):
        self.setPos(self.x + self.xDirection, self.y + self.yDirection)
        if self.x <= self.width/2 or self.x >= size-self.width/2:
            self.xDirection *= -1
        if self.y <= self.height/2 or self.y >= size-self.height/2:
            self.yDirection *= -1
   
    # perform random walk (not used anymore)
    def randomWalk(self,xdelta,ydelta):
        self.setPos(self.x + xdelta, self.y + ydelta)
    
    # draws this Person on a Canvas
    # may also visualize the risk status
    def draw(self, canvas, critical=False, drawRiskStatus=False, riskStatus=None):
        sprite = None
        if critical:
            sprite = canvas_bob_sprite_red if self.gender == 0 else canvas_alice_sprite_red
        else:
            sprite = canvas_bob_sprite if self.gender == 0 else canvas_alice_sprite
        
        canvas.draw_image(sprite, self.x-self.width/2, self.y-self.height/2)
        canvas.font = '16px serif'
        canvas.fill_style ="black"
        canvas.fill_text(self.user.name, self.x-4, self.y-7)
        canvas.fill_text(str(self.user.encounters.keysCount), self.x-self.width/2+12, self.y-self.height/2-2)

        if drawRiskStatus:
            if riskStatus == 0:
                canvas.stroke_style = 'blue'
            elif riskStatus == 1:
                canvas.stroke_style = 'orange'
            elif riskStatus == 2:
                canvas.stroke_style = 'red'
            else:
                raise ValueError("riskStatus invalid: "+str(riskStatus))
                
            canvas.stroke_rect(self.x-self.width/2, self.y-self.height/2, width=self.width, height=self.height)
    
    # computes the distance to <otherPerson>, this uses the distance of two rectangles, i.e.
    # we use minimum bouding rectangeells around the png used to visualize a person
    # in other words: for the distance computation we assume that each person is a rectangle, 
    # then we compute the distance of these rectangles
    # notice that a simpler euclidean style point-based distance did not look convincing in the visualization
    def distance(self,otherPerson):
        # note that we need to compute the distance not based ona center but on the area!
        # first solve distance of two intervals in one dimension:
        ax = self.x - self.width/2
        bx = self.x + self.width/2
        cx = otherPerson.x - otherPerson.width/2
        dx = otherPerson.x + otherPerson.width/2
        xdelta = max(max(0,cx-bx),ax-dx)

        ay = self.y - self.height/2
        by = self.y + self.height/2
        cy = otherPerson.y - otherPerson.height/2
        dy = otherPerson.y + otherPerson.height/2
        ydelta = max(max(0,cy-by),ay-dy)
        
        return math.sqrt( math.pow(xdelta,2) + math.pow(ydelta,2))
        
# number of persons to simulate:
noOfPersons = 30

# put them in a set:
persons = set()

# test notifcation server needed to notify users about positive test results
testNotificationServer = TestNotificationServer()

# the actual visual animation and simulation

# intialize and put random persons in a set:
for i in range(noOfPersons):
    persons.add(Person(str(i), testNotificationServer))
canvas.clear()
display(canvas)

import math 
for person in persons:
    # random new position:
    person.setPos(rnd.randint(50,size-50),rnd.randint(50,size-50))

# number of animation steps to perform:
steps = 200

# the actual simulation
with hold_canvas(canvas):
    for i in range(steps):
        canvas.save()
        #assign new random positions:
        for person in persons:
            # random new position:
            #person.setPos(rnd.randint(50,size-50),rnd.randint(50,size-50))
            # random walk:
            rndDistance = rnd.randint(0,30)
            #person.randomWalk(rnd.randint(-rndDistance,rndDistance),rnd.randint(-rndDistance,rndDistance))
            person.makeAStep()
        criticalContacts = set()    
        # determine critical contacts:
        for person in persons:
            for contactPerson in persons: # yeah, hidden n^2 complexity, this could be more efficient...
                if person == contactPerson:
                    continue
                rectDistance = person.distance(contactPerson)
                if rectDistance <= criticalDistance:
                    criticalContacts.add(person)
                    # collect random key and add it to person:
                    person.user.logEncounter(contactPerson.user.getCurrentKey())

        # clear the old animation step
        canvas.clear()
        for person in persons:
            person.draw(canvas, person in criticalContacts, False)
            
        canvas.save()
        if i % 20 == 0:
            # generate new random keys:
            for person in persons:
                person.user.generateNewKey()
        canvas.sleep(42)


Canvas(height=600, width=600)

In [7]:
# inspect all data available in the CWAs of all available users:
# sorted by user name
for person in sorted(persons, key=lambda Person: Person.user.name):
    person.user.print()


name       :  0
currentKey :  638
ownKeys    : {'2023-05-11': {289, 66, 324, 552, 684, 947, 819, 884, 632, 638, 223}}
encounters : {'2023-05-11': {900, 137, 13, 398, 909, 913, 423, 808, 40, 680, 300, 941, 305, 818, 325, 844, 206, 336, 617, 381}}

name       :  1
currentKey :  160
ownKeys    : {'2023-05-11': {160, 226, 195, 740, 557, 846, 537, 340, 729, 281, 543}}
encounters : {'2023-05-11': {515, 261, 391, 393, 288, 805, 807, 428, 45, 55, 578, 201, 720, 592, 597, 90, 859, 479, 102, 493, 625}}

name       :  10
currentKey :  980
ownKeys    : {'2023-05-11': {290, 964, 456, 27, 45, 241, 980, 57, 570, 795, 924}}
encounters : {'2023-05-11': {898, 132, 261, 263, 784, 720, 626, 19, 722, 657, 379, 476, 606}}

name       :  11
currentKey :  970
ownKeys    : {'2023-05-11': {544, 551, 617, 10, 619, 427, 909, 970, 660, 859, 94}}
encounters : {'2023-05-11': {226, 324, 680, 808, 842, 941, 206, 911, 339, 884, 980, 819}}

name       :  12
currentKey :  919
ownKeys    : {'2023-05-11': {419, 102, 135, 

In [8]:
# ok, let's assume one of those persons got infected, made a test, and now wants to inform everyone about this:
# let's pick one person randomly

# depending on the outcome of your simulation you may want to pick a person that had many contacts
# in the following, we do this automatically by simply picking the persson with the biggest list of encounters:
personWithMostContacts = sorted(persons, key=lambda Person: Person.user.encounters.getKeysCount(), reverse=True)[0]
personWithMostContacts.print()


name       :  16
currentKey :  102
ownKeys    : {'2023-05-11': {355, 165, 102, 619, 428, 914, 470, 791, 473, 157, 25}}
encounters : {'2023-05-11': {515, 900, 135, 398, 535, 408, 160, 419, 421, 556, 45, 176, 49, 459, 206, 592, 729, 859, 991, 479, 485, 102, 231, 744, 362, 879, 119}}


In [9]:
# now, that person informs the server about the positive test, i.e. it sends its temporary keys to that server
personWithMostContacts.user.sendKeysToNotificationServer()

In [10]:
# here is what the server sees:
testNotificationServer.print()

{'2023-05-11': {355, 165, 102, 619, 428, 914, 470, 791, 473, 157, 25}}


In [11]:
# every person that uses CWA regularly checks her/his risk status
# this means the keys available on the server are downloaded to each user's CWA
# then locally, on each CWA, the two sets are intersected:
for person in sorted(persons, key=lambda Person: Person.user.name):
    print(person.user.name,person.user.checkRiskStatus())

0 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
1 ((1, 'medium, you better get tested to be on the safe side'), 2)
10 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
11 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
12 ((2, 'high, self-isolate NOW! Get tested NOW!'), 7)
13 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
14 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
15 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
16 ((1, 'medium, you better get tested to be on the safe side'), 1)
17 ((1, 'medium, you better get tested to be on the safe side'), 1)
18 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
19 ((2, 'high, self-isolate NOW! Get tested NOW!'), 4)
2 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
20 ((0, 'low, anyway: you need to follow the AHA-rules'), 0)
21 ((1, 'medium, you better get tested to be on the safe side'), 2)
22 ((1, 'medium, you better get tested to be on the safe side'), 2)
23 (

In [12]:
# let's visualize this
# NOTE: if aou ran the entire notebook through "run all", you may not see the risk status viz 
# FIX: simply reexecute this cell
for person in persons:
    person.draw(canvas, False, True, person.user.checkRiskStatus()[0][0])

canvas

Canvas(height=600, width=600)