hi. im want to transfer files quickly to my other account using python. but i get a problem whenever i tried, its always return
which clearly i have on the other account. i already give editor permission.
here is my code.Note this is from AI
import argparse
import json
import logging
import os
import random
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence
from urllib.parse import parse_qs
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
SCOPES = ["https://www.googleapis.com/auth/drive"]
FOLDER_MIME = "application/vnd.google-apps.folder"
RETRYABLE_REASONS = {
"rateLimitExceeded",
"userRateLimitExceeded",
"sharingRateLimitExceeded",
"backendError",
"internalError",
}
NON_RETRYABLE_REASONS = {"pendingOwnerWriterRequired"}
RETRYABLE_STATUS = {403, 429, 500, 502, 503, 504}
def parse_args() -> argparse.Namespace:
base_dir = Path(__file__).resolve().parent
parser = argparse.ArgumentParser()
parser.add_argument("--auth-mode", choices=["oauth", "service-account"], default="oauth")
parser.add_argument("--credentials-file")
parser.add_argument("--config-file", default=str(base_dir / "request_config.json"))
parser.add_argument("--token-file", default=str(base_dir / "request_token.json"))
parser.add_argument("--oauth-redirect-uri")
parser.add_argument("--subject-email")
parser.add_argument("--folder-id")
parser.add_argument("--target-owner-email")
parser.add_argument("--page-size", type=int, default=200)
parser.add_argument("--batch-size", type=int, default=100)
parser.add_argument("--max-files", type=int)
parser.add_argument("--max-retries", type=int, default=8)
parser.add_argument("--min-backoff", type=float, default=1.0)
parser.add_argument("--max-backoff", type=float, default=32.0)
parser.add_argument("--send-notification-email", action="store_true", default=True)
parser.add_argument("--no-send-notification-email", dest="send_notification_email", action="store_false")
parser.add_argument("--log-file", default=str(base_dir / "request.log"))
parser.add_argument("--result-file", default=str(base_dir / "request_results.jsonl"))
parser.add_argument("--dry-run", action="store_true")
return parser.parse_args()
def guess_credentials_file(auth_mode: str) -> Optional[str]:
base_dir = Path(__file__).resolve().parent
if auth_mode == "service-account":
patterns = [
"service-account.json",
"*service-account*.json",
"*.service-account.json",
]
else:
patterns = [
"cred.json",
"oauth-client.json",
"credentials.json",
"client_secret*.json",
"*oauth*.json",
]
for pattern in patterns:
matches = sorted(base_dir.glob(pattern))
if matches:
return str(matches[0])
return None
def prompt_value(label: str, current_value: Optional[str], default: Optional[str] = None) -> str:
if current_value:
return current_value
if default:
entered = input(f"{label} [{default}]: ").strip()
return entered or default
while True:
entered = input(f"{label}: ").strip()
if entered:
return entered
def load_json_file(path: str) -> Dict[str, Any]:
try:
payload = json.loads(Path(path).read_text(encoding="utf-8"))
except Exception:
return {}
return payload if isinstance(payload, dict) else {}
def save_json_file(path: str, payload: Dict[str, Any]) -> None:
config_path = Path(path)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps(payload, ensure_ascii=True, indent=2), encoding="utf-8")
def normalize_args(args: argparse.Namespace) -> argparse.Namespace:
saved_config = load_json_file(args.config_file)
guessed_credentials = guess_credentials_file(args.auth_mode)
saved_credentials = saved_config.get("credentials_file")
saved_folder_id = saved_config.get("folder_id")
saved_target_owner_email = saved_config.get("target_owner_email")
saved_subject_email = saved_config.get("subject_email")
saved_redirect_uri = saved_config.get("oauth_redirect_uri")
args.credentials_file = prompt_value(
"Path credentials file",
args.credentials_file,
saved_credentials or guessed_credentials,
)
if args.auth_mode == "service-account":
args.subject_email = prompt_value(
"Email akun sumber untuk impersonation",
args.subject_email,
saved_subject_email,
)
args.folder_id = prompt_value("Folder ID sumber", args.folder_id, saved_folder_id)
args.target_owner_email = prompt_value("Email akun tujuan", args.target_owner_email, saved_target_owner_email)
if not args.oauth_redirect_uri:
args.oauth_redirect_uri = saved_redirect_uri
save_json_file(
args.config_file,
{
"auth_mode": args.auth_mode,
"credentials_file": args.credentials_file,
"oauth_redirect_uri": args.oauth_redirect_uri,
"subject_email": args.subject_email,
"folder_id": args.folder_id,
"target_owner_email": args.target_owner_email,
},
)
return args
def resolve_oauth_redirect_uri(credentials_file: str, current_value: Optional[str]) -> str:
if current_value:
return current_value
try:
payload = json.loads(Path(credentials_file).read_text(encoding="utf-8"))
except Exception:
payload = {}
for key in ("installed", "web"):
config = payload.get(key) or {}
redirect_uris = config.get("redirect_uris") or []
if redirect_uris:
return redirect_uris[0]
return "http://localhost"
def configure_oauth_transport(redirect_uri: str) -> None:
if redirect_uri.startswith("http://localhost") or redirect_uri.startswith("http://127.0.0.1"):
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
def setup_logging(log_file: str) -> None:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
handlers = [
logging.FileHandler(log_path, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=handlers,
)
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
def write_result(path: str, record: Dict[str, Any]) -> None:
result_path = Path(path)
result_path.parent.mkdir(parents=True, exist_ok=True)
with result_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(record, ensure_ascii=True) + "\n")
def load_credentials(args: argparse.Namespace):
if args.auth_mode == "service-account":
creds = service_account.Credentials.from_service_account_file(
args.credentials_file,
scopes=SCOPES,
)
if args.subject_email:
creds = creds.with_subject(args.subject_email)
return creds
token_path = Path(args.token_file)
creds = None
if token_path.exists():
creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
token_path.write_text(creds.to_json(), encoding="utf-8")
return creds
if creds and creds.valid:
return creds
args.oauth_redirect_uri = resolve_oauth_redirect_uri(args.credentials_file, args.oauth_redirect_uri)
configure_oauth_transport(args.oauth_redirect_uri)
flow = InstalledAppFlow.from_client_secrets_file(args.credentials_file, SCOPES)
flow.redirect_uri = args.oauth_redirect_uri
auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent")
print("Buka URL berikut di browser untuk login:")
print(auth_url)
print("")
redirected = input("Setelah redirect ke localhost, paste URL lengkapnya di sini lalu Enter: ").strip()
if redirected.startswith("http://") or redirected.startswith("https://"):
flow.fetch_token(authorization_response=redirected)
else:
parsed = parse_qs(redirected)
code = (parsed.get("code") or [redirected])[0]
flow.fetch_token(code=code)
creds = flow.credentials
token_path.write_text(creds.to_json(), encoding="utf-8")
return creds
def build_service(args: argparse.Namespace):
creds = load_credentials(args)
return build("drive", "v3", credentials=creds, cache_discovery=False)
def extract_error(error: HttpError) -> Dict[str, Any]:
status = getattr(getattr(error, "resp", None), "status", None)
reason = None
message = str(error)
payload = None
try:
payload = json.loads(error.content.decode("utf-8"))
except Exception:
payload = None
if isinstance(payload, dict):
error_block = payload.get("error") or {}
message = error_block.get("message", message)
details = error_block.get("errors") or []
if details:
reason = details[0].get("reason")
return {"status": status, "reason": reason, "message": message}
def should_retry(error: Exception) -> bool:
if not isinstance(error, HttpError):
return isinstance(error, (TimeoutError, ConnectionError))
details = extract_error(error)
if details["reason"] in NON_RETRYABLE_REASONS:
return False
return details["status"] in RETRYABLE_STATUS or details["reason"] in RETRYABLE_REASONS
def execute_with_retry(
request_factory: Callable[[], Any],
args: argparse.Namespace,
action: str,
file_id: Optional[str] = None,
) -> Any:
attempt = 0
while True:
try:
return request_factory().execute()
except Exception as error:
attempt += 1
if attempt > args.max_retries or not should_retry(error):
raise
sleep_for = min(args.max_backoff, args.min_backoff * (2 ** (attempt - 1))) + random.random()
logging.warning(
"retrying action=%s file_id=%s attempt=%s wait=%.2fs error=%s",
action,
file_id,
attempt,
sleep_for,
error,
)
time.sleep(sleep_for)
def get_me(service, args: argparse.Namespace) -> Dict[str, Any]:
return execute_with_retry(
lambda: service.about().get(fields="user(emailAddress,displayName)"),
args,
"about.get",
)
def get_file(service, args: argparse.Namespace, file_id: str) -> Dict[str, Any]:
return execute_with_retry(
lambda: service.files().get(
fileId=file_id,
fields="id,name,mimeType,owners(emailAddress),trashed,driveId",
supportsAllDrives=True,
),
args,
"files.get",
file_id=file_id,
)
def list_children(
service,
args: argparse.Namespace,
folder_id: str,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
return execute_with_retry(
lambda: service.files().list(
q=f"'{folder_id}' in parents and trashed = false",
fields="nextPageToken,files(id,name,mimeType,owners(emailAddress),trashed,driveId)",
pageSize=args.page_size,
pageToken=page_token,
includeItemsFromAllDrives=True,
supportsAllDrives=True,
),
args,
"files.list",
file_id=folder_id,
)
def iter_tree(
service,
args: argparse.Namespace,
root_folder_id: str,
) -> Iterable[Dict[str, Any]]:
root = get_file(service, args, root_folder_id)
seen = {root_folder_id}
queue = [root]
yielded = 0
while queue:
current = queue.pop(0)
yield current
yielded += 1
if args.max_files and yielded >= args.max_files:
return
if current.get("mimeType") != FOLDER_MIME:
continue
page_token = None
while True:
response = list_children(service, args, current["id"], page_token=page_token)
for child in response.get("files", []):
child_id = child["id"]
if child_id in seen:
continue
seen.add(child_id)
queue.append(child)
page_token = response.get("nextPageToken")
if not page_token:
break
def list_permissions(service, args: argparse.Namespace, file_id: str) -> List[Dict[str, Any]]:
response = execute_with_retry(
lambda: service.permissions().list(
fileId=file_id,
fields="permissions(id,emailAddress,role,type,pendingOwner,deleted,permissionDetails(inherited,inheritedFrom,role,permissionType))",
supportsAllDrives=True,
),
args,
"permissions.list",
file_id=file_id,
)
return response.get("permissions", [])
def is_owned_by(item: Dict[str, Any], owner_email: str) -> bool:
owners = item.get("owners") or []
owner_lower = owner_email.lower()
return any((owner.get("emailAddress") or "").lower() == owner_lower for owner in owners)
def find_target_permission(permissions: Sequence[Dict[str, Any]], target_email: str) -> Optional[Dict[str, Any]]:
target_lower = target_email.lower()
for permission in permissions:
if permission.get("deleted"):
continue
email = (permission.get("emailAddress") or "").lower()
if permission.get("type") == "user" and email == target_lower:
return permission
return None
def refresh_target_permission(
service,
args: argparse.Namespace,
file_id: str,
target_email: str,
) -> Optional[Dict[str, Any]]:
permissions = list_permissions(service, args, file_id)
return find_target_permission(permissions, target_email)
def ensure_direct_writer_permission(
service,
args: argparse.Namespace,
item: Dict[str, Any],
) -> Dict[str, Any]:
target_permission = refresh_target_permission(service, args, item["id"], args.target_owner_email)
if target_permission and target_permission.get("role") == "owner":
return target_permission
if target_permission and target_permission.get("role") == "writer":
return target_permission
if target_permission:
updated = execute_with_retry(
lambda: service.permissions().update(
fileId=item["id"],
permissionId=target_permission["id"],
body={"role": "writer"},
supportsAllDrives=True,
fields="id,emailAddress,role,pendingOwner",
),
args,
"permissions.update.writer",
file_id=item["id"],
)
time.sleep(2)
return updated
created = execute_with_retry(
lambda: service.permissions().create(
fileId=item["id"],
body={
"type": "user",
"role": "writer",
"emailAddress": args.target_owner_email,
},
supportsAllDrives=True,
sendNotificationEmail=args.send_notification_email,
fields="id,emailAddress,role,pendingOwner",
),
args,
"permissions.create.writer",
file_id=item["id"],
)
time.sleep(2)
return created
def set_pending_owner(
service,
args: argparse.Namespace,
item: Dict[str, Any],
) -> Dict[str, Any]:
ensure_direct_writer_permission(service, args, item)
time.sleep(5)
target_permission = refresh_target_permission(service, args, item["id"], args.target_owner_email)
if not target_permission:
raise RuntimeError("Target account permission was not found after writer propagation")
return execute_with_retry(
lambda: service.permissions().update(
fileId=item["id"],
permissionId=target_permission["id"],
body={"role": "writer", "pendingOwner": True},
supportsAllDrives=True,
fields="id,emailAddress,role,pendingOwner",
),
args,
"permissions.update.pendingOwner",
file_id=item["id"],
)
def skip_record(item: Dict[str, Any], status: str, message: str) -> Dict[str, Any]:
return {
"timestamp": utc_now(),
"fileId": item.get("id"),
"name": item.get("name"),
"mimeType": item.get("mimeType"),
"status": status,
"message": message,
}
def request_transfer(
service,
args: argparse.Namespace,
item: Dict[str, Any],
source_email: str,
) -> Dict[str, Any]:
record = {
"timestamp": utc_now(),
"fileId": item["id"],
"name": item.get("name"),
"mimeType": item.get("mimeType"),
"targetOwnerEmail": args.target_owner_email,
}
if item.get("trashed"):
record.update({"status": "skipped_trashed", "message": "File is trashed"})
return record
if item.get("driveId"):
record.update(
{
"status": "skipped_shared_drive",
"message": "Ownership transfer is not supported for shared drive items",
}
)
return record
if not is_owned_by(item, source_email):
record.update(
{
"status": "skipped_not_owned",
"message": f"Authenticated account does not own this file as {source_email}",
}
)
return record
if args.dry_run:
record.update({"status": "dry_run", "message": "No permission change was sent"})
return record
target_permission = refresh_target_permission(service, args, item["id"], args.target_owner_email)
if target_permission and target_permission.get("role") == "owner":
record.update({"status": "already_owner", "message": "Target account is already the owner"})
return record
updated = set_pending_owner(service, args, item)
record.update(
{
"status": "requested",
"permissionId": updated.get("id"),
"pendingOwner": updated.get("pendingOwner"),
"role": updated.get("role"),
"message": "Prepared direct writer permission and set pending owner",
}
)
return record
def chunks(items: Sequence[Dict[str, Any]], size: int) -> Iterable[Sequence[Dict[str, Any]]]:
for index in range(0, len(items), size):
yield items[index : index + size]
def main() -> int:
args = normalize_args(parse_args())
setup_logging(args.log_file)
service = build_service(args)
me = get_me(service, args)
source_email = ((me.get("user") or {}).get("emailAddress") or "").strip()
if not source_email:
logging.error("Unable to determine authenticated email address")
return 1
logging.info("authenticated email=%s target=%s", source_email, args.target_owner_email)
items = list(iter_tree(service, args, args.folder_id))
logging.info("discovered items=%s folder_id=%s", len(items), args.folder_id)
counts = {
"requested": 0,
"already_owner": 0,
"dry_run": 0,
"skipped_not_owned": 0,
"skipped_shared_drive": 0,
"skipped_trashed": 0,
"failed": 0,
}
for batch_number, batch in enumerate(chunks(items, args.batch_size), start=1):
logging.info("processing batch=%s size=%s", batch_number, len(batch))
for item in batch:
try:
result = request_transfer(service, args, item, source_email)
except Exception as error:
result = skip_record(item, "failed", str(error))
counts[result["status"]] = counts.get(result["status"], 0) + 1
write_result(args.result_file, result)
logging.info(
"file_id=%s status=%s name=%s message=%s",
result.get("fileId"),
result.get("status"),
result.get("name"),
result.get("message"),
)
logging.info("summary=%s", json.dumps(counts, ensure_ascii=True, sort_keys=True))
return 0 if counts.get("failed", 0) == 0 else 2
if __name__ == "__main__":
raise SystemExit(main())
my permission is already editor, idk whats wrong this. i just want to transfer quickly.
i already search this problem in google, but no one has this issue.
hi. im want to transfer files quickly to my other account using python. but i get a problem whenever i tried, its always return
2026-04-17 17:22:45,421 INFO file_id=...<file-id>....- status=failed name=Graph.JPG message=<HttpError 403 when requesting https://www.googleapis.com/drive/v3/files/....<file-id>..../permissions/04862162013125427771?supportsAllDrives=true&fields=id%2CemailAddress%2Crole%2CpendingOwner&alt=json returned "The target user cannot be a pending owner because the target user does not have a writer role for the file.". Details: "[{'message': 'The target user cannot be a pending owner because the target user does not have a writer role for the file.', 'domain': 'global', 'reason': 'pendingOwnerWriterRequired'}]">which clearly i have on the other account. i already give editor permission.
here is my code.Note this is from AI
my permission is already editor, idk whats wrong this. i just want to transfer quickly.
i already search this problem in google, but no one has this issue.