Extrapolate solar coronal magnetic field using the PFSS model and HMI data

In [None]:
# !jupyter lab --version

In [None]:
%matplotlib widget
# notebook, widget

import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import matplotlib.colors as colors
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
# import matplotlib
# matplotlib.use('TkAgg') # nbAgg, Qt5Agg, widget, TkAgg, Agg

import glob
import numpy as np
import pandas as pd
import sunpy.map
from sunpy.coordinates import sun
from sunpy.sun import constants as const
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.utils.data import download_file
from astropy.visualization import ImageNormalize, SqrtStretch, LogStretch
import pfsspy
import pfsspy.tracing as tracing
from tqdm import tqdm

# Notebook-wide matplotlib defaults: on-screen figure size/resolution and the
# format, resolution and background used when figures are saved.
plt.rcParams['figure.figsize'] = [7, 7]
plt.rcParams['figure.dpi'] = 100
plt.rcParams['savefig.dpi'] = 300
plt.rcParams['savefig.format'] = 'png'
plt.rcParams['figure.facecolor'] = 'white'
plt.rcParams['savefig.facecolor'] = 'white'

# Machine-specific absolute paths -- adjust them before running elsewhere.
# data_dir is the root under which load_aia() and load_lasco() look for files.
data_dir = '/home/mnedal/data'
# savedir is not referenced by any of the cells visible in this notebook export.
savedir = '/home/mnedal/repos/dias_work'

In [None]:
def split_datetime(start=None, end=None):
    """
    Split two 'YYYY-MM-DDTHH:MM:SS' timestamps into their individual string fields.

    Parameters:
    start (str): Start timestamp, date and time separated by a single 'T'.
    end (str): End timestamp in the same format.

    Returns:
    dict: Keys 'start_year', 'start_month', 'start_day', 'start_hour', 'start_minute',
          'start_second' followed by the same six 'end_*' keys. All values are the
          original substrings (no conversion to int).

    Raises:
    ValueError: if a timestamp does not split into exactly two parts on 'T',
                or its date/time part does not split into exactly three fields.
    AttributeError: if start or end is left as None.
    """
    labels = ('start', 'end')

    # Stage 1: separate the date part from the time part of both timestamps.
    date_part, time_part = {}, {}
    for label, stamp in zip(labels, (start, end)):
        date_part[label], time_part[label] = stamp.split('T')

    # Stage 2: dates -> (year, month, day); strict unpacking keeps the field count checked.
    calendar = {}
    for label in labels:
        year, month, day = date_part[label].split('-')
        calendar[label] = (year, month, day)

    # Stage 3: times -> (hour, minute, second).
    clock = {}
    for label in labels:
        hour, minute, second = time_part[label].split(':')
        clock[label] = (hour, minute, second)

    units = ('year', 'month', 'day', 'hour', 'minute', 'second')
    datetime_dict = {}
    for label in labels:
        for unit, value in zip(units, calendar[label] + clock[label]):
            datetime_dict[f'{label}_{unit}'] = value
    return datetime_dict



