In [20]:
# 获取市场状态函数
import finnhub

def get_market_status(exchange: str = "US") -> str:
    """获取指定交易所的市场状态
    
    Args:
        exchange: 交易所代码，默认为 'US'（美国股市）
                  可选值: 'US', 'NYSE', 'NASDAQ', 'HK', 'JP', 'UK', 'DE' 等
    
    Returns:
        市场状态的文本描述
    """
    try:
        # 创建 Finnhub 客户端
        finnhub_client = finnhub.Client(api_key="d5ssulhr01qmiccbs4qgd5ssulhr01qmiccbs4r0")
        
        # 获取市场状态
        data = finnhub_client.market_status(exchange=exchange)
        
        # 提取关键信息
        is_open = data.get('isOpen', False)
        timezone = data.get('timezone', 'Unknown')
        session = data.get('session')
        
        # 构建文本返回
        status_text = "开盘" if is_open else "收盘"
        result = f"{exchange} 交易所当前状态：{status_text}，时区：{timezone}"
        
        if session:
            result += f"，交易时段：{session}"
        
        return result
        
    except Exception as e:
        return f"获取 {exchange} 市场状态失败：{str(e)}"

# 测试函数
print("测试获取美国股市状态：")
print(get_market_status("US"))


测试获取美国股市状态：
US 交易所当前状态：收盘，时区：America/New_York


In [None]:
# DrSai 多智能体数据监测系统
# 使用 AGSwarm 实现去中心化型多智能体协作

import asyncio
import random
import os
import json
from datetime import datetime

# ==================== 工具定义 ====================
# 共享状态存储（用于模拟实时数据流）
data_state = {
    "current_value": random.randint(0, 100),
    "last_value": random.randint(0, 100),
    "alerts": []
}

def generate_random_data() -> str:
    """生成随机数据（0-100）并更新状态"""
    data_state["last_value"] = data_state["current_value"]
    data_state["current_value"] = random.randint(0, 100)
    change = abs(data_state["current_value"] - data_state["last_value"])
    
    result = {
        "current_value": data_state["current_value"],
        "last_value": data_state["last_value"],
        "change": change,
        "timestamp": datetime.now().strftime("%H:%M:%S")
    }
    return json.dumps(result, ensure_ascii=False)

def check_data_threshold(threshold: int = 59) -> str:
    """检查数据是否超过阈值"""
    change = abs(data_state["current_value"] - data_state["last_value"])
    
    if change >= threshold:
        alert = {
            "time": datetime.now().strftime("%H:%M:%S"),
            "current_value": data_state["current_value"],
            "change": change,
            "threshold": threshold,
            "status": "ALERT"
        }
        data_state["alerts"].append(alert)
        return json.dumps(alert, ensure_ascii=False)
    else:
        return json.dumps({
            "status": "NORMAL",
            "change": change,
            "threshold": threshold,
            "message": f"Change {change} is below threshold {threshold}"
        }, ensure_ascii=False)

def get_alert_summary() -> str:
    """获取异常汇总报告"""
    alert_count = len(data_state["alerts"])
    
    summary = {
        "total_alerts": alert_count,
        "latest_alert": data_state["alerts"][-1] if alert_count > 0 else None,
        "current_value": data_state["current_value"]
    }
    return json.dumps(summary, ensure_ascii=False)

# ==================== 在 Jupyter 中测试工具 ====================
print("=== 测试工具函数 ===")
print("1. 生成随机数据:")
print(generate_random_data())

print("\n2. 检查阈值:")
print(check_data_threshold(59))

print("\n3. 生成更多数据并检查:")
for i in range(5):
    print(f"\nRound {i+1}:")
    print(generate_random_data())
    result = check_data_threshold(59)
    print(result)
    if "ALERT" in result:
        print("WARNING: Abnormal detected!")

print("\n4. 异常汇总:")
print(get_alert_summary())

