In [None]:
# Imports
import os, random
import numpy as np
import pandas as pd
import cv2
from pathlib import Path
from tqdm.notebook import tqdm

In [2]:
# Load the table of region-of-interest coordinates (one row per coordinate, all seasons).
roi_df = pd.read_csv("/kaggle/input/seasonal-regions-coords/seasonal_regions_coordinates.csv")

# Season to process, written in lowercase
SEASON = "fall"

# Keep only the rows whose "Season" value equals SEASON, compared case-insensitively.
season_mask = roi_df["Season"].str.lower().eq(SEASON)
df = roi_df.loc[season_mask].reset_index(drop=True)
print(f" Loaded {len(df)} {SEASON} coordinates from coords CSV.")

# Dataset sub-folders of each known season; any other SEASON leaves SEASON_DIR empty.
_season_subdirs = {
    "fall": ("Sentinel_fall", "ROIs1970_fall"),
    "spring": ("Sentinel_spring", "ROIs1158_spring"),
    "winter": ("Sentinel_winter", "ROIs2017_winter"),
    "summer": ("Sentinel_summer", "ROIs1868_summer"),
}
SEASON_DIR = (
    os.path.join("/kaggle/input/tum-sentinel-1-2", *_season_subdirs[SEASON])
    if SEASON in _season_subdirs
    else ""
)

 Loaded 64 fall coordinates from coords CSV.


In [None]:
from kaggle_secrets import UserSecretsClient
# Read the two credentials from Kaggle's secret store; getOAuth() uses them
# as the OAuth client id (secret_value_0) and client secret (secret_value_1).
user_secrets = UserSecretsClient()
_fetch_secret = user_secrets.get_secret
secret_value_0 = _fetch_secret("CLIENT-ID")
secret_value_1 = _fetch_secret("CLIENT-SECRET")

In [None]:
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

def getOAuth() :
    """Open an authenticated OAuth2 session against the Copernicus Data Space.

    Runs the client-credentials (backend application) flow, using the
    module-level ``secret_value_0`` as client id and ``secret_value_1`` as
    client secret.

    Returns:
        tuple: ``(oauth, token)`` -- the ``OAuth2Session`` and the token
        returned by ``fetch_token`` for it.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the token
        endpoint answers with an HTTP error status (``requests.HTTPError``),
        plus any error raised by ``fetch_token`` itself.
    """
    token_url = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token'

    # Client credentials
    client_id = secret_value_0
    client_secret = secret_value_1

    # Create session
    client = BackendApplicationClient(client_id=client_id)
    oauth = OAuth2Session(client=client)

    # Proper error handling: turn an HTTP error status on the token response
    # into an exception instead of a confusing token-parsing failure.
    def sentinelhub_compliance_hook(response):
        response.raise_for_status()
        return response

    # The hook only runs while fetch_token() processes the token response, and
    # a new session is built on every call, so it must be registered BEFORE
    # the token is fetched; registering it afterwards never triggers it.
    oauth.register_compliance_hook("access_token_response", sentinelhub_compliance_hook)

    # Get token
    token = oauth.fetch_token(token_url=token_url,
                              client_secret=client_secret,
                              include_client_id=True)

    SessionData = (oauth, token)
    return SessionData

In [None]:
# Identifier of the data collection; sent as the "type" of the request's input data.
SEN2_COLLECTION_ID = 'byoc-5460de54-082e-473a-b6ea-d5cbe3c17cca'

