# WebSky bright source catalog workflow


In [1]:
import h5py
import numpy as np
import healpy as hp
import matplotlib.pyplot as plt
from tqdm import tqdm



In [2]:
import os

# for jupyter.nersc.gov otherwise the notebook only uses 2 cores
num_threads = 128
os.environ["OMP_NUM_THREADS"] = "1"

In [3]:
cutoff_flux = 1e-3

In [4]:
output_filename = "websky_high_flux_catalog_1mJy.h5"

In [5]:
plot = False

In [6]:
cd /global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2

/global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2


In [7]:
%ls

catalog_100.0.h5  catalog_232.0.h5  catalog_353.0.h5  catalog_643.0.h5
catalog_111.0.h5  catalog_24.5.h5   catalog_375.0.h5  catalog_67.8.h5
catalog_129.0.h5  catalog_256.0.h5  catalog_409.0.h5  catalog_70.0.h5
catalog_143.0.h5  catalog_27.3.h5   catalog_41.7.h5   catalog_729.0.h5
catalog_153.0.h5  catalog_275.0.h5  catalog_44.0.h5   catalog_73.7.h5
catalog_164.0.h5  catalog_294.0.h5  catalog_467.0.h5  catalog_79.6.h5
catalog_18.7.h5   catalog_30.0.h5   catalog_47.4.h5   catalog_817.0.h5
catalog_189.0.h5  catalog_306.0.h5  catalog_525.0.h5  catalog_857.0.h5
catalog_21.6.h5   catalog_314.0.h5  catalog_545.0.h5  catalog_90.2.h5
catalog_210.0.h5  catalog_340.0.h5  catalog_584.0.h5  catalog_906.0.h5
catalog_217.0.h5  catalog_35.9.h5   catalog_63.9.h5


In [8]:
freqs = [
    "18.7",
    "24.5",
    "44.0",
    "70.0",
    "100.0",
    "143.0",
    "217.0",
    "353.0",
    "545.0",
    "643.0",
    "729.0",
    "857.0",
    "906.0",
]

In [9]:
cat = h5py.File("catalog_100.0.h5", "r")

There are no metadata in the file, I guess fluxes are in `Jy`

In [10]:
high_flux_sources_mask = cat["flux"][:] > cutoff_flux

In [11]:
cat["polarized flux"][:3]

array([1.42910628e-09, 1.99535624e-08, 2.29563857e-09], dtype='>f8')

In [12]:
(high_flux_sources_mask).sum()

372255

In [13]:
high_flux_sources_mask.mean() * 100

0.13211945911740433

In [14]:
for k, v in cat.items():
    print(k, v[:3])

flux [3.24291534e-07 3.16862867e-07 3.17171157e-07]
phi [3.22861886 3.22861886 3.22861886]
polarized flux [1.42910628e-09 1.99535624e-08 2.29563857e-09]
theta [1.64009452 1.64009452 1.64009452]


In [15]:
(all_indices,) = np.nonzero(high_flux_sources_mask)

In [16]:
len(all_indices)

372255

In [17]:
#all_indices = np.array(sorted(all_indices))
all_indices = np.array(all_indices)

Generate 1 source only

```
argmax = np.array(cat["flux"]).argmax()
all_indices = np.array([argmax])
high_flux_sources_mask = high_flux_sources_mask * 0
high_flux_sources_mask[argmax] = 1
```

In [18]:
import pandas as pd
import xarray as xr

In [19]:
columns = ["theta", "phi", "flux", "polarized flux"]

In [20]:
flux = xr.DataArray(
    data=np.zeros((len(all_indices), len(freqs)), dtype=np.float64),
    coords={"index": all_indices, "freq": list(map(float, freqs))},
    name="flux",
)
fluxnorm = flux.copy()

In [21]:
polarized_flux = flux.copy()

In [22]:
sources_xr = xr.Dataset(
    {"flux": flux, "polarized_flux": polarized_flux, "fluxnorm": fluxnorm}
)
for freq in tqdm(freqs):
    cat = h5py.File(f"catalog_{freq}.h5", "r")
    for column in ["flux", "polarized_flux"]:
        sources_xr[column].loc[dict(index=all_indices, freq=float(freq))] = cat[
            column.replace("_", " ")
        ][high_flux_sources_mask]

100%|██████████| 13/13 [00:20<00:00,  1.54s/it]


