Model Pipeline

Input:
    - Satellite tracking data
    - Satellite slr signal data
    - Station sat-imagery weather data
Output:
    - Record of ALL satellite passes over the station, whether a signal was made in that pass, and what the cloud conditions were like at the time of the pass


In [1]:
# this script generates a report on the cloud and SLR data in line of sight of the station
import pandas as pd
import pandas_profiling
import pandas as pd
import datetime 
from datetime import datetime
from datetime import time
import os

  import pandas_profiling


In [2]:
def get_sat_passes(sat_name, start_date, end_date, elev, base_dir=None):
    """Load the pass-prediction rows of one satellite inside a time window.

    Reads the tab-separated file
    ``<base_dir>/<sat_name>/passes_elev<elev>_2022-08_2023-08_df.txt``,
    parses its ``time`` column as datetimes and keeps the rows with
    ``start_date <= time < end_date``.

    Args:
        sat_name: Satellite folder name below ``base_dir``.
        start_date: Inclusive lower bound of the window.
        end_date: Exclusive upper bound of the window.
        elev: Elevation value embedded in the file name.
        base_dir: Directory holding one sub-folder per satellite. ``None``
            (the default) selects the project's original hard-coded location.

    Returns:
        The filtered rows as a DataFrame; the original row index is kept.

    Raises:
        FileNotFoundError: If the predictions file does not exist.
    """
    if base_dir is None:
        base_dir = "/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/sat_predictions/satellites"

    # The "2022-08_2023-08" part of the file name is fixed: only one
    # prediction file per satellite/elevation is referenced by this pipeline.
    file_path = os.path.join(base_dir, sat_name, f"passes_elev{elev}_2022-08_2023-08_df.txt")

    passes_df = pd.read_csv(file_path, sep='\t', parse_dates=['time'])

    # Half-open window [start_date, end_date).
    in_window = (passes_df['time'] >= start_date) & (passes_df['time'] < end_date)
    return passes_df[in_window]


In [3]:
import pandas as pd

def _load_cloud_cover(cloud_path):
    """Read the cloud file and return its ``time``/``cloud_cov`` columns sorted by time."""
    cloud_df = pd.read_csv(cloud_path, sep='\t')
    cloud_df['time'] = pd.to_datetime(cloud_df['time'], format="%Y%m%d%H%M%S")
    cloud_df = cloud_df.sort_values(by='time', ascending=True)
    return cloud_df[['time', 'cloud_cov']].reset_index(drop=True)


def _load_slr_returns(slr_path):
    """Read the SLR file and return ``time``/``receive_amp``/``sat_id`` sorted by time."""
    slr_df = pd.read_csv(slr_path, sep='\t')
    slr_df['time'] = pd.to_datetime(slr_df['time'], format='mixed')
    slr_df = slr_df.sort_values(by='time', ascending=True)
    # Truncate to whole seconds directly instead of formatting to text and
    # parsing the text back into datetimes.
    slr_df['time'] = slr_df['time'].dt.floor('s')
    return slr_df[['time', 'receive_amp', 'sat_id']].reset_index(drop=True)


def _resample_satellite_signal(slr_df, sat_id):
    """Return 15-minute mean/std of ``receive_amp`` for ``sat_id`` on a fixed 2022-08..2023-08 grid."""
    sat_returns = slr_df[slr_df['sat_id'] == sat_id].set_index('time')

    # '15min' is the current spelling of the deprecated '15T' alias.
    resampled_df = sat_returns.resample('15min').agg({'receive_amp': ['mean', 'std']})
    stats_df = pd.DataFrame({
        'mean_receive_amp': resampled_df['receive_amp']['mean'],
        'std_receive_amp': resampled_df['receive_amp']['std'],
    })
    stats_df = stats_df[(stats_df.index >= '2022-08-01') & (stats_df.index < '2023-08-01')]

    # Left-join onto a complete grid so that intervals without any return
    # are present with NaN statistics rather than missing.
    grid_df = pd.DataFrame({'time': pd.date_range(start='2022-08-01', end='2023-08-01', freq='15min')})
    return pd.merge(grid_df, stats_df, left_on='time', right_index=True, how='left')


