# Gateway Client Example
Here is a notebook demonstrating the fundamentals of the `GatewayClient` object.

This class is a convenience wrapper around the various `HTTP` requests used to interact with a running `GatewayServer` (with `REST` endpoints enabled). It relies on the [httpx](https://www.python-httpx.org/) and [aiohttp](https://docs.aiohttp.org/en/stable/index.html) libraries, and derives most of its functionality from the [openapi](https://www.openapis.org/) specification provided on the running `GatewayServer` (usually available at `/openapi.json`).


## Client Configuration
A `GatewayClient` is configured via a `GatewayClientConfig`, a minimal [pydantic](https://docs.pydantic.dev/) model to specify details such as protocol (`http`/`https`), host, port, etc. By default, we should provide `host` and `port`. 

We can also specify in the config `return_raw_json`, which specifies whether we would like to return the raw json response, or a `ResponseWrapper` object for `REST` requests. The `ResponseWrapper` object can provide both the raw json message, as well as a pandas dataframe. The `ResponseWrapper` contains additional type information which will create column names and utilize the correct data type for the constructed pandas dataframe.


## Client Methods

A client as a small number of general-purpose methods. In alphabetical order:

- `controls`: managment controls for monitoring/configurating the running server
- `last`: get the last ticked data value on a channel
- `lookup`: lookup a piece of data by `id`
- `next`: get the next ticked data value on a channel
- `send`: send some data onto a channel
- `state`: get the value of a given channel's state accumulator


Additionally, a client has some streaming methods available when websockets are configured:

- `stream`: call a callback when a channel ticks
- `subscribe`: subscribe to data on a channel
- `unsubscribe`: unsubscribe to data on a channel

Let's explore some of the functionality of the basic demo server. To start, we should run the demo server in another process (with the omnibus config):

```bash
csp-gateway-start --config-dir=csp_gateway/server/demo +config=omnibus
```

By default, this will run the server on `localhost:8000`.

## Imports
All the important objects are hoisted to the top level `csp_gateway`. Lets import and setup our client.

In [2]:
from csp_gateway import GatewayClient, GatewayClientConfig

In [3]:
config = GatewayClientConfig(host="localhost", port=8000, return_raw_json=False)
client = GatewayClient(config)

The first time we use our client, it will take a little longer than usual as it tries to interrogate the running server's `openapi` specification for available methods. Once done, our request will go through, and subsequent requests will leverage this specification. Let's start with some simple status checks. If we're curious about available endpoints, we can navigate to [http://localhost:8000/redoc](http://localhost:8000/redoc) (or generally `http://<hostname>:<port>/redoc` if we're running on a different host)

In [4]:
# heartbeat check
client.controls("heartbeat").as_json()

[{'id': '7180742829515',
  'timestamp': '2025-03-14T06:00:12.399921',
  'name': 'heartbeat',
  'status': 'ok',
  'data': {},
  'data_str': ''}]

In [None]:
# openapi spec
from IPython.display import JSON

JSON(client.openapi_spec)

<IPython.core.display.JSON object>

In [6]:
# machine stats
client.controls("stats").as_json()

[{'id': '7180742829516',
  'timestamp': '2025-03-14T06:00:12.562785',
  'name': 'stats',
  'status': 'ok',
  'data': {'cpu': 0.0,
   'memory': 63.7,
   'memory-total': 36.28,
   'pid': 1750281,
   'active_threads': 4,
   'max_threads': 'unlimited',
   'now': '2025-03-14T06:00:12.564110',
   'csp-now': '2025-03-14T06:00:12.563004',
   'host': 'devqtccrt06',
   'user': 'nk12433'},
  'data_str': '{"cpu":0.0,"memory":63.7,"memory-total":36.28,"pid":1750281,"active_threads":4,"max_threads":"unlimited","now":"2025-03-14T06:00:12.564110","csp-now":"2025-03-14T06:00:12.563004","host":"devqtccrt06","user":"nk12433"}'}]

## Last, State, Lookup, Send
Let's look at what channels we have available for `last`:

In [7]:
client.last().as_json()

['controls', 'basket', 'never_ticks', 'example', 'example_list', 'str_basket']

In [8]:
client.last("example").as_json()

[{'id': '7182204802295',
  'timestamp': '2025-03-14T06:00:12.274734',
  'x': 31,
  'y': '303030',
  'internal_csp_struct': {'z': 12},
  'data': [],
  'mapping': {},
  'dt': '2025-03-14T01:59:42.200557',
  'd': '2025-03-14'}]

In [9]:
client.last("basket").as_json()

[{'id': '7182204802295',
  'timestamp': '2025-03-14T06:00:12.274734',
  'x': 31,
  'y': '303030',
  'internal_csp_struct': {'z': 12},
  'data': [],
  'mapping': {},
  'dt': '2025-03-14T01:59:42.200557',
  'd': '2025-03-14'},
 {'id': '7182204802292',
  'timestamp': '2025-03-14T06:00:12.269265',
  'x': 30,
  'y': '3030',
  'internal_csp_struct': {'z': 12},
  'data': [0.918037522903988,
   0.3399226261577962,
   0.7909028689829906,
   0.7039079146913827,
   0.8210556057950636,
   0.07325613777603324,
   0.36257073731882594,
   0.3946540193539001,
   0.5774311632755368,
   0.20316200054623546],
  'mapping': {'30': 30},
  'dt': '2025-03-14T01:59:42.200557',
  'd': '2025-03-14'},
 {'id': '7182204802293',
  'timestamp': '2025-03-14T06:00:12.274455',
  'x': 30,
  'y': '303030',
  'internal_csp_struct': {'z': 12},
  'data': [0.4562437422860882,
   0.9004249706550138,
   0.6477939153762935,
   0.3803206666410205,
   0.775542818358652,
   0.021906915016978834,
   0.8893453580688346,
   0.82770480

In [10]:
client.last("basket").as_pandas_df()

Unnamed: 0,id,timestamp,x,y,data,dt,d,internal_csp_struct.z,mapping.30,mapping
0,7182204802295,2025-03-14T06:00:12.274734,31,303030,[],2025-03-14T01:59:42.200557,2025-03-14,12,,
1,7182204802292,2025-03-14T06:00:12.269265,30,3030,"[0.918037522903988, 0.3399226261577962, 0.7909...",2025-03-14T01:59:42.200557,2025-03-14,12,30.0,
2,7182204802293,2025-03-14T06:00:12.274455,30,303030,"[0.4562437422860882, 0.9004249706550138, 0.647...",2025-03-14T01:59:42.200557,2025-03-14,12,30.0,


State channels are used to perform state accumulation over a number of ticks. The example server doesn't do anything too interesting, but we can access these channels nevertheless.

In [11]:
client.state().as_json()

['example']

In [12]:
client.state("example").as_pandas_df().tail()

# We note that there are a large number of columns in the above dataframe.
# This is because `mapping` is a dict with different keys for eery row.
# To accomodate all of them, the returned pandas dataframe has a column for any key present in the `mapping` attribute of any `ExampleData` Struct

Unnamed: 0,id,timestamp,x,y,data,dt,d,internal_csp_struct.z,mapping.1,mapping.2,...,mapping.22,mapping.23,mapping.24,mapping.25,mapping.26,mapping.27,mapping.28,mapping.29,mapping.30,mapping
40,7182204802285,2025-03-14T06:00:10.264122,28,282828,"[0.5303914536525036, 0.0010628925807091294, 0....",2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,28.0,,,
41,7182204802286,2025-03-14T06:00:10.264259,29,282828,[],2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,,
42,7182204802290,2025-03-14T06:00:11.264179,29,292929,"[0.7784380063762434, 0.7909368694127872, 0.647...",2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,29.0,,
43,7182204802294,2025-03-14T06:00:12.274565,30,303030,"[0.847968392664959, 0.34935942519328944, 0.265...",2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,30.0,
44,7182204802295,2025-03-14T06:00:12.274734,31,303030,[],2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,,


We can lookup individual data points using `lookup`. This is a bit of a silly example when we're just looking at a single channel, but can be valuable when you have lots of interconnected channels.

In [13]:
# get the last tick, then lookup by id
last = client.last("example").as_json()[0]
last_id = last["id"]
client.lookup("example", last_id).as_json()

[{'id': '7182204802295',
  'timestamp': '2025-03-14T06:00:12.274734',
  'x': 31,
  'y': '303030',
  'internal_csp_struct': {'z': 12},
  'data': [],
  'mapping': {},
  'dt': '2025-03-14T01:59:42.200557',
  'd': '2025-03-14'}]

Finally, we can send our own data into the API using `send`.

In [14]:
client.send("example", {"x": 12, "y": "HEY!", "internal_csp_struct": {"z": 13}})

client.state("example").as_pandas_df().tail()

Unnamed: 0,id,timestamp,x,y,data,dt,d,internal_csp_struct.z,mapping.1,mapping.2,...,mapping.22,mapping.23,mapping.24,mapping.25,mapping.26,mapping.27,mapping.28,mapping.29,mapping.30,mapping
42,7182204802290,2025-03-14T06:00:11.264179,29,292929,"[0.7784380063762434, 0.7909368694127872, 0.647...",2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,29.0,,
43,7182204802294,2025-03-14T06:00:12.274565,30,303030,"[0.847968392664959, 0.34935942519328944, 0.265...",2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,30.0,
44,7182204802295,2025-03-14T06:00:12.274734,31,303030,[],2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,,
45,7182204802296,2025-03-14T06:00:13.034311,12,HEY!,[],2025-03-14T01:59:42.200557,2025-03-14,13,,,...,,,,,,,,,,
46,7182204802297,2025-03-14T06:00:13.035209,13,HEY!,[],2025-03-14T01:59:42.200557,2025-03-14,12,,,...,,,,,,,,,,


The REST API uses pydantic validation for `send` requests. Since `ExampleData` has a `__validators__` attribute defined, the pydantic version of the GatewayStruct has those functions ran for validation before propagating the sent value through the graph. `ExampleData` has validation performed that asserts the value of `x` is not negative. When we try to pass a negative value in, we get an error on the send, but the graph does not crash. The details of the error are provided in the response.

In [15]:
client.send("example", {"x": -12, "y": "HEY!"})

ServerUnprocessableException: [{'type': 'list_type', 'loc': ['body', 'list[function-wrap[create_instance()]]'], 'msg': 'Input should be a valid list', 'input': {'x': -12, 'y': 'HEY!'}}, {'type': 'value_error', 'loc': ['body', 'function-wrap[create_instance()]', 'x'], 'msg': 'Value error, value must be non-negative.', 'input': -12, 'ctx': {'error': {}}}]

## Next
The running `GatewayServer` is a synchronous system, and we're interacting it via asynchronous `REST` requests. However, we can still perform actions like "wait for the next tick". This can be dangerous and lead to race conditions, but it can still be useful in certain circumstances.

In [16]:
client.next("example").as_json()

[{'id': '7182204802346',
  'timestamp': '2025-03-14T06:00:23.264154',
  'x': 41,
  'y': '414141',
  'internal_csp_struct': {'z': 12},
  'data': [0.8434439910392343,
   0.16425120769202894,
   0.4614001200974842,
   0.5296539732016515,
   0.7793370635405895,
   0.8968921920130971,
   0.16515287895221997,
   0.9002092496268692,
   0.15540237095341358,
   0.5879171201018687],
  'mapping': {'41': 41},
  'dt': '2025-03-14T01:59:42.200557',
  'd': '2025-03-14'}]

Note that this call will **block** until the next value ticks.

## Streaming
If our webserver is configured with websockets, we can also stream data out of channels. A simple example that prints out channel data is provided.

In [17]:
client.stream(channels=["example"], callback=print)

{'channel': 'example', 'data': [{'id': '7182204802364', 'timestamp': '2025-03-14T06:00:27.264171', 'x': 45, 'y': '454545', 'internal_csp_struct': {'z': 12}, 'data': [0.6512409372233339, 0.07206425449741505, 0.5459898400764014, 0.9069291031159893, 0.5632205038815109, 0.6358680416497396, 0.4031860150585428, 0.18951576663682146, 0.13029048524663422, 0.6537721971743142], 'mapping': {'45': 45}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'example', 'data': [{'id': '7182204802368', 'timestamp': '2025-03-14T06:00:28.264253', 'x': 46, 'y': '464646', 'internal_csp_struct': {'z': 12}, 'data': [0.13347377162286045, 0.14766463082192083, 0.890098581009976, 0.5762928909003212, 0.9784598838743355, 0.9093206966012796, 0.4421107198172203, 0.6351323300210804, 0.8580132723903817, 0.8155427123499257], 'mapping': {'46': 46}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'example', 'data': [{'id': '7182204802369', 'timestamp': '2025-03-14T06:00:28.264459', 'x'

KeyboardInterrupt: 

## Asynchronous client
All of the above can also be used in an `async` fashion. Note that by default, the `GatewayClient` class is an alias for the `SyncGatewayClient` class. The only differences are:

- all methods are `async` instead of synchronous
- `stream` is an infinite generator, rather than callback-based (so takes no `callback` argument)

In [18]:
from csp_gateway import AsyncGatewayClient

In [19]:
async_client = AsyncGatewayClient(config)

In [20]:
# We can subscribe to multiple channels, including dict-baskets
async def print_all():
    async for datum in async_client.stream(channels=["example", "example_list", ("basket", "A")]):
        print(datum)

In [21]:
await print_all()

{'channel': 'example_list', 'data': [{'id': '7182204802436', 'timestamp': '2025-03-14T06:00:43.264033', 'x': 61, 'y': '616161', 'internal_csp_struct': {'z': 12}, 'data': [0.2012890963221904, 0.702376333618334, 0.361342696585941, 0.7383356341385855, 0.5439924770669536, 0.5348101969493174, 0.7261873522256002, 0.8653648126326041, 0.22990047589634643, 0.6788906270823372], 'mapping': {'61': 61}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'example', 'data': [{'id': '7182204802436', 'timestamp': '2025-03-14T06:00:43.264033', 'x': 61, 'y': '616161', 'internal_csp_struct': {'z': 12}, 'data': [0.2012890963221904, 0.702376333618334, 0.361342696585941, 0.7383356341385855, 0.5439924770669536, 0.5348101969493174, 0.7261873522256002, 0.8653648126326041, 0.22990047589634643, 0.6788906270823372], 'mapping': {'61': 61}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'basket', 'key': 'A', 'data': [{'id': '7182204802433', 'timestamp': '2025-03-14T06:00:43.26

CancelledError: 

In [22]:
# We can susbscribe to every tick from a dict basket by just subscribing to the full basket
async def print_all_basket():
    async for datum in async_client.stream(channels=["str_basket"]):
        print(datum)

In [23]:
await print_all_basket()

{'channel': 'str_basket', 'key': 'a', 'data': [{'id': '7182204802482', 'timestamp': '2025-03-14T06:00:54.263974', 'x': 72, 'y': '72', 'internal_csp_struct': {'z': 12}, 'data': [0.0024343396429761244, 0.25557971088785525, 0.8830889612103591, 0.5248708862100656, 0.13301440206288329, 0.0613374033071844, 0.9533501115434322, 0.17519376229750483, 0.20826938545917772, 0.3154244464812893], 'mapping': {'72': 72}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'str_basket', 'key': 'b', 'data': [{'id': '7182204802483', 'timestamp': '2025-03-14T06:00:54.264023', 'x': 72, 'y': '7272', 'internal_csp_struct': {'z': 12}, 'data': [0.8477714975000878, 0.8761447163517748, 0.4132611178713649, 0.035190170106964236, 0.6786254800475836, 0.9475667654798842, 0.5866246994077503, 0.26298570997111104, 0.31639906057837064, 0.3663114982586084], 'mapping': {'72': 72}, 'dt': '2025-03-14T01:59:42.200557', 'd': '2025-03-14'}]}
{'channel': 'str_basket', 'key': 'c', 'data': [{'id': '7182204802484', 

CancelledError: 