In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import rasterio
from rasterio.plot import show
from rasterio.features import rasterize
import geopandas as gpd
from shapely.geometry import Point
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, f1_score, 
                            roc_curve, auc, confusion_matrix, 
                            classification_report, roc_auc_score)
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import seaborn as sns
import time
from tqdm import tqdm
import os

In [2]:
# 设置设备（GPU或CPU）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if device.type == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
if device.type == "cuda":
    torch.cuda.manual_seed_all(42)

Using device: cuda
GPU: NVIDIA RTX A5000
CUDA version: 12.1


In [3]:
# 1. 数据准备函数（与之前相同）
def prepare_data(env_rasters, landslide_shp, non_landslide_shp, sample_size=15000):
    """
    准备训练数据集
    :param env_rasters: 环境因子栅格路径列表
    :param landslide_shp: 滑坡点Shapefile路径
    :param non_landslide_shp: 非滑坡点Shapefile路径
    :param sample_size: 每类采样点数
    :return: 特征数组, 标签数组, 栅格元数据, 特征名称
    """
    # 读取滑坡点数据
    landslide_gdf = gpd.read_file(landslide_shp)
    non_landslide_gdf = gpd.read_file(non_landslide_shp)
    
    # 随机采样
    if len(landslide_gdf) > sample_size:
        landslide_gdf = landslide_gdf.sample(sample_size, random_state=42)
    if len(non_landslide_gdf) > sample_size:
        non_landslide_gdf = non_landslide_gdf.sample(sample_size, random_state=42)
    
    # 合并点数据并添加标签
    landslide_gdf['label'] = 1
    non_landslide_gdf['label'] = 0
    all_points = pd.concat([landslide_gdf, non_landslide_gdf])
    
    # 初始化特征数组
    features = []
    labels = []
    
    # 读取第一个栅格获取元数据
    with rasterio.open(env_rasters[0]) as src:
        meta = src.meta
        transform = src.transform
        height, width = src.shape
    
    # 提取每个点的特征值
    print("提取点特征值...")
    for idx, point in tqdm(all_points.iterrows(), total=len(all_points)):
        geom = point.geometry
        if geom is None:
            continue
            
        # 获取点坐标
        x, y = geom.x, geom.y
        
        # 转换为栅格坐标
        col, row = ~transform * (x, y)
        col, row = int(col), int(row)
        
        # 检查坐标是否在范围内
        if 0 <= row < height and 0 <= col < width:
            point_features = []
            valid = True
            
            # 读取所有栅格在该点的值
            for raster_path in env_rasters:
                with rasterio.open(raster_path) as src:
                    value = src.read(1, window=((row, row+1), (col, col+1)))
                    if np.isnan(value).any() or value[0][0] == src.nodata:
                        valid = False
                        break
                    point_features.append(value[0][0])
            
            if valid:
                features.append(point_features)
                labels.append(point['label'])
    
    features = np.array(features)
    labels = np.array(labels)
    
    # 获取特征名称
    feature_names = [os.path.basename(r).split('.')[0] for r in env_rasters]
    
    print(f"提取完成: {features.shape[0]}个样本, {features.shape[1]}个特征")
    return features, labels, meta, feature_names

In [4]:
# 2. LSTM模型定义（替代CNN）
class LandslideLSTM(nn.Module):
    def __init__(self, input_size=13, hidden_size=64, num_layers=2, dropout_rate=0.3):
        """
        LSTM模型用于滑坡敏感性评估
        
        参数:
        input_size: 每个时间步的特征数量（即环境因子数量）
        hidden_size: LSTM隐藏层大小
        num_layers: LSTM层数
        dropout_rate: Dropout概率
        """
        super(LandslideLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,  # 输入格式为(batch, seq_len, features)
            dropout=dropout_rate if num_layers > 1 else 0
        )
        
        # 全连接层
        self.fc = nn.Linear(hidden_size, 1)
        
        # Dropout层
        self.dropout = nn.Dropout(dropout_rate)
        
        # 激活函数
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        # 添加序列维度: (batch, features) -> (batch, 1, features)
        x = x.unsqueeze(1)
        
        # 初始化隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # LSTM前向传播
        out, _ = self.lstm(x, (h0, c0))
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(self.dropout(out))
        
        return self.sigmoid(out)

