Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,16 @@ def _download(blob_name: str):
bucket = client.bucket(bucket_name=self.source_bucket)
blob = bucket.blob(blob_name=blob_name, chunk_size=self.chunk_size)

# GCS blob names are arbitrary Unicode strings produced by
# whoever can write to the source bucket; in many data-pipeline
# deployments that is an external producer rather than the DAG
# author. A blob name containing ``..`` segments or an
# absolute-path prefix would otherwise canonicalise outside the
# temp input directory and be written to wherever the worker's
# filesystem permissions allowed (``~/.bashrc``, ``cron.d``,
# ``.ssh/authorized_keys``). Validate the resolved form, but
# pass the un-resolved path downstream so callers that read
# ``temp_input_dir_path`` back out keep their existing shape.
destination_file = temp_input_dir_path / blob_name
# Containment check: ``blob_name`` originates outside the worker, and GCS
# allows object names containing ``..``. Resolve the target and assert it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,33 @@ def _resolve_destination_path(self, source_object: str, prefix: str | None = Non
source_object = os.path.relpath(source_object, start=prefix)
else:
source_object = os.path.basename(source_object)
return os.path.join(self.destination_path, source_object)
destination = os.path.join(self.destination_path, source_object)
# GCS object names are arbitrary UTF-8 strings and originate from
# whoever can write to the source bucket (frequently a lower-trust
# external producer). A name containing ``..`` segments or an
# absolute-path prefix would otherwise canonicalise outside the
# configured ``destination_path`` on the SFTP server: ``os.path.join``
# preserves ``..`` and an absolute ``source_object`` silently absorbs
# the destination prefix. Three escape patterns to refuse, while
# still allowing relative roots like ``"."`` or ``""`` (SFTP user's
# current/login directory) that depend on the configured base
# remaining the prefix of the resolved path.
resolved = os.path.normpath(destination)
escapes = resolved == ".." or resolved.startswith(".." + os.sep)
if not escapes and os.path.isabs(resolved) and not os.path.isabs(self.destination_path):
# ``source_object`` was itself absolute and absorbed the relative base.
escapes = True
if not escapes and os.path.isabs(self.destination_path):
base = os.path.normpath(self.destination_path)
if resolved != base and not resolved.startswith(base.rstrip(os.sep) + os.sep):
escapes = True
if escapes:
raise AirflowException(
f"Refusing to copy GCS object {source_object!r}: resolved destination "
f"{resolved!r} escapes configured destination_path "
f"{self.destination_path!r}."
)
return resolved

