diff --git a/moto/route53/responses.py b/moto/route53/responses.py index cc7e822b4f1..62ae5b754e6 100644 --- a/moto/route53/responses.py +++ b/moto/route53/responses.py @@ -36,152 +36,125 @@ def _convert_to_bool(bool_str: Any) -> bool: # type: ignore[misc] def backend(self) -> Route53Backend: return route53_backends[self.current_account]["global"] - def list_or_create_hostzone_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) - - # Set these here outside the scope of the try/except - # so they're defined later when we call create_hosted_zone() + def create_hosted_zone(self) -> TYPE_RESPONSE: vpcid = None vpcregion = None - if request.method == "POST": - elements = xmltodict.parse(self.body) - zone_request = elements["CreateHostedZoneRequest"] - if "HostedZoneConfig" in zone_request: - zone_config = zone_request["HostedZoneConfig"] - comment = zone_config["Comment"] - if zone_request.get("VPC", {}).get("VPCId", None): - private_zone = True - else: - private_zone = self._convert_to_bool( - zone_config.get("PrivateZone", False) - ) + elements = xmltodict.parse(self.body) + zone_request = elements["CreateHostedZoneRequest"] + if "HostedZoneConfig" in zone_request: + zone_config = zone_request["HostedZoneConfig"] + comment = zone_config["Comment"] + if zone_request.get("VPC", {}).get("VPCId", None): + private_zone = True else: - comment = None - private_zone = False - - # It is possible to create a Private Hosted Zone without - # associating VPC at the time of creation. - if self._convert_to_bool(private_zone): - if zone_request.get("VPC", None) is not None: - vpcid = zone_request["VPC"].get("VPCId", None) - vpcregion = zone_request["VPC"].get("VPCRegion", None) - - name = zone_request["Name"] - caller_reference = zone_request["CallerReference"] - - if name[-1] != ".": - name += "." - delegation_set_id = zone_request.get("DelegationSetId") - - new_zone = self.backend.create_hosted_zone( - name, - comment=comment, - private_zone=private_zone, - caller_reference=caller_reference, - vpcid=vpcid, - vpcregion=vpcregion, - delegation_set_id=delegation_set_id, - ) - template = Template(CREATE_HOSTED_ZONE_RESPONSE).render(zone=new_zone) - headers = { - "Location": f"https://route53.amazonaws.com/2013-04-01/hostedzone/{new_zone.id}" - } - return 201, headers, template - - elif request.method == "GET": - max_size = self.querystring.get("maxitems", [None])[0] - if max_size: - max_size = int(max_size) - marker = self.querystring.get("marker", [None])[0] - zone_page, next_marker = self.backend.list_hosted_zones( - marker=marker, max_size=max_size - ) - template = Template(LIST_HOSTED_ZONES_RESPONSE).render( - zones=zone_page, - marker=marker, - next_marker=next_marker, - max_items=max_size, - ) - return 200, headers, template + private_zone = self._convert_to_bool( + zone_config.get("PrivateZone", False) + ) + else: + comment = None + private_zone = False + + # It is possible to create a Private Hosted Zone without + # associating VPC at the time of creation. + if self._convert_to_bool(private_zone): + if zone_request.get("VPC", None) is not None: + vpcid = zone_request["VPC"].get("VPCId", None) + vpcregion = zone_request["VPC"].get("VPCRegion", None) + + name = zone_request["Name"] + caller_reference = zone_request["CallerReference"] + + if name[-1] != ".": + name += "." + delegation_set_id = zone_request.get("DelegationSetId") + + new_zone = self.backend.create_hosted_zone( + name, + comment=comment, + private_zone=private_zone, + caller_reference=caller_reference, + vpcid=vpcid, + vpcregion=vpcregion, + delegation_set_id=delegation_set_id, + ) + template = Template(CREATE_HOSTED_ZONE_RESPONSE).render(zone=new_zone) + headers = { + "Location": f"https://route53.amazonaws.com/2013-04-01/hostedzone/{new_zone.id}", + "status": 201, + } + return 201, headers, template + + def list_hosted_zones(self) -> str: + max_size = self.querystring.get("maxitems", [None])[0] + if max_size: + max_size = int(max_size) + marker = self.querystring.get("marker", [None])[0] + zone_page, next_marker = self.backend.list_hosted_zones( + marker=marker, max_size=max_size + ) + template = Template(LIST_HOSTED_ZONES_RESPONSE).render( + zones=zone_page, + marker=marker, + next_marker=next_marker, + max_items=max_size, + ) + return template - def list_hosted_zones_by_name_response( - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def list_hosted_zones_by_name(self) -> str: query_params = parse_qs(self.parsed_url.query) dnsnames = query_params.get("dnsname") dnsname, zones = self.backend.list_hosted_zones_by_name(dnsnames) template = Template(LIST_HOSTED_ZONES_BY_NAME_RESPONSE) - return 200, headers, template.render(zones=zones, dnsname=dnsname, xmlns=XMLNS) + return template.render(zones=zones, dnsname=dnsname, xmlns=XMLNS) - def list_hosted_zones_by_vpc_response( - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def list_hosted_zones_by_vpc(self) -> str: query_params = parse_qs(self.parsed_url.query) vpc_id = query_params.get("vpcid")[0] # type: ignore zones = self.backend.list_hosted_zones_by_vpc(vpc_id) template = Template(LIST_HOSTED_ZONES_BY_VPC_RESPONSE) - return 200, headers, template.render(zones=zones, xmlns=XMLNS) + return template.render(zones=zones, xmlns=XMLNS) - def get_hosted_zone_count_response( - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def get_hosted_zone_count(self) -> str: num_zones = self.backend.get_hosted_zone_count() template = Template(GET_HOSTED_ZONE_COUNT_RESPONSE) - return 200, headers, template.render(zone_count=num_zones, xmlns=XMLNS) + return template.render(zone_count=num_zones, xmlns=XMLNS) - def get_or_delete_hostzone_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def get_hosted_zone(self) -> str: zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] + the_zone = self.backend.get_hosted_zone(zoneid) + template = Template(GET_HOSTED_ZONE_RESPONSE) + return template.render(zone=the_zone) - if request.method == "GET": - the_zone = self.backend.get_hosted_zone(zoneid) - template = Template(GET_HOSTED_ZONE_RESPONSE) - return 200, headers, template.render(zone=the_zone) - elif request.method == "DELETE": - self.backend.delete_hosted_zone(zoneid) - return 200, headers, DELETE_HOSTED_ZONE_RESPONSE - elif request.method == "POST": - elements = xmltodict.parse(self.body) - comment = elements.get("UpdateHostedZoneCommentRequest", {}).get( - "Comment", None - ) - zone = self.backend.update_hosted_zone_comment(zoneid, comment) - template = Template(UPDATE_HOSTED_ZONE_COMMENT_RESPONSE) - return 200, headers, template.render(zone=zone) + def delete_hosted_zone(self) -> str: + zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] + self.backend.delete_hosted_zone(zoneid) + return DELETE_HOSTED_ZONE_RESPONSE - def get_dnssec_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - # returns static response - # TODO: implement enable/disable dnssec apis - self.setup_class(request, full_url, headers) + def update_hosted_zone_comment(self) -> str: + zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] - method = request.method + elements = xmltodict.parse(self.body) + comment = elements.get("UpdateHostedZoneCommentRequest", {}).get( + "Comment", None + ) + zone = self.backend.update_hosted_zone_comment(zoneid, comment) + template = Template(UPDATE_HOSTED_ZONE_COMMENT_RESPONSE) + return template.render(zone=zone) + def get_dnssec(self) -> str: + # TODO: implement enable/disable dnssec apis zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1] - if method == "GET": - self.backend.get_dnssec(zoneid) - return 200, headers, GET_DNSSEC - - def associate_vpc_response( - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + self.backend.get_dnssec(zoneid) + return GET_DNSSEC + def associate_vpc_with_hosted_zone(self) -> str: zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1] elements = xmltodict.parse(self.body) - comment = vpc = elements.get("AssociateVPCWithHostedZoneRequest", {}).get( + comment = elements.get("AssociateVPCWithHostedZoneRequest", {}).get( "Comment", {} ) vpc = elements.get("AssociateVPCWithHostedZoneRequest", {}).get("VPC", {}) @@ -191,17 +164,13 @@ def associate_vpc_response( self.backend.associate_vpc_with_hosted_zone(zoneid, vpcid, vpcregion) template = Template(ASSOCIATE_VPC_RESPONSE) - return 200, headers, template.render(comment=comment) - - def disassociate_vpc_response( - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + return template.render(comment=comment) + def disassociate_vpc_from_hosted_zone(self) -> str: zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1] elements = xmltodict.parse(self.body) - comment = vpc = elements.get("DisassociateVPCFromHostedZoneRequest", {}).get( + comment = elements.get("DisassociateVPCFromHostedZoneRequest", {}).get( "Comment", {} ) vpc = elements.get("DisassociateVPCFromHostedZoneRequest", {}).get("VPC", {}) @@ -210,183 +179,153 @@ def disassociate_vpc_response( self.backend.disassociate_vpc_from_hosted_zone(zoneid, vpcid) template = Template(DISASSOCIATE_VPC_RESPONSE) - return 200, headers, template.render(comment=comment) - - def rrset_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) - - method = request.method + return template.render(comment=comment) + def change_resource_record_sets(self) -> str: zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1] - if method == "POST": - elements = xmltodict.parse(self.body) - - change_list = elements["ChangeResourceRecordSetsRequest"]["ChangeBatch"][ - "Changes" - ]["Change"] - if not isinstance(change_list, list): - change_list = [ - elements["ChangeResourceRecordSetsRequest"]["ChangeBatch"][ - "Changes" - ]["Change"] - ] + elements = xmltodict.parse(self.body) - # Enforce quotas https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DNSLimitations.html#limits-api-requests-changeresourcerecordsets - # - A request cannot contain more than 1,000 ResourceRecord elements. When the value of the Action element is UPSERT, each ResourceRecord element is counted twice. - effective_rr_count = 0 - for value in change_list: - record_set = value["ResourceRecordSet"] - if ( - "ResourceRecords" not in record_set - or not record_set["ResourceRecords"] - ): - continue - resource_records = list(record_set["ResourceRecords"].values())[0] + change_list = elements["ChangeResourceRecordSetsRequest"]["ChangeBatch"][ + "Changes" + ]["Change"] + if not isinstance(change_list, list): + change_list = [ + elements["ChangeResourceRecordSetsRequest"]["ChangeBatch"]["Changes"][ + "Change" + ] + ] + + # Enforce quotas https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DNSLimitations.html#limits-api-requests-changeresourcerecordsets + # - A request cannot contain more than 1,000 ResourceRecord elements. When the value of the Action element is UPSERT, each ResourceRecord element is counted twice. + effective_rr_count = 0 + for value in change_list: + record_set = value["ResourceRecordSet"] + if "ResourceRecords" not in record_set or not record_set["ResourceRecords"]: + continue + resource_records = list(record_set["ResourceRecords"].values())[0] + effective_rr_count += len(resource_records) + if value["Action"] == "UPSERT": effective_rr_count += len(resource_records) - if value["Action"] == "UPSERT": - effective_rr_count += len(resource_records) - if effective_rr_count > 1000: - raise InvalidChangeBatch - - self.backend.change_resource_record_sets(zoneid, change_list) - - return 200, headers, CHANGE_RRSET_RESPONSE - - elif method == "GET": - querystring = parse_qs(self.parsed_url.query) - template = Template(LIST_RRSET_RESPONSE) - start_type = querystring.get("type", [None])[0] - start_name = querystring.get("name", [None])[0] - max_items = int(querystring.get("maxitems", ["300"])[0]) - - if start_type and not start_name: - return 400, headers, "The input is not valid" - - ( - record_sets, - next_name, - next_type, - is_truncated, - ) = self.backend.list_resource_record_sets( - zoneid, - start_type=start_type, # type: ignore - start_name=start_name, # type: ignore - max_items=max_items, - ) - r_template = template.render( - record_sets=record_sets, - next_name=next_name, - next_type=next_type, - max_items=max_items, - is_truncated=is_truncated, - ) - return 200, headers, r_template + if effective_rr_count > 1000: + raise InvalidChangeBatch - def health_check_response1( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + self.backend.change_resource_record_sets(zoneid, change_list) - method = request.method - - if method == "POST": - json_body = xmltodict.parse(self.body)["CreateHealthCheckRequest"] - caller_reference = json_body["CallerReference"] - config = json_body["HealthCheckConfig"] - health_check_args = { - "ip_address": config.get("IPAddress"), - "port": config.get("Port"), - "type": config["Type"], - "resource_path": config.get("ResourcePath"), - "fqdn": config.get("FullyQualifiedDomainName"), - "search_string": config.get("SearchString"), - "request_interval": config.get("RequestInterval"), - "failure_threshold": config.get("FailureThreshold"), - "health_threshold": config.get("HealthThreshold"), - "measure_latency": config.get("MeasureLatency"), - "inverted": config.get("Inverted"), - "disabled": config.get("Disabled"), - "enable_sni": config.get("EnableSNI"), - "children": config.get("ChildHealthChecks", {}).get("ChildHealthCheck"), - "regions": config.get("Regions", {}).get("Region"), - } - health_check = self.backend.create_health_check( - caller_reference, health_check_args - ) - template = Template(CREATE_HEALTH_CHECK_RESPONSE) - return 201, headers, template.render(health_check=health_check, xmlns=XMLNS) - elif method == "GET": - template = Template(LIST_HEALTH_CHECKS_RESPONSE) - health_checks = self.backend.list_health_checks() - return ( - 200, - headers, - template.render(health_checks=health_checks, xmlns=XMLNS), - ) + return CHANGE_RRSET_RESPONSE - def health_check_response2( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def list_resource_record_sets(self) -> TYPE_RESPONSE: + zoneid = self.parsed_url.path.rstrip("/").rsplit("/", 2)[1] + querystring = parse_qs(self.parsed_url.query) + template = Template(LIST_RRSET_RESPONSE) + start_type = querystring.get("type", [None])[0] + start_name = querystring.get("name", [None])[0] + max_items = int(querystring.get("maxitems", ["300"])[0]) + + if start_type and not start_name: + return 400, {"status": 400}, "The input is not valid" + + ( + record_sets, + next_name, + next_type, + is_truncated, + ) = self.backend.list_resource_record_sets( + zoneid, + start_type=start_type, # type: ignore + start_name=start_name, # type: ignore + max_items=max_items, + ) + r_template = template.render( + record_sets=record_sets, + next_name=next_name, + next_type=next_type, + max_items=max_items, + is_truncated=is_truncated, + ) + return 200, {}, r_template + + def create_health_check(self) -> TYPE_RESPONSE: + json_body = xmltodict.parse(self.body)["CreateHealthCheckRequest"] + caller_reference = json_body["CallerReference"] + config = json_body["HealthCheckConfig"] + health_check_args = { + "ip_address": config.get("IPAddress"), + "port": config.get("Port"), + "type": config["Type"], + "resource_path": config.get("ResourcePath"), + "fqdn": config.get("FullyQualifiedDomainName"), + "search_string": config.get("SearchString"), + "request_interval": config.get("RequestInterval"), + "failure_threshold": config.get("FailureThreshold"), + "health_threshold": config.get("HealthThreshold"), + "measure_latency": config.get("MeasureLatency"), + "inverted": config.get("Inverted"), + "disabled": config.get("Disabled"), + "enable_sni": config.get("EnableSNI"), + "children": config.get("ChildHealthChecks", {}).get("ChildHealthCheck"), + "regions": config.get("Regions", {}).get("Region"), + } + health_check = self.backend.create_health_check( + caller_reference, health_check_args + ) + template = Template(CREATE_HEALTH_CHECK_RESPONSE) + return ( + 201, + {"status": 201}, + template.render(health_check=health_check, xmlns=XMLNS), + ) + + def list_health_checks(self) -> str: + template = Template(LIST_HEALTH_CHECKS_RESPONSE) + health_checks = self.backend.list_health_checks() + return template.render(health_checks=health_checks, xmlns=XMLNS) - method = request.method + def get_health_check(self) -> str: health_check_id = self.parsed_url.path.split("/")[-1] - if method == "GET": - health_check = self.backend.get_health_check(health_check_id) - template = Template(GET_HEALTH_CHECK_RESPONSE) - return 200, headers, template.render(health_check=health_check) - elif method == "DELETE": - self.backend.delete_health_check(health_check_id) - template = Template(DELETE_HEALTH_CHECK_RESPONSE) - return 200, headers, template.render(xmlns=XMLNS) - elif method == "POST": - config = xmltodict.parse(self.body)["UpdateHealthCheckRequest"] - health_check_args = { - "ip_address": config.get("IPAddress"), - "port": config.get("Port"), - "resource_path": config.get("ResourcePath"), - "fqdn": config.get("FullyQualifiedDomainName"), - "search_string": config.get("SearchString"), - "failure_threshold": config.get("FailureThreshold"), - "health_threshold": config.get("HealthThreshold"), - "inverted": config.get("Inverted"), - "disabled": config.get("Disabled"), - "enable_sni": config.get("EnableSNI"), - "children": config.get("ChildHealthChecks", {}).get("ChildHealthCheck"), - "regions": config.get("Regions", {}).get("Region"), - } - health_check = self.backend.update_health_check( - health_check_id, health_check_args - ) - template = Template(UPDATE_HEALTH_CHECK_RESPONSE) - return 200, headers, template.render(health_check=health_check) + health_check = self.backend.get_health_check(health_check_id) + template = Template(GET_HEALTH_CHECK_RESPONSE) + return template.render(health_check=health_check) - def health_check_status_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def delete_health_check(self) -> str: + health_check_id = self.parsed_url.path.split("/")[-1] + self.backend.delete_health_check(health_check_id) + template = Template(DELETE_HEALTH_CHECK_RESPONSE) + return template.render(xmlns=XMLNS) - method = request.method + def update_health_check(self) -> str: + health_check_id = self.parsed_url.path.split("/")[-1] + config = xmltodict.parse(self.body)["UpdateHealthCheckRequest"] + health_check_args = { + "ip_address": config.get("IPAddress"), + "port": config.get("Port"), + "resource_path": config.get("ResourcePath"), + "fqdn": config.get("FullyQualifiedDomainName"), + "search_string": config.get("SearchString"), + "failure_threshold": config.get("FailureThreshold"), + "health_threshold": config.get("HealthThreshold"), + "inverted": config.get("Inverted"), + "disabled": config.get("Disabled"), + "enable_sni": config.get("EnableSNI"), + "children": config.get("ChildHealthChecks", {}).get("ChildHealthCheck"), + "regions": config.get("Regions", {}).get("Region"), + } + health_check = self.backend.update_health_check( + health_check_id, health_check_args + ) + template = Template(UPDATE_HEALTH_CHECK_RESPONSE) + return template.render(health_check=health_check) + def get_health_check_status(self) -> str: health_check_match = re.search( r"healthcheck/(?P[^/]+)/status$", self.parsed_url.path ) health_check_id = health_check_match.group("health_check_id") # type: ignore[union-attr] - if method == "GET": - self.backend.get_health_check(health_check_id) - template = Template(GET_HEALTH_CHECK_STATUS_RESPONSE) - return ( - 200, - headers, - template.render( - timestamp=iso_8601_datetime_with_milliseconds(), - ), - ) + self.backend.get_health_check(health_check_id) + template = Template(GET_HEALTH_CHECK_STATUS_RESPONSE) + return template.render(timestamp=iso_8601_datetime_with_milliseconds()) def not_implemented_response( self, request: Any, full_url: str, headers: Any @@ -431,131 +370,98 @@ def list_or_change_tags_for_resource_request( # type: ignore[return] template = Template(CHANGE_TAGS_FOR_RESOURCE_RESPONSE) return 200, headers, template.render() - def get_change(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] - self.setup_class(request, full_url, headers) + def get_change(self) -> str: + change_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] + template = Template(GET_CHANGE_RESPONSE) + return template.render(change_id=change_id, xmlns=XMLNS) - if request.method == "GET": - change_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] - template = Template(GET_CHANGE_RESPONSE) - return 200, headers, template.render(change_id=change_id, xmlns=XMLNS) + def create_query_logging_config(self) -> TYPE_RESPONSE: + json_body = xmltodict.parse(self.body)["CreateQueryLoggingConfigRequest"] + hosted_zone_id = json_body["HostedZoneId"] + log_group_arn = json_body["CloudWatchLogsLogGroupArn"] - def list_or_create_query_logging_config_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + query_logging_config = self.backend.create_query_logging_config( + self.region, hosted_zone_id, log_group_arn + ) - if request.method == "POST": - json_body = xmltodict.parse(self.body)["CreateQueryLoggingConfigRequest"] - hosted_zone_id = json_body["HostedZoneId"] - log_group_arn = json_body["CloudWatchLogsLogGroupArn"] + template = Template(CREATE_QUERY_LOGGING_CONFIG_RESPONSE) + headers = {"Location": query_logging_config.location, "status": 201} + return ( + 201, + headers, + template.render(query_logging_config=query_logging_config, xmlns=XMLNS), + ) - query_logging_config = self.backend.create_query_logging_config( - self.region, hosted_zone_id, log_group_arn - ) + def list_query_logging_configs(self) -> str: + hosted_zone_id = self._get_param("hostedzoneid") + next_token = self._get_param("nexttoken") + max_results = self._get_int_param("maxresults") + + # The paginator picks up named arguments, returns tuple. + # pylint: disable=unbalanced-tuple-unpacking + all_configs, next_token = self.backend.list_query_logging_configs( + hosted_zone_id=hosted_zone_id, + next_token=next_token, + max_results=max_results, + ) - template = Template(CREATE_QUERY_LOGGING_CONFIG_RESPONSE) - headers["Location"] = query_logging_config.location - return ( - 201, - headers, - template.render(query_logging_config=query_logging_config, xmlns=XMLNS), - ) + template = Template(LIST_QUERY_LOGGING_CONFIGS_RESPONSE) + return template.render( + query_logging_configs=all_configs, next_token=next_token, xmlns=XMLNS + ) - elif request.method == "GET": - hosted_zone_id = self._get_param("hostedzoneid") - next_token = self._get_param("nexttoken") - max_results = self._get_int_param("maxresults") - - # The paginator picks up named arguments, returns tuple. - # pylint: disable=unbalanced-tuple-unpacking - ( - all_configs, - next_token, - ) = self.backend.list_query_logging_configs( - hosted_zone_id=hosted_zone_id, - next_token=next_token, - max_results=max_results, - ) + def get_query_logging_config(self) -> str: + query_logging_config_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] - template = Template(LIST_QUERY_LOGGING_CONFIGS_RESPONSE) - return ( - 200, - headers, - template.render( - query_logging_configs=all_configs, - next_token=next_token, - xmlns=XMLNS, - ), - ) + query_logging_config = self.backend.get_query_logging_config( + query_logging_config_id + ) + template = Template(GET_QUERY_LOGGING_CONFIG_RESPONSE) + return template.render(query_logging_config=query_logging_config, xmlns=XMLNS) - def get_or_delete_query_logging_config_response( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def delete_query_logging_config(self) -> str: query_logging_config_id = self.parsed_url.path.rstrip("/").rsplit("/", 1)[1] + self.backend.delete_query_logging_config(query_logging_config_id) + return "" + + def list_reusable_delegation_sets(self) -> str: + delegation_sets = self.backend.list_reusable_delegation_sets() + template = self.response_template(LIST_REUSABLE_DELEGATION_SETS_TEMPLATE) + return template.render( + delegation_sets=delegation_sets, + marker=None, + is_truncated=False, + max_items=100, + ) - if request.method == "GET": - query_logging_config = self.backend.get_query_logging_config( - query_logging_config_id - ) - template = Template(GET_QUERY_LOGGING_CONFIG_RESPONSE) - return ( - 200, - headers, - template.render(query_logging_config=query_logging_config, xmlns=XMLNS), - ) - - elif request.method == "DELETE": - self.backend.delete_query_logging_config(query_logging_config_id) - return 200, headers, "" + def create_reusable_delegation_set(self) -> TYPE_RESPONSE: + elements = xmltodict.parse(self.body) + root_elem = elements["CreateReusableDelegationSetRequest"] + caller_reference = root_elem.get("CallerReference") + hosted_zone_id = root_elem.get("HostedZoneId") + delegation_set = self.backend.create_reusable_delegation_set( + caller_reference=caller_reference, hosted_zone_id=hosted_zone_id + ) + template = self.response_template(CREATE_REUSABLE_DELEGATION_SET_TEMPLATE) + return ( + 201, + {"Location": delegation_set.location, "status": 201}, + template.render(delegation_set=delegation_set), + ) - def reusable_delegation_sets( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) - if request.method == "GET": - delegation_sets = self.backend.list_reusable_delegation_sets() - template = self.response_template(LIST_REUSABLE_DELEGATION_SETS_TEMPLATE) - return ( - 200, - {}, - template.render( - delegation_sets=delegation_sets, - marker=None, - is_truncated=False, - max_items=100, - ), - ) - elif request.method == "POST": - elements = xmltodict.parse(self.body) - root_elem = elements["CreateReusableDelegationSetRequest"] - caller_reference = root_elem.get("CallerReference") - hosted_zone_id = root_elem.get("HostedZoneId") - delegation_set = self.backend.create_reusable_delegation_set( - caller_reference=caller_reference, hosted_zone_id=hosted_zone_id - ) - template = self.response_template(CREATE_REUSABLE_DELEGATION_SET_TEMPLATE) - return ( - 201, - {"Location": delegation_set.location}, - template.render(delegation_set=delegation_set), - ) + def get_reusable_delegation_set(self) -> str: + ds_id = self.parsed_url.path.rstrip("/").rsplit("/")[-1] + delegation_set = self.backend.get_reusable_delegation_set( + delegation_set_id=ds_id + ) + template = self.response_template(GET_REUSABLE_DELEGATION_SET_TEMPLATE) + return template.render(delegation_set=delegation_set) - def reusable_delegation_set( # type: ignore[return] - self, request: Any, full_url: str, headers: Any - ) -> TYPE_RESPONSE: - self.setup_class(request, full_url, headers) + def delete_reusable_delegation_set(self) -> str: ds_id = self.parsed_url.path.rstrip("/").rsplit("/")[-1] - if request.method == "GET": - delegation_set = self.backend.get_reusable_delegation_set( - delegation_set_id=ds_id - ) - template = self.response_template(GET_REUSABLE_DELEGATION_SET_TEMPLATE) - return 200, {}, template.render(delegation_set=delegation_set) - if request.method == "DELETE": - self.backend.delete_reusable_delegation_set(delegation_set_id=ds_id) - template = self.response_template(DELETE_REUSABLE_DELEGATION_SET_TEMPLATE) - return 200, {}, template.render() + self.backend.delete_reusable_delegation_set(delegation_set_id=ds_id) + template = self.response_template(DELETE_REUSABLE_DELEGATION_SET_TEMPLATE) + return template.render() LIST_TAGS_FOR_RESOURCE_RESPONSE = """ diff --git a/moto/route53/urls.py b/moto/route53/urls.py index 7f18fd3cc98..3c7ea933665 100644 --- a/moto/route53/urls.py +++ b/moto/route53/urls.py @@ -22,66 +22,26 @@ def tag_response2(request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: url_paths = { - r"{0}/(?P[\d_-]+)/hostedzone$": Route53.method_dispatch( - Route53.list_or_create_hostzone_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)$": Route53.method_dispatch( - Route53.get_or_delete_hostzone_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/rrset$": Route53.method_dispatch( - Route53.rrset_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/rrset/$": Route53.method_dispatch( - Route53.rrset_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/dnssec$": Route53.method_dispatch( - Route53.get_dnssec_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/dnssec/$": Route53.method_dispatch( - Route53.get_dnssec_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/associatevpc/?$": Route53.method_dispatch( - Route53.associate_vpc_response - ), - r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/disassociatevpc/?$": Route53.method_dispatch( - Route53.disassociate_vpc_response - ), - r"{0}/(?P[\d_-]+)/hostedzonesbyname": Route53.method_dispatch( - Route53.list_hosted_zones_by_name_response - ), - r"{0}/(?P[\d_-]+)/hostedzonesbyvpc": Route53.method_dispatch( - Route53.list_hosted_zones_by_vpc_response - ), - r"{0}/(?P[\d_-]+)/hostedzonecount": Route53.method_dispatch( - Route53.get_hosted_zone_count_response - ), - r"{0}/(?P[\d_-]+)/healthcheck$": Route53.method_dispatch( - Route53.health_check_response1 - ), - r"{0}/(?P[\d_-]+)/healthcheck/(?P[^/]+)$": Route53.method_dispatch( - Route53.health_check_response2 - ), - r"{0}/(?P[\d_-]+)/healthcheck/(?P[^/]+)/status$": Route53.method_dispatch( - Route53.health_check_status_response - ), + r"{0}/(?P[\d_-]+)/hostedzone$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/rrset$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/rrset/$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/dnssec$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/dnssec/$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/associatevpc/?$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzone/(?P[^/]+)/disassociatevpc/?$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzonesbyname": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzonesbyvpc": Route53.dispatch, + r"{0}/(?P[\d_-]+)/hostedzonecount": Route53.dispatch, + r"{0}/(?P[\d_-]+)/healthcheck$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/healthcheck/(?P[^/]+)$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/healthcheck/(?P[^/]+)/status$": Route53.dispatch, r"{0}/(?P[\d_-]+)/tags/healthcheck/(?P[^/]+)$": tag_response1, r"{0}/(?P[\d_-]+)/tags/hostedzone/(?P[^/]+)$": tag_response2, - r"{0}/(?P[\d_-]+)/trafficpolicyinstances/*": Route53.method_dispatch( - Route53.not_implemented_response - ), - r"{0}/(?P[\d_-]+)/change/(?P[^/]+)$": Route53.method_dispatch( - Route53.get_change - ), - r"{0}/(?P[\d_-]+)/queryloggingconfig$": Route53.method_dispatch( - Route53.list_or_create_query_logging_config_response - ), - r"{0}/(?P[\d_-]+)/queryloggingconfig/(?P[^/]+)$": Route53.method_dispatch( - Route53.get_or_delete_query_logging_config_response - ), - r"{0}/(?P[\d_-]+)/delegationset$": Route53.method_dispatch( - Route53.reusable_delegation_sets - ), - r"{0}/(?P[\d_-]+)/delegationset/(?P[^/]+)$": Route53.method_dispatch( - Route53.reusable_delegation_set - ), + r"{0}/(?P[\d_-]+)/trafficpolicyinstances/*": Route53.dispatch, + r"{0}/(?P[\d_-]+)/change/(?P[^/]+)$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/queryloggingconfig$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/queryloggingconfig/(?P[^/]+)$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/delegationset$": Route53.dispatch, + r"{0}/(?P[\d_-]+)/delegationset/(?P[^/]+)$": Route53.dispatch, }