def get_slr_and_cloud_data(station_code, sat_name, sat_id, start_date, end_date, quarantine_list,
                           cloud_path=None, slr_path=None, output_dir=None):
    """Build the 15-minute table of SLR signal and cloud cover for one satellite.

    Steps:
      1. Load the cloud-cover and SLR files.
      2. Average ``receive_amp`` of ``sat_id`` per 15-minute interval.
      3. Inner-join with the cloud data on ``time`` and write the joined
         table (including ``std_receive_amp``) to
         ``<output_dir>/<sat_name>_2022-08_2023-08_merged.txt``.
      4. Keep ``start_date <= time < end_date``, shift ``time`` by +2 hours
         and drop every row that falls on a quarantined day.

    Args:
        station_code: Station identifier; used to build the default output
            directory. The default input files are the ``matm`` ones
            regardless of this value.
        sat_name: Satellite name, used in the output file name.
        sat_id: Value matched against the ``sat_id`` column of the SLR file.
        start_date: Inclusive lower bound, applied before the +2 h shift.
        end_date: Exclusive upper bound, applied before the +2 h shift.
        quarantine_list: Dates whose rows are excluded from the result.
        cloud_path: Cloud-cover file; ``None`` selects the original location.
        slr_path: SLR file; ``None`` selects the original location.
        output_dir: Directory for the merged file; ``None`` selects the
            original per-station location.

    Returns:
        DataFrame with columns ``time``, ``mean_receive_amp`` and
        ``cloud_cov``. ``mean_receive_amp`` is NaN for intervals without any
        return from the satellite.
    """
    if cloud_path is None:
        cloud_path = '/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/eumetsat_lab/stations/matm_cloud_2022-06_2023-08.txt'
    if slr_path is None:
        slr_path = '/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/slr_data/1yr_data/matm_slr_2022-06_2023-08.txt'
    if output_dir is None:
        output_dir = f"/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/exploration_data_analysis/data/{station_code}/satellites"

    # 1   IMPORT AND FORMAT SLR AND CLOUD DATA
    cloud_df = _load_cloud_cover(cloud_path)
    slr_df = _load_slr_returns(slr_path)

    # 2   AGGREGATE SLR BASED ON SPECIFIC SATELLITE
    slr_merged_df = _resample_satellite_signal(slr_df, sat_id)

    # 3   MERGE SLR AND CLOUD DATA - SAVE TO FILE
    merged_df = pd.merge(slr_merged_df, cloud_df, on='time', how='inner')
    merged_df = merged_df[['time', 'mean_receive_amp', 'std_receive_amp', 'cloud_cov']]
    merged_path = os.path.join(output_dir, f"{sat_name}_2022-08_2023-08_merged.txt")
    merged_df.to_csv(merged_path, sep='\t')

    # Continue from the in-memory table; re-reading the file just written
    # would only reproduce the same rows.
    cloud_slr_df = merged_df[['time', 'mean_receive_amp', 'cloud_cov']]
    in_window = (cloud_slr_df['time'] >= start_date) & (cloud_slr_df['time'] < end_date)
    # Explicit copy so the assignment below modifies an independent frame.
    cloud_slr_filtered_df = cloud_slr_df[in_window].copy()

    # NOTE(review): the +2 h offset presumably aligns these timestamps with
    # the time base of the pass predictions - confirm.
    cloud_slr_filtered_df['time'] = cloud_slr_filtered_df['time'] + pd.Timedelta(hours=2)

    # Compare on the calendar day: matching the raw timestamps against the
    # quarantine dates would only remove the 00:00 sample of each day.
    quarantine_days = pd.to_datetime(quarantine_list).normalize()
    on_quarantined_day = cloud_slr_filtered_df['time'].dt.normalize().isin(quarantine_days)
    return cloud_slr_filtered_df[~on_quarantined_day]