# ==================== DrSai 多智能体系统 ====================
# 以下代码需要保存为 .py 文件，在指定的Python环境中运行
"""
# 完整的多智能体系统代码（保存为 data_monitor_team.py）

import os
import asyncio
from drsai import AssistantAgent, HepAIChatCompletionClient
from drsai import AGSwarm, HandoffTermination, TextMentionTermination
from drsai import Console, run_worker

def create_multi_agent_team():
    model_client = HepAIChatCompletionClient(
        model="deepseek-ai/deepseek-v3",
        api_key=os.environ.get("HEPAI_API_KEY"),
        base_url="https://aiapi.ihep.ac.cn/apiv2"
    )
    
    monitor_agent = AssistantAgent(
        name="monitor_agent",
        handoffs=["reporter_agent", "user"],
        tools=[generate_random_data, check_data_threshold],
        system_message="You are a data monitoring agent. "
            "Use generate_random_data to get new data. "
            "Use check_data_threshold to check for abnormalities (threshold 59). "
            "If abnormal detected (change >= 59), handoff to reporter_agent. "
            "Say TERMINATE when done.",
        model_client=model_client
    )
    
    reporter_agent = AssistantAgent(
        name="reporter_agent",
        handoffs=["monitor_agent", "user"],
        tools=[get_alert_summary],
        system_message="You are a reporting agent. "
            "Receive alerts from monitor_agent. "
            "Use get_alert_summary to get alert summary. "
            "Report status to user clearly. "
            "After reporting, handoff back to monitor_agent.",
        model_client=model_client
    )
    
    termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
    team = AGSwarm(
        participants=[monitor_agent, reporter_agent],
        termination_condition=termination
    )
    return team

async def main():
    await Console(create_multi_agent_team().run_stream(
        task="Start data monitoring, handoff to reporter when abnormal detected."
    ))

if __name__ == "__main__":
    asyncio.run(main())
"""

In [2]:
#https://pypi.org/project/websocket_client/
import websocket
import csv
import json
import os
from datetime import datetime
from collections import defaultdict

# CSV文件路径
CSV_DIR = "/data/juno/lin/agent/drsai-main/my_agent/stock_monitor/data"
CSV_FILE = os.path.join(CSV_DIR, "stock_data.csv")

# 用于去重的集合：存储已写入的 (timestamp, symbol) 组合
written_records = set()

# 初始化CSV文件（如果不存在则创建并写入表头）
def init_csv_file():
    global written_records
    
    if not os.path.exists(CSV_FILE):
        with open(CSV_FILE, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['timestamp', 'datetime', 'symbol', 'price', 'volume'])
        print(f"CSV file created: {CSV_FILE}")
    else:
        print(f"CSV file already exists: {CSV_FILE}")
        # 加载已有记录到去重集合
        load_existing_records()