def _copy_single_object(
self,
Expand Down
58 changes: 58 additions & 0 deletions providers/google/tests/unit/google/cloud/operators/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,64 @@ def test_get_openlineage_facets_on_complete(
assert all(element in lineage.outputs for element in outputs)
assert all(element in outputs for element in lineage.outputs)

@pytest.mark.parametrize(
"blob_name",
[
pytest.param("inbound/../../../../etc/passwd", id="dotdot-segments"),
pytest.param("/etc/passwd", id="absolute-path"),
],
)
@mock.patch("airflow.providers.google.cloud.operators.gcs.subprocess")
@mock.patch("airflow.providers.google.cloud.operators.gcs.TemporaryDirectory")
@mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
def test_execute_rejects_traversal_blob_name(
self, mock_hook, mock_tempdir, mock_subprocess, tmp_path, blob_name
):
# GCS blob names are arbitrary Unicode strings written by whoever can
# produce to the source bucket. A name containing ``..`` segments or an
# absolute-path prefix would otherwise resolve outside the operator's
# temp input directory; ``_download`` must refuse such names rather
# than write the blob to the worker filesystem.
source_dir = tmp_path / "source"
source_dir.mkdir()
dest_dir = tmp_path / "destination"
dest_dir.mkdir()
mock_tempdir.return_value.__enter__.side_effect = [str(source_dir), str(dest_dir)]
mock_hook.return_value.list_by_timespan.return_value = [blob_name]
self._setup_gcs_client_chain(mock_hook)

mock_proc = mock.MagicMock()
mock_proc.returncode = 0
mock_proc.stdout.readline = lambda: b""
mock_proc.wait.return_value = None
mock_subprocess.Popen.return_value.__enter__.return_value = mock_proc
mock_subprocess.PIPE = "pipe"
mock_subprocess.STDOUT = "stdout"

timespan_start = datetime(2015, 2, 1, tzinfo=timezone.utc)
timespan_end = timespan_start + timedelta(hours=1)
context = dict(
logical_date=timespan_start,
data_interval_start=timespan_start,
data_interval_end=timespan_end,
ti=mock.Mock(),
task=mock.MagicMock(),
)

op = GCSTimeSpanFileTransformOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_prefix="inbound",
source_gcp_conn_id="",
destination_bucket=TEST_BUCKET + "_dest",
destination_prefix="dest",
destination_gcp_conn_id="",
transform_script="script.py",
)

with pytest.raises(ValueError, match="escapes the temp directory"):
op.execute(context=context)

@pytest.mark.parametrize(
("workers", "should_raise"),
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,94 @@ def test_create_intermediate_dirs_false(self, sftp_hook_mock, gcp_hook_mock):
task.execute(None)

sftp_hook_mock.return_value.create_directory.assert_not_called()

@pytest.mark.parametrize(
"source_object",
[
pytest.param("incoming/../../../../etc/passwd", id="dotdot-segments"),
pytest.param("/etc/passwd", id="absolute-path"),
],
)
def test_resolve_destination_path_rejects_escape(self, source_object):
# ``_resolve_destination_path`` is the GCSToSFTPOperator method that
# joins a GCS object name with the configured ``destination_path``.
# When the joined path canonicalises outside the destination — either
# via ``..`` segments or an absolute ``source_object`` that absorbs
# the prefix — the method must refuse rather than hand the path to
# the SFTP server, where the server would resolve it on its own host.
task = GCSToSFTPOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object="incoming/*",
destination_path="/srv/sftp/incoming",
keep_directory_structure=True,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
)
with pytest.raises(AirflowException, match="escapes configured destination_path"):
task._resolve_destination_path(source_object)

def test_resolve_destination_path_allows_benign_nested(self):
# The new validation is post-join normalisation; benign nested paths
# under the destination must still resolve cleanly.
task = GCSToSFTPOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object="incoming/*",
destination_path="/srv/sftp/incoming",
keep_directory_structure=True,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
)
assert (
task._resolve_destination_path("incoming/sub/dir/file.csv")
== "/srv/sftp/incoming/incoming/sub/dir/file.csv"
)

@pytest.mark.parametrize(
("destination_path", "source_object", "expected"),
[
pytest.param(".", "file.txt", "file.txt", id="dot-base-benign"),
pytest.param("", "file.txt", "file.txt", id="empty-base-benign"),
pytest.param(".", "sub/dir/file.txt", "sub/dir/file.txt", id="dot-base-nested"),
],
)
def test_resolve_destination_path_allows_relative_base(self, destination_path, source_object, expected):
# ``destination_path="."`` and ``destination_path=""`` are valid SFTP
# destinations — they refer to the SFTP user's login / current
# directory. The validation must allow these benign uploads while
# still rejecting ``..`` escapes and absolute-path absorption (covered
# by the next test).
task = GCSToSFTPOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object="*",
destination_path=destination_path,
keep_directory_structure=True,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
)
assert task._resolve_destination_path(source_object) == expected

@pytest.mark.parametrize(
"source_object",
[
pytest.param("../etc/passwd", id="dotdot-escape-from-relative-base"),
pytest.param("/etc/passwd", id="absolute-absorbs-relative-base"),
],
)
def test_resolve_destination_path_rejects_escape_from_relative_base(self, source_object):
# With a relative ``destination_path`` such as ``"."``, ``..`` segments
# escape the configured root and absolute ``source_object`` values
# absorb it entirely — both must still be refused.
task = GCSToSFTPOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object="*",
destination_path=".",
keep_directory_structure=True,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
)
with pytest.raises(AirflowException, match="escapes configured destination_path"):
task._resolve_destination_path(source_object)
2 changes: 1 addition & 1 deletion scripts/ci/prek/known_airflow_exceptions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gc
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py::7
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_gcs.py::6
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_local.py::1
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py::1
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py::2
providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py::2
providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py::1
providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::2
Expand Down
Loading