# 路径操作的高级配置
## OpenAPI 的 operationId

你可以在路径操作中通过参数 operation_id 设置要使用的 OpenAPI operationId。

务必确保每个操作路径的 operation_id 都是唯一的。



In [None]:
from fastapi import FastAPI

app = FastAPI()


@app.get("/items/", operation_id="some_specific_id_you_define")
async def read_items():
    return [{"item_id": "Foo"}]

## 使用 路径操作函数 的函数名作为 operationId

如果你想用你的 API 的函数名作为 operationId 的名字，你可以遍历一遍 API 的函数名，然后使用他们的 APIRoute.name 重写每个 路径操作 的 operation_id。

你应该在添加了所有 路径操作 之后执行此操作。

In [None]:
from fastapi import FastAPI
from fastapi.routing import APIRoute

app = FastAPI()


@app.get("/items/")
async def read_items():
    return [{"item_id": "Foo"}]


def use_route_names_as_operation_ids(app: FastAPI) -> None:
    """
    Simplify operation IDs so that generated API clients have simpler function
    names.

    Should be called only after all routes have been added.
    """
    for route in app.routes:
        if isinstance(route, APIRoute):
            route.operation_id = route.name  # in this case, 'read_items'


use_route_names_as_operation_ids(app)

## 从 OpenAPI 中排除

使用参数 include_in_schema 并将其设置为 False ，来从生成的 OpenAPI 方案中排除一个 路径操作（这样一来，就从自动化文档系统中排除掉了）。

In [None]:
from fastapi import FastAPI

app = FastAPI()


@app.get("/items/", include_in_schema=False)
async def read_items():
    return [{"item_id": "Foo"}]

## docstring 的高级描述

你可以限制 路径操作函数 的 docstring 中用于 OpenAPI 的行数。

添加一个 \f （一个「换页」的转义字符）可以使 FastAPI 在那一位置截断用于 OpenAPI 的输出。

剩余部分不会出现在文档中，但是其他工具（比如 Sphinx）可以使用剩余部分。

In [None]:
from typing import Optional, Set

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
    tags: Set[str] = []


@app.post("/items/", response_model=Item, summary="Create an item")
async def create_item(item: Item):
    """
    Create an item with all the information:

    - **name**: each item must have a name
    - **description**: a long description
    - **price**: required
    - **tax**: if the item doesn't have tax, you can omit this
    - **tags**: a set of unique tag strings for this item
    \f
    :param item: User input.
    """
    return item

# 额外的状态码

FastAPI 默认使用 JSONResponse 返回一个响应，将你的 路径操作 中的返回内容放到该 JSONResponse 中。

FastAPI 会自动使用默认的状态码或者使用你在 路径操作 中设置的状态码。

## 额外的状态码
如果你想要返回主要状态码之外的状态码，你可以通过直接返回一个 Response 来实现，比如 JSONResponse，然后直接设置额外的状态码。

例如，假设你想有一个 路径操作 能够更新条目，并且更新成功时返回 200 「成功」 的 HTTP 状态码。

但是你也希望它能够接受新的条目。并且当这些条目不存在时，会自动创建并返回 201 「创建」的 HTTP 状态码。

要实现它，导入 JSONResponse，然后在其中直接返回你的内容，并将 status_code 设置为为你要的值。



In [None]:
from typing import Optional

from fastapi import Body, FastAPI, status
from fastapi.responses import JSONResponse

app = FastAPI()

items = {"foo": {"name": "Fighters", "size": 6}, "bar": {"name": "Tenders", "size": 3}}


@app.put("/items/{item_id}")
async def upsert_item(
    item_id: str, name: Optional[str] = Body(None), size: Optional[int] = Body(None)
):
    if item_id in items:
        item = items[item_id]
        item["name"] = name
        item["size"] = size
        return item
    else:
        item = {"name": name, "size": size}
        items[item_id] = item
        return JSONResponse(status_code=status.HTTP_201_CREATED, content=item)

当你直接返回一个像上面例子中的 Response 对象时，它会直接返回。

FastAPI 不会用模型等对该响应进行序列化。

确保其中有你想要的数据，且返回的值为合法的 JSON（如果你使用 JSONResponse 的话）

## 在 Response 中使用 jsonable_encoder
由于 FastAPI 并未对你返回的 Response 做任何改变，你必须确保你已经准备好响应内容。

例如，如果不首先将 Pydantic 模型转换为 dict，并将所有数据类型（如 datetime、UUID 等）转换为兼容 JSON 的类型，则不能将其放入JSONResponse中。

对于这些情况，在将数据传递给响应之前，你可以使用 jsonable_encoder 来转换你的数据。

In [None]:
from datetime import datetime
from typing import Optional

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import BaseModel


class Item(BaseModel):
    title: str
    timestamp: datetime
    description: Optional[str] = None


