In [1]:
import grpc.aio as grpc_async
from concurrent import futures
import time
import rlcard
# Note: The Env type is usually needed for type hints, but we'll use rlcard.make
# import rlcard.envs.env as Env 
import game_pb2 as pb2
import game_pb2_grpc as pb2_grpc
import asyncio
import json # <-- Must import for serialization
import numpy # <-- Needed if rlcard uses numpy arrays in state, as json.dumps needs to handle them

# Extend JSONEncoder to handle numpy arrays commonly found in rlcard states
class NumpyEncoder(json.JSONEncoder):
    """Custom encoder for numpy data types (like np.int64, np.float32)"""
    def default(self, obj):
        if isinstance(obj, numpy.integer):
            return int(obj)
        elif isinstance(obj, numpy.floating):
            return float(obj)
        elif isinstance(obj, numpy.ndarray):
            # Convert NumPy array to a standard list for JSON
            return obj.tolist()
        return json.JSONEncoder.default(obj)

class GameEngineServicer(pb2_grpc.GameEngineServicer):
    
    # ProcessGame 方法同时接收和返回流
    async def ProcessGame(self, request_iterator, context):
        print(f"Game session started from client: {context.peer()}")
        
        # Game State Variables (Scope is per stream/session)
        env = None
        # 移除对 agent_ids 的引用
        is_game_active = False

        # --- 1. Request Handler (Reader Task) ---
        # This task reads client requests and processes game logic
        async def game_req_handler():
            # 仅保留 env 和 is_game_active
            nonlocal env, is_game_active
            
            # The async for loop correctly iterates over the client request stream
            async for req in request_iterator:
                
                # --- INIT Request ---
                if req.HasField("init"):
                    print(f"-> 收到 INIT 请求: {req.init.game_type}")
                    game_type = req.init.game_type
                    # 移除 agent_ids 的获取
                    
                    try:
                        # Initialize RLCard Environment
                        env = rlcard.make(game_type)
                        
                        # Send SUCCESS response
                        yield pb2.ProcessGameResponse(
                            init_response=pb2.GameInitResponse(type=pb2.GameInitResponse.SUCCESS)
                        )
                        print("<- 已发送 INIT 响应 (SUCCESS)")
                        
                        # Start game, get initial state
                        state_dict, i_player = env.reset()
                        is_game_active = True

                        state_json = json.dumps(state_dict, cls=NumpyEncoder)
                        
                        # Send initial state update
                        yield pb2.ProcessGameResponse(state_update=pb2.GameStateUpdate(
                            state=state_json,
                            is_over=False, # Must be env.is_over(), but initial state is never over
                            i_player=i_player, # 仅返回内部索引
                        ))

                    except Exception as e:
                        print(f"ERROR: Failed to initialize game: {e}")
                        yield pb2.ProcessGameResponse(
                            init_response=pb2.GameInitResponse(type=pb2.GameInitResponse.FAILURE, message=str(e))
                        )
                        is_game_active = False
                        # Note: Must yield the final error response before breaking the loop
                        break 
                
                # --- ACTION Request ---
                elif req.HasField("action") and is_game_active:
                    # ⚠️ 修复：Action 必须是 int 类型，从 Protobuf 字段中读取并转换为 int
                    try:
                        action_int = int(req.action.action) # Assuming action field is a string in Protobuf
                    except ValueError:
                        print(f"ERROR: Invalid action format received: {req.action.action}")
                        continue
                        
                    print(f"-> 收到 ACTION 请求: {action_int}")

                    next_state_dict, i_player = env.step(action_int)
                    
                    is_over = env.is_over()
                    
                    # ⚠️ 修复：将 Python 字典序列化为 JSON 字符串
                    state_json = json.dumps(next_state_dict, cls=NumpyEncoder)
                    
                    # Send GameStateUpdate
                    yield pb2.ProcessGameResponse(state_update=pb2.GameStateUpdate(
                        state=state_json,
                        is_over=is_over,
                        i_player=i_player, # 仅返回内部索引
                    ))
                    
                    # Check for game end
                    if is_over:
                        payoffs = env.get_payoffs() # ⚠️ 修复：必须调用方法
                        print(f"Game Over. Payoffs: {payoffs}")
                        yield pb2.ProcessGameResponse(end_status=pb2.GameEndStatus(
                            payoffs=payoffs
                            # 移除 agent_ids 的发送
                        ))
                        is_game_active = False # End game session logic
                        break # Close the stream reader as game is over

                # --- CONTROL Request ---
                elif req.HasField("control"):
                    control_type = req.control.type
                    print(f"-> 收到 CONTROL 请求: {pb2.GameControl.ControlType.Name(control_type)}")
                    
                    if control_type == pb2.GameControl.ControlType.PAUSE:
                        is_game_active = False
                    elif control_type == pb2.GameControl.ControlType.RESUME and env is not None:
                        is_game_active = True
                        # Reset the environment for a new game
                        state_dict, i_player = env.reset() 
                        state_json = json.dumps(state_dict, cls=NumpyEncoder)
                        
                        yield pb2.ProcessGameResponse(state_update=pb2.GameStateUpdate(
                            state=state_json,
                            is_over=False,
                            i_player=i_player, # 仅返回内部索引
                        ))
            
            print("Client request stream closed.")
            # When the client reader is done, the server's response stream should also close.
            # Returning from the generator implicitly closes the response stream.

        # 2. Return the generator/iterable created by the request handler
        # Since this is a bi-directional stream, the server responds as requests come in.
        # The 'game_req_handler' function is an async generator which yields responses.
        return game_req_handler()


# --- 修正后的异步 gRPC Server 启动函数 ---
async def serve():
    server = grpc_async.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_GameEngineServicer_to_server(GameEngineServicer(), server)
    server.add_insecure_port('[::]:50051')
    
    await server.start()
    print("gRPC Async Server started on port 50051...")
    
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        # Stop the server gracefully
        await server.stop(0)



In [None]:
# try:
#     asyncio.run(serve())
# except RuntimeError as e:
#     if "cannot be called from a running event loop" in str(e):
#         print("Existing event loop detected (e.g., in a notebook/interactive shell). Falling back to run_until_complete.")
#         # Fallback for environments with pre-existing loops
#         try:
#             loop = asyncio.get_event_loop()
#             loop.run_until_complete(serve())
#         except Exception as inner_e:
#             print(f"Failed to start server using fallback method: {inner_e}")
#     else:
#         print(f"An unexpected RuntimeError occurred: {e}")
# except KeyboardInterrupt:
#     print("Server shutdown complete.")
# except Exception as e:
#     print(f"An error occurred: {e}")
await serve()

gRPC Async Server started on port 50051...