In [23]:
# sources_xr = sources_xr.sortby(sources_xr.flux.loc[dict(freq=float(freqs[0]))])

In [24]:
sources_xr.coords["index"] = np.arange(len(sources_xr.coords["index"]))

In [25]:
if plot:
    for s in tqdm(range(len(all_indices))):
        sources_xr["fluxnorm"].loc[dict(index=s)] = sources_xr["flux"].loc[
            dict(index=s)
        ] / sources_xr["flux"].loc[dict(index=s)].sel(freq=100)

In [26]:
#print(sources_xr["fluxnorm"].loc[dict(index=s)], sources_xr["flux"].loc[dict(index=s)])

In [27]:
if plot:
    #sources_xr.fluxnorm.plot(vmin=0, vmax=100)

    plt.figure()
    sources_xr.flux.plot(vmin=0, vmax=100)

In [28]:
sources_xr["logpolycoefflux"] = xr.DataArray(
    np.zeros((len(all_indices), 5), dtype=np.float64),
    dims=["index", "power"],
    coords={"power": np.arange(5)[::-1]},
)
sources_xr["logpolycoefpolflux"] = sources_xr["logpolycoefflux"].copy()

if plot:
    sources_xr["logpolycoefnorm"] = sources_xr["logpolycoefflux"].copy()

In [29]:
from numba import njit

@njit
def model(freq, a, b, c, d, e):
    log_freq = np.log(freq)
    return a * log_freq**4 + b * log_freq**3 + c * log_freq**2 + d * log_freq + e

In [30]:
from scipy.optimize import curve_fit

In [31]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=32, threads_per_worker=2, processes=True)
client = Client(cluster)

In [32]:
print(client)

<Client: 'tcp://127.0.0.1:41433' processes=32 threads=64, memory=503.14 GiB>


In [33]:
s=0
sources_xr["flux"].sel(index=s).data

array([0.02853885, 0.01962042, 0.0104193 , 0.00548068, 0.00337772,
       0.00218903, 0.0014772 , 0.00095432, 0.00064614, 0.000557  ,
       0.00049761, 0.00043035, 0.00040939])

In [34]:
def run_curve_fit_factory(field):
    def run_curve_fit(s):
        return curve_fit(
            model, sources_xr.coords["freq"].data, sources_xr[field].sel(index=s).data
        )[0]
    return run_curve_fit

In [35]:
from dask.diagnostics import ProgressBar

In [36]:
import dask.bag as db

In [37]:
from dask.distributed import progress


In [38]:
%%time

sources_xr["logpolycoefflux"].data = db.range(len(all_indices), npartitions=num_threads).map(run_curve_fit_factory("flux")).compute()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.




CPU times: user 18.7 s, sys: 11.5 s, total: 30.2 s
Wall time: 39 s


In [39]:
%%time

sources_xr["logpolycoefpolflux"].data = db.range(len(all_indices), npartitions=num_threads).map(run_curve_fit_factory("polarized_flux")).compute()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


CPU times: user 16.2 s, sys: 10.1 s, total: 26.3 s
Wall time: 33.3 s


In [40]:
%%time

if plot:
    sources_xr["logpolycoefnorm"].data = db.range(len(all_indices), npartitions=num_threads).map(run_curve_fit_factory("fluxnorm")).compute()

CPU times: user 3 µs, sys: 3 µs, total: 6 µs
Wall time: 11 µs


In [41]:
# for s in range(len(all_indices)):
#     sources_xr["logpolycoefflux"].loc[dict(index=s)], cov = curve_fit(
#         model, sources_xr.coords["freq"], sources_xr.flux.sel(index=s)
#     )
#     sources_xr["logpolycoefpolflux"].loc[dict(index=s)], cov = curve_fit(
#         model, sources_xr.coords["freq"], sources_xr.polarized_flux.sel(index=s)
#     )
#     if plot:
#         sources_xr["logpolycoefnorm"].loc[dict(index=s)], cov = curve_fit(
#             model, sources_xr.coords["freq"], sources_xr.fluxnorm.sel(index=s)
#         )

