In [None]:
import os
import glob
import cv2
import numpy as np
import matplotlib
from matplotlib import pylab as plt
#from matplotlib import colors as mc

#from PIL import Image
from astropy.io import fits
from sunpy.map import Map
from sunpy.visualization.colormaps import cm
#from sunpy.coordinates import frames

parameters: [image cut level, gradientcut level, smooth scale] = [0.5, 0.3, 40]

Channels: $\mathrm{H}\alpha$, $\mathrm{Ca}K$, White Light

$\mathrm{H}\alpha$: matplotlib.colormaps['sdoaia304'] or mc.LinearSegmentedColormap.from_list('Ha_cmap', [(0, 'black'), (1, 'orange')])

$\mathrm{Ca}K$: matplotlib.colormaps['sdoaia335'] or mc.LinearSegmentedColormap.from_list('CaK_cmap', [(0, 'black'), (1, 'blue')])

In [None]:
CHANNEL = {'save': 'Ha', 'date': 'HA', 'path': 'ha', 'wavelength': 656.28, 'CCD size': 6420, 'guess radius': 2460, 'cmap': matplotlib.colormaps['sdoaia304'], 'notation': r'SZAO H$\alpha$'}# H\alpha
#CHANNEL = {'save': 'CaK', 'date': 'CAK', 'path': 'cak', 'wavelength': 393.73, 'CCD size': 4208, 'guess radius': 1745, 'cmap': matplotlib.colormaps['sdoaia335'], 'notation': r'SZAO CaK'}# CaK
#CHANNEL = {'save': 'WL', 'date': 'WL', 'path': 'wl', 'wavelength': 'White Light', 'CCD size': 4208, 'guess radius': 1745, 'cmap': 'grey', 'notation': r'SZAO WL'}# White Light
CCD_size = CHANNEL['CCD size']
yyyy = 2025# year
date = f'{yyyy}0826{CHANNEL['date']}/'#### ONLY change date
local_path = '/home/lenovo/solar_data/'
save_matrix_name = os.path.join(local_path, 'alignment/Rotation Matrix', f'{yyyy}0816HA rotation matrix.txt')#### saved at local directory and change ~MONTHLY
path = f'/run/user/1000/gvfs/smb-share:server=szaocloud.local,share=solar_observation/{yyyy}/{CHANNEL['path']}/'
directory = os.path.join(path, date)
save_directory = os.path.join(directory, date)
if not os.path.isdir(save_directory):
    os.mkdir(save_directory)

# load data
#fits_files = os.listdir(directory)
#for file in fits_files:
    #if (('Sun' in file) and ('.fits' in file)):
        #file_list.append(os.path.join(directory, file))
fits_files = glob.glob(os.path.join(directory, '*.fits'))
#file_list = [file for file in fits_files if 'Sun' in file]
file_enumerate = list(enumerate([file for file in fits_files if 'Sun' in file], start=1))
file_num =len(file_enumerate)
print(f'{file_num} .fits files today.')

hdrloc_obs = 0
hdrloc_gong = 1

image_cut_level = 0.5# threshold of luminosity check
grad_cut_level = 0.3# threshold of gradient check, for normal:0.4, for bright limbs or dark/shadowed image:0.3
smooth_scale = CCD_size / 20# smooth image

gong_file = os.path.join(local_path, 'alignment/Reference GONG Maps', '20250816012742Lh.fits.fz')
gong_map = Map(gong_file)
length = 10
origin = 'lower'
dpi = 100
image_extention = 'jpg'# save image extention
plt.rcParams.update({
    'font.family': 'serif',# General font family
    #'font.serif': 'Times New Roman',# Specific serif font
    #'mathtext.fontset': 'custom',
    #'mathtext.rm': 'Times New Roman',# Roman (serif) font for math
    #'mathtext.it': 'Times New Roman:italic',# Italic
    'figure.dpi': dpi,
    #'legend.fontsize': legend_size,
    'grid.alpha': 0.3,
    'savefig.transparent': True
})

Read fits file, construct UTC string and normalize image

In [None]:
def read_fits_file(filename, memmap=True):
    hdul = fits.open(filename, memmap=memmap)
    hdul.info()
    return hdul

def check_image_size(input_image):
    """
    In .fits file, Ny=image.shape[0] and Nx=image.shape[1]
    """
    Ny, Nx = input_image.shape
    return Ny, Nx, np.min([Ny, Nx])

def setup_UTC_string(header, keyword='DATE-AVG'):
    date, time = header[keyword].split('T')
    year, month, day = date.split('-')
    hh, mm, ss_original = time.split(':')
    ss = ss_original[0:2]# take the first two digits, DON'T use int(float(ss))-->might omit the '0' in the first digit
    time = f'{hh}:{mm}:{ss}'
    return date, time, (year, month, day), (hh, mm, ss)

def percents(data, pmin=1, pmax=99):
    """
    pmin, pmax: percentile cutoffs (clip outliers)
    """
    if (pmin < 0) or (pmax > 100):
        raise ValueError("Percentile must be [0, 100]%.")
    
    minimum, maximum = np.percentile(data, [pmin, pmax])
    data_clipped = np.clip(data, minimum, maximum)
    return data_clipped, minimum, maximum

def normalize(input_image, pmin=1, pmax=99, gamma=1.0):
    """
    Normalize and enhance contrast of images.
    gamma: gamma correction
    """
    image_clipped, vmin, vmax = percents(input_image, pmin, pmax)
    image_norm = (image_clipped - vmin) / (vmax - vmin)
    return image_norm**gamma

from skimage.exposure import rescale_intensity
def rescale_image(input_image, out_range=(0,255)):
    image_norm = rescale_intensity(input_image, in_range='image', out_range=out_range).astype(np.uint8)
    return image_norm

