# Setup

These common variables, functions, and classes are used throughout the remainder of the example.

Note the two functions that simulate an unreliable connection. Each one yields the items of a streamed response but, roughly 5% of the time, raises an exception instead of yielding. They yield, respectively:

- A top-level array element
- A line of text

In [None]:
import ijson
import io
import random

# Query shared by every cell below: product "NGX2" from source "cme",
# messages of type "TRD", 2022-09-22 from 22:00:00 to 22:15:00 UTC.
request = dict(
    query="ticks",
    source="cme",
    product="NGX2",
    date="2022-09-22",
    start_time="22:00:00",
    end_time="22:15:00",
    time_zone="UTC",
    messages=["TRD"],
)


class UnreliableStreamError(Exception):
    """Simulated connection failure raised by the ``unreliable_stream_*`` helpers."""

    def __init__(self):
        message = "Random failure"
        super().__init__(message)


def unreliable_stream_json(conn, request, *, failure_rate=0.05):
    """Yield the top-level array elements of ``conn.stream(request)``, failing at random.

    Simulates an unreliable connection. After each element is parsed, and
    before it is yielded, an ``UnreliableStreamError`` is raised with
    probability ``failure_rate``. The element is lost in that case.

    Args:
        conn: Client whose ``stream(request)`` returns a file-like stream
            containing a JSON array (e.g. ``hpq.create_web_socket_client()``).
        request: Query passed unchanged to ``conn.stream``.
        failure_rate: Probability (0.0-1.0) of failing per element; defaults
            to 5%. Keyword-only.

    Yields:
        Each element of the top-level JSON array, as decoded by ``ijson``.

    Raises:
        UnreliableStreamError: At random, according to ``failure_rate``.
    """
    rng = random.SystemRandom()
    stream = conn.stream(request)
    try:
        for obj in ijson.items(stream, "item"):
            if rng.random() < failure_rate:
                raise UnreliableStreamError()
            yield obj
    finally:
        # Release the underlying stream on exhaustion, on the simulated
        # failure, and when the consumer abandons the generator early.
        stream.close()


def unreliable_stream_lines(conn, request, *, failure_rate=0.05):
    """Yield the text lines of ``conn.stream(request)``, failing at random.

    Simulates an unreliable connection. Before each line is read (including
    the final read that detects end-of-stream), an ``UnreliableStreamError``
    is raised with probability ``failure_rate``.

    Args:
        conn: Client whose ``stream(request)`` returns a raw binary stream
            suitable for ``io.BufferedReader``.
        request: Query passed unchanged to ``conn.stream``.
        failure_rate: Probability (0.0-1.0) of failing per read; defaults to
            5%. Keyword-only.

    Yields:
        Each decoded line, including its trailing newline if present.

    Raises:
        UnreliableStreamError: At random, according to ``failure_rate``.
    """
    rng = random.SystemRandom()
    buffered = io.BufferedReader(conn.stream(request))
    # JSON exchanged between systems is UTF-8 (RFC 8259). Without an explicit
    # encoding, TextIOWrapper uses the locale's preferred encoding, which
    # garbles non-ASCII text on non-UTF-8 locales. The `with` closes the whole
    # wrapper stack on exhaustion, failure, or early abandonment.
    with io.TextIOWrapper(buffered, encoding="utf-8") as stream:
        while True:
            if rng.random() < failure_rate:
                raise UnreliableStreamError()
            line = stream.readline()
            if not line:
                break
            yield line

# Reliable

This example does not simulate an unreliable connection; instead, it establishes ground truth (i.e., subsequent examples should produce identical output despite the simulated unreliable connection).

In [None]:
import hpq
import ijson
import IPython.display
import tabulate

# Ground-truth run: stream the whole response over a reliable connection,
# decode each top-level array element with ijson, and format it for display.
rows = [
    hpq.format(item)
    for item in ijson.items(
        hpq.create_web_socket_client().stream(request), "item"
    )
]
total = len(rows)
IPython.display.display(
    IPython.display.HTML(f"<p>Transferred {total} unique updates</p>")
)
table = tabulate.tabulate(rows, headers="keys", tablefmt="html")
IPython.display.display(IPython.display.HTML(table))