In [4]:
import pandas as pd

def los_signal_true_false(passes_filtered_df, cloud_slr_filtered_df, sat_name, station_code):
    """Summarise every satellite pass: mean cloud cover and whether a signal was recorded.

    Consecutive rows of ``passes_filtered_df['time']`` are consumed in pairs:
    row 0 with row 1, row 2 with row 3, and so on. Each pair is treated as
    the start and end of one pass.
    NOTE(review): this assumes the rows strictly alternate start/end markers
    and that the first row is a start - confirm against the predictions file.

    For each pass the samples of ``cloud_slr_filtered_df`` with
    ``start <= time <= end`` are selected; ``mean_cloud`` is the mean of
    their ``cloud_cov`` and ``pass_success`` is True when at least one of
    them has a non-NaN ``mean_receive_amp``.

    Args:
        passes_filtered_df: DataFrame with a datetime ``time`` column.
        cloud_slr_filtered_df: DataFrame with ``time``, ``cloud_cov`` and
            ``mean_receive_amp`` columns.
        sat_name: Value stored in the ``satellite`` column.
        station_code: Value stored in the ``station`` column.

    Returns:
        DataFrame with columns ``station``, ``satellite``,
        ``pass_start_date``, ``pass_end_date`` (both formatted as
        ``'%Y-%m-%d %H:%M:%S'`` strings), ``mean_cloud`` and
        ``pass_success``. The columns are present even when there is no pass.
    """
    result_columns = [
        'station', 'satellite', 'pass_start_date', 'pass_end_date',
        'mean_cloud', 'pass_success',
    ]
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    pass_times = passes_filtered_df['time']
    pass_starts = pass_times.iloc[0::2]
    pass_ends = pass_times.iloc[1::2]

    # An odd number of rows leaves a final start without a matching end.
    if len(pass_starts) > len(pass_ends):
        print("IndexError: No end of pass marker at end_date. Skipping pass...")

    sample_times = cloud_slr_filtered_df['time']
    los_data = []

    # zip() stops at the shorter sequence, which skips the unmatched start.
    for pass_start, pass_end in zip(pass_starts, pass_ends):
        los_start = pass_start.strftime(timestamp_format)
        los_end = pass_end.strftime(timestamp_format)

        in_pass = (sample_times >= los_start) & (sample_times <= los_end)
        stage_cloud_slr = cloud_slr_filtered_df[in_pass]

        los_data.append({
            'station': station_code,
            'satellite': sat_name,
            'pass_start_date': los_start,
            'pass_end_date': los_end,
            'mean_cloud': stage_cloud_slr['cloud_cov'].mean(),
            # False both when every sample is NaN and when no sample falls
            # inside the pass.
            'pass_success': bool(stage_cloud_slr['mean_receive_amp'].notna().any()),
        })

    # Passing the column list keeps the schema stable for an empty result.
    return pd.DataFrame(los_data, columns=result_columns)


In [5]:
import os
import pandas as pd
import gc
from contextlib import ExitStack
from tqdm import tqdm  # Import tqdm for progress bars


