In [2]:
%load_ext autoreload

%autoreload 2

In [3]:
import os
from glob import glob
import subprocess
import shutil
import traceback
import random
import pickle

import joblib
from concurrent.futures import ProcessPoolExecutor

from backports import tempfile

import matplotlib
matplotlib.rcParams['savefig.dpi'] = 144
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

import psrchive

import pipe

In [4]:
data_location = "/nimrod1/aarchiba/0337+17"
data_location_name = data_location+"/{telescope}/{epoch}"

In [5]:
wsrt_data = "/nimrod1/aarchiba/0337+17/WSRT/"
if not os.path.exists(wsrt_data):
    os.mkdir(wsrt_data)

In [15]:
par_db = pipe.EphemerisCollection(directory="/nimrod1/aarchiba/0337+17/ephemerides/")

In [19]:
wsrt_raw_location = "/nimrod1/aarchiba/0337+17/WSRT-raw"
wsrt_obs_glob = wsrt_raw_location+"/*/*0337*/"

class DiscontinuityError(ValueError):
    pass

def max_smearing(F,G):
    P = F.get_Integration(0).get_folding_period()
    deltas = []
    for i in range(len(F)):
        d = (G.get_Integration(i).get_epoch()-F.get_Integration(i).get_epoch()).in_days()*86400
        deltas.append(d/P)
    deltas = np.array(deltas)

    smear = np.diff(deltas)
    smear = (smear+0.5) % 1 - 0.5
    
    return np.amax(smear)

def clear_wsrt_day(d, success=False, failure=True):
    success_file = d+"/success"
    failure_file = d+"/failure.pickle"
    if success and os.path.exists(success_file):
        os.unlink(success_file)
    if failure and os.path.exists(failure_file):
        os.unlink(failure_file)

def process_wsrt_day(d):
    success_file = d+"/success"
    failure_file = d+"/failure.pickle"
    if os.path.exists(success_file):
        with open(success_file, "rb") as f:
            return pickle.load(f)
    if os.path.exists(failure_file):
        with open(failure_file, "rb") as f:
            raise pickle.load(f)
    with tempfile.TemporaryDirectory() as td:
        try:
            extra_info = dict(day=d, max_smearing=0)
            for s in glob(d+"/Band*"):
                b = s.split("/")[-1]
                tadd = td+"/"+b+".tadd.ar"
                pipe.check_call(["psradd", "-o", tadd]+glob(s+"/u*.ar"))
                F = psrchive.Archive_load(tadd)

                mjd = (F.get_Integration(0).get_start_time().in_days()
                    +F.get_Integration(len(F)-1).get_end_time().in_days())/2
                par = par_db.get_par_for(mjd)
                pipe.check_call(["pam", "-E", par, "-e", "align.ar", tadd])
                align = td+"/"+b+".tadd.align.ar"
                G = psrchive.Archive_load(align)
                
                s = max_smearing(F,G)
                extra_info["max_smearing"] = max(extra_info["max_smearing"], s)
                extra_info["mjd"] = mjd
                extra_info["par"] = par
                
                del F
                del G
            pipe.check_call(["psradd", "-o", td+"/raw.ar", "-R"]+glob(td+"/Band*.tadd.ar"))
            shutil.copy(td+"/raw.ar", "raw.ar")
            F = psrchive.Archive_load(td+"/raw.ar")
            l = (F.end_time()-F.start_time()).in_seconds()
            M = "%.1f" % (((F.end_time()+F.start_time())).in_days()/2)
            #T = F.get_nsubint()*F.get_first_Integration().get_duration()
            T = F.integration_length()
            extra_info["length"] = T
            extra_info["centre_frequency"] = F.get_centre_frequency()
            extra_info["P"] = F.get_Integration(0).get_folding_period()
            extra_info["file"] = M
            del F
            if np.abs(l-T)> 5:
                raise DiscontinuityError("Observation in directory %s appears not to be contiguous: "
                                 "total integration time %f but start-to-end time %f"
                                 % (d,T,l))
            shutil.copy(td+"/raw.ar", wsrt_data+"wsrt_%s.raw.ar" % M)
            with open(success_file,"wb") as f:
                pickle.dump(extra_info, f)
            return extra_info
        except Exception as e:
            with open(failure_file, "wb") as f:
                pickle.dump(e,f)
            raise



In [20]:
d = "/nimrod1/aarchiba/0337+17/WSRT-raw/11Apr2012/J0337+1715-1380/"

clear_wsrt_day(d)
process_wsrt_day(d)

CalledProcessError: CalledProcessError(
            cmd=['psradd', '-o', '/tmp/tmpO6IaPw/raw.ar', '-R', '/tmp/tmpO6IaPw/Band7.tadd.ar', '/tmp/tmpO6IaPw/Band1.tadd.ar', '/tmp/tmpO6IaPw/Band3.tadd.ar', '/tmp/tmpO6IaPw/Band4.tadd.ar', '/tmp/tmpO6IaPw/Band5.tadd.ar', '/tmp/tmpO6IaPw/Band6.tadd.ar', '/tmp/tmpO6IaPw/Band2.tadd.ar', '/tmp/tmpO6IaPw/Band0.tadd.ar'],
            returncode=255,
            output='''''',
            error='''psradd: shell not available; insufficient resources
''')

