In [1]:
import os
import numpy as np
import pandas as pd
import rasterio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
import re
from scipy.stats import zscore
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
import seaborn as sns


In [2]:

class SpectralIndicesExtractor:
    def __init__(self, base_dir, output_file, segment_types={'100m': [100, 200, 300], '200m': [200, 300, 400]}):
        self.base_dir = base_dir
        self.output_file = output_file
        self.segment_types = segment_types
        self.raw_results = []  # Store raw results before normalization
        self.max_workers = cpu_count()

    def extract_segment_info(self, file_path):
        """Extract segment_id, year, and configuration from the file path"""
        try:
            segment_type = '100m' if '100m aggregation' in file_path else '200m'
            match = re.search(r'segment_(\d+)_(\d{4})', file_path)
            if not match:
                return None
            segment_id = int(match.group(1))
            year = int(match.group(2))
            buffer_match = re.search(r'\\(\d+)m\\landsat\.tif$', file_path)
            if not buffer_match:
                return None
            buffer_size = int(buffer_match.group(1))

            # Add sensor type based on year
            sensor_type = 'L8' if year >= 2014 else 'L7'

            return {
                'segment_type': segment_type,
                'segment_id': segment_id,
                'year': year,
                'buffer_size': buffer_size,
                'sensor_type': sensor_type
            }
        except Exception as e:
            print(f"Error extracting segment info from {file_path}: {e}")
            return None

    def calculate_spectral_indices(self, image_data, sensor_type):
        """Calculate NDVI, NDWI, and NDBI indices with sensor-specific band mappings"""
        try:
            if sensor_type == 'L8':  # Landsat 8
                nir, red, green, swir = image_data[4], image_data[3], image_data[2], image_data[5]
            else:  # Landsat 5/7
                nir, red, green, swir = image_data[3], image_data[2], image_data[1], image_data[4]

            epsilon = 1e-8

            ndvi = (nir - red) / (nir + red + epsilon)
            ndwi = (green - swir) / (green + swir + epsilon)
            ndbi = (swir - nir) / (swir + nir + epsilon)

            return {
                "ndvi_mean": float(np.nanmean(ndvi)),
                "ndwi_mean": float(np.nanmean(ndwi)),
                "ndbi_mean": float(np.nanmean(ndbi))
            }
        except Exception as e:
            print(f"Error calculating indices: {e}")
            return {"ndvi_mean": np.nan, "ndwi_mean": np.nan, "ndbi_mean": np.nan}

    def process_file(self, file_path):
        """Process a single file and extract spectral indices"""
        try:
            segment_info = self.extract_segment_info(file_path)
            if not segment_info:
                return

            with rasterio.open(file_path) as src:
                data = src.read()
                indices = self.calculate_spectral_indices(data, segment_info['sensor_type'])
                result = {**segment_info, **indices}
                self.raw_results.append(result)

        except Exception as e:
            print(f"Error processing {file_path}: {e}")

    def normalize_indices(self):
        """Normalize indices by sensor type"""
        df = pd.DataFrame(self.raw_results)

        # Regression adjustment for L7 to match L8
        df_2013 = df[(df["year"] == 2013) & (df["sensor_type"] == "L7")]
        df_2014 = df[(df["year"] == 2014) & (df["sensor_type"] == "L8")]
        common_segments = set(df_2013["segment_id"]).intersection(set(df_2014["segment_id"]))

        df_2013 = df_2013[df_2013["segment_id"].isin(common_segments)]
        df_2014 = df_2014[df_2014["segment_id"].isin(common_segments)]

        for col in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
            model = LinearRegression()
            model.fit(df_2013[[col]], df_2014[[col]])
            df.loc[df["sensor_type"] == "L7", col] = model.predict(df[df["sensor_type"] == "L7"][[col]])

        # Apply Z-score normalization
        for col in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
            df[f"{col}_zscore"] = df.groupby("sensor_type")[col].transform(lambda x: zscore(x, nan_policy='omit'))

        return df

    def extract_indices(self):
        """Extract and normalize spectral indices from all TIF files"""
        tif_files = self.find_tif_files()
        print(f"Found {len(tif_files)} TIF files")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.process_file, tif_files)

        # Normalize results
        normalized_df = self.normalize_indices()
        normalized_df.to_csv(self.output_file, index=False)
        print(f"Normalized spectral indices saved to {self.output_file}")

    def find_tif_files(self):
        """Find all TIF files in the directory structure"""
        tif_files = []
        for segment_type in self.segment_types.keys():
            segment_dir = os.path.join(self.base_dir, f"First 6000 segments ({segment_type} aggregation)")
            if os.path.exists(segment_dir):
                for root, _, files in os.walk(segment_dir):
                    for file in files:
                        if file.endswith(".tif"):
                            tif_files.append(os.path.join(root, file))
        return tif_files


