In [83]:
import os
import asdf
import io
import requests
import argparse
from datetime import datetime, timedelta
from sunpy.net import Fido, attrs as a
from astropy.io import fits
import astropy.units as u
import numpy as np
import pandas as pd
from pandas import json_normalize
import matplotlib.pyplot as plt
from PIL import Image
import json
import sunpy.map
from sunpy.visualization.colormaps import color_tables as ct
import boto3
from dateutil import parser as dtparser
import os
import matplotlib.patches as patches
import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime
import asdf
import time
import yaml
import shutil
from collections import OrderedDict

# The amount of storage needed for the ASDF files required an external hard drive
EXTERNAL_DRIVE_BASE = "/Volumes/Elements/HelioData"

## Fetching Solar Event Metadata from Helioviewer (HEK and DONKI)

This section describes how solar event metadata is programmatically retrieved from the Helioviewer API using time-stamped observations of solar activity.

Two types of event catalogs are available through the Helioviewer system:

- **HEK (Heliophysics Event Knowledgebase):** A well-established NASA archive that catalogs solar events such as flares, active regions, and coronal mass ejections, primarily annotated by expert scientists and automated pipelines.
- **DONKI (Database Of Notifications, Knowledge, and Information):** A real-time event catalog produced by the NASA CCMC (Community Coordinated Modeling Center), which includes operational alerts and model-driven forecasts for solar activity.

For this study, **only the HEK metadata is used** to label and describe the solar images and to generate bounding boxes for flare-related activity. This decision is based on HEK’s consistency, scientific reliability, and widespread usage in heliophysics.

The **DONKI data is retained** and saved alongside the FITS and HEK-derived annotations for potential **future use**. This allows flexibility for:
- Comparative analysis with model-based events,
- Exploration of operational forecasting potential,
- Reuse by other researchers interested in real-time alerts.

Both datasets are queried using short time windows (default: 15 minutes) centered on each FITS image’s timestamp. Metadata is saved locally in JSON format for transparency, reproducibility, and long-term archival.


In [84]:
def fetch_donki_events(timestamp, filename, duration_minutes=15):
    url = "https://api.helioviewer.org/v2/events/"

    start_time_str = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time = timestamp + timedelta(minutes=duration_minutes)
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    params = {
        "startTime": start_time_str,
        "endTime": end_time_str,
        "sources": "DONKI"
    }

    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        with open(filename, "w") as f:
            json.dump(data, f, indent=2)
        print(f"DONKI data saved to {filename} for {start_time_str}")
        return data
    else:
        print(f"Failed to fetch HEK events for {start_time_str}")
        return []


def fetch_hek_events(timestamp, filename, duration_minutes=15):
    url = "https://api.helioviewer.org/v2/events/"

    start_time_str = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time = timestamp + timedelta(minutes=duration_minutes)
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    params = {
        "startTime": start_time_str,
        "endTime": end_time_str,
        "sources": "HEK"
    }

    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        with open(filename, "w") as f:
            json.dump(data, f, indent=2)
        print(f"HEK data saved to {filename} for {start_time_str}")
        return data
    else:
        print(f"Failed to fetch HEK events for {start_time_str}")
        return []

In [85]:
def save_to_local(local_path, bucket, s3_key):
    """Simulates S3 upload by saving to a local directory using the same structure."""
    # Convert bucket name into a base local directory
    base_dir = os.path.join(".", bucket)
    full_path = os.path.join(base_dir, s3_key)

    # Ensure the directory exists
    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    # Copy the file to the new location
    with open(local_path, 'rb') as src_file:
        with open(full_path, 'wb') as dst_file:
            dst_file.write(src_file.read())

    print(f"Saved locally to {full_path}")


def save_to_s3(local_path, bucket, s3_key):
    s3 = boto3.client("s3")
    print(f"Uploading to s3://{bucket}/{s3_key}...")
    s3.upload_file(local_path, bucket, s3_key)

