In [1]:
%%configure -f
{"name": "arik-load-logrss", "executorMemory": "26G", "numExecutors": 8, "executorCores": 4,
 "conf": {"spark.yarn.appMasterEnv.PYSPARK_PYTHON":"python3"}}

In [2]:
import sys
import os
import subprocess
from io import BytesIO
from gzip import GzipFile
from pyspark.sql import Row
import glob
import time

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
169,application_1580142637008_0175,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def get_fits_module():
    """Return the ``astropy.io.fits`` module, installing astropy on demand.

    The import is attempted first. Only when it fails is
    ``pip install astropy`` run with the current interpreter
    (``sys.executable``), after which the import is retried. This avoids
    spawning pip in every fresh process when astropy is already installed.

    Returns:
        The ``astropy.io.fits`` module.

    Raises:
        subprocess.CalledProcessError: if the pip installation fails.
        ImportError: if astropy still cannot be imported after installing.
    """
    try:
        from astropy.io import fits
    except ImportError:
        import importlib
        # Pass an argument list (no shell) so an interpreter path containing
        # spaces or shell metacharacters is handled correctly.
        subprocess.check_output(
            [sys.executable, '-m', 'pip', 'install', 'astropy'],
            stderr=subprocess.STDOUT)
        # Make the import system notice the freshly installed package.
        importlib.invalidate_caches()
        from astropy.io import fits
    return fits

def getfits(compressed_data):
    """Open a gzip-compressed FITS file that is held in memory.

    Args:
        compressed_data: bytes of a gzip-compressed FITS file.

    Returns:
        Whatever ``fits.open`` returns for the decompressed bytes.
    """
    fits_module = get_fits_module()
    # Decompress fully in memory, then hand the raw bytes to astropy
    # through a file-like wrapper.
    gzip_stream = GzipFile(fileobj=BytesIO(compressed_data))
    raw_bytes = gzip_stream.read()
    return fits_module.open(BytesIO(raw_bytes))

def getfitslocal(path):
    """Open the FITS file located at ``path``.

    Args:
        path: path passed straight through to ``fits.open``.

    Returns:
        Whatever ``fits.open`` returns for that path. The caller is
        responsible for the returned object.
    """
    return get_fits_module().open(path)

# Spark dataframe cannot do numpy types
def typeconv(i):
    """Convert a value to a plain Python scalar when it offers ``.item()``.

    Args:
        i: any value; objects exposing an ``item()`` method (as numpy
            scalars do) are converted through it.

    Returns:
        ``i.item()`` when that call succeeds, otherwise ``i`` unchanged.
    """
    try:
        return i.item()
    except Exception:
        # Best-effort conversion: values without a usable .item() pass
        # through untouched. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        return i

def headerDict(fits):
    """Copy the header of the first element of ``fits`` into a new dict.

    Args:
        fits: indexable object whose element 0 has a ``header`` exposing
            ``items()``.

    Returns:
        A new ``dict`` built from the header's (key, value) pairs.
    """
    primary_header = fits[0].header
    return {key: value for key, value in primary_header.items()}