app = FastAPI()


@app.put("/items/{id}")
def update_item(id: str, item: Item):
    json_compatible_item_data = jsonable_encoder(item)
    return JSONResponse(content=json_compatible_item_data)

# 自定义响应 - HTML，流，文件和其他

FastAPI 默认会使用 JSONResponse 返回响应。

你可以通过直接返回 Response 来重载它，参见 直接返回响应。

但如果你直接返回 Response，返回数据不会自动转换，也不会自动生成文档（例如，在 HTTP 头 Content-Type 中包含特定的「媒体类型」作为生成的 OpenAPI 的一部分）。

你还可以在 路径操作装饰器 中声明你想用的 Response。

你从 路径操作函数 中返回的内容将被放在该 Response 中。

并且如果该 Response 有一个 JSON 媒体类型（application/json），比如使用 JSONResponse 或者 UJSONResponse 的时候，返回的数据将使用你在路径操作装饰器中声明的任何 Pydantic 的 response_model 自动转换（和过滤）。

## 使用 ORJSONResponse
例如，如果你需要压榨性能，你可以安装并使用 orjson 并将响应设置为 ORJSONResponse。

导入你想要使用的 Response 类（子类）然后在 路径操作装饰器 中声明它。

In [None]:
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

app = FastAPI()


@app.get("/items/", response_class=ORJSONResponse)
async def read_items():
    return [{"item_id": "Foo"}]

参数 response_class 也会用来定义响应的「媒体类型」。

在这个例子中，HTTP 头的 Content-Type 会被设置成 application/json。

并且在 OpenAPI 文档中也会这样记录。

ORJSONResponse 目前只在 FastAPI 中可用，而在 Starlette 中不可用。

## HTML 响应

使用 HTMLResponse 来从 FastAPI 中直接返回一个 HTML 响应。

导入 HTMLResponse。
将 HTMLResponse 作为你的 路径操作 的 response_class 参数传入。

In [None]:
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()


@app.get("/items/", response_class=HTMLResponse)
async def read_items():
    return """
    <html>
        <head>
            <title>Some HTML in here</title>
        </head>
        <body>
            <h1>Look ma! HTML!</h1>
        </body>
    </html>
    """

### 返回一个 Response

In [None]:
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()


@app.get("/items/")
async def read_items():
    html_content = """
    <html>
        <head>
            <title>Some HTML in here</title>
        </head>
        <body>
            <h1>Look ma! HTML!</h1>
        </body>
    </html>
    """
    return HTMLResponse(content=html_content, status_code=200)

### OpenAPI 中的文档和重载 Response

如果你想要在函数内重载响应，但是同时在 OpenAPI 中文档化「媒体类型」，你可以使用 response_class 参数并返回一个 Response 对象。

接着 response_class 参数只会被用来文档化 OpenAPI 的 路径操作，你的 Response 用来返回响应。

### 直接返回 HTMLResponse

In [None]:
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()


def generate_html_response():
    html_content = """
    <html>
        <head>
            <title>Some HTML in here</title>
        </head>
        <body>
            <h1>Look ma! HTML!</h1>
        </body>
    </html>
    """
    return HTMLResponse(content=html_content, status_code=200)


@app.get("/items/", response_class=HTMLResponse)
async def read_items():
    return generate_html_response()

在这个例子中，函数 generate_html_response() 已经生成并返回 Response 对象而不是在 str 中返回 HTML。

通过返回函数 generate_html_response() 的调用结果，你已经返回一个重载 FastAPI 默认行为的 Response 对象，

但如果你在 response_class 中也传入了 HTMLResponse，FastAPI 会知道如何在 OpenAPI 和交互式文档中使用 text/html 将其文档化为 HTML。

## 可用响应
这里有一些可用的响应。

要记得你可以使用 Response 来返回任何其他东西，甚至创建一个自定义的子类。

