An earlier version of the code used to filter out the Sun, the Moon, and cloudy data.

In [None]:
# ---- IMPORTS ---- #

import os
import pandas as pd
import ephem
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from datetime import datetime
import math

# ---- CONFIG ---- #

# --- Main Settings --- #
# Choose the location to analyze from the dictionary below.
SELECTED_LOCATION = "Boerakker"

# Root directory where location folders (containing .dat files) are stored.
# Example: "C:/Users/YourUser/Desktop/SQM_Data/"
DATA_ROOT_DIR = r"C:\Users\YourUser\Desktop\SQM Data\\"

# Path to the CSV file containing cloudiness data.
# Source: EUMETSAT
CLOUD_DATA_FILE = f'{SELECTED_LOCATION} cloudy or not.csv'

# --- Analysis Parameters --- #
# Hours of the night to include in the weekly analysis.
HOURS_TO_INCLUDE = [21, 22, 23, 0, 1, 2]

# --- Celestial Filter Settings --- #
# These values determine when the sky is considered "dark enough".
SUN_ALTITUDE_THRESHOLD = -18  # Degrees (Astronomical Twilight)
MIN_DATAPOINTS_PER_FILE = 30 # Minimum readings required in a .dat file to be considered valid.

# --- Location Coordinates --- #
# Add other locations here if needed.
LOCATIONS_DICT = {
    "Ameland-Natuurcentrum-Nes": {"lat": "53.449", "lon": "5.775"},
    "Boerakker": {"lat": "53.187", "lon": "6.329"},
    "Ostland": {"lat": "53.607", "lon": "6.727"},
    # ... add other locations
}

# ---- FUNCTIONS ---- #

def load_cloud_data(filepath):
    """
    Loads cloudiness data and returns a set of cloudy datetimes to exclude.

    Args:
        filepath (str): The path to the 'cloudy or not.csv' file.

    Returns:
        set: A set of 'YYYY-MM-DD HH' strings for periods identified as cloudy.
    """
    print(f"Loading cloud data from: {filepath}")
    try:
        df = pd.read_csv(filepath)
    except FileNotFoundError:
        print(f"Warning: Cloud data file not found at {filepath}. No data will be excluded for cloudiness.")
        return set()

    # Extract date/time components from the filename column
    df['year'] = df['filename'].str.slice(28, 32)
    df['month'] = df['filename'].str.slice(32, 34)
    df['day'] = df['filename'].str.slice(34, 36)
    df['hour'] = df['filename'].str.slice(36, 38) # Keep as string for formatting

    # Create a 'YYYY-MM-DD HH' string to match the main DataFrame
    df['datetime_hour_str'] = df['year'] + '-' + df['month'] + '-' + df['day'] + ' ' + df['hour']

    # Filter for cloudy conditions and return the set of cloudy hour strings
    cloudy_hours = set(df[df['datavals'] == 'Cloudy']['datetime_hour_str'])
    print(f"Found {len(cloudy_hours)} cloudy hours to exclude.")
    return cloudy_hours


def calculate_celestial_altitudes(row, observer):
    """
    Calculates the sun/moon altitude and moon phase for a given timestamp.
    
    Args:
        row (pd.Series): A row from a DataFrame with datetime information.
        observer (ephem.Observer): An ephem Observer object configured with lat/lon.

    Returns:
        tuple: A tuple containing (sun_alt_deg, moon_alt_deg, moon_phase_percent).
    """
    try:
        # Expects a datetime object in the 'Datetime' column
        observer.date = row["Datetime"]
    except (ValueError, TypeError):
        return None, None, None

    sun = ephem.Sun(observer)
    moon = ephem.Moon(observer)
    
    sun_alt_deg = math.degrees(sun.alt)
    moon_alt_deg = math.degrees(moon.alt)
    moon_phase_percent = moon.phase

    return sun_alt_deg, moon_alt_deg, moon_phase_percent