## Adding ML-Compatible Labels to ASDF Files

Each FITS image is paired with a machine learning label indicating whether it corresponds to a solar flare (1) or a non-flaring region (0). These labels are stored in a pre-generated CSV file located at:

```
data_selection/image_labels.csv
```

This file contains two columns: `filename` (which includes the full FITS filename) and `label` (either 0 or 1). During the ASDF build process, the function performs the following:

- **Loads the CSV file into memory** and standardizes formatting by stripping extra spaces and converting all filenames to strings.
- **Constructs a normalized filename** based on the current FITS file being processed. This ensures a direct match with entries in the `image_labels.csv` file.
- **Searches for the label** corresponding to the FITS filename. If found, the label is inserted under the `meta` section of the ASDF tree using the key `image_label`.
- If the label is **not found**, a warning is printed, and `image_label` is set to `null` (i.e., `None` in Python) in the ASDF structure.

This design supports traceable and reproducible machine learning workflows, allowing the ASDF file to encapsulate not only observational and scientific metadata, but also a direct classification label for supervised training or evaluation.

Labels are critical to ensure that downstream CNN models can learn the difference between flare and non-flare imagery without needing to perform additional external joins or lookups.


In [86]:
# labels_df = pd.read_csv("data_selection/image_labels.csv")
EXTERNAL_DRIVE_BASE = "/Volumes/Elements/HelioData"

def build_and_save_asdf(fname, hdul, hek_data, donki_data, image_paths, bucket, base_key, rootPath):
    tree = OrderedDict()
    
    
    labels_df = pd.read_csv("data_selection/image_labels.csv")
    labels_df['filename'] = labels_df['filename'].astype(str).str.strip()

    # normalized_fname = fname.replace(".asdf", "").replace(".fits", "").strip()
    normalized_fname = f"{fname}.fits".strip() if not fname.endswith(".fits") else fname.strip()


    label_row = labels_df[labels_df["filename"] == normalized_fname]

    if not label_row.empty:
        image_label = int(label_row["label"].values[0])
    else:
        print(f"Label not found for: {normalized_fname}")
        image_label = None


    tree["meta"] = {
        "creator": "Dr. India R Jackson",
        "institution": "Georgia State University",
        "nsf_award": "AGS-PRF #2444918",
        "description": "ASDF archive for SDO image, HEK, and DONKI data with ML-ready FITS structure.",
        "pipeline_version": "1.0.0",
        "script": "HelioConverter",
        "pipeline_timestamp": datetime.utcnow().isoformat() + "Z",
        "image_label": image_label
    }

    tree["fits"] = {
        "source_file": f"s3://{bucket}/{base_key}/fits/raw/{fname}.fits",
        "num_hdus": len(hdul),
        "hdu0": {
            "header": full_header_to_dict(hdul[0].header),
            "data": hdul[0].data.tolist() if hdul[0].data is not None else None
        },
        "hdu1": {
            "header": full_header_to_dict(hdul[1].header) if len(hdul) > 1 else None,
            "data": hdul[1].data.tolist() if len(hdul) > 1 and hdul[1].data is not None else None
        }
    }

    tree["hek"] = hek_data
    # tree["donki"] = donki_data
    tree["images"] = image_paths
    
    # asdf_path = os.path.join(rootPath, f"{fname}.asdf")
    
    # asdf_dir = os.path.join("final_fits_1", "asdf")
    asdf_dir = os.path.join(EXTERNAL_DRIVE_BASE, "asdf")
    os.makedirs(asdf_dir, exist_ok=True)
    asdf_path = os.path.join(asdf_dir, f"{fname}.asdf")

    
    af = asdf.AsdfFile(tree)
    af.write_to(asdf_path, all_array_compression='zlib')
    print(f"ASDF saved locally: {asdf_path}")
    
    '''
    asdf_s3_key = f"{base_key}/asdf/{fname}.asdf"
    save_to_s3(asdf_path, bucket, asdf_s3_key)
    '''