def createFiberExposureRows(fits):
    """Build one Spark ``Row`` per (exposure, fiber) pair of an opened FITS file.

    Every row contains, in this order: the header of the first HDU, the
    ``OBSINFO`` record of the exposure, ``EXPOSURE_INDEX`` / ``FIBER_INDEX``,
    the ``FLUX``, ``XPOS``, ``YPOS``, ``IVAR``, ``MASK`` and ``DISP`` entry at
    index ``exposure * n_fibers + fiber``, the complete ``WAVE``, ``SPECRES``
    and ``SPECRESD`` arrays, and the means of the ``XPOS`` / ``YPOS`` entry.

    Args:
        fits: opened FITS object indexable by extension name and by 0.

    Returns:
        list of ``Row``; empty when ``OBSINFO`` holds no exposures.
    """
    per_spectrum_units = ('FLUX', 'XPOS', 'YPOS', 'IVAR', 'MASK', 'DISP')
    shared_units = ('WAVE', 'SPECRES', 'SPECRESD')

    obsinfo = fits['OBSINFO']
    obsinfo_columns = [col.name for col in obsinfo.columns]
    n_exposures = len(obsinfo.data)
    if n_exposures == 0:
        # Nothing to emit, and the fiber count below would divide by zero.
        return []
    # Integer division: the fiber count is used as a range bound and an index stride.
    n_fibers = len(fits['FLUX'].data) // n_exposures

    # Loop-invariant pieces, computed once instead of once per row.
    header = headerDict(fits)
    per_spectrum = {unit: fits[unit].data for unit in per_spectrum_units}
    # These reference arrays are identical for every spectrum, so the converted
    # lists are shared between rows rather than rebuilt for each one.
    shared = {unit: fits[unit].data.tolist() for unit in shared_units}

    rows = []
    for exp in range(n_exposures):
        # Spark dataframe cannot do numpy types, hence typeconv on each field.
        expinfo = dict(zip(obsinfo_columns,
                           [typeconv(value) for value in obsinfo.data[exp]]))
        for fiber in range(n_fibers):
            ind = exp * n_fibers + fiber
            row = dict(header)
            row.update(expinfo)
            row['EXPOSURE_INDEX'] = exp
            row['FIBER_INDEX'] = fiber
            # Per fiber/exposure data
            for unit in per_spectrum_units:
                row[unit] = per_spectrum[unit][ind].tolist()
            # references all spectra
            row.update(shared)
            # for convenience, store the mean fiber positions
            row['XPOS_MEAN'] = per_spectrum['XPOS'][ind].mean().tolist()
            row['YPOS_MEAN'] = per_spectrum['YPOS'][ind].mean().tolist()
            rows.append(Row(**row))
    return rows

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read local fits to parquet
Here we scan all directories that have a `stack` subdirectory and identify all LOGRSS files within them. Each file will be given its own partition, so we first scan for the total dataset size: the number of files, the sum of the file sizes, and the maximum file size (which determines how much memory the tasks need).

In [None]:
# Root directory whose entries are checked for a 'stack' subdirectory.
base_dir = '/sciserver/vc/manga/vc/sas/dr15/manga/spectro/redux/v2_4_3/'


def stack_dir(x):
    """Return the path of the 'stack' subdirectory of entry ``x`` under base_dir."""
    return base_dir + x + '/stack'


# Keep only the candidate paths that exist as directories.
rss_dirs = [path for path in map(stack_dir, os.listdir(base_dir))
            if os.path.isdir(path)]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Distribute the stack directories, then expand each one into the
# '*-LOGRSS.fits.gz' files it contains (the glob runs inside the Spark tasks).
rss_dirs_rdd = sc.parallelize(rss_dirs)
rss_files_rdd = rss_dirs_rdd.flatMap(
    lambda stack_path: glob.glob(stack_path + '/*-LOGRSS.fits.gz'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def _file_size_stats(path):
    """Return ``(size_mb, 1, size_mb)`` for one file, stat-ing it only once.

    The tuple layout matches the reduction below: running total in MiB,
    file count, and running maximum in MiB.
    """
    size_mb = os.stat(path).st_size / 1024 / 1024
    return (size_mb, 1, size_mb)


def _merge_size_stats(left, right):
    """Combine two ``(total_mb, count, max_mb)`` tuples into one."""
    return (left[0] + right[0], left[1] + right[1], max(left[2], right[2]))


# (total size in MiB, number of files, largest file in MiB)
files_stats = rss_files_rdd.map(_file_size_stats).reduce(_merge_size_stats)
files_stats

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(319175.72129917145, 4857, 197.0564022064209)

In [None]:
# Use as many partitions as there are files.
n_part = int(files_stats[1])


def _rows_from_path(path):
    """Open one FITS file, convert it to rows, and always close it again.

    createFiberExposureRows copies everything it needs into plain Python
    values, so the file can be released as soon as the rows are built.
    """
    hdus = getfitslocal(path)
    try:
        return createFiberExposureRows(hdus)
    finally:
        hdus.close()


fiber_exposures = rss_files_rdd.repartition(n_part).flatMap(_rows_from_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### WARNING
This is a big job. It creates a bit over 500 GB of Parquet files. The data is much larger in memory than on disk, and because Parquet is a columnar format, each task has to hold its data in RAM while writing. If the job fails, the most likely cause is that not enough memory was allocated.

In [None]:
# Destination of the parquet output; existing data there is overwritten.
hdfs_dir = 'hdfs:///manga/arik-test/dr15/v2_4_3/logrss'

# Time the whole conversion: DataFrame creation (no explicit schema is
# supplied) followed by the parquet write. The elapsed seconds are printed.
t = time.time()
table = spark.createDataFrame(fiber_exposures)
writer = table.write.mode('overwrite')
writer.parquet(hdfs_dir)
print(time.time()-t)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…