# End-To-End Python Client Example

This notebook creates a project named `test`, creates a dummy stream, and then writes to and reads from it.

In [None]:
%load_ext autoreload
%autoreload 2
import asyncio
import beneath
import sys
import secrets
import time

Create a client (assumes you have already authenticated with `beneath auth SECRET` on the command line)

In [None]:
# Construct the Beneath client (write_delay_ms=100) and start it; later cells reuse `client`.
client = beneath.Client(write_delay_ms=100)
await client.start()

Get user and organization info

In [None]:
me = await client.admin.organizations.find_me()
organization_name = me["name"]
organization_id = me["organizationID"]

Get or create test project

In [None]:
project_name = "test"
project = await client.admin.projects.create(organization_id=organization_id, project_name=project_name)

Create test stream

In [None]:
# Schema of the dummy stream: "a" is the key; "c" and "d" are declared without "!".
schema = """
type Dummmy @schema {
    a: String! @key
    b: Int!
    c: Int
    d: Bytes16
}
"""

# Path of the dummy stream, in the form <organization>/<project>/dummies.
stream_path = "{}/{}/dummies".format(organization_name, project_name)

In [None]:
stream = await client.create_stream(
    stream_path=stream_path,
    schema=schema,
    update_if_exists=True,
)
instance = stream.primary_instance

Create function for generating random records for the stream

In [None]:
def generate_record():
    """Build one random record with the fields "a", "b", "c" and "d".

    Returns:
        dict: "a" is a URL-safe token made from 30 random bytes, "b" is a
        random integer in [0, sys.maxsize), "c" is always None, and "d" is
        16 random bytes.
    """
    key = secrets.token_urlsafe(30)
    number = secrets.randbelow(sys.maxsize)
    payload = secrets.token_bytes(16)
    return dict(a=key, b=number, c=None, d=payload)

Write records to the stream

In [None]:
n = 1000
for _ in range(n):
    record = generate_record()
    await instance.write(record)

await client.force_flush()

Write records forever

In [None]:
delay_seconds = 2
while True:
    record = generate_record()
    await instance.write(record)
    await asyncio.sleep(delay_seconds)

Read all records really easily

In [None]:
# Read the stream at `stream_path` through the easy_read helper and display the result.
df = await beneath.easy_read(stream_path)
df

Read some records with lower-level APIs

In [None]:
# Open a cursor with query_index() and fetch only the next batch from it.
# NOTE(review): to_dataframe=True presumably yields a pandas DataFrame — confirm.
cursor = await instance.query_index()
df = await cursor.read_next(to_dataframe=True)
df

Read all the records with lower-level APIs

In [None]:
# Open a fresh cursor with query_index() and read everything it returns via read_all().
# NOTE(review): to_dataframe=True presumably yields a pandas DataFrame — confirm.
cursor = await instance.query_index()
df = await cursor.read_all(to_dataframe=True)
df

Peek at the latest writes

In [None]:
# Query the instance's log with peek=True and read the next batch from that cursor.
cursor = await instance.query_log(peek=True)
df = await cursor.read_next(to_dataframe=True)
df

Write some more and fetch the changes

In [None]:
cursor = await instance.query_log()

n = 25
await instance.write([generate_record() for _ in range(n)])
await client.force_flush()
await asyncio.sleep(2)

df = await cursor.read_next_changes(to_dataframe=True)
df

Helper to write records in the background for next demos

In [None]:
async def write_forever():
    """Write batches of 10 generated records to `instance` forever.

    Each batch is followed by a progress message and a 1-second sleep.
    The coroutine never returns on its own; callers in this notebook run it
    as a task and cancel it.
    """
    batch_size, pause_seconds = 10, 1
    while True:
        # The batch is passed as a lazy generator, not a list.
        await instance.write(generate_record() for _ in range(batch_size))
        print(f"Wrote {batch_size} records")
        await asyncio.sleep(pause_seconds)

Write and subscribe to changes with a callback

