In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


1. Data path and dataset organization module

In [None]:
import os
import glob

BASE_DIR = "/content/drive/MyDrive/sentinel_lewsData/shimla"

# Soil types
soil_types = ["clay", "sand", "silt", "bulk"]
soil_files = {soil: [] for soil in soil_types}

# Scan soil folder for all depth layers(0-5, 5-15, 15-30, 30-60, 60-100)
for soil in soil_types:
    pattern = os.path.join(BASE_DIR, f"soil/shimla_{soil}*.tif")
    files = sorted(glob.glob(pattern))
    soil_files[soil] = files

# Paths dictionary
PATHS = {
    "boundary": os.path.join(BASE_DIR, "boundary/shimla_boundary.geojson"),
    "dem": os.path.join(BASE_DIR, "dem/dem.tif"),
    "lulc": os.path.join(BASE_DIR, "lulc/lulc.tif"),
    "soil": soil_files,
    "rain_excel": os.path.join(BASE_DIR, "rainfall/shimla_rain.xlsx"),
    "output": os.path.join(BASE_DIR, "outputs")
}

output_folder = os.path.join(BASE_DIR, "outputs")
if not os.path.exists(output_folder):
    os.mkdir(output_folder)

2. Creates a **10 m × 10 m spatial analysis** grid over the Shimla district boundary and prepares it **for cell-wise landslide risk modeling**.

In [None]:
import geopandas as gpd
import numpy as np
from shapely.geometry import box

# Load boundary
boundary = gpd.read_file(PATHS["boundary"])

# Reproject to metric CRS (Coordinate Reference System)
boundary = boundary.to_crs(epsg=32643)

# Grid resolution (meters)
GRID_SIZE = 10  # 100m × 100m

# Get bounds
minx, miny, maxx, maxy = boundary.total_bounds

grid_cells = []
grid_id = 0

x_coords = np.arange(minx, maxx, GRID_SIZE)
y_coords = np.arange(miny, maxy, GRID_SIZE)

for x in x_coords:
    for y in y_coords:
        cell = box(x, y, x + GRID_SIZE, y + GRID_SIZE)
        if boundary.intersects(cell).any():
            grid_cells.append({
                "grid_id": grid_id,
                "geometry": cell
            })
            grid_id += 1

grid = gpd.GeoDataFrame(grid_cells, crs=boundary.crs)

# Storing lat/lon for output
grid_latlon = grid.to_crs(epsg=4326)
grid["lat"] = grid_latlon.geometry.centroid.y
grid["lon"] = grid_latlon.geometry.centroid.x

print(f"Total grid cells created: {len(grid)}")


3. Preparing the DEM so that slope and elevation can be derived correctly

In [None]:
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling
import os

src_path = PATHS["dem"]
dst_path = src_path.replace(".tif", "_utm43.tif")

with rasterio.open(src_path) as src:
    transform, width, height = calculate_default_transform(
        src.crs,
        "EPSG:32643",
        src.width,
        src.height,
        *src.bounds
    )

    kwargs = src.meta.copy()
    kwargs.update({
        "crs": "EPSG:32643",
        "transform": transform,
        "width": width,
        "height": height
    })

    with rasterio.open(dst_path, "w", **kwargs) as dst:
        for i in range(1, src.count + 1):
            reproject(
                source=rasterio.band(src, i),
                destination=rasterio.band(dst, i),
                src_transform=src.transform,
                src_crs=src.crs,
                dst_transform=transform,
                dst_crs="EPSG:32643",
                resampling=Resampling.bilinear
            )

print("Reprojected DEM saved as:", dst_path)
PATHS["dem"] = PATHS["dem"].replace(".tif", "_utm43.tif")

4. This script derives terrain parameters (elevation and slope) from the reprojected DEM and assigns them to each 100 m × 100 m grid cell using the grid cell centroid.

In [None]:
import rasterio
import numpy as np

with rasterio.open(PATHS["dem"]) as src:
    dem = src.read(1).astype("float32")
    transform = src.transform
    dem_crs = src.crs
    res_x = transform.a
    res_y = -transform.e

# Pad DEM to avoid edge issues
z = np.pad(dem, 1, mode="edge")

dzdx = (
    (z[1:-1, 2:] + 2*z[2:, 2:] + z[2:, 1:-1]) -
    (z[1:-1, :-2] + 2*z[:-2, :-2] + z[:-2, 1:-1])
) / (8 * res_x)

