In [None]:


import numpy as np
import rasterio
import scipy.io as sio
from rasterio.enums import Resampling
import os
from pruning_py import pruning
from SVI_py import svi
np.random.seed(42)
# 初始化
import time

In [19]:

# 导入地理数据
location = '/Users/lixintong/Desktop/大三上/大数据并行计算/并行大作业/第三次作业/Bayesian_Causal-main/data'  # 文件所在的位置

event = '2023_turkey_new'
# event = '2024_japan2'
if event == '2023_turkey_new':
    event_1 = '2023_turkey'
else:
    event_1 = event

def read_raster(file_path):
    """
    读取栅格数据文件，并返回数据和其元数据配置
    参数:
        file_path: 栅格文件路径
    返回:
        data: 栅格数据
        profile: 栅格文件的元数据配置
    """
    with rasterio.open(file_path) as src:
        data = src.read(1, resampling=Resampling.nearest)  # 读取数据，并使用最近邻重采样
        profile = src.profile  # 获取文件元数据
    return data, profile

# 读取多个栅格文件
Y, Y_profile = read_raster(os.path.join(location, event, 'damage_proxy_map', f'{event_1}_damage_proxy_map.tif'))
BD, BD_profile = read_raster(os.path.join(location, event, 'building_footprint', f'{event_1}_building_footprint_rasterized.tif'))
LS, LS_profile = read_raster(os.path.join(location, event, 'prior_models', f'{event_1}_prior_landslide_model.tif'))
LF, LF_profile = read_raster(os.path.join(location, event, 'prior_models', f'{event_1}_prior_liquefaction_model.tif'))


In [20]:
# 数据修正
BD[BD > 0] = 1  # 将基础数据 BD 中所有大于 0 的值设为 1
Y[np.isnan(Y)] = 0  # 将 Y 数据中的 NaN 值设为 0
BD[np.isnan(BD)] = 0  # 将 BD 数据中的 NaN 值设为 0
LS[np.isnan(LS)] = 0  # 将滑坡数据 LS 中的 NaN 值设为 0
LF[np.isnan(LF)] = 0  # 将液化数据 LF 中的 NaN 值设为 0
Y = (Y + 11) / 20

### 原始处理方式

In [21]:

start_time = time.time()  # 开始计时，记录代码执行时间

# 将滑坡区域百分比转换为概率
new_LS = np.copy(LS)  # 创建滑坡数据的副本
index = np.where(LS > 0)  # 找到 LS 中大于 0 的元素索引
for i in range(len(index[0])):
    idx = (index[0][i], index[1][i])  # 获取当前元素的索引
    p = [4.035, -3.042, 5.237, (-7.592 - np.log(LS[idx]))]  # 构建多项式方程
    tmp_root = np.roots(p)  # 求解方程的根
    real_roots = tmp_root[np.iscomplex(tmp_root) == False]  # 筛选出实数根
    new_LS[idx] = np.real(real_roots)  # 将实数根存储到 new_LS，不取最大值
print('Converted Landslide Areal Percentages to Probabilities')  # 输出转换完成的信息
print("Elapsed time: {:.6f} seconds".format(time.time() - start_time))  # 输出当前已耗时间

# 将液化区域百分比转换为概率
new_LF = np.copy(LF)  # 创建液化数据的副本
index = np.where(LF > 0)  # 找到 LF 中大于 0 的元素索引
for i in range(len(index[0])):
    idx = (index[0][i], index[1][i])  # 获取当前元素的索引
    new_LF[idx] = (np.log((np.sqrt(0.4915 / LF[idx]) - 1) / 42.40)) / (-9.165)  # 根据给定公式计算概率
    # new_LF[i] = (np.log((np.sqrt(0.4915 / LF[i]) - 1) / 42.40)) / (-9.165)
print('Converted Liquefaction Areal Percentages to Probabilities')  # 输出转换完成的信息
print("Elapsed time: {:.6f} seconds".format(time.time() - start_time))  # 输出当前已耗时间

# 将概率值转换为非负数
new_LF[new_LF < 0] = 0  # 将 new_LF 中小于 0 的值设为 0
new_LS[new_LS < 0] = 0  # 将 new_LS 中小于 0 的值设为 0
new_LS[np.isnan(new_LS)] = 0  # 将 new_LS 中的 NaN 值设为 0
new_LF[np.isnan(new_LF)] = 0  # 将 new_LF 中的 NaN 值设为 0
tmp_LF = new_LF  # 临时存储液化数据
tmp_LS = new_LS  # 临时存储滑坡数据

  new_LS[idx] = np.real(real_roots)  # 将实数根存储到 new_LS，不取最大值