# 3. 训练函数（与之前相同，但模型改为LSTM）
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=100, patience=10):
    best_val_loss = float('inf')
    best_epoch = 0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': [], 'val_auc': []}
    
    # 将模型移至设备
    model = model.to(device)
    
    for epoch in range(epochs):
        start_time = time.time()
        model.train()
        train_loss = 0.0
        
        for inputs, labels in train_loader:
            # 将数据移至设备
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # 验证阶段
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_probs = []
        all_labels = []
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                # 将数据移至设备
                inputs, labels = inputs.to(device), labels.to(device)
                
                outputs = model(inputs)
                loss = criterion(outputs.squeeze(), labels)
                val_loss += loss.item()
                
                probs = outputs.squeeze()
                preds = (probs > 0.5).float()
                
                all_probs.extend(probs.cpu().numpy())
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        
        # 计算指标
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        val_acc = accuracy_score(all_labels, all_preds)
        val_auc = roc_auc_score(all_labels, all_probs)
        
        # 记录历史
        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_acc)
        history['val_auc'].append(val_auc)
        
        # 保存最佳模型
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_epoch = epoch
            torch.save(model.state_dict(), 'best_landslide_model_lstm.pth')
        
        # 早停检查OC
        if epoch - best_epoch > patience:
            print(f"Early stopping at epoch {epoch+1}")
            break
        
        epoch_time = time.time() - start_time
        print(f"Epoch {epoch+1}/{epochs} | Time: {epoch_time:.1f}s | "
              f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | Val AUC: {val_auc:.4f}")
    
    return history

# 4. 评估函数（与之前相同）
def evaluate_model(model, test_loader):
    # 将模型移至设备
    model = model.to(device)
    model.eval()
    
    all_preds = []
    all_probs = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            # 将数据移至设备
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            probs = outputs.squeeze()
            preds = (probs > 0.5).float()
            
            all_probs.extend(probs.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # 计算指标
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    roc_auc = roc_auc_score(all_labels, all_probs)
    report = classification_report(all_labels, all_preds)
    
    # 计算ROC曲线
    fpr, tpr, thresholds = roc_curve(all_labels, all_probs)
    
    # 绘制ROC曲线
    plt.rcParams["font.family"] = "Times New Roman"
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.savefig('roc_curve_lstm.pdf',format='pdf',bbox_inches="tight")
    plt.close()
    
    # 绘制混淆矩阵
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['Non-Landslide', 'Landslide'],
                yticklabels=['Non-Landslide', 'Landslide'])
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix_lstm.png')
    plt.close()
    
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"AUC: {roc_auc:.4f}")
    print("\nClassification Report:")
    print(report)
    
    return all_probs, all_preds, all_labels

# 5. 区域预测函数（与之前相同）
def predict_region(model, env_rasters, meta, scaler, output_path='landslide_susceptibility_lstm.tif'):
    """
    对整个研究区进行滑坡敏感性预测
    :param model: 训练好的模型
    :param env_rasters: 环境因子栅格路径列表
    :param meta: 栅格元数据
    :param scaler: 标准化器
    :param output_path: 输出路径
    """
    # 更新元数据
    meta.update(count=1, dtype='float32', nodata=-9999)
    
    # 获取栅格尺寸
    with rasterio.open(env_rasters[0]) as src:
        height, width = src.shape
        transform = src.transform
    
    # 创建输出栅格
    with rasterio.open(output_path, 'w', **meta) as dst:
        # 分块处理以避免内存不足
        block_size = 256
        num_blocks_x = (width + block_size - 1) // block_size
        num_blocks_y = (height + block_size - 1) // block_size
        
        print(f"开始区域预测 ({height}x{width}), 分块: {num_blocks_x}x{num_blocks_y}")
        
        # 创建进度条
        pbar = tqdm(total=num_blocks_x * num_blocks_y, desc="区域预测")
        
        # 将模型移至设备
        model = model.to(device)
        model.eval()
        
        for y_block in range(num_blocks_y):
            for x_block in range(num_blocks_x):
                # 计算当前块的位置
                x_offset = x_block * block_size
                y_offset = y_block * block_size
                win_width = min(block_size, width - x_offset)
                win_height = min(block_size, height - y_offset)
                
                # 读取所有栅格数据
                block_data = []
                for raster_path in env_rasters:
                    with rasterio.open(raster_path) as src:
                        data = src.read(1, window=((y_offset, y_offset+win_height), 
                                                  (x_offset, x_offset+win_width)))
                        block_data.append(data)
                
                # 堆叠为特征数组
                block_stack = np.stack(block_data, axis=-1)
                original_shape = block_stack.shape
                
                # 转换为2D数组 (height*width, features)
                block_2d = block_stack.reshape(-1, original_shape[-1])
                
                # 处理无效值
                valid_mask = ~np.isnan(block_2d).any(axis=1)
                valid_data = block_2d[valid_mask]
                
                # 如果没有有效数据，跳过
                if valid_data.size == 0:
                    result = np.full(valid_mask.shape, meta['nodata'], dtype=np.float32)
                    result = result.reshape(original_shape[:-1])
                    dst.write(result, 1, window=((y_offset, y_offset+win_height), 
                                                (x_offset, x_offset+win_width)))
                    pbar.update(1)
                    continue
                
                # 标准化
                valid_data = scaler.transform(valid_data)
                
                # 转换为Tensor并移至设备
                valid_tensor = torch.tensor(valid_data, dtype=torch.float32).to(device)
                
                # 预测
                with torch.no_grad():
                    outputs = model(valid_tensor)
                    probs = outputs.squeeze().cpu().numpy()
                
                # 创建结果数组
                result = np.full(valid_mask.shape, meta['nodata'], dtype=np.float32)
                result[valid_mask] = probs
                result = result.reshape(original_shape[:-1])
                
                # 写入结果
                dst.write(result, 1, window=((y_offset, y_offset+win_height), 
                                            (x_offset, x_offset+win_width)))
                
                pbar.update(1)
        
        pbar.close()
    
    print(f"区域预测完成，结果保存至: {output_path}")

