In [None]:
from dotenv import load_dotenv
import os
import requests
# Load variables from a .env file into the process environment (python-dotenv).
# Presumably this is where AQI_KEY is defined — confirm against the project's .env.
load_dotenv()
def get_aqi_str(aqi_key:str, timeout:float=30.0)->str:
    """Download AQI records from the data.moenv.gov.tw open-data API as raw JSON text.

    Args:
        aqi_key: API key sent as the ``api_key`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The response body as text. The query asks for JSON, at most 1000
        records, sorted by ``datacreationdate`` descending.

    Raises:
        Exception: Any error raised while requesting the data (connection
            failure, timeout, non-success HTTP status) is printed and then
            re-raised unchanged.
    """
    url = f'https://data.moenv.gov.tw/api/v2/aqx_p_488?api_key={aqi_key}&limit=1000&sort=datacreationdate desc&format=JSON'
    try:
        r = requests.get(url, timeout=timeout)
        r.raise_for_status()
    except Exception as e:
        print(f"發生錯誤:{e}")
        # Re-raise instead of falling through: if the request itself failed, `r`
        # is unbound, and if the status check failed, `r.text` is an error page
        # rather than AQI data. Neither may be returned as a valid result.
        raise
    return r.text

# Download the feed once. The API key is read from the AQI_KEY environment
# variable; a KeyError is raised here if that variable is not set.
aqi_data: str = get_aqi_str(os.environ['AQI_KEY'])

In [None]:
from pydantic import BaseModel,Field

class Record(BaseModel):
    """First-draft model for a single record of the AQI feed.

    Fields declared with ``Field(alias=...)`` are read from the JSON key named
    by the alias. ``aqi`` and ``pm25`` are required numbers in this version, so
    a value that cannot be parsed as int/float (for example an empty string)
    makes validation of the record fail.
    """
    站點: str = Field(alias='sitename')  # read from JSON key "sitename"
    縣市: str = Field(alias='county')  # read from JSON key "county"
    aqi: int  # no alias: read from JSON key "aqi"
    狀態: str = Field(alias='status')  # read from JSON key "status"
    pm25: float = Field(alias='pm2.5')  # "pm2.5" is not a valid identifier, hence the alias
    緯度: float = Field(alias='latitude')  # read from JSON key "latitude"
    經度: float = Field(alias='longitude')  # read from JSON key "longitude"


class AQI(BaseModel):
    """Top-level shape of the feed: an object whose ``records`` key holds a list of Record items."""
    records: list[Record]

# Validation is expected to fail with this first-draft schema: the feed contains
# empty strings for "pm2.5" and "aqi", which cannot be converted to int/float.
# The error is caught and summarised so the notebook can still be run top to
# bottom (Restart & Run All) instead of stopping at this cell.
from pydantic import ValidationError

try:
    aqi = AQI.model_validate_json(aqi_data)
except ValidationError as e:
    # Print a short summary rather than the full report, which can list one
    # entry per offending field of every record.
    print(f"{e.error_count()} validation errors; first few:")
    for err in e.errors()[:3]:
        print(err['loc'], err['msg'])

In [None]:
from pydantic import BaseModel,Field,field_validator,field_serializer
from datetime import datetime

class Record(BaseModel):
    """One record of the AQI feed, tolerant of missing ``aqi`` / ``pm2.5`` values.

    Fields declared with ``Field(alias=...)`` are read from the JSON key named
    by the alias. ``aqi`` and ``pm25`` are stored as ``None`` when the feed
    supplies an empty string or null for them. On serialization the coordinates
    are rounded to four decimal places and the timestamp is rendered as text.
    """
    站點: str = Field(alias='sitename')
    縣市: str = Field(alias='county')
    日期: datetime = Field(alias='datacreationdate')
    aqi: float | None
    狀態: str = Field(alias='status')
    pm25: float | None = Field(alias='pm2.5')  # "pm2.5" is not a valid identifier, hence the alias
    緯度: float = Field(alias='latitude')
    經度: float = Field(alias='longitude')

    @field_validator("pm25","aqi",mode='before')
    @classmethod
    def pm25_for_empty_string(cls,v:str | float | None)->float | None:
        """Map a missing reading to ``None`` and convert anything else to float.

        Runs before pydantic's own parsing, so ``v`` is the raw JSON value.

        Raises:
            ValueError: If ``v`` is a non-blank string that is not a number;
                pydantic reports this as a validation error for the field.
        """
        # A JSON null arrives here as None. float(None) would raise TypeError,
        # which pydantic does not convert into a ValidationError.
        if v is None:
            return None
        # Empty or whitespace-only strings are how the feed marks "no reading".
        if isinstance(v, str) and not v.strip():
            return None
        return float(v)

    @field_serializer('緯度','經度')
    def four_digits(self,value:float)->float:
        """Round a coordinate to four decimal places when the model is dumped."""
        return round(value,ndigits=4)

    @field_serializer('日期')
    def datetime_to_string(self,date:datetime) -> str:
        """Render the timestamp as ``YYYY-MM-DD HH:MM:00`` when the model is dumped.

        The seconds are always written as ``00``.
        """
        # The date part uses '-' separators; the earlier '%Y:%m:%d' form put
        # colons in the date, which standard ISO-style parsers do not accept.
        return date.strftime('%Y-%m-%d %H:%M:00')


class AQI(BaseModel):
    """Top-level shape of the feed: an object whose ``records`` key holds a list of Record items."""
    records: list[Record]

# Parse the raw JSON text into an AQI model (and its Record items).
# NOTE(review): this needs `aqi_data` to still be the JSON string; a later cell
# rebinds `aqi_data` to a list of dicts, so re-running this cell after that one
# is expected to fail.
aqi = AQI.model_validate_json(aqi_data)

In [None]:
# Convert the validated model to plain Python data and keep only the list of
# record dicts.
# NOTE(review): this rebinds `aqi_data`, which earlier held the raw JSON string,
# to a list of dicts. Re-running the earlier validation cells afterwards is
# expected to fail; consider a distinct name such as `aqi_records`.
aqi_data:list[dict] = aqi.model_dump()['records']
# Print every record dict, one per line.
for aqi_item in aqi_data:
    print(aqi_item)