Converted Landslide Areal Percentages to Probabilities
Elapsed time: 445.772633 seconds
Converted Liquefaction Areal Percentages to Probabilities
Elapsed time: 449.777730 seconds


输入的 LS 是一个 4816×5323 的栅格数据，总像元数约 $2.56 \times 10^7$。其中即使只有一部分像元 >0，仍然会有几百万级别的像元需要求解多项式。

- np.roots 逐像元调用
  - 即使用了一定的“批量方式”（先批量构造系数，再用列表推导 [np.roots(poly) for poly in p]），本质上仍是对“每一个像元”都单独调用一次三次方程求解。
  - 三次方程求解需要做多步复杂的浮点计算，量级大时（几百万次）非常耗时。
  - Python 的循环或列表推导在面对上千万次的调用时开销相当大。
- 后续逐像元循环赋值\
  - for i, (x, y) in enumerate(zip(*np.where(index))): ...
  - 这里又是一个 Python 级的循环，enumerate 一下子就会迭代出上百万乃至几百万次。
  - 在循环中做数组索引与赋值操作，频繁地在 Python 和 NumPy 之间转换，也会带来大量开销。
- NumPy 机制限制
- NumPy 最擅长的是大批量向量化运算（一次性对大数组进行简单算术或逻辑操作）。然而 np.roots 本身并不是真正的“矢量化”函数；它只是可以对 1D 多项式系数数组执行运算。批量执行三次方程求根时，实际还是对每个多项式逐一做同样的运算。

### NumPy 向量化

In [22]:
start_time = time.time()  # 开始计时

# 滑坡区域百分比转换为概率（向量化实现）
def landslide_conversion(LS):
    """
    滑坡概率转换函数，使用向量化处理
    """
    index = LS > 0  # 找到 LS > 0 的位置
    log_LS = np.log(LS[index])  # 取对数
    n = len(log_LS)  # 有效点的数量

    # 构造多项式系数，每行表示一个多项式
    p = np.column_stack((
        np.full(n, 4.035),       # 多项式最高次项
        np.full(n, -3.042),      # 二次项系数
        np.full(n, 5.237),       # 一次项系数
        -7.592 - log_LS          # 常数项
    ))

    # 批量求解多项式的根
    roots = np.array([np.roots(poly) for poly in p])  # 每一行解一个多项式
    real_roots = np.real(roots)  # 提取实数部分的根

    # 只取第一个实根
    result = np.zeros_like(LS)
    for i, (x, y) in enumerate(zip(*np.where(index))):
        result[x, y] = real_roots[i][np.isfinite(real_roots[i])].min(initial=0)  # 防止无根情况

    return result

# 液化区域百分比转换为概率（向量化实现）
def liquefaction_conversion(LF):
    index = LF > 0
    result = np.zeros_like(LF)
    result[index] = (np.log((np.sqrt(0.4915 / LF[index]) - 1) / 42.40)) / (-9.165)
    result[result < 0] = 0  # 去掉负值
    result[~np.isfinite(result)] = 0  # 排除无穷大和 NaN
    return result

# 滑坡概率转换
start_time = time.time()
new_LS = landslide_conversion(LS)
print("Converted Landslide Areal Percentages to Probabilities")
print(f"Elapsed time for Landslide: {time.time() - start_time:.6f} seconds")

# 液化概率转换
start_time = time.time()
new_LF = liquefaction_conversion(LF)
print("Converted Liquefaction Areal Percentages to Probabilities")
print(f"Elapsed time for Liquefaction: {time.time() - start_time:.6f} seconds")

# 最终清理结果
new_LF[new_LF < 0] = 0
new_LS[new_LS < 0] = 0
new_LS[np.isnan(new_LS)] = 0
new_LF[np.isnan(new_LF)] = 0

tmp_LF = new_LF
tmp_LS = new_LS
print(f"Total elapsed time: {time.time() - start_time:.6f} seconds")

Converted Landslide Areal Percentages to Probabilities
Elapsed time for Landslide: 424.402355 seconds
Converted Liquefaction Areal Percentages to Probabilities
Elapsed time for Liquefaction: 0.140815 seconds
Total elapsed time: 0.195449 seconds


### multiprocessing 多进程并行

