README INFO: What is this pipeline, what does it do, what are its expected results?

What is this pipeline?
This pipeline has two parts: (I) data acquisition and pre-processing, and (II) data syncing. The two parts are independent and can be run at different times, and Part I can be run several times (e.g., once per product) before Part II is run. Users who already have directories of the images they want to sync, with file names in a format the syncing code can read, can skip Part I and go straight to Part II. The purpose of the pipeline is to build a database of machine-learning-ready solar images that can be used directly in modeling; it is the data pre-processing step of the machine-learning workflow.

What does this pipeline do?
Part I downloads user-specified SDO or SOHO solar image products for user-specified dates at a user-specified cadence. Any image with missing data or, for LASCO, a planetary transit is replaced, where possible, by a clean image taken at a nearby time. The goal is a set of usable images at approximately even time steps, with one image per time step. Part II then takes several groups of images (e.g., the outputs of multiple Part I runs) and syncs them so that, for each datetime, the resulting directory contains one image from each product.
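To illustrate the "one clean image per time step" idea, here is a minimal sketch, assuming each candidate image is reduced to its observation time plus a hypothetical is_clean flag (False for missing data or a planetary transit). It is not the pipeline's own selection code, which lives in Mission_utility.data_gen_helper (fetch_indices, product_distiller); it simply keeps, for each step, the clean image closest to the step time within half a step on either side.

```python
from datetime import datetime, timedelta
from typing import List, Sequence, Tuple


def pick_one_per_step(
    candidates: Sequence[Tuple[datetime, bool]],
    start: datetime,
    end: datetime,
    step: timedelta,
) -> List[datetime]:
    """Keep at most one clean image time per time step (hypothetical helper).

    candidates: (observation time, is_clean) pairs; is_clean is an assumed flag
    that is False for images with missing data or a planetary transit.
    For each step time t, the clean image closest to t inside [t - step/2, t + step/2)
    is kept; steps with no clean image in that window are skipped.
    """
    clean = sorted(t for t, is_clean in candidates if is_clean)
    half = step / 2
    picked: List[datetime] = []
    t = start
    while t <= end:
        # half-open window, so one image can never be claimed by two adjacent steps
        in_window = [c for c in clean if -half <= c - t < half]
        if in_window:
            picked.append(min(in_window, key=lambda c: abs(c - t)))
        t += step
    return picked


if __name__ == "__main__":
    day = datetime(2001, 4, 1)
    candidates = [
        (day + timedelta(minutes=5), True),
        (day + timedelta(hours=6, minutes=10), False),  # e.g., planetary transit
        (day + timedelta(hours=6, minutes=50), True),   # clean image at a nearby time
        (day + timedelta(hours=18, minutes=2), True),
    ]
    # prints 00:05, 06:50 and 18:02; the 12:00 step has no clean image and is skipped
    for t in pick_one_per_step(candidates, day, day + timedelta(hours=18), timedelta(hours=6)):
        print(t)
```

The window width and the nearest-to-step rule are choices made for this sketch only; the pipeline's own rules are set by time_window and the helper functions it calls.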

PIPELINE: PART I

First, import all necessary libraries.

In [5]:
import os
import csv
import json
import warnings
from datetime import timedelta
from time import process_time

import numpy as np
import h5py
import drms
from dateutil import parser
from sunpy.time import TimeRange
from tqdm import tqdm

try:
    from pandas.errors import SettingWithCopyWarning  # pandas >= 1.5
except ImportError:
    from pandas.core.common import SettingWithCopyWarning  # older pandas

import Mission_utility.data_gen_helper as ipy
import Mission_utility.sdo_mdi as sdo
import Mission_utility.soho_other as soho
import Mission_utility.product_time_sync as pts  # Part II calls pts.Base_Class, pts.times_actualizer, etc.

Next, set the user-defined variables and derive the quantities used by the main loop. The next two code cells (this one and the main loop) can be skipped if you already have the data you want and only need to sync it.

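The user-set variables in the next cell are:
- date_start, date_finish: first and last dates to fetch (YYYY-MM-DD); the range runs from 00:00 on date_start to 23:59 on date_finish.
- image_size_output: side length, in pixels, of the output images (128 gives 128x128 images).
- time_window: target time step between consecutive images, in hours.
- flag: method used to reduce the images to image_size_output (here 'subsample').
- home_dir: output directory; a subdirectory named <base>_<mission> is created in it for each product.
- bases: comma-separated list of products to fetch.
- fits_headers: 'Y'/'N' switch controlling FITS header handling.
- lev1_LASCO: 'Y' to use level-1 LASCO data ('N' cannot be used until the unit conversion is sorted out).
- email: email address registered with JSOC, which the drms client needs for export requests.
The remaining lines are derived: the date range is split into time_increment-day (60-day) query chunks, and num_loops is the number of chunks plus one.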

In [7]:
date_start = '2001-04-01'  # other start dates used: '2002-04-01', '1999-04-04', '2010-10-01', '2005-12-01'
date_finish = '2001-04-07' # matching end dates: '2002-04-07', '1999-04-09', '2010-10-05', '2005-12-07'
image_size_output = 128
time_window = 6
flag = 'subsample'
home_dir = '/Users/gohawks/Desktop/swpc-local/soho-ml-data/soho-ml-data-ready-martinkus/'
bases = 'MDI_96m' # comma-separated; other options: 'LASCO_C3', 'EIT195', 'AIA', 'HMI'
fits_headers = 'N'
lev1_LASCO = 'Y' #CANNOT USE 'N' UNTIL UNIT CONVERSION SORTED OUT
email = 'charlotte.martinkus@noaa.gov'

client = drms.Client(email=email, verbose=False) 
date_time_pre_start = date_start + '-0000'
date_time_start= parser.parse(date_time_pre_start)
date_time_pre_end = date_finish + '-2359'
date_time_end = parser.parse(date_time_pre_end)
time_increment = 60
url_prefix = 'https://seal.nascom.nasa.gov'
look_ahead = int(np.ceil(time_window*60/10.)) #should sufficiently cover all 7 products based on their cadence.
diff_start_finish_total_sec = (date_time_end - date_time_start).total_seconds()
total_sec = timedelta(days = time_increment).total_seconds()
num_loops = np.ceil(diff_start_finish_total_sec/total_sec) + 1 #num_loops would be equal to 94 + 1 for 19960101-0000' - '20110501-0000'
base_list = bases.split(',')
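The derived quantities in the cell above are plain arithmetic. This sketch reproduces the num_loops and look_ahead formulas with the standard library (it builds the datetimes directly instead of parsing the '-0000'/'-2359' strings with dateutil) and checks the 95-loop figure quoted in the num_loops comment. The function names are hypothetical.

```python
import math
from datetime import datetime, timedelta


def count_query_chunks(start: datetime, end: datetime, chunk_days: int = 60) -> int:
    """Same arithmetic as num_loops: ceil(span / chunk) + 1."""
    span_sec = (end - start).total_seconds()
    chunk_sec = timedelta(days=chunk_days).total_seconds()
    return math.ceil(span_sec / chunk_sec) + 1


def look_ahead_steps(time_window_hours: float) -> int:
    """Same arithmetic as look_ahead: the time window in minutes divided by 10."""
    return int(math.ceil(time_window_hours * 60 / 10.0))


# Range quoted in the num_loops comment: 5599 days / 60 = 93.3 -> 94, plus 1
print(count_query_chunks(datetime(1996, 1, 1), datetime(2011, 5, 1)))        # 95
# Range used in this notebook: 2001-04-01 00:00 to 2001-04-07 23:59
print(count_query_chunks(datetime(2001, 4, 1), datetime(2001, 4, 7, 23, 59)))  # 2
print(look_ahead_steps(6))                                                     # 36
```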

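Begin the main workhorse loop. For each product in bases, the loop queries the archive one time_increment-day chunk at a time, sieves the results by file size and time window, writes the times kept for each chunk to a CSV file, and finally assembles the images into a data cube with ipy.data_cuber.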

In [None]:
for base in tqdm(base_list):
        start_process_time = process_time() #initialize clock per product type
            
        base = base.strip(' ')
        if ('LASCO' in base) or ('EIT' in base):
            BaseClass = soho.SOHO_no_MDI(base, lev1_LASCO, fits_headers)
            
        else:
            BaseClass = sdo.SDO_MDI(base, lev1_LASCO, fits_headers, time_window)
            
        BaseClass.set_base_dictionary()
        
        holes_list = []
        transients_list = []
        unreadable_file_ids_product_list_global = []
    
        print(f'{base}')
        base_dir = home_dir + base + '_' + BaseClass.mission
        if not os.path.exists(base_dir):
            os.makedirs(base_dir)   
            
        time_range = TimeRange(date_time_start, timedelta(days = time_increment))
        
        
        prev_time, time_range_modified = ipy.prev_time_resumer(home_dir, time_range, date_time_end, BaseClass) #time_range_modified.next() is the workshorse that advances time at the end of the time for-loop
        for t_value in tqdm(np.arange(num_loops)): #this is the main workhorse loop of the program
            #print('t_value:', t_value)
            print('prev_time:', prev_time)
                
            if time_range_modified.end > date_time_end:
                time_range_modified = TimeRange(time_range_modified.start, date_time_end) 
                
            product_results, client = BaseClass.product_search(time_range_modified, client)
            
            if BaseClass.class_type == 'SDO_MDI':
                client_export_failed = product_results.has_failed()
                
                if client_export_failed:
                    product_results_number = 0
                
                else:
                    product_results_number = product_results.data.count()['record'] 
            
            else:
                product_results_number = product_results.file_num
                client_export_failed = False
                
            if (product_results_number != 0) and not client_export_failed:
                ind = BaseClass.index_of_sizes(product_results) 
                size_sieved_df, fetch_indices_product_orig = ipy.fetch_indices(ind, product_results, time_window, prev_time, time_range_modified, BaseClass)
                if len(fetch_indices_product_orig) != 0:
                
                    size_sieved_df = ipy.product_distiller(fetch_indices_product_orig, size_sieved_df, date_time_end, product_results, look_ahead, time_window, url_prefix, flag, image_size_output, home_dir, email, client, BaseClass)
                    
                    all_time_window_sieved_times_sorted = np.unique(list(size_sieved_df['time_at_ind']))
                                
                    prev_time = [] #reset to empty list
                    if len(all_time_window_sieved_times_sorted) != 0:
                        prev_time.append(all_time_window_sieved_times_sorted[-1]) #keep the last good time for the next increment
                    size_sieved_df.to_csv(f'{home_dir}{date_start}_to_{date_finish}_{BaseClass.base_full}_times_{flag}_{time_window}_LASCOlev1-{BaseClass.lev1_lasco}_{BaseClass.mission}_{image_size_output}_{t_value}.csv')
                        
                    #ipy.csv_writer(home_dir, date_start, date_finish, flag, time_window, image_size_output, all_time_window_sieved_times_sorted, BaseClass)
            
            # Advance at loop level, so that an empty or failed query does not leave the loop re-querying the same time range.
            time_range_modified.next() #Sunpy iterator to go to the next time increment in number of days. There is also time_range_modified.previous() to go backwards in time.
            #print('time_range_modified next:', time_range_modified)
            


        ipy.data_cuber(home_dir, date_start, date_finish, flag, time_window, image_size_output,BaseClass)
        
        end_process_time = process_time()
        time_of_process = end_process_time - start_process_time
        print(f'{base} time of process in seconds:', time_of_process)
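The time-advancing logic of the loop above can be sketched with the standard library only. This is a hypothetical generator, not the sunpy TimeRange API the notebook uses: it yields consecutive query windows, clips the last one to the end date (as the loop does with TimeRange(time_range_modified.start, date_time_end)), and stops once the range is covered instead of running a fixed num_loops.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def iter_chunks(start: datetime, end: datetime, chunk: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (chunk_start, chunk_end) query windows covering start..end.

    The last window is clipped to `end`. In this sketch the window advances on
    every iteration, whether or not the query for the current window returned
    usable images.
    """
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + chunk, end)
        yield chunk_start, chunk_end
        chunk_start += chunk


if __name__ == "__main__":
    # two windows: 2001-01-01 -> 2001-03-02, then 2001-03-02 -> 2001-03-15 (clipped)
    for window in iter_chunks(datetime(2001, 1, 1), datetime(2001, 3, 15), timedelta(days=60)):
        print(window)
```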

PIPELINE: PART II

Part II syncs the products you want to work with, so that each kept datetime has one image from every product.
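The syncing idea can be sketched as follows, assuming each product is represented by a sorted list of image times. The helper names and the matching tolerance are hypothetical and do not reproduce pts.sync_times_and_inds: the sketch walks a regular grid of time_step-spaced times and keeps a grid time only if every product has an image within the tolerance, returning each product's matched indices.

```python
from bisect import bisect_left
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence


def nearest_index(times: Sequence[datetime], t: datetime, tol: timedelta) -> Optional[int]:
    """Index of the entry in sorted `times` closest to t, or None if none is within tol."""
    i = bisect_left(times, t)
    best = None
    for j in (i - 1, i):  # only the neighbours of the insertion point can be closest
        if 0 <= j < len(times) and abs(times[j] - t) <= tol:
            if best is None or abs(times[j] - t) < abs(times[best] - t):
                best = j
    return best


def sync_times(products: Dict[str, Sequence[datetime]], start: datetime, end: datetime,
               step: timedelta, tol: timedelta) -> Dict[str, List[int]]:
    """Keep grid times start, start + step, ... <= end only if every product matches one."""
    synced: Dict[str, List[int]] = {name: [] for name in products}
    t = start
    while t <= end:
        matches = {name: nearest_index(times, t, tol) for name, times in products.items()}
        if all(m is not None for m in matches.values()):
            for name, m in matches.items():
                synced[name].append(m)
        t += step
    return synced


if __name__ == "__main__":
    d = datetime(2005, 12, 1)
    h = lambda hours, minutes=0: d + timedelta(hours=hours, minutes=minutes)
    products = {
        "LASCO_C2": [h(0, 10), h(6, 5), h(12, 20), h(18)],
        "EIT195": [h(0), h(12), h(18, 30)],
    }
    # with a 6-hour grid, 06:00 is dropped because EIT195 has no image near it
    print(sync_times(products, d, h(18), timedelta(hours=6), timedelta(hours=1)))
```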
First, define initial variables.

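The user-set variables in the next cell are:
- date_start, date_finish: date range to sync (YYYY-MM-DD).
- time_step: time step of the synced output, in hours; it must be greater than or equal to the time step used when the data were generated in Part I (e.g., at least 6 hours if Part I used 6 hours).
- home_dir: directory holding the Part I output.
- bases: comma-separated list of products to sync.
- fits_provided: 'Y' if FITS files are available, in which case the times and image dimensions are read from them; 'N' to read the times from the CSV files and the dimensions from the h5 cubes and CSV files.
- sub_Fcorona: 'Y' to also write LASCO difference cubes, in which consecutive frames are subtracted to remove the F-corona (if applicable).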

In [None]:
date_start = '2005-12-01'  # other start dates used: '2002-04-01', '1999-04-04', '2010-10-01'
date_finish = '2005-12-07' # matching end dates: '2002-04-07', '1999-04-09', '2010-10-05'
time_step = 12
home_dir = '/Users/gohawks/Desktop/swpc-local/soho-ml-data/soho-ml-data-ready-martinkus/'
bases = 'LASCO_C2, EIT195' # comma-separated; other options: 'LASCO_C3', 'MDI_96m', 'AIA', 'HMI'
fits_provided = 'Y' #are fits files provided?
sub_Fcorona = 'N' #if applicable, do you want additional data cubes that subtract fcorona?

start_process_time = process_time() #initialize clock

product_list = [] #list for all products #will need to hsplit this by base_list_len - won't result in equal division: need another approach
slice_start_ind_list = [] #to keep track of the corresponding subset of cube slice indices if the original corresponding time range has been modified.
slice_end_ind_list = []
BaseClass_list = []

base_list = bases.split(',')
base_list_len = len(base_list)
time_step_sec = time_step*3600
    
data_dim_list = []
time_step_prev_list = []
dim_err = "there is a mismatch in dimensionality among one or more products (e.g., 128x128 vs 256x256) in the directory"
time_step_err = "selected time step has to be greater than or equal to the original time step used (e.g., cannot choose < 6 hrs if original time step was 6 hrs)"


In [None]:
for base in base_list:
    
    base = base.strip(' ')

    BaseClass = pts.Base_Class(base, home_dir, time_step, date_start, date_finish)
        
    BaseClass.set_base_dictionary()
    BaseClass_list.append(BaseClass)
    
    time_step_prev = BaseClass.time_step_prev_reader()
    time_step_prev_list.append(time_step_prev)
    
    if time_step_prev > time_step:
        raise ValueError(time_step_err)
    
    print('time_step_prev:', time_step_prev)
    
    if fits_provided.upper() == 'Y': # FITS files contain the times and dimensions needed; the h5 cube and CSV file are also present
        found_times = BaseClass.fits_times_reader()
        data_dim_list = BaseClass.dimension_checker_from_fits(data_dim_list)
        
    elif fits_provided.upper() == 'N': # no FITS files: times come from the CSV file, dimensions from the h5 cube and CSV file
        found_times = BaseClass.csv_times_reader()
        data_dim_list = BaseClass.dimension_checker_from_h5cube_csv(data_dim_list)
    
    else:
        raise ValueError("fits_provided must be 'Y' or 'N'")
    
    # call times_actualizer once and reuse its outputs:
    # [0] product times, [1] input to min_time_step, [2] slice start indices, [3] slice end indices
    actualized = pts.times_actualizer(found_times, BaseClass.date_start, BaseClass.date_finish)
    product_list.append(actualized[0])
    slice_start_ind_list.append(actualized[2][0]) # first start entry
    slice_end_ind_list.append(actualized[3][-1]) # last finish entry
    
    min_time_diff = pts.min_time_step(actualized[1])
    print('min_time_diff in hours:', min_time_diff)
    print('min_time_diff in hours:', min_time_diff)
    
ind_dim = np.where(data_dim_list[0] == np.array(data_dim_list))[0] 

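The next cell checks that all products have the same image dimensions (e.g., all 128x128) and stops with an error otherwise. It then finds the product with the fewest times, computes the synced times and the corresponding cube indices for every product with pts.sync_times_and_inds, reorders those results by product, and initializes the lists used for the optional LASCO F-corona difference cubes.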

In [None]:
if len(ind_dim) != len(data_dim_list):   
    raise ValueError(dim_err)
    


ind_min_len = pts.shortest_prod_list_index_finder(product_list)
time_step_prev_max = np.max(time_step_prev_list)
    
sync_time_inds_list, sync_time_list = pts.sync_times_and_inds(product_list, ind_min_len, time_step, time_step_prev_max)
sync_time_inds_list_mod, sync_time_list_mod = pts.sync_times_and_inds_sort_by_product(sync_time_inds_list, sync_time_list)
#print('sync_time_list_mod_main:', sync_time_list_mod)

BaseClass_list_len = len(BaseClass_list)

lasco_diff_ind_Fcorona_24h_list = []
lasco_diff_len_ind_Fcorona_24h_list = [] #C2 and C3 list should be the same size even after 24 hr index but good to check just in case
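The dimension check and the shortest-product lookup at the top of the cell above can also be written without numpy broadcasting. This is a minimal sketch, assuming each product's dimensions are reported as an int or a (rows, cols) tuple; the function names are hypothetical, and shortest_product_index is only a guess at what pts.shortest_prod_list_index_finder does, based on its name. A set-based comparison does not depend on the element type: if the entries are (rows, cols) pairs, `data_dim_list[0] == np.array(data_dim_list)` compares element by element and `np.where(...)[0]` returns one row index per matching element, so its length is twice the number of products.

```python
from typing import Hashable, Sequence


def check_same_dims(dims: Sequence[Hashable], err: str = "dimension mismatch among products") -> None:
    """Raise ValueError unless every product reports the same image dimensions."""
    if len(set(dims)) > 1:
        raise ValueError(err)


def shortest_product_index(product_times: Sequence[Sequence]) -> int:
    """Index of the product with the fewest times (the first one wins on ties)."""
    return min(range(len(product_times)), key=lambda i: len(product_times[i]))


if __name__ == "__main__":
    check_same_dims([(128, 128), (128, 128)])                 # passes silently
    print(shortest_product_index([[1, 2, 3], [1], [1, 2]]))   # 1
```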


In [None]:
for i, BaseClass in enumerate(tqdm(BaseClass_list)):
    
    cube_data, cube_dim, meta_items  = BaseClass.cube_data_reader() #, cube_hdr #, meta_items
    
    if len(cube_data) == 0:
        raise ValueError('No data cubes were found for the exact user-specified dates')
    
    BaseClass.cube_sync_maker(BaseClass_list_len, cube_data, cube_dim, meta_items, slice_start_ind_list[i], slice_end_ind_list[i], sync_time_inds_list_mod[i], time_step_prev_max) ###cube_dim, cube_hdr, ##### cube_dim, meta_items
    BaseClass.csv_time_sync_writer(BaseClass_list_len, cube_dim, sync_time_list_mod[i], time_step_prev_max)
    
    if ('LASCO' in BaseClass.base_full) and (sub_Fcorona.upper() == 'Y'):
        

        
        lasco_ind_Fcorona_24h = pts.lasco_diff_times_inds(sync_time_list_mod[i])
        #print('lasco_ind_Fcorona_24h:',lasco_ind_Fcorona_24h)
        
        if len(lasco_ind_Fcorona_24h)>0:
            lasco_diff_ind_Fcorona_24h_list.append(lasco_ind_Fcorona_24h)
            lasco_diff_len_ind_Fcorona_24h_list.append(len(lasco_ind_Fcorona_24h))
        
            #print('lasco_diff_ind_Fcorona_24h_list:', lasco_diff_ind_Fcorona_24h_list)
            print('lasco_diff_len_ind_Fcorona_24h_list:', lasco_diff_len_ind_Fcorona_24h_list)

In [None]:
if (len(lasco_diff_ind_Fcorona_24h_list)!=0) and (sub_Fcorona.upper() == 'Y'):
    
    flag_lasco =  'Fcorona'   
    
    if (lasco_diff_len_ind_Fcorona_24h_list) and (len(np.unique(lasco_diff_len_ind_Fcorona_24h_list)) > 1):
        ind_lasco_len_Fcorona_24h_min = np.where(np.array(lasco_diff_len_ind_Fcorona_24h_list) == np.min(lasco_diff_len_ind_Fcorona_24h_list))[0][0]
        print('ind_lasco_len_Fcorona_24h_min:', ind_lasco_len_Fcorona_24h_min)
        ind_lasco_principal_Fcorona_24h = lasco_diff_ind_Fcorona_24h_list[ind_lasco_len_Fcorona_24h_min]        
    
    elif (lasco_diff_len_ind_Fcorona_24h_list) and (len(np.unique(lasco_diff_len_ind_Fcorona_24h_list)) == 1):
        ind_lasco_len_Fcorona_24h_min = 0
        ind_lasco_principal_Fcorona_24h = lasco_diff_ind_Fcorona_24h_list[ind_lasco_len_Fcorona_24h_min]
    
    else:
        ind_lasco_principal_Fcorona_24h = 0
        
    for i, BaseClass in enumerate(tqdm(BaseClass_list)): #loop to make the time-difference set
        
        cube_data_pre, cube_dim, meta_items = BaseClass.cube_data_reader()
        
        if 'LASCO' in BaseClass.base_full: #check the current product, not the leftover `base` from an earlier cell
            cube_data_diff = cube_data_pre[1:] - cube_data_pre[:-1]
            cube_data = cube_data_diff[ind_lasco_principal_Fcorona_24h] #no +1 here, unlike for the times, since this is the difference cube
        
        else:
            cube_data = cube_data_pre[ind_lasco_principal_Fcorona_24h+1]
        
        #adding one gets back to the original time value, since the difference of the time array was taken
        BaseClass.cube_sync_maker(BaseClass_list_len, cube_data, cube_dim, meta_items, slice_start_ind_list[i], slice_end_ind_list[i], sync_time_inds_list_mod[i][ind_lasco_principal_Fcorona_24h+1], time_step_prev_max, flag_lasco)
        BaseClass.csv_time_sync_writer(BaseClass_list_len, cube_dim, sync_time_list_mod[i][ind_lasco_principal_Fcorona_24h+1], time_step_prev_max, flag_lasco)
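The difference-cube step in the cell above relies on an index shift: frame k of the difference cube is frame k+1 minus frame k, so it belongs to the later time, k+1. The sketch below shows that alignment with numpy. It assumes the cube and the time list have the same length and, as a stand-in for pts.lasco_diff_times_inds, keeps only pairs of frames at most 24 hours apart; that selection rule is an assumption suggested by the "_24h" variable names, not a confirmed description of the helper.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

import numpy as np


def diff_cube(cube: np.ndarray, times: List[datetime],
              max_gap: timedelta = timedelta(hours=24)) -> Tuple[np.ndarray, List[datetime]]:
    """Consecutive-frame differences, keeping only pairs at most `max_gap` apart.

    diff[k] = cube[k + 1] - cube[k] is stamped with times[k + 1], the later frame,
    which is why indices into the difference array are shifted by +1 when they
    are used on the original time arrays and on the other products' cubes.
    """
    cube = cube.astype(np.float64)  # avoid wrap-around if the cube holds unsigned ints
    diff = cube[1:] - cube[:-1]
    keep = np.array(
        [k for k in range(len(times) - 1) if times[k + 1] - times[k] <= max_gap],
        dtype=int,
    )
    return diff[keep], [times[k + 1] for k in keep]


if __name__ == "__main__":
    t0 = datetime(2005, 12, 1)
    times = [t0, t0 + timedelta(hours=12), t0 + timedelta(hours=24), t0 + timedelta(hours=72)]
    cube = np.arange(4 * 2 * 2, dtype=np.uint8).reshape(4, 2, 2)
    diffs, diff_times = diff_cube(cube, times)
    print(diffs.shape, diff_times)  # the 48-hour gap is dropped, leaving two difference frames
```

Casting to float before subtracting is a choice made in this sketch; the notebook subtracts the cube in whatever dtype cube_data_reader returns.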

In [None]:
end_process_time = process_time()
time_of_process = end_process_time - start_process_time
print('time to run in seconds:', time_of_process)