Skip to content

Commit

Permalink
GoogleDriveHook: Fixing log message + adding more verbose documenta…
Browse files Browse the repository at this point in the history
…tion (#29694)
  • Loading branch information
aru-trackunit committed Mar 4, 2023
1 parent 88ed20a commit f55b957
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 4 deletions.
66 changes: 64 additions & 2 deletions airflow/providers/google/suite/hooks/drive.py
Expand Up @@ -21,6 +21,7 @@
from typing import IO, Any, Sequence

from googleapiclient.discovery import Resource, build
from googleapiclient.errors import Error as GoogleApiClientError
from googleapiclient.http import HttpRequest, MediaFileUpload

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
Expand Down Expand Up @@ -159,6 +160,52 @@ def exists(
)
)

def _get_file_info(self, file_id: str) -> dict:
    """
    Fetch the Google Drive metadata of a single file or folder.

    Only the ``id``, ``name`` and ``parents`` fields are requested from the API.

    .. seealso::
        https://developers.google.com/drive/api/v3/reference/files/get

    :param file_id: The id of the file (or folder) to look up.
    :return: The response of the ``files().get()`` request, as returned by ``execute``.
    """
    request = self.get_conn().files().get(
        fileId=file_id,
        fields="id,name,parents",
        # Required so that items living on shared drives can be looked up as well.
        supportsAllDrives=True,
    )
    # Use the retry count configured on the hook, like the other Drive requests
    # issued by this hook, instead of a hard-coded value.
    return request.execute(num_retries=self.num_retries)

def _resolve_file_path(self, file_id: str) -> str:
    """
    Build the full Google Drive path of the given file by walking up its parents.

    Starting at ``file_id``, the name of every visited item is collected and the
    walk continues with the item's parent for as long as exactly one parent is
    reported. The collected names are joined with ``/``, top-most item first.

    :param file_id: The id of a file (or folder) in Google Drive.
    :return: The ``/``-separated path ending with the name of ``file_id``.
    """
    # Names are gathered leaf-first and reversed once the walk is over.
    names_leaf_first = []
    next_id = file_id
    while True:
        # The id may belong to a file or a folder; both are looked up the same way.
        info = self._get_file_info(next_id)
        names_leaf_first.append(str(info["name"]))

        # The API only includes "parents" when there is at least one; anything
        # other than exactly one parent ends the walk.
        # https://developers.google.com/drive/api/guides/ref-single-parent
        parent_ids = info["parents"] if "parents" in info else ()
        if len(parent_ids) != 1:
            break
        next_id = parent_ids[0]

    return "/".join(reversed(names_leaf_first))