# 从现有CSV文件加载已有记录，用于去重
def load_existing_records():
    global written_records
    try:
        with open(CSV_FILE, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            count = 0
            for row in reader:
                # 使用 (timestamp, symbol) 作为唯一标识
                key = (row['timestamp'], row['symbol'])
                written_records.add(key)
                count += 1
        print(f"Loaded {count} existing records for deduplication")
    except Exception as e:
        print(f"Error loading existing records: {e}")

# 写入数据到CSV（带去重）
def write_to_csv(symbol, price, volume, timestamp):
    global written_records
    
    # 创建唯一标识
    key = (str(timestamp), symbol)
    
    # 检查是否已存在相同时间戳和symbol的记录
    if key in written_records:
        return  # 跳过重复记录
    
    # 将时间戳转换为可读时间
    dt = datetime.fromtimestamp(timestamp / 1000)  # 毫秒时间戳
    datetime_str = dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]  # 保留毫秒
    
    with open(CSV_FILE, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([timestamp, datetime_str, symbol, price, volume])
    
    # 记录到去重集合
    written_records.add(key)
    
    #print(f"Saved: {symbol} | Price: {price} | Volume: {volume} | Time: {datetime_str}")

def on_message(ws, message):
    # 解析JSON数据
    try:
        data = json.loads(message)
        
        # 检查是否是交易数据
        if data.get('type') == 'trade' and 'data' in data:
            trades = data['data']
            for trade in trades:
                symbol = trade.get('s', 'N/A')
                price = trade.get('p', 0)
                volume = trade.get('v', 0)
                timestamp = trade.get('t', 0)
                
                # 写入CSV（带去重检查）
                write_to_csv(symbol, price, volume, timestamp)
    except json.JSONDecodeError:
        # 如果不是JSON格式，直接打印原始消息
        print(message)

def on_error(ws, error):
    print(f"Error: {error}")

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")

def on_open(ws):
    print("### connected ###")
    # 订阅股票
    # ws.send('{"type":"subscribe","symbol":"AAPL"}')
    # ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    # ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    # 初始化CSV文件
    init_csv_file()
    
    # 启用WebSocket追踪（可选，设为False减少输出）
    websocket.enableTrace(False)
    
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=d5ssulhr01qmiccbs4qgd5ssulhr01qmiccbs4r0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    
    # 运行WebSocket（可以使用ping_interval和ping_timeout保持连接）
    ws.run_forever(ping_interval=30, ping_timeout=10)

CSV file already exists: /data/juno/lin/agent/drsai-main/my_agent/stock_monitor/data/stock_data.csv
Loaded 19092 existing records for deduplication
### connected ###
Error: 
### closed ###


In [1]:
# 获取单条BINANCE:BTCUSDT交易数据并停止
import websocket
import json
import time

# 用于接收数据的变量
received_data = None
data_received = False

def on_message_single(ws, message):
    global received_data, data_received
    try:
        data = json.loads(message)
        if data.get('type') == 'trade' and 'data' in data:
            trades = data['data']
            for trade in trades:
                if trade.get('s') == 'BINANCE:BTCUSDT':
                    received_data = trade
                    data_received = True
                    print("=" * 50)
                    print("收到BINANCE:BTCUSDT交易数据:")
                    print(f"  股票代码: {trade.get('s')}")
                    print(f"  价格    : {trade.get('p')}")
                    print(f"  成交量  : {trade.get('v')}")
                    print(f"  时间戳  : {trade.get('t')}")
                    print("=" * 50)
                    # 收到数据后关闭连接
                    ws.close()
                    break
    except json.JSONDecodeError:
        pass

def on_error_single(ws, error):
    print(f"Error: {error}")

def on_close_single(ws, close_status_code, close_msg):
    print("### 连接已关闭 ###")

def on_open_single(ws):
    print("### 连接已建立，等待数据... ###")
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')

# 创建WebSocket连接
ws_single = websocket.WebSocketApp(
    "wss://ws.finnhub.io?token=d5ssulhr01qmiccbs4qgd5ssulhr01qmiccbs4r0",
    on_message=on_message_single,
    on_error=on_error_single,
    on_close=on_close_single
)
ws_single.on_open = on_open_single

# 设置超时时间（秒）
TIMEOUT = 10
start_time = time.time()

# 迯行WebSocket（在后台运行）
print("开始获取BINANCE:BTCUSDT实时交易数据...")
ws_single.run_forever()

# 输出结果
if received_data:
    print("\n" + "=" * 50)
    print("最终获取的数据:")
    print(json.dumps(received_data, indent=2, ensure_ascii=False))
    print("=" * 50)
else:
    print("未获取到数据，请检查网络连接")

开始获取BINANCE:BTCUSDT实时交易数据...
### 连接已建立，等待数据... ###
收到BINANCE:BTCUSDT交易数据:
  股票代码: BINANCE:BTCUSDT
  价格    : 82579.07
  成交量  : 0.00088
  时间戳  : 1769759226528
### 连接已关闭 ###

最终获取的数据:
{
  "c": null,
  "p": 82579.07,
  "s": "BINANCE:BTCUSDT",
  "t": 1769759226528,
  "v": 0.00088
}


In [9]:
TIMEOUT = 10
start_time = time.time()

# 迯行WebSocket（在后台运行）
print("开始获取BINANCE:BTCUSDT实时交易数据...")
ws_single.run_forever()

# 输出结果
if received_data:
    print("\n" + "=" * 50)
    print("最终获取的数据:")
    print(json.dumps(received_data, indent=2, ensure_ascii=False))
    print("=" * 50)
else:
    print("未获取到数据，请检查网络连接")

开始获取BINANCE:BTCUSDT实时交易数据...
### 连接已建立，等待数据... ###
收到BINANCE:BTCUSDT交易数据:
  股票代码: BINANCE:BTCUSDT
  价格    : 87867.51
  成交量  : 0.0012
  时间戳  : 1769655636153
### 连接已关闭 ###

最终获取的数据:
{
  "c": null,
  "p": 87867.51,
  "s": "BINANCE:BTCUSDT",
  "t": 1769655636153,
  "v": 0.0012
}


In [15]:
import finnhub
finnhub_client = finnhub.Client(api_key="d5ssulhr01qmiccbs4qgd5ssulhr01qmiccbs4r0")

data = finnhub_client.market_status(exchange='US')
print(data)


{'exchange': 'US', 'holiday': None, 'isOpen': False, 'session': None, 't': 1769667956, 'timezone': 'America/New_York'}


In [5]:
import finnhub
finnhub_client = finnhub.Client(api_key="d5ssulhr01qmiccbs4qgd5ssulhr01qmiccbs4r0")

print(finnhub_client.market_holiday(exchange='US'))

{'data': [{'eventName': 'Christmas', 'atDate': '2027-12-24', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Thanksgiving Day', 'atDate': '2027-11-26', 'tradingHour': '09:30-13:00', 'postMarket': '13:00:17:00'}, {'eventName': 'Thanksgiving Day', 'atDate': '2027-11-25', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Labor Day', 'atDate': '2027-09-06', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Independence Day', 'atDate': '2027-07-05', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Juneteenth', 'atDate': '2027-06-18', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Memorial Day', 'atDate': '2027-05-31', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Good Friday', 'atDate': '2027-03-26', 'tradingHour': '', 'postMarket': ''}, {'eventName': "Washington's Birthday", 'atDate': '2027-02-15', 'tradingHour': '', 'postMarket': ''}, {'eventName': 'Birthday of Martin Luther King, Jr', 'atDate': '2027-01-18', 'tradingHour': '', 'postMarket': ''}, {'eventName': 