In [None]:
import sys
import os

# 添加 SNAP 的 snappy 模块路径
snappy_path = "E:/anaconda3/envs/snap_env/Lib/snappy"
if snappy_path not in sys.path:
    sys.path.append(snappy_path)

# 检查路径是否正确
print("SNAPPY Path:", snappy_path)

## 1. 导入模块和设置路径

In [None]:
from snappy import ProductIO, GPF
from snappy import HashMap
import os

# 设置输入和输出路径
# 定义必要的文件路径
input_file_1 = "F:/data/MSR/data/original/S1A_IW_GRDH_1SDV_20181003T112511_20181003T112536_023971_029E46_1361.zip"  # 第一时间段的GRD产品路径
input_file_2 = "F:/data/MSR/data/original/S1A_IW_GRDH_1SDV_20181015T112511_20181015T112536_024146_02A3FF_5D3E.zip"  # 第二时间段的GRD产品路径
output_dir = "F:/data/MSR/data/output_code_4"  # 输出文件夹路径
final_output_file = os.path.join(output_dir, "F:/data/MSR/data/output_code_4/landslide_velocity_map.dim")  # 最终结果路径

# 确保输出目录存在
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
print("输出目录:", output_dir)

## 2. 加载输入数据

In [None]:
# 读取输入数据
product1 = ProductIO.readProduct(input_file_1)
product2 = ProductIO.readProduct(input_file_2)

print("输入数据已加载:")
print("Product 1:", input_file_1)
print("Product 2:", input_file_2)

## 3. 应用轨道文件

In [None]:
def apply_orbit_file(product, output_name):
    params = HashMap()
    params.put("Apply-Orbit-File", True)
    result = GPF.createProduct("Apply-Orbit-File", params, product)
    output_path = os.path.join(output_dir, output_name)
    ProductIO.writeProduct(result, output_path, "BEAM-DIMAP")
    print(f"轨道文件应用完成: {output_path}")
    return result

product1_orbit = apply_orbit_file(product1, "S1A_GRDH_20181003_Orb.dim")
product2_orbit = apply_orbit_file(product2, "S1A_GRDH_20181015_Orb.dim")

## 4. 图像配准

In [None]:
def dem_assisted_coregistration_with_xcorr(master, slave, output_name):
    """
    使用 DEM-Assisted Coregistration with XCorr 进行图像配准，结合地形校正功能。
    """
    params = HashMap()
    params.put("demName", "SRTM 3Sec")  # 使用 SRTM 3Sec
    params.put("resamplingType", "BILINEAR_INTERPOLATION")  # 重采样方法
    params.put("maskOutAreaWithoutElevation", True)  # 遮蔽无DEM数据区域
    params.put("xcorr", True)  # 启用交叉相关
    result = GPF.createProduct("DEM-Assisted-Coregistration", params, [slave, master])
    output_path = os.path.join(output_dir, output_name)
    ProductIO.writeProduct(result, output_path, "BEAM-DIMAP")
    print(f"DEM辅助配准完成（使用 XCorr）: {output_path}")
    return result

coregistered_product = dem_assisted_coregistration_with_xcorr(
    product1_orbit, product2_orbit, "S1A_GRDH_Orb_Stack.dim"
)


## 5.创建子集

In [None]:
def create_subset(product, region, output_name):
    """
    创建影像的子集，确保保留所有波段和必要的元数据。
    """
    params = HashMap()
    params.put("geoRegion", region)  # 设置兴趣区域 (WKT 格式)
    params.put("copyMetadata", True)  # 确保保留所有元数据和波段
    
    try:
        # 生成子集
        result = GPF.createProduct("Subset", params, product)
        
        # 检查是否生成结果
        if result is None:
            raise RuntimeError("子集裁剪未生成结果，请检查输入数据或裁剪区域。")
        
        # 保存子集到文件
        output_path = os.path.join(output_dir, output_name)
        ProductIO.writeProduct(result, output_path, "BEAM-DIMAP")
        
        # 提示生成的文件
        print(f"子集裁剪完成，结果已保存到: {output_path}")
        
        return result
    except Exception as e:
        print("子集裁剪失败:", e)
        raise