>Check memory usage and cleanup

In [None]:
import psutil
def get_memory_usage():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / (1024**2)# in MB

In [None]:
import time
import gc
def aggressive_memory_cleanup(hdul=None, quiet=True):
    """
    Force comprehensive memory cleanup after processing each file
    """
    # Close the FITS file explicitly
    if (hdul is not None):
        try:
            hdul.close()
        except:
            pass

    # Delete any references, the exact variable name in the workflow
    local_variables = locals()
    for var_name in list(local_variables.keys()):
        if ('hdu' in var_name) or ('data' in var_name) or ('header' in var_name) or ('image' in var_name):# including 'hdul'
            try:
                del local_variables[var_name]
            except:
                pass

    for _ in range(3):# Force garbage collection multiple times
        gc.collect()
        time.sleep(0.01)# Small pause between collections

    memory_after = get_memory_usage()
    if not quiet:
        print(f'Memory after cleanup: {memory_after:.1f}MB.')
    return memory_after

import sys
def deep_memory_cleanup():
    """
    Extreme memory cleanup measures
    """
    print('High memory detected - forcing deep cleanup!')
    # Clear various caches that might hold memory
    try:
        np.zeros(1)# Force numpy initialization cleanup
    except:
        pass

    # Multiple garbage collection passes
    for _ in range(5):
        gc.collect()
        gc.collect()# Double collect each pass
        time.sleep(0.1)

    # Clear module-level caches
    #cleared = 0
    for module in list(sys.modules.values()):
        if hasattr(module, '__dict__'):
            for key in list(module.__dict__.keys()):
                if key.startswith('_cache') or key.endswith('_cache'):
                    delattr(module, key)
                    #cleared += 1

Alternative

In [None]:
def process_with_subprocesses():
    """
    Use subprocesses to ensure complete memory isolation
    """
    import subprocess
    import json
    
    input_files = glob.glob('*.fits')
    batch_size = 20
    
    for i in range(0, len(input_files), batch_size):
        batch = input_files[i: i+batch_size]
        for file in batch:# Process each file in separate subprocess to guarantee memory cleanup
            result = subprocess.run([
                'python', '-c', f'''
import sys
sys.path.append(".")
from your_processing_module import process_single_file
process_single_file("{file}")
'''
            ], capture_output=True, text=True)

            if (result.returncode != 0):
                print(f'Error processing {file}: {result.stderr}')

# If subprocess approach is too heavy, use this lighter version:
def process_with_memory_monitoring():
    input_files = glob.glob('*.fits')
    high_memory_count = 0

    for i, input_file in enumerate(input_files):
        memory_before = get_memory_usage()

        if (memory_before > 14000):# 14GB threshold
            high_memory_count += 1
            print(f'High memory before processing: {memory_before:.1f}MB.')

            if high_memory_count > 3:# Consider saving state and restarting
                print('Consistently high memory - restarting may be needed!')
                break

        # Process with context manager (automatically closes)
        with fits.open(input_file, memmap=True) as hdul:
            data = hdul[0].data
            processed_data = your_processing_function(data)
            
            # Write output
            output_hdu = fits.PrimaryHDU(processed_data)
            output_hdu.header = hdul[0].header
            output_hdu.writeto(f'processed_{input_file}', overwrite=True)

        aggressive_memory_cleanup()# Force cleanup

        if (i % 50 == 0) and (get_memory_usage() > 12000):# If memory is still high after 50 files, suggest restart
            print('Consider restarting the process to clear accumulated memory.')

>Detect clouds

mean-->cloud, shadowed, instrument

gradient-->cloud

morphology-->cloud

In [None]:
from skimage.measure import label, regionprops
from skimage.filters import sobel
def clip_shadowed_image(input_image, max_threshold=25000, mean_threshold=8000, gradient_threshold=0.004):
    """
    1. lower mean value, < 8000; while mean value < 1000 is possibly thick clouds or CCD problem
    2. lower maximum, < 25000
    3. gradient magnitude rather blurred, < 0.005
    4. morphology: irregular and expand area, higher ellipcity deviate from 0 (sunspots) but not too high (distinguish from filaments and limbs) and large area
    """
    cloud = False# initialize
    # check mean and max value
    if (((np.mean(input_image) <= mean_threshold) and (np.max(input_image) <= max_threshold)) or (np.max(input_image) <= 1.1 * max_threshold)):
        rough_check = True
    else:
        rough_check = False
    # check gradient magnitude (the result of cv2.Sobel is ~4000 times larger than skimage.filters.sobel)
    '''
    grad_x = cv2.Sobel(input_image, cv2.CV_32F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(input_image, cv2.CV_32F, 0, 1, ksize=3)
    grad_mag = np.sqrt(grad_x**2 + grad_y**2)
    
    grad_mag = sobel(input_image)
    shadow_like = ((input_image > np.max(input_image) * 0.08) & (input_image < mean_threshold * 2)) & (grad_mag < gradient_threshold)# Mask low-gradient dark regions

    # check morphology
    label_img = label(shadow_like)
    morphology_check_list = []
    for region in regionprops(label_img):
        if ((region.eccentricity < 0.8) and (region.area > 5000)):
            # Likely cloud
            morphology_check_list.append(True)
    cloud = np.any(morphology_check_list) and rough_check
    '''
    #print(cloud)
    return rough_check

>Detect disk center and radius

Hough transformation