## Converting FITS Headers into Serializable Dictionaries

The `full_header_to_dict` function is used to convert a FITS Header object into a fully serializable Python dictionary that can be saved in formats such as JSON or included in ASDF files. This is a critical step when preparing astronomical data for archival, sharing, or machine learning workflows.

### Key Features:

- **Preserves COMMENT and HISTORY Cards**: These are metadata lines typically used to describe data provenance and processing history. The function captures them as lists of strings to preserve their order and meaning.
  
- **Handles Undefined or Problematic Values**: Some FITS header values may be of type `Undefined` (from `astropy.io.fits.card.Undefined`) or may not serialize well. These are either replaced with `None` or converted to strings.

- **Skips Blank or Unusable Keywords**: FITS headers may include empty keywords or corrupted entries. The function skips these to avoid polluting the output.

- **Ensures JSON Compatibility**: All values in the resulting dictionary are converted into basic Python types (e.g., `int`, `float`, `str`, `None`) that are fully compatible with JSON serialization or inclusion in other data formats like ASDF.

This function is invoked during the ASDF file creation to ensure the full contents of the FITS header are embedded in a machine-readable, lossless format — enabling future interpretability, transparency, and reproducibility of the data.


In [87]:
from astropy.io.fits.card import Undefined

def full_header_to_dict(header):
    """
    Converts a FITS Header to a fully serializable dictionary,
    including COMMENT and HISTORY cards as lists of strings.
    Skips or replaces non-serializable values like Undefined.
    """
    header_dict = {}

    for card in header.cards:
        key = card.keyword
        val = card.value

        # Skip blank keys
        if key == "":
            continue

        # Handle COMMENT and HISTORY
        if key == "COMMENT":
            header_dict.setdefault("COMMENT", []).append(str(val))
        elif key == "HISTORY":
            header_dict.setdefault("HISTORY", []).append(str(val))
        elif isinstance(val, Undefined):
            header_dict[key] = None  # or skip with `continue`
        else:
            try:
                header_dict[key] = val
            except Exception:
                header_dict[key] = str(val)

    return header_dict

## FITS File Processing and ASDF Construction

The `process_fits_info` function is responsible for parsing raw FITS files and converting them into a structured, analysis-ready format. It plays a central role in preparing solar image data for downstream machine learning workflows.

### Step-by-Step Function Overview

- **Filename Parsing & FITS Loading**: Each FITS file is opened and verified using `astropy.io.fits`. The function automatically selects the appropriate HDU for processing (either primary or secondary based on availability of image data).

- **Timestamp Extraction**: The observation timestamp is extracted from the FITS header using `T_OBS`, `DATE-OBS`, or `TIME-OBS` keywords. This timestamp is critical for time-based querying of solar event data.

- **Storage Path Construction**: A unique path is constructed using the instrument name, wavelength, and timestamp, which is used to logically organize data and generate S3-style URLs for metadata purposes (even though everything is saved locally).

- **Header & Data Conversion**:
  - The FITS header is converted to both `.json` and `.txt` formats.
  - The image data is saved in `.csv` and `.npy` formats for tabular and binary use cases.
  - All outputs are saved under `final_fits_1/fits/{header|data}`.

- **Image Generation**:
  - A log-scaled, grayscale-stretched version of the FITS image is generated and saved as `.png` and `.jp2`.
  - This aids both visualization and ML preprocessing.

- **Event Metadata Extraction**:
  - HEK (Heliophysics Event Knowledgebase) data is retrieved based on the observation time and saved as JSON files.
  - DONKI data retrieval is commented out for this study but can be enabled for future analyses.

