#**Pipeline Download Landsat Images**




In [None]:
# Step 1: Setup and initialization
import ee
ee.Authenticate()
project_id = 'ee-beatriznattrodt'  # Replace with your actual project ID
ee.Initialize(project=project_id)

In [None]:
import os
import ee
import datetime
import geopandas as gpd
from google.colab import drive


# Input and output folder paths
shapefile_folder = '/content/roi'
output_folder = '/content/landsat'

# Constants
LANDSAT_RESOLUTION = 30  # meters
OUTPUT_CRS = 'EPSG:4326'  # wgs84

# Method to obtain historical series of Landsat images
def get_landsat_collection(aoi, start_date='2023-05-01', end_date='2023-12-31'):
    # Landsat Collection (tunable for different sensors)
    collection = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2') \
        .filterBounds(aoi) \
        .sort('CLOUD_COVER') \
        .filterDate(start_date, end_date) \
        .map(lambda img: img.select(["SR_B1", "SR_B2", "SR_B3", "SR_B4", "SR_B5", "SR_B7"]))

    collection_list = collection.toList(collection.size())
    for i in range(collection.size().getInfo()):
        image = ee.Image(collection_list.get(i))
        timestamp = image.get('system:time_start').getInfo()  # Get the timestamp
        date = datetime.datetime.utcfromtimestamp(timestamp / 1000)  # Convert to datetime
        print(f"Image {i+1}: {date}")

    return collection

# Method to save images to Google Drive
def export_to_drive(image, description, region, output_folder, scale=30, crs="EPSG:4326"):
    task = ee.batch.Export.image.toDrive(
        image=image,
        description=description,
        folder=output_folder,
        scale=scale,
        region=region,
        crs=crs,
        maxPixels=1e13
    )
    task.start()
    print(f'Exportação iniciada: {description}')

In [None]:
# Loop through the shapefiles in the folder
for file in os.listdir(shapefile_folder):
    if file.endswith('.shp'):
        shapefile_path = os.path.join(shapefile_folder, file)

        # Load shapefile as GeoDataFrame
        gdf = gpd.read_file(shapefile_path)
        aoi = ee.Geometry.Polygon(gdf.geometry[0].__geo_interface__['coordinates'])

        # Get Landsat Collection
        landsat_collection = get_landsat_collection(aoi)

        # Export each image to Google Drive
        _collection_size = landsat_collection.size().getInfo()
        image_ee = landsat_collection.first().clip(aoi)
        description = f"{os.path.splitext(file)[0]}_landsat"
        print(image_ee.getInfo())
        export_to_drive(
            image=image_ee,
            description=description,
            region=aoi,
            output_folder=output_folder,
            scale=LANDSAT_RESOLUTION,
            crs=OUTPUT_CRS,
            )


print("Processing complete. Check the output folder in Google Drive.")

