Refactor: Improve detection of duplicates and list sorting (#33675)
eumiro committed Aug 24, 2023
1 parent 53a8973 commit 2dbb963
Showing 10 changed files with 33 additions and 53 deletions.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_trigger_ui.py
@@ -33,7 +33,7 @@
 
 with DAG(
     dag_id=Path(__file__).stem,
-    description=__doc__[0 : __doc__.find(".")],
+    description=__doc__.partition(".")[0],
     doc_md=__doc__,
     schedule=None,
     start_date=datetime.datetime(2022, 3, 4),
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_ui_tutorial.py
@@ -35,7 +35,7 @@
 
 with DAG(
     dag_id=Path(__file__).stem,
-    description=__doc__[0 : __doc__.find(".")],
+    description=__doc__.partition(".")[0],
     doc_md=__doc__,
     schedule=None,
     start_date=datetime.datetime(2022, 3, 4),
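A note on why str.partition is the safer spelling here, beyond brevity: when the docstring contains no ".", str.find returns -1 and the old slice silently drops the last character, while partition returns the whole string. A quick illustration with an invented docstring (not part of the commit):

    # Edge case the new code handles correctly: no "." in the string.
    doc = "A DAG demonstrating params, with no trailing period"
    assert doc[0 : doc.find(".")] == doc[:-1]  # old: quietly truncates the last char
    assert doc.partition(".")[0] == doc        # new: returns the string intact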
6 changes: 2 additions & 4 deletions airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
@@ -185,16 +185,14 @@ def parse_log_line(self, line: str) -> tuple[str | None, str]:
         :param line: k8s log line
         :return: timestamp and log message
         """
-        split_at = line.find(" ")
-        if split_at == -1:
+        timestamp, sep, message = line.strip().partition(" ")
+        if not sep:
             self.log.error(
                 "Error parsing timestamp (no timestamp in message: %r). "
                 "Will continue execution but won't update timestamp",
                 line,
             )
             return None, line
-        timestamp = line[:split_at]
-        message = line[split_at + 1 :].rstrip()
         return timestamp, message
 
     def _task_status(self, event):
6 changes: 2 additions & 4 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -551,16 +551,14 @@ def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
         :param line: k8s log line
         :return: timestamp and log message
         """
-        split_at = line.find(" ")
-        if split_at == -1:
+        timestamp, sep, message = line.strip().partition(" ")
+        if not sep:
             self.log.error(
                 "Error parsing timestamp (no timestamp in message %r). "
                 "Will continue execution but won't update timestamp",
                 line,
             )
             return None, line
-        timestamp = line[:split_at]
-        message = line[split_at + 1 :].rstrip()
         try:
             last_log_time = cast(DateTime, pendulum.parse(timestamp))
         except ParserError:
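The same partition idiom replaces the find/slice bookkeeping in both pod log parsers. A standalone sketch of the new splitting step, assuming the usual k8s log format of an RFC3339 timestamp, a space, then the message (the helper name is invented; this is illustrative, not the provider code itself):

    def split_timestamp(line: str) -> tuple[str | None, str]:
        # partition never raises: with no space, sep is "" and message is "".
        timestamp, sep, message = line.strip().partition(" ")
        if not sep:
            return None, line
        return timestamp, message

    assert split_timestamp("2023-08-24T12:00:00Z hello\n") == ("2023-08-24T12:00:00Z", "hello")
    assert split_timestamp("no-timestamp-here") == (None, "no-timestamp-here")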
7 changes: 2 additions & 5 deletions airflow/providers/databricks/operators/databricks_repos.py
@@ -105,11 +105,8 @@ def __init__(
     def __detect_repo_provider__(url):
         provider = None
         try:
-            netloc = urlsplit(url).netloc
-            idx = netloc.rfind("@")
-            if idx != -1:
-                netloc = netloc[(idx + 1) :]
-            netloc = netloc.lower()
+            netloc = urlsplit(url).netloc.lower()
+            _, _, netloc = netloc.rpartition("@")
             provider = DatabricksReposCreateOperator.__git_providers__.get(netloc)
             if provider is None and DatabricksReposCreateOperator.__aws_code_commit_regexp__.match(netloc):
                 provider = "awsCodeCommit"
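rpartition removes the index branch entirely because its no-match behavior is already what the code wants: with no "@" it returns ("", "", netloc), so the last element is always the host part. A small demonstration (the URLs are made up):

    from urllib.parse import urlsplit

    for url in ("https://token@github.com/org/repo.git", "https://gitlab.com/org/repo.git"):
        netloc = urlsplit(url).netloc.lower()
        _, _, netloc = netloc.rpartition("@")
        print(netloc)  # github.com, then gitlab.com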
2 changes: 1 addition & 1 deletion airflow/providers/docker/operators/docker.py
@@ -486,7 +486,7 @@ def format_command(command: list[str] | str | None) -> list[str] | str | None:
         :return: the command (or commands)
         """
-        if isinstance(command, str) and command.strip().find("[") == 0:
+        if isinstance(command, str) and command.strip().startswith("["):
             command = ast.literal_eval(command)
         return command
 
2 changes: 1 addition & 1 deletion airflow/providers/singularity/operators/singularity.py
@@ -167,7 +167,7 @@ def execute(self, context: Context) -> None:
         self.log.info("Output from command %s", result["message"])
 
     def _get_command(self) -> Any | None:
-        if self.command is not None and self.command.strip().find("[") == 0:  # type: ignore
+        if self.command is not None and self.command.strip().startswith("["):  # type: ignore
             commands = ast.literal_eval(self.command)
         else:
             commands = self.command
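In both operators the predicate keeps its meaning: s.strip().find("[") == 0 is true exactly when the stripped string starts with "[". startswith states that intent directly and checks only position 0, instead of searching the string for the first "[" and comparing its index. A toy standalone version of the shared pattern (not the operators' actual code):

    import ast

    def format_command(command):
        # A stringified Python list is parsed back into a real list.
        if isinstance(command, str) and command.strip().startswith("["):
            return ast.literal_eval(command)
        return command

    assert format_command('["echo", "hello"]') == ["echo", "hello"]
    assert format_command("echo hello") == "echo hello"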
2 changes: 1 addition & 1 deletion airflow/utils/log/json_formatter.py
@@ -37,7 +37,7 @@ def __init__(self, fmt=None, datefmt=None, style="%", json_fields=None, extras=N
         self.extras = extras
 
     def usesTime(self):
-        return self.json_fields.count("asctime") > 0
+        return "asctime" in self.json_fields
 
     def format(self, record):
         super().format(record)
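A one-line change, but the idiom matters: "x in seq" short-circuits at the first match, while seq.count(x) always walks the entire sequence just to compare the total with zero. The two spellings agree, shown here with invented sample field names:

    json_fields = ["asctime", "levelname", "message"]
    assert ("asctime" in json_fields) == (json_fields.count("asctime") > 0)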
17 changes: 3 additions & 14 deletions dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -675,20 +675,9 @@ def parallel_test_types_list_as_string(self) -> str | None:
 
         # this should be hard-coded as we want to have very specific sequence of tests
         sorting_order = ["Core", "Providers[-amazon,google]", "Other", "Providers[amazon]", "WWW"]
-
-        def sort_key(t: str) -> str:
-            # Put the test types in the order we want them to run
-            if t in sorting_order:
-                return str(sorting_order.index(t))
-            else:
-                return str(len(sorting_order)) + t
-
-        return " ".join(
-            sorted(
-                current_test_types,
-                key=sort_key,
-            )
-        )
+        sort_key = {item: i for i, item in enumerate(sorting_order)}
+        # Put the test types in the order we want them to run
+        return " ".join(sorted(current_test_types, key=lambda x: (sort_key.get(x, len(sorting_order)), x)))
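The dict lookup replaces a per-element list.index scan, and the (rank, name) tuple key replaces the old string-concatenation trick: known test types sort by their position in sorting_order, everything else sorts after them alphabetically. A sketch of the resulting order (the extra test type names are invented):

    sorting_order = ["Core", "Providers[-amazon,google]", "Other", "Providers[amazon]", "WWW"]
    sort_key = {item: i for i, item in enumerate(sorting_order)}
    tests = ["WWW", "Providers[apache]", "Core", "Always", "Providers[amazon]"]
    print(sorted(tests, key=lambda x: (sort_key.get(x, len(sorting_order)), x)))
    # ['Core', 'Providers[amazon]', 'WWW', 'Always', 'Providers[apache]']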
40 changes: 19 additions & 21 deletions scripts/in_container/run_provider_yaml_files_check.py
@@ -319,14 +319,13 @@ def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files
         yaml_files.items(), ["sensors", "operators", "hooks", "triggers"]
     ):
         resource_data = provider_data.get(resource_type, [])
-        current_integrations = [r.get("integration-name", "") for r in resource_data]
-        if len(current_integrations) != len(set(current_integrations)):
-            for integration in current_integrations:
-                if current_integrations.count(integration) > 1:
-                    errors.append(
-                        f"Duplicated content of '{resource_type}/integration-name/{integration}' "
-                        f"in file: {yaml_file_path}"
-                    )
+        count_integrations = Counter(r.get("integration-name", "") for r in resource_data)
+        for integration, count in count_integrations.items():
+            if count > 1:
+                errors.append(
+                    f"Duplicated content of '{resource_type}/integration-name/{integration}' "
+                    f"in file: {yaml_file_path}"
+                )
 
 
 def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]):
@@ -422,19 +421,18 @@ def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]):
     for yaml_file_path, provider_data in yaml_files.items():
         resource_data = provider_data.get(resource_type, [])
 
-        source_target_integrations = [
+        count_integrations = Counter(
             (r.get("source-integration-name", ""), r.get("target-integration-name", ""))
             for r in resource_data
-        ]
-        if len(source_target_integrations) != len(set(source_target_integrations)):
-            for integration_couple in source_target_integrations:
-                if source_target_integrations.count(integration_couple) > 1:
-                    errors.append(
-                        f"Duplicated content of \n"
-                        f" '{resource_type}/source-integration-name/{integration_couple[0]}' "
-                        f" '{resource_type}/target-integration-name/{integration_couple[1]}' "
-                        f"in file: {yaml_file_path}"
-                    )
+        )
+        for (source, target), count in count_integrations.items():
+            if count > 1:
+                errors.append(
+                    f"Duplicated content of \n"
+                    f" '{resource_type}/source-integration-name/{source}' "
+                    f" '{resource_type}/target-integration-name/{target}' "
+                    f"in file: {yaml_file_path}"
+                )
 
 
 def check_invalid_integration(yaml_files: dict[str, dict]):
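Counter works here because the (source, target) pairs are hashable tuples; building the counts once replaces the quadratic list.count loop, and each pair unpacks directly in the for clause. A reduced sketch with invented integration names:

    from collections import Counter

    pairs = [("S3", "GCS"), ("S3", "GCS"), ("GCS", "BigQuery")]
    for (source, target), count in Counter(pairs).items():
        if count > 1:
            print(f"duplicate transfer: {source} -> {target}")  # S3 -> GCS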
@@ -533,8 +531,8 @@ def check_doc_files(yaml_files: dict[str, dict]):
 
 
 def check_unique_provider_name(yaml_files: dict[str, dict]):
-    provider_names = [d["name"] for d in yaml_files.values()]
-    duplicates = {x for x in provider_names if provider_names.count(x) > 1}
+    name_counter = Counter(d["name"] for d in yaml_files.values())
+    duplicates = {k for k, v in name_counter.items() if v > 1}
     if duplicates:
         errors.append(f"Provider name must be unique. Duplicates: {duplicates}")
 
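All three duplicate checks in this file now follow the same shape: count every key in one O(n) pass with collections.Counter (its import is presumably added elsewhere in the file, since only the changed hunks are shown), then report the keys seen more than once, instead of calling list.count inside a loop, which is O(n^2) overall. The distilled pattern, with invented provider names:

    from collections import Counter

    names = ["amazon", "google", "amazon", "apache-hive"]
    duplicates = {name for name, count in Counter(names).items() if count > 1}
    assert duplicates == {"amazon"}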