In [10]:
g = list(glob(wsrt_obs_glob))
random.shuffle(g)
s = 0
for d in g:
    print d
    try:
        M = process_wsrt_day(d)
        print "success;", M
        s += 1
    except subprocess.CalledProcessError as e:
        print "psradd failure", e
    except DiscontinuityError:
        traceback.print_exc()
print s, len(g)

/nimrod1/aarchiba/0337+17/WSRT-raw/25Apr2012/J0337+1715-1380a/
success; 56042.3
/nimrod1/aarchiba/0337+17/WSRT-raw/05Mar2012/J0337+1715-1380b/
success; 55991.8
/nimrod1/aarchiba/0337+17/WSRT-raw/01Feb2013/J0337+1715-1380/
psradd failure CalledProcessError(
            cmd=['psradd', '-o', '/tmp/tmppXLvd6/raw.ar', '-R', '/tmp/tmppXLvd6/Band7.tadd.ar', '/tmp/tmppXLvd6/Band1.tadd.ar', '/tmp/tmppXLvd6/Band3.tadd.ar', '/tmp/tmppXLvd6/Band4.tadd.ar', '/tmp/tmppXLvd6/Band5.tadd.ar', '/tmp/tmppXLvd6/Band6.tadd.ar', '/tmp/tmppXLvd6/Band2.tadd.ar', '/tmp/tmppXLvd6/Band0.tadd.ar'],
            returncode=255,
            output='''''',
            error='''psradd: no polynomials loaded

see http://psrchive.sourceforge.net/qa/?qa=95/no-polynomials-loaded
''')
/nimrod1/aarchiba/0337+17/WSRT-raw/10Dec2012/J0337+1715-1380/
psradd failure CalledProcessError(
            cmd=['psradd', '-o', '/tmp/tmpRjl6UU/raw.ar', '-R', '/tmp/tmpRjl6UU/Band7.tadd.ar', '/tmp/tmpRjl6UU/Band1.tadd.ar', '/tmp/tmpRjl6UU/B

OSError: [Errno 12] Cannot allocate memory

In [11]:
if False:
    # Mystery failures
    g = list(glob(wsrt_obs_glob))
    random.shuffle(g)
    def do_one(d):
        try:
            return process_wsrt_day(d)
        except subprocess.CalledProcessError:
            return "psradd problem with %s" % d
        except DiscontinuityError:
            return "discontinuity problem with %s" % d
    joblib.Parallel(n_jobs=-1)(joblib.delayed(do_one)(d) for d in g)

In [12]:
if False:
    # Mystery failures
    g = list(glob(wsrt_obs_glob))
    random.shuffle(g)
    def do_one(d):
        try:
            return process_wsrt_day(d)
        except subprocess.CalledProcessError:
            return "psradd problem with %s" % d
        except DiscontinuityError:
            return "discontinuity problem with %s" % d
    with ProcessPoolExecutor() as P:
        list(P.map(do_one,g))

Now the challenge is to fix as many of the failed observations as possible

In [8]:
F = psrchive.Archive_load(wsrt_data+"/wsrt_55956.6.raw.ar")

In [12]:
F.thisown

True

In [7]:
!ls /nimrod1/aarchiba/0337+17/WSRT/

wsrt_55956.6.raw.ar  wsrt_56039.7.raw.ar  wsrt_56199.2.raw.ar
wsrt_55956.7.raw.ar  wsrt_56042.3.raw.ar  wsrt_56199.3.raw.ar
wsrt_55959.7.raw.ar  wsrt_56044.3.raw.ar  wsrt_56200.3.raw.ar
wsrt_55961.9.raw.ar  wsrt_56044.5.raw.ar  wsrt_56200.9.raw.ar
wsrt_55966.8.raw.ar  wsrt_56045.7.raw.ar  wsrt_56202.3.raw.ar
wsrt_55967.8.raw.ar  wsrt_56048.7.raw.ar  wsrt_56204.3.raw.ar
wsrt_55968.8.raw.ar  wsrt_56049.6.raw.ar  wsrt_56206.3.raw.ar
wsrt_55972.7.raw.ar  wsrt_56050.3.raw.ar  wsrt_56206.8.raw.ar
wsrt_55977.8.raw.ar  wsrt_56055.3.raw.ar  wsrt_56208.2.raw.ar
wsrt_55982.9.raw.ar  wsrt_56056.3.raw.ar  wsrt_56211.8.raw.ar
wsrt_55987.7.raw.ar  wsrt_56056.7.raw.ar  wsrt_56213.8.raw.ar
wsrt_55988.9.raw.ar  wsrt_56057.3.raw.ar  wsrt_56214.8.raw.ar
wsrt_55989.9.raw.ar  wsrt_56061.3.raw.ar  wsrt_56216.8.raw.ar
wsrt_55990.6.raw.ar  wsrt_56064.3.raw.ar  wsrt_56217.8.raw.ar
wsrt_55991.6.raw.ar  wsrt_56067.5.raw.ar  wsrt_56220.8.raw.ar
wsrt_55991.8.raw.ar  wsrt_56070.2.raw.ar  wsrt_56221.8.