In [42]:
if plot:
    for s in range(len(all_indices)):
        plt.figure()
        sources_xr.flux.sel(index=s).plot(marker="o", linestyle="none")  # , xscale="log")
        sources_xr.fluxnorm.sel(index=s).plot(
            marker="o", linestyle="none"
        )  # , xscale="log")

        plt.loglog(
            sources_xr.coords["freq"],
            model(sources_xr.coords["freq"], *sources_xr.logpolycoefflux.sel(index=s)),
        )
        plt.loglog(
            sources_xr.coords["freq"],
            model(sources_xr.coords["freq"], *sources_xr.logpolycoefnorm.sel(index=s)),
        )
        plt.grid()

In [43]:
sources_xr.logpolycoefflux.min(), sources_xr.logpolycoefflux.max()

(<xarray.DataArray 'logpolycoefflux' ()>
 array(-17557.7742453),
 <xarray.DataArray 'logpolycoefflux' ()>
 array(23993.56673443))

In [44]:
if plot:
    plt.figure(figsize=(12, 5))
    plt.subplot(121)
    sources_xr.logpolycoefflux.plot(vmax=50, vmin=-50)
    plt.subplot(122)
    sources_xr.logpolycoefnorm.plot(vmax=50, vmin=-50)

In [45]:
if plot:
    plt.figure(figsize=(15, 8))

    for power in range(5):
        plt.subplot(231 + power)

        np.fabs(sources_xr.logpolycoefflux.loc[dict(power=power)]).plot.hist(
            bins=np.logspace(-0, 4, 20), density=False, lw=3, label="fluxes"
        )

        np.fabs(sources_xr.logpolycoefnorm.loc[dict(power=power)]).plot.hist(
            bins=np.logspace(-0, 4, 20),
            density=False,
            histtype="step",
            lw=2,
            label="normalized fluxes",
            linestyle="--",
        )
        plt.grid()
        plt.title(f"Power {power}")
        plt.legend()
        plt.xscale("log")
        plt.xlabel(None)

In [46]:
output_catalog = sources_xr[["logpolycoefflux","logpolycoefpolflux"]]

In [47]:
output_catalog["index"] = all_indices

In [48]:
output_catalog.logpolycoefflux.attrs["units"] = "Jy"
output_catalog.logpolycoefpolflux.attrs["units"] = "Jy"

In [49]:
for coord in ["theta", "phi"]:
    output_catalog = output_catalog.assign_coords(
        **{coord:(("index"), cat[coord][high_flux_sources_mask].astype(np.float64))})

In [50]:
output_catalog

In [51]:
output_catalog.coords["theta"].attrs["units"] = "rad"
output_catalog.coords["phi"].attrs["units"] = "rad"

In [52]:
flux_100 = np.polynomial.polynomial.polyval(np.log(100), np.array(output_catalog["logpolycoefflux"])[:,::-1].T, tensor=False)

In [53]:
output_catalog["flux_100"] = np.polynomial.polynomial.polyval(np.log(100), output_catalog["logpolycoefflux"][:,::-1].T, tensor=False)

In [54]:
output_catalog["flux_100"].max()

In [55]:
np.array(output_catalog["logpolycoefflux"]).shape

(372255, 5)

In [56]:
output_catalog = output_catalog.sortby("flux_100", ascending=False)

In [57]:
del output_catalog["flux_100"]

In [58]:
np.polyval(output_catalog.logpolycoefflux[0], np.log(100))

0.003478966281996676

In [59]:
output_catalog.attrs["notes"] = \
"""Catalog of sources where the flux in Jy at any frequency is calculated with a 5th order polynomial in the logarithm of the frequency in GHz, separately for temperature and polarization.
The catalog does not contain information about the polarization angle of a source.
The catalog sorted in descending order based on the source flux at 100 GHz"""

In [60]:
output_catalog

In [61]:
output_catalog.to_netcdf(output_filename, format="NETCDF4") # requires netcdf4 package

In [62]:
%ls -lah $output_filename

-rw-rw---- 1 zonca sobs 37M Sep 19 11:15 websky_high_flux_catalog_1mJy.h5


In [63]:
import xarray

In [64]:
xarray.open_dataset(output_filename)

In [65]:
import h5py
f = h5py.File(output_filename, 'r')
f["logpolycoefflux"]

<HDF5 dataset "logpolycoefflux": shape (372255, 5), type "<f8">

In [66]:
f["logpolycoefflux"].attrs["units"]

b'Jy'

In [67]:
!mv $output_filename ~/p/issues/202405_pysm_catalog_pixell/