In [1]:
# type: ignore
import os
import pandas as pd
import arcpy
from arcpy import env
from arcpy.management import *
from arcpy.sa import *
from arcpy.da import *
from arcpy.conversion import *
from arcpy.analysis import *
import json
from datetime import datetime
import numpy as np

In [2]:
# 向shp添加字段
def add_field(input_table,field_name,field_type='TEXT'):
    """参数说明：
        input_table: 输入数据表
        field_name: 字段名
        field_type: 字段类型"""
    arcpy.AddField_management(input_table,field_name,field_type)
# 计算字段dldm
def calc_field_dldm(shp_path, field_name, refer_field_name):
    # 创建代码块字符串，包含获取唯一编号的函数
    code_block = f"""
_counter = {{}}
_next_id = 1
def get_value(key):
    global _counter, _next_id
    key = str(key)
    if key not in _counter:
        _counter[key] = str(_next_id)
        _next_id += 1
    return _counter[key]
"""
    # 创建表达式，使用代码块中定义的函数
    expression = f"get_value(!{refer_field_name}!)"
    
    # 执行字段计算
    arcpy.CalculateField_management(
        in_table=shp_path,
        field=field_name,
        expression=expression,
        expression_type="PYTHON3",
        code_block=code_block
    )
# 计算字段dllb
def calc_field_dllb(shp_path, field_name, dlbm_field):
    code_block = """
def get_first_level(dlbm):
    if not dlbm:
        return ''
    
    # 特殊处理水稻土
    if str(dlbm).strip() == '0101':
        return '水稻土'
    
    # 获取前两位编码
    dlbm = str(dlbm).strip()[:2]
    
    # 一级地类字典（使用前两位编码）
    sddl_dict = {
        "耕地": ["01"],
        "园地": ["02"],
        "林地": ["03"],
        "草地": ["04"],
        "商服用地": ["05"],
        "工矿仓储用地": ["06"],
        "住宅用地": ["07"],
        "公共管理与公共服务用地": ["08"],
        "特殊用地": ["09"],
        "交通运输用地": ["10"],
        "水域及水利设施用地": ["11"],
        "其他土地": ["12"]
    }
    
    # 更高级别分类字典
    high_level_dict = {
        "非自然土": ["耕地", "园地", "其他土地"],
        "自然土": ["林地", "草地"],
        "建设用地": ["商服用地", "工矿仓储用地", "住宅用地", "公共管理与公共服务用地", 
                  "特殊用地", "交通运输用地", "水域及水利设施用地"]
    }
    
    # 先获取一级地类
    first_level = '未分类'
    for level, codes in sddl_dict.items():
        if dlbm in codes:
            first_level = level
            break
    
    # 再获取更高级别分类
    for high_level, categories in high_level_dict.items():
        if first_level in categories:
            return high_level
    return '未分类'
"""
    # 创建表达式，使用代码块中定义的函数
    expression = f"get_first_level(!{dlbm_field}!)"
    
    # 执行字段计算
    arcpy.CalculateField_management(
        in_table=shp_path,
        field=field_name,
        expression=expression,
        expression_type="PYTHON3",
        code_block=code_block
    )
    
# 创建缓冲区
def create_buffer_zone(input_shp, output_shp, buffer_distance=1000):
    """
    创建输入shp的缓冲区并融合为一个面
    
    参数:
    input_shp: 输入的shp文件路径
    output_shp: 输出的shp文件路径
    buffer_distance: 缓冲区距离，默认1500米
    """
    try:
        # 创建缓冲区并直接融合
        arcpy.analysis.PairwiseBuffer(
            in_features=input_shp,
            out_feature_class=output_shp,
            buffer_distance_or_field=f"{buffer_distance} Meters",
            dissolve_option="ALL",  # 融合所有缓冲区
            dissolve_field=None,
            method="PLANAR",
            max_deviation="0 Meters"
        )
        return output_shp
    except arcpy.ExecuteError:
        print(arcpy.GetMessages(2))
        raise
    except Exception as e:
        print(f"发生错误: {str(e)}")
        raise

In [3]:
# shp文件存放路径
base_shp_path = r'F:\cache_data\shp_file'

