__author__ = 'Armando Collado-Villaverde'
__email__ = 'armando.collado@uah.es'

# Download the SYM-H storms dataset

Downloads the whole SYM-H index timeline, which is used in the `1-set-expansion.ipynb` notebook, as well as the ACE MAG and SWEPAM datasets and the SYM-H index for the selected storms. All data is downloaded from NASA's CDAWeb using the `cdasws` package. The specific datasets are:

* The data for the SYM-H index will be downloaded from the `OMNI_HRO_5MIN` dataset
* For the ACE MAG datasets:
    * For the definitive data we use the `AC_H0_MFI`
    * For the provisional data we use the `AC_K1_MFI`
* For the ACE SWEPAM datasets:
    * For the definitive data we use the `AC_H0_SWE`
    * For the provisional data we use the `AC_K0_SWE`

First, the raw data is downloaded and saved: [Download and save raw data](#section_1)
Then, the ACE data is averaged into 5-minute bins (computed over right-closed intervals), merged with the SYM-H index, and saved for later use: [Preprocess and save data](#section_2)

The directories used by the notebook are defined in the cell below.

In [1]:
import os
import datetime
import utils
from cdasws import CdasWs
import pandas as pd
from cdasws.datarepresentation import DataRepresentation as dr
import numpy as np
import storm_dates
import warnings



In [9]:
# Root folder for everything this notebook downloads or produces.
download_base_path = "./data/"
preprocessed_data_path = os.path.join(download_base_path, 'preprocessed_data')
cdas = CdasWs()

# Raw per-storm CSVs downloaded from CDAWeb, one folder per split.
train_data_path = os.path.join(download_base_path, 'sym_h_storms/train_storms')
val_data_path = os.path.join(download_base_path, 'sym_h_storms/validation_storms')
test_data_path = os.path.join(download_base_path, 'sym_h_storms/test_storms')
test_key_data_path = os.path.join(download_base_path, 'sym_h_storms/test_key_storms')

# Merged (ACE + SYM-H) storm files written by the last cell, one folder per split.
train_data_path_preprocessed = os.path.join(preprocessed_data_path, 'train_storms')
val_data_path_preprocessed = os.path.join(preprocessed_data_path, 'validation_storms')
test_data_path_preprocessed = os.path.join(preprocessed_data_path, 'test_storms')
test_key_data_path_preprocessed = os.path.join(preprocessed_data_path, 'test_key_storms')

# makedirs creates missing parents, so creation order does not matter.
for _output_dir in (
    download_base_path,
    preprocessed_data_path,
    train_data_path_preprocessed,
    val_data_path_preprocessed,
    test_data_path_preprocessed,
    test_key_data_path_preprocessed,
):
    os.makedirs(_output_dir, exist_ok=True)

# Target column(s) taken from the OMNI data when building each storm.
INDEX_COL = ['SYM_H']

# Download and save raw data <a class="anchor" id="section_1"></a>

In [3]:
# NOTE(review): this silences *all* warnings for the rest of the session,
# including pandas/xarray deprecation warnings.
warnings.filterwarnings("ignore")
print("Saving raw data to", download_base_path)

start_date = pd.to_datetime("1998-01-01", utc=True)
end_date = pd.to_datetime("2023-01-01", utc=True)

print(f"Downloading SYM-H data from {start_date} until {end_date}")
print("Data will be downloaded from the 'OMNI_HRO_5MIN' dataset with a 5 minute resolution")

status, data = cdas.get_data(
    "OMNI_HRO_5MIN",
    ["SYM_H"],
    start_date,
    end_date,
    dataRepresentation=dr.XARRAY,
)
# cdasws returns data=None when the request fails; report the request status
# instead of crashing later with an unrelated TypeError on data["Epoch"].
if data is None:
    raise RuntimeError(f"Could not download SYM_H from OMNI_HRO_5MIN: {status}")

sym_index = pd.DataFrame(data=data["Epoch"], columns=["datetime"])
sym_index["datetime"] = pd.to_datetime(sym_index["datetime"])
sym_index["SYM_H"] = data["SYM_H"].values
sym_index = sym_index.set_index("datetime")
# Remove the UTC formatting to ease the use
sym_index.index = sym_index.index.tz_localize(None)
# Treat the 99999 sentinel as a missing sample.
sym_index["SYM_H"] = sym_index["SYM_H"].replace(99999, np.nan)

# Blank out the known-bad timestamps, then fill every gap by interpolation.
sym_index.loc[storm_dates.INVALID_DATES_SYM, "SYM_H"] = np.nan
sym_index[["SYM_H"]] = sym_index[["SYM_H"]].interpolate()

sym_index_path = os.path.join(download_base_path, 'sym_index.pkl')
print(f"Saving data to {sym_index_path}")

# Save the DataFrame to a pickle file with gzip compression
sym_index.to_pickle(sym_index_path, compression="gzip")

# Free the full 25-year series before downloading the per-storm data.
del sym_index

print("\tDownloading SYM storms")

# Root of the raw per-split storm folders (train/validation/test/test key).
sym_path = os.path.join(download_base_path, "sym_h_storms")

for _split_dir in (train_data_path, val_data_path, test_data_path, test_key_data_path):
    os.makedirs(_split_dir, exist_ok=True)

# Each storm window is widened by `td` (two days) on both sides before downloading.
td = datetime.timedelta(days=2)

# CDAWeb sources fetched for every storm, as
# (label used in error messages, dataset, variables, CSV file prefix).
# Train/validation/test use the definitive ACE products; the test key storms
# use the provisional ACE products instead.
definitive_sources = [
    ("ACE IMF", utils.DATABASE_ACE_IMF_16, utils.VARS_ACE_IMF_H0, "ace_imf"),
    ("ACE SWEPAM", utils.DATABASE_ACE_SWEPAM, utils.VARS_ACE_SWEPAM, "ace_swepam"),
    ("OMNI", utils.DATABASE_OMNI, utils.VARS_OMNI, "omni"),
]
provisional_sources = [
    ("provisional ACE imf", utils.DATABASE_ACE_IMF_PROVISIONAL,
     utils.VARS_ACE_IMF_PROVISIONAL, "ace_imf"),
    ("provisional ACE swepam", utils.DATABASE_ACE_PLASMA_PROVISIONAL,
     utils.VARS_ACE_SWEPAM_PROVISIONAL, "ace_swepam"),
    ("OMNI", utils.DATABASE_OMNI, utils.VARS_OMNI, "omni"),
]


def _download_storm(split_path, split_label, storm_number, start, end, sources):
    """Download every source for one storm window and save each one as a CSV.

    Files are written to ``split_path`` as ``<prefix>_storm<storm_number>.csv``.
    Failures are best-effort: the error is printed and the remaining sources
    are still downloaded, so that source's CSV is simply not written.
    """
    for description, database, variables, prefix in sources:
        try:
            df = utils.download_data(cdas, database, variables, start, end)
            df.to_csv(os.path.join(split_path, f"{prefix}_storm{storm_number}.csv"))
        except Exception as ex:
            print(
                f"Error getting {description} data for {split_label} storm {storm_number}\n"
                f"\tFrom: {start} \tto: {end}"
            )
            print(ex)


# (header printed, output folder, storm date pairs, label used in error
# messages, sources). Storm numbers run continuously across all splits.
storm_splits = [
    ("Train", train_data_path, storm_dates.TRAIN_STORMS, "training", definitive_sources),
    ("Validation", val_data_path, storm_dates.VALIDATION_STORMS, "validation", definitive_sources),
    ("Test", test_data_path, storm_dates.TEST_STORMS, "testing", definitive_sources),
    ("Test key", test_key_data_path, storm_dates.TEST_KEY_STORMS, "test key", provisional_sources),
]

storm_index_all = 1

for header, split_path, storms, split_label, sources in storm_splits:
    print(f"\t\t{header} storms {split_path}")
    for storm in storms:
        # storm[0] / storm[1] are the storm start / end dates.
        start_date = utils.convert_to_datetime(storm[0], utc=True) - td
        end_date = utils.convert_to_datetime(storm[1], utc=True) + td
        _download_storm(split_path, split_label, storm_index_all, start_date, end_date, sources)
        print("\t\t\tSaving storm", storm_index_all)
        storm_index_all += 1

Saving raw data to ./data/
Downloading SYM-H data from 1998-01-01 00:00:00+00:00 until 2023-01-01 00:00:00+00:00
Data will be downloaded from the 'OMNI_HRO_5MIN' dataset with a 5 minute resolution
Saving data to ./data/sym_index.pkl
	Downloading SYM storms
		Train storms ./data/sym_h_storms/train_storms
			Saving storm 1
			Saving storm 2
			Saving storm 3
			Saving storm 4
			Saving storm 5
			Saving storm 6
			Saving storm 7
			Saving storm 8
			Saving storm 9
			Saving storm 10
			Saving storm 11
			Saving storm 12
			Saving storm 13
			Saving storm 14
			Saving storm 15
			Saving storm 16
			Saving storm 17
			Saving storm 18
			Saving storm 19
			Saving storm 20
			Saving storm 21
			Saving storm 22
			Saving storm 23
			Saving storm 24
			Saving storm 25
			Saving storm 26
			Saving storm 27
			Saving storm 28
			Saving storm 29
			Saving storm 30
			Saving storm 31
			Saving storm 32
			Saving storm 33
			Saving storm 34
			Saving storm 35
			Saving storm 36
			Saving storm 37
	

# Preprocess and save data <a class="anchor" id="section_2"></a>

In [10]:
# Load the raw ACE IMF CSVs of every split, then preprocess them; the test key
# storms were downloaded from the provisional products and use their own routine.
train_dfs_ace_imf = utils.read_data(train_data_path, pattern='imf')
val_dfs_ace_imf = utils.read_data(val_data_path, pattern='imf')
test_dfs_ace_imf = utils.read_data(test_data_path, pattern='imf')
test_key_dfs_ace_imf = utils.read_data(test_key_data_path, pattern='imf')

train_dfs_ace_imf = [utils.preprocess_ace_imf(raw) for raw in train_dfs_ace_imf]
val_dfs_ace_imf = [utils.preprocess_ace_imf(raw) for raw in val_dfs_ace_imf]
test_dfs_ace_imf = [utils.preprocess_ace_imf(raw) for raw in test_dfs_ace_imf]
test_key_dfs_ace_imf = [
    utils.preprocess_ace_imf_provisional(raw) for raw in test_key_dfs_ace_imf
]

In [11]:
# Load the raw ACE SWEPAM CSVs of every split, then preprocess them; the test
# key storms were downloaded from the provisional products and use their own routine.
train_dfs_ace_swepam = utils.read_data(train_data_path, pattern='swepam')
val_dfs_ace_swepam = utils.read_data(val_data_path, pattern='swepam')
test_dfs_ace_swepam = utils.read_data(test_data_path, pattern='swepam')
test_key_dfs_ace_swepam = utils.read_data(test_key_data_path, pattern='swepam')

train_dfs_ace_swepam = [utils.preprocess_ace_swepam(raw) for raw in train_dfs_ace_swepam]
val_dfs_ace_swepam = [utils.preprocess_ace_swepam(raw) for raw in val_dfs_ace_swepam]
test_dfs_ace_swepam = [utils.preprocess_ace_swepam(raw) for raw in test_dfs_ace_swepam]
test_key_dfs_ace_swepam = [
    utils.preprocess_ace_swepam_provisional(raw) for raw in test_key_dfs_ace_swepam
]

In [12]:
# Load the raw OMNI CSVs (which carry the SYM-H target) for every split, in order.
train_dfs_omni, val_dfs_omni, test_dfs_omni, test_key_dfs_omni = (
    utils.read_data(split_dir, pattern='omni')
    for split_dir in (train_data_path, val_data_path, test_data_path, test_key_data_path)
)

In [13]:
def _merge_storms(imf_dfs, swepam_dfs, omni_dfs, add_proton_speed_x=False):
    """Join each storm's ACE IMF, ACE SWEPAM and OMNI index column(s) on their index.

    The three lists are paired positionally, so they must describe the same
    storms in the same order. A failed download leaves that storm's CSV
    missing, which would shift every later storm (or raise IndexError), so a
    length mismatch is reported explicitly instead.

    Returns a list with one joined DataFrame per storm.
    Raises ValueError if the three lists have different lengths.
    """
    if not len(imf_dfs) == len(swepam_dfs) == len(omni_dfs):
        raise ValueError(
            f"Mismatched number of storm files: {len(imf_dfs)} IMF, "
            f"{len(swepam_dfs)} SWEPAM, {len(omni_dfs)} OMNI"
        )
    merged = []
    for imf, swepam, omni in zip(imf_dfs, swepam_dfs, omni_dfs):
        storm = imf.join(swepam)
        if add_proton_speed_x:
            # NOTE(review): the provisional SWEPAM data appears to provide only
            # 'Proton_speed'; its negation stands in for 'Proton_speed_x' —
            # confirm against utils.preprocess_ace_swepam_provisional.
            storm['Proton_speed_x'] = -storm['Proton_speed']
        merged.append(storm.join(omni[INDEX_COL]))
    return merged


train_storms = _merge_storms(train_dfs_ace_imf, train_dfs_ace_swepam, train_dfs_omni)
val_storms = _merge_storms(val_dfs_ace_imf, val_dfs_ace_swepam, val_dfs_omni)
test_storms = _merge_storms(test_dfs_ace_imf, test_dfs_ace_swepam, test_dfs_omni)
test_key_storms = _merge_storms(
    test_key_dfs_ace_imf,
    test_key_dfs_ace_swepam,
    test_key_dfs_omni,
    add_proton_speed_x=True,
)

In [14]:
# Write every merged storm as CSV. The storm number keeps counting across
# splits (the first validation storm follows the last training storm), the
# same scheme used when the raw storms were downloaded.
global_index_storm = 1
for split_storms, out_dir, file_prefix in (
    (train_storms, train_data_path_preprocessed, 'train'),
    (val_storms, val_data_path_preprocessed, 'val'),
    (test_storms, test_data_path_preprocessed, 'test'),
    (test_key_storms, test_key_data_path_preprocessed, 'test_key'),
):
    for storm in split_storms:
        storm.to_csv(os.path.join(out_dir, f'{file_prefix}_storm_{global_index_storm}.csv'))
        global_index_storm += 1