diff --git a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py index c649962b217fd..737c3a9b06045 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py @@ -897,6 +897,16 @@ def _download(blob_name: str): bucket = client.bucket(bucket_name=self.source_bucket) blob = bucket.blob(blob_name=blob_name, chunk_size=self.chunk_size) + # GCS blob names are arbitrary Unicode strings produced by + # whoever can write to the source bucket; in many data-pipeline + # deployments that is an external producer rather than the DAG + # author. A blob name containing ``..`` segments or an + # absolute-path prefix would otherwise canonicalise outside the + # temp input directory and be written to wherever the worker's + # filesystem permissions allowed (``~/.bashrc``, ``cron.d``, + # ``.ssh/authorized_keys``). Validate the resolved form, but + # pass the un-resolved path downstream so callers that read + # ``temp_input_dir_path`` back out keep their existing shape. destination_file = temp_input_dir_path / blob_name # Containment check: ``blob_name`` originates outside the worker, and GCS # allows object names containing ``..``. 
Resolve the target and assert it diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py index 664c6aea3190a..f66395fbfa2ad 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py @@ -176,7 +176,33 @@ def _resolve_destination_path(self, source_object: str, prefix: str | None = Non source_object = os.path.relpath(source_object, start=prefix) else: source_object = os.path.basename(source_object) - return os.path.join(self.destination_path, source_object) + destination = os.path.join(self.destination_path, source_object) + # GCS object names are arbitrary UTF-8 strings and originate from + # whoever can write to the source bucket (frequently a lower-trust + # external producer). A name containing ``..`` segments or an + # absolute-path prefix would otherwise canonicalise outside the + # configured ``destination_path`` on the SFTP server: ``os.path.join`` + # preserves ``..`` and an absolute ``source_object`` silently absorbs + # the destination prefix. Three escape patterns are refused below. + # Relative roots like ``"."`` or ``""`` (the SFTP user's current/login + # directory) stay allowed: only an absolute base is required to + # remain a prefix of the resolved path. + resolved = os.path.normpath(destination) + escapes = resolved == ".." or resolved.startswith(".." + os.sep) + if not escapes and os.path.isabs(resolved) and not os.path.isabs(self.destination_path): + # ``source_object`` was itself absolute and absorbed the relative base. 
+ escapes = True + if not escapes and os.path.isabs(self.destination_path): + base = os.path.normpath(self.destination_path) + if resolved != base and not resolved.startswith(base.rstrip(os.sep) + os.sep): + escapes = True + if escapes: + raise AirflowException( + f"Refusing to copy GCS object {source_object!r}: resolved destination " + f"{resolved!r} escapes configured destination_path " + f"{self.destination_path!r}." + ) + return resolved def _copy_single_object( self, diff --git a/providers/google/tests/unit/google/cloud/operators/test_gcs.py b/providers/google/tests/unit/google/cloud/operators/test_gcs.py index f48aaddba7566..c8c317ae87050 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_gcs.py +++ b/providers/google/tests/unit/google/cloud/operators/test_gcs.py @@ -676,6 +676,64 @@ def test_get_openlineage_facets_on_complete( assert all(element in lineage.outputs for element in outputs) assert all(element in outputs for element in lineage.outputs) + @pytest.mark.parametrize( + "blob_name", + [ + pytest.param("inbound/../../../../etc/passwd", id="dotdot-segments"), + pytest.param("/etc/passwd", id="absolute-path"), + ], + ) + @mock.patch("airflow.providers.google.cloud.operators.gcs.subprocess") + @mock.patch("airflow.providers.google.cloud.operators.gcs.TemporaryDirectory") + @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook") + def test_execute_rejects_traversal_blob_name( + self, mock_hook, mock_tempdir, mock_subprocess, tmp_path, blob_name + ): + # GCS blob names are arbitrary Unicode strings written by whoever can + # produce to the source bucket. A name containing ``..`` segments or an + # absolute-path prefix would otherwise resolve outside the operator's + # temp input directory; ``_download`` must refuse such names rather + # than write the blob to the worker filesystem. 
+ source_dir = tmp_path / "source" + source_dir.mkdir() + dest_dir = tmp_path / "destination" + dest_dir.mkdir() + mock_tempdir.return_value.__enter__.side_effect = [str(source_dir), str(dest_dir)] + mock_hook.return_value.list_by_timespan.return_value = [blob_name] + self._setup_gcs_client_chain(mock_hook) + + mock_proc = mock.MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout.readline = lambda: b"" + mock_proc.wait.return_value = None + mock_subprocess.Popen.return_value.__enter__.return_value = mock_proc + mock_subprocess.PIPE = "pipe" + mock_subprocess.STDOUT = "stdout" + + timespan_start = datetime(2015, 2, 1, tzinfo=timezone.utc) + timespan_end = timespan_start + timedelta(hours=1) + context = dict( + logical_date=timespan_start, + data_interval_start=timespan_start, + data_interval_end=timespan_end, + ti=mock.Mock(), + task=mock.MagicMock(), + ) + + op = GCSTimeSpanFileTransformOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_prefix="inbound", + source_gcp_conn_id="", + destination_bucket=TEST_BUCKET + "_dest", + destination_prefix="dest", + destination_gcp_conn_id="", + transform_script="script.py", + ) + + with pytest.raises(ValueError, match="escapes the temp directory"): + op.execute(context=context) + @pytest.mark.parametrize( ("workers", "should_raise"), [ diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py index 28ca53acdb55f..f82030a74661a 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py @@ -466,3 +466,94 @@ def test_create_intermediate_dirs_false(self, sftp_hook_mock, gcp_hook_mock): task.execute(None) sftp_hook_mock.return_value.create_directory.assert_not_called() + + @pytest.mark.parametrize( + "source_object", + [ + pytest.param("incoming/../../../../etc/passwd", id="dotdot-segments"), + 
pytest.param("/etc/passwd", id="absolute-path"), + ], + ) + def test_resolve_destination_path_rejects_escape(self, source_object): + # ``_resolve_destination_path`` is the GCSToSFTPOperator method that + # joins a GCS object name with the configured ``destination_path``. + # When the joined path canonicalises outside the destination — either + # via ``..`` segments or an absolute ``source_object`` that absorbs + # the prefix — the method must refuse rather than hand the path to + # the SFTP server, where the server would resolve it on its own host. + task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="incoming/*", + destination_path="/srv/sftp/incoming", + keep_directory_structure=True, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + ) + with pytest.raises(AirflowException, match="escapes configured destination_path"): + task._resolve_destination_path(source_object) + + def test_resolve_destination_path_allows_benign_nested(self): + # The new validation is post-join normalisation; benign nested paths + # under the destination must still resolve cleanly. 
+ task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="incoming/*", + destination_path="/srv/sftp/incoming", + keep_directory_structure=True, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + ) + assert ( + task._resolve_destination_path("incoming/sub/dir/file.csv") + == "/srv/sftp/incoming/incoming/sub/dir/file.csv" + ) + + @pytest.mark.parametrize( + ("destination_path", "source_object", "expected"), + [ + pytest.param(".", "file.txt", "file.txt", id="dot-base-benign"), + pytest.param("", "file.txt", "file.txt", id="empty-base-benign"), + pytest.param(".", "sub/dir/file.txt", "sub/dir/file.txt", id="dot-base-nested"), + ], + ) + def test_resolve_destination_path_allows_relative_base(self, destination_path, source_object, expected): + # ``destination_path="."`` and ``destination_path=""`` are valid SFTP + # destinations — they refer to the SFTP user's login / current + # directory. The validation must allow these benign uploads while + # still rejecting ``..`` escapes and absolute-path absorption (covered + # by the next test). + task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="*", + destination_path=destination_path, + keep_directory_structure=True, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + ) + assert task._resolve_destination_path(source_object) == expected + + @pytest.mark.parametrize( + "source_object", + [ + pytest.param("../etc/passwd", id="dotdot-escape-from-relative-base"), + pytest.param("/etc/passwd", id="absolute-absorbs-relative-base"), + ], + ) + def test_resolve_destination_path_rejects_escape_from_relative_base(self, source_object): + # With a relative ``destination_path`` such as ``"."``, ``..`` segments + # escape the configured root and absolute ``source_object`` values + # absorb it entirely — both must still be refused. 
+ task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="*", + destination_path=".", + keep_directory_structure=True, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + ) + with pytest.raises(AirflowException, match="escapes configured destination_path"): + task._resolve_destination_path(source_object) diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt index bd4570fc55f25..a5621aa95e624 100644 --- a/scripts/ci/prek/known_airflow_exceptions.txt +++ b/scripts/ci/prek/known_airflow_exceptions.txt @@ -305,7 +305,7 @@ providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gc providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py::7 providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_gcs.py::6 providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_local.py::1 -providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py::1 +providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py::2 providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py::2 providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py::1 providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::2