From f51aaef5acca2760e11e26dc81b614d78746082e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melissa=20Weber=20Mendon=C3=A7a?= Date: Sun, 16 Jun 2024 23:02:12 -0300 Subject: [PATCH 1/6] Add PushPullAdapter example with earthquake data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Melissa Weber Mendonça --- examples/07_end_to_end/earthquake.ipynb | 416 ++++++++++++++++++++++++ 1 file changed, 416 insertions(+) create mode 100644 examples/07_end_to_end/earthquake.ipynb diff --git a/examples/07_end_to_end/earthquake.ipynb b/examples/07_end_to_end/earthquake.ipynb new file mode 100644 index 000000000..afb9e2a00 --- /dev/null +++ b/examples/07_end_to_end/earthquake.ipynb @@ -0,0 +1,416 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f375a246-44ee-474b-af5b-f56ee3a6fda8", + "metadata": {}, + "source": [ + "# Processing Earthquake Events Data with `obspy` and `csp`" + ] + }, + { + "cell_type": "markdown", + "id": "0d6e2db4-414c-4981-bd74-fcb8cee0f535", + "metadata": {}, + "source": [ + "## Introduction" + ] + }, + { + "cell_type": "markdown", + "id": "671af10e-c238-40e4-b155-a4dc2b9a750f", + "metadata": {}, + "source": [ + "In this example, we will use CSP to process earthquake events and plot them in a map using a [Perspective](https://perspective.finos.org) widget." + ] + }, + { + "cell_type": "markdown", + "id": "eb509f1a-22b8-48d9-939a-d46784815889", + "metadata": {}, + "source": [ + "First, we need to install a few extra libraries:\n", + "- `obspy`, for reading the earthquake stream from USGS;\n", + "- `perspective-python`, to create the visualization of the data; and \n", + "- `cartopy`, for plotting the individual events in the USGS catalog (this is optional)." + ] + }, + { + "cell_type": "markdown", + "id": "9e5ce935-7f5d-4c37-8bd9-cea707e0aefe", + "metadata": {}, + "source": [ + "**Note:** This example has been tested for `jupyterlab==4.2.0` and `perspective-python==2.10.0`." + ] + }, + { + "cell_type": "markdown", + "id": "7d470b6f-eddf-4617-9e12-d5b907ab9f54", + "metadata": {}, + "source": [ + "You can install these dependencies in your Python environment with the following command:" + ] + }, + { + "cell_type": "markdown", + "id": "2a6ebda7-caa6-4a23-abda-fe589171e2c2", + "metadata": { + "scrolled": true + }, + "source": [ + "```\n", + "pip install obspy cartopy jupyterlab==4.2.0 perspective-python==2.10.0\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "ed8fddaf-865c-42d0-b0bd-6d1061f2b72d", + "metadata": {}, + "source": [ + "#### Reading realtime data from USGS as QuakeML" + ] + }, + { + "cell_type": "markdown", + "id": "cea45007-2e4a-439c-8884-29a5e0232730", + "metadata": {}, + "source": [ + "The [USGS website](https://earthquake.usgs.gov/earthquakes) provides several feeds with recent seismic events, and we will read the \"All day\" feed containing all seismic events of the past 24h. Using `obspy`, we can read the feed in the [QUAKEML](https://earthquake.usgs.gov/earthquakes/feed/v1.0/quakeml.php) format as a `catalog`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3b0da0df-21d9-434b-88ca-0b4bc5a1bae8", + "metadata": {}, + "outputs": [], + "source": [ + "from obspy import read_events\n", + "\n", + "url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", + "catalog = read_events(url, format=\"QUAKEML\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20ebbd2b-2bc6-445f-a53b-920324e7ad5b", + "metadata": {}, + "outputs": [], + "source": [ + "catalog" + ] + }, + { + "cell_type": "markdown", + "id": "6efee42c-41d9-4f42-b6f9-1eaa1170c653", + "metadata": {}, + "source": [ + "This catalog's events are not ordered, but we can sort them easily:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aad93bfe-b889-44dc-9f48-a9ba36fdf7b9", + "metadata": {}, + "outputs": [], + "source": [ + "def sorting(event):\n", + " return event.origins[0].time\n", + "catalog.events.sort(key=sorting)" + ] + }, + { + "cell_type": "markdown", + "id": "dc6eeaeb-17ec-4656-9675-112ab5552317", + "metadata": {}, + "source": [ + "Now, we can calso use [cartopy](https://scitools.org.uk/cartopy/docs/latest/) to to plot all events in a world map:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2786ced2-1e5c-4c69-84ab-e4a4c16c7547", + "metadata": {}, + "outputs": [], + "source": [ + "catalog.plot();" + ] + }, + { + "cell_type": "markdown", + "id": "f0d17801-aadd-4c2c-b257-0b30de7e7b7d", + "metadata": {}, + "source": [ + "To see what these event objects are, we can look closer:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "692262d9-f217-4fa0-8140-2d5641f6aa75", + "metadata": {}, + "outputs": [], + "source": [ + "event = catalog[0]\n", + "event" + ] + }, + { + "cell_type": "markdown", + "id": "bd33d70d-da4b-41a6-bf72-01cc0bdcb875", + "metadata": {}, + "source": [ + "This feed lists different kinds of events, noted by `event.event_type`. We can also inspect location, time and magnitude information:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81b272b6-ab8f-4853-a115-2a6fbc94a335", + "metadata": {}, + "outputs": [], + "source": [ + "event.origins, event.magnitudes" + ] + }, + { + "cell_type": "markdown", + "id": "c94fd3cc-5377-498d-ab2c-5b528a87d93c", + "metadata": {}, + "source": [ + "This feed is updated every minute, meaning we get a historical dataset of all seismic events in the past 24h, but we also get a continually updated feed that adds new events as they are entered into this feed (all events will contain this `creation_time` information)." + ] + }, + { + "cell_type": "markdown", + "id": "390b601f-8d13-496f-a3ca-820b08485009", + "metadata": {}, + "source": [ + "## With CSP" + ] + }, + { + "cell_type": "markdown", + "id": "f3b61d0a-1f3c-4d26-937a-4c14f05f757d", + "metadata": {}, + "source": [ + "We can now use CSP to read the same data in either realtime or simulation modes, by building a `PushPullAdapter`. This adapter combines a realtime or `PushAdapter` and a historical or `PullAdapter` into a single implementation. This makes it easy to switch from realtime to historical mode by the `realtime` argument of `csp.run`." + ] + }, + { + "cell_type": "markdown", + "id": "e54bfc85-ddfe-4f09-a9b7-1212e67f6b62", + "metadata": {}, + "source": [ + "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e02c1d65-4ad3-4e7f-9bf2-c4c85dffc635", + "metadata": {}, + "outputs": [], + "source": [ + "from perspective import PerspectiveWidget, Plugin\n", + "import ipywidgets as widgets\n", + "from datetime import datetime\n", + "\n", + "# Data schema for Perspective widget\n", + "data = {\n", + " \"longitude\": float,\n", + " \"latitude\": float,\n", + " \"magnitude\": float,\n", + " \"time\": datetime,\n", + " \"creation_time\": datetime,\n", + "}\n", + "\n", + "datagrid = PerspectiveWidget(\n", + " data,\n", + " plugin=Plugin.GRID,\n", + " group_by=[\"time\"],\n", + " columns=[\"longitude\", \"latitude\", \"magnitude\", \"creation_time\"],\n", + " aggregates={\n", + " \"longitude\": \"last\",\n", + " \"latitude\": \"last\",\n", + " \"magnitude\": \"last\",\n", + " \"creation_time\": \"last\", \n", + " },\n", + ")\n", + "worldmap = PerspectiveWidget(\n", + " data,\n", + " plugin=Plugin.MAP_SCATTER,\n", + " columns=[\"longitude\", \"latitude\", \"magnitude\", \"magnitude\", \"time\", \"creation_time\"],\n", + " #tooltip=[\"magnitude\", \"time\"]\n", + ")\n", + "# Create a tab widget with some PerspectiveWidgets inside\n", + "widget = widgets.Tab()\n", + "widget.children = [datagrid, worldmap]\n", + "widget.titles = [\"All events\", \"World map\"]\n", + "widget" + ] + }, + { + "cell_type": "markdown", + "id": "e249cf63-355e-4e65-bd9c-d475cbc2d310", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0eb12869-eb31-4eba-8079-5fa6ba28f892", + "metadata": {}, + "outputs": [], + "source": [ + "# PushPullAdapter\n", + "import threading\n", + "import csp\n", + "from obspy import read_events\n", + "from datetime import datetime, timedelta\n", + "from csp.impl.pushpulladapter import PushPullInputAdapter\n", + "from csp.impl.wiring import py_pushpull_adapter_def\n", + "from obspy.core.utcdatetime import UTCDateTime\n", + "import time\n", + "\n", + "class EventData(csp.Struct):\n", + " time: UTCDateTime\n", + " longitude: float\n", + " latitude: float\n", + " magnitude: float\n", + " creation_time: datetime\n", + "\n", + "# Create a runtime implementation of the adapter\n", + "class FetchEventDataAdapter(PushPullInputAdapter):\n", + " def __init__(self, interval, url, realtime):\n", + " self._interval = interval\n", + " self._thread = None\n", + " self._running = False\n", + " self._url = url\n", + " self._realtime = realtime\n", + "\n", + " def start(self, starttime, endtime):\n", + " print(\"FetchEventDataAdapter::start\")\n", + " self._running = True\n", + " self._thread = threading.Thread(target=self._run)\n", + " self._thread.start()\n", + " self._starttime = datetime.utcnow()\n", + "\n", + " def stop(self):\n", + " print(\"FetchEventDataAdapter::stop\")\n", + " if self._running:\n", + " self._running = False\n", + " self._thread.join()\n", + "\n", + " def _run(self):\n", + " history = []\n", + " while self._running:\n", + " now = datetime.utcnow()\n", + " print(\"----------------------------------------------\")\n", + " print(f\"{now}: refreshing earthquake feed\")\n", + " print(\"----------------------------------------------\")\n", + " catalog = read_events(self._url, format=\"QUAKEML\")\n", + " def sorting(event):\n", + " return event.origins[0].time\n", + " catalog.events.sort(key=sorting)\n", + " # tick whenever new events appear\n", + " live = self._realtime\n", + " if live:\n", + " for event in catalog:\n", + " if event not in history:\n", + " history.append(event)\n", + " my_event = EventData(\n", + " time=event.origins[0].time,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " creation_time=event.origins[0].creation_info.creation_time.datetime,\n", + " )\n", + " # For a PushPullAdapter, push_tick takes 3 arguments: push_tick(live: bool, time: datetime, value: csp.Struct)\n", + " self.push_tick(live, my_event.time.datetime, my_event)\n", + " interval = self._interval.total_seconds()\n", + " time.sleep(interval)\n", + " else:\n", + " for event in catalog:\n", + " time.sleep(0.2)\n", + " if event not in history:\n", + " history.append(event)\n", + " my_event = EventData(\n", + " time=event.origins[0].time,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " creation_time=event.origins[0].creation_info.creation_time.datetime,\n", + " )\n", + " # For a PushPullAdapter, push_tick takes 3 arguments: push_tick(live: bool, time: datetime, value: csp.Struct)\n", + " self.push_tick(live, now, my_event)\n", + " self.stop()\n", + "\n", + "# Create the graph-time representation of our adapter\n", + "FetchEventData = py_pushpull_adapter_def(\"FetchEventData\", FetchEventDataAdapter, csp.ts[EventData], interval=timedelta, url=str, realtime=bool)\n", + "\n", + "@csp.node\n", + "def update_widget(event: csp.ts[EventData], widget: widgets.widgets.widget_selectioncontainer.Tab):\n", + " if csp.ticked(event):\n", + " # widget.children = [datagrid, worldmap]\n", + " data = {\n", + " \"time\": [event.time],\n", + " \"longitude\": [event.longitude],\n", + " \"latitude\": [event.latitude],\n", + " \"magnitude\": [event.magnitude],\n", + " \"creation_time\": [event.creation_time],\n", + " }\n", + " widget.children[0].update(data)\n", + " widget.children[1].update(data)\n", + "\n", + "@csp.graph\n", + "def earthquake_graph():\n", + " print(\"Start of graph building\")\n", + " url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", + " interval = timedelta(seconds=60)\n", + " earthquakes = FetchEventData(interval, url=url, realtime=csp.is_configured_realtime())\n", + " update_widget(earthquakes, widget=widget)\n", + " csp.add_graph_output(\"Earthquakes\", earthquakes)\n", + " print(\"End of graph building\")\n", + "\n", + "start = datetime.utcnow()\n", + "end = start + timedelta(minutes=10)\n", + "csp.run(earthquake_graph, starttime=start, endtime=end, realtime=False)\n", + "print(\"Done.\")" + ] + }, + { + "cell_type": "markdown", + "id": "c2db6101-9612-48f4-a526-29f0395821f7", + "metadata": {}, + "source": [ + "---" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From a65c1b61e58a707ba3be78621b1fd1508cac55ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melissa=20Weber=20Mendon=C3=A7a?= Date: Fri, 5 Jul 2024 12:48:19 -0300 Subject: [PATCH 2/6] Rework PushPull logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Melissa Weber Mendonça --- examples/07_end_to_end/earthquake.ipynb | 95 +++++++++++-------------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/examples/07_end_to_end/earthquake.ipynb b/examples/07_end_to_end/earthquake.ipynb index afb9e2a00..65d6d19da 100644 --- a/examples/07_end_to_end/earthquake.ipynb +++ b/examples/07_end_to_end/earthquake.ipynb @@ -198,7 +198,7 @@ "id": "f3b61d0a-1f3c-4d26-937a-4c14f05f757d", "metadata": {}, "source": [ - "We can now use CSP to read the same data in either realtime or simulation modes, by building a `PushPullAdapter`. This adapter combines a realtime or `PushAdapter` and a historical or `PullAdapter` into a single implementation. This makes it easy to switch from realtime to historical mode by the `realtime` argument of `csp.run`." + "We can now use CSP to read the same data in either realtime or simulation modes, by building a `PushPullAdapter`. This adapter combines a realtime or `PushAdapter` and a historical or `PullAdapter` into a single implementation. This makes it easy to switch from historical mode to realtime mode at runtime. " ] }, { @@ -206,13 +206,13 @@ "id": "e54bfc85-ddfe-4f09-a9b7-1212e67f6b62", "metadata": {}, "source": [ - "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*)." + "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*). We will read historical events from the past 6h, and run the engine for 10 minutes while we wait for new events to be added to the catalog in realtime." ] }, { "cell_type": "code", "execution_count": null, - "id": "e02c1d65-4ad3-4e7f-9bf2-c4c85dffc635", + "id": "f2fbdb81-afcd-4937-8e6a-69059da3229c", "metadata": {}, "outputs": [], "source": [ @@ -225,27 +225,28 @@ " \"longitude\": float,\n", " \"latitude\": float,\n", " \"magnitude\": float,\n", - " \"time\": datetime,\n", - " \"creation_time\": datetime,\n", + " \"raw_time\": datetime,\n", + " \"raw_creation_time\": datetime,\n", + " \"time\": str,\n", + " \"creation_time\": str,\n", "}\n", "\n", "datagrid = PerspectiveWidget(\n", " data,\n", " plugin=Plugin.GRID,\n", - " group_by=[\"time\"],\n", - " columns=[\"longitude\", \"latitude\", \"magnitude\", \"creation_time\"],\n", + " group_by=[\"raw_time\"],\n", + " columns=[\"longitude\", \"latitude\", \"magnitude\", \"raw_creation_time\"],\n", " aggregates={\n", " \"longitude\": \"last\",\n", " \"latitude\": \"last\",\n", " \"magnitude\": \"last\",\n", - " \"creation_time\": \"last\", \n", + " \"raw_creation_time\": \"last\", \n", " },\n", ")\n", "worldmap = PerspectiveWidget(\n", " data,\n", " plugin=Plugin.MAP_SCATTER,\n", " columns=[\"longitude\", \"latitude\", \"magnitude\", \"magnitude\", \"time\", \"creation_time\"],\n", - " #tooltip=[\"magnitude\", \"time\"]\n", ")\n", "# Create a tab widget with some PerspectiveWidgets inside\n", "widget = widgets.Tab()\n", @@ -254,29 +255,30 @@ "widget" ] }, - { - "cell_type": "markdown", - "id": "e249cf63-355e-4e65-bd9c-d475cbc2d310", - "metadata": {}, - "source": [] - }, { "cell_type": "code", "execution_count": null, - "id": "0eb12869-eb31-4eba-8079-5fa6ba28f892", - "metadata": {}, + "id": "4925e8c3-cdcd-4411-820c-90a284c927a1", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [], "source": [ "# PushPullAdapter\n", "import threading\n", "import csp\n", "from obspy import read_events\n", - "from datetime import datetime, timedelta\n", + "from datetime import datetime, timedelta, timezone\n", "from csp.impl.pushpulladapter import PushPullInputAdapter\n", "from csp.impl.wiring import py_pushpull_adapter_def\n", "from obspy.core.utcdatetime import UTCDateTime\n", "import time\n", "\n", + "\n", "class EventData(csp.Struct):\n", " time: UTCDateTime\n", " longitude: float\n", @@ -286,19 +288,19 @@ "\n", "# Create a runtime implementation of the adapter\n", "class FetchEventDataAdapter(PushPullInputAdapter):\n", - " def __init__(self, interval, url, realtime):\n", + " def __init__(self, interval, url):\n", " self._interval = interval\n", " self._thread = None\n", " self._running = False\n", " self._url = url\n", - " self._realtime = realtime\n", "\n", " def start(self, starttime, endtime):\n", " print(\"FetchEventDataAdapter::start\")\n", " self._running = True\n", " self._thread = threading.Thread(target=self._run)\n", " self._thread.start()\n", - " self._starttime = datetime.utcnow()\n", + " self._starttime = starttime\n", + " self._endtime = endtime\n", "\n", " def stop(self):\n", " print(\"FetchEventDataAdapter::stop\")\n", @@ -308,19 +310,19 @@ "\n", " def _run(self):\n", " history = []\n", + " live = False\n", " while self._running:\n", " now = datetime.utcnow()\n", - " print(\"----------------------------------------------\")\n", - " print(f\"{now}: refreshing earthquake feed\")\n", - " print(\"----------------------------------------------\")\n", + " print(\"-------------------------------------------------------------------\")\n", + " print(f\"{now}: Refreshing earthquake feed, live={live}\")\n", + " print(\"-------------------------------------------------------------------\")\n", " catalog = read_events(self._url, format=\"QUAKEML\")\n", " def sorting(event):\n", " return event.origins[0].time\n", " catalog.events.sort(key=sorting)\n", - " # tick whenever new events appear\n", - " live = self._realtime\n", - " if live:\n", - " for event in catalog:\n", + " for event in catalog:\n", + " # tick whenever new (historical or realtime) events appear\n", + " if event.origins[0].time.datetime > self._starttime and event.origins[0].time.datetime < self._endtime:\n", " if event not in history:\n", " history.append(event)\n", " my_event = EventData(\n", @@ -332,37 +334,26 @@ " )\n", " # For a PushPullAdapter, push_tick takes 3 arguments: push_tick(live: bool, time: datetime, value: csp.Struct)\n", " self.push_tick(live, my_event.time.datetime, my_event)\n", - " interval = self._interval.total_seconds()\n", - " time.sleep(interval)\n", - " else:\n", - " for event in catalog:\n", - " time.sleep(0.2)\n", - " if event not in history:\n", - " history.append(event)\n", - " my_event = EventData(\n", - " time=event.origins[0].time,\n", - " longitude=event.origins[0].longitude,\n", - " latitude=event.origins[0].latitude,\n", - " magnitude=event.magnitudes[0].mag,\n", - " creation_time=event.origins[0].creation_info.creation_time.datetime,\n", - " )\n", - " # For a PushPullAdapter, push_tick takes 3 arguments: push_tick(live: bool, time: datetime, value: csp.Struct)\n", - " self.push_tick(live, now, my_event)\n", - " self.stop()\n", + " # After the first full pass at the events catalog, we can turn on live=True and wait for real time events\n", + " live=True\n", + " interval = self._interval.total_seconds()\n", + " time.sleep(interval)\n", "\n", "# Create the graph-time representation of our adapter\n", - "FetchEventData = py_pushpull_adapter_def(\"FetchEventData\", FetchEventDataAdapter, csp.ts[EventData], interval=timedelta, url=str, realtime=bool)\n", + "FetchEventData = py_pushpull_adapter_def(\"FetchEventData\", FetchEventDataAdapter, csp.ts[EventData], interval=timedelta, url=str)\n", "\n", "@csp.node\n", "def update_widget(event: csp.ts[EventData], widget: widgets.widgets.widget_selectioncontainer.Tab):\n", " if csp.ticked(event):\n", " # widget.children = [datagrid, worldmap]\n", " data = {\n", - " \"time\": [event.time],\n", + " \"raw_time\": [event.time],\n", " \"longitude\": [event.longitude],\n", " \"latitude\": [event.latitude],\n", " \"magnitude\": [event.magnitude],\n", - " \"creation_time\": [event.creation_time],\n", + " \"raw_creation_time\": [event.creation_time],\n", + " \"time\": [str(event.time)],\n", + " \"creation_time\": [str(event.creation_time)],\n", " }\n", " widget.children[0].update(data)\n", " widget.children[1].update(data)\n", @@ -372,14 +363,14 @@ " print(\"Start of graph building\")\n", " url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", " interval = timedelta(seconds=60)\n", - " earthquakes = FetchEventData(interval, url=url, realtime=csp.is_configured_realtime())\n", + " earthquakes = FetchEventData(interval, url=url)\n", " update_widget(earthquakes, widget=widget)\n", " csp.add_graph_output(\"Earthquakes\", earthquakes)\n", " print(\"End of graph building\")\n", "\n", - "start = datetime.utcnow()\n", - "end = start + timedelta(minutes=10)\n", - "csp.run(earthquake_graph, starttime=start, endtime=end, realtime=False)\n", + "start = datetime.utcnow() - timedelta(hours=6)\n", + "end = datetime.utcnow() + timedelta(minutes=10)\n", + "csp.run(earthquake_graph, starttime=start, endtime=end, realtime=True)\n", "print(\"Done.\")" ] }, From 6b3c2cae18654df67b0dc3f4edd14786611f89e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melissa=20Weber=20Mendon=C3=A7a?= Date: Fri, 5 Jul 2024 12:50:04 -0300 Subject: [PATCH 3/6] Add earthquake example to README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Melissa Weber Mendonça --- conda/dev-environment-unix.yml | 2 +- examples/07_end_to_end/README.md | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/conda/dev-environment-unix.yml b/conda/dev-environment-unix.yml index 65d72f4f5..a5841c54f 100644 --- a/conda/dev-environment-unix.yml +++ b/conda/dev-environment-unix.yml @@ -37,7 +37,7 @@ dependencies: - pytest-asyncio - pytest-cov - pytest-sugar - - python<3.13 + - python<3.12 - python-rapidjson - rapidjson - requests diff --git a/examples/07_end_to_end/README.md b/examples/07_end_to_end/README.md index 3a915d0b7..e03220896 100644 --- a/examples/07_end_to_end/README.md +++ b/examples/07_end_to_end/README.md @@ -3,3 +3,5 @@ - [Wikipedia Updates and Edits](./wikimedia.ipynb) - [Seismic Data with obspy](./seismic_waveform.ipynb) - [MTA Subway Data](./mta.ipynb) +- [Earthquake Data PushPullAdapter](./earthquake.ipynb) + From eb1302cf72708eedeecbdebd725a23894bb1fc98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melissa=20Weber=20Mendon=C3=A7a?= Date: Fri, 5 Jul 2024 12:52:18 -0300 Subject: [PATCH 4/6] Revert changes to dev-environment-unix.yml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Melissa Weber Mendonça --- conda/dev-environment-unix.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda/dev-environment-unix.yml b/conda/dev-environment-unix.yml index a5841c54f..65d72f4f5 100644 --- a/conda/dev-environment-unix.yml +++ b/conda/dev-environment-unix.yml @@ -37,7 +37,7 @@ dependencies: - pytest-asyncio - pytest-cov - pytest-sugar - - python<3.12 + - python<3.13 - python-rapidjson - rapidjson - requests From f6e3c42744e5af44782600c35ec78c2e0d52a458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melissa=20Weber=20Mendon=C3=A7a?= Date: Fri, 5 Jul 2024 12:56:58 -0300 Subject: [PATCH 5/6] Lint end-to-end examples README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Melissa Weber Mendonça --- examples/07_end_to_end/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/07_end_to_end/README.md b/examples/07_end_to_end/README.md index e03220896..43a0b488c 100644 --- a/examples/07_end_to_end/README.md +++ b/examples/07_end_to_end/README.md @@ -4,4 +4,3 @@ - [Seismic Data with obspy](./seismic_waveform.ipynb) - [MTA Subway Data](./mta.ipynb) - [Earthquake Data PushPullAdapter](./earthquake.ipynb) - From 44dbb7c3370a376c236a58465dfde0f40a458906 Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Thu, 18 Jul 2024 11:31:44 -0400 Subject: [PATCH 6/6] Fix up example and incorporate PR comments Signed-off-by: Adam Glustein --- examples/07_end_to_end/earthquake.ipynb | 176 ++++++++++++++---------- 1 file changed, 101 insertions(+), 75 deletions(-) diff --git a/examples/07_end_to_end/earthquake.ipynb b/examples/07_end_to_end/earthquake.ipynb index 65d6d19da..29c0fb369 100644 --- a/examples/07_end_to_end/earthquake.ipynb +++ b/examples/07_end_to_end/earthquake.ipynb @@ -102,32 +102,12 @@ "catalog" ] }, - { - "cell_type": "markdown", - "id": "6efee42c-41d9-4f42-b6f9-1eaa1170c653", - "metadata": {}, - "source": [ - "This catalog's events are not ordered, but we can sort them easily:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "aad93bfe-b889-44dc-9f48-a9ba36fdf7b9", - "metadata": {}, - "outputs": [], - "source": [ - "def sorting(event):\n", - " return event.origins[0].time\n", - "catalog.events.sort(key=sorting)" - ] - }, { "cell_type": "markdown", "id": "dc6eeaeb-17ec-4656-9675-112ab5552317", "metadata": {}, "source": [ - "Now, we can calso use [cartopy](https://scitools.org.uk/cartopy/docs/latest/) to to plot all events in a world map:" + "Now, we can also use [cartopy](https://scitools.org.uk/cartopy/docs/latest/) to to plot all events in a world map:" ] }, { @@ -190,7 +170,7 @@ "id": "390b601f-8d13-496f-a3ca-820b08485009", "metadata": {}, "source": [ - "## With CSP" + "## Using CSP to Process Historical and Realtime Events" ] }, { @@ -206,13 +186,21 @@ "id": "e54bfc85-ddfe-4f09-a9b7-1212e67f6b62", "metadata": {}, "source": [ - "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*). We will read historical events from the past 6h, and run the engine for 10 minutes while we wait for new events to be added to the catalog in realtime." + "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*). We will read historical events from the past 6h, and then run the engine in realtime mode for 10 minutes while we wait for new events to be added to the catalog" + ] + }, + { + "cell_type": "markdown", + "id": "d0b620e9-42ae-4146-b6d6-36f2b0da8568", + "metadata": {}, + "source": [ + "First, we will create our Perspective widget to display the live updating map." ] }, { "cell_type": "code", "execution_count": null, - "id": "f2fbdb81-afcd-4937-8e6a-69059da3229c", + "id": "7cb746ba-98bc-4361-81d8-f64e99ee2723", "metadata": {}, "outputs": [], "source": [ @@ -225,29 +213,28 @@ " \"longitude\": float,\n", " \"latitude\": float,\n", " \"magnitude\": float,\n", - " \"raw_time\": datetime,\n", - " \"raw_creation_time\": datetime,\n", - " \"time\": str,\n", - " \"creation_time\": str,\n", + " \"time\": datetime,\n", "}\n", "\n", "datagrid = PerspectiveWidget(\n", " data,\n", " plugin=Plugin.GRID,\n", - " group_by=[\"raw_time\"],\n", - " columns=[\"longitude\", \"latitude\", \"magnitude\", \"raw_creation_time\"],\n", + " group_by=[\"time\"],\n", + " columns=[\"time\", \"longitude\", \"latitude\", \"magnitude\"],\n", " aggregates={\n", + " \"time\": \"last\",\n", " \"longitude\": \"last\",\n", " \"latitude\": \"last\",\n", " \"magnitude\": \"last\",\n", - " \"raw_creation_time\": \"last\", \n", " },\n", ")\n", + "\n", "worldmap = PerspectiveWidget(\n", " data,\n", " plugin=Plugin.MAP_SCATTER,\n", - " columns=[\"longitude\", \"latitude\", \"magnitude\", \"magnitude\", \"time\", \"creation_time\"],\n", + " columns=[\"longitude\", \"latitude\", \"magnitude\", \"magnitude\", \"time\", \"time\"],\n", ")\n", + "\n", "# Create a tab widget with some PerspectiveWidgets inside\n", "widget = widgets.Tab()\n", "widget.children = [datagrid, worldmap]\n", @@ -255,6 +242,18 @@ "widget" ] }, + { + "cell_type": "markdown", + "id": "af9f6c87-060f-4e46-b2e3-d5f2ca0f6918", + "metadata": {}, + "source": [ + "After launching the Widget, we see that it is empty. We will pass the Widget to our `csp` application which will update the data. \n", + "\n", + "Next, we create a `PushPullInputAdapter` to bring in the earthquake data to a `csp.graph`. In `csp`, a *push* adapter pushes real-time events to the application and a *pull* adapter pulls in historical data. A push-pull adapter will pull in historical data until its source is exhausted and then transition to real-time mode on a live feed. \n", + "\n", + "The push-pull adapter is especially useful when real-time execution depends on some *state* influenced by prior events. We can playback the history to reach our desired starting state before processing live data. In this example, we will playback the past day of earthquake events to get some data on our map before listening for new " + ] + }, { "cell_type": "code", "execution_count": null, @@ -275,19 +274,17 @@ "from datetime import datetime, timedelta, timezone\n", "from csp.impl.pushpulladapter import PushPullInputAdapter\n", "from csp.impl.wiring import py_pushpull_adapter_def\n", - "from obspy.core.utcdatetime import UTCDateTime\n", "import time\n", "\n", - "\n", + "# We use a csp.Struct to store the earthquake event data\n", "class EventData(csp.Struct):\n", - " time: UTCDateTime\n", + " time: datetime\n", " longitude: float\n", " latitude: float\n", " magnitude: float\n", - " creation_time: datetime\n", - "\n", + " \n", "# Create a runtime implementation of the adapter\n", - "class FetchEventDataAdapter(PushPullInputAdapter):\n", + "class EarthquakeEventAdapter(PushPullInputAdapter):\n", " def __init__(self, interval, url):\n", " self._interval = interval\n", " self._thread = None\n", @@ -295,7 +292,7 @@ " self._url = url\n", "\n", " def start(self, starttime, endtime):\n", - " print(\"FetchEventDataAdapter::start\")\n", + " print(\"EarthquakeEventAdapter::start\")\n", " self._running = True\n", " self._thread = threading.Thread(target=self._run)\n", " self._thread.start()\n", @@ -303,57 +300,78 @@ " self._endtime = endtime\n", "\n", " def stop(self):\n", - " print(\"FetchEventDataAdapter::stop\")\n", + " print(\"EarthquakeEventAdapter::stop\")\n", " if self._running:\n", " self._running = False\n", " self._thread.join()\n", "\n", " def _run(self):\n", - " history = []\n", - " live = False\n", + " # This is the function that defines how data is pushed/pulled into the graph\n", + " # First, we \"pull\" all the historical events in playback mode\n", + " catalog = read_events(self._url, format=\"QUAKEML\")\n", + " catalog.events.sort(key=lambda event: event.origins[0].time)\n", + " for event in catalog:\n", + " event_data = EventData(\n", + " time=event.origins[0].time.datetime,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " )\n", + " # push_tick for a push-pull adapter takes 3 arguments: live (bool), time, value\n", + " # for historical data live=False\n", + " self.push_tick(False, event_data.time, event_data)\n", + "\n", + " print(\"-------------------------------------------------------------------\")\n", + " print(f\"{datetime.utcnow()}: Historical replay complete, pulled {len(catalog)} events\")\n", + " print(\"-------------------------------------------------------------------\")\n", + " self.flag_replay_complete()\n", + "\n", + " last_event_time_pushed = catalog[-1].origins[0].time.datetime\n", + "\n", + " # Now we transition to live execution\n", + " # The while-loop will run every 1-minute in real-time mode\n", " while self._running:\n", - " now = datetime.utcnow()\n", + " catalog = read_events(self._url, format=\"QUAKEML\")\n", + " catalog.events.sort(key=lambda event: event.origins[0].time)\n", + "\n", + " # Find any new events from the last minute\n", + " new_events = []\n", + " for event in reversed(catalog):\n", + " if event.origins[0].time.datetime > last_event_time_pushed:\n", + " new_events.append(event)\n", + " else:\n", + " break\n", + "\n", " print(\"-------------------------------------------------------------------\")\n", - " print(f\"{now}: Refreshing earthquake feed, live={live}\")\n", + " print(f\"{datetime.utcnow()}: Refreshing earthquake live feed with {len(new_events)} events\")\n", " print(\"-------------------------------------------------------------------\")\n", - " catalog = read_events(self._url, format=\"QUAKEML\")\n", - " def sorting(event):\n", - " return event.origins[0].time\n", - " catalog.events.sort(key=sorting)\n", - " for event in catalog:\n", - " # tick whenever new (historical or realtime) events appear\n", - " if event.origins[0].time.datetime > self._starttime and event.origins[0].time.datetime < self._endtime:\n", - " if event not in history:\n", - " history.append(event)\n", - " my_event = EventData(\n", - " time=event.origins[0].time,\n", - " longitude=event.origins[0].longitude,\n", - " latitude=event.origins[0].latitude,\n", - " magnitude=event.magnitudes[0].mag,\n", - " creation_time=event.origins[0].creation_info.creation_time.datetime,\n", - " )\n", - " # For a PushPullAdapter, push_tick takes 3 arguments: push_tick(live: bool, time: datetime, value: csp.Struct)\n", - " self.push_tick(live, my_event.time.datetime, my_event)\n", - " # After the first full pass at the events catalog, we can turn on live=True and wait for real time events\n", - " live=True\n", - " interval = self._interval.total_seconds()\n", - " time.sleep(interval)\n", + " \n", + " for event in reversed(new_events):\n", + " # Push live data\n", + " event_data = EventData(\n", + " time=event.origins[0].time.datetime,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " )\n", + " # for historical data live=True\n", + " last_event_time_pushed = event_data.time\n", + " self.push_tick(True, event_data.time, event_data)\n", "\n", + " time.sleep(self._interval.total_seconds())\n", + " \n", "# Create the graph-time representation of our adapter\n", - "FetchEventData = py_pushpull_adapter_def(\"FetchEventData\", FetchEventDataAdapter, csp.ts[EventData], interval=timedelta, url=str)\n", + "EarthquakeEvent = py_pushpull_adapter_def(\"EarthquakeEventAdapter\", EarthquakeEventAdapter, csp.ts[EventData], interval=timedelta, url=str)\n", "\n", "@csp.node\n", "def update_widget(event: csp.ts[EventData], widget: widgets.widgets.widget_selectioncontainer.Tab):\n", " if csp.ticked(event):\n", " # widget.children = [datagrid, worldmap]\n", " data = {\n", - " \"raw_time\": [event.time],\n", + " \"time\": [event.time],\n", " \"longitude\": [event.longitude],\n", " \"latitude\": [event.latitude],\n", " \"magnitude\": [event.magnitude],\n", - " \"raw_creation_time\": [event.creation_time],\n", - " \"time\": [str(event.time)],\n", - " \"creation_time\": [str(event.creation_time)],\n", " }\n", " widget.children[0].update(data)\n", " widget.children[1].update(data)\n", @@ -363,12 +381,12 @@ " print(\"Start of graph building\")\n", " url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", " interval = timedelta(seconds=60)\n", - " earthquakes = FetchEventData(interval, url=url)\n", + " earthquakes = EarthquakeEvent(interval, url=url)\n", " update_widget(earthquakes, widget=widget)\n", " csp.add_graph_output(\"Earthquakes\", earthquakes)\n", " print(\"End of graph building\")\n", "\n", - "start = datetime.utcnow() - timedelta(hours=6)\n", + "start = datetime.utcnow() - timedelta(hours=24)\n", "end = datetime.utcnow() + timedelta(minutes=10)\n", "csp.run(earthquake_graph, starttime=start, endtime=end, realtime=True)\n", "print(\"Done.\")" @@ -376,10 +394,18 @@ }, { "cell_type": "markdown", - "id": "c2db6101-9612-48f4-a526-29f0395821f7", + "id": "90cbb6a7-c683-44c7-a4e4-b7d7f308a08e", + "metadata": {}, + "source": [ + "## Conclusion" + ] + }, + { + "cell_type": "markdown", + "id": "1fb4b6de-d130-4ed1-ac05-f0aae2dd4ea0", "metadata": {}, "source": [ - "---" + "In this example, we created a push-pull adapter to process earthquake event data. We played back a day's worth of data before seamlessly transitioning to real-time mode and processing new events. Lastly, we displayed the data in a Perspective widget which plotted each earthquake on a world map." ] } ],