dzdy = (
    (z[2:, 1:-1] + 2*z[2:, :-2] + z[1:-1, :-2]) -
    (z[:-2, 1:-1] + 2*z[:-2, 2:] + z[1:-1, 2:])
) / (8 * res_y)

slope_rad = np.arctan(np.sqrt(dzdx**2 + dzdy**2))
slope_deg = np.degrees(slope_rad)

grid_dem = grid.to_crs(dem_crs)

from rasterio.transform import rowcol

elevations = []
slopes = []

# Assign slope and elevation to the centroid
for geom in grid_dem.geometry:
    x, y = geom.centroid.x, geom.centroid.y
    row, col = rowcol(transform, x, y)

    if 0 <= row < dem.shape[0] and 0 <= col < dem.shape[1]:
        elevations.append(dem[row, col])
        slopes.append(slope_deg[row, col])
    else:
        elevations.append(np.nan)
        slopes.append(np.nan)

grid["elevation"] = elevations
grid["slope"] = slopes

# Sanity Check
grid[["elevation", "slope"]].describe()

# Saving progress
grid.drop(columns="geometry").to_csv(
    os.path.join(PATHS["output"], "shimla_grid_with_dem.csv"),
    index=False
)

print("DEM + slope added and saved")


5. This script extracts soil properties from multi-depth raster datasets, computes a depth-weighted average for each soil parameter, and assigns the resulting values to each 100 m grid cell.

In [None]:
def sample_raster_mean(raster_path, grid_gdf):
    with rasterio.open(raster_path) as src:
        raster_crs = src.crs
        grid_proj = grid_gdf.to_crs(raster_crs)

        coords = [(geom.x, geom.y) for geom in grid_proj.geometry.centroid]

        values = np.array(list(src.sample(coords)), dtype="float32").flatten()

        nodata = src.nodata
        if nodata is not None:
            values[values == nodata] = np.nan

        return values

soil_depth_weights = {
    "0-5": 0.05,
    "5-15": 0.10,
    "15-30": 0.15,
    "30-60": 0.30,
    "60-100": 0.40
}

for soil_type, files in PATHS["soil"].items():
    print(f"Processing {soil_type}...")

    weighted_sum = np.zeros(len(grid), dtype="float32")

    for f in files:
        for depth in soil_depth_weights:
            if depth in f:
                weight = soil_depth_weights[depth]
                break

        values = sample_raster_mean(f, grid)
        weighted_sum += values * weight

    grid[soil_type] = weighted_sum

# Convert g/kg → %
grid["clay"] = grid["clay"] / 10
grid["sand"] = grid["sand"] / 10
grid["silt"] = grid["silt"] / 10

# Sanity check
grid[["clay", "sand", "silt", "bulk"]].describe()



6. This script extracts Land Use / Land Cover (LULC) information from a raster map and assigns a LULC class label to each 100 m grid cell based on the grid cell’s centroid location.

In [None]:
import rasterio
import numpy as np
import geopandas as gpd
from rasterio.features import geometry_mask
from rasterio.sample import sample_gen

lulc_path = PATHS["lulc"]

with rasterio.open(lulc_path) as src:
    lulc = src.read(1)
    lulc_transform = src.transform
    lulc_crs = src.crs
    lulc_nodata = src.nodata

print(lulc_crs, lulc_transform)

lulc_src = rasterio.open(PATHS["lulc"])

def sample_lulc(lon, lat):
    """
    Sample LULC value at given lon, lat
    """
    val = list(
        lulc_src.sample([(lon, lat)])
    )[0][0]

    # Handle nodata
    if val == lulc_src.nodata:
        return None
    return int(val)

grid["lulc"] = grid.apply(
    lambda row: sample_lulc(row["lon"], row["lat"]),
    axis=1
)

# Features
grid["lulc"].value_counts().sort_index()


7. This script processes rainfall time-series data, computes antecedent rainfall indicators, and assigns them to every 100 m grid cell as triggering factors for landslide prediction.

In [None]:
import pandas as pd
rain = pd.read_excel(PATHS["rain_excel"])

rain = rain.rename(columns={
    "Data Time": "date",
    "Data Value": "rain_mm"
})

rain["date"] = pd.to_datetime(rain["date"])

# Time Series
rain = rain.sort_values("date").reset_index(drop=True)

rain = rain[["date", "rain_mm"]]

# Rolling rainfall
rain["R_7d"] = rain["rain_mm"].rolling(7).sum().shift(1)
rain["R_30d"] = rain["rain_mm"].rolling(30).sum().shift(1)

