#!/usr/bin/python3
import multiprocessing
from multiprocessing import Process
import pathlib
import argparse
import threading
import time
import logging
import queue
import sys
import os
from google.cloud import storage
from google.oauth2 import service_account
import google.cloud.exceptions
import google.auth.exceptions

METADATA_PREFIX = 'UCA-META-'
GET_RETRY_COUNT = 3
GET_RETRY_ERRORS = ('503', '504')  # HTTP status codes that are worth retrying

#######################################################################
# download_file
#######################################################################
def download_file(mygoogle_client, mybucket, myobjectname):
    """Perform the actual download and bump the object's retrieve counter."""
    # Build destination file name
    destfile = os.path.join(args.destdir, myobjectname)

    # Loop until success or max retries
    count_attempts = 0
    while True:
        try:
            count_attempts += 1

            # Fetch the blob (object handle plus metadata)
            blob = mybucket.get_blob(myobjectname)

            # If it doesn't exist, get out; the caller marks the queue item done
            if blob is None:
                mylogger.error('object %s not found', myobjectname)
                return

            # Download file
            blob.download_to_filename(destfile)

            # Update the retrieve counter in the object metadata
            object_metadata = {}
            retrieve_counter = (blob.metadata or {}).get(METADATA_PREFIX + 'RETRIEVECOUNTER')
            if retrieve_counter is None:
                retrieve_counter = 1
            else:
                retrieve_counter = int(retrieve_counter) + 1
            object_metadata[METADATA_PREFIX + 'RETRIEVECOUNTER'] = str(retrieve_counter)
            mylogger.debug('Set retrieve counter to %s',
                           object_metadata[METADATA_PREFIX + 'RETRIEVECOUNTER'])
            blob.metadata = object_metadata
            blob.patch(client=mygoogle_client)

            mylogger.debug('Cloud - GET %s completed', myobjectname)
            return

        except Exception as e:
            error_code = 0
            mylogger.error('Exception %s', vars(e))
            # If this is an HTTP error, pull out its status code
            if hasattr(e, 'response'):
                mylogger.error('Response %s', vars(e.response))
                if hasattr(e.response, 'status_code'):
                    error_code = e.response.status_code
            mylogger.error('Download/Patch of %s failed (%s)', myobjectname, str(e))
            if str(error_code) in GET_RETRY_ERRORS and count_attempts < GET_RETRY_COUNT:
                mylogger.error('Will retry! (%d)', count_attempts)
                time.sleep(1)
            else:
                return

#######################################################################
# Worker - pulls object names from the queue and retrieves them from cloud
#######################################################################
def retrieveWorker():
    """Thread/process worker function."""
    # Initialize the cloud connection and get the bucket once
    try:
        mylogger.debug('Creating GCS credentials')
        credentials = service_account.Credentials.from_service_account_file(
            args.gcs_key_file,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],)
        mylogger.debug('Creating GCS client')
        google_client = storage.Client(
            credentials=credentials,
            project=credentials.project_id,)
        mylogger.debug('GCS connected')
        bucket = google_client.get_bucket(args.bucketname)
    except google.cloud.exceptions.NotFound:
        mylogger.error('Bucket %s not found', args.bucketname)
        raise
    except google.auth.exceptions.RefreshError:
        mylogger.error('Unable to refresh access token. Verify keyfile "%s" is valid.',
                       args.gcs_key_file)
        raise
    except Exception as e:
        mylogger.error('Error while authenticating Google client. Verify json file. (%s)', str(e))
        raise

    # Loop until the work queue is drained
    while True:
        # Look for work
        try:
            item = qPrimary.get(True, 5)
        except queue.Empty:
            # Queue has been empty for 5 seconds - assume the work is done
            mylogger.debug('Cloud empty queue')
            mylogger.debug('Cloud - worker ending')
            return

        mylogger.debug('Processing %s', item)
        download_file(google_client, bucket, item)

        # Mark work as done (multiprocessing.Queue has no task_done())
        if USE_THREADS is True:
            qPrimary.task_done()

#######################################################################
# Main - cloudRetrieve.py - Gets requested files from cloud
#######################################################################
if __name__ == "__main__":
    # Set up basic logging for the app
    logging.basicConfig(
        stream=sys.stdout,
        format='%(asctime)s.%(msecs)03d %(threadName)-10s %(funcName)10s %(levelname)6s: %(message)s',
        level=logging.ERROR)
    mylogger = logging.getLogger(__name__)

    # Read in command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--inputfile", required=True, help="File containing list of objects")
    parser.add_argument("--destdir", required=True, help="Location to write files to")
    parser.add_argument("--workers", required=True, type=int, help="Number of retrieve workers")
    parser.add_argument("--gcs_key_file", required=True, help="Service account key file")
    parser.add_argument("--bucketname", required=True, help="Name of bucket")
    parser.add_argument("--debug", action='store_true', help="Enable debug logging")
    args = parser.parse_args()

    # Turn on debug if asked for
    if args.debug:
        mylogger.setLevel(logging.DEBUG)

    # Check destination dir
    if len(args.destdir) == 0 or not os.access(args.destdir, os.W_OK):
        mylogger.error('Bad destdir %s', args.destdir)
        sys.exit(1)

    # Check the key file
    keyfile = pathlib.Path(args.gcs_key_file)
    if not keyfile.exists():
        mylogger.error('Key file %s not found', args.gcs_key_file)
        sys.exit(1)

    # Check the input file
    listfile = pathlib.Path(args.inputfile)
    if not listfile.exists():
        mylogger.error('Input file %s not found', args.inputfile)
        sys.exit(1)

    # Toggle between thread workers and process workers
    USE_THREADS = False
    #USE_THREADS = True

    # Parameter dump
    mylogger.debug('PARAMETERS\n==================================')
    mylogger.debug('Input file: %s', args.inputfile)
    mylogger.debug('Dest dir:   %s', args.destdir)
    mylogger.debug('Workers:    %s', args.workers)
    mylogger.debug('Key file:   %s', args.gcs_key_file)
    mylogger.debug('Bucket:     %s', args.bucketname)
    mylogger.debug('Threads:    %s', USE_THREADS)

    # Build the work queue and populate it
    print('Loading work queue...')
    if USE_THREADS is True:
        qPrimary = queue.Queue(maxsize=500000)
    else:
        qPrimary = multiprocessing.Queue(maxsize=500000)

    cnt = 0
    with open(args.inputfile, 'r') as fp:
        for cnt, line in enumerate(fp, start=1):
            qPrimary.put(line.rstrip(), block=False)
    print('Added {} items to work queue'.format(cnt))

    print('Starting file retrieval')
    totaltime_start = time.time()

    primary_workers = []
    if USE_THREADS is True:
        # Start the requested number of worker threads
        for _ in range(args.workers):
            t = threading.Thread(target=retrieveWorker)
            t.daemon = True
            t.start()
            primary_workers.append(t)

        # Loop waiting on thread completion
        while True:
            sleeptime = 5
            # Reap finished workers; iterate over a copy so removal is safe
            for t in list(primary_workers):
                if not t.is_alive():
                    sleeptime = 1
                    mylogger.debug('Primary worker thread %s is complete...', t.name)
                    primary_workers.remove(t)
            if len(primary_workers) == 0:
                mylogger.debug('All workers are complete...')
                totaltime_end = time.time()
                break
            time.sleep(sleeptime)
    else:
        # Start the requested number of worker processes; they take no
        # arguments and rely on fork-style inheritance of the module globals
        # (args, qPrimary, mylogger, USE_THREADS)
        for _ in range(args.workers):
            proc = Process(target=retrieveWorker)
            primary_workers.append(proc)
            proc.start()

        # Wait for the processes to complete
        for proc in primary_workers:
            proc.join()
        totaltime_end = time.time()

    # Shutting down from here on out
    mylogger.debug('Graceful end of the program.')
    print('Retrieval took {}'.format(totaltime_end - totaltime_start))
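
# A minimal usage sketch (the file names, bucket, and worker count below are
# placeholders for illustration, not values from the original source):
#
#   ./cloudRetrieve.py --inputfile objects.txt --destdir /tmp/restore \
#       --workers 8 --gcs_key_file key.json --bucketname my-bucket --debug
#
# objects.txt is expected to contain one object name per line.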