[LOOK THIS](https://fastapi.tiangolo.com/zh/advanced/custom-response)

### 对类似文件的对象使用 StreamingResponse

如果您有类似文件的对象（例如，由 open() 返回的对象），则可以在 StreamingResponse 中将其返回。

包括许多与云存储，视频处理等交互的库。


In [None]:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

some_file_path = "large-video-file.mp4"
app = FastAPI()


@app.get("/")
def main():
    def iterfile():  # (1)
        with open(some_file_path, mode="rb") as file_like:  # (2)
            yield from file_like  # (3)

    return StreamingResponse(iterfile(), media_type="video/mp4")

注意在这里，因为我们使用的是不支持 async 和 await 的标准 open()，我们使用普通的 def 声明了路径操作。

## FileResponse

异步传输文件作为响应。

与其他响应类型相比，接受不同的参数集进行实例化：

- path - 要流式传输的文件的文件路径。
- headers - 任何自定义响应头，传入字典类型。
- media_type - 给出媒体类型的字符串。如果未设置，则文件名或路径将用于推断媒体类型。
- filename - 如果给出，它将包含在响应的 Content-Disposition 中。
- 文件响应将包含适当的 Content-Length，Last-Modified 和 ETag 的响应头。

In [None]:
from fastapi import FastAPI
from fastapi.responses import FileResponse

some_file_path = "large-video-file.mp4"
app = FastAPI()


@app.get("/")
async def main():
    return FileResponse(some_file_path)

# OpenAPI 中的其他响应

## 带模型的附加响应

您可以将参数 `responses` 传递给路径操作装饰器


In [None]:
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    value: str


class Message(BaseModel):
    message: str


app = FastAPI()


@app.get("/items/{item_id}", response_model=Item, responses={404: {"model": Message}})
async def read_item(item_id: str):
    if item_id == "foo":
        return {"id": "foo", "value": "there goes my hero"}
    else:
        return JSONResponse(status_code=404, content={"message": "Item not found"})


In [None]:
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    value: str


app = FastAPI()

# 直接指定类型
@app.get(
    "/items/{item_id}",
    response_model=Item,
    responses={
        200: {
            "content": {"image/png": {}},
            "description": "Return the JSON item or an image.",
        }
    },
)
async def read_item(item_id: str, img: Optional[bool] = None):
    if img:
        return FileResponse("image.png", media_type="image/png")
    else:
        return {"id": "foo", "value": "there goes my hero"}


## 组合信息

通过 response_model, status_code, 与 responses 参数，可以从多个位置组合响应信息

In [None]:
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    value: str


class Message(BaseModel):
    message: str


app = FastAPI()


@app.get(
    "/items/{item_id}",
    response_model=Item,
    responses={
        404: {"model": Message, "description": "The item was not found"},
        200: {
            "description": "Item requested by ID",
            "content": {
                "application/json": {
                    "example": {"id": "bar", "value": "The bar tenders"}
                }
            },
        },
    },
)
async def read_item(item_id: str):
    if item_id == "foo":
        return {"id": "foo", "value": "there goes my hero"}
    else:
        return JSONResponse(status_code=404, content={"message": "Item not found"})

## 结合预定义响应和自定义响应

对于这种情况，可以先用字典解包构建新的信息字典

In [None]:
old_dict = {
    "old key": "old value",
    "second old key": "second old value",
}
new_dict = {**old_dict, "new key": "new value"}

In [None]:
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    value: str


responses = {
    404: {"description": "Item not found"},
    302: {"description": "The item was moved"},
    403: {"description": "Not enough privileges"},
}


app = FastAPI()


@app.get(
    "/items/{item_id}",
    response_model=Item,
    responses={**responses, 200: {"content": {"image/png": {}}}},
)
async def read_item(item_id: str, img: Optional[bool] = None):
    if img:
        return FileResponse("image.png", media_type="image/png")
    else:
        return {"id": "foo", "value": "there goes my hero"}

# 响应 Cookies

## 使用 Response 参数

感觉这玩意丢在依赖里面修改响应很好

In [None]:
from fastapi import FastAPI, Response

app = FastAPI()


@app.post("/cookie-and-object/")
def create_cookie(response: Response):
    response.set_cookie(key="fakesession", value="fake-cookie-session-value")
    return {"message": "Come to the dark side, we have cookies"}

## 直接返回 Response

In [None]:
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


@app.post("/cookie/")
def create_cookie():
    content = {"message": "Come to the dark side, we have cookies"}
    response = JSONResponse(content=content)
    response.set_cookie(key="fakesession", value="fake-cookie-session-value")
    return response

# 响应 Headers

## 使用 Response 参数

In [None]:
from fastapi import FastAPI, Response

app = FastAPI()


@app.get("/headers-and-object/")
def get_headers(response: Response):
    response.headers["X-Cat-Dog"] = "alone in the world"
    return {"message": "Hello World"}

## 直接返回 Response

In [None]:
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


@app.get("/headers/")
def get_headers():
    content = {"message": "Hello World"}
    headers = {"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"}
    return JSONResponse(content=content, headers=headers)

# 更改响应的状态码

In [None]:
from fastapi import FastAPI, Response, status

app = FastAPI()

tasks = {"foo": "Listen to the Bar Fighters"}


@app.put("/get-or-create-task/{task_id}", status_code=200)
def get_or_create_task(task_id: str, response: Response):
    if task_id not in tasks:
        tasks[task_id] = "This didn't exist before"
        response.status_code = status.HTTP_201_CREATED
    return tasks[task_id]



# 进阶依赖

## 参数化依赖

In [None]:
from fastapi import Depends, FastAPI

app = FastAPI()


class FixedContentQueryChecker:
    def __init__(self, fixed_content: str):
        self.fixed_content = fixed_content

    def __call__(self, q: str = ""):
        if q:
            return self.fixed_content in q
        return False


checker = FixedContentQueryChecker("bar")


@app.get("/query-checker/")
async def read_query_check(fixed_content_included: bool = Depends(checker)):
    return {"fixed_content_in_query": fixed_content_included}

# 进阶安全性

## OAuth2 范围

OAuth2 定义 "范围" 是一个由空格分割的字符串列表

这些范围代表着权限，举个例子：
- users:read or users:write
- instagram_basic 

## 全局视图

看看我们对于之前 Oauth2 与 jwt 认证的代码修改


In [None]:
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 我们声明了2个 Oauth2 范围：me 和 items
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = f"Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user

# 使用 Security 模块来实现认证，它可以像依赖那样使用
async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        # 使用包含 scopes 的JWT返回
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

# 使用 Security 进行安全认证
@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"])
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}