- **ASDF File Creation**:
  - The processed FITS data, HEK event metadata, image paths, and pre-assigned image labels are passed to the `build_and_save_asdf` function to generate a complete, portable `.asdf` archive.
  - These ASDF files include all critical solar observation and annotation information for each image, suitable for model training or further scientific analysis.

- **Local Storage**: All outputs—including headers, data, images, HEK metadata, and ASDF files—are saved locally to directories under `final_fits_1/`, without uploading to S3 during this stage.

This function is a core utility for transforming observational solar physics data into a unified, structured, and AI/ML-ready format.


In [88]:
def process_fits_info(rootPath, filePath, bucket, spacecraft, instrument, wavelength, detector):
    for file_path in filePath:
        fname = os.path.basename(file_path).replace(".fits", "")
        print(f"\n Processing: {fname}")
        hdul = fits.open(file_path)
        hdul.verify('fix')
        hdu = hdul[1] if len(hdul) > 1 and hdul[1].data is not None else hdul[0]

        data = hdu.data
        header = hdu.header

        # Get observed datetime
        if "T_OBS" in header:
            obs_time = header["T_OBS"].rstrip("Z")
        elif "DATE-OBS" in header and "T" in header["DATE-OBS"]:
            obs_time = header["DATE-OBS"]
        elif "DATE-OBS" in header and "TIME-OBS" in header:
            obs_time = header["DATE-OBS"] + "T" + header["TIME-OBS"]
        else:
            obs_time = None

        dt_obs = dtparser.parse(obs_time)

        # Build directory structure
        if spacecraft == "SDO":
            base_key = f"{instrument}/{wavelength}/{dt_obs.strftime('%Y/%m/%d/%H/%M')}"
        else:  # SOHO
            base_key = f"{instrument}/{detector}/{dt_obs.strftime('%Y/%m/%d/%H/%M')}"

        # Header/Data Conversion
        header_dict = {
            card.keyword: card.value
            for card in header.cards
            if card.keyword and not isinstance(card.value, Undefined)
        }

        formats = [
            ("header", "fits/header", [
                ("json", lambda: json.dumps(header_dict, indent=2)),
                ("txt", lambda: "\n".join([f"{k} = {v}" for k, v in header_dict.items()]))
            ]),
            ("data", "fits/data", [
                ("csv", lambda: pd.DataFrame(data).to_csv(index=False)),
                ("npy", lambda: data)
            ])
        ]
        

        for category, folder, files in formats:
            # local_out_dir = os.path.join("final_fits_1", folder)
            local_out_dir = os.path.join(EXTERNAL_DRIVE_BASE, folder)
            os.makedirs(local_out_dir, exist_ok=True)

            for ext, content_fn in files:
                local_out_path = os.path.join(local_out_dir, f"{fname}_{category}.{ext}")

                if ext == "npy":
                    np.save(local_out_path, content_fn())
                else:
                    with open(local_out_path, "w") as f:
                        f.write(content_fn())

                print(f"Saved locally to {local_out_path}")

        # Image (log stretch)
        data_clipped = np.clip(data, a_min=1e-3, a_max=None)
        data_log = np.log10(data_clipped)
        data_norm = 255 * (data_log - np.nanmin(data_log)) / (np.nanmax(data_log) - np.nanmin(data_log))
        data_uint8 = np.nan_to_num(data_norm).astype(np.uint8)

        # img_out_dir = os.path.join("final_fits_1", "images")
        
        img_out_dir = os.path.join(EXTERNAL_DRIVE_BASE, "images")
        os.makedirs(img_out_dir, exist_ok=True)

        for kind in ["log_stretch"]:
            plt.figure(figsize=(8, 8))
            plt.imshow(data_log, cmap='gray', origin='lower')
            plt.axis('off')
            plt.tight_layout()

            # for ext in ["png", "jpg", "jpeg"]:
            for ext in ["png"]:
                img_path = os.path.join(img_out_dir, f"{fname}_{kind}.{ext}")
                plt.savefig(img_path, dpi=300)
                print(f"Saved image: {img_path}")

            # Save JP2 separately
            jp2_path = os.path.join(img_out_dir, f"{fname}_{kind}.jp2")
            img = Image.fromarray(data_uint8)
            img.save(jp2_path, format="JPEG2000")
            print(f"Saved JP2 image: {jp2_path}")

            plt.close()
        
        # Fetch and dump raw HEK and DONKI data
        # hek_json_path = os.path.join("final_fits_1", "hek", f"{fname}_hek.json")
        hek_json_path = os.path.join(EXTERNAL_DRIVE_BASE, "hek", f"{fname}_hek.json")
        
        # donki_json_path = os.path.join("final_fits_1", "donki", f"{fname}_donki.json")
        os.makedirs(os.path.dirname(hek_json_path), exist_ok=True)
        # os.makedirs(os.path.dirname(donki_json_path), exist_ok=True)

        hek_data = fetch_hek_events(dt_obs, hek_json_path)
        # donki_data = fetch_donki_events(dt_obs, donki_json_path)

        if hek_data:
            print(f"HEK data saved to {hek_json_path} for {dt_obs}")

        image_paths = [
            f"https://{bucket}.s3.us-east-1.amazonaws.com/{base_key}/images/{fname}_log_stretch.{ext}"
            for ext in ["png", "jp2"]
        ]

        build_and_save_asdf(
            fname=fname,
            hdul=hdul,
            hek_data=hek_data,
            donki_data=None,
            image_paths=image_paths,
            bucket=bucket,
            base_key=base_key,
            rootPath=rootPath
        )

        hdul.close()


