In [1]:
import os
import io
import gzip
import glob
import numpy as np
import pandas as pd
import tarfile
import matplotlib.pyplot as plt

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import fastavro

from astropy.time import Time
from astropy.io import fits
import aplpy
%matplotlib inline

In [15]:
## Download ZTF data to epyc directly

# ! curl -o /epyc/users/mmckay/ztf_public_20200402.tar.gz https://ztf.uw.edu/alerts/public/ztf_public_20200402.tar.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 8635M  100 8635M    0     0   351M      0  0:00:24  0:00:24 --:--:--  368M


In [6]:
#Copy ROSAT catalog from epyc to local directory
#os.system('cp  /epyc/data/rosat_2rxs/cat2rxs.fits /epyc/users/mmckay/')

0

In [2]:
# Load ROSAT catalog
rosat_fits = fits.open('/epyc/users/mmckay/cat2rxs.fits')

# Make ROSAT data into a pandas dataframe
rosat_data = rosat_fits[1].data
dfx = pd.DataFrame(rosat_data)

# exclude sources that are not observable
dfx = dfx[dfx.DEC_DEG >= -30] 
dfx

Unnamed: 0,IAU_NAME,SEQ_ID,IND_DET,EXI_ML,CTS,CERR,RATE,ERATE,EXPOSURE,RA_DEG,...,CM_plus_CD,SPX,LCX,OBS_CLOCK_1,OBS_CLOCK_2,OBS_DATE_1,OBS_UT_1,OBS_DATE_2,OBS_UT_2,IND_2RXS
0,2RXS J121400.6+871149,930101,1,6.584808,6.252749,3.166993,0.006862,0.003475,911.271118,183.502866,...,,0,0,9239846.0,9671202.0,16-SEP-1990,19:44:15.5,21-SEP-1990,19:33:31.5,1
1,2RXS J111625.8+871159,930101,2,14.181885,16.618881,5.363760,0.019012,0.006136,874.139221,169.107785,...,0.084769,0,1,9216762.0,9619268.0,16-SEP-1990,13:19:31.5,21-SEP-1990,05:07:57.5,2
2,2RXS J111220.4+872612,930101,5,13.600342,13.386649,4.803939,0.015127,0.005429,884.943909,168.085245,...,0.081228,0,1,9164879.0,9584677.0,15-SEP-1990,22:54:48.5,20-SEP-1990,19:31:26.5,5
3,2RXS J094825.7+871344,930101,8,7.029663,12.105028,4.868176,0.014262,0.005736,848.755615,147.107123,...,0.055156,0,1,9112959.0,9515484.0,15-SEP-1990,08:29:28.5,20-SEP-1990,00:18:13.5,8
4,2RXS J121506.0+874154,930101,10,210.758667,124.229118,12.248785,0.134953,0.013306,920.533447,183.775026,...,0.254351,1,1,9124511.0,9555845.0,15-SEP-1990,11:42:00.5,20-SEP-1990,11:30:54.5,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
104222,2RXS J235836.1-294822,932258,179,9.701111,10.387036,4.228204,0.033249,0.013535,312.396912,359.650697,...,,0,0,15604490.0,15788874.0,29-NOV-1990,11:41:38.9,01-DEC-1990,14:54:42.9,169093
104223,2RXS J235827.9-294920,932258,180,8.675110,10.414165,4.314663,0.033398,0.013837,311.823120,359.616660,...,,0,0,15604488.0,15783115.0,29-NOV-1990,11:41:36.9,01-DEC-1990,13:18:43.9,169094
104224,2RXS J234722.3-295357,932258,182,7.887024,10.720377,4.363371,0.032700,0.013309,327.841858,356.843048,...,0.162858,0,1,15397028.0,15569920.0,27-NOV-1990,02:03:56.9,29-NOV-1990,02:05:28.9,169096
104225,2RXS J234443.6-295644,932258,183,14.752258,13.816039,4.571136,0.041312,0.013668,334.433746,356.181792,...,0.116295,0,1,15333657.0,15518084.0,26-NOV-1990,08:27:46.0,28-NOV-1990,11:41:32.9,169097