# HTTP 基本身份验证

## 简单的 HTTP 基本身份验证

这告诉浏览器显示用户名和密码的集成提示。

然后，当您键入该用户名和密码时，浏览器会自动将它们发送到headers中

当您第一次尝试打开 URL（或单击文档中的“执行”按钮）时，浏览器会询问您的用户名和密码

In [None]:
from fastapi import Depends, FastAPI
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

security = HTTPBasic()


@app.get("/users/me")
def read_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    return {"username": credentials.username, "password": credentials.password}

## 检查用户名

In [None]:
import secrets

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

security = HTTPBasic()


def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
    correct_username = secrets.compare_digest(credentials.username, "stanleyjobson")
    correct_password = secrets.compare_digest(credentials.password, "swordfish")
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username


@app.get("/users/me")
def read_current_user(username: str = Depends(get_current_username)):
    return {"username": username}

通过使用 secrets.compare_digest() 它将可以安全地抵御一种称为“定时攻击”的攻击。

## 返回错误信息

In [None]:
import secrets

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

security = HTTPBasic()


def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
    correct_username = secrets.compare_digest(credentials.username, "stanleyjobson")
    correct_password = secrets.compare_digest(credentials.password, "swordfish")
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username


@app.get("/users/me")
def read_current_user(username: str = Depends(get_current_username)):
    return {"username": username}

# 直接使用 Request 对象

这将意味着 FastAPI 不会对其做任何验证

但是在某些特定情况下，获取 Request 对象很有用。

In [None]:
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/items/{item_id}")
def read_root(item_id: str, request: Request):
    client_host = request.client.host
    return {"client_host": client_host, "item_id": item_id}

# 使用数据类 dataclass

它支持相同的：

- 数据验证
- 数据序列化
- 数据文档

In [None]:
from dataclasses import dataclass
from typing import Optional

from fastapi import FastAPI


@dataclass
class Item:
    name: str
    price: float
    description: Optional[str] = None
    tax: Optional[float] = None


app = FastAPI()


@app.post("/items/")
async def create_item(item: Item):
    return item

也可以将数据类提交给 response_model 参数

数据类将自动转换为 Pydantic 数据类

In [None]:
from dataclasses import dataclass, field
from typing import List, Optional

from fastapi import FastAPI


@dataclass
class Item:
    name: str
    price: float
    tags: List[str] = field(default_factory=list)
    description: Optional[str] = None
    tax: Optional[float] = None


app = FastAPI()


@app.get("/items/next", response_model=Item)
async def read_next_item():
    return {
        "name": "Island In The Moon",
        "price": 12.99,
        "description": "A place to be be playin' and havin' fun",
        "tags": ["breater"],
    }

# 中间件进阶

## 添加 ASGI 中间件



In [None]:
from fastapi import FastAPI
from unicorn import UnicornMiddleware

app = FastAPI()

app.add_middleware(UnicornMiddleware, some_config="rainbow")

## 集成中间件

FastAPI 包含几个用于常见用例的中间件，我们接下来会看到如何使用它们



### HTTPSRedirectMiddleware

强制所有传入请求必须是 https 或 wss。

任何对 http 或 ws 的传入请求都将被重定向到安全方案。

In [None]:
from fastapi import FastAPI
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app = FastAPI()

app.add_middleware(HTTPSRedirectMiddleware)


@app.get("/")
async def main():
    return {"message": "Hello World"}

### TrustedHostMiddleware

强制所有传入请求都具有正确设置的 Host 标头，以防止 HTTP Host Header 攻击。

In [None]:
from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app = FastAPI()

app.add_middleware(
    TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"]
)


@app.get("/")
async def main():
    return {"message": "Hello World"}

### GZipMiddleware

处理任何在 Accept-Encoding 标头中包含“gzip”的请求的 GZip 响应。

中间件将处理标准响应和流响应。

In [None]:
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
async def main():
    return "somebigcontent"