In [23]:
import numpy as np
import time
from multiprocessing import Pool, cpu_count

def landslide_conversion_chunk(chunk_data):
    """
    子进程执行的函数:
    - 输入 chunk_data: (LS_sub, top_left_x, top_left_y)
        其中 LS_sub 是原始数组的一部分
    - 返回和 LS_sub 同维度的结果局部矩阵
    """
    LS_sub, offset_x, offset_y = chunk_data
    
    # --- 这里可以直接复用原始的 landslide_conversion 逻辑 ---
    # 但要改成仅对 LS_sub 进行处理
    index = LS_sub > 0
    log_LS = np.log(LS_sub[index])
    n = len(log_LS)

    # 构造多项式系数
    p = np.column_stack((
        np.full(n, 4.035),
        np.full(n, -3.042),
        np.full(n, 5.237),
        -7.592 - log_LS
    ))

    # 批量求根
    roots = np.array([np.roots(poly) for poly in p])
    real_roots = np.real(roots)

    # 写回结果
    sub_result = np.zeros_like(LS_sub, dtype=float)
    coords = np.argwhere(index)  # 在子块坐标系下
    for i, (x, y) in enumerate(coords):
        valid_vals = real_roots[i][np.isfinite(real_roots[i])]
        if valid_vals.size > 0:
            sub_result[x, y] = valid_vals.min()
        else:
            sub_result[x, y] = 0.0

    return (sub_result, offset_x, offset_y)


def parallel_landslide_conversion(LS, num_workers=None, chunks=4):
    """
    使用多进程并行对 LS 做 landslide_conversion
    - num_workers: 并行进程数(默认是 cpu_count())
    - chunks: 在维度上分块的数量(示例简化为对行方向分块)
    """
    if num_workers is None:
        num_workers = cpu_count()
    
    # 1) 将 LS 在行方向做拆分
    rows = LS.shape[0]
    chunk_size = rows // chunks
    chunked_data = []
    for i in range(chunks):
        start = i * chunk_size
        # 最后一块可能包含多余行
        end = rows if (i == chunks - 1) else ((i+1) * chunk_size)
        LS_sub = LS[start:end, :]
        # 把子块和它在原图中的起始偏移量一起传递
        chunked_data.append((LS_sub, start, 0))

    # 2) 建立进程池并行执行
    with Pool(processes=num_workers) as p:
        results = p.map(landslide_conversion_chunk, chunked_data)

    # 3) 合并各子块结果
    final_result = np.zeros_like(LS, dtype=float)
    for sub_result, offset_x, offset_y in results:
        h, w = sub_result.shape
        final_result[offset_x:offset_x+h, offset_y:offset_y+w] = sub_result

    return final_result


def liquefaction_conversion(LF):
    """
    液化概率转换函数（保持不变）
    """
    index = LF > 0
    result = np.zeros_like(LF, dtype=float)
    result[index] = (
        np.log( (np.sqrt(0.4915 / LF[index]) - 1) / 42.40 )
        / (-9.165)
    )
    # 去掉负值、无穷大和 NaN
    result[result < 0] = 0
    result[~np.isfinite(result)] = 0
    return result

# ------------------------------#
#      主程序: 使用并行加速
# ------------------------------#
if __name__ == "__main__":
    # 假设 LS, LF 是已加载的 np.array
    # LS, LF = ...

    start_time = time.time()
    new_LS = parallel_landslide_conversion(LS, num_workers=4, chunks=4)
    print("Converted Landslide Areal Percentages to Probabilities (Parallel)")
    print(f"Elapsed time for Landslide: {time.time() - start_time:.6f} seconds")

    start_time = time.time()
    new_LF = liquefaction_conversion(LF)
    print("Converted Liquefaction Areal Percentages to Probabilities")
    print(f"Elapsed time for Liquefaction: {time.time() - start_time:.6f} seconds")

    # 清理结果
    new_LF[new_LF < 0] = 0
    new_LS[new_LS < 0] = 0
    new_LS[np.isnan(new_LS)] = 0
    new_LF[np.isnan(new_LF)] = 0

    print(f"Total elapsed time: {time.time() - start_time:.6f} seconds")

Process SpawnPoolWorker-52:
Traceback (most recent call last):
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'landslide_conversion_chunk' on <module '__main__' (built-in)>
Process SpawnPoolWorker-50:
Traceback (most recent call last):
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/lixintong/opt/anaconda3/envs/three_env/lib/pyt

KeyboardInterrupt: 