In [2]:
import copy
import warnings

import cv2
import numpy as np
import scipy.ndimage
import torch
from astropy.stats import sigma_clipped_stats
from photutils.detection import DAOStarFinder
from scipy import signal
from torch.nn import functional as F

print(f"使われているSciPyのバージョン: {scipy.__version__}")



warnings.resetwarnings()
warnings.simplefilter("ignore")

使われているSciPyのバージョン: 1.15.3


In [3]:
def norm_rp(data, nan_data_dim=None):
    if nan_data_dim is not None:
        data_min = np.nanmin(nan_data_dim)
        std = np.nanstd(nan_data_dim)
        mean = np.nanmean(nan_data_dim)
    else:
        data_min = np.nanmin(data)
        std = np.nanstd(data)
        mean = np.nanmean(data)
    data -= data_min
    max_ = std * 3 + mean

    data[data > max_] = max_
    data /= max_
    return data


def normalize_rp(array, r_header, g_header):
    """
    Input : (y, x, 2 or 3)
    Output: (y ,x, 2 or 3)
    """
    gauss_list = []
    dims = array.shape[2]
    for dim in range(dims):
        cut_data_k = array[:, :, dim]
        if dim == 0 or dim == 2:
            cut_data_k_ = norm_rp(cut_data_k)
            gauss_list.append(cut_data_k_[:, :, None])
        else:
            nan_data = remove_peak(cut_data_k, dim, r_header, g_header)
            cut_data_k_ = norm_rp(cut_data_k, nan_data)
            gauss_list.append(cut_data_k_[:, :, None])
    cut_data = np.concatenate(gauss_list, axis=2)

    return cut_data


def remove_peak(array, dim, r_resolution, g_resolution):
    data = array.copy()
    mean, median, std = sigma_clipped_stats(data, sigma=3)
    if dim == 0:
        fwhm_arcsec = 6
        fwhm_pixel = fwhm_arcsec / r_resolution
    elif dim == 1:
        fwhm_arcsec = 1.98
        fwhm_pixel = fwhm_arcsec / g_resolution

    daofind = DAOStarFinder(fwhm=fwhm_pixel, threshold=mean + 3 * std)
    sources = daofind(data)
    try:
        positions = np.transpose((sources["xcentroid"], sources["ycentroid"]))
        same_shape_zero = np.zeros_like(data)
        for y, x in positions:
            same_shape_zero = cv2.circle(same_shape_zero, (int(y), int(x)), int(4), (255, 255, 255), -1)

        data[same_shape_zero == same_shape_zero.max()] = np.nan
        return data
    except:
        return data


def resize(data, size):
    """
    Resize data to the specified size.

    Input  :（y, x, 2 or 3）
    Output :（size ,size, 2 or 3）
    """
    cut_data = np.swapaxes(data, 1, 2)
    cut_data = np.swapaxes(cut_data, 0, 1)
    cut_data = torch.from_numpy(cut_data)
    cut_data = cut_data.unsqueeze(0)
    resize_data = F.interpolate(cut_data, (size, size), mode="bilinear", align_corners=False)
    resize_data = np.squeeze(resize_data.detach().numpy())

    resize_data_ = np.swapaxes(resize_data, 0, 1)
    resize_data_ = np.swapaxes(resize_data_, 1, 2)
    return resize_data_

In [4]:
def norm_res(data, r_header, g_header):
    """
    Cuts the data,
    and performs normalization and resizing.
    """
    # shape_y = data.shape[0]
    # shape_x = data.shape[1]
    # data = data[int(shape_y / 4) : int(shape_y * 3 / 4), int(shape_x / 4) : int(shape_x * 3 / 4)]
    data_ = copy.deepcopy(data)
    data_ = normalize_rp(data_, r_header, g_header)
    data_ = resize(data_, 300)

    return data_


In [5]:
def conv(obj_size, obj_sig, data):
    """
    Input size of data↓
    Input: (y, x, 2 or 3)
    Output: (size, size, 2 or 3)
    -------------------------------
    If the cut-out data is larger than obj_size, perform smoothing.
    If it's smaller, return it as is.
    """

    if data.shape[0] > obj_size:
        fwhm = (data.shape[0] / obj_size) * 2
        sig3 = fwhm / (2 * (2 * np.log(2)) ** (1 / 2))
        sig2 = (sig3**2 - obj_sig**2) ** (1 / 2)

        # ===== ここに以下の2行を追加してください =====
        print(f"DEBUG: type of signal is {type(signal)}")
        print(f"DEBUG: dir of signal is {dir(signal)}")
        # ==========================================

        kernel = np.outer(signal.windows.gaussian(8 * round(sig2) + 1, sig2), signal.windows.gaussian(8 * round(sig2) + 1, sig2))
        kernel1 = kernel / np.sum(kernel)

        conv_list = []
        for k in range(data.shape[2]):
            cut_data_k = data[:, :, k]
            lurred_k = signal.fftconvolve(cut_data_k, kernel1, mode="same")
            conv_list.append(lurred_k[:, :, None])

        pi = np.concatenate(conv_list, axis=2)
    else:
        pi = data
    return pi