def getImage(lat1: float, lon1: float, lat2: float, lon2: float, oauth: OAuth2Session, year: int) :
    """Request one RGB image for a lat/lon rectangle and cut it into tiles.

    Posts a Process API request for a 2500x2500 PNG of the rectangle spanned
    by the two corners, decodes the response and slices the top-left
    2304x2304 pixels into 81 tiles of 256x256.

    Args:
        lat1, lon1: Latitude / longitude of one corner of the rectangle.
        lat2, lon2: Latitude / longitude of the opposite corner. The two
            corners may be given in either order.
        oauth: Authenticated session; only its ``post`` method is used.
        year: Year inserted into the fixed 29 Sep - 30 Oct time range.

    Returns:
        tuple: ``("success", tiles)`` where ``tiles`` is a list of 81 image
        arrays as decoded by OpenCV, or ``("error", "error")`` when the HTTP
        response is not OK or its body cannot be decoded as an image.
        Tile ``i`` covers row block ``i % 9`` and column block ``i // 9``.
    """
    # Sort the corners so the box is always [west, south, east, north]
    # (min lon, min lat, max lon, max lat). Building it positionally as
    # [lon1, lat2, lon2, lat1] swaps south and north whenever lat2 > lat1,
    # which is how this notebook calls the function (lat2 = lat1 + 4).
    bbox = [min(lon1, lon2), min(lat1, lat2), max(lon1, lon2), max(lat1, lat2)]

    img_height = 2500
    img_width = 2500

    # Tiling layout: 9 x 9 tiles of 256 px use 2304 of the 2500 px per side;
    # the remaining strip on the right/bottom is not returned.
    tile_size = 256
    tiles_per_side = 9

    evalScript = """
    //VERSION=3
    function setup() {
        return {
            input: ["B02", "B03", "B04"],
            output: { bands: 3 }
        };
    }

    function evaluatePixel(sample) {
        return [2.5 * sample.B04/10000, 2.5 * sample.B03/10000, 2.5 * sample.B02/10000];
    }
    """

    # NOTE(review): the time range is fixed to 29 Sep - 30 Oct of `year`,
    # independent of the notebook's SEASON setting -- confirm this is intended
    # before running for another season.
    fromDateTime = f"{year}-09-29T23:59:59Z"
    toDateTime = f"{year}-10-30T00:00:00Z"

    request = {
        "input": {
            "bounds": {
                "bbox": bbox
            },
            "data": [
                {
                    "dataFilter": {
                        "timerange": {
                            "from": fromDateTime,
                            "to": toDateTime
                        }
                    },
                    "type": SEN2_COLLECTION_ID
                }
            ]
        },
        "output": {
            "width": img_width,
            "height": img_height,
            "responses": [
                {
                    "identifier": "default",
                    "format": {
                        "type": "image/png"
                    }
                }
            ]
        },
        "evalscript": evalScript
    }

    url = "https://sh.dataspace.copernicus.eu/api/v1/process"
    response = oauth.post(url, json=request)

    if not response.ok :
        return ("error", "error")

    # Decode straight from the response bytes: no shared "temp.png" left on
    # disk, and a body that is not a valid image is reported through the
    # normal error tuple instead of failing later when a None image is sliced.
    big_img = cv2.imdecode(np.frombuffer(response.content, dtype=np.uint8), cv2.IMREAD_COLOR)
    if big_img is None :
        return ("error", "error")

    all_parts = []
    for i in range(tiles_per_side * tiles_per_side) :

        x = i % tiles_per_side    # row block of the tile
        y = i // tiles_per_side   # column block of the tile

        crop_s2 = big_img[tile_size*x : tile_size*(x+1), tile_size*y : tile_size*(y+1)]
        all_parts.append(crop_s2)

    return ("success", all_parts)


In [None]:
#  ORB Matching Function
def orb_match_score(img1, img2):
    """Count the cross-checked ORB feature matches between two images.

    Args:
        img1: First image array (as loaded by OpenCV), or None.
        img2: Second image array (as loaded by OpenCV), or None.

    Returns:
        int: Number of matches found by a brute-force Hamming matcher with
        cross-checking. 0 when either image is None, when either image
        yields no descriptors, or when detection/matching raises an error.
    """
    # An image that failed to load cannot match anything.
    if img1 is None or img2 is None:
        return 0

    try:
        orb = cv2.ORB_create()
        _, des1 = orb.detectAndCompute(img1, None)
        _, des2 = orb.detectAndCompute(img2, None)
        if des1 is None or des2 is None:
            return 0
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        return len(bf.match(des1, des2))
    except Exception:
        # Scoring stays best-effort (a failing pair scores 0), but unlike a
        # bare `except:` this lets KeyboardInterrupt / SystemExit through so a
        # long matching run can still be stopped.
        return 0

In [None]:
# Download images
from time import time

oauth, token = getOAuth()
YEAR = 2021

# Offsets added to each row's Latitude / Longitude to get the opposite corner
# of the requested area (presumably degrees -- confirm against the coords CSV).
LAT_EXTENT = 4
LON_EXTENT = 3

