FlashFlood is an event recorder and streamer built on top of AWS S3, supporting distributed writes and fast distributed bulk reads. It can be used to store and retrieve information about transactions and events in JSON format, which can be quickly filtered with JMESPath. This notebook briefly explores its use.

Let's get started by creating an instance of the FlashFlood class:

In [1]:
import boto3
from flashflood import FlashFlood

s3 = boto3.resource('s3')

# Use any S3 bucket that you have access to; the prefix namespaces this FlashFlood's objects.
ff = FlashFlood(s3, "my-flashflood-test-bucket", "my_prefix")

FlashFlood exposes a CRUD API for events:

In [2]:
import datetime
import uuid

# Create: store an event under a fresh ID and date.
event_data = b'my event data'
event_uuid = str(uuid.uuid4())
event_date = datetime.datetime.now()

ff.put(event_data, event_uuid, event_date)

# Read: fetch the event back by its ID.
event = ff.get_event(event_uuid)
print("This is the data: " + str(event.data))
print("This is the date: " + str(event.date))
print("This is the event ID: " + event.event_id)

# Update: replace the event's data.
new_event_data = b'i want to put new data'
ff.update_event(event_uuid, new_event_data)
print("This is the updated data: " + str(ff.get_event(event_uuid).data))

# Delete: remove the event.
ff.delete_event(event_uuid)

Uploaded journal 2020-03-19T184045.516831Z--2020-03-19T184045.516831Z--new--80b94b04-72e0-4903-90e1-56880d8c992f
new journal 2020-03-19T184045.516831Z--2020-03-19T184045.516831Z--new--80b94b04-72e0-4903-90e1-56880d8c992f
This is the data: b'my event data'
This is the date: 2020-03-19 18:40:45.516831
This is the event ID: d3f90a85-06ea-4b39-9961-7156f2156bd6
This is the updated data: b'my event data'


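All events belong to a journal. A new journal is created automatically when an event is written (see the "new journal" lines above), and journals can also be combined manually: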

In [3]:
ff.journal(minimum_number_of_events=1)
# See also ff.list_journals:
#     def list_journals(self, from_date: datetime=None, to_date: datetime=None) -> typing.Iterator[JournalID]

Found journal to combine 1969-12-31T160000.000000Z--1969-12-31T160000.000000Z--new--8d790487-666b-4299-bc89-71b1a2ceefab
combining journal 1969-12-31T160000.000000Z--1969-12-31T160000.000000Z--new--8d790487-666b-4299-bc89-71b1a2ceefab
Uploaded journal 1969-12-31T160000.000000Z--1969-12-31T160000.000000Z--2020-03-20T014048.825888Z--f382ad2e-3f45-4bfe-80b8-9a667f8f91b3


Each event is assigned a date when it is created, and you can stream the events that fall between two dates. We'll create a batch of dummy events, then replay a date range:

In [4]:
import json

for i in range(40, 50):
    event_data = json.dumps({'mynum': i}).encode()
    event_uuid = str(uuid.uuid4())
    event_date = datetime.datetime.fromtimestamp(10000 * i)
    ff.put(event_data, event_uuid, event_date)

arbitrary_from_date = datetime.datetime.fromtimestamp(10000 * 42)
arbitrary_to_date = datetime.datetime.fromtimestamp(10000 * 48)

for event in ff.replay(from_date=arbitrary_from_date, to_date=arbitrary_to_date):
    print(event.data)

Uploaded journal 1970-01-05T070640.000000Z--1970-01-05T070640.000000Z--new--62a718da-15fb-46d4-bca6-114bc8119d52
new journal 1970-01-05T070640.000000Z--1970-01-05T070640.000000Z--new--62a718da-15fb-46d4-bca6-114bc8119d52
Uploaded journal 1970-01-05T095320.000000Z--1970-01-05T095320.000000Z--new--c30f9720-d25d-417c-9ea1-34fbc1a9f0cc
new journal 1970-01-05T095320.000000Z--1970-01-05T095320.000000Z--new--c30f9720-d25d-417c-9ea1-34fbc1a9f0cc
Uploaded journal 1970-01-05T124000.000000Z--1970-01-05T124000.000000Z--new--9df9d4b4-deb6-4bee-9c8a-00ad846df92a
new journal 1970-01-05T124000.000000Z--1970-01-05T124000.000000Z--new--9df9d4b4-deb6-4bee-9c8a-00ad846df92a
Uploaded journal 1970-01-05T152640.000000Z--1970-01-05T152640.000000Z--new--21db50e4-40f9-4ab3-87fa-f89e3edf592b
new journal 1970-01-05T152640.000000Z--1970-01-05T152640.000000Z--new--21db50e4-40f9-4ab3-87fa-f89e3edf592b
Uploaded journal 1970-01-05T181320.000000Z--1970-01-05T181320.000000Z--new--dc08b579-9eeb-4bf7-95bd-adc3b2062cd0
new
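
One detail worth noting about the cell above: `datetime.datetime.fromtimestamp` without a `tz` argument returns a naive datetime in the machine's local timezone, so the dates that end up in the journal names depend on where the notebook runs. The sketch below only illustrates the standard-library behaviour. Whether FlashFlood accepts timezone-aware datetimes is not shown in this notebook, so treat passing `utc_aware` to `ff.put` as an untested assumption.

```python
import datetime

timestamp = 10000 * 40  # the value the loop above uses for i = 40

# Naive datetime in the local timezone (what the notebook cell produces).
local_naive = datetime.datetime.fromtimestamp(timestamp)

# Timezone-aware datetime pinned to UTC, identical on every machine.
utc_aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)

print(local_naive.tzinfo)     # None: no timezone attached
print(utc_aware.isoformat())  # 1970-01-05T15:06:40+00:00
```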

Since the event data is JSON, we can use JMESPath to filter it:

In [5]:
import jmespath

# Decode each replayed event's JSON payload into a dict.
events = [
    json.loads(event.data)
    for event in ff.replay(from_date=arbitrary_from_date, to_date=arbitrary_to_date)
]

# Project the 'mynum' field out of every event.
expression = jmespath.compile('events[].mynum')
expression.search({'events': events})

replaying from journal 1970-01-05T124000.000000Z--1970-01-05T124000.000000Z--new--9df9d4b4-deb6-4bee-9c8a-00ad846df92a
replaying from journal 1970-01-05T124000.000000Z--1970-01-05T124000.000000Z--new--ee8ef69e-f460-4ba6-b0db-6e4b3dc2ace5
replaying from journal 1970-01-05T152640.000000Z--1970-01-05T152640.000000Z--new--21db50e4-40f9-4ab3-87fa-f89e3edf592b
replaying from journal 1970-01-05T152640.000000Z--1970-01-05T152640.000000Z--new--e15c5bf6-10e5-4eb3-bca3-48a9aef08159
replaying from journal 1970-01-05T181320.000000Z--1970-01-05T181320.000000Z--new--6462df35-648d-41a4-bbc1-793f76f384e9
replaying from journal 1970-01-05T181320.000000Z--1970-01-05T181320.000000Z--new--dc08b579-9eeb-4bf7-95bd-adc3b2062cd0
replaying from journal 1970-01-05T210000.000000Z--1970-01-05T210000.000000Z--new--216c64cf-53b4-433e-8cee-981847e8ccc3
replaying from journal 1970-01-05T210000.000000Z--1970-01-05T210000.000000Z--new--d9419e46-582e-4932-9a91-ac1764940573
replaying from journal 1970-01-05T234640.000000Z

[43, 43, 44, 44, 45, 45, 46, 46, 47, 47, 48, 48]