def load_aia(start=None, end=None, channel=193):
    """
    Load AIA FITS images of one channel, between two timestamps, as sunpy maps.

    Files are searched in ``{data_dir}/AIA/{channel}A/lv15`` and must be named
    ``aia_{channel}a_YYYY_MM_DDTHH_MM*.fits``. The window is resolved at minute
    precision: the seconds of `start` and `end` are ignored.

    Parameters:
    start (str): Window start as 'YYYY-MM-DDTHH:MM:SS'. The first file (in sorted
                 filename order) of that minute is the first one loaded.
    end (str): Window end in the same format. The first file of that minute marks
               the end of the window and is NOT loaded (half-open interval).
    channel (int): AIA channel used in the directory and file names. Default 193.

    Returns:
    list: One sunpy map per selected file, in sorted filename order. Empty when
          start and end resolve to the same file.

    Raises:
    FileNotFoundError: if no file exists for the start minute or the end minute.
    """
    dt_dict = split_datetime(start=start, end=end)
    data_path = f'{data_dir}/AIA/{channel}A/lv15'
    data = sorted(glob.glob(f'{data_path}/aia_{channel}a_*.fits'))
    
    start_filename = f"aia_{channel}a_{dt_dict['start_year']}_{dt_dict['start_month']}_{dt_dict['start_day']}T{dt_dict['start_hour']}_{dt_dict['start_minute']}"
    end_filename   = f"aia_{channel}a_{dt_dict['end_year']}_{dt_dict['end_month']}_{dt_dict['end_day']}T{dt_dict['end_hour']}_{dt_dict['end_minute']}"
    
    first_candidates = glob.glob(f'{data_path}/{start_filename}*.fits')
    last_candidates  = glob.glob(f'{data_path}/{end_filename}*.fits')
    
    if not first_candidates:
        raise FileNotFoundError(f'No AIA {channel}A file matches {data_path}/{start_filename}*.fits')
    if not last_candidates:
        raise FileNotFoundError(f'No AIA {channel}A file matches {data_path}/{end_filename}*.fits')
    
    # glob.glob() returns matches in arbitrary order and the minute pattern can match
    # more than one file, so take the smallest name to make the window deterministic.
    idx1 = data.index(min(first_candidates))
    idx2 = data.index(min(last_candidates))
    
    # half-open slice: the file at idx2 itself is excluded
    chosen_files = data[idx1:idx2]
    
    map_objects = []
    for i, file in enumerate(chosen_files):
        # load the file as a sunpy map
        m = sunpy.map.Map(file)
        map_objects.append(m)
        print(f'AIA {channel}A image {i} is loaded')
    return map_objects




def load_lasco(start=None, end=None, detector='C2'):
    """
    Load SOHO/LASCO C2 or C3 images as sunpy maps.

    Files are searched in ``{data_dir}/LASCO_{detector}`` and must be named
    ``LASCO_{detector}_YYYYMMDDTHHMM.jp2``. Only files of the START date are listed,
    so `end` has to fall on the same calendar day as `start`.

    Parameters:
    start (str): Window start as 'YYYY-MM-DDTHH:MM:SS'. A file with exactly this
                 date, hour and minute must exist (seconds are ignored).
    end (str): Window end in the same format. A file with exactly this date, hour
               and minute must exist; it marks the end of the window and is NOT
               loaded (half-open interval).
    detector (str): Detector tag used in the directory and file names. Default 'C2'.

    Returns:
    list: One sunpy map per selected file, in sorted filename order, each with
          'bunit' set to 'ct' and a fixed 0-250 display normalization.

    Raises:
    ValueError: if the start or end file is not among the listed files
                (raised by list.index).
    """
    dt_dict = split_datetime(start=start, end=end)
    data = sorted(glob.glob(f"{data_dir}/LASCO_{detector}/LASCO_{detector}_{dt_dict['start_year']}{dt_dict['start_month']}{dt_dict['start_day']}*.jp2"))
    
    # The start file name must be built from the start_* fields only
    # (the year was previously taken from the end timestamp).
    start_file_to_find = f"{data_dir}/LASCO_{detector}/LASCO_{detector}_{dt_dict['start_year']}{dt_dict['start_month']}{dt_dict['start_day']}T{dt_dict['start_hour']}{dt_dict['start_minute']}.jp2"
    end_file_to_find = f"{data_dir}/LASCO_{detector}/LASCO_{detector}_{dt_dict['end_year']}{dt_dict['end_month']}{dt_dict['end_day']}T{dt_dict['end_hour']}{dt_dict['end_minute']}.jp2"
    
    idx1 = data.index(start_file_to_find)
    idx2 = data.index(end_file_to_find)
    # half-open slice: the file at idx2 itself is excluded
    chosen_files = data[idx1:idx2]
    
    map_objects = []
    for i, file in enumerate(chosen_files):
        m = sunpy.map.Map(file)
        m.meta['bunit'] = 'ct' # a workaround for C2 and C3 jp2 images
        m.plot_settings['norm'] = ImageNormalize(vmin=0, vmax=250)
        map_objects.append(m)
        print(f'LASCO {detector} image {i} is done')
    return map_objects



