cgw-util-cos-sync

Synchronizes two Cloud Object Storage (COS) buckets using the [CLAIMED Generic Grid Computing Pattern](https://medium.com/@romeokienzler/the-generic-grid-computing-pattern-transforms-any-sequential-workflow-step-into-a-transient-grid-c7f3ca7459c8), which allows:

- processing the data in partitions, selected by a file-matching pattern, so that the work can be parallelized
- running on transient storage only, because both sides of this operator (source and target) are COS

In [None]:
!pip install aiobotocore botocore s3fs

In [None]:
import logging
import os
import re
import s3fs
import sys
import glob

In [None]:
# --- source side -----------------------------------------------------------
# COS path (bucket included) the input files are read from
cgw_source_path = os.getenv('cgw_source_path')

# access key id used for the source COS
cgw_source_access_key_id = os.getenv('cgw_source_access_key_id')

# secret access key used for the source COS
cgw_source_secret_access_key = os.getenv('cgw_source_secret_access_key')

# endpoint URL of the source COS
cgw_source_endpoint = os.getenv('cgw_source_endpoint')

# --- coordinator side ------------------------------------------------------
# access key id used for the coordinator COS
cgw_coordinator_access_key_id = os.getenv('cgw_coordinator_access_key_id')

# secret access key used for the coordinator COS
cgw_coordinator_secret_access_key = os.getenv('cgw_coordinator_secret_access_key')

# endpoint URL of the coordinator COS
cgw_coordinator_endpoint = os.getenv('cgw_coordinator_endpoint')

# --- target side -----------------------------------------------------------
# access key id used for the target COS
cgw_target_access_key_id = os.getenv('cgw_target_access_key_id')

# secret access key used for the target COS
cgw_target_secret_access_key = os.getenv('cgw_target_secret_access_key')

# endpoint URL of the target COS
cgw_target_endpoint = os.getenv('cgw_target_endpoint')

# COS path (bucket included) the results are written to
cgw_target_path = os.getenv('cgw_target_path')

# --- behaviour -------------------------------------------------------------
# suffix appended to a file name to form its lock file name
cgw_lock_file_suffix = os.getenv('cgw_lock_file_suffix', '.lock')

# suffix appended to a file name to form its "processed" marker name
cgw_processed_file_suffix = os.getenv('cgw_processed_file_suffix', '.processed')

# log level handed to the logging module
cgw_log_level = os.getenv('cgw_log_level', 'INFO')

# age in seconds after which the lock file of a struggling job is removed
# (default: one hour)
cgw_lock_timeout = int(os.getenv('cgw_lock_timeout', 60 * 60))

# expression used to group files which need to be processed together
cgw_group_by = os.getenv('cgw_group_by', None)

In [None]:
# Send all log records to stdout at the configured level.
root = logging.getLogger()
root.setLevel(cgw_log_level)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(cgw_log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

# NOTE(review): the root logger already has a handler at this point, so this
# call is expected to be a no-op (see the logging.basicConfig docs); kept as is.
logging.basicConfig(level=logging.CRITICAL)

# Command-line arguments of the form name=value override the variables that
# were read from the environment. The value is assigned directly as a string
# instead of being run through exec(), so a value containing quotes, '=' or
# Python code can neither break the assignment nor be executed.
parameters = []
for argument in sys.argv:
    name, separator, value = argument.partition('=')
    if not separator or re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name) is None:
        continue
    parameters.append(f'{name}="{value}"')
    # Credentials must not end up in the log output.
    shown_value = '***' if 'secret' in name else value
    logging.info('Parameter: ' + f'{name}="{shown_value}"')
    globals()[name] = value

# parameter postprocessing
# A command-line override arrives as a string, but the lock timeout is used
# as a number of seconds later on.
cgw_lock_timeout = int(cgw_lock_timeout)

if cgw_source_path is None:
    raise ValueError('cgw_source_path must be set via environment variable or command line')
# Drop a single leading slash from the source path.
cgw_source_path = cgw_source_path[1:] if cgw_source_path.startswith('/') else cgw_source_path

In [None]:
# Local working directories: downloaded source files go to input_path, and
# the files found in target_path are uploaded.
input_path = 'input'
target_path = 'target'

# exist_ok=True replaces the separate existence check, so there is no window
# between checking and creating, and a path that exists but is not a
# directory is reported right here instead of failing later.
os.makedirs(input_path, exist_ok=True)
os.makedirs(target_path, exist_ok=True)

In [None]:
def process():
    """Return the names of the entries currently in the local target directory.

    The returned names are relative to ``target_path``.
    """
    # NOTE(review): nothing visible in this notebook writes into target_path
    # before this is called, so the listing may be empty - confirm.
    produced_files = os.listdir(target_path)
    return produced_files

In [None]:
import os
import s3fs

# One authenticated filesystem handle per role; each role has its own
# credentials and endpoint.

# files are read from here
s3source = s3fs.S3FileSystem(
    key=cgw_source_access_key_id,
    secret=cgw_source_secret_access_key,
    client_kwargs=dict(endpoint_url=cgw_source_endpoint),
    anon=False,
)

# results are written here
s3target = s3fs.S3FileSystem(
    key=cgw_target_access_key_id,
    secret=cgw_target_secret_access_key,
    client_kwargs=dict(endpoint_url=cgw_target_endpoint),
    anon=False,
)

# used for the lock files and processed markers
s3coordinator = s3fs.S3FileSystem(
    key=cgw_coordinator_access_key_id,
    secret=cgw_coordinator_secret_access_key,
    client_kwargs=dict(endpoint_url=cgw_coordinator_endpoint),
    anon=False,
)

In [None]:
# Collect the distinct group identifiers of all files matching the source
# path; every group is later processed as one batch.
groups = set()
all_files = s3source.glob(cgw_source_path)
for path_string in all_files:
    if cgw_group_by:
        # cgw_group_by is appended verbatim to `path_string` (for example a
        # slice or a method call) and the result becomes the group key.
        # NOTE(review): exec() runs operator-supplied configuration as code;
        # only use this with trusted parameters.
        exec('part = path_string' + cgw_group_by)
    else:
        # Without a grouping expression (the default is None, which cannot be
        # concatenated to a string) every file forms its own group.
        part = path_string
    logging.info(f'Identified group: {part}')
    groups.add(part)

In [None]:
from datetime import datetime, timedelta
import os
import random
import time

# The helpers below take no arguments: they read the module-level names
# `lock_file` and `processed_file`, which the batch loop sets for the batch
# currently being handled.

def is_locked():
    """Return True if the lock file of the current batch exists."""
    exists = s3coordinator.exists(lock_file)
    logging.info(f'Lock file {lock_file} {exists}')
    return exists


def is_processed():
    """Return True if the processed marker of the current batch exists."""
    exists = s3coordinator.exists(processed_file)
    logging.info(f'Processed file {processed_file} {exists}')
    return exists


def unlock():
    """Remove the lock file of the current batch."""
    s3coordinator.rm(lock_file)


def lock():
    """Create the lock file of the current batch."""
    logging.info(f'Locking {lock_file}')
    s3coordinator.touch(lock_file)


def is_locked_expired():
    """Return True if a lock file exists and is older than cgw_lock_timeout seconds."""
    if not is_locked():
        return False
    last_modified = s3coordinator.info(lock_file)['LastModified']
    logging.info(f'Lock file {lock_file} last modified {last_modified}')
    # Compare in the timezone of the lock file's timestamp.
    time_threshold = datetime.now(last_modified.tzinfo) - timedelta(seconds=cgw_lock_timeout)
    logging.info(f'Lock file {lock_file} time threshold {time_threshold}')
    expired = last_modified < time_threshold
    logging.info(f'Lock file {lock_file} expired {expired}')
    return expired


# Wait a random number of seconds before starting (presumably so that workers
# launched at the same moment do not all inspect the lock files at once).
delay = random.randint(1, 60)
logging.info(f'Staggering start, waiting for {delay} seconds')
time.sleep(delay)

for batch in groups:
    logging.info(f'Processing batch: {batch}')

    # All files whose path contains the group key belong to this batch; the
    # first one provides the names of the lock file and processed marker.
    batch_fileset = [candidate for candidate in all_files if batch in candidate]
    file = batch_fileset[0]

    logging.info(f'Processing {file}: ')

    lock_file = file + cgw_lock_file_suffix
    processed_file = file + cgw_processed_file_suffix

    # remove strugglers
    if is_locked_expired():
        unlock()

    # Skip batches another worker is handling or that are already done.
    if is_locked():
        continue

    if is_processed():
        continue

    lock()
    try:
        # downloading input files to local storage
        for batch_file in batch_fileset:
            logging.info(f'Downloading {batch_file}')
            s3source.get(batch_file, input_path + '/' + batch_file.split('/')[-1])

        output_files: list = process()

        # removing local input files
        for batch_file in batch_fileset:
            file_to_remove = batch_file.split('/')[-1]
            logging.info(f'Removing {file_to_remove}')
            os.remove(os.path.join(input_path, file_to_remove))

        # uploading results to cos
        for output_file in output_files:
            logging.info(f'Uploading {output_file}')
            s3target.put(os.path.join(target_path, output_file), cgw_target_path + '/' + output_file)

        # removing local target files
        for output_file in output_files:
            output_file = output_file.split('/')[-1]
            logging.info(f'Removing {output_file}')
            os.remove(os.path.join(target_path, output_file))

        # marking input files as processed
        # The marker is written through the coordinator because is_processed()
        # looks it up there; writing it through the source filesystem would
        # leave it invisible whenever the two are different stores.
        logging.info(f'Marking {processed_file} as processed')
        s3coordinator.touch(processed_file)
    finally:
        # Release the lock even if a step above failed, so the batch is not
        # blocked until the lock timeout expires.
        unlock()