def main():
    """Run the pass/cloud pipeline for every satellite folder and elevation.

    For each elevation value, every sub-directory of the satellites folder is
    treated as one satellite: its passes are loaded, the SLR/cloud table is
    built, and one row per pass is produced. All rows are concatenated,
    passes that start or end on a quarantined day are removed, and the result
    is written to ``elev<elev>_model_df.txt`` in the pipeline results folder.

    Raises:
        IndexError: If a satellite folder has no matching ``sat_name`` row in
            the satellite list file.
    """
    elevation_values = [45]
    station_code = 'matm'
    start_date = '2022-08-01'
    end_date = '2023-08-01'

    quarantine_list = [
        '2023-02-15', '2023-02-16', '2023-02-17', '2023-02-21', '2023-03-06', '2023-03-08', '2023-03-10', '2023-04-26', '2023-05-11', '2023-05-17',
        '2023-01-11', '2023-02-09', '2022-06-02', '2022-11-01', '2022-12-25', '2022-12-26', '2023-01-01', '2023-01-06', '2023-04-04', '2023-04-25',
        '2023-05-01', '2023-05-23']

    satellite_folder_path = '/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/sat_predictions/satellites/'

    sat_list_path = '/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/sat_predictions/satellite_list_matm_2022-08_2022-08.txt'

    # read_csv opens and closes the file itself; no context manager needed.
    sat_list = pd.read_csv(sat_list_path, sep='\t')

    # Calendar dates used to drop whole passes further down.
    quarantine_dates = pd.to_datetime(quarantine_list).date

    for elev in elevation_values:
        print(f'Processing data with elevation angle {elev}')

        # Collect per-satellite frames and concatenate once at the end
        # instead of growing a DataFrame inside the loop.
        pass_frames = []

        satellites = os.listdir(satellite_folder_path)
        satellites_progress = tqdm(satellites, desc='Satellites', unit='satellite')

        for sat_name in satellites_progress:
            sat_name_path = os.path.join(satellite_folder_path, sat_name)
            if not os.path.isdir(sat_name_path):
                # Skip stray files that sit next to the satellite folders.
                continue

            # tqdm advances the bar itself while being iterated, so only the
            # postfix text is updated here.
            satellites_progress.set_postfix({'Current Satellite': sat_name})

            matching_ids = sat_list.loc[sat_list['sat_name'] == sat_name, 'sat_id']
            if matching_ids.empty:
                raise IndexError(
                    f"Satellite folder {sat_name!r} has no entry in {sat_list_path}"
                )
            sat_id = matching_ids.values[0]

            passes_filtered_df = get_sat_passes(sat_name, start_date, end_date, elev)
            cloud_slr_filtered_df = get_slr_and_cloud_data(station_code, sat_name, sat_id, start_date, end_date, quarantine_list)
            los_df = los_signal_true_false(passes_filtered_df, cloud_slr_filtered_df, sat_name, station_code)
            pass_frames.append(los_df)

        satellites_progress.close()

        model_df = pd.concat(pass_frames, ignore_index=True) if pass_frames else pd.DataFrame()
        if model_df.empty:
            # Nothing to filter or save; the date columns may not even exist.
            print(f'No passes found for elevation angle {elev}; nothing written')
            continue

        # The pass boundaries are stored as text; parse them to datetimes.
        model_df['pass_start_date'] = pd.to_datetime(model_df['pass_start_date'])
        model_df['pass_end_date'] = pd.to_datetime(model_df['pass_end_date'])

        # Drop a pass when either its start day or its end day is quarantined.
        starts_in_quarantine = model_df['pass_start_date'].dt.date.isin(quarantine_dates)
        ends_in_quarantine = model_df['pass_end_date'].dt.date.isin(quarantine_dates)
        filtered_model_df = model_df[~(starts_in_quarantine | ends_in_quarantine)]

        model_file_path = f'/Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/exploration_data_analysis/pipeline_results/elev{elev}_model_df.txt'
        filtered_model_df.to_csv(model_file_path, sep='\t')
        print(f'Model for dataframe finished processing: see {model_file_path}')

        # Manually trigger garbage collection to free up memory
        gc.collect()

# Run the pipeline only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()


        

Processing data with elevation angle 45


Satellites:   0%|          | 0/35 [00:00<?, ?satellite/s, Current Satellite=galileo210]

Satellites: 100%|██████████| 35/35 [38:57<00:00, 66.79s/satellite, Current Satellite=galileo209]


Model for dataframe finished processing: see /Users/eugenerotherham/Documents/AtherasAnalytics/clouds-outage-prediction-main/exploration_data_analysis/pipeline_results/elev45_model_df.txt


PIPELINE

FEATURES
satellite // pass_start_date // pass_end_date // pass_duration // other_sat_success // cloud_cov       (optional: station, max_elevation)

LABEL
pass_success


Steps

- remove bad days

- calculate pass_duration
- calculate other sat passes

- trim dataframe