<a href="https://colab.research.google.com/github/Amandaagambire/Galamsey-Watch/blob/main/G_watch_copy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Install Earth Engine**

In [None]:

!pip install geemap
!pip install folium
!pip install rasterio
!pip install segmentation-models-pytorch
!pip install pyngrok
!pip install flask
!pip install earthengine-api --upgrade


# **Authenticate & Initialize earth engine**

In [None]:
from google.colab import drive
import ee
import geemap
import folium
import rasterio
from PIL import Image
import numpy as np
import os
import numpy as np
from google.colab import drive
from skimage.color import rgb2gray
from skimage.feature import graycomatrix, graycoprops
import pandas as pd
import torch
import torch.nn as nn
from torchvision import transforms
import shutil
from google.colab.output import eval_js
from pyngrok import ngrok
# from flask import Flask, request, jsonify, render_template




# Mount Google Drive
drive.mount('/content/drive')

#Authenticae then initialise google Earth Engine
ee.Authenticate()
ee.Initialize(project = 'ee-amandaagambire22')
print("Earth Engine Initialized")


# **DATA ACQUISITION: Fetching Sentinel -2 Images**

In [None]:
#Define Region of Interest
ghana = ee.FeatureCollection("FAO/GAUL/2015/level0") \
          .filter(ee.Filter.eq('ADM0_NAME', 'Ghana'))
# Define time range
start_date = '2023-01-01'
end_date = '2023-12-31'

# Sentinel-2 ImageCollection
sentinel = ee.ImageCollection('COPERNICUS/S2_SR') \
    .filterDate(start_date, end_date) \
    .filterBounds(ghana) \
    .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 10)) \
    .median() \
    .clip(ghana)

rgb_nir = sentinel.select(['B4', 'B3', 'B2', 'B8'])


# **Vizulaize the Extracted images**


In [None]:

Map = geemap.Map(center = [5.2, -2.1], zoom=10)


Map.addLayer(sentinel, {'min': 0, 'max': 3000}, "Sentinel-2 RGB")
# Map.addLayer(landsat_rgb, {'min': 0, 'max': 3000}, "Landsat-8")

#Lets see it
Map

 # **Export to Google Drive as GeoTIFF**

In [None]:
# task = ee.batch.Export.image.toDrive(
#     image=rgb_nir,
#     description='ghana_sentinel_image',
#     folder='GEE_Exports',
#     fileNamePrefix='ghana_rgbnir',
#     region=ghana.geometry(),
#     scale=10,
#     maxPixels=1e13
# )
# task.start()


# **Load the GeoTIFF into Colab**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import rasterio
import matplotlib.pyplot as plt

import os
base_path = '/content/drive/MyDrive/'

# Show all folders/files inside your Drive root
# for root, dirs, files in os.walk(base_path):
#     print(f"\nIn folder: {root}")
#     for file in files:
#         print(file)

tif_path = '/content/drive/MyDrive/galamsey_detection/ghana_rgbnir.tif'
if not os.path.exists(tif_path):
    raise FileNotFoundError(f"File not found at: {tif_path}")

with rasterio.open(tif_path) as src:
    img = src.read()  # shape: (bands, height, width)
    profile = src.profile


# **Export images to Google Drive**

In [None]:
def export_image_to_drive(image, description='Sentinel-2_Mining_Area', folder = 'GEE_Exports'):
  export_task = ee.batch.Export.image.toDrive(
      image=image,
      description=description,
      scale=30,
      region=region,
      fileFormat='GeoTIFF',
      folder= folder  # This is where the file will be saved in Drive
  )
  # Start the export task
  export_task.start()
  print("Export started! Check Google Drive → GEE_Exports folder in 5-10 minutes.")



task_list = ee.batch.Task.list()
for task in task_list:
    print(task.status())




# **Moves a file from Google Drive to Colab**

In [None]:
def move_file_from_drive_to_colab(source_path_on_drive, destination_path_on_colab):
    """Moves a file from Google Drive to Colab."""
    if IN_COLAB:
        source_path = f"/content/drive/My Drive/{source_path_on_drive}"
        try:
            shutil.copy(source_path, destination_path_on_colab)
            print(f"File moved to Colab: {destination_path_on_colab}")
            return True
        except FileNotFoundError:
            print(f"Error: Source file not found at {source_path}")
            return False
        except Exception as e:
            print(f"Error moving file: {e}")
            return False
    else:
        print("Not running in Colab, skipping file move.")
        return False

local_image_path = '/content/Sentinel2_Mining_Area.tif' # Assuming you have downloaded or moved the image


# **Download & Label for training**

In [None]:
import os

# Check if file exists in Google Colab
print(os.listdir('/content/'))


In [None]:
try:
  with rasterio.open(local_image_path) as src:
    image_array = src.read()
    image_metadata = src.profile
    red_band = image_array[0].astype(np.float32)
    green_band = image_array[1].astype(np.float32)
    blue_band = image_array[2].astype(np.float32)
    # nir_band = image_array[3]
except Exception as e:
    print(f"Error opening the image: {e}")

