-
-
Notifications
You must be signed in to change notification settings - Fork 8.5k
Closed
Labels
Description
So I followed the guide on setting up OAuth on my server. For some reason when I try to send data to this endpoint I get a 400 - Bad Request {"detail":"There was an error parsing the body"}. I have also tried with the swaggerUI authorize button.
The data:
{"username": email, "password": password}
When I print out the response headers:
{username: test@test.com, password: password, content-type: text/plain; charset=utf-8}
The code:
@login_router.post("/token", response_model=Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
) -> dict:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=float(str(ACCESS_TOKEN_EXPIRE_MINUTES)))
access_token = create_access_token(
data={"sub": user.email, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{config.API_V1_STR}/token",
scopes={"me": "Read information about the current user."},
)
def verify_password(plain_text_password: str, hashed_password: str) -> bool:
"""
This function will verify the plain text password with the stored hash
:param plain_text_password: The plain text password
:param hashed_password: The hash password stored in the data
:return: bool
"""
return bool(pwd_context.verify(plain_text_password, hashed_password))
def get_password_hash(password: str) -> str:
"""
This function will return the hash of a plain text password
:param password: The password in plain text
:return: Hash string
"""
return str(pwd_context.hash(password))
async def get_user(email: str) -> Optional[UserInDB]:
user = await get_user_by_email(email)
if user:
return UserInDB(**user.dict())
return None
async def authenticate_user(email: str, password: str) -> Optional[UserInDB]:
user = await get_user(email)
if not user:
return None
if not verify_password(password, user.password):
return None
return user
def create_access_token(*, data: dict, expires_delta: timedelta = None) -> bytes:
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode, str(PASSWORD_SECRET_KEY), algorithm=str(PASSWORD_ALGORITHM)
)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
) -> User:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, str(PASSWORD_SECRET_KEY), algorithms=[PASSWORD_ALGORITHM]
)
email = payload.get("sub")
if email is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, email=email)
except (PyJWTError, ValidationError):
raise credentials_exception
user = await get_user(email=token_data.email)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"])
) -> User:
if current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user