-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
227 lines (183 loc) · 8.01 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from mcp.server import Server
import uvicorn
# Initialize FastMCP server for Weather tools (SSE)
mcp = FastMCP("weather")
# Constants for NWS (National Weather Service) API
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
async def make_nws_request(url: str) -> dict[str, Any] | None:
"""Make a request to the NWS API with proper error handling.
This function handles the HTTP request to the NWS API, setting appropriate
headers and handling potential errors during the request.
Args:
url: The complete URL for the NWS API endpoint
Returns:
A dictionary containing the JSON response if successful, None otherwise
"""
# Set required headers for the NWS API
headers = {
"User-Agent": USER_AGENT, # NWS API requires a user agent
"Accept": "application/geo+json" # Request GeoJSON format
}
# Create an async HTTP client
async with httpx.AsyncClient() as client:
try:
# Make the GET request with timeout
response = await client.get(url, headers=headers, timeout=30.0)
response.raise_for_status() # Raise exception for 4XX/5XX responses
return response.json() # Parse and return JSON response
except Exception:
# Return None if any error occurs (connection, timeout, parsing, etc.)
return None
def format_alert(feature: dict) -> str:
"""Format an alert feature into a readable string.
Extracts relevant information from a weather alert feature and formats it
into a human-readable string.
Args:
feature: A dictionary containing a single weather alert feature
Returns:
A formatted string with key alert information
"""
# Extract properties from the feature
props = feature["properties"]
# Format the alert with important details
return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""
@mcp.tool()
async def get_alerts(state: str) -> str:
"""Get weather alerts for a US state.
Fetches active weather alerts from the NWS API for a specified US state.
Args:
state: Two-letter US state code (e.g. CA, NY)
Returns:
A formatted string containing all active alerts for the state,
or a message indicating no alerts or an error
"""
# Construct the URL for the state's active alerts
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
# Make the API request
data = await make_nws_request(url)
# Check if the response is valid
if not data or "features" not in data:
return "Unable to fetch alerts or no alerts found."
# Check if there are any active alerts
if not data["features"]:
return "No active alerts for this state."
# Format each alert and join them with separators
alerts = [format_alert(feature) for feature in data["features"]]
return "\n---\n".join(alerts)
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""Get weather forecast for a location.
Fetches the weather forecast from the NWS API for a specified location
using latitude and longitude coordinates.
Args:
latitude: Latitude of the location
longitude: Longitude of the location
Returns:
A formatted string containing the forecast for the next 5 periods,
or an error message if the forecast couldn't be retrieved
"""
# First get the forecast grid endpoint using the coordinates
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)
# Check if we received valid point data
if not points_data:
return "Unable to fetch forecast data for this location."
# Extract the forecast URL from the points response
# NWS API requires this two-step process to get the forecast
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)
# Check if we received valid forecast data
if not forecast_data:
return "Unable to fetch detailed forecast."
# Extract and format the forecast periods
periods = forecast_data["properties"]["periods"]
forecasts = []
for period in periods[:5]: # Only show next 5 periods
forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
forecasts.append(forecast)
# Join all forecast periods with separators
return "\n---\n".join(forecasts)
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can serve the provided MCP server with SSE.
Sets up a Starlette web application with routes for SSE (Server-Sent Events)
communication with the MCP server.
Args:
mcp_server: The MCP server instance to connect
debug: Whether to enable debug mode for the Starlette app
Returns:
A configured Starlette application
"""
# Create an SSE transport with a base path for messages
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
"""Handler for SSE connections.
Establishes an SSE connection and connects it to the MCP server.
Args:
request: The incoming HTTP request
"""
# Connect the SSE transport to the request
async with sse.connect_sse(
request.scope,
request.receive,
request._send, # noqa: SLF001
) as (read_stream, write_stream):
# Run the MCP server with the SSE streams
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
# Create and return the Starlette application with routes
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse), # Endpoint for SSE connections
Mount("/messages/", app=sse.handle_post_message), # Endpoint for posting messages
],
)
if __name__ == "__main__":
# Get the underlying MCP server from the FastMCP instance
mcp_server = mcp._mcp_server # noqa: WPS437
import argparse
# Set up command-line argument parsing
parser = argparse.ArgumentParser(description='Run MCP server with configurable transport')
# Allow choosing between stdio and SSE transport modes
parser.add_argument('--transport', choices=['stdio', 'sse'], default='stdio',
help='Transport mode (stdio or sse)')
# Host configuration for SSE mode
parser.add_argument('--host', default='0.0.0.0',
help='Host to bind to (for SSE mode)')
# Port configuration for SSE mode
parser.add_argument('--port', type=int, default=8080,
help='Port to listen on (for SSE mode)')
args = parser.parse_args()
# Launch the server with the selected transport mode
if args.transport == 'stdio':
# Run with stdio transport (default)
# This mode communicates through standard input/output
mcp.run(transport='stdio')
else:
# Run with SSE transport (web-based)
# Create a Starlette app to serve the MCP server
starlette_app = create_starlette_app(mcp_server, debug=True)
# Start the web server with the configured host and port
uvicorn.run(starlette_app, host=args.host, port=args.port)