In [None]:
import astropy
import astropy.io.fits as fits
import matplotlib.pyplot as plt
import os
from pathlib import Path
from glob import glob
import re
import logging
from rich.progress import Progress
import numpy as np
from sunpy.map import Map
import pandas as pd
from datetime import datetime, date, timedelta
import sscws
from sunpy.net import Fido, attrs as a
from functools import reduce

from matplotlib.colors import LogNorm, PowerNorm, Normalize
from tqdm.notebook import tqdm
#from tqdm import tqdm

import warnings
import cv2 # pip install opencv-python

from sunpy.map.maputils import all_coordinates_from_map
import astropy.units as u
from astropy.coordinates import SkyCoord

### Experiments Week 6

In [None]:
# Plot before and after processing
# for 2014 event

In [None]:
def generate_image(m, ref_map, img_fname, vmin=0, vmax=20):
    """
    for single image
    """
    # m = Map(f)

    pixel_coords = all_coordinates_from_map(m)
    solar_center = SkyCoord(0 * u.deg, 0 * u.deg, frame=m.coordinate_frame)
    pixel_radii = np.sqrt(
        (pixel_coords.Tx - solar_center.Tx) ** 2
        + (pixel_coords.Ty - solar_center.Ty) ** 2
    )
    # r2 masking
    mask = 1 - ((pixel_radii / pixel_radii.max()) ** 2) * 0.5
    mask = mask.value
    mask[pixel_radii.value >= 0.9 * pixel_coords.Tx.max().value] = np.nan

    data = ((m.data / m.exposure_time.value) - ref_map) / mask

    # imshow is mirror to m.plot
    plt.imshow(data, origin="lower", cmap="stereocor2", vmin=vmin, vmax=vmax)
    plt.savefig(img_fname, dpi=100)
    plt.show()

    #plt.savefig(img_fname, dpi=100)
    #plt.close()

def generate_ref_data(filenames, polar_angle):
    """
    generate background reference image to be subtracted
    """

    ref_data = []
    for i in filenames[:15]:  # range(len(filenames)):
        # angle = fits.getheader(filenames[i])['POLAR']
        m = Map(i)
        angle = m.meta["POLAR"]
        # print(angle)
        if angle == polar_angle:
            ref_data.append(m.data / m.exposure_time.value)

    ref_map = np.mean(ref_data, axis=0)

    return ref_map

In [None]:
eventdate = "20140222"
instrument = "cor2"  #'cor1'
satellite = "a"  # "b"

base_path = "/mnt/onboard_data/visualization/cme_video_{}_{}/".format(
    instrument, satellite
)

fnames = sorted(
    glob(
        "/mnt/onboard_data/data/{}/{}_*_n*{}.fts".format(
            instrument, eventdate, satellite
        )
    )
)

print(len(fnames))

In [None]:
#fname = fnames[45] # 0, 45, 51
fname = fnames[51]

print(fname)
print(Map(fname).meta["POLAR"])

req_polar = int(Map(fname).meta["POLAR"])

In [None]:
# processed

img_fname = os.path.join(
                    "./",
                    "processed_" + os.path.basename(fname).replace(".fts", ".jpg"),
                )

ref_map = generate_ref_data(fnames, req_polar)
generate_image(Map(fname), ref_map, img_fname=img_fname, vmin=0, vmax=20)

In [None]:
# unprocessed

img_fname = os.path.join(
                    "./",
                    "unprocessed_" + os.path.basename(fname).replace(".fts", ".jpg"),
                )

plt.imshow(Map(fname).data, origin="lower")# cmap="stereocor2", vmin=vmin, vmax=vmax)
plt.savefig(img_fname, dpi=100)
plt.show()

### Experiments Week 5

In [None]:
base_path = '/mnt/onboard_data/visualization/cme_video_1/'
os.makedirs(base_path, exist_ok=True)

In [None]:
fnames_a = sorted(glob("/mnt/onboard_data/data/cor2/20140222_*_n*a.fts"))
fnames_b = sorted(glob("/mnt/onboard_data/data/cor2/20140222_*_n*b.fts"))

print(len(fnames_a), len(fnames_b))

In [None]:
f = fnames_a[0]
m = Map(f)
m.rotate(recenter=True)

In [None]:
for f in fnames_a:
    print(f)

In [None]:
warnings.simplefilter('ignore')
for f in tqdm(fnames_a):
    # plot map to jpg
    m = Map(f)
    m = Map(np.log(m.data / m.exposure_time.value), m.meta)
    #print('VALUE RANGE', m.data.min(), m.data.max())
    #print(m.exposure_time)
    plt.figure(figsize=(4, 4))
    m.plot(norm=Normalize(vmin=4, vmax=7))
    plt.savefig(os.path.join(base_path, 'cor2sa_' + os.path.basename(f).replace('.fts', '.jpg')), dpi=100)
    plt.close()