In [None]:
def cv2_hough_circle(input_image, min_center_Dist=3000, minRadius=1800, maxRadius=3000):
    """
    dp: The inverse of the accumulator resolution, use dp=1.0-1.2 for good accuracy.
    param1: Used internally by Canny edge detector (acts like threshold1/2), usually =100 is fine.
    param2: Circle detection threshold (30-50 for solar data). Higher-->more confident detections, but might miss faint edges; Lower-->more detections, but more false positives.
    """
    image_norm = rescale_image(input_image)
    #imgage_equalize = cv2.equalizeHist(image_norm)# Optional: enhance edge contrast
    #edges = cv2.Canny(image_norm, threshold1=3, threshold2=6)#after norm, threshold1~6, threshold2~3
    edges = cv2.medianBlur(image_norm, 5)
    circles = cv2.HoughCircles(edges, cv2.HOUGH_GRADIENT, dp=1, minDist=min_center_Dist,
                               param1=40, param2=50, minRadius=minRadius, maxRadius=maxRadius)
    
    if (circles is (not None)):
        circles = np.uint16(np.around(circles))
        xcenter, ycenter, radius = circles[0][0]
        return (xcenter, ycenter), radius
    else:
        raise ValueError("No circle found")

In [None]:
from skimage.feature import canny
from skimage.transform import hough_circle, hough_circle_peaks
def skimage_hough_circle(input_image, r_range=None):
    edges = canny(input_image, sigma=3)

    hough_res = hough_circle(edges, r_range)
    accums, cx, cy, radii = hough_circle_peaks(hough_res, r_range, total_num_peaks=1)

    if (len(cx)==0):
        raise ValueError("No circle found")

    xcenter, ycenter, rsun = cx[0], cy[0], radii[0]
    return (xcenter, ycenter), rsun

Least Square

detect and strengthen limbs first

In [None]:
from scipy.ndimage import median_filter, uniform_filter
from skimage.filters import sobel
def find_limbs(input_image, image_cut_level=0.5, grad_cut_level=0.3, smooth_scale=smooth_scale, initial_radius_guess=CHANNEL['guess radius']):
    """
    1. 6240, image cutlevel=0.5, grad cutlevel=0.3; 2048, image cutlevel=0.6, grad cutlevel=0.4
    2. initial guess radius: Halpha (6240) is about 2460; CaK and White Light (4208) are about 1745
    """
    # prepare coordinate grid
    Ny, Nx, image_size = check_image_size(input_image)# (Ny, Nx)
    x = np.tile(np.arange(Nx), (Ny, 1))
    y = np.tile(np.arange(Ny).reshape(-1, 1), (1, Nx))
    # margin mask: remove edges
    margin = 50
    mask = (x >= margin) & (x <= Nx - margin) & (y >= margin) & (y <= Ny - margin)
    #input_image *= mask

    median_filtered = median_filter(input_image, size=3)# smooth image with median filter
    gradient = sobel(median_filtered)# edge detection
    gradient_clipped, _, _ = percents(gradient, pmin=1, pmax=99.5)# clip outliers (too strong limbs or spots), within 99.5%
    gradient_clipped *= mask
    gradient_threshold = grad_cut_level * np.max(gradient_clipped)
    std_sob = np.std(gradient_clipped[gradient_clipped > gradient_threshold])# compute robust threshold using std-dev of strong edges

    strong_edge = np.copy(gradient_clipped)
    strong_edge[gradient_clipped <= gradient_threshold] = -1# find strong edges, if not strong enough, mask by minus value

    # smooth original image to suppress center
    image_smooth = uniform_filter(input_image.astype(np.float32), size=smooth_scale)
    image_smooth *= mask
    image_threshold = image_cut_level * np.max(image_smooth)

    sa = (strong_edge > 3 * std_sob) & (input_image < image_threshold)#### final mask for candidate limb points
    y_raw, x_raw = np.where(sa)
    guess_limb = ((x_raw - image_size / 2)**2 + (y_raw - image_size / 2)**2 >= (initial_radius_guess * 0.8)**2) & ((x_raw - image_size / 2)**2 + (y_raw - image_size / 2)**2 <= (initial_radius_guess * 1.2)**2)
    y_limb, x_limb = y_raw[guess_limb], x_raw[guess_limb]
    limb_points = np.column_stack([y_limb, x_limb])

    if (limb_points.shape[0] < 10):
        #print(f'number of limb points: {limb_points.shape[0]}')
        raise ValueError("Too few limb points.")

    return limb_points, y_limb, x_limb

`skimage`

In [None]:
from skimage.measure import CircleModel
def skimage_leastsq_circle(input_image, image_cut_level=0.6, grad_cut_level=0.4, smooth_scale=40):
    """
    Detect the solar limb using Sobel+median filtering and fit a circle.
    cut level: points which value < threshold are out of disk.
    """
    limb_points, _, _ = find_limbs(input_image, image_cut_level=image_cut_level, grad_cut_level=grad_cut_level, smooth_scale=smooth_scale, initial_radius_guess=CHANNEL['guess radius'])

    model = CircleModel()
    model.estimate(limb_points)
    ycenter, xcenter, radius = model.params# note: (row, col, radius)
    return (xcenter, ycenter), radius

`numpy` rough + `scipy` refine

In [None]:
from scipy.optimize import least_squares
def fit_circle_algebraic(x, y):
    """
    Fit circle using algebraic least squares (linear) by numpy.
    Returns initial guess (x_center, y_center, radius).
    """
    A = np.column_stack((2*x, 2*y, np.ones_like(x)))
    b = x**2 + y**2
    p, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
    xc, yc, c = p
    r = np.sqrt(xc**2 + yc**2 + c)
    return xc, yc, r