# 6. 可视化函数（与之前相同）
def visualize_results(output_path):
    """可视化滑坡敏感性图"""
    with rasterio.open(output_path) as src:
        data = src.read(1)
        
        # 设置颜色映射
        colors = ['#f7fbff', '#deebf7', '#c6dbef', '#9ecae1', '#6baed6', 
                 '#4292c6', '#2171b5', '#08519c', '#08306b']
        cmap = mcolors.LinearSegmentedColormap.from_list("susceptibility", colors)
        
        # 创建图像
        plt.figure(figsize=(10, 8))
        show(data, cmap=cmap, title='Landslide Susceptibility', vmin=0, vmax=1)
        plt.colorbar(label='Susceptibility Probability')
        plt.savefig('susceptibility_map_lstm.png')
        plt.close()

In [6]:
ENV_RASTERS = [
    'Slope.tif',
    'Aspect.tif',
    'PlanCurvature.tif',
    'ProfCurvature.tif',
    'Distance_to_fault.tif',
    'Distance_to_river.tif',
    'Distance_to_road.tif',
    'NDVI.tif',
    'TWI.tif',
    'elevation.tif',
    'Landuse.tif',
    'LItgology.tif',
    'Sen`s_slope_of_rain.tif'
    ]
LANDSLIDE_SHP = '滑坡.shp'
NON_LANDSLIDE_SHP = '非滑坡.shp'

# 1. 准备数据
print("准备训练数据...")
features, labels, meta, feature_names = prepare_data(
    ENV_RASTERS, LANDSLIDE_SHP, NON_LANDSLIDE_SHP, sample_size=15000
)





准备训练数据...
提取点特征值...


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 28980/28980 [11:35<00:00, 41.69it/s]

提取完成: 28980个样本, 13个特征





In [12]:
# 2. 数据预处理
# 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.3, random_state=42, stratify=labels
)

# 标准化
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# 转换为Tensor
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# 创建数据集
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# 划分验证集
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(
    train_dataset, [train_size, val_size]
)

# 创建数据加载器
batch_size = 256 if device.type == "cuda" else 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 3. 初始化LSTM模型
input_size = len(feature_names)
model = LandslideLSTM(
    input_size=input_size,
    hidden_size=128,  # 增加隐藏层大小
    num_layers=2,     # 双层LSTM
    dropout_rate=0.4
)

# 添加多GPU支持（如果可用）
if torch.cuda.device_count() > 1:
    print(f"使用 {torch.cuda.device_count()} 个GPU进行训练")
    model = nn.DataParallel(model)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# 4. 训练模型
print("开始训练LSTM模型...")
history = train_model(
    model, train_loader, val_loader, criterion, optimizer, epochs=200, patience=15
)


