In [1]:
import asyncio
import websockets
import json
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryApi

# InfluxDB connection setup
INFLUXDB_TOKEN = "MPdB-4ycSA6YftE2GBtV3Ml6HnT_6yEj4xoUvsvs0UmEYT-QykcKLxQ9PiQffYFPprmdJCufwLmYHr6BRL-F-A=="
INFLUXDB_ORG = "Devnullx"
INFLUXDB_BUCKET = "trade_sim3"
INFLUXDB_URL = "http://165.232.189.85:8086"  # Change as per your setup

# Function to query data from InfluxDB
async def query_influxdb():
    query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -1h)
      |> filter(fn: (r) => r["_measurement"] == "trade")
      |> filter(fn: (r) => r["_field"] == "Close" or r["_field"] == "High" or r["_field"] == "Low" or r["_field"] == "Open" or r["_field"] == "Volume")
      |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
      |> yield(name: "mean")
    '''
    
    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
    query_api = client.query_api()
    
    result = query_api.query(query=query)

    # A dictionary to store data grouped by time (timestamp as key)
    grouped_data = {}

    # Process the result and group data by timestamp
    for table in result:
        for record in table.records:
            time = str(record.get_time())
            field = record.get_field()
            value = record.get_value()

            # Initialize the dictionary for this timestamp if it doesn't exist
            if time not in grouped_data:
                grouped_data[time] = {"time": time}
            
            # Add the field-value pair to the dictionary for this timestamp
            grouped_data[time][field] = value

    client.close()
    
    # Convert grouped data to a list of dictionaries sorted by timestamp
    data_list = list(grouped_data.values())
    return data_list

# WebSocket handler function
async def websocket_handler(websocket, path):
    print("Client connected")

    # Query data from InfluxDB
    influx_data = await query_influxdb()

    # Send each data entry (grouped by timestamp) line-by-line with a 1-second delay
    for entry in influx_data:
        await websocket.send(json.dumps(entry))  # Send one entry at a time
        print(f"Sent InfluxDB data: {entry}")
        await asyncio.sleep(5)  # Wait for 1 second before sending the next entry

    async for message in websocket:
        print(f"Received message from client: {message}")
        await websocket.send(f"Echo: {message}")

# Function to start the WebSocket server
async def start_websocket_server():
    async with websockets.serve(websocket_handler, "0.0.0.0", 12345):
        print("WebSocket server started on ws://0.0.0.0:12345")
        await asyncio.Future()  # Keep the server running indefinitely

# Main function to start the WebSocket server
async def main():
    await start_websocket_server()

# Function to run the event loop (handles interactive environments like Jupyter as well)
def run():
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:  # No event loop is running
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()

    # Run the main function
    if loop.is_running():
        print("Event loop is already running. Adding main function as a task.")
        loop.create_task(main())
    else:
        loop.run_until_complete(main())

# Run the script
if __name__ == "__main__":
    run()


Event loop is already running. Adding main function as a task.
WebSocket server started on ws://0.0.0.0:12345
Client connected
Sent InfluxDB data: {'time': '2024-09-22 19:36:00+00:00', 'Close': 1203.2290101562303, 'High': 1203.9230842854154, 'Low': 1202.4166495671839, 'Open': 1202.8447543249983, 'Volume': 193.72185470477788}
Sent InfluxDB data: {'time': '2024-09-22 19:37:00+00:00', 'Close': 1202.3603002937975, 'High': 1203.6438561314064, 'Low': 1201.9906279121599, 'Open': 1202.8394730316038, 'Volume': 209.92207356513853}
Sent InfluxDB data: {'time': '2024-09-22 19:38:00+00:00', 'Close': 1201.852243833626, 'High': 1202.9320085438424, 'Low': 1200.4982814093871, 'Open': 1202.156068439906, 'Volume': 199.9066449747201}
Sent InfluxDB data: {'time': '2024-09-22 19:39:00+00:00', 'Close': 1201.8840318787065, 'High': 1202.7374164194457, 'Low': 1201.5605985055356, 'Open': 1202.21575278826, 'Volume': 209.30878602160917}
Sent InfluxDB data: {'time': '2024-09-22 19:39:19.969031+00:00', 'Close': 1201

In [1]:
import asyncio
import websockets
import json
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryApi
import pandas as pd

# InfluxDB connection setup
INFLUXDB_TOKEN = "your-valid-token"
INFLUXDB_ORG = "Devnullx"
INFLUXDB_BUCKET = "trade_sim3"
INFLUXDB_URL = "http://165.232.189.85:8086"

# Function to query data from InfluxDB
async def query_influxdb():
    query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -1h)
      |> filter(fn: (r) => r["_measurement"] == "trade")
      |> filter(fn: (r) => r["_field"] == "Close" or r["_field"] == "High" or r["_field"] == "Low" or r["_field"] == "Open" or r["_field"] == "Volume")
      |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
      |> yield(name: "mean")
    '''
    
    try:
        client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
        query_api = client.query_api()
        result = query_api.query(query=query)

        grouped_data = {}

        for table in result:
            for record in table.records:
                time = str(record.get_time())
                field = record.get_field()
                value = record.get_value()

                if time not in grouped_data:
                    grouped_data[time] = {"time": time}
                
                grouped_data[time][field] = value

        client.close()

        data_list = list(grouped_data.values())
        df = pd.DataFrame(data_list)
        df['group'] = (df.index // 5)

        df_resampled = df.groupby('group').agg({
            'time': 'first',
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Volume': 'sum'
        }).reset_index(drop=True)
        return df_resampled.to_dict(orient='records')

    except Exception as e:
        print(f"Error querying InfluxDB: {e}")
        raise

# WebSocket handler function
async def websocket_handler(websocket, path):
    print("Client connected")

    try:
        influx_data = await query_influxdb()

        for entry in influx_data:
            await websocket.send(json.dumps(entry))
            print(f"Sent InfluxDB data: {entry}")
            await asyncio.sleep(5)
    except websockets.exceptions.ConnectionClosedOK:
        print("Client disconnected normally")
    except Exception as e:
        print(f"Unexpected error in WebSocket handler: {e}")
    finally:
        print("Connection handler ended.")

# Function to start the WebSocket server
async def start_websocket_server(stop_event):
    async with websockets.serve(websocket_handler, "0.0.0.0", 12345):
        print("WebSocket server started on ws://0.0.0.0:12345")
        await stop_event.wait()  # Wait until we signal the event loop to stop

# Main function to start the WebSocket server
async def main():
    stop_event = asyncio.Event()  # Create an event to control when to stop
    server_task = asyncio.create_task(start_websocket_server(stop_event))

    try:
        await asyncio.Future()  # Keep the server running indefinitely
    except asyncio.CancelledError:
        print("Server shutdown initiated.")
    finally:
        stop_event.set()  # Signal to stop the server

# Function to run the event loop (handles interactive environments like Jupyter as well)
def run():
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    if loop.is_running():
        print("Event loop is already running. Adding main function as a task.")
        asyncio.ensure_future(main())  # Add the main task to the running event loop
    else:
        loop.run_until_complete(main())

# Run the script
if __name__ == "__main__":
    run()


Event loop is already running. Adding main function as a task.


Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<main() done, defined at /tmp/ipykernel_1092568/2068339797.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x771c227c3c10>()]>>