In [None]:
def generate_ref_data(filenames, polar_angle):

    ref_data = []
    for i in filenames[:15]:#range(len(filenames)):
        #angle = fits.getheader(filenames[i])['POLAR']
        m= Map(i)
        angle = m.meta['POLAR'] 
        #print(angle)
        if angle == polar_angle: 
            ref_data.append( m.data / m.exposure_time.value )

    ref_map = np.mean(ref_data, axis=0)

    return ref_map
#generate_ref_data(fnames_a, 0)

In [None]:
fits.getheader(fnames_a[0])

In [None]:
m0 = Map(fnames_a[0])
print(fnames_a[0])
m1 = Map(fnames_a[1])
print(fnames_a[1])

plt.imshow(m0.data)

In [None]:
plt.imshow(m1.data)

In [None]:
lens = []
for i in ref_data:
    lens.append(i.shape)

In [None]:
np.unique(lens, return_counts=True)

In [None]:
ref_map.shape

In [None]:
plt.imshow(np.sqrt(ref_map), origin="lower", cmap="stereocor2")
plt.show()

In [None]:
f = fnames_a[42]
print(f)

m = Map(f)

pixel_coords = all_coordinates_from_map(m)
solar_center = SkyCoord(0*u.deg, 0*u.deg, frame=m.coordinate_frame)
pixel_radii = np.sqrt((pixel_coords.Tx-solar_center.Tx)**2 +
                      (pixel_coords.Ty-solar_center.Ty)**2)
# r2 masking
mask = 1 - ((pixel_radii / pixel_radii.max()) ** 2)*0.5
mask = mask.value
mask[pixel_radii.value >= 0.9 * pixel_coords.Tx.max().value] = np.nan

In [None]:
#m = Map(np.log(m.data / m.exposure_time.value / mask), m.meta)
#m.plot(norm=Normalize())

#data = np.log( (m.data - ref_map.data) / m.exposure_time.value / mask) # 6, 7
data = ( (m.data - ref_map)  / mask) # 20, 45 # /m.exposure_time.value
plt.imshow(data, origin="lower", cmap="stereocor2", vmin=0, vmax=300) # mirror to m.plot

plt.show()

print(np.nanmin(data), np.nanmax(data), data.min(), data.max())

In [None]:
plt.imshow(mask)
plt.colorbar()
plt.show()

In [None]:
m.data.min(), m.data.max(), m.exposure_time

### Generate video

https://github.com/FrontierDevelopmentLab/2023-europe-space-weather/blob/ground/sunerf/evaluation/video_cme.py

In [None]:
base_path='/mnt/onboard_data/visualization/cme_video_1'

In [None]:
angles = []; observatories=[]
for f in tqdm(fnames_a[:4]):
    header = fits.getheader(f)
    angle = header['CROTA']
    obs = header['OBSRVTRY']
    angles.append(angle)
    observatories.append(obs)

In [None]:
fits.getheader(fnames_a[0])

In [None]:
fits.getheader(fnames_a[1])

In [None]:
print(np.min(angles))
print(np.max(angles))

In [None]:
print(np.unique(observatories))