# NOTE: split_datetime is already defined earlier in this cell with the same
# behavior; this second definition simply rebinds the name.
def split_datetime(start=None, end=None):
    """
    Decompose a start and an end timestamp ('YYYY-MM-DDTHH:MM:SS') into string fields.

    Parameters:
    start (str): Start timestamp.
    end (str): End timestamp.

    Returns:
    dict: 'start_year' ... 'start_second' and 'end_year' ... 'end_second',
          each mapped to the corresponding substring of the input.
    """
    start_date, start_time = start.split('T')
    end_date, end_time = end.split('T')

    s_year, s_month, s_day = start_date.split('-')
    e_year, e_month, e_day = end_date.split('-')

    s_hour, s_minute, s_second = start_time.split(':')
    e_hour, e_minute, e_second = end_time.split(':')

    return dict(
        start_year=s_year,
        start_month=s_month,
        start_day=s_day,
        start_hour=s_hour,
        start_minute=s_minute,
        start_second=s_second,
        end_year=e_year,
        end_month=e_month,
        end_day=e_day,
        end_hour=e_hour,
        end_minute=e_minute,
        end_second=e_second,
    )




def remove_redundant_maps(maps):
    """
    Drop duplicate SunPy maps, keeping the first map seen for each distinct label.

    Two maps count as duplicates when their `latex_name` attributes are equal
    (presumably that label embeds the observation time -- confirm against sunpy).

    Parameters:
    maps (list): SunPy Map objects; each must expose a hashable `latex_name`.

    Returns:
    list: The first map for every distinct `latex_name`, in order of first appearance.

    Example:
    >>> unique_maps = remove_redundant_maps(list_of_sunpy_maps)
    """
    first_seen = {}
    for candidate in maps:
        # setdefault stores the map only when this label has not been recorded yet
        first_seen.setdefault(candidate.latex_name, candidate)
    return [*first_seen.values()]



def apply_runratio(maps):
    """
    Build running-ratio images: every map divided by the one just before it.
    See: https://iopscience.iop.org/article/10.1088/0004-637X/750/2/134/pdf

    Parameters:
    maps (list): EUV sunpy maps in time order.

    Returns:
    A sunpy map sequence with len(maps) - 1 ratio maps. NaN pixels are set to 1 and
    every map is given a linear 0-2 normalization with the 'Greys_r' colormap.
    """
    ratio_maps = []
    for current, previous in zip(maps[1:], maps[:-1]):
        ratio_maps.append(current / previous.quantity)

    m_seq_runratio = sunpy.map.Map(ratio_maps, sequence=True)

    for ratio_map in m_seq_runratio:
        # NaN pixels are replaced by 1, i.e. "no change" in a ratio image
        invalid = np.isnan(ratio_map.data)
        ratio_map.data[invalid] = 1
        ratio_map.plot_settings['norm'] = colors.Normalize(vmin=0, vmax=2)
        ratio_map.plot_settings['cmap'] = 'Greys_r'

    return m_seq_runratio



def enhance_contrast(image, vmin, vmax):
    """
    Clip `image` to [vmin, vmax] and rescale it linearly so vmin -> 0 and vmax -> 1.

    Parameters:
    image: array-like of pixel values.
    vmin: lower clipping bound.
    vmax: upper clipping bound (vmax == vmin leads to a division by zero).

    Returns:
    The clipped image rescaled to the 0-1 range.
    """
    value_range = vmax - vmin
    clipped = np.clip(image, vmin, vmax)
    return (clipped - vmin) / value_range



def calculate_percentiles(image, lower_percentile=3, upper_percentile=97):
    """
    Return the values of `image` at a lower and an upper percentile.

    Parameters:
    image: array-like of pixel values.
    lower_percentile: percentile used for vmin (default 3).
    upper_percentile: percentile used for vmax (default 97).

    Returns:
    tuple: (vmin, vmax).
    """
    vmin, vmax = (
        np.percentile(image, q) for q in (lower_percentile, upper_percentile)
    )
    return vmin, vmax