latest = rain.dropna().iloc[-1]

R_7d = latest["R_7d"]
R_30d = latest["R_30d"]

grid["R_7d"] = R_7d
grid["R_30d"] = R_30d

out_file = os.path.join(PATHS["output"], "shimla_model_grid.csv")

grid.drop(columns="geometry").to_csv(
    out_file,
    index=False
)




8. Dataset Cleaning

In [None]:
import pandas as pd
import numpy as np
from scipy import ndimage
from scipy.stats import mode

grid_file = os.path.join(PATHS["output"], "shimla_model_grid.parquet")
grid = pd.read_parquet(grid_file)

# Step 1: Fill elevation & slope NaNs using nearest neighbor
for col in ["elevation", "slope"]:
    data = grid[col].values
    mask = np.isnan(data)

    if mask.any():
        indices = np.arange(len(data))
        valid_idx = indices[~mask]
        valid_values = data[~mask]

        # Fill NaNs using nearest valid value
        filled_values = np.interp(indices, valid_idx, valid_values)
        grid[col] = filled_values.astype(data.dtype)

# Step 2: Fill LULC NaNs with mode of neighboring cells
if grid["lulc"].isna().any():
    grid["lulc"] = grid["lulc"].fillna(method="ffill").fillna(method="bfill")

# Step 3: Save the cleaned grid
clean_file = os.path.join(PATHS["output"], "shimla_model_grid_clean.csv")
grid.to_csv(clean_file)

# Optional: check remaining NaNs
print(grid.isna().sum())


9. This script extracts monthly satellite rainfall (IMERG) values for every 100 m grid cell in Shimla and builds a time-series table.

In [None]:
import os
import rasterio
import numpy as np
import pandas as pd
from tqdm import tqdm

PATHS["rainfall_folder"] = os.path.join(BASE_DIR, "rainfall/imerg_monthly")
grid_file = os.path.join(PATHS["output"], "shimla_model_grid_clean.csv")
grid = pd.read_csv(grid_file)

coords = list(zip(grid["lon"].values, grid["lat"].values))
grid_ids = grid["grid_id"].values

rain_output = os.path.join(PATHS["output"], "shimla_grid_imerg_monthly.csv")

lons = grid["lon"].values
lats = grid["lat"].values
grid_ids = grid["grid_id"].values

monthly_rain = pd.DataFrame(index=grid_ids)

imerg_folder = os.path.join(BASE_DIR, "rainfall/imerg_monthly")
imerg_files = sorted([f for f in os.listdir(imerg_folder) if f.endswith(".tif")])

def parse_date(filename):
    base = os.path.basename(filename).replace(".tif", "")
    parts = base.split("_")
    year = int(parts[2])
    month = int(parts[3])
    return pd.Timestamp(year=year, month=month, day=1)

dates = [parse_date(f) for f in imerg_files]

for f, date in tqdm(zip(imerg_files, dates), total=len(imerg_files), desc="Processing Imerg"):
    path = os.path.join(imerg_folder, f)

    with rasterio.open(path) as src:
        data = src.read(1)
        transform = src.transform
        col_inds = ((lons - transform.c) / transform.a).astype(int)
        row_inds = ((lats - transform.f) / transform.e).astype(int)

        row_inds = np.clip(row_inds, 0, src.height - 1)
        col_inds = np.clip(col_inds, 0, src.width - 1)

        vals = data[row_inds, col_inds]
        vals = np.nan_to_num(vals, 0.0)

        monthly_rain[date] = vals

monthly_rain.to_csv(rain_output)
print("Saved monthly rainfall for all grid cells to:", rain_output)


10. Creating final dataset

In [None]:
import pandas as pd
import os

# Paths
main_grid_file = os.path.join(PATHS["output"], "shimla_model_grid_clean.parquet")
rain_file = os.path.join(PATHS["output"], "shimla_grid_imerg_monthly.csv")
final_output_csv = os.path.join(PATHS["output"], "shimla_final_grid.csv")

# Load files
grid = pd.read_parquet(main_grid_file)
rain = pd.read_csv(rain_file, index_col=0)

# Merge on grid_id (index)
grid = grid.set_index("grid_id")
final_grid = grid.join(rain, how="left")

# Optional: fill any NaNs in rainfall with 0
final_grid[rain.columns] = final_grid[rain.columns].fillna(0)

# Save final dataset as CSV
final_grid.to_csv(final_output_csv)
print("Final grid saved to:", final_output_csv)