print(image_array.shape)
  # Print metadta
print(image_metadata)


# **U-Net Model Definition**

In [None]:
from logging import log
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    def forward(self, x):
        return self.double_conv(x)

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        self.inc = DoubleConv(in_channels, 64)
        self.outc = nn.Conv2d(64, out_channels, kernel_size=1) # Simplified output

    def forward(self, x):
        x1 = self.inc(x)
        logits = self.outc(x1)
        return logits

# Load a pre-trained model (replace with the actual path to your trained model)
n_channels = 3
n_classes = 1
model = UNet(n_channels, n_classes)
try:
    model.load_state_dict(torch.load('/content/drive/My Drive/galamsey_unet_simplified.pth', map_location=torch.device('cpu'))) # Load on CPU for simplicity
    model.eval()
    print("Pre-trained U-Net model loaded successfully!")
except FileNotFoundError:
    print("Error: Pre-trained U-Net model not found. Please train and save your model.")
    model = None

# Define image transformation for prediction
predict_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# **Prediction Function**

In [None]:
def predict_mining_site(image_array, model, transform, device='cpu'):
    """
    Performs prediction on an image using the U-Net model.

    Args:
        image_array (numpy.ndarray): The input image array (e.g., RGB).
        model (torch.nn.Module): The trained U-Net model.
        transform (torch.transforms): The image transformation pipeline.
        device (str): 'cpu' or 'cuda'.

    Returns:
        numpy.ndarray or None: The binary prediction mask (1 for mining, 0 otherwise), or None if model is not loaded.
    """
    if model is None or image_array is None:
        print("Error: Model not loaded or image not available for prediction.")
        return None

    # Assuming image_array is (bands, height, width), we need (height, width, bands) for PIL
    rgb_image = np.transpose(image_array[:3, :, :], (1, 2, 0))
    pil_image = Image.fromarray(rgb_image).convert('RGB')
    input_tensor = transform(pil_image).unsqueeze(0).to(device) # Add batch dimension

    with torch.no_grad():
        output = model(input_tensor)
        prediction = torch.sigmoid(output).squeeze().cpu().numpy() > 0.5

    return prediction.astype(np.uint8)

# **Connected Components Analysis**

In [None]:
def analyze_mining_clusters(prediction_mask):
    """
    Analyzes a binary prediction mask to detect clusters of illegal mining activity.

    Args:
        prediction_mask (numpy.ndarray): A binary mask where 1 represents mining.

    Returns:
        list: A list of dictionaries, where each dictionary contains information
              about a detected mining cluster (e.g., area, bounding box, centroid).
    """
    if prediction_mask is None:
        print("Error: No prediction mask available for cluster analysis.")
        return []

    labeled_mask = label(prediction_mask, connectivity=1)
    regions = regionprops(labeled_mask)
    mining_clusters = []
    for region in regions:
        if region.area > 10: # Example threshold
            mining_clusters.append({
                'area': region.area,
                'bounding_box': region.bbox,
                'centroid': region.centroid
            })
    return mining_clusters


# **Basic Risk Propagation (Conceptual)**

In [None]:
def calculate_risk_propagation(elevation_data, mining_locations, water_sources):
    """
    A very basic conceptual implementation of risk propagation using Dijkstra's.
    This requires actual elevation data and water source locations.

    Args:
        elevation_data (numpy.ndarray): A 2D array representing elevation.
        mining_locations (list of tuples): Coordinates (row, col) of mining sites.
        water_sources (list of tuples): Coordinates (row, col) of water sources.

    Returns:
        dict: A dictionary mapping mining locations to the nearest water source path (conceptual).
    """
    if elevation_data is None or not mining_locations or not water_sources:
        print("Error: Missing data for risk propagation analysis.")
        return {}

    rows, cols = elevation_data.shape
    graph = nx.grid_2d_graph(rows, cols)

    # Define a very simple weight based on elevation (higher elevation = higher weight)
    for u, v, data in graph.edges(data=True):
        u_row, u_col = u
        v_row, v_col = v
        weight = (elevation_data[u_row, u_col] + elevation_data[v_row, v_col]) / 2 + 1
        data['weight'] = weight

    risk_paths = {}
    for mining_loc in mining_locations:
        shortest_path = None
        min_distance = float('inf')
        for water_loc in water_sources:
            try:
                path = nx.dijkstra_path(graph, source=mining_loc, target=water_loc, weight='weight')
                distance = nx.dijkstra_path_length(graph, source=mining_loc, target=water_loc, weight='weight')
                if distance < min_distance:
                    min_distance = distance
                    shortest_path = path
            except nx.NetworkXNoPath:
                pass

        if shortest_path:
            risk_paths[mining_loc] = f"Path to nearest water source for {mining_loc}" # Conceptual path

    return risk_paths

# Create dummy elevation data and water sources for demonstration
if image_array is not None:
    dummy_elevation_data = np.random.rand(image_array.shape[1], image_array.shape[2]) * 100
    dummy_water_sources = [(50, 50), (150, 200)] # Example coordinates