In [None]:
from kafka import KafkaProducer

In [None]:
def construct_message(event_data, namespaces):
    """Build the flattened event that is published to Kafka.

    Args:
        event_data: Decoded JSON payload of one stream event. The keys read
            unconditionally are ``id``, ``meta['domain']``, ``meta['dt']``,
            ``title``, ``user`` and ``bot``; ``namespace`` and ``length`` are
            optional. The dict is NOT modified.
        namespaces: Mapping from numeric namespace id to a display name
            (for example the dict returned by ``init_namespaces()``).

    Returns:
        A new dict with keys ``id``, ``domain``, ``namespace``, ``title``,
        ``timestamp``, ``user_name``, ``user_type``, ``old_length`` and
        ``new_length``. A namespace id absent from ``namespaces`` (like 104),
        or a missing ``namespace`` key, becomes ``'unknown'``. ``user_type``
        is ``'bot'`` when ``bot`` is truthy, otherwise ``'human'``. The length
        fields are ``None`` when ``length`` (or its ``old``/``new`` entry) is
        missing or null.

    Raises:
        KeyError: if one of the unconditionally read keys is missing.
    """
    # Resolve the namespace name into a local so the caller's payload is left
    # untouched; unknown or missing ids map to 'unknown'.
    try:
        namespace = namespaces[event_data['namespace']]
    except KeyError:
        namespace = 'unknown'

    user_type = 'bot' if event_data['bot'] else 'human'

    # `length` may be absent or explicitly null; treat both as "no data".
    length = event_data.get('length') or {}

    # Structure of the JSON event that will be published to the Kafka topic.
    event = {
        "id": event_data['id'],
        "domain": event_data['meta']['domain'],
        "namespace": namespace,
        "title": event_data['title'],
        "timestamp": event_data['meta']['dt'],
        "user_name": event_data['user'],
        "user_type": user_type,
        "old_length": length.get('old'),
        "new_length": length.get('new'),
    }
    return event


def init_namespaces():
    """Return a new dict mapping Wikipedia namespace ids to display names.

    Only the namespaces listed below are included; any other id (e.g. 104)
    is absent, so lookups must handle a missing key.
    More info: https://en.wikipedia.org/wiki/Wikipedia:Namespace#Programming
    """
    # Each row pairs a subject namespace with its talk namespace (id + 1),
    # except the two virtual namespaces on the first row.
    id_name_pairs = (
        (-2, 'Media'), (-1, 'Special'),
        (0, 'main namespace'), (1, 'Talk'),
        (2, 'User'), (3, 'User Talk'),
        (4, 'Wikipedia'), (5, 'Wikipedia Talk'),
        (6, 'File'), (7, 'File Talk'),
        (8, 'MediaWiki'), (9, 'MediaWiki Talk'),
        (10, 'Template'), (11, 'Template Talk'),
        (12, 'Help'), (13, 'Help Talk'),
        (14, 'Category'), (15, 'Category Talk'),
        (100, 'Portal'), (101, 'Portal Talk'),
        (108, 'Book'), (109, 'Book Talk'),
        (118, 'Draft'), (119, 'Draft Talk'),
        (446, 'Education Program'), (447, 'Education Program Talk'),
        (710, 'TimedText'), (711, 'TimedText Talk'),
        (828, 'Module'), (829, 'Module Talk'),
        (2300, 'Gadget'), (2301, 'Gadget Talk'),
        (2302, 'Gadget definition'), (2303, 'Gadget definition Talk'),
    )
    return dict(id_name_pairs)

In [None]:
import json
from sseclient import SSEClient as EventSource

# Read events from the Wikimedia `recentchange` SSE stream, reshape each one
# with construct_message() and publish it to the Kafka topic below, keyed by
# user name.
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
topic = 'wiki_event_raw'
namespaces = init_namespaces()
sent_cnt = 0
# Single bootstrap broker. No trailing comma: kafka-python splits this string
# on ',' and would otherwise add an empty host entry.
producer = KafkaProducer(bootstrap_servers='kafka:9092')
try:
    for event in EventSource(url):
        # Ignore SSE events other than 'message'.
        if event.event != 'message':
            continue
        try:
            event_data = json.loads(event.data)
        except ValueError:
            # Best-effort: skip payloads that are not valid JSON.
            continue
        # construct_message() requires an 'id'; skip events without one.
        if 'id' not in event_data:
            continue
        msg = construct_message(event_data, namespaces)
        future = producer.send(
            topic,
            key=msg['user_name'].encode(),
            value=json.dumps(msg).encode(),
        )
        # Wait up to 5 s for the send result so delivery errors raise here,
        # trading throughput for immediate error reporting.
        future.get(timeout=5)
        sent_cnt += 1
        if sent_cnt % 100 == 0:
            print(f'Sent {sent_cnt} events to Kafka...')
finally:
    # When the loop stops (e.g. the cell is interrupted), deliver anything
    # still buffered and release the producer's connections.
    producer.flush()
    producer.close()