for i, row in tqdm(df.iterrows(), total=df.shape[0]):

    lat, lon = row["Latitude"], row["Longitude"]

    try:
        # Renew the session shortly (20 s) before the current token expires.
        if time() > (token["expires_at"] - 20) :
            oauth, token = getOAuth()

        status, s2_pngs = getImage(lat, lon, lat+LAT_EXTENT, lon+LON_EXTENT, oauth, YEAR)

        if status == "error" :
            raise RuntimeError("Error getting data from Copernicus")

        # One folder per coordinate row: <SEASON>/<row index>/image_<tile index>.png
        Path(f"{SEASON}/{i}").mkdir(parents=True, exist_ok=True)
        for index, cropped_image in enumerate(tqdm(s2_pngs, leave=False)) :
            tile_path = f"{SEASON}/{i}/image_{index}.png"
            # cv2.imwrite signals failure through its return value, not an exception.
            if not cv2.imwrite(tile_path, cropped_image) :
                print(f" Could not write {tile_path}")

    except Exception as e:
        # Best effort: report the failing coordinate and carry on with the next one.
        print(f" Error for coordinate ({lat},{lon}):", e)

In [None]:
# Process Each s2 Folder
# For every "s2_" folder of the dataset: pick one of its PNGs as the target,
# score it against every downloaded tile with ORB, and record the temperature
# zone of the coordinate whose tile matched best.
results = []
s2_folders = sorted([f.path for f in os.scandir(SEASON_DIR) if f.is_dir() and "s2_" in f.name])

# Dedicated seeded generator so the randomly picked targets are the same on every run.
target_rng = random.Random(0)

for folder in s2_folders:
    folder_name = os.path.basename(folder)
    print(f"\n Processing {folder_name}...")

    # Sorted because os.listdir returns names in arbitrary order, which would
    # otherwise make the seeded choice below depend on the file system.
    images = sorted(img for img in os.listdir(folder) if img.endswith(".png"))
    if not images:
        print(" No PNGs in", folder)
        continue

    target_img_name = target_rng.choice(images)
    target_path = os.path.join(folder, target_img_name)
    target_img = cv2.imread(target_path, cv2.IMREAD_COLOR)
    if target_img is None:
        print(" Couldn't load target image:", target_img_name)
        continue

    best_score = 0
    best_coord = None
    best_img = None

    for i, row in tqdm(df.iterrows(), total=df.shape[0]):

        lat, lon = row["Latitude"], row["Longitude"]

        try:
            tile_dir = f"{SEASON}/{i}"
            s2_pngs = [cv2.imread(f"{tile_dir}/{down_img}", cv2.IMREAD_COLOR) for down_img in os.listdir(tile_dir)]

            # Matching of Sentinel-2 and target image
            for img in tqdm(s2_pngs, leave=False):
                # cv2.imread returns None for a file it cannot decode; skip it
                # rather than abandoning every remaining tile of this coordinate.
                if img is None:
                    continue
                # Tiles are already 8-bit (0-255) as read from disk. Scaling
                # them by 255 and casting back to uint8 wraps modulo 256 and
                # corrupts the image, so the tile is matched as loaded.
                score = orb_match_score(target_img, img)
                if score > best_score:
                    best_score = score
                    best_coord = (lat, lon)
                    best_img = img
        except Exception as e:
            print(f" Error for coordinate ({lat},{lon}):", e)

    if best_img is None:
        print(" No suitable match found for", folder_name)
        continue

    print(f" Best match score: {best_score}")

    # Classify by absolute latitude of the best-matching coordinate.
    best_abs_lat = abs(best_coord[0])
    if best_abs_lat < 23.5:
        temp_zone = "Tropical"
    elif best_abs_lat < 66.5:
        temp_zone = "Temperate"
    else:
        temp_zone = "Arctic"

    results.append({
        "s2_folder": folder_name,
        "temperature_zone": temp_zone,
        "match_score": best_score
    })

In [None]:
# Save results
# Write the rows collected in `results` to "<SEASON>_ROI_results.csv",
# without the DataFrame index column.
csv_output_path = f"{SEASON}_ROI_results.csv"
output_df = pd.DataFrame(data=results)
output_df.to_csv(path_or_buf=csv_output_path, index=False)
print("\n Final CSV saved at", csv_output_path)