# 定义裁剪区域的 WKT 格式
geo_region = "POLYGON ((98.696 31.112, 98.75 31.112, 98.75 31.06, 98.696 31.06, 98.696 31.112))"

# 创建子集
subset_product = create_subset(coregistered_product, geo_region, "S1A_GRDH_Orb_Stack_Subset.dim")

## 6. 偏移追踪

In [None]:
def offset_tracking(product, output_name):
    """
    偏移追踪功能，基于用户提供的 SNAP 界面参数。
    """
    params = HashMap()
    
    # 网格参数 (Output Grid)
    params.put("gridAzimuthSpacingInPixels", 14)      # 方位向网格间距 (像素)
    params.put("gridRangeSpacingInPixels", 14)        # 距离向网格间距 (像素)
    params.put("gridAzimuthSpacingInMeters", 140.0)   # 方位向网格间距 (米)
    params.put("gridRangeSpacingInMeters", 140.0)     # 距离向网格间距 (米)
    params.put("gridAzimuthDimension", 33)            # 方位向网格维度
    params.put("gridRangeDimension", 43)              # 距离向网格维度
    params.put("totalGridPoints", 1419)                # 总网格点数
    
    # 重采样方式 (Resampling Type)
    params.put("resamplingType", "BICUBIC_INTERPOLATION")  # 双三次插值
    
    # 注册参数 (Registration)
    params.put("registrationWindowWidth", '64')        # 注册窗口宽度
    params.put("registrationWindowHeight", '64')       # 注册窗口高度
    params.put("registrationOversampling", '16')        # 注册重采样倍数
    params.put("crossCorrelationThreshold", 0.1)      # 交叉相关阈值
    params.put("averageBoxSize", '5')                   # 平均窗口大小
    params.put("maxVelocity", 50.00)                     # 最大速度 (m/day)
    params.put("radiusForHoleFilling", 10)             # 填洞半径
    
    # 其他选项
    params.put("spatialAverage", True)                # 空间平均
    params.put("fillHoles", True)                     # 填充洞

    # ROI Vector Mask (可选)
    params.put("roiVectorMask", "")  # 如果不需要矢量掩膜，保持为空

    try:
        print("开始偏移追踪...")
        
        # 执行偏移追踪
        result = GPF.createProduct("Offset-Tracking", params, product)
        
        # 检查结果是否生成
        if result is None:
            raise RuntimeError("偏移追踪未生成结果，请检查输入数据或参数设置。")
        
        # 保存偏移追踪结果
        output_path = os.path.join(output_dir, output_name)
        ProductIO.writeProduct(result, output_path, "BEAM-DIMAP")
        
        # 提示结果保存路径
        print(f"偏移追踪完成，结果已保存到: {output_path}")
        
        return result
    except Exception as e:
        print("偏移追踪失败:", e)
        raise

velocity_map = offset_tracking(subset_product, "S1A_GRDH_Orb_Stack_Subset_velocity.dim")

## 7. 地形校正

In [None]:
def terrain_correction(product, output_name):
    params = HashMap()
    params.put("demName", "SRTM 3Sec")
    params.put("pixelSpacingInMeter", 10.0)
    result = GPF.createProduct("Terrain-Correction", params, product)
    output_path = os.path.join(output_dir, output_name)
    ProductIO.writeProduct(result, output_path, "BEAM-DIMAP")
    print(f"地形校正完成: {output_path}")
    return result

terrain_corrected_velocity_map = terrain_correction(velocity_map, "terrain_corrected_velocity_map.dim")

## 8. 保存最终结果

In [None]:
ProductIO.writeProduct(velocity_map, final_output_file, "BEAM-DIMAP")
print(f"滑坡形变图已生成并保存到: {final_output_file}")