def filter_by_celestial_conditions(df, observer):
    """
    Filters a DataFrame to keep only rows that meet dark sky criteria (sun and moon).
    
    Args:
        df (pd.DataFrame): The DataFrame to filter. Must have a 'Datetime' column.
        observer (ephem.Observer): The configured ephem observer.
        
    Returns:
        pd.DataFrame: A filtered DataFrame.
    """
    # Vectorize the calculation for efficiency
    altitudes = df.apply(calculate_celestial_altitudes, axis=1, observer=observer, result_type='expand')
    df[['sun_alt', 'moon_alt', 'moon_phase']] = altitudes
    df.dropna(subset=['sun_alt', 'moon_alt', 'moon_phase'], inplace=True)

    # Define the conditions for keeping a row
    is_sun_down = df['sun_alt'] < SUN_ALTITUDE_THRESHOLD
    
    # Moon illumination conditions based on its altitude
    cond1 = (df['moon_alt'] >= 0) & (df['moon_alt'] <= 5) & (df['moon_phase'] < (50 - (8 * df['moon_alt'])))
    cond2 = (df['moon_alt'] >= -3) & (df['moon_alt'] < 0) & (df['moon_phase'] < ((150 - (50 * df['moon_alt'])) / 3))
    cond3 = df['moon_alt'] < -3
    
    # Keep rows where the sun is down AND any of the moon conditions are met
    filtered_df = df[is_sun_down & (cond1 | cond2 | cond3)].copy()
    
    return filtered_df.drop(columns=['sun_alt', 'moon_alt', 'moon_phase'])


def load_and_process_dat_files(folder_path):
    """
    Loads, cleans, combines, and pre-filters all .dat files from a directory.

    Args:
        folder_path (str): The path to the folder containing .dat files.

    Returns:
        pd.DataFrame: A single DataFrame with all processed data.
    """
    all_dfs = []
    print(f"Searching for .dat files in: {folder_path}")
    
    if not os.path.isdir(folder_path):
        print(f"Error: Directory not found: {folder_path}")
        return pd.DataFrame()

    for filename in os.listdir(folder_path):
        if filename.endswith(".dat"):
            file_path = os.path.join(folder_path, filename)
            try:
                df = pd.read_csv(
                    file_path,
                    delimiter=";",
                    skiprows=35,
                    names=["DateTime", "DateTime2", "Temp", "Num", "Hz", "Magnitude"],
                    on_bad_lines='skip'
                )
                # Pre-filter files with too few data points
                if len(df) < MIN_DATAPOINTS_PER_FILE:
                    continue
                all_dfs.append(df)
            except Exception as e:
                print(f"Could not process file {filename}: {e}")
    
    if not all_dfs:
        print("No valid .dat files found after initial filtering.")
        return pd.DataFrame()

    print(f"Found and loaded {len(all_dfs)} valid .dat files. Processing...")
    
    # Combine all data, create a proper datetime object, and filter
    full_df = pd.concat(all_dfs, ignore_index=True)
    full_df['Datetime'] = pd.to_datetime(full_df['DateTime'], errors='coerce')
    full_df.dropna(subset=['Datetime', 'Magnitude'], inplace=True)
    full_df = full_df[full_df["Magnitude"] != -0.0]

    # Filter for specified hours of the night
    full_df = full_df[full_df['Datetime'].dt.hour.isin(HOURS_TO_INCLUDE)]

    print("Finished processing .dat files.")
    return full_df[['Datetime', 'Magnitude']].copy()