WebSocket server started on ws://0.0.0.0:12345


In [8]:
import asyncio
import websockets
import json
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryApi
import pandas as pd
# InfluxDB connection setup
INFLUXDB_TOKEN = "z48qf97LY6cXBZTKozR4-2Mpc-fqHGiS9WraxPlILKxQGg0rj10axPklmEgx81Q4WrhdRu_or1S6y101GdJlMg=="
INFLUXDB_ORG = "Devnullx"
INFLUXDB_BUCKET = "trade_sim2"
INFLUXDB_URL = "http://165.232.189.85:8086"  # Change as per your setup

# Function to query data from InfluxDB
async def query_influxdb():
    query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -1h)
      |> filter(fn: (r) => r["_measurement"] == "trade")
      |> filter(fn: (r) => r["_field"] == "Close" or r["_field"] == "High" or r["_field"] == "Low" or r["_field"] == "Open" or r["_field"] == "Volume")
      |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
      |> yield(name: "mean")
    '''
    
    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
    query_api = client.query_api()
    
    result = query_api.query(query=query)

    # A dictionary to store data grouped by time (timestamp as key)
    grouped_data = {}

    # Process the result and group data by timestamp
    for table in result:
        for record in table.records:
            time = str(record.get_time())
            field = record.get_field()
            value = record.get_value()

            # Initialize the dictionary for this timestamp if it doesn't exist
            if time not in grouped_data:
                grouped_data[time] = {"time": time}
            
            # Add the field-value pair to the dictionary for this timestamp
            grouped_data[time][field] = value

    client.close()
    
    # Convert grouped data to a list of dictionaries sorted by timestamp
    data_list = list(grouped_data.values())
    df = pd.DataFrame(data_list)
    df['group'] = (df.index // 5)

    # Aggregate the data
    df_resampled = df.groupby('group').agg({
        'time': 'first',
        'Open': 'first',   # Opening price for the 5-second period is the first opening price
        'High': 'max',     # Highest price during the 5-second period
        'Low': 'min',      # Lowest price during the 5-second period
        'Close': 'last',   # Closing price for the 5-second period is the last closing price
        'Volume': 'sum'    # Sum of the volume over the 5-second period
    }).reset_index(drop=True)
    data= df_resampled.to_dict(orient='records')
    return data
await query_influxdb()

AttributeError: 'list' object has no attribute 'dtype'