Skip to content

Commit

Permalink
fixup! Update airflow/cli/commands/variable_command.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Sep 1, 2023
1 parent 72602d2 commit 91fd57d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
17 changes: 8 additions & 9 deletions airflow/cli/commands/variable_command.py
Expand Up @@ -89,16 +89,15 @@ def variables_import(args, session):
suc_count = fail_count = 0
skipped = set()
action_on_existing = args.action_on_existing_key
if action_on_existing == "fail":
existing_key = set(session.scalars(select(Variable.key).where(Variable.key.in_(var_json))))
if existing_key:
raise SystemExit(f"Failed. These keys: {sorted(existing_key)} already exists.")

existing_keys = set()
if action_on_existing != "overwrite":
existing_keys = set(session.scalars(select(Variable.key).where(Variable.key.in_(var_json))))
if action_on_existing == "fail" and existing_keys:
raise SystemExit(f"Failed. These keys: {sorted(existing_keys)} already exists.")
for k, v in var_json.items():
if action_on_existing == "skip":
if session.scalar(select(Variable).where(Variable.key == k)):
skipped.add(k)
continue
if action_on_existing == "skip" and k in existing_keys:
skipped.add(k)
continue
try:
Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as e:
Expand Down
26 changes: 13 additions & 13 deletions airflow/www/views.py
Expand Up @@ -5142,29 +5142,29 @@ def varimport(self, session):
"""Import variables."""
try:
variable_dict = json.loads(request.files["file"].read())
action_if_exists = request.form.get("action_if_exists", "overwrite").lower()
action_on_existing = request.form.get("action_if_exists", "overwrite").lower()
except Exception:
self.update_redirect()
flash("Missing file or syntax error.", "error")
return redirect(self.get_redirect())
else:
if action_if_exists == "fail":
existing_key = set(
existing_keys = set()
if action_on_existing != "overwrite":
existing_keys = set(
session.scalars(select(models.Variable.key).where(models.Variable.key.in_(variable_dict)))
)
if existing_key:
failed_repr = ", ".join(repr(k) for k in sorted(existing_key))
flash(f"Failed. The variables with these keys: {failed_repr} already exists.")
logging.error(f"Failed. The variables with these keys: {failed_repr} already exists.")
return redirect(location=request.referrer)
if action_on_existing == "fail" and existing_keys:
failed_repr = ", ".join(repr(k) for k in sorted(existing_keys))
flash(f"Failed. The variables with these keys: {failed_repr} already exists.")
logging.error(f"Failed. The variables with these keys: {failed_repr} already exists.")
return redirect(location=request.referrer)
skipped = set()
suc_count = fail_count = 0
for k, v in variable_dict.items():
if action_if_exists == "skip":
if session.scalar(select(models.Variable).where(models.Variable.key == k)):
logging.warning("Variable: %s already exists, skipping.", k)
skipped.add(k)
continue
if action_on_existing == "skip" and k in existing_keys:
logging.warning("Variable: %s already exists, skipping.", k)
skipped.add(k)
continue
try:
models.Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as exc:
Expand Down

0 comments on commit 91fd57d

Please sign in to comment.