In [None]:
# Get the Carrington rotation number for a specific date.
# The same date (2024-05-14) is used further below to load the AIA and LASCO images.
dt = pd.Timestamp('2024-05-14 17:40')
# cr is printed as returned; the download cell below truncates it with int().
cr = sun.carrington_rotation_number(t=dt)
print(f'Date: {dt}\tCarrington rotation number: {cr}')

In [None]:
# Compare truncation with rounding of the rotation number; the download cell
# below uses the truncated value, int(cr), in the file name.
int(cr), round(cr)

In [None]:
# Download the HMI synoptic map (hmi.Synoptic_Mr) for the truncated rotation number.
# cache=True lets astropy reuse a previously downloaded copy instead of fetching again.
filename = download_file(
    f'http://jsoc.stanford.edu/data/hmi/synoptic/hmi.Synoptic_Mr.{int(cr)}.fits', cache=True)
# Load the downloaded FITS file as a sunpy map.
syn_map = sunpy.map.Map(filename)

In [None]:
# Plot the synoptic map as downloaded, on axes that use the map's own projection.
fig = plt.figure(figsize=[10,5])
ax = plt.subplot(projection=syn_map)
im = syn_map.plot(axes=ax)

ax.coords[0].set_axislabel('Carrington Longitude [deg]')
ax.coords[1].set_axislabel('Latitude [deg]')

ax.coords.grid(color='black', alpha=0.6, linestyle='dotted', linewidth=0.5)

# Thin colorbar placed close to the image.
cb = plt.colorbar(im, fraction=0.0195, pad=0.01)
cb.set_label(f"Radial magnetic field [{syn_map.unit}]")

# In order to make the x-axis ticks show, the bottom y-limit has to be adjusted slightly
ax.set_ylim(bottom=0)
# Title is built from the 'content' and 'CAR_ROT' entries of the map metadata.
ax.set_title(f"{syn_map.meta['content']}, CR: {syn_map.meta['CAR_ROT']}")
plt.show()

In [None]:
# Sanity-check the downloaded data; the next cell replaces any NaN/inf
# values with zeros before the map is resampled.

# Check for NaN values
nan_exists = np.isnan(syn_map.data).any()
if nan_exists:
    print("NaN values found in syn_map data")

# Check for infinite values
inf_exists = np.isinf(syn_map.data).any()
if inf_exists:
    print("Infinite values found in syn_map data")

# Array shape of the map before resampling
print('Data shape:', syn_map.data.shape)

In [None]:
# Replace NaN and +/-infinite values with 0.0
modified_data = np.nan_to_num(syn_map.data, nan=0.0, posinf=0.0, neginf=0.0)

# Make a new SunPy Map with the modified data and the original metadata
hmi_map = sunpy.map.Map(modified_data, syn_map.meta)

# Check for NaN values (nothing is printed if the replacement worked)
nan_exists = np.isnan(hmi_map.data).any()
if nan_exists:
    print("NaN values found in hmi_map data")

# Check for infinite values (nothing is printed if the replacement worked)
inf_exists = np.isinf(hmi_map.data).any()
if inf_exists:
    print("Infinite values found in hmi_map data")

# Resample HMI data down to 360 x 180 pixels
hmi_map = hmi_map.resample([360, 180]*u.pix) # flagged by the author as incorrect -- presumably because resample() interpolates instead of conserving flux; TODO confirm
# TODO: try resampling by summing pixel blocks and converting the result back to a sunpy map

print('New Data shape:', hmi_map.data.shape)

In [None]:
# Plot the cleaned and resampled map with the same layout as the original synoptic map.
fig = plt.figure(figsize=[10,5])
ax = plt.subplot(111, projection=hmi_map)
im = hmi_map.plot(axes=ax)

ax.coords[0].set_axislabel('Carrington Longitude [deg]')
ax.coords[1].set_axislabel('Latitude [deg]')

ax.coords.grid(color='black', alpha=0.6, linestyle='dotted', linewidth=0.5)