Image 1: 2023-08-31 13:28:52.501000
Image 2: 2023-10-18 13:29:01.411000
Image 3: 2023-10-02 13:29:00.187000
Image 4: 2023-07-14 13:28:35.078000
Image 5: 2023-07-30 13:28:42.481000
Image 6: 2023-09-16 13:28:56.586000
Image 7: 2023-06-12 13:28:19.279000
Image 8: 2023-08-15 13:28:48.016000
Image 9: 2023-11-19 13:29:06.649000
Image 10: 2023-12-05 13:28:59.420000
Image 11: 2023-11-03 13:29:05.081000
{'type': 'Image', 'bands': [{'id': 'SR_B1', 'data_type': {'type': 'PixelType', 'precision': 'int', 'min': 0, 'max': 65535}, 'dimensions': [333, 126], 'origin': [5432, 4665], 'crs': 'EPSG:32622', 'crs_transform': [30, 0, 594285, 0, -30, 115815]}, {'id': 'SR_B2', 'data_type': {'type': 'PixelType', 'precision': 'int', 'min': 0, 'max': 65535}, 'dimensions': [333, 126], 'origin': [5432, 4665], 'crs': 'EPSG:32622', 'crs_transform': [30, 0, 594285, 0, -30, 115815]}, {'id': 'SR_B3', 'data_type': {'type': 'PixelType', 'precision': 'int', 'min': 0, 'max': 65535}, 'dimensions': [333, 126], 'origin': [5432,

#**Pipeline Resample Images**

In [None]:
!pip install rasterio loguru
!pip install opencv-python-headless

##**Class Raster**

Class responsible for performing all raster operations

In [None]:
import numpy as np
from loguru import logger
import os
import rasterio
import cv2
from time import time
from concurrent.futures import ProcessPoolExecutor




class Raster:
    """Class to execute raster operations"""

    def __init__(self) -> None:
        pass

    def resample_bands_rasterio(self, filepath, output_path, xres, yres, retinex):
        start_time = time()
        logger.info(f"Resampling resolution to approximately {xres}x{yres} degrees using Rasterio's Lanczos...")

        # Open the input raster file using Rasterio
        with rasterio.open(filepath) as dataset:

            # Calculate scale factors for the x and y axes based on desired resolution
            scale_factor_x = dataset.res[0] / xres
            scale_factor_y = dataset.res[1] / yres

            # Copy the dataset profile to keep metadata for the output file
            profile = dataset.profile.copy()

            # Read the data from the dataset, applying the Lanczos resampling method to adjust resolution
            data = dataset.read(
                out_shape=(
                    dataset.count,
                    int(dataset.height * scale_factor_y),
                    int(dataset.width * scale_factor_x),
                ),
                resampling=rasterio.enums.Resampling.lanczos,
            )

            # Adjust the affine transform to account for the new scaling
            transform = dataset.transform * dataset.transform.scale(
                (1 / scale_factor_x), (1 / scale_factor_y)
            )

            # Applying retinex
            if retinex:
                logger.info("Aplicando Retinex nas bandas...")
                data = np.transpose(data, (1, 2, 0))  # From (bands, rows, cols) To (rows, cols, bands)
                data = self.apply_retinex(data)
                data = np.transpose(data, (2, 0, 1))  # Back to (bands, rows, cols)
                profile.update(dtype=rasterio.uint8)
            else:
                profile.update(dtype=dataset.dtypes[0])

            # Update the profile with the new dimensions, transform, and file format (GTiff)
            profile.update({
                "height": data.shape[1],
                "width": data.shape[2],
                "transform": transform,
                "driver": "GTiff",
                "count": data.shape[0],
            })

            # Write the resampled data to the specified output path with the updated profile
            with rasterio.open(output_path, "w", **profile) as dst:
                dst.write(data)
        end_time = time()
        logger.success(f"Resampling complete and saved to {output_path}! Duration: {end_time - start_time:.2f} seconds")


    def resample_bands_opencv(self, filepath, output_path, scale_factor, retinex):
      start_time = time()
      logger.info(f"Resampling resolution using OpenCV's Lanczos...")

      # Open raster
      with rasterio.open(filepath) as src:
          bands = src.read()
          profile = src.profile

          processed_bands = []

          # Unstacking bands
          for band in bands:
              # Resample bands for 10m using Lanczos in OpenCV
              band_resized = cv2.resize(band, (int(band.shape[1] * scale_factor), int(band.shape[0] * scale_factor)), interpolation=cv2.INTER_LANCZOS4)
              processed_bands.append(band_resized)

          # Stacking bands
          result = np.stack(processed_bands, axis=-1)  # Shape: (rows, cols, bands)

          # Apply Retinex
          if retinex:
              logger.info("Applying Retinex to the bands...")
              result = self.apply_retinex(result)

          result = np.transpose(result, (2, 0, 1))  # Back to (bands, rows, cols)

          # Update metadata
          profile.update({
              'height': result.shape[1],
              'width': result.shape[2],
              'transform': profile['transform'] * profile['transform'].scale(1 / scale_factor, 1 / scale_factor),
              'dtype': rasterio.uint8 if retinex else result.dtype
          })

          # Save image
          with rasterio.open(output_path, 'w', **profile) as dst:
              dst.write(result)

      end_time = time()
      logger.success(f"Resampling complete. Output saved successfully. Duration: {end_time - start_time:.2f} seconds")


    def apply_retinex(self, image, sigma_list=[15, 80, 250]):
        logger.info("Applying Retinex...")
        # Convert image to float32
        image = image.astype(np.float32) + 1.0

        # Apply Multi-Scale Retinex
        retinex = self.multi_scale_retinex(image, sigma_list)

        # Normalize image to original values
        retinex = self.simple_color_balance(retinex)
        retinex = np.clip(retinex, 0, 255).astype(np.uint8)

        logger.success("Retinex applyed sucessfully!")
        return retinex

    def multi_scale_retinex(self, image, sigma_list):
        retinex = np.zeros_like(image)
        for sigma in sigma_list:
            logger.debug(f"Processing sigma={sigma}")
            # Apply gaussian filter
            img_blur = cv2.GaussianBlur(image, (0, 0), sigma)
            # Avoid division for 0
            img_blur[img_blur == 0] = 0.01
            # Calculate retinex
            retinex += np.log(image) - np.log(img_blur)
        retinex = retinex / len(sigma_list)
        return retinex

    def simple_color_balance(self, image, low_clip=0.01, high_clip=0.01):
        total_pixels = image.shape[0] * image.shape[1]
        for i in range(image.shape[2]):
            unique, counts = np.unique(image[:, :, i], return_counts=True)
            cumulative_counts = np.cumsum(counts)
            low_val = unique[np.searchsorted(cumulative_counts, total_pixels * low_clip)]
            high_val = unique[np.searchsorted(cumulative_counts, total_pixels * (1 - high_clip))]
            image[:, :, i] = np.clip(image[:, :, i], low_val, high_val)
            image[:, :, i] = (image[:, :, i] - low_val) * 255 / (high_val - low_val)
        return image



##**Class Calculator**

Class responsible for performing all calculation operations

In [None]:
from loguru import logger
import math

class Calculator:
    """Class to execute raster operations"""

    def __init__(self) -> None:
        pass

    def meters_to_degree(self, meter):
      degrees = meter / 111320

      return degrees

##**Class Files**

Class responsible for performing all operations relating to folders and files

In [None]:
from loguru import logger
import os

class Files:
    """Class to execute raster operations"""

    def __init__(self) -> None:
        pass

    def create_folder(self, path):
      if not os.path.exists(path):
        os.makedirs(path)
        logger.success(f"Directory '{path}' created successfully.")
      else:
        logger.warning(f"Directory '{path}' already exists.")

    def separate_filename(self, filename):
      logger.info("Getting file's roi")
      parts = filename.split("_")
      formatted_roi = f'{parts[0]}_{parts[1]}'
      logger.success(f"Region separated: {formatted_roi}")
      return formatted_roi

    def check_file_exists(self, filepath):
      return  os.path.exists(filepath)

    def list_files(self, path):
      files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
      return files


##**Pipeline**

Class responsible for managing the resample pipeline

In [None]:
class Pipeline:
    """Class to execute raster operations"""

    def __init__(self) -> None:
      self.files = Files()
      self.calc = Calculator()
      self.raster = Raster()

      self.resample_map = {
            "rasterio": self.raster.resample_bands_rasterio,
            'OpenCV': self.raster.resample_bands_opencv,
        }

    def resample_lanczos_pipeline(self, path, pixel_size, scale_factor, kernel_size, window_size, resample_type, retinex=False):
        resample_method = self.resample_map.get(resample_type)
        if not resample_method:
            raise ValueError(f"Unsupported resample type: {resample_type}")

        files = self.files.list_files(path)
        pixel_size_degree = self.calc.meters_to_degree(pixel_size)

        resample_configs = {
            "rasterio": {"xres": pixel_size_degree, "yres": pixel_size_degree, "retinex": retinex},
            "OpenCV": {"scale_factor": scale_factor, 'retinex': retinex},
        }

        for file in files:
            try:
                folder_roi = self.files.separate_filename(file)
                output_folder = os.path.join(path, folder_roi, f"{pixel_size}m", resample_type)
                self.files.create_folder(output_folder)
                if retinex:
                  fullpath = os.path.join(output_folder, f'retinex_{file}')
                else:
                  fullpath = os.path.join(output_folder, file)
                if not self.files.check_file_exists(fullpath):
                    filepath = os.path.join(path, file)
                    resample_method(filepath, fullpath, **resample_configs[resample_type])
                else:
                    logger.warning(f"{file} já foi reamostrado")
            except Exception as e:
                logger.error(f"Erro ao processar o arquivo {file}: {e}")




if __name__ == "__main__":
    path = '/content/drive/MyDrive/Visao Computacional/areas_of_interest/output'
    pipeline = Pipeline()
    pipeline.resample_lanczos_pipeline(path, 10, 3, 7, 3, 'rasterio')

