In [None]:
from pathlib import Path

from dustgoggles.structures import MaybePool
from hostess.aws.s3 import Bucket
from hostess.monitors import Stopwatch
from more_itertools import chunked

from mcat_conversion.conversion import convert_mcats

In [None]:
# --- S3 ---
# bucket that holds the source .fits.gz MCATs; converted output goes back here
BUCKET_NAME = 'uraniborg-sieve-7738937'
BUCKET = Bucket(BUCKET_NAME)
# key prefix under s3://BUCKET_NAME for the converted parquet files
OUTPRE = "parquet"

# --- local filesystem ---
# scratch directory for intermediate files
TEMPPATH = Path("mcat_temp/")
# conversion log
LOGFILE = Path("mcat_conversion.log")

# --- parallelism ---
# number of files in each chunk handed to a single child process
CHUNKSZ = 5
# worker-pool size. at least on an m6i.xlarge the work is CPU-bound, so use
# at most (number of cores - 1); thrashing is pretty bad for performance here.
THREADS = 3

In [None]:
# Build the work list from the bucket manifest: every .fits.gz key whose stem
# has no .parquet object with the same stem, grouped into chunks of CHUNKSZ.
manifest = BUCKET.df()
# the fits files
mcat_fits = manifest.loc[manifest['Key'].str.endswith('.fits.gz')]
# existing parquet files.
# NOTE(review): this matches .parquet keys anywhere in the bucket, not only
# under OUTPRE -- restrict to the output prefix if other parquet files exist.
mcat_pq = manifest.loc[manifest['Key'].str.endswith('.parquet')]
# stems of files we have already converted ("some/prefix/x.parquet" -> "x")
converted = mcat_pq['Key'].map(lambda p: Path(p).stem)
# stems of all fits files ("some/prefix/x.fits.gz" -> "x"). Every key here is
# known to end in '.fits.gz', so slice exactly that suffix off the basename.
# (str.replace('.fits', '') would also remove '.fits' from the middle of a
# name, and on pandas < 2.0 treats the pattern as a regex where '.' matches
# any character.)
avail = mcat_fits['Key'].map(lambda p: Path(p).name[:-len('.fits.gz')])
# which we shall convert -- change this if you wish to overwrite, or just
# delete the old ones
conversion_targets = mcat_fits.loc[~avail.isin(converted), 'Key']
# clusters of files to pass to child processes
chunks = tuple(chunked(conversion_targets, CHUNKSZ))

In [None]:
# One argument record per chunk of FITS keys. Each record carries an 'args'
# tuple for convert_mcats -- presumably MaybePool.map unpacks it positionally
# into (bucket name, keys, scratch dir, output prefix, log file); confirm
# against dustgoggles.
argrecs = list(
    {'args': (BUCKET_NAME, file_chunk, TEMPPATH, OUTPRE, LOGFILE)}
    for file_chunk in chunks
)
# worker pool with THREADS workers
pool = MaybePool(THREADS)

In [None]:
# ACTUALLY RUNS THE PROCESSES.

# the stopwatch is inessential obviously. it's just for interactive monitoring;
# the monitor cell reads watch.total to report elapsed time and an ETA.
watch = Stopwatch()
watch.start()
# hand every argument record to the pool; convert_mcats is the worker function.
# NOTE(review): the monitor cell polls pool.results_ready(), which suggests map
# dispatches work asynchronously rather than blocking until done -- confirm
# against dustgoggles.MaybePool.
pool.map(convert_mcats, argrecs)
# presumably, like multiprocessing.Pool.close, this stops accepting new work
# without cancelling running tasks -- TODO confirm for MaybePool.
pool.close()

In [None]:
# just an interactive process monitor: progress, throughput, and a naive ETA.

def _fmt_hours(h):
    """Format a duration given in hours as 'Xh Y.Ym'.

    Rounds to a tenth of a minute *before* splitting into hours and minutes,
    so e.g. 0.9999 h renders as '1h 0.0m' rather than '0h 60.0m'.
    """
    whole_h, minutes = divmod(round(h * 60, 1), 60)
    return f"{int(whole_h)}h {round(minutes, 1)}m"


watch.click()
# read the elapsed time once so every figure below uses the same value
elapsed_s = watch.total
hours = elapsed_s / 3600
# count only chunks whose readiness flag is literally True
n_ready = sum(1 for v in pool.results_ready().values() if v is True)
n_chunks = len(pool.results)
print(f"{n_ready}/{n_chunks} chunks (size {CHUNKSZ})")
if n_ready:
    # wall-clock seconds per finished chunk. (the old n_ready / watch.total
    # was chunks per second -- inverted -- which made both the per-file rate
    # and the remaining-time estimate wrong.)
    s_per_obj = elapsed_s / n_ready
    s_rem = s_per_obj * (n_chunks - n_ready)
    hours_rem = s_rem / 3600
    # approximate: the final chunk may hold fewer than CHUNKSZ files
    print(round(s_per_obj / CHUNKSZ, 3), 's per file')
else:
    # nothing has finished yet, so there is no basis for a rate; keep the
    # names defined but don't divide by zero
    s_per_obj = s_rem = hours_rem = float('nan')
print(f"{_fmt_hours(hours)} elapsed")
if n_ready:
    print(f"{_fmt_hours(hours_rem)} remaining")
else:
    print("time remaining unknown: no chunks finished yet")