## Batch Processing of Local FITS Files

The `process_local_fits_directory` function serves as the main entry point for initiating batch processing of all FITS files stored locally in a specified directory. It prepares the necessary inputs and passes them to the `process_fits_info` function for full pipeline execution.

### Function Purpose

This function scans a local directory for `.fits` files and orchestrates the processing of each file through the FITS-to-ASDF pipeline. While the function includes references to S3-compatible paths (to simulate cloud-based organization), all operations are performed and stored locally.

### Key Features

- **Directory Scanning**: It automatically detects and lists all `.fits` files in the specified directory.
- **Validation**: If no FITS files are found, the function halts and prints a warning.
- **Dynamic Configuration**:
  - Chooses the appropriate mock S3 bucket name (`helioconvert-sdo` or `helioconvert-soho`) based on the selected spacecraft.
  - Accepts instrument, wavelength, and optional detector information to construct the appropriate processing pipeline.
- **Pipeline Trigger**: Once the file list is prepared, it calls the `process_fits_info` function, which performs:
  - Header and data extraction
  - Image generation
  - HEK event querying
  - ASDF archive creation

### Note

No actual uploading to Amazon S3 occurs in this stage. Bucket names are used to maintain consistency in file naming, organization, and potential future migration to cloud storage.

This function allows for streamlined, large-scale data ingestion and transformation for any directory of pre-renamed FITS files.


In [None]:
def process_local_fits_directory(local_dir, spacecraft, instrument, wavelength=None, detector=None):
    """
    Process all renamed FITS files in a local directory and upload to S3.
    """
    bucket = "helioconvert-sdo" if spacecraft.upper() == "SDO" else "helioconvert-soho"

    # 🔍 Grab all FITS files
    file_list = [os.path.join(local_dir, f) for f in os.listdir(local_dir) if f.endswith(".fits")]
    
    if not file_list:
        print("❌ No FITS files found in the specified directory.")
        return

    print(f"📂 Found {len(file_list)} FITS files in '{local_dir}'.")

    process_fits_info(local_dir, file_list, bucket, spacecraft, instrument, wavelength, detector)


process_local_fits_directory(
    local_dir="data_selection/fits/raw",
    spacecraft="SDO",
    instrument="aia",
    wavelength="131"
)