In [None]:

# Main script
def main():
    base_dir = "C:\\Users\\gmoor\\Documents\\REVISED CAPSTONE PROJECT\\Data"
    output_file = os.path.join(base_dir, "normalized_spectral_indices.csv")

    # Initialize and run the extraction
    extractor = SpectralIndicesExtractor(base_dir, output_file)
    extractor.extract_indices()

    # Load and categorize data
    df = pd.read_csv(output_file)

    # Define thresholds dynamically based on percentiles
    def define_thresholds(df, col):
        return {
            "low": df[col].quantile(0.25),
            "medium": df[col].quantile(0.5),
            "high": df[col].quantile(0.75),
        }

    # Compute thresholds separately for L7 and L8
    thresholds = {
        "NDVI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndvi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndvi_mean")},
        "NDWI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndwi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndwi_mean")},
        "NDBI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndbi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndbi_mean")},
    }

    # Assign categories based on computed thresholds
    def assign_categories(row, index, thresholds):
        value = row[index]
        sensor_type = row["sensor_type"]
        sensor_thresholds = thresholds[index.split('_')[0].upper()][sensor_type]
        if value <= sensor_thresholds["low"]:
            return "low"
        elif value <= sensor_thresholds["medium"]:
            return "medium"
        else:
            return "high"

    for index in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
        df[f"{index}_category"] = df.apply(assign_categories, axis=1, args=(index, thresholds))

    # Compute Composite Environmental Score
    df["composite_env_score"] = df[["ndvi_mean_category", "ndwi_mean_category", "ndbi_mean_category"]].apply(
        lambda row: sum({"low": 1, "medium": 2, "high": 3}[r] for r in row), axis=1
    )

    # Save the final dataset
    final_output_file = os.path.join(base_dir, "final_categorized_spectral_indices.csv")
    df.to_csv(final_output_file, index=False)
    print("Categorization complete. Final data saved as:", final_output_file)


if __name__ == "__main__":
    main()

Found 3104 TIF files


In [None]:
import os
import numpy as np
import pandas as pd
import rasterio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
import re
from scipy.stats import zscore
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
import seaborn as sns


