This notebook takes care of preparing all necessary metadata for training and testing. Specifically, a metadata file is created containing all information used during both.


All necessary variables are:

1. usable
2. file_name
3. frame_date
4. number
5. temperature_avg
6. sea-level pressure
7. shortwave downwelling radiation
8. relative humidity
9. region
10. season



First, we impute and normalize all numerical climate data in each metadata.csv file to make sure there are no NaNs. Imputation method: the mean of the column within that file. Normalization method: min-max scaling of each column to the range [0, 1].

In [None]:
import os
import pandas as pd
import numpy as np

# Root folder of the additional GreenEarthNet data and the splits stored below it
base_path = r"C:\TjallingData\greenearthnet_additional"
subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

# Numeric climate columns that get imputed and scaled ('file_name' is left untouched)
columns_to_process = [
    'Wind Speed',
    'Relative Humidity',
    'Rainfall',
    'Sea-level Pressure',
    'Shortwave Downwelling Radiation',
    'Temperature Avg',
    'Temperature Min',
    'Temperature Max',
]

def impute_and_normalize(df, columns):
    """Fill missing values with the column mean, then min-max scale to [0, 1].

    Args:
        df: DataFrame holding the columns to process. It is modified in place.
        columns: Names of the numeric columns to impute and scale.

    Returns:
        The same DataFrame, with every listed column imputed and scaled.
        A column whose values are all identical is set to 0.0. A column
        without any known value stays NaN because it has no mean to fill
        with.
    """
    for column in columns:
        # Step 1: impute missing values with the column mean
        values = df[column].fillna(df[column].mean())

        # Step 2: min-max normalization
        lowest = values.min()
        value_range = values.max() - lowest

        if value_range == 0:
            # A constant column would otherwise evaluate 0 / 0 and turn
            # every value into NaN, which is what this step must prevent.
            df[column] = 0.0
        else:
            df[column] = (values - lowest) / value_range

    return df

# Write a 'metadata_normalized.csv' next to every split's 'metadata.csv'.
# Each split is imputed and scaled on its own, i.e. with the statistics of
# that split's file only.
for subfolder in subfolders:
    folder_path = os.path.join(base_path, subfolder)
    metadata_file = os.path.join(folder_path, 'metadata.csv')
    output_file = os.path.join(folder_path, 'metadata_normalized.csv')

    # Load, impute and scale in one go, then persist without the index column
    metadata_df = impute_and_normalize(pd.read_csv(metadata_file), columns_to_process)
    metadata_df.to_csv(output_file, index=False)

    print(f"Created {output_file}")

print("All metadata files processed and normalized successfully.")


Then we merge _detailed.csv and the normalized metadata (metadata_normalized.csv) into a new .csv file that contains file_name, frame_date, number, temperature_avg, sea-level pressure, shortwave downwelling radiation, and relative humidity.

In [None]:
import os
import pandas as pd

# Root folder of the additional data and the splits stored below it
base_path = r"C:\TjallingData\greenearthnet_additional"
subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

# Climate variables taken over from the normalized metadata
metadata_columns = [
    'Temperature Avg',
    'Sea-level Pressure',
    'Shortwave Downwelling Radiation',
    'Relative Humidity',
]

for subfolder in subfolders:
    folder_path = os.path.join(base_path, subfolder)
    detailed_csv = f"{subfolder}_detailed.csv"
    output_file = os.path.join(folder_path, f"{subfolder}_combined.csv")

    # Per-image details and the normalized climate metadata of this split
    detailed_df = pd.read_csv(os.path.join(folder_path, detailed_csv))
    metadata_df = pd.read_csv(os.path.join(folder_path, 'metadata_normalized.csv'))

    # Left join on 'file_name': every detailed row is kept, climate values
    # are attached where the metadata has a matching file name
    climate_subset = metadata_df[['file_name'] + metadata_columns]
    combined_df = detailed_df.merge(climate_subset, on='file_name', how='left')

    combined_df.to_csv(output_file, index=False)

    print(f"Created {output_file}")

print("All files processed successfully.")

Next, the metadata of the imputed images is imputed as well. This happens in the same way as it did for the images: missing climate values are linearly interpolated over time within each minicube.

In [None]:
import os
import pandas as pd
import numpy as np
from datetime import datetime

def impute_climate_data(input_csv, output_csv):
    """Fill gaps in the climate columns by interpolating over time per minicube.

    Reads ``input_csv``, interpolates missing values of the four climate
    columns linearly on the day axis within each minicube, and writes the
    result (sorted by minicube and date) to ``output_csv``.

    Args:
        input_csv: Path of the combined CSV. Needs a 'file_name' column and
            the four climate columns; a 'date' column is optional.
        output_csv: Path the imputed CSV is written to.
    """
    climate_columns = ['Temperature Avg', 'Sea-level Pressure', 'Shortwave Downwelling Radiation', 'Relative Humidity']

    def minicube_of(file_name):
        # Everything before the last underscore identifies the minicube
        return '_'.join(file_name.split('_')[:-1])

    frame = pd.read_csv(input_csv)

    # Use the 'date' column when present, otherwise take the YYYY-MM-DD
    # pattern found in the file name
    if 'date' in frame.columns:
        frame['date'] = pd.to_datetime(frame['date'])
    else:
        extracted = frame['file_name'].str.extract(r'(\d{4}-\d{2}-\d{2})')
        frame['date'] = pd.to_datetime(extracted[0])

    frame['minicube'] = frame['file_name'].map(minicube_of)
    frame = frame.sort_values(['minicube', 'date'])

    for _, cube_rows in frame.groupby('minicube'):
        # Time axis of this minicube: days since its earliest frame
        day_offsets = (cube_rows['date'] - cube_rows['date'].min()).dt.days

        for column in climate_columns:
            missing = cube_rows[column].isna()
            known_values = cube_rows.loc[~missing, column]

            # Without a single known value there is nothing to interpolate from
            if known_values.empty:
                continue

            # np.interp holds the nearest known value outside the known range
            filled = np.interp(day_offsets[missing], day_offsets[~missing], known_values)
            frame.loc[cube_rows.index[missing.to_numpy()], column] = filled

    # The helper column is not part of the output
    frame = frame.drop('minicube', axis=1)

    frame.to_csv(output_csv, index=False)
    print(f"Imputed data saved to {output_csv}")

# Root folder of the additional data and the splits stored below it
base_path = r"C:\TjallingData\greenearthnet_additional"
subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

# Turn every '<split>_combined.csv' into '<split>_combined_imputed.csv'
for subfolder in subfolders:
    folder_path = os.path.join(base_path, subfolder)
    input_csv = os.path.join(folder_path, f"{subfolder}_combined.csv")
    output_csv = os.path.join(folder_path, f"{subfolder}_combined_imputed.csv")

    impute_climate_data(input_csv=input_csv, output_csv=output_csv)

print("All files processed and imputed successfully.")

Next, we scrape the location of each minicube and convert those to a location that we can use in the prompt.

In [None]:
import os
import pandas as pd
import xarray as xr
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
import time
from tqdm import tqdm
import gc

def get_region(lat, lon, geolocator, cache):
    """Reverse-geocode a coordinate to a region name, with caching and retries.

    Args:
        lat: Latitude of the point.
        lon: Longitude of the point.
        geolocator: Object exposing ``reverse(query, language=...)``; the
            result must offer a ``raw`` dict.
        cache: Dict mapping ``"lat,lon"`` strings to region names. It is
            read and updated in place.

    Returns:
        The 'state' of the address, else its 'county', else "Unknown".
        "Unknown" is also returned when the geocoder finds nothing or when
        all three attempts fail with a timeout or service error.
    """
    cache_key = f"{lat},{lon}"
    if cache_key in cache:
        return cache[cache_key]

    for _ in range(3):
        try:
            location = geolocator.reverse(f"{lat}, {lon}", language="en")
        except (GeocoderTimedOut, GeocoderServiceError):
            # Transient failure: wait briefly, then try again
            time.sleep(1)
            continue

        region = "Unknown"
        if location:
            # Tolerate a response without an 'address' entry instead of
            # raising KeyError
            address = location.raw.get('address', {})
            region = address.get('state', address.get('county', "Unknown"))

        # A completed lookup is definitive, also when nothing was found, so
        # remember it and do not query the same coordinate again. Failed
        # attempts are deliberately not cached.
        cache[cache_key] = region
        return region

    return "Unknown"

def process_test_track(combined_imputed_path, minicube_folder, subfolder):
    """Add a 'region' column to a split's CSV based on its minicube files.

    Walks ``minicube_folder`` for ``.nc`` files, reverse-geocodes the mean
    latitude/longitude of each one and writes the region to all CSV rows
    belonging to that minicube. The CSV is re-saved after every processed
    minicube so that progress survives an interruption.

    Args:
        combined_imputed_path: Path of '<split>_combined_imputed.csv'.
        minicube_folder: Folder searched recursively for ``.nc`` files.
        subfolder: Split name; used for the progress label and for the name
            of the output file, which is written next to the input CSV.
    """
    geolocator = Nominatim(user_agent="minicube_processor")
    cache = {}

    df = pd.read_csv(combined_imputed_path)

    # Rows that never get matched keep this placeholder
    if 'region' not in df.columns:
        df['region'] = 'Unknown'

    # Minicube key of every row: the file's base name without its first and
    # last underscore-separated token. This does not depend on the .nc file
    # being processed, so it is computed once instead of once per minicube.
    row_minicubes = df['file_name'].apply(
        lambda x: '_'.join(os.path.basename(x).split('_')[1:-1])
    )
    output_file = os.path.join(
        os.path.dirname(combined_imputed_path),
        f"{subfolder}_combined_imputed_with_region.csv",
    )

    for root, _dirs, files in os.walk(minicube_folder):
        for file in tqdm(files, desc=f"Processing {subfolder}"):
            if not file.endswith('.nc'):
                continue

            minicube = file[:-3]  # Remove .nc extension
            nc_file = os.path.join(root, file)

            try:
                with xr.open_dataset(nc_file) as ds:
                    lat = float(ds.lat.mean().values)
                    lon = float(ds.lon.mean().values)
                    region = get_region(lat, lon, geolocator, cache)
                    print(f"Processed minicube: {minicube}, Region: {region}")

                    df.loc[row_minicubes == minicube, 'region'] = region

                    # Save progress after each processed minicube
                    df.to_csv(output_file, index=False)
                    print(f"Updated DataFrame and saved to {output_file}")

            except Exception as e:
                # Best effort: one unreadable minicube must not stop the run
                print(f"Error processing minicube file {nc_file}: {e}")

            # Clear some memory
            gc.collect()

# Location of the CSV files and of the original minicube (.nc) files
base_path = r"C:\TjallingData\greenearthnet_additional"
minicube_base_path = r"C:\Workdir\Develop\greenearthnet"

subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]


# Attach a region to every split, one split at a time
for subfolder in subfolders:
    minicube_folder = os.path.join(minicube_base_path, subfolder)
    combined_imputed_path = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed.csv")

    print(f"\nProcessing {subfolder}...")
    process_test_track(
        combined_imputed_path=combined_imputed_path,
        minicube_folder=minicube_folder,
        subfolder=subfolder,
    )

print("\nAll test tracks processed successfully.")


We now clean all region names and convert them to plain, readable text, since accented letters and some other symbols were stored with broken encoding (for example 'Ã©' instead of 'é').

In [None]:
import csv
import re
import unicodedata
import os
from tqdm import tqdm

def normalize_region_name(name):
    """Return an ASCII-only, whitespace-normalized version of a region name.

    The name may be correct text or UTF-8 text that was read as Latin-1
    (mojibake such as 'Ã©' for 'é'). Accented letters are reduced to their
    base letter, en/em dashes become '-', characters that have no ASCII
    decomposition are dropped, and runs of whitespace collapse to a single
    space.

    Args:
        name: The region name to clean.

    Returns:
        The cleaned name, stripped of leading and trailing whitespace.
    """
    # Undo "UTF-8 bytes read as Latin-1". Decoding strictly and falling back
    # to the input keeps text that is already correct intact: ignoring
    # decode errors would silently delete its accented letters, and text
    # outside Latin-1 would raise on encode.
    try:
        name = name.encode('latin-1').decode('utf-8')
    except UnicodeError:
        pass

    # Mojibake that is still present (e.g. doubly encoded text) is repaired
    # for the known characters. The keys are derived from the characters
    # themselves, so each one is the complete broken sequence and no key can
    # match a part of another one.
    for char in 'éüóíöáäâôñ\u2013':
        name = name.replace(char.encode('utf-8').decode('latin-1'), char)

    # Dashes have no ASCII decomposition; keep them as a hyphen so that the
    # words on either side do not run together
    name = name.replace('\u2013', '-').replace('\u2014', '-')

    # Split accented letters into base letter + combining mark ...
    normalized_name = unicodedata.normalize('NFKD', name)

    # ... and keep only the ASCII part (spaces, '-' and '/' are ASCII)
    ascii_name = ''.join(char for char in normalized_name if ord(char) < 128)

    # Replace multiple spaces with a single space, trim both ends
    ascii_name = re.sub(r'\s+', ' ', ascii_name)

    return ascii_name.strip()

def process_csv_file(input_file, output_file):
    """Copy a CSV file while cleaning the values of its 'region' column.

    The input is read as Latin-1 and the output is written as UTF-8. The
    header is copied unchanged; in every following row the 'region' value is
    replaced by the result of ``normalize_region_name``.

    Args:
        input_file: Path of the CSV to read; its header must contain 'region'.
        output_file: Path of the CSV to write.
    """
    with open(input_file, 'r', newline='', encoding='latin-1') as infile, \
         open(output_file, 'w', newline='', encoding='utf-8') as outfile:

        records = csv.reader(infile)
        writer = csv.writer(outfile)

        # The header goes through untouched and tells us where 'region' lives
        header = next(records)
        writer.writerow(header)
        region_index = header.index('region')

        def cleaned(rows):
            # Lazily yield each row with its region name normalized
            for record in rows:
                record[region_index] = normalize_region_name(record[region_index])
                yield record

        writer.writerows(cleaned(records))

# Root folder of the additional data
base_path = r"C:\TjallingData\greenearthnet_additional"

subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

# Clean the region names of every split that has a '..._with_region.csv'
for subfolder in tqdm(subfolders, desc="Processing subfolders"):
    input_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region.csv")
    output_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region_normalized.csv")

    # Splits without region data are reported and skipped
    if not os.path.exists(input_file):
        print(f"\nFile not found: {input_file}")
        continue

    print(f"\nProcessing: {input_file}")
    process_csv_file(input_file, output_file)
    print(f"Normalized data has been written to {output_file}")

print("\nAll files processed successfully.")

Next, we create the season variable by deducing it from each date the image was taken.

In [None]:
import os
import pandas as pd
from datetime import datetime
from tqdm import tqdm

def get_season(date):
    """Return a label such as 'early spring' or 'mid winter' for a date.

    Seasons are delimited by fixed day-of-year boundaries, with no
    adjustment for hemisphere: spring is day 80-171, summer 172-263, fall
    264-354, and winter runs from day 355 across the turn of the year to
    day 79. Each season is split into thirds ('early', 'mid', 'late'); any
    remaining days fall into the 'late' part.

    Args:
        date: A 'YYYY-MM-DD' string or an object offering ``timetuple()``
            (e.g. ``datetime`` or a pandas ``Timestamp``).

    Returns:
        The string '<early|mid|late> <spring|summer|fall|winter>'.
    """
    # Convert string to datetime if necessary
    if isinstance(date, str):
        date = datetime.strptime(date, '%Y-%m-%d')

    doy = date.timetuple().tm_yday

    # Seasons that lie inside one calendar year: (name, first day, last day)
    in_year_seasons = (
        ('spring', 80, 171),
        ('summer', 172, 263),
        ('fall', 264, 354),
    )

    for season, first_day, last_day in in_year_seasons:
        if first_day <= doy <= last_day:
            day_in_season = doy - first_day
            season_length = last_day - first_day + 1
            break
    else:
        # Winter wraps around New Year, so day ranges cannot describe it
        # directly. Count the days since day 355 instead: days 355-365 give
        # 0-10, and days 1-79 of the next year continue at 11.
        season = 'winter'
        season_length = 90
        day_in_season = doy - 355 if doy >= 355 else doy + 10

    third = season_length // 3
    if day_in_season < third:
        part = 'early'
    elif day_in_season < 2 * third:
        part = 'mid'
    else:
        part = 'late'

    return f"{part} {season}"

def add_season_to_csv(input_file, output_file):
    """Write a copy of a CSV file extended with a 'season' column.

    Args:
        input_file: Path of the CSV to read; it must have a 'date' column.
        output_file: Path the extended CSV is written to.
    """
    frame = pd.read_csv(input_file)

    # Parse the dates once, then label every row with its season
    frame['date'] = pd.to_datetime(frame['date'])
    frame['season'] = frame['date'].map(get_season)

    frame.to_csv(output_file, index=False)
    print(f"Processed: {output_file}")

# Root folder of the additional data
base_path = r"C:\TjallingData\greenearthnet_additional"

subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

# Add the season to every split whose normalized region file exists
for subfolder in tqdm(subfolders, desc="Processing subfolders"):
    input_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region_normalized.csv")
    output_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region_normalized_and_season.csv")

    # Report and skip splits whose input is missing
    if not os.path.exists(input_file):
        print(f"File not found: {input_file}")
        continue

    add_season_to_csv(input_file, output_file)

print("\nAll files processed successfully.")

Finally, we select the minicubes that can be used during testing. The selection criteria can be modified here to omit any test sets in which fewer than 30% of the images are non-imputed. This step also adds two extra variables: 'minicube' and 'usable'.


Currently selecting on minimum 3 context and 6 target images, which means a total of 30% of all images need to be non-imputed. The same threshold is used in the source code of the EarthNetScore package.

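Important note: The minimum threshold differs between the training set and test sets. For the training set, a minimum image count was set to 5 context and 10 target images (50%). This was done to ensure a larger training set. Note that the cell below applies the same values of `context_img_required` and `target_img_required` (3 and 6) to every subfolder, including `train`, so they must be changed to 5 and 10 to reproduce the training-set threshold.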

In [None]:
import os
import csv
from collections import defaultdict
from tqdm import tqdm

# Minimum number of non-imputed images a minicube needs to count as usable
context_img_required = 3
target_img_required = 6


def process_csv_file(input_file, output_file):
    """Count non-imputed images per minicube and flag usable minicubes.

    The input is read twice: first to count, per minicube, the context and
    target images whose 'imputed' value is 'no' (case-insensitive), then to
    write every row to ``output_file`` with two extra columns, 'minicube'
    and 'usable' ('yes' or 'no').

    Args:
        input_file: CSV with the columns 'file_name', 'property' and 'imputed'.
        output_file: Path of the CSV to write.

    Returns:
        A defaultdict mapping each minicube name to a dict with the keys
        'context_no_impute' and 'target_no_impute'.
    """
    # Which counter a row feeds, depending on its 'property' value
    counter_for = {'context': 'context_no_impute', 'target': 'target_no_impute'}

    def minicube_of(file_name):
        # Drop the last underscore-separated token, then everything from the
        # last remaining '.' onwards
        stem = '_'.join(file_name.split('_')[:-1])
        return stem.rsplit('.', 1)[0]

    minicubes = defaultdict(lambda: {'context_no_impute': 0, 'target_no_impute': 0})

    # First pass: gather the counts
    with open(input_file, 'r') as csvfile:
        for row in csv.DictReader(csvfile):
            file_name = row['file_name']
            property_type = row['property']
            imputed = row['imputed']

            if imputed.lower() != 'no':
                continue

            counter = counter_for.get(property_type)
            if counter is not None:
                minicubes[minicube_of(file_name)][counter] += 1

    print(f"Debug: Number of unique minicubes: {len(minicubes)}")
    print(f"Debug: First 5 minicubes and their counts: {list(minicubes.items())[:5]}")

    # Second pass: copy the rows and append the two new columns
    with open(input_file, 'r') as infile, open(output_file, 'w', newline='') as outfile:
        reader = csv.DictReader(infile)
        writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames + ['minicube', 'usable'])
        writer.writeheader()

        for row in reader:
            minicube_name = minicube_of(row['file_name'])

            # Looking a name up also registers minicubes without any
            # non-imputed image, with both counts at zero
            counts = minicubes[minicube_name]
            enough_context = counts['context_no_impute'] >= context_img_required
            enough_target = counts['target_no_impute'] >= target_img_required

            row['minicube'] = minicube_name
            row['usable'] = 'yes' if enough_context and enough_target else 'no'
            writer.writerow(row)

    return minicubes

# Base path
base_path = r"C:\TjallingData\greenearthnet_additional"

subfolders =  ['val_chopped', 'iid_chopped', 'ood-s_chopped', 'ood-st_chopped', 'ood-t_chopped', 'train']


# Flag the usable minicubes of every split and print a short summary
for subfolder in tqdm(subfolders, desc="Processing subfolders"):
    input_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region_normalized_and_season.csv")
    output_file = os.path.join(base_path, subfolder, f"{subfolder}_combined_imputed_with_region_normalized_season_minicube_and_usable.csv")

    if os.path.exists(input_file):
        print(f"\nProcessing {subfolder}")
        minicubes = process_csv_file(input_file, output_file)

        # Count usable and non-usable minicubes
        usable_count = sum(1 for counts in minicubes.values() if counts['context_no_impute'] >= context_img_required and counts['target_no_impute'] >= target_img_required)
        total_count = len(minicubes)

        # A CSV without data rows yields no minicubes at all; report 0%
        # instead of failing with a ZeroDivisionError
        usable_percentage = usable_count / total_count * 100 if total_count else 0.0

        print(f"\nResults for {subfolder}:")
        print(f"Total minicubes: {total_count}")
        print(f"Usable minicubes: {usable_count}")
        print(f"Non-usable minicubes: {total_count - usable_count}")
        print(f"Usable percentage: {usable_percentage:.2f}%")

        # Debug: Print some sample minicubes and their counts
        print("\nDebug: Sample minicubes and their counts:")
        for minicube, counts in list(minicubes.items())[:5]:
            print(f"{minicube}: context_no_impute={counts['context_no_impute']}, target_no_impute={counts['target_no_impute']}")
    else:
        print(f"File not found: {input_file}")

print("\nProcessing complete.")

Copying both the imputed and the non-imputed images to a common folder per track: NIR and NIR_imputed go to NIR_total, RGB and RGB_imputed go to RGB_total.

In [None]:
import os
import csv
from collections import defaultdict
from tqdm import tqdm
import shutil


# Root folder of the additional data and the splits stored below it
base_path = r"C:\TjallingData\greenearthnet_additional"

subfolders = [
    'val_chopped',
    'iid_chopped',
    'ood-s_chopped',
    'ood-st_chopped',
    'ood-t_chopped',
    'train',
]

def copy_folders(subfolder_path, source_folders, target_folder):
    """Copy the contents of several folders into one common target folder.

    Args:
        subfolder_path: Folder that contains the source folders and in which
            the target folder is created.
        source_folders: Names of the folders whose contents are copied, in
            order. Names that do not exist are skipped.
        target_folder: Name of the folder receiving all contents. An entry
            copied later replaces an earlier entry with the same name.
    """
    destination_root = os.path.join(subfolder_path, target_folder)

    # Make sure there is a place to copy to
    if not os.path.exists(destination_root):
        os.makedirs(destination_root)

    for folder_name in source_folders:
        origin = os.path.join(subfolder_path, folder_name)
        if not os.path.exists(origin):
            continue

        for entry in os.listdir(origin):
            src = os.path.join(origin, entry)
            dst = os.path.join(destination_root, entry)

            # Plain files are copied with their metadata ...
            if not os.path.isdir(src):
                shutil.copy2(src, dst)
                continue

            # ... directories are merged into an existing directory of that name
            shutil.copytree(src, dst, dirs_exist_ok=True)

    print(f"Copied contents of {', '.join(source_folders)} to {target_folder} in {subfolder_path}")

# Build the NIR_total and RGB_total folders of every split
for subfolder in tqdm(subfolders, desc="Processing subfolders"):
    subfolder_path = os.path.join(base_path, subfolder)
    input_file = os.path.join(subfolder_path, f"{subfolder}_combined_imputed_with_region_normalized_and_season.csv")
    output_file = os.path.join(subfolder_path, f"{subfolder}_combined_imputed_with_region_normalized_season_minicube_and_usable.csv")

    # Only splits that already have their season CSV are handled
    if not os.path.exists(input_file):
        continue

    print(f"\nProcessing {subfolder}")

    # Non-imputed and imputed images end up side by side, per image type
    for sources, target in (
        (['NIR', 'NIR_imputed'], 'NIR_total'),
        (['RGB', 'RGB_imputed'], 'RGB_total'),
    ):
        copy_folders(subfolder_path, sources, target)

print("\nProcessing complete.")

Next, we remove the 'RGB_imputed_' and 'NIR_imputed_' prefixes from the file names of all imputed images in the _total folders:

In [None]:
import os

def remove_imputed_prefix(base_dir):
    """Strip the 'NIR_imputed_' / 'RGB_imputed_' prefix from file names.

    Looks at the 'NIR_total' and 'RGB_total' folders inside ``base_dir`` and
    renames every entry that starts with one of the two prefixes to the same
    name without it. Missing folders and failed renames are reported on
    stdout and do not stop the run.

    Args:
        base_dir: Folder that contains the '*_total' folders.
    """
    prefixes = ['NIR_imputed_', 'RGB_imputed_']

    for folder in ('NIR_total', 'RGB_total'):
        folder_path = os.path.join(base_dir, folder)
        if not os.path.exists(folder_path):
            print(f"Folder not found: {folder_path}")
            continue

        print(f"Processing folder: {folder_path}")
        for filename in os.listdir(folder_path):
            # First prefix the name starts with, if any
            matched = next((p for p in prefixes if filename.startswith(p)), None)
            if matched is None:
                continue

            new_filename = filename[len(matched):]
            old_path = os.path.join(folder_path, filename)
            new_path = os.path.join(folder_path, new_filename)

            try:
                os.rename(old_path, new_path)
            except Exception as e:
                print(f"Error renaming {filename}: {str(e)}")
            else:
                print(f"Renamed: {filename} -> {new_filename}")

    print("Finished removing prefixes.")

# Strip the prefixes in one track folder. Only 'iid_chopped' is handled by
# this call; point base_directory at another track folder to process it.
base_directory = r'C:\TjallingData\greenearthnet_additional\iid_chopped'
remove_imputed_prefix(base_dir=base_directory)