# Adapters and Wikipedia data

In this example, we'll use the CSP and Perspective libraries to explore and visualize a stream of data from Wikimedia that reports the most recent changes to pages and files across Wikimedia domains.

To do that, we need to install a few extra dependencies, listed below:

```
pip install sseclient
pip install perspective-python
```

Next, we will need to write a new CSP [Adapter](https://github.com/Point72/csp/wiki/Adapters) to fetch the data and process it. In this example, we will write both a [Realtime Adapter](https://github.com/Point72/csp/wiki/Write-Realtime-Input-Adapters) and a [Historical Adapter](https://github.com/Point72/csp/wiki/Write-Historical-Input-Adapters).

**Note**: This tutorial has been tested for Python 3.11, using JupyterLab 4.2.0 and perspective-python 3.1.5.

In [None]:
!pip list | grep "jupyterlab" && pip list | grep "perspective-python"

## Writing adapters

Input adapters fall into two main categories: historical and realtime. A historical adapter is a "pull" adapter, which pulls data from a historical data source in time order, one event at a time. A realtime adapter is a "push" adapter, which receives data from a separate thread that drives external events and "pushes" them into the engine as they occur.
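
The difference between the two styles can be sketched in plain Python, without CSP. This is only an analogy: `pull_source` and `push_source` are hypothetical names, not part of the CSP API. In the pull case the consumer asks for the next item; in the push case a producer thread delivers items whenever they occur.

```python
import queue
import threading


def pull_source(records):
    """Pull style: the consumer asks for the next (time, value) pair, in time order."""
    for record in sorted(records):
        yield record


def push_source(values, sink):
    """Push style: a producer thread hands values to the consumer as they occur."""
    def produce():
        for value in values:
            sink.put(value)
        sink.put(None)  # sentinel: no more data

    thread = threading.Thread(target=produce)
    thread.start()
    return thread


# Pull: the caller drives the iteration
pulled = list(pull_source([(2, "b"), (1, "a"), (3, "c")]))

# Push: the producer thread drives, the caller only drains the queue
sink = queue.Queue()
producer = push_source(["x", "y"], sink)
pushed = []
while (item := sink.get()) is not None:
    pushed.append(item)
producer.join()
```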

When writing input adapters, it is also very important to distinguish between the "graph building time" and "runtime" versions of your adapter. Graph-building-time components solely describe the adapter. They are meant to do little more than keep track of the type of adapter and its parameters, which are then used to construct the actual adapter implementation when the engine is built from the graph description. It is the runtime implementation that actually runs during the engine execution phase to process data.
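
As a rough illustration of this split, the plain-Python sketch below keeps a description object separate from the object that does the work. `AdapterDef` and `CounterAdapterImpl` are made-up names for this example only; CSP's own helpers, used later in this tutorial, play the role of the description.

```python
from dataclasses import dataclass, field


class CounterAdapterImpl:
    """Runtime side: does the actual work once the engine is running."""

    def __init__(self, step: int):
        self._step = step
        self._value = 0

    def next(self) -> int:
        self._value += self._step
        return self._value


@dataclass(frozen=True)
class AdapterDef:
    """Graph-building side: only records which implementation to use and its parameters."""

    impl_class: type
    params: dict = field(default_factory=dict)

    def build(self):
        # Called when the engine is constructed from the graph description
        return self.impl_class(**self.params)


definition = AdapterDef(CounterAdapterImpl, {"step": 5})  # nothing runs yet
runtime = definition.build()  # the implementation is created here
ticks = [runtime.next() for _ in range(3)]
```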

## Using CSP to read realtime data

The data we are going to explore is the [MediaWiki Recent Changes feed](https://www.mediawiki.org/wiki/Manual:RCFeed) stream, which emits events related to recent changes across Wikimedia sites. The stream can be accessed through the [EventStreams web service](https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams_HTTP_Service).

First, we'll import the date utilities we need and set the URL of the stream. The data structure for the events is defined further below.

In [None]:
from datetime import datetime, timedelta

URL = 'https://stream.wikimedia.org/v2/stream/recentchange'

Next, we'll set up an initial (empty) visualization widget from the Perspective library so we can visualize the streaming data later. (Note that if you are running this example locally on JupyterLab, you may need to restart the Jupyter server after installing the Perspective library in order to visualize the widget.)

In [None]:
from perspective.widget import PerspectiveWidget

# Data schema
data = {"servername": "string", "timestamp": "datetime", "event": "string", "servername_count": "integer"}
widget = PerspectiveWidget(data, plugin="X Bar", group_by=["servername"], columns=["servername_count"], aggregates={"servername_count": "last"}, theme='Pro Light', binding_mode="client-server")
widget

Now, we will use CSP to build our EventSource adapter and update the Perspective widget we defined above with real-time events from the stream.

First, we'll define a `csp.Struct` to hold our data.

In [None]:
import re
import json
import threading

from sseclient import SSEClient as EventSource

import csp

class WikiData(csp.Struct):
    servername: str
    servername_count: int
    timestamp: str
    event: str

Next, we will use a [PushInputAdapter](https://github.com/Point72/csp/wiki/Write-Realtime-Input-Adapters#pushinputadapter---python). The `PushInputAdapter` subclass that you define serves as the _runtime implementation_; you also need to define a _graph-time_ representation of the time series edge by using `py_push_adapter_def`.

When running the cell below, you should see the widget updated in real time with each new event, indexed by the server where the update happened.

In [None]:
from csp.impl.pushadapter import PushInputAdapter
from csp.impl.wiring import py_push_adapter_def

# Define the runtime implementation of our adapter
class FetchWikiDataAdapter(PushInputAdapter):
    def __init__(self, url: str):
        self._thread = None
        self._running = False
        self._url = url

    def start(self, starttime, endtime):
        print("FetchWikiDataAdapter::start")
        self._source = EventSource(self._url)
        self._running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def stop(self):
        print("FetchWikiDataAdapter::stop")
        if self._running:
            self._running = False
            self._thread.join()
            self._source.resp.close()

    def _run(self):
        for item in self._source:
            if not self._running:
                break
            if item.event == 'message':
                try:
                    change = json.loads(item.data)
                except ValueError:
                    pass
                else:
                    # Discard canary events: the WMF Data Engineering team produces
                    # artificial 'canary' events into each stream multiple times an
                    # hour. Their presence allows us to differentiate between a
                    # broken event stream and an empty one.
                    # We also filter out bot-generated events.
                    if change['meta']['domain'] == 'canary' or re.search('bot', change['user'], re.IGNORECASE):
                        continue
                    timestamp = change['meta']['dt']
                    event = f"{timestamp}:: {change['user']} edited {change['title']}"
                    servername = change['server_name']
                    # Manually "tick" this edge
                    self.push_tick(
                        WikiData(
                            servername=servername,
                            servername_count=0,
                            timestamp=timestamp,
                            event=event
                        )
                    )

# Create the graph-time representation of our adapter
FetchWikiData = py_push_adapter_def("FetchWikiData", FetchWikiDataAdapter, csp.ts[WikiData], url=str)
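
The parsing and filtering done inside `_run` can be tried out without a network connection. The sketch below pulls that logic into a hypothetical helper, `parse_change`, and feeds it made-up payloads that contain only the fields the adapter reads; real events carry many more fields.

```python
import json
import re
from typing import Optional


def parse_change(raw: str) -> Optional[dict]:
    """Return the fields we keep from one event payload, or None if it is skipped."""
    try:
        change = json.loads(raw)
    except ValueError:
        return None  # malformed JSON
    if change["meta"]["domain"] == "canary":
        return None  # artificial canary event
    if re.search("bot", change["user"], re.IGNORECASE):
        return None  # bot-generated event
    timestamp = change["meta"]["dt"]
    return {
        "servername": change["server_name"],
        "timestamp": timestamp,
        "event": f"{timestamp}:: {change['user']} edited {change['title']}",
    }


def make_payload(domain: str, user: str) -> str:
    """Build a made-up payload with only the fields used above."""
    return json.dumps({
        "meta": {"domain": domain, "dt": "2024-05-01T12:00:00Z"},
        "user": user,
        "title": "Example page",
        "server_name": "en.wikipedia.org",
    })


kept = parse_change(make_payload("en.wikipedia.org", "ExampleUser"))
canary = parse_change(make_payload("canary", "ExampleUser"))
bot = parse_change(make_payload("en.wikipedia.org", "CleanupBot"))
broken = parse_change("not json")
```

Note that the bot filter is a plain substring match, so any user name containing "bot" is dropped.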

Now, we define the graph and run the engine.

In [None]:
@csp.node
def update_widget(wiki_event: csp.ts[WikiData], widget: PerspectiveWidget, throttle: timedelta = timedelta(seconds=0.5)):
    # Updates the perspective widget with batched updates for scalability
    with csp.alarms():
        alarm = csp.alarm(bool)

    with csp.state():
        s_buffer = []

    with csp.start():
        csp.schedule_alarm(alarm, throttle, True)
        
    if csp.ticked(wiki_event):
        s_buffer.append({
            "servername": wiki_event.servername,
            "servername_count": wiki_event.servername_count,
            "timestamp": wiki_event.timestamp,
            "event": wiki_event.event,
        })

    if csp.ticked(alarm):
        if len(s_buffer) > 0:
            widget.update(s_buffer)
            s_buffer = []

        csp.schedule_alarm(alarm, throttle, True)

@csp.node
def compute_server_count(wiki_event: csp.ts[WikiData]) -> csp.ts[WikiData]:
    # Takes the raw struct in, creates a copy with the count set and ticks it out
    with csp.state():
        s_servernames = {}
    if csp.ticked(wiki_event):
        count = s_servernames.get(wiki_event.servername, 0) + 1
        s_servernames[wiki_event.servername] = count
        # Copy before setting the count so the input struct is not mutated
        counted = wiki_event.copy()
        counted.servername_count = count
        return counted

@csp.graph
def wiki_graph():
    print("Start of graph building")
    URL = 'https://stream.wikimedia.org/v2/stream/recentchange'
    # Create an edge in the graph with the events fetched by the adapter
    result = FetchWikiData(url=URL)
    result = compute_server_count(result)
    # Update Perspective widget with each event/tick
    update_widget(result, widget=widget)
    # We can also print the event object
    csp.print("Event", result.event)
    # Add this edge as a graph output
    csp.add_graph_output("Wiki events", result.event)
    print("End of graph building")

start = datetime.utcnow()
csp.run(wiki_graph, starttime=start, endtime=start+timedelta(seconds=30), realtime=True)
print("Done.")
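
The batching in `update_widget` is easier to reason about outside the engine. The sketch below is a plain-Python approximation, not CSP code: the hypothetical `batch_by_interval` groups timestamped values into the batches that a flush every `interval` would hand to the widget, and intervals without events produce no batch.

```python
from datetime import datetime, timedelta


def batch_by_interval(events, start, interval):
    """Group (time, value) pairs into the batches a periodic flush would emit."""
    batches = {}
    for time, value in events:
        # Index of the interval, counted from start, in which this event arrived
        slot = int((time - start) / interval)
        batches.setdefault(slot, []).append(value)
    # Empty intervals never appear, like the len(s_buffer) > 0 check
    return [batches[slot] for slot in sorted(batches)]


t0 = datetime(2024, 1, 1)
events = [
    (t0 + timedelta(seconds=0.1), "a"),
    (t0 + timedelta(seconds=0.4), "b"),
    (t0 + timedelta(seconds=0.6), "c"),
    (t0 + timedelta(seconds=1.7), "d"),
]
batched = batch_by_interval(events, t0, timedelta(seconds=0.5))
```

With a 0.5 second interval, four events result in three widget updates instead of four; on a busy stream the saving is much larger.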

## Using CSP to read historical data

Now, let's explore writing a _historical_ adapter to read the same data stream. The Wikimedia API offers the following query for historical data:

```
URL = f'https://stream.wikimedia.org/v2/stream/recentchange?since={start_date}'
```

where `{start_date}` is a string representing the start of the period we want to read from. 

The stream history for Wikimedia data is not kept indefinitely. Depending on the particular stream configuration, there will likely be between 7 and 31 days of history available. We will start reading from a date in the recent past and stop after 23 hours of events.

In [None]:
from datetime import timezone

# Use naive UTC datetimes for both ends of the window, as in the realtime example
start_time = datetime.utcnow() - timedelta(days=2)
end_time = start_time + timedelta(hours=23)
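
One way to build the query URL is sketched below. It assumes that the `since` parameter accepts an ISO 8601 timestamp in UTC and that naive datetimes are already expressed in UTC; `build_since_url` is a hypothetical helper, not part of any library.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlencode, urlparse

BASE_URL = "https://stream.wikimedia.org/v2/stream/recentchange"


def build_since_url(start: datetime) -> str:
    """Return the stream URL with an explicit UTC ISO 8601 `since` parameter."""
    if start.tzinfo is None:
        # Assumption: naive datetimes are already in UTC
        start = start.replace(tzinfo=timezone.utc)
    stamp = start.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{BASE_URL}?{urlencode({'since': stamp})}"


url = build_since_url(datetime(2024, 5, 1, 12, 30))
# Round trip: decode the query string again to inspect the parameter
since = parse_qs(urlparse(url).query)["since"][0]
```

Formatting the timestamp explicitly avoids the space and microseconds that `str(datetime)` produces.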

To read this data, we will use a [Pull input adapter](https://github.com/Point72/csp/wiki/Write-Historical-Input-Adapters#pullinputadapter).

In [None]:
from csp.impl.pulladapter import PullInputAdapter
from csp.impl.wiring import py_pull_adapter_def

# Define the runtime implementation of our adapter
class HistoricalWikiDataAdapter(PullInputAdapter):
    def __init__(self, url: str):
        self._url = url
        self._events = []
        super().__init__()

    def start(self, starttime, endtime):
        print("HistoricalWikiDataAdapter::start")
        # Keep the run window so that next() knows when to stop
        self._start_time = starttime
        self._end_time = endtime
        self._events = EventSource(self._url)

    def stop(self):
        print("HistoricalWikiDataAdapter::stop")

    def next(self):
        """Return a (datetime, value) tuple for the next tick, or None if no more data is available"""
        for item in self._events:
            # Skip anything that is not a regular message
            if item.event != 'message':
                continue
            try:
                change = json.loads(item.data)
            except ValueError:
                continue
            # Discard canary events: the WMF Data Engineering team produces
            # artificial 'canary' events into each stream multiple times an
            # hour. Their presence allows us to differentiate between a
            # broken event stream and an empty one.
            # We also filter out bot-generated events.
            if change['meta']['domain'] == 'canary' or re.search('bot', change['user'], re.IGNORECASE):
                continue
            timestamp = change['meta']['dt']
            time = datetime.fromisoformat(timestamp.rstrip('Z'))
            if time > self._end_time:
                # Past the end of the requested window: no more data
                return None
            event = f"{timestamp}:: {change['user']} edited {change['title']}"
            servername = change['server_name']
            # Return the next tick for this edge
            return (
                time,
                WikiData(
                    servername=servername,
                    timestamp=timestamp,
                    event=event,
                    servername_count=0,
                )
            )
        # The stream ended before reaching the end of the window
        return None

# Create the graph-time representation of our adapter
HistoricalWikiData = py_pull_adapter_def(
    "HistoricalWikiData",
    HistoricalWikiDataAdapter,
    csp.ts[WikiData],
    url=str
)

@csp.node
def pretty_print(event: csp.ts[WikiData]) -> csp.ts[str]:
    if csp.ticked(event):
        return f"{event.event}"

@csp.graph
def wiki_graph():
    URL = f"https://stream.wikimedia.org/v2/stream/recentchange?since={start_time}"
    print(f"Start of graph building from {start_time} until {end_time}")
    # Create an edge in the graph with the events fetched by the adapter
    events = HistoricalWikiData(url=URL)
    # We will process the events data through a node, and only print events
    # happening at en.wikipedia.org
    en_wiki = csp.filter(events.servername == "en.wikipedia.org", events)
    # Let's pretty-print the results
    en_events = pretty_print(en_wiki)
    csp.print("Wiki event:", en_events)
    # Add this edge as a graph output
    # This allows you to connect an edge as a "graph output", returned to the
    # caller from csp.run as a dictionary of key: [(datetime, value)]
    csp.add_graph_output("en_wiki", en_wiki)
    print("End of graph building")

csp.run(wiki_graph, starttime=start_time, endtime=end_time)
print("Done.")
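
The adapter turns each event timestamp into a naive datetime by stripping the trailing `Z`, which works when the timestamp is already in UTC. A slightly more defensive variant is sketched below; `to_naive_utc` and `in_window` are hypothetical helpers, and the sketch assumes the engine expects naive datetimes expressed in UTC.

```python
from datetime import datetime, timezone


def to_naive_utc(stamp: str) -> datetime:
    """Parse an ISO 8601 timestamp and return it as a naive datetime in UTC."""
    # Older Python versions do not accept a trailing 'Z' in fromisoformat
    parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def in_window(stamp: str, start: datetime, end: datetime) -> bool:
    """Check whether a timestamp falls inside a naive UTC [start, end] window."""
    return start <= to_naive_utc(stamp) <= end


utc_stamp = to_naive_utc("2024-05-01T12:00:00Z")
offset_stamp = to_naive_utc("2024-05-01T14:00:00+02:00")
inside = in_window("2024-05-01T12:00:00Z", datetime(2024, 5, 1), datetime(2024, 5, 2))
outside = in_window("2024-05-03T00:00:00Z", datetime(2024, 5, 1), datetime(2024, 5, 2))
```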

A key strength of CSP is that the same graph can use the historical edge or the realtime edge interchangeably, as shown below.

In [None]:
@csp.node
def pretty_print(event: csp.ts[WikiData]) -> csp.ts[str]:
    if csp.ticked(event):
        return f"{event.event}"

@csp.graph
def wiki_graph():
    print("Start of graph building")
    if csp.is_configured_realtime():
        URL = "https://stream.wikimedia.org/v2/stream/recentchange"
        events = FetchWikiData(url=URL)
    else:
        URL = f"https://stream.wikimedia.org/v2/stream/recentchange?since={start_time}"
        events = HistoricalWikiData(url=URL)

    # We will process the events data through a node, and only print events
    # happening at en.wikipedia.org
    en_wiki = csp.filter(events.servername == "en.wikipedia.org", events)
    # Let's pretty-print the results
    en_events = pretty_print(en_wiki)
    csp.print("Wiki event:", en_events)
    # Add this edge as a graph output
    # This allows you to connect an edge as a "graph output", returned to the
    # caller from csp.run as a dictionary of key: [(datetime, value)]
    csp.add_graph_output("en_wiki", en_wiki)
    print("End of graph building")

start_time = datetime.utcnow()
end_time = start_time+timedelta(seconds=30)
csp.run(wiki_graph, starttime=start_time, endtime=end_time, realtime=True)
print("Done.")

---

### References

1. https://github.com/Point72/csp/wiki/5.-Adapters#writing-input-and-output-adapters
2. https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams
3. https://schema.wikimedia.org/repositories/primary/jsonschema/mediawiki/recentchange/latest.yaml
4. https://github.com/finos/perspective