# 天气预报项目测试文件

### 获得天气数据

In [1]:
# 旧的用于获取天气的api代码
'''
## 使用API获得数据的旧代码
# def test_of_data():
#     city = "Perth"
#     # 发送 GET 请求
#     # 标准化格式
#     # response = requests.get(f'https://wttr.in/{city}?3&format=%l:+%c+%t+%C+%w')
#     # %l: 地址
#     # %c: 当前天气图标
#     # %t: 温度
#     # %C：天气描述
#     # %w: 风速
#     # 输出响应内容
#
#     # 访问默认格式
#     response = requests.get(f'https://wttr.in/{city}?format=j1')
#     data = response.json()
#     #输出完整data用于观察数据格式
#     print(data)
#     # 访问未来三天的数据
#     for day in data['weather']:
#         date = day['date']
#         maxtemp = day['maxtempC']
#         mintemp = day['mintempC']
#         description = day['hourly'][4]['weatherDesc'][0]['value']  # 中午的天气
#         print(f"{date}: {description}, {mintemp}°C ~ {maxtemp}°C")
#         print(len(data['weather']))
'''
from collections import defaultdict

import requests


# 获取天气数据（返回JSON数据）
def get_weather_forecast(city, max_day):
    """
    每个函数需要包含一个文档描述，其中包含函数说明，输入和输出示例
    :param city:城市名称
    :param max_day:1-5的正整数
    """
    # api's key
    api_key = "56d26083c4e5d4828784871da1b7b0b3"
    # api
    url = "https://api.openweathermap.org/data/2.5/forecast"
    params = {"q": city, "appid": api_key, "units": "metric", "lang": "zh_cn"  # todo:修改语言
              }
    try:
        response = requests.get(url, params=params)
        # 若无响应则抛出异常
        response.raise_for_status()
        data = response.json()
        # 尝试输出
        # print(data)

        if response.status_code != 200:
            print("请求失败:", data.get("message", "未知错误"))
            return None

        # 按天整理每3小时的天气数据
        forecast_by_day = defaultdict(list)
        for item in data["list"]:
            # 日期
            date = item["dt_txt"].split(" ")[0]
            # 温度
            temp = item["main"]["temp"]
            # 描述
            desc = item["weather"][0]["description"]
            # 风速
            speed = item["wind"]["speed"]
            # 湿度
            humidity = item["main"]["humidity"]
            # 降水率
            pop = item.get("pop", 0)
            time = item["dt_txt"]
            forecast_by_day[date].append((time, temp, desc, speed, humidity, pop))

        # 输出每日天气（优先取 12:00 的数据）
        # print(f"城市：{data['city']['name']}")
        result = {"city": data["city"]["name"], "forecast": []}
        for date, items in list(forecast_by_day.items())[:max_day]:  # 最多5天
            # 选取12:00或默认第一条
            mid = next((x for x in items if "12:00" in x[0]), items[0])
            # 测试语句
            # print(f"日期：{date},天气情况：{mid[2]},气温：{mid[1]}°C,风速:{mid[3]}, 湿度:{mid[4]},降水率:{mid[5]}")

            result["forecast"].append(
                {"date": date, "time": mid[0], "temperature": mid[1], "description": mid[2], "speed": mid[3],
                 "humidity": mid[4], "pop": mid[5]

                 })

        return result, forecast_by_day  # result 是天气概述，data是具体天气数据，用于获得折线图
    except requests.exceptions.RequestException as e:  # 捕获所有requests相关的异常
        print(f"网络请求失败：请检查网络链接\n{e}")
        return None  # 返回失败的返回值

### 自然语言处理

In [2]:
import re


# 定义处理函数
def res_pop():
    return "pop"


def res_describe_weather():
    return "describe"


def res_clothing():
    return "temperature"


def res_temperature():
    return "temperature"


def res_wind():
    return "wind"


def res_humidity():
    return "humidity"


# 问题和响应的映射字典
response_mapping = {
    "umbrella": res_pop,
    "rain": res_pop,
    "precipitation": res_pop,
    "wet": res_pop,
    "showers": res_pop,
    "rainy": res_pop,
    "storms": res_pop,
    "need an umbrella": res_pop,

    "weather": res_describe_weather,
    "forecast": res_describe_weather,
    "today's weather": res_describe_weather,
    "how's the weather": res_describe_weather,
    "what's the weather like": res_describe_weather,

    "clothing": res_clothing,
    "dress": res_clothing,
    "wear": res_clothing,
    "what to wear": res_clothing,
    "should I wear": res_clothing,
    "clothes": res_clothing,

    "temperature": res_temperature,
    "how hot": res_temperature,
    "how cold": res_temperature,
    "hot": res_temperature,
    "cold": res_temperature,
    "temperature today": res_temperature,

    "wind": res_wind,  # 风速
    "wind speed": res_wind,  # 风速
    "how windy": res_wind,  # 风大吗
    "windy": res_wind,  # 风

    "humidity": res_humidity,  # 湿度
    "how humid": res_humidity,  # 湿吗
    "humid": res_humidity  # 湿度
}


