Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dropbox ] Add logs #2555

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions connectors/sources/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ async def _set_access_token(self):
"client_secret": self.app_secret,
}

self._logger.debug(
f"Generating an access token with url: {url}, headers: {headers}"
)
async with aiohttp.ClientSession() as session:
async with session.post(url=url, headers=headers, data=data) as response:
response_data = await response.json()
Expand Down Expand Up @@ -316,6 +319,9 @@ async def api_call(self, base_url, url_name, data=None, file_type=None, **kwargs
headers = self._get_request_headers(
file_type=file_type, url_name=url_name, kwargs=kwargs
)
self._logger.debug(
f"Making a POST call for url: {url} with a payload: {data}"
)
async with self._get_session.post(
url=url, headers=headers, data=data
) as response:
Expand Down Expand Up @@ -801,7 +807,7 @@ async def get_access_control(self):
self._logger.warning("DLS is not enabled. Skipping")
return

self._logger.info("Fetching members")
self._logger.info("Fetching users for Access Control sync")
async for users in self.dropbox_client.list_members():
for user in users.get("members", []):
yield await self._user_access_control_doc(user=user)
Expand Down Expand Up @@ -880,10 +886,11 @@ async def get_content(
download_func = self.download_func(is_shared, attachment, filename, folder_id)
if not download_func:
self._logger.warning(
f"Skipping the file: {filename} since it is not in the downloadable format."
f"Skipping file '{filename}' since it is not downloadable."
)
return

self._logger.debug(f"Downloading content for file: {filename}")
document = {
"_id": attachment["id"],
"_timestamp": attachment["server_modified"],
Expand Down Expand Up @@ -944,13 +951,15 @@ def _adapt_dropbox_shared_file_doc_to_es_doc(self, response):
}

async def _fetch_files_folders(self, path, folder_id=None):
self._logger.info(f"Fetching files and folders from path: '{path}'")
async for response in self.dropbox_client.get_files_folders(
path=path, folder_id=folder_id
):
for entry in response.get("entries"):
yield self._adapt_dropbox_doc_to_es_doc(response=entry), entry

async def _fetch_shared_files(self):
self._logger.info("Fetching shared files")
async for response in self.dropbox_client.get_shared_files():
for entry in response.get("entries"):
async for metadata in self.dropbox_client.get_received_file_metadata(
Expand All @@ -962,6 +971,7 @@ async def _fetch_shared_files(self):
), json_metadata

async def advanced_sync(self, rule):
self._logger.debug(f"Fetching files/folders for sync rule: {rule}")
async for response in self.dropbox_client.search_files_folders(rule=rule):
for entry in response.get("matches"):
data = entry.get("metadata", {}).get("metadata")
Expand Down Expand Up @@ -1010,6 +1020,7 @@ async def get_permission(self, permission, account_id):
return permissions

async def get_folder_permission(self, shared_folder_id, account_id):
self._logger.debug(f"Fetching permissions for folder: {shared_folder_id}")
if not shared_folder_id:
return [account_id]

Expand All @@ -1021,6 +1032,7 @@ async def get_folder_permission(self, shared_folder_id, account_id):
)

async def get_file_permission_without_batching(self, file_id, account_id):
self._logger.debug(f"Fetching permissions for file: {file_id}")
async for permission in self.dropbox_client.list_file_permission_without_batching(
file_id=file_id
):
Expand All @@ -1029,6 +1041,7 @@ async def get_file_permission_without_batching(self, file_id, account_id):
)

async def get_account_details(self):
self._logger.debug("Fetching account details")
response = await anext(
self.dropbox_client.api_call(
base_url=BASE_URLS["FILES_FOLDERS_BASE_URL"],
Expand All @@ -1042,13 +1055,15 @@ async def get_account_details(self):
return account_id, member_id

async def get_permission_list(self, item_type, item, account_id):
self._logger.debug(f"Fetching permissions for {item_type}")
if item_type == FOLDER:
shared_folder_id = item.get("shared_folder_id") or item.get(
"parent_shared_folder_id"
)
return await self.get_folder_permission(
shared_folder_id=shared_folder_id, account_id=account_id
)

return await self.get_file_permission_without_batching(
file_id=item.get("id"), account_id=account_id
)
Expand Down Expand Up @@ -1144,9 +1159,11 @@ async def add_document_to_list(self, func, account_id, folder_id, is_shared=Fals
batched_document = {}

async def fetch_file_folders_with_dls(self):
self._logger.info("Fetching permissions for files and folders")
account_id, member_id = await self.get_account_details()
self.dropbox_client.member_id = member_id
async for folder_id in self.get_team_folder_id():
self._logger.info(f"Iterating through folder with id '{folder_id}'")
async for mapped_document in self.add_document_to_list(
func=self._fetch_files_folders,
account_id=account_id,
Expand Down Expand Up @@ -1177,9 +1194,10 @@ async def get_docs(self, filtering=None):

elif filtering and filtering.has_advanced_rules():
advanced_rules = filtering.get_advanced_rules()
self._logger.debug(
f"Fetching documents using configured advanced sync rules: {advanced_rules}"
)
for rule in advanced_rules:
self._logger.debug(f"Fetching files using advanced sync rule: {rule}")

async for document, attachment in self.advanced_sync(rule=rule):
yield self.document_tuple(document=document, attachment=attachment)
else:
Expand Down