class SpectralIndicesExtractor:
    def __init__(self, base_dir, output_file, segment_types={'100m': [100, 200, 300], '200m': [200, 300, 400]}):
        self.base_dir = base_dir
        self.output_file = output_file
        self.segment_types = segment_types
        self.raw_results = []  # Store raw results before normalization
        self.max_workers = cpu_count()

    def extract_segment_info(self, file_path):
        """Extract segment_id, year, and configuration from the file path"""
        try:
            segment_type = '100m' if '100m aggregation' in file_path else '200m'
            match = re.search(r'segment_(\d+)_(\d{4})', file_path)
            if not match:
                return None
            segment_id = int(match.group(1))
            year = int(match.group(2))
            buffer_match = re.search(r'\\(\d+)m\\landsat\.tif$', file_path)
            if not buffer_match:
                return None
            buffer_size = int(buffer_match.group(1))

            # Add sensor type based on year
            sensor_type = 'L8' if year >= 2014 else 'L7'

            return {
                'segment_type': segment_type,
                'segment_id': segment_id,
                'year': year,
                'buffer_size': buffer_size,
                'sensor_type': sensor_type
            }
        except Exception as e:
            print(f"Error extracting segment info from {file_path}: {e}")
            return None

    def calculate_spectral_indices(self, image_data, sensor_type):
        """Calculate NDVI, NDWI, and NDBI indices with sensor-specific band mappings"""
        try:
            if sensor_type == 'L8':  # Landsat 8
                nir, red, green, swir = image_data[4], image_data[3], image_data[2], image_data[5]
            else:  # Landsat 5/7
                nir, red, green, swir = image_data[3], image_data[2], image_data[1], image_data[4]

            epsilon = 1e-8

            ndvi = (nir - red) / (nir + red + epsilon)
            ndwi = (green - swir) / (green + swir + epsilon)
            ndbi = (swir - nir) / (swir + nir + epsilon)

            return {
                "ndvi_mean": float(np.nanmean(ndvi)),
                "ndwi_mean": float(np.nanmean(ndwi)),
                "ndbi_mean": float(np.nanmean(ndbi))
            }
        except Exception as e:
            print(f"Error calculating indices: {e}")
            return {"ndvi_mean": np.nan, "ndwi_mean": np.nan, "ndbi_mean": np.nan}

    def process_file(self, file_path):
        """Process a single file and extract spectral indices"""
        try:
            segment_info = self.extract_segment_info(file_path)
            if not segment_info:
                return

            with rasterio.open(file_path) as src:
                data = src.read()
                indices = self.calculate_spectral_indices(data, segment_info['sensor_type'])
                result = {**segment_info, **indices}
                self.raw_results.append(result)

        except Exception as e:
            print(f"Error processing {file_path}: {e}")

    def normalize_indices(self):
        """Normalize indices by sensor type"""
        df = pd.DataFrame(self.raw_results)

        # Regression adjustment for L7 to match L8
        df_2013 = df[(df["year"] == 2013) & (df["sensor_type"] == "L7")]
        df_2014 = df[(df["year"] == 2014) & (df["sensor_type"] == "L8")]
        common_segments = set(df_2013["segment_id"]).intersection(set(df_2014["segment_id"]))

        df_2013 = df_2013[df_2013["segment_id"].isin(common_segments)]
        df_2014 = df_2014[df_2014["segment_id"].isin(common_segments)]

        for col in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
            model = LinearRegression()
            model.fit(df_2013[[col]], df_2014[[col]])
            df.loc[df["sensor_type"] == "L7", col] = model.predict(df[df["sensor_type"] == "L7"][[col]])

        # Apply Z-score normalization
        for col in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
            df[f"{col}_zscore"] = df.groupby("sensor_type")[col].transform(lambda x: zscore(x, nan_policy='omit'))

        return df

    def extract_indices(self):
        """Extract and normalize spectral indices from all TIF files"""
        tif_files = self.find_tif_files()
        print(f"Found {len(tif_files)} TIF files")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.process_file, tif_files)

        # Normalize results
        normalized_df = self.normalize_indices()
        normalized_df.to_csv(self.output_file, index=False)
        print(f"Normalized spectral indices saved to {self.output_file}")

    def find_tif_files(self):
        """Find all TIF files in the directory structure"""
        tif_files = []
        for segment_type in self.segment_types.keys():
            segment_dir = os.path.join(self.base_dir, f"First 6000 segments ({segment_type} aggregation)")
            if os.path.exists(segment_dir):
                for root, _, files in os.walk(segment_dir):
                    for file in files:
                        if file.endswith(".tif"):
                            tif_files.append(os.path.join(root, file))
        return tif_files


# Main script
def main():
    base_dir = "C:\\Users\\gmoor\\Documents\\REVISED CAPSTONE PROJECT\\Data"
    output_file = os.path.join(base_dir, "normalized_spectral_indices.csv")

    # Initialize and run the extraction
    extractor = SpectralIndicesExtractor(base_dir, output_file)
    extractor.extract_indices()

    # Load and categorize data
    df = pd.read_csv(output_file)

    # Define thresholds dynamically based on percentiles
    def define_thresholds(df, col):
        return {
            "low": df[col].quantile(0.25),
            "medium": df[col].quantile(0.5),
            "high": df[col].quantile(0.75),
        }

    # Compute thresholds separately for L7 and L8
    thresholds = {
        "NDVI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndvi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndvi_mean")},
        "NDWI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndwi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndwi_mean")},
        "NDBI": {"L7": define_thresholds(df[df["sensor_type"] == "L7"], "ndbi_mean"),
                 "L8": define_thresholds(df[df["sensor_type"] == "L8"], "ndbi_mean")},
    }

    # Assign categories based on computed thresholds
    def assign_categories(row, index, thresholds):
        value = row[index]
        sensor_type = row["sensor_type"]
        sensor_thresholds = thresholds[index.split('_')[0].upper()][sensor_type]
        if value <= sensor_thresholds["low"]:
            return "low"
        elif value <= sensor_thresholds["medium"]:
            return "medium"
        else:
            return "high"

    for index in ["ndvi_mean", "ndwi_mean", "ndbi_mean"]:
        df[f"{index}_category"] = df.apply(assign_categories, axis=1, args=(index, thresholds))

    # Compute Composite Environmental Score
    df["composite_env_score"] = df[["ndvi_mean_category", "ndwi_mean_category", "ndbi_mean_category"]].apply(
        lambda row: sum({"low": 1, "medium": 2, "high": 3}[r] for r in row), axis=1
    )

    # Save the final dataset
    final_output_file = os.path.join(base_dir, "final_categorized_spectral_indices.csv")
    df.to_csv(final_output_file, index=False)
    print("Categorization complete. Final data saved as:", final_output_file)


if __name__ == "__main__":
    main()