# Streaming JSON

Uses a streaming JSON parser to build a reliable layer on top of a (simulated) unreliable connection. Each successive top-level array element is yielded into the result set. When the connection fails, the most recently received element is used to construct an `hpq.Position` object, which is used to resume the request from that point.

In [None]:
import hpq
import IPython.display
import tabulate

# Resume-on-failure loop. Each pass streams the query through the simulated
# unreliable connection. When UnreliableStreamError interrupts a pass, an
# hpq.Position built from the last row received is used to re-issue the
# request from that point instead of from the beginning.
pos = None  # resume point for the next pass; None when not resuming
rows = []  # unique updates, in the order received
transferred = 0  # every update received, including ones repeated after restarts
restarts = 0
while True:
    local_request = request if pos is None else pos.request(request)
    try:
        for obj in unreliable_stream_json(
            hpq.create_web_socket_client(), local_request
        ):
            transferred += 1
            # Until pos.predicate accepts an update, updates from a resumed
            # response are counted but treated as already present in `rows`.
            if pos is not None and not pos.predicate(obj):
                continue
            pos = None
            rows.append(obj)
    except UnreliableStreamError:
        if rows:
            pos = hpq.Position(rows[-1])
        restarts += 1
    else:
        break  # the stream ran to completion without a simulated failure
IPython.display.display(
    IPython.display.HTML(f"<p>Query restarted {restarts} times</p>")
)
IPython.display.display(
    IPython.display.HTML(f"<p>Transferred {transferred} updates total</p>")
)
total = len(rows)
IPython.display.display(
    IPython.display.HTML(f"<p>Transferred {total} unique updates</p>")
)
table = tabulate.tabulate(
    [hpq.format(row) for row in rows], tablefmt="html", headers="keys"
)
IPython.display.display(IPython.display.HTML(table))

# JSON Lines

Some environments may not have easy access to a streaming JSON parser. In this case, the HPQ API's limited support for JSON Lines may be used instead.

The following cell makes the same request as the preceding cells, except that it requests `application/x-ndjson` (i.e. JSON Lines) format. This causes each JSON object to be sent on its own line rather than as an element of a top-level array. Parsing proceeds line by line: each line is parsed and added to the result set. When the connection fails, the most recently received object is used to construct an `hpq.Position` object, which is used to resume the request.

In [None]:
import copy
import hpq
import json
import IPython.display
import tabulate

# Same resume-on-failure loop as the streaming-JSON cell, but over JSON Lines.
# The request asks for application/x-ndjson, so each update arrives on its own
# line and is decoded with json.loads; no streaming parser is required.
request_with_format = copy.copy(request)
request_with_format["format"] = "application/x-ndjson"
pos = None  # resume point for the next pass; None when not resuming
rows = []  # unique updates, in the order received
transferred = 0  # every update received, including ones repeated after restarts
restarts = 0
while True:
    local_request = (
        request_with_format if pos is None else pos.request(request_with_format)
    )
    try:
        for line in unreliable_stream_lines(
            hpq.create_web_socket_client(), local_request
        ):
            transferred += 1
            obj = json.loads(line)
            # Until pos.predicate accepts an update, updates from a resumed
            # response are counted but treated as already present in `rows`.
            if pos is not None and not pos.predicate(obj):
                continue
            pos = None
            rows.append(obj)
    except UnreliableStreamError:
        if rows:
            pos = hpq.Position(rows[-1])
        restarts += 1
    else:
        break  # the stream ran to completion without a simulated failure
IPython.display.display(
    IPython.display.HTML(f"<p>Query restarted {restarts} times</p>")
)
IPython.display.display(
    IPython.display.HTML(f"<p>Transferred {transferred} updates total</p>")
)
total = len(rows)
IPython.display.display(
    IPython.display.HTML(f"<p>Transferred {total} unique updates</p>")
)
table = tabulate.tabulate(
    [hpq.format(row) for row in rows], tablefmt="html", headers="keys"
)
IPython.display.display(IPython.display.HTML(table))