cb = plt.colorbar(im, fraction=0.0195, pad=0.01)
# The unit label is taken from the original syn_map, not from hmi_map.
cb.set_label(f"Radial magnetic field [{syn_map.unit}]")

# In order to make the x-axis ticks show, the bottom y-limit has to be adjusted slightly
ax.set_ylim(bottom=0)
ax.set_title(f"Resampled {hmi_map.meta['content']}, CR: {hmi_map.meta['CAR_ROT']}")
plt.show()

In [None]:
# Set up and solve the PFSS model on the resampled HMI map.
nrho = 50 # number of rho grid points
rss  = 7  # source surface radius (presumably in solar radii, per pfsspy.Input -- confirm)
pfss_in  = pfsspy.Input(hmi_map, nrho, rss)
pfss_out = pfsspy.pfss(pfss_in)

In [None]:
# Seed grid for field-line tracing: 20 latitudes x 30 longitudes = 600 seed points.
num_footpoints = 20
# NOTE(review): the latitude range is asymmetric (-0.4*pi up to, but excluding, 0.5*pi) -- confirm this is intended.
lat = np.linspace(-0.4*np.pi, 0.5*np.pi, num_footpoints, endpoint=False)
lon = np.linspace(-0.5*np.pi, 1.5*np.pi, num_footpoints+10, endpoint=False)

# Make a 2D grid from the 1D latitude/longitude points, then flatten it and attach radian units.
lat, lon = np.meshgrid(lat, lon, indexing='ij')
lat, lon = lat.ravel()*u.rad, lon.ravel()*u.rad
# All seeds are placed at 1.2 times const.radius.
r = 1.2*const.radius

# Express the seeds in the coordinate frame of the PFSS solution and trace the field lines from them.
seeds = SkyCoord(lon, lat, r, frame=pfss_out.coordinate_frame)
tracer = tracing.FortranTracer()
flines = tracer.trace(seeds, pfss_out)

## AIA Map

In [None]:
# Load the AIA 193 images between 17:40 and 17:41; load_aia() slices the file
# list as data[idx1:idx2], so the file that marks the end minute is excluded.
aia_193_map_object = load_aia(start='2024-05-14T17:40:00', end='2024-05-14T17:41:00', channel=193)
# Number of maps loaded
print(len(aia_193_map_object))

In [None]:
# Use the first loaded AIA map for all following overlays and give it a fixed linear 0-1500 display range.
aia_map = aia_193_map_object[0]
aia_map.plot_settings['norm'] = colors.Normalize(vmin=0, vmax=1500)

# Plot the AIA image with a colorbar and without the coordinate grid.
fig = plt.figure()
ax = fig.add_subplot(111, projection=aia_map)
img = aia_map.plot(axes=ax)
plt.colorbar(img, shrink=0.8, pad=0.02)
ax.grid(False)
fig.tight_layout()
plt.show()

In [None]:
# Map of the footpoints: the tracing seed points drawn as red markers
# (no connecting line) on top of the AIA image in a gray colormap.
fig = plt.figure()
ax = plt.subplot(projection=aia_map)
aia_map.plot(axes=ax, cmap='gray')
ax.plot_coord(seeds, color='red', marker='o', markersize=4, linewidth=0)
ax.grid(False)
fig.tight_layout()
plt.show()

In [None]:
# Map of the footpoints, same as the previous figure but on the reversed
# gray colormap ('gray_r') with blue markers.
fig = plt.figure()
ax = plt.subplot(projection=aia_map)
aia_map.plot(axes=ax, cmap='gray_r')
ax.plot_coord(seeds, color='dodgerblue', marker='o', markersize=4, linewidth=0)
ax.grid(False)
fig.tight_layout()
plt.show()

In [None]:
# Show the seed points on the resampled HMI map, drawn with the 'seismic' colormap.
fig = plt.figure(figsize=[10,5])
ax = plt.subplot(111, projection=hmi_map)
im = hmi_map.plot(axes=ax, cmap='seismic')
ax.plot_coord(seeds, color='black', marker='o', linewidth=0, markersize=2)

ax.coords[0].set_axislabel('Carrington Longitude [deg]')
ax.coords[1].set_axislabel('Latitude [deg]')

