In [2]:
import pickle
import numpy as np
from scipy.signal import correlate
from joblib import Parallel, delayed

In [3]:
# 定义读取数据的函数
def load_data(pickle_file):
    with open(pickle_file, 'rb') as file:
        return pickle.load(file)

# 定义计算互相关的函数
def calculate_correlation(cloud_series, precip_series, pollution_series):
    # 计算互相关
    corr_cloud_precip = correlate(cloud_series, precip_series, mode='full')
    corr_cloud_pollution = correlate(cloud_series, pollution_series, mode='full')
    corr_precip_pollution = correlate(precip_series, pollution_series, mode='full')

    # 找到最大相关系数及其索引
    max_corr_cloud_precip, idx_cloud_precip = max((val, idx) for (idx, val) in enumerate(np.abs(corr_cloud_precip)))
    max_corr_cloud_pollution, idx_cloud_pollution = max((val, idx) for (idx, val) in enumerate(np.abs(corr_cloud_pollution)))
    max_corr_precip_pollution, idx_precip_pollution = max((val, idx) for (idx, val) in enumerate(np.abs(corr_precip_pollution)))

    # 计算对应的时间延迟
    lag_cloud_precip = idx_cloud_precip - (len(cloud_series) - 1)
    lag_cloud_pollution = idx_cloud_pollution - (len(cloud_series) - 1)
    lag_precip_pollution = idx_precip_pollution - (len(precip_series) - 1)

    return max_corr_cloud_precip, lag_cloud_precip, max_corr_cloud_pollution, lag_cloud_pollution, max_corr_precip_pollution, lag_precip_pollution


# 定义处理单个像素的函数
def process_pixel(i, j, clouds, precipitation, pollution):
    cloud_series = clouds[:, i, j]
    precip_series = precipitation[:, i, j]
    pollution_series = pollution[:, i, j]
    return calculate_correlation(cloud_series, precip_series, pollution_series)


In [4]:
# 加载数据
data = load_data('data.pkl')
clouds, precipitation, pollution = data
print(clouds.shape, precipitation.shape, pollution.shape)
print('Loading data finished.')
# 获取尺寸
height, width = clouds.shape[1], clouds.shape[2]
print('height: {}, width: {}'.format(height, width))
print('Start processing...')


(48, 1118, 1372) (48, 1118, 1372) (48, 1118, 1372)
Loading data finished.
height: 1118, width: 1372
Start processing...


In [None]:

# 并行处理
results = Parallel(n_jobs=-1, verbose=10)(
    delayed(process_pixel)(i, j, clouds, precipitation, pollution) for i in range(height) for j in range(width)
)
print('Processing finished.')
# 重塑结果
max_corr_cloud_precip = np.array([result[0] for result in results]).reshape(height, width)
lag_cloud_precip = np.array([result[1] for result in results]).reshape(height, width)
max_corr_cloud_pollution = np.array([result[2] for result in results]).reshape(height, width)
lag_cloud_pollution = np.array([result[3] for result in results]).reshape(height, width)
max_corr_precip_pollution = np.array([result[4] for result in results]).reshape(height, width)
lag_precip_pollution = np.array([result[5] for result in results]).reshape(height, width)

print('Saving results...')


In [5]:
import rasterio

# 打开原始文件以获取空间信息
with rasterio.open('./clouds/monthly_mean_2019_1.tif') as src:
    transform = src.transform
    crs = src.crs


In [6]:
def save_raster(data, filename, transform, crs):
    with rasterio.open(
        filename,
        'w',
        driver='GTiff',
        height=data.shape[0],
        width=data.shape[1],
        count=1,
        dtype=data.dtype,
        crs=crs,
        transform=transform,
    ) as dst:
        dst.write(data, 1)


In [None]:

# 保存相关系数和时间延迟为 TIFF 文件
save_raster(max_corr_cloud_precip, 'max_corr_cloud_precip.tif', transform, crs)
save_raster(lag_cloud_precip, 'lag_cloud_precip.tif', transform, crs)
save_raster(max_corr_cloud_pollution, 'max_corr_cloud_pollution.tif', transform, crs)
save_raster(lag_cloud_pollution, 'lag_cloud_pollution.tif', transform, crs)
save_raster(max_corr_precip_pollution, 'max_corr_precip_pollution.tif', transform, crs)
save_raster(lag_precip_pollution, 'lag_precip_pollution.tif', transform, crs)