def get_file_id(
self, folder_id: str, file_name: str, drive_id: str | None = None, *, include_trashed: bool = True
) -> dict:
Expand Down Expand Up @@ -213,6 +260,7 @@ def upload_file(
chunk_size: int = 100 * 1024 * 1024,
resumable: bool = False,
folder_id: str = "root",
show_full_target_path: bool = True,
) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
Expand All @@ -227,6 +275,7 @@ def upload_file(
:param resumable: True if this is a resumable upload. False means upload
in a single request.
:param folder_id: The base/root folder id for remote_location (part of the drive URL of a folder).
:param show_full_target_path: If true then it reveals full available file path in the logs.
:return: File ID
"""
service = self.get_conn()
Expand All @@ -243,8 +292,21 @@ def upload_file(
.create(body=file_metadata, media_body=media, fields="id", supportsAllDrives=True)
.execute(num_retries=self.num_retries)
)
self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
return file.get("id")
file_id = file.get("id")

upload_location = remote_location

if folder_id != "root":
try:
upload_location = self._resolve_file_path(folder_id)
except GoogleApiClientError as e:
self.log.warning("A problem has been encountered when trying to resolve file path: ", e)

if show_full_target_path:
self.log.info("File %s uploaded to gdrive://%s.", local_location, upload_location)
else:
self.log.info("File %s has been uploaded successfully to gdrive", local_location)
return file_id

def download_file(self, file_id: str, file_handle: IO, chunk_size: int = 100 * 1024 * 1024):
"""
Expand Down
7 changes: 6 additions & 1 deletion airflow/providers/google/suite/transfers/local_to_drive.py
Expand Up @@ -40,7 +40,8 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
:ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
:param local_paths: Python list of local file paths
:param drive_folder: path of the Drive folder
:param drive_folder: path of the Drive folder, if folder_id param is given then drive_folder is a
sub path of folder_id.
:param gcp_conn_id: Airflow Connection ID for GCP
:param delete: should the local files be deleted after upload?
:param ignore_if_missing: if True, then don't fail even if all files
Expand All @@ -64,6 +65,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account
:param folder_id: The base/root folder id for each local path in the Drive folder
:param show_full_target_path: If true then it reveals full available file path in the logs.
:return: Remote file ids after upload
"""

Expand All @@ -84,6 +86,7 @@ def __init__(
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
folder_id: str = "root",
show_full_target_path: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -97,6 +100,7 @@ def __init__(
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.folder_id = folder_id
self.show_full_target_path = show_full_target_path

def execute(self, context: Context) -> list[str]:
hook = GoogleDriveHook(
Expand All @@ -117,6 +121,7 @@ def execute(self, context: Context) -> list[str]:
chunk_size=self.chunk_size,
resumable=self.resumable,
folder_id=self.folder_id,
show_full_target_path=self.show_full_target_path,
)

remote_file_ids.append(remote_file_id)
Expand Down
56 changes: 55 additions & 1 deletion tests/providers/google/suite/hooks/test_drive.py
Expand Up @@ -273,6 +273,27 @@ def test_get_file_id_when_one_file_exists(self, mock_get_conn):
result_value = self.gdrive_hook.get_file_id(folder_id, file_name, drive_id)
assert result_value == {"id": "ID_1", "mime_type": "text/plain"}

@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
def test_resolve_file_path_when_file_in_root_directory(self, mock_get_conn):
    """A file whose only ancestor is the root folder resolves to ``root/<name>``."""
    # Responses are consumed in call order: the file first, then its parent,
    # which reports no parents and therefore ends the walk.
    execute_mock = mock_get_conn.return_value.files.return_value.get.return_value.execute
    execute_mock.side_effect = [
        {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
        {"id": "ID_2", "name": "root"},
    ]

    resolved_path = self.gdrive_hook._resolve_file_path(file_id="ID_1")

    assert resolved_path == "root/file.csv"

@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
def test_resolve_file_path_when_file_nested_in_2_directories(self, mock_get_conn):
    """A file nested below a folder inside root resolves to the whole chain of names."""
    # One response per lookup, in call order: file, containing folder, root.
    execute_mock = mock_get_conn.return_value.files.return_value.get.return_value.execute
    execute_mock.side_effect = [
        {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
        {"id": "ID_2", "name": "folder_A", "parents": ["ID_3"]},
        {"id": "ID_3", "name": "root"},
    ]

    resolved_path = self.gdrive_hook._resolve_file_path(file_id="ID_1")

    assert resolved_path == "root/folder_A/file.csv"

@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
def test_get_file_id_when_multiple_files_exists(self, mock_get_conn):
folder_id = "abxy1z"
Expand Down Expand Up @@ -300,8 +321,9 @@ def test_get_file_id_when_no_file_exists(self, mock_get_conn):
@mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
def test_upload_file_to_root_directory(
self, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
):
mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
"id": "FILE_ID"
Expand All @@ -310,6 +332,7 @@ def test_upload_file_to_root_directory(
return_value = self.gdrive_hook.upload_file("local_path", "remote_path")

mock_ensure_folders_exists.assert_not_called()
mock_resolve_file_path.assert_not_called()
mock_get_conn.assert_has_calls(
[
mock.call()
Expand Down Expand Up @@ -353,3 +376,34 @@ def test_upload_file_to_subdirectory(
]
)
assert return_value == "FILE_ID"

@mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
def test_upload_file_into_folder_id(
    self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
):
    """Uploading with an explicit ``folder_id`` creates the file there and resolves the folder path."""
    file_id = "FILE_ID"
    folder_id = "FOLDER_ID"
    mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
        "id": file_id
    }
    mock_resolve_file_path.return_value = "Shared_Folder_A/Folder_B"  # path for FOLDER_ID

    return_value = self.gdrive_hook.upload_file("/tmp/file.csv", "/file.csv", folder_id=folder_id)

    # The remote location has no directory part, so no folders need to be created.
    mock_ensure_folders_exists.assert_not_called()
    # A non-root folder_id must be resolved to a path (used for the log message);
    # without this check the patched method above is never verified.
    mock_resolve_file_path.assert_called_once_with(folder_id)
    mock_get_conn.assert_has_calls(
        [
            mock.call()
            .files()
            .create(
                body={"name": "file.csv", "parents": [folder_id]},
                fields="id",
                media_body=mock_media_file_upload.return_value,
                supportsAllDrives=True,
            )
        ]
    )
    assert return_value == file_id
2 changes: 2 additions & 0 deletions tests/providers/google/suite/transfers/test_local_to_drive.py
Expand Up @@ -48,13 +48,15 @@ def test_execute(self, mock_hook):
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
show_full_target_path=True,
),
mock.call(
local_location="test2",
remote_location="test_folder/test2",
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
show_full_target_path=True,
),
]
mock_hook.return_value.upload_file.assert_has_calls(calls)

0 comments on commit f55b957

Please sign in to comment.