Skip to content

Commit

Permalink
Support cognito admin user password auth flow (#3547)
Browse files Browse the repository at this point in the history
Applies the user credentials pattern from the ADMIN_NO_SRP_AUTH flow
to the ADMIN_USER_PASSWORD_AUTH auth flow for Cognito admin_initiate_auth
requests.

Co-authored-by: Robin Wilkins <r.wilkins@waracle.com>
  • Loading branch information
robjwilkins and Robin Wilkins committed Dec 15, 2020
1 parent c9c30b8 commit a31599d
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 76 deletions.
21 changes: 21 additions & 0 deletions moto/cognitoidp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,27 @@ def admin_initiate_auth(self, user_pool_id, client_id, auth_flow, auth_parameter
"Session": session,
}

return self._log_user_in(user_pool, client, username)
elif auth_flow == "ADMIN_USER_PASSWORD_AUTH":
username = auth_parameters.get("USERNAME")
password = auth_parameters.get("PASSWORD")
user = user_pool.users.get(username)
if not user:
raise UserNotFoundError(username)

if user.password != password:
raise NotAuthorizedError(username)

if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]:
session = str(uuid.uuid4())
self.sessions[session] = user_pool

return {
"ChallengeName": "NEW_PASSWORD_REQUIRED",
"ChallengeParameters": {},
"Session": session,
}

return self._log_user_in(user_pool, client, username)
elif auth_flow == "REFRESH_TOKEN":
refresh_token = auth_parameters.get("REFRESH_TOKEN")
Expand Down
168 changes: 92 additions & 76 deletions tests/test_cognitoidp/test_cognitoidp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ def test_admin_delete_user():
caught.should.be.true


def authentication_flow(conn):
def authentication_flow(conn, auth_flow):
username = str(uuid.uuid4())
temporary_password = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
Expand All @@ -1220,7 +1220,7 @@ def authentication_flow(conn):
result = conn.admin_initiate_auth(
UserPoolId=user_pool_id,
ClientId=client_id,
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthFlow=auth_flow,
AuthParameters={"USERNAME": username, "PASSWORD": temporary_password},
)

Expand Down Expand Up @@ -1255,7 +1255,8 @@ def authentication_flow(conn):
def test_authentication_flow():
conn = boto3.client("cognito-idp", "us-west-2")

authentication_flow(conn)
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
authentication_flow(conn, auth_flow)


def user_authentication_flow(conn):
Expand Down Expand Up @@ -1397,49 +1398,54 @@ def test_token_legitimacy():
with open(os.path.join(os.path.dirname(__file__), path)) as f:
json_web_key = json.loads(f.read())["keys"][0]

outputs = authentication_flow(conn)
id_token = outputs["id_token"]
access_token = outputs["access_token"]
client_id = outputs["client_id"]
issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format(
outputs["user_pool_id"]
)
id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256"))
id_claims["iss"].should.equal(issuer)
id_claims["aud"].should.equal(client_id)
id_claims["token_use"].should.equal("id")
for k, v in outputs["additional_fields"].items():
id_claims[k].should.equal(v)
access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256"))
access_claims["iss"].should.equal(issuer)
access_claims["aud"].should.equal(client_id)
access_claims["token_use"].should.equal("access")
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
outputs = authentication_flow(conn, auth_flow)
id_token = outputs["id_token"]
access_token = outputs["access_token"]
client_id = outputs["client_id"]
issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format(
outputs["user_pool_id"]
)
id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256"))
id_claims["iss"].should.equal(issuer)
id_claims["aud"].should.equal(client_id)
id_claims["token_use"].should.equal("id")
for k, v in outputs["additional_fields"].items():
id_claims[k].should.equal(v)
access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256"))
access_claims["iss"].should.equal(issuer)
access_claims["aud"].should.equal(client_id)
access_claims["token_use"].should.equal("access")


@mock_cognitoidp
def test_change_password():
conn = boto3.client("cognito-idp", "us-west-2")

outputs = authentication_flow(conn)
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
outputs = authentication_flow(conn, auth_flow)

# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)
# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)

# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
)
# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={
"USERNAME": outputs["username"],
"PASSWORD": newer_password,
},
)

result["AuthenticationResult"].should_not.be.none
result["AuthenticationResult"].should_not.be.none


@mock_cognitoidp
Expand All @@ -1452,26 +1458,30 @@ def test_change_password__using_custom_user_agent_header():
my_config = Config(user_agent_extra="more/info", signature_version="v4")
conn = boto3.client("cognito-idp", "us-west-2", config=my_config)

outputs = authentication_flow(conn)
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
outputs = authentication_flow(conn, auth_flow)

# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)
# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)

# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
)
# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={
"USERNAME": outputs["username"],
"PASSWORD": newer_password,
},
)

result["AuthenticationResult"].should_not.be.none
result["AuthenticationResult"].should_not.be.none


@mock_cognitoidp
Expand Down Expand Up @@ -1737,36 +1747,42 @@ def test_initiate_auth_with_invalid_secret_hash():
@mock_cognitoidp
def test_setting_mfa():
conn = boto3.client("cognito-idp", "us-west-2")
result = authentication_flow(conn)
conn.associate_software_token(AccessToken=result["access_token"])
conn.verify_software_token(AccessToken=result["access_token"], UserCode="123456")
conn.set_user_mfa_preference(
AccessToken=result["access_token"],
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
)
result = conn.admin_get_user(
UserPoolId=result["user_pool_id"], Username=result["username"]
)

result["UserMFASettingList"].should.have.length_of(1)
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
result = authentication_flow(conn, auth_flow)
conn.associate_software_token(AccessToken=result["access_token"])
conn.verify_software_token(
AccessToken=result["access_token"], UserCode="123456"
)
conn.set_user_mfa_preference(
AccessToken=result["access_token"],
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
)
result = conn.admin_get_user(
UserPoolId=result["user_pool_id"], Username=result["username"]
)

result["UserMFASettingList"].should.have.length_of(1)


@mock_cognitoidp
def test_setting_mfa_when_token_not_verified():
conn = boto3.client("cognito-idp", "us-west-2")
result = authentication_flow(conn)
conn.associate_software_token(AccessToken=result["access_token"])

caught = False
try:
conn.set_user_mfa_preference(
AccessToken=result["access_token"],
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
)
except conn.exceptions.InvalidParameterException:
caught = True
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
result = authentication_flow(conn, auth_flow)
conn.associate_software_token(AccessToken=result["access_token"])

caught.should.be.true
caught = False
try:
conn.set_user_mfa_preference(
AccessToken=result["access_token"],
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
)
except conn.exceptions.InvalidParameterException:
caught = True

caught.should.be.true


@mock_cognitoidp
Expand Down

0 comments on commit a31599d

Please sign in to comment.