开始训练LSTM模型...
Epoch 1/200 | Time: 0.2s | Train Loss: 0.5860 | Val Loss: 0.4614 | Val Acc: 0.7787 | Val AUC: 0.8579
Epoch 2/200 | Time: 0.2s | Train Loss: 0.4449 | Val Loss: 0.4153 | Val Acc: 0.7920 | Val AUC: 0.8823
Epoch 3/200 | Time: 0.2s | Train Loss: 0.4138 | Val Loss: 0.3916 | Val Acc: 0.8135 | Val AUC: 0.8952
Epoch 4/200 | Time: 0.2s | Train Loss: 0.3920 | Val Loss: 0.3760 | Val Acc: 0.8255 | Val AUC: 0.9025
Epoch 5/200 | Time: 0.2s | Train Loss: 0.3809 | Val Loss: 0.3661 | Val Acc: 0.8337 | Val AUC: 0.9074
Epoch 6/200 | Time: 0.2s | Train Loss: 0.3722 | Val Loss: 0.3572 | Val Acc: 0.8415 | Val AUC: 0.9118
Epoch 7/200 | Time: 0.3s | Train Loss: 0.3683 | Val Loss: 0.3514 | Val Acc: 0.8430 | Val AUC: 0.9153
Epoch 8/200 | Time: 0.2s | Train Loss: 0.3604 | Val Loss: 0.3458 | Val Acc: 0.8467 | Val AUC: 0.9179
Epoch 9/200 | Time: 0.2s | Train Loss: 0.3549 | Val Loss: 0.3450 | Val Acc: 0.8521 | Val AUC: 0.9204
Epoch 10/200 | Time: 0.2s | Train Loss: 0.3518 | Val Loss: 0.3384 | Val Acc: 

In [13]:
# 5. 评估模型
print("\n评估模型...")
# 加载最佳模型（注意设备映射）
if torch.cuda.device_count() > 1:
    model.module.load_state_dict(torch.load('best_landslide_model_lstm.pth', map_location=device))
else:
    model.load_state_dict(torch.load('best_landslide_model_lstm.pth', map_location=device))

probs, preds, test_labels = evaluate_model(model, test_loader)

# 保存测试结果
test_results = pd.DataFrame({
    'Probability': probs,
    'Prediction': preds,
    'Actual_Label': test_labels
})
test_results['Susceptibility'] = test_results['Probability'].apply(
    lambda p: 'High' if p > 0.5 else 'Low'
)
test_results.to_csv('landslide_test_results_lstm.csv', index=False)

# 6. 对整个研究区进行预测
print("\n对整个研究区进行预测...")
# 确保使用单GPU进行区域预测
if torch.cuda.device_count() > 1:
    predict_model = model.module
else:
    predict_model = model

predict_region(predict_model, ENV_RASTERS, meta, scaler, 
              output_path='landslide_susceptibility_lstm.tif')



评估模型...


  model.load_state_dict(torch.load('best_landslide_model_lstm.pth', map_location=device))


Test Accuracy: 0.9218
F1 Score: 0.9240
AUC: 0.9716

Classification Report:
              precision    recall  f1-score   support

         0.0       0.95      0.89      0.92      4347
         1.0       0.90      0.95      0.92      4347

    accuracy                           0.92      8694
   macro avg       0.92      0.92      0.92      8694
weighted avg       0.92      0.92      0.92      8694


对整个研究区进行预测...
开始区域预测 (1937x1521), 分块: 6x8


区域预测: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 48/48 [00:07<00:00,  6.81it/s]

区域预测完成，结果保存至: landslide_susceptibility_lstm.tif





In [None]:
# 7. 可视化结果
print("\n可视化结果...")
# visualize_results('landslide_susceptibility_lstm.tif')

# 8. 绘制训练历史
plt.rcParams["font.family"] = "Times New Roman"
plt.figure(figsize=(12, 10))

plt.subplot(2, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.subplot(2, 2, 2)
plt.plot(history['val_acc'], label='Validation Accuracy', color='green')
plt.title('Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(2, 2, 3)
plt.plot(history['val_auc'], label='Validation AUC', color='purple')
plt.title('Validation AUC')
plt.xlabel('Epochs')
plt.ylabel('AUC')
plt.legend()

plt.tight_layout()
plt.show()
plt.savefig('training_history_lstm.png')
plt.close()

# 9. 清理GPU内存
if device.type == "cuda":
    torch.cuda.empty_cache()
    print("已清理GPU内存")