def analyze_and_plot_weekly_trend(df, location_name, excluded_cloudy_hours):
    """
    Performs weekly trend analysis on the filtered data and generates a plot.
    
    Args:
        df (pd.DataFrame): The pre-filtered data.
        location_name (str): The name of the location for the plot title.
        excluded_cloudy_hours (set): A set of 'YYYY-MM-DD HH' strings to exclude.
    """
    if df.empty:
        print("No data available for analysis after initial filtering.")
        return

    # --- Cloud Filter --- #
    # Create a string representation of the hour for filtering
    df['datetime_hour_str'] = df['Datetime'].dt.strftime('%Y-%m-%d %H')
    df = df[~df['datetime_hour_str'].isin(excluded_cloudy_hours)]
    df = df.drop(columns=['datetime_hour_str'])
    if df.empty:
        print("No data remaining after removing cloudy periods.")
        return

    # --- Weekly Aggregation --- #
    # Convert magnitude to a linear scale for averaging
    df['Linear_Magnitude'] = 1.08 * (10**8) * 10**(-0.4 * df['Magnitude'])
    df.loc[df['Linear_Magnitude'] > 10, 'Linear_Magnitude'] = np.nan # Remove outliers
    df.dropna(subset=['Linear_Magnitude'], inplace=True)
    
    # Resample to get the median value for each week
    weekly_median = df.resample('W', on='Datetime')['Linear_Magnitude'].median()
    weekly_median.dropna(inplace=True)

    if len(weekly_median) < 2:
        print("Not enough weekly data points to calculate a trend.")
        return

    # --- Regression Analysis --- #
    X = np.arange(len(weekly_median)).reshape(-1, 1)
    y = weekly_median.values
    
    reg = LinearRegression().fit(X, y)
    trend = reg.predict(X)
    
    # --- Plotting --- #
    plt.figure(figsize=(12, 6))
    plt.plot(weekly_median.index, y, 'o', color='darkblue', label='Median Weekly Brightness')
    plt.plot(weekly_median.index, trend, linestyle='--', color='red', label='Trend Line')
    plt.ylim(0, max(2, y.max() * 1.1))
    plt.xticks(fontsize=14, rotation=45)
    plt.yticks(fontsize=14)
    plt.xlabel('Time', fontsize=15)
    plt.ylabel(r'Median Sky Brightness (mcd/m$^2$)', fontsize=15)
    plt.title(f'Weekly Sky Brightness Trend | {location_name}', fontsize=18)
    plt.grid(True)
    plt.legend(fontsize=15)
    plt.tight_layout()
    plt.show()

    # --- Calculate and Print Statistics --- #
    slope_per_sample = reg.coef_[0]
    # There are ~52.14 weeks in a year. Slope is per week.
    slope_per_year = slope_per_sample * (365.25 / 7)
    percentage_change = (slope_per_year / np.median(y)) * 100 if np.median(y) != 0 else 0
    
    print("\n--- WEEKLY ANALYSIS RESULTS ---")
    print(f"Trend Slope: {slope_per_year:.4f} mcd/m^2 per year.")
    print(f"Percentage Change: {percentage_change:.2f}% per year based on weekly medians.")


# ---- MAIN EXECUTION ---- #

def main():
    """
    Main function to run the entire analysis pipeline.
    """
    print(f"--- Starting Weekly Analysis for: {SELECTED_LOCATION} ---")
    
    # 1. Setup Observer
    location_coords = LOCATIONS_DICT.get(SELECTED_LOCATION)
    if not location_coords:
        print(f"Error: Coordinates for '{SELECTED_LOCATION}' not found in LOCATIONS_DICT.")
        return
        
    observer = ephem.Observer()
    observer.lat = location_coords['lat']
    observer.lon = location_coords['lon']

    # 2. Load Cloud Data
    excluded_cloudy_hours = load_cloud_data(CLOUD_DATA_FILE)

    # 3. Load and Process main .dat files
    location_data_path = os.path.join(DATA_ROOT_DIR, SELECTED_LOCATION)
    df_processed = load_and_process_dat_files(location_data_path)

    if df_processed.empty:
        print("Analysis stopped because no data could be loaded.")
        return
    
    # 4. Filter by Astronomical Conditions (Sun & Moon)
    print("Filtering data based on sun and moon positions...")
    df_celestial_filtered = filter_by_celestial_conditions(df_processed, observer)
    print(f"Kept {len(df_celestial_filtered)} rows after celestial filtering.")

    if df_celestial_filtered.empty:
        print("No data remains after celestial filtering. Cannot proceed.")
        return

    # 5. Perform Weekly Analysis and Plotting (Cloud filter is inside this function)
    analyze_and_plot_weekly_trend(df_celestial_filtered, SELECTED_LOCATION, excluded_cloudy_hours)
        
    print("\n--- Analysis Complete ---")


if __name__ == "__main__":
    main()