print('All done.')


In [3]:
# 加载数据_norm
data = load_data('data_norm.pkl')
cloud_norm, precip_norm, pollution_norm = data
print(cloud_norm.shape, precip_norm.shape, pollution_norm.shape)
print('Loading data finished.')
# 获取尺寸
height, width = cloud_norm.shape[1], cloud_norm.shape[2]
print('height: {}, width: {}'.format(height, width))
print('Start processing...')


(48, 1118, 1372) (48, 1118, 1372) (48, 1118, 1372)
Loading data finished.
height: 1118, width: 1372
Start processing...


In [4]:
# 并行处理
results = Parallel(n_jobs=-1, verbose=10)(
    delayed(process_pixel)(i, j, cloud_norm, precip_norm, pollution_norm) for i in range(height) for j in range(width)
)
print('Processing finished.')

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    3.8s
[Parallel(n_jobs=-1)]: Done   8 tasks      | elapsed:    3.9s
[Parallel(n_jobs=-1)]: Done  17 tasks      | elapsed:    3.9s
[Parallel(n_jobs=-1)]: Done  26 tasks      | elapsed:    4.0s
[Parallel(n_jobs=-1)]: Done  37 tasks      | elapsed:    4.0s
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    4.1s
[Parallel(n_jobs=-1)]: Batch computation too fast (0.18578011871323719s.) Setting batch_size=2.
[Parallel(n_jobs=-1)]: Done  61 tasks      | elapsed:    4.1s
[Parallel(n_jobs=-1)]: Done  74 tasks      | elapsed:    4.1s
[Parallel(n_jobs=-1)]: Batch computation too fast (0.07973694801330566s.) Setting batch_size=4.
[Parallel(n_jobs=-1)]: Done  94 tasks      | elapsed:    4.2s
[Parallel(n_jobs=-1)]: Done 124 tasks      | elapsed:    4.2s
[Parallel(n_jobs=-1)]: Batch computation too fast (0.07071852684020996s.) Setting batch_size=8.
[Parallel(n_job

Processing finished.


[Parallel(n_jobs=-1)]: Done 1533896 out of 1533896 | elapsed:  1.1min finished


In [7]:
# 重塑结果
max_corr_cloud_precip_norm = np.array([result[0] for result in results]).reshape(height, width)
lag_cloud_precip_norm = np.array([result[1] for result in results]).reshape(height, width)
max_corr_cloud_pollution_norm = np.array([result[2] for result in results]).reshape(height, width)
lag_cloud_pollution_norm = np.array([result[3] for result in results]).reshape(height, width)
max_corr_precip_pollution_norm = np.array([result[4] for result in results]).reshape(height, width)
lag_precip_pollution_norm = np.array([result[5] for result in results]).reshape(height, width)

In [8]:
# 保存相关系数和时间延迟为 TIFF 文件
save_raster(max_corr_cloud_precip_norm, './norm/max_corr_cloud_precip_norm.tif', transform, crs)
save_raster(lag_cloud_precip_norm, './norm/lag_cloud_precip_norm.tif', transform, crs)
save_raster(max_corr_cloud_pollution_norm, './norm/max_corr_cloud_pollution_norm.tif', transform, crs)
save_raster(lag_cloud_pollution_norm, './norm/lag_cloud_pollution_norm.tif', transform, crs)
save_raster(max_corr_precip_pollution_norm, './norm/max_corr_precip_pollution_norm.tif', transform, crs)
save_raster(lag_precip_pollution_norm, './norm/lag_precip_pollution_norm.tif', transform, crs)

print('All done.')

All done.


In [17]:
import time

t1 = time.time()
process_pixel(0, 0, clouds, precipitation, pollution)
t2 = time.time()

print('Time cost: {} seconds.'.format(t2 - t1))
print((t2-t1)*height*width/60)

Time cost: 0.0011782646179199219 seconds.
30.122256406148274
