In [1]:
# Standard library imports (joblib and tqdm below are third-party packages)
from concurrent.futures import ThreadPoolExecutor, as_completed
from joblib import Parallel, delayed
from tqdm import tqdm
from datetime import datetime
from pathlib import Path
import os
import random

# Third-Party Imports
import ee
import geemap
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio
from lightgbm import LGBMClassifier
from shapely.affinity import scale, translate
from skimage import exposure
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import LabelEncoder

from shapely import wkt
import geopandas as gpd

import warnings
warnings.filterwarnings('ignore')

import logging
# Raise the "rasterio._env" logger threshold to ERROR so its warnings are suppressed
logging.getLogger("rasterio._env").setLevel(logging.ERROR)

from my_configs.configs import Config
from my_utils.utils import Utils

# Project locations taken from the shared Config object.
# (`rooth_path` keeps its existing spelling; other cells may reference it.)
rooth_path, train_path, test_path = (
    Config.ROOT_PATH,
    Config.TRAIN_CSV,
    Config.TEST_CSV,
)

# Earth Engine authentication: prompts in the browser when no stored
# credentials are available from a previous session.
ee.Authenticate()

# Start the Earth Engine session against the project named in Config.PROJECT;
# point Config.PROJECT at your own project ID as needed.
ee.Initialize(project=Config.PROJECT)

In [2]:

def _as_geoframe(frame):
    """Return `frame` as a GeoDataFrame in EPSG:4326.

    The 'geometry' column is parsed from WKT text into geometry objects
    with `wkt.loads` and used as the active geometry.
    """
    parsed_geometry = frame['geometry'].apply(wkt.loads)
    return gpd.GeoDataFrame(frame, crs='epsg:4326', geometry=parsed_geometry)


# Read both CSV splits and convert them to GeoDataFrames
train = _as_geoframe(pd.read_csv(train_path))
test = _as_geoframe(pd.read_csv(test_path))

# Stack the two splits into one frame so they are processed identically;
# the 'dataset' column records which split each row came from.
labelled_splits = [
    train.assign(dataset='train'),
    test.assign(dataset='test'),
]
data = pd.concat(labelled_splits).reset_index(drop=True)

# Switch: True downloads the imagery, False reuses previously saved results
download = True

In [2]:
# Indices of rows whose image could not be downloaded. Created before the
# branch so that later cells can reference it even when downloading is skipped.
error_indices = []

if download:
    # Fetch the image for every row through a small thread pool.
    with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust the number of workers as needed
        # Remember which row each future belongs to, so a worker that raises
        # can still be attributed to its row instead of aborting the whole run.
        futures = {
            executor.submit(Utils.process_row, index, row): index
            for index, row in data.iterrows()
        }
        for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading Images", unit="image"):
            index = futures[future]
            try:
                index, image_file = future.result()  # (row index, downloaded path or falsy on failure)
            except Exception:
                # Treat an unexpected worker exception like any other failed row.
                logging.exception(f"Unexpected error while processing row {index}")
                image_file = None

            if image_file:
                data.at[index, 'tif_path'] = image_file  # Store the downloaded file path
            else:
                error_indices.append(index)  # Collect indices of rows with errors

    # Save error rows to a separate CSV file
    # NOTE(review): the INFO records below are only visible if the root logger
    # is configured at INFO level or lower — confirm the logging setup.
    if error_indices:
        error_rows = data.loc[error_indices]
        error_rows.to_csv('error_rows.csv', index=False)
        logging.info(f"Saved {len(error_indices)} error rows to error_rows.csv")
    else:
        logging.info("All images downloaded successfully without errors.")
else:
    # Reuse results from a previous run instead of downloading again.
    # The earlier `os.system('cp -r ./downloads .')` copied ./downloads onto
    # itself (a no-op that only printed an error, and not available on
    # Windows shells), so it is replaced by an explicit existence check.
    if not Path('./downloads').is_dir():
        logging.warning("Image directory ./downloads was not found; paths in data.csv may not resolve.")

    data_path = "./data.csv"
    data = pd.read_csv(data_path)


Attention required for COPERNICUS/S2! You are using a deprecated asset.
To ensure continued functionality, please update it.
Learn more: https://developers.google.com/earth-engine/datasets/catalog/COPERNICUS_S2

Downloading Images:   0%|          | 3/10606 [00:03<2:45:37,  1.07image/s]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 5/10606 [00:04<2:28:14,  1.19image/s]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 7/10606 [00:07<4:14:46,  1.44s/image]

Downloading data from https://earthengine.googleapis.com/v1/projects/zinditelangana2k24/thumbnails/65024423128d86999044f0bc698ba60c-b0efaee48c20b158b3135998cf8b912d:getPixels
Please wait ...


Downloading Images:   0%|          | 8/10606 [00:08<3:40:53,  1.25s/image]

Data downloaded to d:\github\telangana-crop-health\downloads\20240527T050651_20240527T051452_T43QHV_RGB_2023-11-20_2024-05-30.tif


Downloading Images:   0%|          | 9/10606 [00:12<6:05:46,  2.07s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 11/10606 [00:13<3:43:17,  1.26s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 13/10606 [00:14<2:59:32,  1.02s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 14/10606 [00:16<4:01:54,  1.37s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 17/10606 [00:21<4:25:47,  1.51s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 20/10606 [00:24<3:48:11,  1.29s/image]ERROR:root:Failed to process entry for District Medak after attempt.
Downloading Images:   0%|          | 23/10606 [00:26<3:12:37,  1.09s/image]ERROR:root:Failed to process entry for District Medak after at

In [None]:
# Rows to retry are the ones the download cell recorded as failed. That list
# is only created when `download` is True, so short-circuit on the flag to
# avoid a NameError when the download step was skipped.
failed_indices = list(error_indices) if download else []
retry_error_indices = []  # Rows that still fail after the retry pass

if not failed_indices:
    logging.info("No error rows to retry.")
else:
    # Retry from the in-memory rows rather than from error_rows.csv: the CSV is
    # written without the index, so reading it back yields labels 0..k-1 (which
    # would make `data.at[index, ...]` update the wrong rows) and turns the
    # geometry column back into WKT text.
    retry_data = data.loc[failed_indices]

    # Keep an on-disk record of the rows being retried
    retry_data.to_csv('error_rows.csv', index=False)
    logging.info(f"Saved {len(failed_indices)} error rows to error_rows.csv")

    logging.info("Retrying failed image downloads...")

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Map each future to its original row label so failures stay attributable
        retry_futures = {
            executor.submit(Utils.process_row, index, row): index
            for index, row in retry_data.iterrows()
        }
        for future in tqdm(as_completed(retry_futures), total=len(retry_futures), desc="Retrying Error Rows", unit="image"):
            index = retry_futures[future]
            try:
                index, image_file = future.result()
            except Exception:
                # An unexpected worker exception counts as another failed attempt
                logging.exception(f"Unexpected error while retrying row {index}")
                image_file = None

            if image_file:
                data.at[index, 'tif_path'] = image_file  # Update the downloaded file path
            else:
                retry_error_indices.append(index)  # Collect new errors

    # Save any rows that are still failing back to CSV
    if retry_error_indices:
        new_error_rows = data.loc[retry_error_indices]
        new_error_rows.to_csv('error_rows.csv', index=False)
        logging.info(f"Saved {len(retry_error_indices)} retry error rows to error_rows.csv")
    else:
        os.remove('error_rows.csv')  # Remove the file if all retries succeeded
        logging.info("All error rows re-downloaded successfully.")