In [1]:
import sys
from pathlib import Path

import pandas as pd
import numpy as np

In [2]:
# Set up paths by adding ../src (where paths.py is located) to sys.path
NOTEBOOK_DIR = Path.cwd()
SRC_DIR = NOTEBOOK_DIR.parent / "src"
sys.path.append(str(SRC_DIR))

# Now import the shared paths
from paths import CSV_DIR, DATAFRAMES_DIR

### First load the info about the dates of the original experiment.
#### Each observatory ('Site' in the Dfs) will have it's own dataframe. 
##### OT => Observatory Teide
##### ORM => Obervatory Roque de los Muchachos 

In [4]:
df_teide_dates = pd.read_csv(CSV_DIR / "OT_dates.csv")
df_roque_dates = pd.read_csv(CSV_DIR / "ORM_dates.csv")

### Then the meteo info provided, so we can filter them to contain only the data about the same days than the turbulence measurements. Meteo info is previously filtered when processing it from original .dat files, grouped by date and hour then calculate the mean() of each variable ('Temperature', 'Humidity','Wind_dir', 'Wind_speed', 'Pressure', 'Solar_radiation')

In [6]:
df_teide_meteo = pd.read_parquet(DATAFRAMES_DIR / "df_teide_meteo.parquet")
df_roque_meteo = pd.read_parquet(DATAFRAMES_DIR / "df_roque_meteo.parquet")

### Loading turbulence DFs grouped by profile_id = each set of 79 measurements of altitude/turbulence with same metadata, Site, Star,.... They are parsed from the original .dat files and some time related columns were added

In [8]:
df_teide = pd.read_parquet(DATAFRAMES_DIR / "df_teide_pid.parquet")
df_roque = pd.read_parquet(DATAFRAMES_DIR / "df_roque_pid.parquet")

In [9]:
df_teide.columns

Index(['Site', 'Star', 'cplane', 'sep', 'Deltah0', 'Res', 'day', 'month',
       'year', 'hbegin', 'hend', 'Seeing_totatm', 'bl', 'fa', 'altitude',
       'turbulence', 'profile_id', 'date_base', 'timestamp_begin',
       'timestamp_end', 'seconds_since_midnight_begin',
       'seconds_since_midnight_end', 'sin_time_begin', 'cos_time_begin',
       'sin_time_end', 'cos_time_end', 'duration'],
      dtype='object')

In [10]:
df_teide_meteo.columns

Index(['Year', 'Month', 'Day', 'hour', 'Temperature', 'Humidity', 'Wind_dir',
       'Wind_speed', 'Pressure', 'Solar_radiation', 'date_base'],
      dtype='object')

In [11]:
def extend_dates(df_dates,df_meteo):
    """
    
    Parameters
    ----------
    df_dates : DataFrame, df with original dates, from which a new one will be generated with extended dates.
    df_meteo : DataFrame with meteorological information, useful to know from which dates we have weather info
    """
    #Lets filter meteo with dates +/- 1 day using date_base column of dates dfs, to cover all the posibilities with hbegin and hend in the turbulence DF
    
    #Convert date_base to dateTime object and create the list of available dates
    df_dates['date_base'] = pd.to_datetime(df_dates['date_base'])
    
    #I have checked duplicated dates previously , but just to have them double checked and in array format 
    original_dates = df_dates['date_base'].unique()
    
    #Create new set with date -1 and +1
    extended_dates = set(original_dates)
    for d in original_dates:
        extended_dates.add(d - pd.Timedelta(days=1))#previous day
        extended_dates.add(d + pd.Timedelta(days=1))#following day
    
    # Remove any duplicates and keep only new dates
    all_dates = pd.Series(sorted(extended_dates))
    #filters all_dates to keep only the ones not in the original set. ~ NOT — it inverts the booleans 
    missing_dates = all_dates[~all_dates.isin(original_dates)]

    #Create a new DataFrame with missing dates, 
    #missing  = the ones 1 day after or 1 day before the ones previously present
    df_extra_dates = pd.DataFrame({'date_base': missing_dates})
    
    # Concatenate original dates + extra dates generated
    df_extended_dates = pd.concat([df_dates, df_extra_dates], ignore_index=True)
    
    # Sort by date, just for better visualization
    df_extended_dates = df_extended_dates.sort_values(by='date_base').reset_index(drop=True)

    #Rows from meteo DFs, which has more available dates than measurements dates, where date_base is included in the extended list, 
    #so it should be a 'possible match' with measurements DF
    #This is mainly the possible variances when calculating real day based on hbegin and hend, 
    #since they are fractional hours respect to midnight of the day when the measurement began
    df_meteo_filtered_dates = df_meteo[df_meteo['date_base'].isin(df_extended_dates['date_base'])].copy()

    #Add in meteo df a timestamp_begin just to be able to merge by timestamp_begin and hour, it will be YYYY-MM-DD 00:00:00
    df_meteo_filtered_dates.loc[:,'timestamp_begin_date'] = df_meteo_filtered_dates['date_base'].dt.date
    #convert it to datetime object
    df_meteo_filtered_dates['timestamp_begin_date'] = pd.to_datetime(df_meteo_filtered_dates['timestamp_begin_date'])

    #return df_extended_dates
    return df_meteo_filtered_dates

In [12]:
df_teide_meteo_extended = extend_dates(df_teide_dates,df_teide_meteo)
df_roque_meteo_extended = extend_dates(df_roque_dates,df_roque_meteo)