In [3]:
# append ROSAT error position [arcsec] 
dfx['err_pos_arcsec'] = np.sqrt(((dfx.XERR*45)**2.+ (dfx.YERR*45)**2.) + 0.6**2.)
dfx.err_pos_arcsec

0         12.209580
1         13.745840
2         11.233130
3         28.711039
4          5.574358
            ...    
104222    18.148684
104223    19.329465
104224    24.463706
104225    14.669044
104226    33.828258
Name: err_pos_arcsec, Length: 103351, dtype: float64

In [4]:
# List of ROSAT RA and DEC
rosat_ra_list = dfx.RA_DEG
rosat_dec_list = dfx.DEC_DEG

# Extract avro data from ZTF

In [26]:
# Extract avro data from the tar file to 
# *current working directory(Where this notebook is located)*
#tar_archive = '/epyc/users/mmckay/ztf_public_20200402.tar.gz'
#output_dir = tar_archive.split('/')[-1].split('.')[-3]
#archive = tarfile.open(tar_archive,'r:gz')
#archive.extractall(path=output_dir)
#archive.close()

In [5]:
#Count packets
def find_files(root_dir):
    for dir_name, subdir_list, file_list in os.walk(root_dir, followlinks=True):
        for fname in file_list:
            if fname.endswith('.avro'):
                yield dir_name+'/'+fname

In [6]:
ztf_data_dir = '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402'
print('{} has {} avro files'.format(ztf_data_dir, len(list(find_files(ztf_data_dir)))))

/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402 has 200418 avro files


In [7]:
#files_gen = find_files(ztf_data_dir)
#fname = next(files_gen)
#fname

In [8]:
#Make list of avro files
fname_list = glob.glob('{}/*avro'.format(ztf_data_dir))
fname_list