In [4]:
# 创建文件夹
qx_name = 'ky'
new_sd_shp_name = qx_name + '_sd_polygon.shp'
p_buffer_name = 'extent_p_1000.shp'
p_raster_buffer_name = 'extent_p_1500.shp'
d_buffer_name = 'extent_d_1500.shp'
d_proj_project = '4326' # 地理坐标系
proj_project = '4545' # 投影坐标系
dldm_name = 'DLDM'  # 用于存储唯一地类代码便于转换为栅格数据
dllb_name = 'DLLB'  # 用于存储地类类别用于区分用地类型{自然土、建设用地、非自然土}
dlmc_name = 'DLMC'  # 数据中的地类名称
dlbm_name = 'DLBM'  # 数据中的地类编码
os.makedirs(os.path.join(base_shp_path,qx_name),exist_ok=True)

In [5]:
# 三调数据处理
sd_file_path = r'D:\worker\工作\work\三普\数据\清镇\贵阳\更新后基础数据库(最终成果)\520121开阳县.gdb\Dataset\DLTB'

In [8]:
# 得到面积变换系数字典
area_df = pd.DataFrame(arcpy.da.FeatureClassToNumPyArray(sd_file_path,['BSM','TBDLMJ','Shape_Area']))
# 得到变化系数字典
map_dict = dict(zip(area_df['BSM'], area_df['TBDLMJ'] / area_df['Shape_Area']))
# 写入json,utf-8
json_file_path = os.path.join(base_shp_path,qx_name,'json_file')
os.makedirs(json_file_path,exist_ok=True)
with open(os.path.join(json_file_path,'area_index_dict.json'),'w',encoding='utf-8') as f:
    json.dump(map_dict,f)

In [9]:
# 按指定投影导出
arcpy.Project_management(sd_file_path,os.path.join(base_shp_path,qx_name,new_sd_shp_name),proj_project)

In [10]:
# 添加字段
add_field(os.path.join(base_shp_path,qx_name,new_sd_shp_name),dldm_name)
add_field(os.path.join(base_shp_path,qx_name,new_sd_shp_name),dllb_name)

In [11]:
# 计算字段dldm
calc_field_dldm(os.path.join(base_shp_path,qx_name,new_sd_shp_name),dldm_name,dlmc_name)

In [12]:
# 计算字段dlbm
calc_field_dllb(os.path.join(base_shp_path,qx_name,new_sd_shp_name),dllb_name,dlbm_name)

In [13]:
# 创建缓冲区 500
create_buffer_zone(os.path.join(base_shp_path,qx_name,new_sd_shp_name),os.path.join(base_shp_path,qx_name,f'{qx_name}_extent_p_500.shp'),500)

'F:\\cache_data\\shp_file\\ky\\ky_extent_p_500.shp'

In [14]:
# 创建缓冲区 1000
create_buffer_zone(os.path.join(base_shp_path,qx_name,new_sd_shp_name),os.path.join(base_shp_path,qx_name,f'{qx_name}_{p_buffer_name}'),1000)

'F:\\cache_data\\shp_file\\ky\\ky_extent_p_1000.shp'

In [15]:
# 创建缓冲区 raster
create_buffer_zone(os.path.join(base_shp_path,qx_name,new_sd_shp_name),os.path.join(base_shp_path,qx_name,f'{qx_name}_{p_raster_buffer_name}'),1500)

'F:\\cache_data\\shp_file\\ky\\ky_extent_p_1500.shp'

In [16]:
# 创建地理投影缓冲区
arcpy.Project_management(os.path.join(base_shp_path,qx_name,f'{qx_name}_{p_raster_buffer_name}'),os.path.join(base_shp_path,qx_name,f'{qx_name}_{d_buffer_name}'),d_proj_project)

# 计算地质数据的MDMCDM

In [8]:
# 地质数据shp路径
dz_shp_path = r'F:\cache_data\shp_file\ky\dz\ky_dzhaveriver_polygon.shp'
mzmcdm_name = 'MZMCDM' # 地质代码
mzmc_name = 'MZMC' # 地质名称
# 添加字段
add_field(dz_shp_path,mzmcdm_name)
# 计算字段mzmcdm
calc_field_dldm(dz_shp_path,mzmcdm_name,mzmc_name)

In [10]:
# 融合地质数据(dissove)
#获取时间戳
timestamp = datetime.now().strftime('%Y%m%d')
# 融合地质数据(dissove)
arcpy.env.overwriteOutput = True
arcpy.analysis.PairwiseDissolve(
    in_features=dz_shp_path,
    out_feature_class=os.path.join(base_shp_path,qx_name,'dz',f'dz_dissove_{timestamp}.shp'),
    dissolve_field=[mzmc_name,mzmcdm_name],
    statistics_fields=None,
    multi_part="SINGLE_PART",
    concatenation_separator=""
)