In [13]:
def add_time_data_to_measurement_df(df):
    #Add some time related columns to main measurements DF just to facilitate the merge process between DF's
    #So we will use date and hour to match measurements and meteo data
    df['hour'] = df['timestamp_begin'].dt.hour
    #Create a new column with only date from timestamp, not hours,min, or secs 
    df.loc[:,'timestamp_begin_date'] = df['timestamp_begin'].dt.date
    #Setting as datetime, not object
    df['timestamp_begin_date'] = pd.to_datetime(df['timestamp_begin_date'])
    
    print(f"Extra time info added to DataFrame")

In [14]:
add_time_data_to_measurement_df(df_teide)
add_time_data_to_measurement_df(df_roque)

Extra time info added to DataFrame
Extra time info added to DataFrame


In [15]:
#remove redundant columns, generated when extending dates
df_teide_meteo_extended = df_teide_meteo_extended.drop(columns=['date_base'])
df_roque_meteo_extended = df_roque_meteo_extended.drop(columns=['date_base'])

### Merge meteo filtered data with turbulence DFs , base on timestamp begin then hour

In [17]:
df_teide_merged = df_teide.merge(df_teide_meteo_extended, on=['timestamp_begin_date', 'hour'], how='left')
df_roque_merged = df_roque.merge(df_roque_meteo_extended, on=['timestamp_begin_date', 'hour'], how='left')

#### Once merge it's done , I add an extra turbulence_log column to be used for visualizations mainly

In [19]:
#Minimum values without zero values, since there are some rows with 0 value in turbulence as consequence of wrong measurements
turbulence_min_teide = df_teide_merged[df_teide_merged["turbulence"] > 0.0]["turbulence"].min()
turbulence_min_roque = df_roque_merged[df_roque_merged["turbulence"] > 0.0]["turbulence"].min()

In [20]:
# Do a log tranformation for turbulence values since they are really tiny, 
# this should improve visualization and model performance
# Use the actual minimum value as epsilon to avoid log(0)
def turbulence_log(df, turbulence_min):
    epsilon = turbulence_min
    # Scale the values by a large factor then apply log to avoid rounding or truncation issues 
    scaling_factor = 1e20
    
    #Apply log transformation
    df['turbulence_log'] = df['turbulence'].apply(
        lambda val: np.log(val * scaling_factor + epsilon) if val > 0 else np.nan
    ).copy()

    return df

In [21]:
#Add column turbulence log to Main dfs 
df_teide_merged = turbulence_log(df_teide_merged,turbulence_min_teide)
df_roque_merged = turbulence_log(df_roque_merged,turbulence_min_roque)

### The resulting DFs will include all related data. 
#### Metadata
#### Dates, hbegin and hend windows
#### Altitude + turbulence pairs 
#### Some extra time related features , duration , sin and cos
#### Meteo data averaged by hours of the same day that matches hours when the measurement was taken

In [23]:
def clean_merged_df(df_merged):
    print(f" 🪣 🧽  Cleaning the DataFrame...")
    #Drop NaN produced due to meteo data not present in the same hours than the turbulence measurements.
    #So the resulting df has values in all of its columns.
    df_merged = df_merged.dropna()
    #Remove some more redundant columns after merge
    df_merged = df_merged.drop(['day', 'month', 'year', 'date_base'],axis=1)
    #Modify some datatypes to improve size and performance when operating over the complete df 
    df_merged['cplane'] = df_merged['cplane'].astype('float32')
    df_merged['sep'] = df_merged['sep'].astype('float32')
    df_merged['Deltah0'] = df_merged['Deltah0'].astype('int16')
    df_merged['hbegin'] = df_merged['hbegin'].astype('float64')
    df_merged['hend'] = df_merged['hend'].astype('float64')
    df_merged['Seeing_totatm'] = df_merged['Seeing_totatm'].astype('float32')
    df_merged['bl'] = df_merged['bl'].astype('float32')
    df_merged['fa'] = df_merged['fa'].astype('float32')
    df_merged['profile_id'] = df_merged['profile_id'].astype('int32')
    df_merged['seconds_since_midnight_begin'] = df_merged['seconds_since_midnight_begin'].astype('float64')
    df_merged['seconds_since_midnight_end'] = df_merged['seconds_since_midnight_end'].astype('float64')
    df_merged['duration'] = df_merged['duration'].astype('float64')
    #Columns from meteo DF's after .dat files parse
    df_merged['hour'] = df_merged['hour'].astype('int8')
    df_merged['Year'] = df_merged['Year'].astype('int16')
    df_merged['Month'] = df_merged['Month'].astype('int8')
    df_merged['Day'] = df_merged['Day'].astype('int8')
    print(f"🧹 DataFrame clean and ready to use...")
    
    return df_merged

In [24]:
df_roque_cleaned = clean_merged_df(df_roque_merged)

 🪣 🧽  Cleaning the DataFrame...
🧹 DataFrame clean and ready to use...


In [25]:
df_teide_cleaned = clean_merged_df(df_teide_merged)

 🪣 🧽  Cleaning the DataFrame...
🧹 DataFrame clean and ready to use...


In [51]:
#Save both DataFrames as .parquet files, after finishing all processing
df_teide_cleaned.to_parquet(DATAFRAMES_DIR / "df_teide_full_Info.parquet", index=False, engine='pyarrow')
df_roque_cleaned.to_parquet(DATAFRAMES_DIR / "df_roque_full_Info.parquet", index=False, engine='pyarrow')