['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128334515010008.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128331015010031.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128330915015011.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128330915010021.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128333515015029.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128334615015003.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128332615010039.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128332415015007.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128334415015011.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128332815015016.avro',
 '/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/118712833251501

In [12]:
#Make packet into a dataframe
def make_dataframe(packet):
    df = pd.DataFrame(packet['candidate'], index=[0])
    df_prv = pd.DataFrame(packet['prv_candidates'])
    return pd.concat([df,df_prv], ignore_index=True)


def extract_avro_data(fname):
    with open(fname,'rb') as f:
        freader = fastavro.reader(f)
        schema = freader.writer_schema # freader.schema - depercated 
    
        for packet in freader:
            pass
            #print(packet.keys())
    #Make packet into a dataframe   
    dflc = make_dataframe(packet)
    avro_ra = dflc.ra[0]
    avro_dec = dflc.dec[0]
    return avro_ra, avro_dec
        

In [17]:
%%time 
from tqdm import tqdm
from tqdm.notebook import tqdm
avro_ra_list = []
avro_dec_list = []
for avro in tqdm(fname_list[:10]):
    ra, dec = extract_avro_data(avro)
    avro_ra_list.append(ra)
    avro_dec_list.append(dec)
    print('RA: {}, DEC: {}'.format(ra, dec))
    #if ra==ra_0 and dec==dec_0:
    #    print('skip')
    #print(ra, dec)

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

RA: 104.6744271, DEC: -15.3390104
RA: 106.247876, DEC: -20.5182166
RA: 106.1343784, DEC: -19.6293488
RA: 106.4677208, DEC: -19.4312591
RA: 111.2721758, DEC: -16.7549467
RA: 104.902724, DEC: -16.4319291
RA: 105.9973217, DEC: -18.7481326
RA: 107.4867928, DEC: -17.9728991
RA: 105.2964855, DEC: -15.9753838
RA: 105.1938004, DEC: -17.724625

CPU times: user 345 ms, sys: 10.6 ms, total: 356 ms
Wall time: 422 ms


# Multiprocessing - In-progress

In [269]:
# Use multiprocessing to speed up avro data extraction
#from multiprocessing import Pool
#from tqdm import tqdm
#from tqdm.notebook import tqdm
#
#
#def extract_avro_radec(fname_list):
#    avro_ra_list = []
#    avro_dec_list = []
#    for avro in tqdm(fname_list):
#        print(avro)
#        ra, dec = extract_avro_data(avro)
#        avro_ra_list.append(ra)
#        avro_dec_list.append(dec)
#
#def extract_avro_mp(fname_list):
#    chunks = [fname_list[i::5] for i in range(5)] #split the list ogf avro files into 5 seperate chucks
#    print(chunks)
#    
#    pool = Pool(processes=5)
#    
#    result = pool.map(extract_avro_radec, chunks)
#    print(result)
#    return result


In [264]:
#r = extract_avro_mp(fname_list[:4])

[['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128334515010008.avro'], ['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128331015010031.avro'], ['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128330915015011.avro'], ['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128330915010021.avro'], ['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128333515015029.avro']]


Process ForkPoolWorker-55:
Process ForkPoolWorker-54:
Process ForkPoolWorker-51:
Process ForkPoolWorker-53:
Process ForkPoolWorker-52:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/epyc/opt/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self

KeyboardInterrupt: 

  File "<ipython-input-263-9f2341d9107d>", line 10, in f
    for avro in tqdm(fname_list):
  File "<ipython-input-263-9f2341d9107d>", line 10, in f
    for avro in tqdm(fname_list):
  File "<ipython-input-263-9f2341d9107d>", line 10, in f
    for avro in tqdm(fname_list):
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 515, in __new__
    with cls.get_lock():
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 515, in __new__
    with cls.get_lock():
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 515, in __new__
    with cls.get_lock():
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 515, in __new__
    with cls.get_lock():
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 515, in __new__
    with cls.get_lock():
  File "/epyc/opt/anaconda/lib/python3.7/site-packages/tqdm/std.py", line 96, in __enter__
    self.acquire()
  File "/epyc/opt/anaconda/lib/python3.7/s

In [275]:
#test = fname_list[:5]
#chunks = [test[i::5] for i in range(5)]
#print(chunks[0][0])
#a = extract_avro_radec(chunks[0][0])
#a

/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128334515010008.avro


In [276]:
#chunks[1]

['/epyc/users/mmckay/alert_stream_crossmatch/ztf_public_20200402/1187128331015010031.avro']

# Skycoord

In [14]:
from astropy.coordinates import SkyCoord  # High-level coordinates
#from astropy.coordinates import ICRS, Galactic, FK4, FK5  # Low-level frames
#from astropy.coordinates import Angle, Latitude, Longitude  # Angles
import astropy.units as u

In [15]:
# Put ROSAT ra and dec list in SkyCoord [degrees]
rosat_skycoord = SkyCoord(ra = rosat_ra_list, dec=rosat_dec_list, frame='icrs', unit=(u.deg))
rosat_skycoord

<SkyCoord (ICRS): (ra, dec) in deg
    [(183.50286557,  87.19713271), (169.10778487,  87.19988261),
     (168.08524479,  87.43678965), ..., (356.84304802, -29.89944039),
     (356.18179242, -29.94573029), (358.9696836 , -29.97436576)]>

In [233]:
# Check coordinates object is an array
rosat_skycoord.isscalar

False

In [19]:
# Put avro data in SkyCoord 
#ra = dflc.ra[0]
#dec = dflc.dec[0]
#ra = ztf_ra_list
#dec = ztf_dec_list
avro_skycoord = SkyCoord(ra=avro_ra_list, dec=avro_dec_list, frame='icrs', unit=(u.deg))
#c = SkyCoord(ra=ra, dec=dec, frame='icrs', unit=(u.hourangle, u.deg))
avro_skycoord

<SkyCoord (ICRS): (ra, dec) in deg
    [(104.6744271, -15.3390104), (106.247876 , -20.5182166),
     (106.1343784, -19.6293488), (106.4677208, -19.4312591),
     (111.2721758, -16.7549467), (104.902724 , -16.4319291),
     (105.9973217, -18.7481326), (107.4867928, -17.9728991),
     (105.2964855, -15.9753838), (105.1938004, -17.724625 )]>

In [20]:
# Check coordinates object is an array
avro_skycoord.isscalar

False

In [60]:
# Crossmatch function
def ztf_rosat_crossmatch(ztf_ra, ztf_dec, rosat_ra_list, rosat_dec_list, rosat_pos_err):
    """
    Cross match ZTF and ROSAT data in astropy.SkyCoords
    
    Parameters:
    
        ztf_ra: float or list of float
        
        ztf_dec: float or list of float
        
        rosat_ra_list: pandas.series
        
        rosat_dec_list: pandas/series
        
    Return:
        
        ztf_rosat_ra_match: float
        
        ztf_rosat_dec_match: float
        
        match: tuple
            Output from Skycoord.match_to_catalog_sky
        
        
    """
    # Input avro data ra and dec in SkyCoords
    avro_skycoord = SkyCoord(ra=ztf_ra, dec=ztf_dec, frame='icrs', unit=(u.deg))
    
    
    # Input rosat data ra list and dec list in SkyCoords
    rosat_skycoord = SkyCoord(ra = rosat_ra_list, dec=rosat_dec_list, frame='icrs', unit=(u.deg))
    
    # Finds the nearest ROSAT source's coordinates to the avro files ra[deg] and dec[deg]
    match = avro_skycoord.match_to_catalog_sky(catalogcoord=rosat_skycoord, nthneighbor=1)
    i = 0
    for idx, sep2d in tqdm(zip(match[0], match[2])):
        if sep2d <= rosat_pos_err[idx] * 0.000277778:
            match_result = 'Good! sep2d={} deg'.format(sep2d)
        else:
            match_result = 'Not so good sep2d={} deg'.format(sep2d)
            
        
        print('ZTF   RA: {}, DEC: {}'.format(avro_skycoord.ra[i], avro_skycoord.dec[i]))
        print('ROSAT RA: {}, DEC: {}'.format(rosat_skycoord.ra[idx], rosat_skycoord.dec[idx]))
        print('Match Result: {}\n'.format(match_result))
        i += 1
        
    
    
    # Index match indice with rosat coordinates
    #rosat_ra_match = rosat_skycoord.ra.value[idx]
    #rosat_dec_match = rosat_skycoord.dec.value[idx]
    
    #print('ZTF   RA: {} DEC: {} [deg]'.format(ra,dec))
    #print('ROSAT RA: {} DEC: {} [deg]'.format(rosat_ra_match, rosat_dec_match))
    #print('On-Sky seperation between the closet match {} \n'.format(match[1]))
    #return rosat_ra_match, rosat_dec_match, match
    
    

In [61]:
%%time
ztf_rosat_crossmatch(avro_ra_list, avro_dec_list, rosat_ra_list, rosat_dec_list, dfx.err_pos_arcsec)

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

ZTF   RA: 104.6744271 deg, DEC: -15.3390104 deg
ROSAT RA: 104.64307171107367 deg, DEC: -15.300799410405327 deg
Match Result: Good! sep2d=0.0008504983396979236 deg

ZTF   RA: 106.247876 deg, DEC: -20.5182166 deg
ROSAT RA: 106.30238473523903 deg, DEC: -20.34752208139387 deg
Match Result: Not so good sep2d=0.003109708020907392 deg

ZTF   RA: 106.1343784 deg, DEC: -19.6293488 deg
ROSAT RA: 106.41869221135951 deg, DEC: -19.65370774930093 deg
Match Result: Good! sep2d=0.004692773991304001 deg

ZTF   RA: 106.4677208 deg, DEC: -19.4312591 deg
ROSAT RA: 106.41869221135951 deg, DEC: -19.65370774930093 deg
Match Result: Good! sep2d=0.003965323588956156 deg

ZTF   RA: 111.2721758 deg, DEC: -16.7549467 deg
ROSAT RA: 111.15870596145815 deg, DEC: -16.741814045701215 deg
Match Result: Good! sep2d=0.0019102127634821862 deg

ZTF   RA: 104.902724 deg, DEC: -16.4319291 deg
ROSAT RA: 105.29124629139554 deg, DEC: -16.49637427909155 deg
Match Result: Not so good sep2d=0.006599497171718853 deg

ZTF   RA: 105.

In [28]:
# testing
match = avro_skycoord.match_to_catalog_sky(catalogcoord=rosat_skycoord, nthneighbor=1)
idx = match[0]

In [29]:
idx

array([90083, 95500, 90150, 90150, 90184, 90101, 90134, 90121, 90097,
       90122])