# 输入处理函数
def get_weather_response(question):
    # 遍历字典进行关键词匹配
    for keyword, handler in response_mapping.items():
        # re.IGNORECASE 忽略大小写，s?表示匹配单复数
        if re.search(r"\b" + re.escape(keyword) + r"s?\b", question, re.IGNORECASE):
            return handler()
    return "抱歉，我无法理解您的问题。"
# 断点测试
# print(get_weather_response("perth"))


### 对获得的天气信息处理为供折线使用的列表数据

In [3]:
from get_weather_data import get_weather_forecast
from natural_language_responses import get_weather_response


def get_line_chart_data(questions, city, max_day):  # 传入参数
    result, forecast_by_day = get_weather_forecast(city, max_day)  # 获得参数
    answer = get_weather_response(questions)  # 获得参数

    def handle_pop():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                pop = item[5]
                precipitation_list.append((time, pop))
        return precipitation_list

        # print("处理带伞相关问题。")

    def handle_describe():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                weather = item[2]
                precipitation_list.append((time, weather))
        return precipitation_list
        # print("处理天气描述相关问题。")

    def handle_cloth():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                temperature = item[1]
                precipitation_list.append((time, temperature))
        return precipitation_list
        # print("处理穿衣建议相关问题。")

    def handle_temperature():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                temperature = item[1]
                precipitation_list.append((time, temperature))
        return precipitation_list
        # print("处理温度相关问题。")

    def handle_wind():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                wind = item[3]
                precipitation_list.append((time, wind))
        return precipitation_list
        # print("处理风速相关问题。")

    def handle_humidity():
        precipitation_list = []
        for date, items in forecast_by_day.items():
            for item in items:
                time = item[0]
                humidity = item[4]
                precipitation_list.append((time, humidity))
        return precipitation_list
        # print("处理湿度相关问题。")

    def process_answer(answer):
        if answer == "pop":
            index = 1
            return handle_pop(),index,answer
        elif answer == "describe":
            index = 1
            return handle_describe(),index,answer
        elif answer == "cloth":
            index = 1
            return handle_cloth(),index,answer
        elif answer == "temperature":
            index = 1
            return handle_temperature(),index,answer
        elif answer == "wind":
            index = 1
            return handle_wind(),index,answer
        elif answer == "humidity":
            index = 1
            return handle_humidity(),index,answer
        else:
            index = 0
            lines = ["您的问题有些模糊。给您提供全部数据的简单描述，如需折线图，请具体询问：如“天气怎么样，是否会降水”"]
            for day in result['forecast']:
                lines.append(
                    f"日期：{day['date']},天气情况：{day['description']},气温：{day['temperature']}°C,"
                    f"风速:{day['speed']}, 湿度:{day['humidity']},降水率:{day['pop']}"
                )
            answer = "null"
            return lines, index,answer

    return process_answer(answer)  # 关键补充：外层返回结果





### 生成折线图

In [4]:
from pygments.lexers import go
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
import plotly.graph_objects as go

def line_chart(questions, city, max_day):
    data, index,answer = get_line_chart_data(questions, city, max_day) #获取数据
    if index == 0: #没有返回数据，仅输出
        print("\n".join(data))
    else:
        print("1")
        title_of_image = "null"
        if answer == "pop":
            title_of_image = "Rainfall prediction chart"
        elif answer == "describe":
            title_of_image = "Rainfall prediction chart"
        elif answer == "cloth":
            title_of_image = "Rainfall prediction chart"
        elif answer == "temperature":
            title_of_image = "Rainfall prediction chart"
        elif answer == "wind":
            title_of_image = "Rainfall prediction chart"
        elif answer == "humidity":
            title_of_image = "Rainfall prediction chart"
        print(f"您询问的问题与{answer}相关正在生成折线图")
        times = [datetime.strptime(item[0], "%Y-%m-%d %H:%M:%S") for item in data]
        values = [item[1] for item in data]

        # 使用 Plotly 创建交互式图表
        fig = go.Figure()

        # 添加数据到图表
        fig.add_trace(go.Scatter(x=times, y=values, mode='lines+markers', name='Precipitation Probability'))

        # 更新布局
        fig.update_layout(
            title="Precipitation Over Time",
            xaxis_title="Date and Time",
            yaxis_title="Precipitation Probability",
            xaxis=dict(tickformat="%Y-%m-%d %H:%M:%S", tickangle=45),
            autosize=True
        )

        # 显示图表
        fig.show()

### 主函数

In [5]:
from line_chart_data import get_line_chart_data


def main():
    line_chart("weather", "perth", 5)


# better way to run the main code
if __name__ == "__main__":
    main()


1
您询问的问题与describe相关正在生成折线图