def scipy_leastsq_circle(input_image, image_cut_level=0.5, grad_cut_level=0.3, smooth_scale=smooth_scale, initial_radius_guess=CHANNEL['guess radius']):
    """
    Nonlinear least-squares fit to a circle.
    Parameters:
        x, y: coordinates of limb points
    Returns:
        xc, yc: center coordinates
        r: radius
    """

    def residuals(params, x, y):
        xcenter, ycenter, r = params
        return np.sqrt((x - xcenter)**2 + (y - ycenter)**2) - r

    _, y_limb, x_limb = find_limbs(input_image, image_cut_level=image_cut_level, grad_cut_level=grad_cut_level, smooth_scale=smooth_scale, initial_radius_guess=initial_radius_guess)
    initial_guess = fit_circle_algebraic(x_limb, y_limb)# initial: use numpy algebraic least squares (linear) fitting
    print(f"Algebraic guess: xc={initial_guess[0]:.4f}, yc={initial_guess[1]:.4f}, r={initial_guess[2]:.4f}")

    refined_fit = least_squares(residuals, initial_guess, args=(x_limb, y_limb))
    xcenter, ycenter, radius = refined_fit.x
    print(f"Refined fit: xc={xcenter:.4f}, yc={ycenter:.4f}, r={radius:.4f}")
    return (xcenter, ycenter), radius

linear regression

`directly written`

In [None]:
from scipy.ndimage import convolve
from sklearn.linear_model import LinearRegression
def IDL_version_disk_center(input_image, cut_level=0.85, no_log=False):
    """
    Estimate the center and radius of the solar disk from an image. (least sq)

    Parameters:
        image (2D ndarray): Input image containing the solar limb.
        cut_level (float): Threshold ratio for gradient magnitude. Default is 0.45.
        no_log (bool): If True, do not apply logarithm to the image.

    Returns: (float)
        xcenter: X-coordinate of the disk center.
        ycenter: Y-coordinate of the disk center.
        rsun: Radius of the solar disk.
        delxc: Standard deviation in xcenter estimate.
        delyc: Standard deviation in ycenter estimate.
    """

    # Prepare coordinate grid
    input_shape = input_image.shape# (Ny, Nx)
    x = np.tile(np.arange(input_shape[1]), (input_shape[0], 1))
    y = np.tile(np.arange(input_shape[0]).reshape(-1, 1), (1, input_shape[1]))

    # Sobel kernels
    #kernel_x = 0.5 * np.array([[0, 0, 0], [-1, 0, 1], [0, 0, 0]])
    #kernel_y = 0.5 * np.array([[0, -1, 0], [0, 0, 0], [0, 1, 0]])
    kernel_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
    kernel_y = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])

    # Preprocessing
    if no_log:
        tmp = input_image
    else:
        input_image = np.maximum(input_image, np.max(input_image) * 0.1)
        tmp = np.log(input_image)

    # Compute gradients
    gx = convolve(tmp, kernel_x)
    gy = convolve(tmp, kernel_y)
    g = np.sqrt(gx**2 + gy**2)

    # Apply margin mask (remove edges)
    margin = 30
    mask = (x >= margin) & (x <= input_shape[1] - margin) & (y >= margin) & (y <= input_shape[0] - margin)
    g *= mask

    # Thresholding for limb detection
    threshold = cut_level * np.max(g)
    limb_mask = np.where(g >= threshold)

    if (limb_mask[0].size <= 10):
        raise ValueError("Not enough limb points!")

    # Extract gradient components and coordinates at limb points
    limb_gx = gx[limb_mask]
    limb_gy = gy[limb_mask]
    limb_g = g[limb_mask]

    qx = -limb_gx / limb_g
    qy = -limb_gy / limb_g

    x_limb = x[limb_mask]
    y_limb = y[limb_mask]
    ydata = x_limb * qx + y_limb * qy
    xdata = np.vstack((qx, qy)).T

    # Linear regression to fit center
    reg = LinearRegression()
    reg.fit(xdata, ydata)
    xcenter, ycenter = reg.coef_
    radius = reg.intercept_

    # Estimate standard deviation (error)
    yfit = reg.predict(xdata)
    residuals = ydata - yfit
    sigma = np.std(residuals) / np.sqrt(len(residuals))
    del_xcenter, del_ycenter = sigma, sigma

    return (xcenter, ycenter), np.abs(radius), del_xcenter, del_ycenter

>Shift disk

$\mathrm{H}\alpha$ and White Light should `transpose`

In [None]:
def shift_disk(input_image, xcenter, ycenter, channel=CHANNEL['save']):
    Ny, Nx, image_size = check_image_size(input_image)# (Ny, Nx) or (height, width)
    #print(Ny, Nx)
    # (dx, dy): from disk center to image center, x for width and y for height
    delta_y = (Ny / 2) - ycenter
    delta_x = (Nx / 2) - xcenter
    shift_matrix = np.array([[1, 0, delta_x], 
                             [0, 1, delta_y]], dtype=np.float32)
    shift_image = cv2.warpAffine(input_image, shift_matrix, (Nx,Ny))# shift
    if (channel=='CaK'):
        output_image = np.copy(shift_image)
    else:# H\alpha and White Light
        output_image = cv2.resize(shift_image.T, dsize=(image_size,image_size))# transpose
    return output_image, shift_matrix, image_size

>Affine transform

\begin{equation*}
\mathbf{M} = 
\begin{bmatrix}
    a & b & t_{x}\\
    c & d & t_{y}
\end{bmatrix}
\end{equation*}

$a$, $d$: scaling + rotation; $b$, $c$: shearing + rotation; $t_{x}$, $t_{y}$: translation

\begin{equation*}
\begin{bmatrix}
    x'\\
    y'
\end{bmatrix} = 
\begin{bmatrix}
    a & b\\
    c & d
\end{bmatrix} \cdot
\begin{bmatrix}
    x\\
    y