In [None]:
async def subscribe_forever_callback():
    async def cb(records, cursor):
        print(f"Received {len(records)} records – Sample: {records[0]['a']}")
    cursor = await instance.query_log()
    await cursor.subscribe_changes_with_callback(callback=cb, poll_at_most_every_ms=100)

task = asyncio.create_task(write_forever())
try:
    await subscribe_forever_callback()
finally:
    task.cancel()

Write and subscribe to changes with async iterator

In [None]:
async def subscribe_forever():
    cursor = await instance.query_log()
    iterator = cursor.subscribe_changes(poll_at_most_every_ms=100)
    async for records in iterator:
        print(f"Received {len(records)} records – Sample: {records[0]['a']}")

task = asyncio.create_task(write_forever())
try:
    await subscribe_forever()
finally:
    task.cancel()

Consume forever

In [None]:
async def consume_forever():
    i = 0
    def count(record):
        nonlocal i
        i += 1
        if i % 10 == 0:
            print(f"Consumed {i} records")
    
    consumer = await client.consumer(stream_path)
    await consumer.subscribe(count, changes_only=True)

task = asyncio.create_task(write_forever())
try:
    await consume_forever()
finally:
    task.cancel()

Checkpointed consumer

In [None]:
subscription_path = f"{organization_name}/{project_name}/test-sub"
consumer = await client.consumer(stream_path, subscription_path=subscription_path, batch_size=10)
await consumer.reset()

i = 0
async def cb(record):
    global i
    i += 1
    print(f"{i}: {consumer.cursor.replay_cursor}")
    await asyncio.sleep(2)

await consumer.replay(cb)

Delta and changes-only consumer

In [None]:
def cb(record):
    print(record)

consumer = await client.consumer(stream_path)

await consumer.subscribe(cb, changes_only=True, stop_when_idle=True)
# expecting nothing

await instance.write(generate_record())
await asyncio.sleep(2)
await consumer.subscribe(cb, changes_only=True, stop_when_idle=True)
# expecting one record

await instance.write([generate_record() for _ in range(5)])
await asyncio.sleep(2)
await consumer.subscribe(cb, changes_only=True, stop_when_idle=True)
# expecting five records

### Pipelines

Prepare pipeline parameters

In [None]:
# Version number for the pipelines below.
version = 0

# Service paths, one per pipeline.
generating_service = f"{organization_name}/{project_name}/generator"
processing_service = f"{organization_name}/{project_name}/processor"

# Stream paths: the generator's output and the processor's output.
generated_stream = f"{organization_name}/{project_name}/generated"
processed_stream = f"{organization_name}/{project_name}/processed"

# Schemas of the two streams; "processed" adds the double_a field.
generated_schema = """
type Generated @schema {
    a: Int! @key
    b: String
}
"""
processed_schema = """
type Processed @schema {
    a: Int! @key
    b: String
    double_a: Int!
}
"""

Run generating pipeline forever (until interrupted)

In [None]:
try:
    stream = await client.find_stream(stream_path=generated_stream)
except Exception as e:
    ex = e

In [None]:
p = beneath.Pipeline(
    action="run",
    strategy="batch",
    version=15,
    service_path=generating_service,
)

async def generate(p):
    i = await p.checkpoints.get("i", default=0)
    p.logger.info("Starting generate at i=%i", i)
    while True:
        yield { "a": i, "b": secrets.token_urlsafe(10) }
        i += 1
        await p.checkpoints.set("i", i)
        if i >= 100:
            yield beneath.PIPELINE_IDLE
            await asyncio.sleep(1)

generated = p.generate(generate)
p.write_stream(generated, stream_path=generated_stream, schema=generated_schema)

await p.run()

Delta-process the generated rows into the processed stream

In [None]:
p = beneath.Pipeline(
    action="run",
    strategy="delta",
    version=version,
    service_path=processing_service,
)

async def process(record):
    if record["a"] % 2 == 0:
        return {
            "a": record["a"],
            "b": record["b"],
            "double_a": record["a"] * 2,
        }

generated = p.read_stream(generated_stream)
processed = p.apply(generated, process)
p.write_stream(processed, stream_path=processed_stream, schema=processed_schema)

await p.run()