In [1]:
import time
from typing import Any, Optional

import requests

In [2]:
import asyncio
import nest_asyncio

In [7]:
# Base URL of the session API; the helper functions append an action
# (rewind, speed, start, subscribe) and a session id to it.
# NOTE(review): points at a server on localhost:8080 — adjust for other environments.
BASE_URL = "http://localhost:8080/mktdata/session"

In [8]:
# Decode response from the API which is either JSON or a string
def decodeResponse(response: Any) -> Any:
    """Decode an API response body that is either JSON or plain text.

    Args:
        response: A response object exposing ``json()`` and ``text``
            (for example a ``requests.Response``).

    Returns:
        The parsed JSON value when the body is valid JSON (this is whatever
        ``response.json()`` yields, not necessarily a string), otherwise the
        raw body text.
    """
    try:
        return response.json()
    except ValueError:
        # json() signals an undecodable body with ValueError (or a subclass);
        # in that case the body is treated as a plain string.
        return response.text

def getSessionId(response):
    """Return the session id carried by a session-creation response.

    The body is decoded with ``decodeResponse`` and handed back unchanged.
    """
    session_id = decodeResponse(response)
    return session_id

In [9]:
# Create new session
def createSession(timeout: float = 30.0) -> Optional[str]:
    """Create a new session on the server and print the outcome.

    Sends a POST request to ``BASE_URL``.

    Args:
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout``. Without it an unresponsive server would block the
            notebook indefinitely.

    Returns:
        The decoded session id when the server answers with HTTP 200,
        otherwise ``None``.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    response = requests.post(BASE_URL, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to create session: {response.text}")
        return None

    session_id = getSessionId(response)
    print(f"Session created: {session_id}")
    return session_id

In [10]:
# Rewind session
def rewindSession(session_id: str, timeout: float = 30.0) -> None:
    """Ask the server to rewind a session and print the response body.

    Does nothing when ``session_id`` is empty or ``None``.

    Args:
        session_id: Id of the session to rewind.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout`` so an unresponsive server cannot hang the notebook.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    if not session_id:
        return

    rewind_url = f"{BASE_URL}/rewind/{session_id}"
    response = requests.put(rewind_url, timeout=timeout)
    print(f"Rewind session response: {response.text}")

In [11]:
# Set replay speed
def setSpeed(session_id: str, speed: float, timeout: float = 30.0) -> None:
    """Set the replay speed of a session and print the response body.

    Does nothing when ``session_id`` is empty or ``None``.

    Args:
        session_id: Id of the session to configure.
        speed: Replay speed; it is interpolated into the request URL as-is.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout`` so an unresponsive server cannot hang the notebook.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    if not session_id:
        return

    speed_url = f"{BASE_URL}/speed/{session_id}/{speed}"
    response = requests.put(speed_url, timeout=timeout)
    print(f"Set replay speed response: {response.text}")


In [12]:
# Start session
def startSession(session_id: str, timeout: float = 30.0) -> None:
    """Ask the server to start a session and print the response body.

    Does nothing when ``session_id`` is empty or ``None``.

    Args:
        session_id: Id of the session to start.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout`` so an unresponsive server cannot hang the notebook.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    if not session_id:
        return

    start_url = f"{BASE_URL}/start/{session_id}"
    response = requests.put(start_url, timeout=timeout)
    print(f"Start session response: {response.text}")

In [None]:
# Number of sessions to create.
SESSION_COUNT = 100

# createSession() returns None when creation fails; those entries are kept
# in the list and are skipped later by the helpers, which guard on a falsy
# session id.
sessions = [createSession() for _ in range(SESSION_COUNT)]

In [None]:
# Apply the same replay speed to every session in `sessions`; entries that
# are None are ignored by setSpeed.
# NOTE(review): the meaning/unit of 5000.0 is defined by the server's /speed
# endpoint (presumably a multiplier of real time) — confirm.
for session in sessions:
    setSpeed(session, 5000.0)

In [26]:
# Subscribe to session
from IPython.display import clear_output, display
async def subscribeSession(session_id: str) -> None:
    """Subscribe to a session's stream and consume it until it ends.

    The streamed lines are read and discarded; only the HTTP status and a
    final "finished" message are shown. Does nothing when ``session_id`` is
    empty or ``None``.

    ``requests`` is a blocking library, so the request and the read loop run
    in a worker thread via ``run_in_executor``. This keeps the event loop
    free, letting several subscriptions overlap. The default executor has a
    bounded number of worker threads, so the number of streams read in
    parallel is bounded as well.

    Args:
        session_id: Id of the session to subscribe to.

    Raises:
        requests.exceptions.RequestException: If the request fails.
        asyncio.CancelledError: If the coroutine is cancelled; the worker is
            asked to stop at the next received line.
    """
    if not session_id:
        return

    loop = asyncio.get_running_loop()
    stop_requested = []  # becomes non-empty once the coroutine is interrupted

    def _consume_stream() -> None:
        # Runs in a worker thread: open the stream and drain it line by line.
        response = requests.get(f"{BASE_URL}/subscribe/{session_id}", stream=True)
        try:
            # Emit the status line on the event-loop thread rather than from
            # the worker thread.
            loop.call_soon_threadsafe(
                print,
                f"Subscribe session {session_id} response: {response.status_code}",
            )
            for _chunk in response.iter_lines(decode_unicode=True):
                if stop_requested:
                    break
        finally:
            # With stream=True the connection is only released once the body
            # is fully consumed or the response is closed.
            response.close()

    try:
        await loop.run_in_executor(None, _consume_stream)
    except (KeyboardInterrupt, asyncio.CancelledError):
        stop_requested.append(True)
        print(f"Subscribe session {session_id} stopped manually")
        raise

    clear_output(wait=True)  # Clear previous output in notebook
    display(f"Subscribe session {session_id} finished")


In [27]:
# Subscribe to 1 or more sessions asynchronously
async def subscribeSessions(sessions):
    """Subscribe to every given session and wait until all have finished.

    All subscriptions are handed to a single ``asyncio.gather`` call so they
    are scheduled together and can overlap whenever ``subscribeSession``
    yields to the event loop. Awaiting a separate ``gather`` per session
    inside a loop would run them strictly one after another.

    Args:
        sessions: Iterable of session ids; an empty iterable is a no-op.
    """
    await asyncio.gather(*(subscribeSession(session) for session in sessions))

In [None]:
# Start every session in `sessions`; entries that are None (failed
# creations) are skipped by startSession.
for session in sessions:
    startSession(session)

In [None]:
# Subscribe to all sessions and wait until every stream has been consumed.
# Top-level `await` is valid here because IPython/Jupyter runs cell code
# inside its own event loop; it is not valid in a plain Python script.
await subscribeSessions(sessions)