\end{bmatrix} + 
\begin{bmatrix}
    t_{x}\\
    t_{y}
\end{bmatrix}
\end{equation*}

for a given center, scale $k$ and rotation angle $\theta$, OpenCV uses $\alpha = k\cos{\theta}$ and $\beta = k\sin{\theta}$, so 
\begin{equation*}
\mathbf{M} = 
\begin{bmatrix}
    \alpha & \beta & (1 - \alpha)x_{\mathrm{c}} - \beta y_{\mathrm{c}}\\
    -\beta & \alpha & \beta x_{\mathrm{c}} + (1 - \alpha)y_{\mathrm{c}}
\end{bmatrix}
\end{equation*}

for only translation, 
\begin{equation*}
\mathbf{M} = 
\begin{bmatrix}
    1 & 0 & t_{x}\\
    0 & 1 & t_{y}
\end{bmatrix}
\end{equation*}

In [None]:
def estimate_affine_transform(test_image, reference_image, max_features=100, min_keypoints=10):
    """
    Estimates affine transform (rotation + translation) from test_image to reference_image.
    Returns transformation matrix M (2x3).
    """
    test_norm = rescale_image(test_image)
    reference_norm = rescale_image(reference_image)

    # ORB feature detection, description and check.
    orb = cv2.ORB_create(nfeatures=max_features)
    keypoints1, descriptors1 = orb.detectAndCompute(test_norm, None)
    #print(len(keypoints1))
    if ((descriptors1 is None) or (len(keypoints1) < min_keypoints)):
        raise ValueError("Feature detection failed on the test image.")

    keypoints2, descriptors2 = orb.detectAndCompute(reference_norm, None)
    #print(len(keypoints2))
    if ((descriptors2 is None) or (len(keypoints2) < min_keypoints)):
        raise ValueError("Feature detection failed on the reference image.")
    
    # match descriptors using brute force
    brute_force = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches = brute_force.match(descriptors1, descriptors2)
    matches = sorted(matches, key=lambda x: x.distance)# Sort matches by distance (match quality)
    print(f'Good matches: {len(matches)}')
    if (len(matches) < 3):
        raise ValueError("Not enough good matches found for alignment.")

    # use good matches to estimate affine transform
    src_points = np.float32([keypoints1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
    dst_points = np.float32([keypoints2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
    Matrix_estimate, mask = cv2.estimateAffinePartial2D(src_points, dst_points)
    #print(Matrix_estimate)
    angle_rad = -1 * np.arctan2(Matrix_estimate[1, 0], Matrix_estimate[0, 0])# only extract rotation angle
    angle_deg = np.rad2deg(angle_rad)

    # create rotation matrix (center given and fixed)
    height, width = test_image.shape
    center_fixed = (width // 2, height // 2)
    Matrix_rotation_only = cv2.getRotationMatrix2D(center_fixed, angle=angle_deg, scale=1.0)
    return Matrix_estimate, Matrix_rotation_only, angle_deg

>Align image and evaluate

In [None]:
from skimage.metrics import structural_similarity
def align_to_reference(max_features, input_image, reference_image, calculate_matrix=True, transform_matrix=None):
    height_i, width_i = input_image.shape
    height_r, width_r = reference_image.shape
    if calculate_matrix:# if matrix unknown
        transform_matrix_rotation_and_scale, transform_matrix_rotation, rotation_angle_degree = estimate_affine_transform(input_image, reference_image, max_features=max_features)
        #print(transform_matrix_rotation_and_scale)
        #print(transform_matrix_rotation)
        aligned_image_output = cv2.warpAffine(input_image, transform_matrix_rotation, (width_i, height_i), flags=cv2.INTER_LINEAR)# transform for output
        aligned_image_evaluate = cv2.warpAffine(input_image, transform_matrix_rotation_and_scale, (width_r, height_r), flags=cv2.INTER_LINEAR)# transform for evaluate
        score = structural_similarity(rescale_image(aligned_image_evaluate), rescale_image(reference_image))# ssim, evaluate similarity
        print(f'Alignment SSIM Score: {score:.4f}')
    else:# if already obtained matrix
        transform_matrix_rotation = np.copy(transform_matrix)
        aligned_image_output = cv2.warpAffine(input_image, transform_matrix_rotation, (width_i, height_i), flags=cv2.INTER_LINEAR)# transform for output
        #aligned_image_evaluate = cv2.resize(aligned_image_output, dsize=reference_image.shape)
        rotation_angle_degree = None
        score = 'Already scored.'
    return aligned_image_output, transform_matrix_rotation, score

>Rewrite header

In [None]:
def update_header(input_header, input_image, radius, time_obs, date_obs, wavelength=CHANNEL['wavelength'], 
                  observatory_site='Shenzhen Astronomical Observatory', type_obs='FULLDISK'):
    """
    Changes are automatically saved when using 'update' mode
    """
    output_header = input_header.copy()
    naxis2, naxis1, _ = check_image_size(input_image)# (Ny, Nx)
    # Only add comment:
    output_header.set('Bzero', value=None, comment='Data is Unsigned Integer')# Bzero: Data is Unsigned Integer

    # Only add/update keyword:
    output_header['CAMERA'] = '          '# CAMERA
    output_header['ORIGIN'] = observatory_site# ORIGIN
    output_header['TELESCOP'] = '   '# TELESCOP

    # Add keyword with comments:
    output_header.set('CRPIX1', value=naxis1/2, comment='ThePixelPositionOfTheSolarCenterInTheXAxis')# CRPIX1: ThePixelPositionOfTheSolarCenterInTheXAxis
    output_header.set('CRPIX2', value=naxis2/2, comment='ThePixelPositionOfTheSolarCenterInTheYAxis')# CRPIX2: ThePixelPositionOfTheSolarCenterInTheYAxis
    output_header.set('DATE_OBS', value=date_obs, comment='observationDate')# DATE_OBS: observationDate
    output_header.set('NAXIS1', value=naxis1, comment='length of data axis 1')# NAXIS1: length of data axis 1
    output_header.set('NAXIS2', value=naxis2, comment='length of data axis 2')# NAXIS2: length of data axis 2
    output_header.set('O_BEZRO', value=np.abs(np.int16(np.copy(input_header['BZERO']))), comment='Original BZERO Value')# O_BEZRO: Original BZERO Value
    output_header.set('RSUN_OBS', value=radius, comment='Pixel radius of Sun')# RSUN_OBS: Pixel radius of Sun
    output_header.set('TIME-OBS', value=time_obs, comment='observationTime')# TIME-OBS: observationTime
    output_header.set('TYPE-OBS', value=type_obs, comment='observationModel')# TYPE-OBS: observationModel
    output_header.set('WAVELNTHnm', value=wavelength, comment='waveLength [nm]')# WAVELNTHnm: waveLength [nm]# H\alpha 656.28; CaK 393.73; WL White Light

    # Delete keyword:
    if ('ADCBITS' in output_header):# ADCBITS
        del output_header['ADCBITS']
    if ('BIASADU' in output_header):# BIASADU
        del output_header['BIASADU']
    if ('BLKLEVEL' in output_header):# BLKLEVEL
        del output_header['BLKLEVEL']
    if ('CAMID' in output_header):# CAMID
        del output_header['CAMID']
    if ('DATE-AVG' in output_header):# DATE-AVG
        del output_header['DATE-AVG']
    if ('EGAIN' in output_header):# EGAIN
        del output_header['EGAIN']
    if ('EGAINSAV' in output_header):# EGAINSAV
        del output_header['EGAINSAV']
    if ('GAIN' in output_header):# GAIN
        del output_header['GAIN']
    if ('JD_UTC' in output_header):# JD_UTC
        del output_header['JD_UTC']
    if ('OFFSET' in output_header):# OFFSET
        del output_header['OFFSET']
    
    return output_header

In [None]:
def add_coordinate(input_header, reference_header):
    delta = np.abs(reference_header['CDELT1'])# angular change (saved in arcsec/px)
    R_standard_arcsec = reference_header['SOLAR-R']# arcsec
    R_standard_px = R_standard_arcsec / delta# px-->radius in standard image

    output_header = input_header.copy()
    R_obs_px = input_header['RSUN_OBS']# px
    scale = R_obs_px / R_standard_px# from standard GONG to SZAO

    output_header.set('WCSNAME', value='Helioprojective-cartesian', comment='WCS system name')
    output_header.set('CTYPE1', value='HPLN-TAN', comment='Axis 1 gives helioprojective westward angle')
    output_header.set('CTYPE2', value='HPLT-TAN', comment='Axis 2 gives helioprojective northward angle')
    output_header.set('CRVAL1', value=0.00000, comment='Helioprojective westward angle at CRPIX1')
    output_header.set('CRVAL2', value=0.00000, comment='Helioprojective northward angle at CRPIX2')
    output_header.set('CDELT1', value=delta*scale, comment='Angular change per pixel along axis 1')
    output_header.set('CDELT2', value=delta*scale, comment='Angular change per pixel along axis 2')
    output_header.set('CUNIT1', value='arcsec', comment='Units of CRVAL1 and CDELT1')
    output_header.set('CUNIT2', value='arcsec', comment='Units of CRVAL2 and CDELT2')

    return output_header

>Sharpen edges

In [None]:
def sharpen_edges(input_image, center, radius, cut_threshold=0.02, factor=2.5, inner_dr=4, outer_dr=0.05*CCD_size, channel=CHANNEL['save']):
    """
    ONLY sharpen H\alpha images.
    6420 image: [factor=2.5, radius range=300]
    """
    if (channel=='Ha'):
        if (factor <= 1):
            raise ValueError("Sharpen factor should be larger than 1.")

        UPPER_LIMIT = np.max(input_image)
        (xcenter, ycenter) = center
        Ny, Nx, _ = check_image_size(input_image)# (Ny, Nx)
        y_grid, x_grid = np.ogrid[:Ny, :Nx]
        dist_sq = (x_grid - xcenter)**2 + (y_grid - ycenter)**2

        # mask for different radius:
        #inside = dist_sq <= (radius - inner_dr)**2
        edge_features = (dist_sq > (radius - inner_dr)**2) & (dist_sq <= (radius + outer_dr)**2)
        #outside = dist_sq > (radius + outer_dr)**2
    
        output_image = np.copy(input_image).astype(np.float32)# data type matters for in-place multiplication
        #output_image[outside] = 0# remove background
        output_image[edge_features & (output_image > cut_threshold * np.max(output_image))] *= factor# sharpen features (keep background regions as the same)
        output_image[output_image > UPPER_LIMIT] = UPPER_LIMIT# remove too large values
        return output_image
    else:# CaK and White Light
        return input_image

>Fit and load rotation matrix

Fit the transform matrix `monthly`

rotation matrix is saved as:
\begin{equation*}
\begin{bmatrix}
\alpha\\
\beta\\
t_{x}\\
t_{y}\\
\theta
\end{bmatrix}
\end{equation*}

In [None]:
fit_matrix_file = os.path.join(local_path, 'alignment/Reference GONG Maps', '2025-08-16-0127_1-Sun_00013.fits')
observation_file_name = os.path.abspath(fit_matrix_file)
hdul = read_fits_file(observation_file_name)
data_obs, header_obs = hdul[hdrloc_obs].data, hdul[hdrloc_obs].header# array shape: (ny, nx), while in fits is (nx, ny)
_, _, image_size = check_image_size(data_obs)
fit_image = data_obs[:image_size, :image_size]# cut a square image (avoid leaving off the last pixel)
hdul.close()# free memory

(xc_obs, yc_obs), r0_obs = scipy_leastsq_circle(fit_image, image_cut_level=image_cut_level, grad_cut_level=grad_cut_level, smooth_scale=smooth_scale)
image_shift, shift_matrix, _ = shift_disk(fit_image, xcenter=xc_obs, ycenter=yc_obs)
aligned_image, MATRIX, angle_degree, score = align_to_reference(max_features=100, input_image=image_shift, reference_image=gong_map.data, 
                                      calculate_matrix=True, transform_matrix=None)

alpha, beta = MATRIX[0, 0:2]
tx, ty = MATRIX[0:2, 2]
components = [alpha, beta, tx, ty, angle_degree]

with open(save_matrix_name, 'w') as save_matrix:# Open a file in write mode ("w" overwrites, "a" appends)
    for item in components:
        save_matrix.write(f'{item}\n')
save_matrix.close()

print(MATRIX)
print(f'Rotation matrix saved to: {save_matrix_name}.')

In [None]:
def load_rotation_matrix(file_name, image_size, channel=CHANNEL['save']):
    """
    Rotation matrix is fitted by H\alpha data originally.
    If use it for other channels, further rotation affine should be applied manually.
    """
    items = []
    with open(file_name, 'r') as load_matrix:
        for line in load_matrix:
            items.append(np.float64(line))
    load_matrix.close()

    alpha, beta, tx, ty, rotation_angle_degree = items
    #transform_matrix = [[alpha, beta, tx], [-1 * beta, alpha, ty]]

    if (channel=='CaK'):
        rotation_angle_degree += -69
    elif (channel=='WL'):
        rotation_angle_degree += 29
    transform_matrix = cv2.getRotationMatrix2D((image_size/2, image_size/2), angle=rotation_angle_degree, scale=1.0)
    #print(transform_matrix)
    return transform_matrix, rotation_angle_degree

pipeline output

In [None]:
load_transform_matrix, rotation_angle_degree = load_rotation_matrix(file_name=save_matrix_name, image_size=CCD_size)# load rotation matrix

>Single file process-->clean up memory after `each` file

In [None]:
def single_file_process(input_data, input_header, save_extention=image_extention):
    fit_image = input_data[:CCD_size, :CCD_size]# crop a SQUARE image (avoid leaving off the last pixel)
    date_obs, time_obs, (year, month, day), (hour, minute, second) = setup_UTC_string(input_header, keyword='DATE-AVG')

    (xc_obs, yc_obs), r0_obs = scipy_leastsq_circle(fit_image, image_cut_level=image_cut_level, grad_cut_level=grad_cut_level, smooth_scale=smooth_scale)

    image_shift, shift_matrix, _ = shift_disk(fit_image, xcenter=xc_obs, ycenter=yc_obs)
    aligned_image, _, _ = align_to_reference(max_features=100, input_image=image_shift, reference_image=gong_map.data, 
                                             calculate_matrix=False, transform_matrix=load_transform_matrix)
    #(xc_rot, yc_rot), r0_rot = scipy_leastsq_circle(aligned_image, image_cut_level=image_cut_level, grad_cut_level=grad_cut_level, smooth_scale=smooth_scale)

    new_header_obs = update_header(input_header, aligned_image, r0_obs, time_obs, date_obs)
    updated_header_obs = add_coordinate(new_header_obs, gong_map.meta)
    
    file_prefix = f'Z_SWGO_C_BFSE_{year}{month}{day}{hour}{minute}{second}'
    output_fits_name = os.path.join(save_directory, f'{file_prefix}_O_{CHANNEL['save']}.fits')
    output_jpg_name = os.path.join(save_directory, f'{file_prefix}_P_{CHANNEL['save']}.{save_extention}')# save as .jpg
    aligned_image = sharpen_edges(aligned_image, center=(CCD_size/2, CCD_size/2), radius=r0_obs, cut_threshold=0.02, factor=2.5, inner_dr=4)

    # plot and save image
    pri_hdu = fits.PrimaryHDU(aligned_image.astype(np.uint16), header=updated_header_obs)
    pri_hdu.name = ('Primary')
    hdu_save = fits.HDUList([pri_hdu])
    hdu_save.writeto(output_fits_name, overwrite=True)
    ''''''
    figure, ax = plt.subplots(1, 1, figsize=(length,length), layout='constrained')
    ax.set_position([0, 0, 1, 1])# remove margins
    sunplot = ax.imshow(aligned_image, origin=origin, cmap='grey')
    #sunplot = ax.imshow(normalize(aligned_image, pmin=1, pmax=99.5, gamma=0.4), origin=origin, cmap=CHANNEL['cmap'])
    ax.text(0.01*CCD_size, 0.02*CCD_size, CHANNEL['notation'], color='white', fontsize=40)
    ax.text(0.66*CCD_size, 0.95*CCD_size, f'{year}-{month}-{day} {hour}:{minute}:{second} UT', color='white', fontsize=18)
    ax.axis('off')
    #ax.set_title(utc_string)
    #figure.show()
    figure.savefig(output_jpg_name)
    plt.close(figure)
    return True

In [None]:
j = 0
for i, file in file_enumerate:
    print(f'----######## {i}/{file_num} .fits file ########----')
    hdul = read_fits_file(file, memmap=False)# BZERO/BSCALE/BLANK header keywords present. MUST set memmap=False
    data_obs, header_obs = hdul[hdrloc_obs].data, hdul[hdrloc_obs].header# array shape: (ny, nx), while in fits is (nx, ny)
    if clip_shadowed_image(data_obs):# skip cloud images and go on to the next one
        print('----Skip cloud-shadowed data.----')
        continue
    
    try:
        single_file_process(input_data=data_obs, input_header=header_obs)
        j += 1
        print(f'----######## {j} of {file_num} {CHANNEL['save']} data saved ########----')
    except Exception as e:
        print(f"Failed {file}: {e}")
    
    memory_after = aggressive_memory_cleanup(hdul=hdul)# AGGRESSIVE cleanup after each file
    if (i % 50 == 0) and (memory_after > 12000):# Additional check: if memory keeps growing (12GB threshold), take more drastic measures
        deep_memory_cleanup()
print(f'{j} of {file_num} files are useful.')

>Check files

In [None]:
from scipy.ndimage import median_filter, uniform_filter
from skimage.filters import sobel
check_path = os.path.join(local_path, 'check/')
testfile = os.path.join(check_path, 'cak/2025-08-25-0131_0-Sun_00001.fits')
hdu_test = read_fits_file(testfile)
data_test = hdu_test[hdrloc_obs].data[:CCD_size, :CCD_size]
hdu_test.close()

median_filtered = median_filter(data_test, size=3)
g = sobel(median_filtered)# edge detection
#print(g.shape, np.max(g), np.mean(g))
'''
smooth_scale = CCD_size / 20
image_smooth = uniform_filter(data_test.astype(np.float32), size=smooth_scale)
print(image_smooth.shape, np.max(image_smooth), np.mean(image_smooth))
'''
#cloud = clip_shadowed_image(data_test)

In [None]:
limb_points, y_limb, x_limb = find_limbs(data_test, image_cut_level=0.5, grad_cut_level=0.3)# normal:0.4, dark/shadowed:0.3, for too strong limbs, fit separately
print(x_limb.shape, y_limb.shape)
#(xc_obs, yc_obs), r0 = skimage_leastsq_circle(data_test, image_cut_level=image_cut_level)
(xc_obs, yc_obs), r0 = scipy_leastsq_circle(data_test, image_cut_level=0.5, grad_cut_level=0.3)

In [None]:
image_shift, shift_matrix, image_size = shift_disk(data_test, xcenter=xc_obs, ycenter=yc_obs)
aligned_image, _, score = align_to_reference(input_image=image_shift, reference_image=gong_map.data, calculate_matrix=True, max_features=100)
(xc_obs, yc_obs), r0 = scipy_leastsq_circle(aligned_image, image_cut_level=0.5, grad_cut_level=0.3)

In [None]:
sharpen_image = sharpen_edges(aligned_image, center=(CCD_size/2, CCD_size/2), radius=r0_obs, cut_threshold=0.02, factor=2.5, inner_dr=4)

In [None]:
figure, ax = plt.subplots(1, 2, figsize=(2*length,length), layout='constrained')
#sunplot = ax[0].imshow(data_test, origin=origin, cmap='grey')
#median = ax[0].imshow(median_filtered, origin=origin, cmap='grey')
gradient = ax[0].imshow(g, origin=origin, cmap='grey')
ax[0].set_xticks([])
ax[0].set_yticks([])
#shift = ax[1].imshow(sharpen_image, origin=origin, cmap='grey')
#smooth = ax[1].imshow(image_smooth, origin=origin, cmap='grey')
edge_points = ax[1].scatter(x_limb, y_limb, s=1, marker='x')
ax[1].set_xlim(0,CCD_size)
ax[1].set_ylim(0,CCD_size)
ax[1].set_xticks([])
ax[1].set_yticks([])
figure.show()
'''
save_file = os.path.join(directory, 'gradient_check.fits')
pri_hdu = fits.PrimaryHDU(g.astype(np.float32))
#save_file = os.path.join(directory, 'smooth_check.fits')
#pri_hdu = fits.PrimaryHDU(image_smooth)
#save_file = os.path.join(directory, 'median.fits')
#pri_hdu = fits.PrimaryHDU(median_filtered)
pri_hdu.name = ('Primary')
hdu_save = fits.HDUList([pri_hdu])
hdu_save.writeto(save_file, overwrite=True)'''

In [None]:
from astropy.wcs import WCS
hdu_gong = read_fits_file(gong_file)
data_gong, header_gong = hdu_gong[hdrloc_gong].data, hdu_gong[hdrloc_gong].header
hpc = WCS(header_gong)# Helioprojective cartisian

plot

In [None]:
from astropy import units as u
smap = Map(gong_file)
fig = plt.figure(figsize=(length,length))
ax = fig.add_subplot(projection=smap)
smap.plot(axes=ax, clip_interval=(1, 99.99)*u.percent)
smap.draw_limb(axes=ax)
#smap.draw_grid(axes=ax)
fig.show()

In [None]:
figure, ax = plt.subplots(1, 1, figsize=(length,length), subplot_kw={'projection':hpc}, layout='constrained')
sunplot = ax.imshow(data_gong, cmap='grey')
ax.set_xlabel('Helio X', fontsize=10)
ax.set_ylabel('Helio Y', fontsize=10)
figure.show()

In [None]:
figure = plt.figure(frameon=False)
ax = plt.axes([0, 0, 1, 1])
ax.set_axis_off()# Disable the axis
norm = smap.plot_settings['norm']
_, norm.vmin, norm.vmax = percents(smap.data, pmin=1, pmax=99.9)
ax.imshow(smap.data, norm=norm, cmap=smap.plot_settings['cmap'], origin="lower")