In [None]:
exp_size = 2216
req_polars = [0, 120, 240]
for i in range(0,len(req_polars)):
    req_polar = req_polars[i]
    path_polar = base_path + '/'+str(req_polar)
    ref_map = generate_ref_data(fnames_a, req_polar)
    if not os.path.exists(path_polar):
        os.makedirs(path_polar)
    warnings.simplefilter('ignore')
    for f in tqdm(fnames_a):
        m = Map(f)
        angle = m.meta['POLAR']
        if angle == req_polar:
            pitch = m.meta['SC_PITCH']
            yaw = m.meta['SC_YAW'] 
            #m = m0.rotate()#angle=angle*u.deg)#,recenter=True) # CROTA

            pixel_coords = all_coordinates_from_map(m)
            solar_center = SkyCoord(0*u.deg, 0*u.deg, frame=m.coordinate_frame)
            pixel_radii = np.sqrt((pixel_coords.Tx-solar_center.Tx)**2 +
                                (pixel_coords.Ty-solar_center.Ty)**2)
            # r2 masking
            mask = 1 - ((pixel_radii / pixel_radii.max()) ** 2)*0.5
            mask = mask.value
            mask[pixel_radii.value >= 0.9 * pixel_coords.Tx.max().value] = np.nan


    
            #data = np.log(m.data)
            #data = np.nan_to_num(data, nan=10, neginf=10)
            data =((m.data/m.exposure_time.value)-ref_map)/ mask
            print(np.nanmin(data), np.nanmax(data))
            print('ploar: ' + str(angle))
            print('pitch: '+ str(pitch))
            print('yaw: '+ str(yaw))
        # data = ( (d - ref_map)  / maski) # 20, 45 # /m.exposure_time.value

            #plt.figure(figsize=(4, 4))
            plt.imshow(data, origin="lower", cmap="stereocor2", vmin=0,vmax=20) # mirror to m.plot

            plt.savefig(os.path.join(path_polar, 'cor2sa_' + os.path.basename(f).replace('.fts', '.jpg')), dpi=100)
            plt.close()
    
    video_path = path_polar

    video_name = os.path.join(video_path, 'video_'+str(req_polar)+'.mp4')

    images = sorted(glob(os.path.join(video_path, '*.jpg')))
    frame = cv2.imread(images[0])
    height, width, layers = frame.shape

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(video_name, fourcc, 10, (width,height)) # can reduce frames per second here

    for image in images:
        video.write(cv2.imread(image))

    cv2.destroyAllWindows()
    video.release()

In [None]:
def fgenerate_video(video_path, video_name, framerate=10):
    """
    warning: if no write permissions it still pretends to work
    """    
    images = sorted(glob(os.path.join(video_path, '*.jpg')))
    print(images)
    frame = cv2.imread(images[0])
    height, width, layers = frame.shape

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(video_name, fourcc, framerate, (width,height)) # can reduce frames per second here

    for image in images:
        video.write(cv2.imread(image))

    cv2.destroyAllWindows()
    video.release()

In [None]:
base_path

In [None]:
req_polars = [0, 120, 240]
for i in range(0,len(req_polars)):
    req_polar = req_polars[i]
    path_polar = base_path + str(req_polar)

    video_path = path_polar
    video_name = os.path.join(video_path, 'video_'+str(req_polar)+'.mp4')

    fgenerate_video(video_path, video_name, framerate=5)

In [None]:
plt.close()
data =((m.data/m.exposure_time.value)-ref_map)/ mask
m_norm = m.data/m.exposure_time.value
plt.imshow(data, origin="lower", cmap="stereocor2", vmin=0,vmax=20) 
print(np.nanmin(data))
print(np.nanmax(data))

In [None]:
plt.imshow(m_norm-ref_map, origin="lower")#, cmap="stereocor2")#, vmin=-300,vmax=300) 
print(np.nanmin(m_norm-ref_map))
print(np.nanmax(m_norm-ref_map))

In [None]:
plt.imshow(ref_map)

In [None]:
i

In [None]:
test_img = cv2.imread( os.path.join(base_path, "cor2sa_20140222_150915_n4c2a.jpg") )
# img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
test_img = test_img[...,::-1] # reverse red and blue channels
#plt.close()
plt.imshow(test_img)
plt.axis('off')
plt.show()

In [None]:
video_path = path_polar

video_name = os.path.join(video_path, 'video_'+str(req_polar)+'.mp4')

images = sorted(glob(os.path.join(video_path, '*.jpg')))
frame = cv2.imread(images[0])
height, width, layers = frame.shape

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video = cv2.VideoWriter(video_name, fourcc, 10, (width,height))

for image in images:
    video.write(cv2.imread(image))

cv2.destroyAllWindows()
video.release()

In [None]:
images

### Get 3D tensor from sequence

3D array of pixel values and timesteps [x, y, t]

In [None]:
def fget_seq_tensor(fnames):
    """
    # given sequence (list of filenames)
    # extract time (from filename or from meta)
    # extract pixels
    # combine in 3d tensor
    """
    pixels = []
    obs_ts = [] # tensors can't be strings

    for f in tqdm(fnames):
        m = Map(f)

        data = m.data # (np.log(data) - v_min) / (v_max - v_min)
        data = data.astype(np.float32)
        t = m.date.datetime #meta["date-obs"]
        pixels.append(data)
        obs_ts.append(t)
    return pixels, obs_ts

In [None]:
fnames = sorted(fnames_a)[:4]
pixels, obs_t = fget_seq_tensor(fnames)

In [None]:
import torch

In [None]:
torch.stack(pixels)

In [None]:
pixels[0]

In [None]:
pixels[0].shape

In [None]:
pixels[0][0]

In [None]:
pixels[0][1]

In [None]:
pixels[0][0][2]