ax.coords.grid(color='black', alpha=0.6, linestyle='dotted', linewidth=0.5)

cb = plt.colorbar(im, fraction=0.0195, pad=0.01)
# The unit label is taken from the original syn_map, not from hmi_map.
cb.set_label(f"Radial magnetic field [{syn_map.unit}]")

# In order to make the x-axis ticks show, the bottom y-limit has to be adjusted slightly
ax.set_ylim(bottom=0)
ax.set_title(f"Resampled {hmi_map.meta['content']}, CR: {hmi_map.meta['CAR_ROT']}")
fig.tight_layout()
plt.show()

In [None]:
# Overlay every traced field line on the AIA image.
fig = plt.figure()
ax = plt.subplot(111, projection=aia_map)
aia_map.plot(axes=ax, cmap='gray_r')

# One plot call per field line; the progress bar is advanced manually after each one.
with tqdm(total=len(flines), desc='Plotting the field lines') as pbar:
    for fline in flines:
        ax.plot_coord(fline.coords, alpha=0.3, color='purple', linewidth=1)
        pbar.update(1)

# Reset the axis limits to the pixel extent of the AIA image after the field lines are drawn.
ax.set_xlim(left=0, right=int(aia_map.dimensions.x.value))
ax.set_ylim(bottom=0, top=int(aia_map.dimensions.y.value))
fig.tight_layout()
plt.show()

## LASCO C2

In [None]:
# Load the LASCO C2 images from 17:30 up to (but excluding) the 20:00 file;
# load_lasco() needs a file named for exactly each of these two minutes.
lasco_c2_map_objects = load_lasco(start='2024-05-14T17:30:00', end='2024-05-14T20:00:00', detector='C2')

In [None]:
# remove redundant images (maps sharing the same latex_name; the first one is kept)
clean_maps_c2 = remove_redundant_maps(lasco_c2_map_objects)

# Report how many images were there before and after de-duplication.
print(f'Before:\nLASCO C2: {len(lasco_c2_map_objects)} images\n')
print(f'After:\nLASCO C2: {len(clean_maps_c2)} images')

In [None]:
# Preview one de-duplicated LASCO C2 frame (index 4).
m_c2 = clean_maps_c2[4]

fig = plt.figure()
ax = fig.add_subplot(111, projection=m_c2)
img = m_c2.plot(axes=ax)
# No axes argument is passed here; presumably the limb is drawn on the current axes -- confirm.
m_c2.draw_limb()
ax.grid(False)
plt.colorbar(img, pad=0.02, shrink=0.8)
fig.tight_layout()
plt.show()

In [None]:
# make running-ratio maps (each frame divided by the previous one, see apply_runratio)
m_seq_runratio_c2 = apply_runratio(clean_maps_c2)

In [None]:
# Overlay the traced field lines on one running-ratio LASCO C2 frame (index 3).
m_c2_rr = m_seq_runratio_c2[3]

fig = plt.figure()
ax = fig.add_subplot(111, projection=m_c2_rr)
m_c2_rr.plot(axes=ax)
# No axes argument is passed here; presumably the limb is drawn on the current axes -- confirm.
m_c2_rr.draw_limb()

with tqdm(total=len(flines), desc='Plotting the field lines') as pbar:
    for fline in flines:
        # alternative palette, kept for reference:
        # color = {0: 'black', -1: 'tab:blue', 1: 'tab:red'}.get(fline.polarity)
        # Color by fline.polarity: 0 -> black, -1 -> blue, +1 -> red.
        color = {0:'black', -1:'blue', 1:'red'}.get(fline.polarity)
        ax.plot_coord(fline.coords, alpha=0.5, color=color, linewidth=1)
        pbar.update(1)

ax.grid(False)
# Reset the axis limits to the pixel extent of the LASCO frame after the field lines are drawn.
ax.set_xlim(left=0, right=int(m_c2_rr.dimensions.x.value))
ax.set_ylim(bottom=0, top=int(m_c2_rr.dimensions.y.value))
fig.tight_layout()
plt.show()