In [1]:
import geopandas as gpd
import pandas as pd
import math
import time
import os
import re
from shapely.geometry import Point, Polygon, LineString
import glob
import fiona
from sqlalchemy import create_engine
import psycopg2
from datetime import datetime
import numpy as np
import warnings
warnings.filterwarnings('ignore')

In [2]:
# 당해연도 기입
YYYY = '2025'

In [3]:
#All field list
spel_list = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

FL_NM_list = ['논', '밭', '과수', '시설', '인삼', '비경지']

FL_CD_list = ['01', '02', '03', '04', '05', '06'] #CD01

SB_LDCG_CD_list = ['전', '답', '과', '목', '임', '잡', 'NULL'] #CD03

LDCG_CD_list = ['전','답','과','목','임','광','염','대','장','학','차','주','창','도','철','제','천','구','유','양','수','공','체','원','종','사','묘','잡', 'NULL'] #CD02

SOURCE_NM_list = ['항공정사영상', '위성영상', '드론영상', '영상미수급']
SOURCE_CD_list = ['01', '02', '03', '04'] #CD04

#!!BSH 수정
UPDT_TP_NM_list = ['신규', '삭제', '변경' ]
UPDT_TP_CD_list = ['01', '02', '03'] #CD06

CHG_TP_NM_list = ['생성','폐기','임의폐기','수정']
CHG_TP_CD_list = ['01', '02', '03', '04'] #CD08
#!!! BSH 수정까지
CAUSE_NM_list = ['개간','건물','도로','태양광','산림','시설','개선','합필','분필','임의폐기','기타'] # CHG_RSN_NM // '조림' >> '산림'

CAUSE_CD_list = ['01', '02', '03', '04', '05', '06', '07', '08','09','10','99'] #CD07

AD_USE_CD_list = ['00', '01', '02', '03', '04']

AI_YN_list = ['Y', 'N']

FL_CI_CD_list = ['11', '12', '13', '14', '21', '22', '23', '24', '31', '32', '41', '42', '43', '44', '51', '52', '53', '54', '55', '56', '57', '61', '71']

AI_FL_CD_list = ['01', '02', '03', '04', '05']

EXEPT_MTTR_list = ['휴경', '묘지', '바위', '송전탑', '시설창고', '간척지', '지번미정', '비경작', '초지', '건천농경지']

L3_YN_list = ['Y', 'N']

In [4]:
#품질검수 오류 0개 지역 리스트
not_count_shp_list = []

#품질검수 오류 n개 지역 리스트
count_shp_list = []

# File path

In [5]:
base_path = os.getcwd()
today = datetime.today().strftime("%Y-%m-%d")
output_dir_name = f"팜맵_검수_{today}"
output_dir = os.path.join(base_path, output_dir_name)
os.makedirs(output_dir, exist_ok=True)

In [6]:
# 법정경계 SHP 경로
shp_bjd_path = os.path.join(output_dir, "landing_db")
os.makedirs(shp_bjd_path, exist_ok=True)

# 품질검사 진행할 팜맵 SHP '폴더' 경로
folder_path = os.path.join(output_dir, "production_db")
os.makedirs(folder_path, exist_ok=True)

# 품질검사 결과 저장 '폴더' 경로
save_folder = r'C:\Users\Opt_AI\Desktop\Tool\입력 및 검수\속성검수결과'

In [7]:
db_info = {
    'host': '192.168.49.171',
    'port': 62484,
    'database': 'farmmap_demo',
    'user': 'Opt-Ai',
    'password': 'opt-ai'
}
schema_name = 'landing_db'

engine = create_engine(
    f"postgresql+psycopg2://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
)

# 3. 테이블 목록 추출
conn = psycopg2.connect(**db_info)
cur = conn.cursor()
cur.execute(f"""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = '{schema_name}'
""")
tables = [row[0] for row in cur.fetchall()]
cur.close()
conn.close()

# 4. 테이블별 shp 내보내기
for table in tables:
    shp_path = os.path.join(shp_bjd_path, f"{table}.shp")

    # geometry 컬럼명 추출
    try:
        conn = psycopg2.connect(**db_info)
        cur = conn.cursor()
        cur.execute(f"""
            SELECT f_geometry_column
            FROM geometry_columns
            WHERE f_table_schema = '{schema_name}' AND f_table_name = '{table}'
        """)
        geom_col = cur.fetchone()
        if geom_col is not None:
            geom_col = geom_col[0]
        else:
            print(f"[SKIP] {table}: geometry 컬럼 없음")
            cur.close()
            conn.close()
            continue
        cur.close()
        conn.close()
    except Exception as e:
        print(f"[SKIP] {table}: geometry 컬럼 추출 오류: {e}")
        continue

    # 쿼리 및 내보내기
    sql = f'''
        SELECT *, 
            ST_SetSRID("{geom_col}", 5179) AS geom_5179
        FROM "{schema_name}"."{table}"
    '''

    try:
        gdf = gpd.read_postgis(sql, engine, geom_col="geom_5179")
        gdf.set_crs(epsg=5179, inplace=True)
        
        # geometry 컬럼을 항상 맨 뒤로 배치
        non_geom_cols = [c for c in gdf.columns if c != 'geometry']
        if 'geometry' in gdf.columns:
            gdf = gdf[non_geom_cols + ['geometry']]

        if not gdf.empty:
            gdf.to_file(shp_path, driver='ESRI Shapefile', encoding='cp949')
            print(f"[OK] {table} → {shp_path}")
        else:
            print(f"[SKIP] {table}: 데이터 없음")
    except Exception as e:
        print(f"[FAIL] {table}: {e}")

[OK] legaldong_li_bndry_20250327 → c:\Users\Opt_AI\Desktop\Tool\입력 및 검수\팜맵_검수_2026-02-09\landing_db\legaldong_li_bndry_20250327.shp


In [8]:
db_info = {
    'host': '192.168.49.171',
    'port': 62484,
    'database': 'farmmap_demo',
    'user': 'Opt-Ai',
    'password': 'opt-ai'
}
schema_name = 'staging_db'

engine = create_engine(
    f"postgresql+psycopg2://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
)


# 원하는 WKT 문자열
custom_wkt = '''PROJCS["Transverse_Mercator",GEOGCS["GCS_Geographic_Coordinate_System",DATUM["D_GRS_80",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["False_Easting",1000000.0],PARAMETER["False_Northing",2000000.0],PARAMETER["Central_Meridian",127.5],PARAMETER["Scale_Factor",0.9996],PARAMETER["Latitude_Of_Origin",38.0],UNIT["Meter",1.0]]'''

# 테이블 목록 추출
conn = psycopg2.connect(**db_info)
cur = conn.cursor()
cur.execute(f"""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = '{schema_name}'
""")
tables = [row[0] for row in cur.fetchall()]
cur.close()
conn.close()

for table in tables:
    shp_path = os.path.join(folder_path, f"{table}.shp")

    # geometry 컬럼명 추출
    try:
        conn = psycopg2.connect(**db_info)
        cur = conn.cursor()
        cur.execute(f"""
            SELECT f_geometry_column
            FROM geometry_columns
            WHERE f_table_schema = '{schema_name}' AND f_table_name = '{table}'
        """)
        geom_col = cur.fetchone()
        if geom_col is not None:
            geom_col = geom_col[0]
        else:
            print(f"[SKIP] {table}: geometry 컬럼 없음")
            cur.close()
            conn.close()
            continue
        cur.close()
        conn.close()
    except Exception as e:
        print(f"[SKIP] {table}: geometry 컬럼 추출 오류: {e}")
        continue

    # 좌표계 변환 없이 5186로 지정
    sql = f'''
        SELECT *, 
            ST_SetSRID("{geom_col}", 5186) AS geom_5186
        FROM "{schema_name}"."{table}"
    '''

    try:
        gdf = gpd.read_postgis(sql, engine, geom_col="geom_5186")
        gdf.set_crs(epsg=5186, inplace=True)
        
        # ===== geometry 컬럼을 항상 맨 뒤로 배치 =====
        non_geom_cols = [c for c in gdf.columns if c != 'geometry']
        if 'geometry' in gdf.columns:
            gdf = gdf[non_geom_cols + ['geometry']]

        if not gdf.empty:
            gdf.to_file(shp_path, driver='ESRI Shapefile', encoding='cp949')
            print(f"[OK] {table} → {shp_path}")

            # .prj 강제 덮어쓰기
            prj_path = shp_path.replace('.shp', '.prj')
            with open(prj_path, 'w', encoding='utf-8') as f:
                f.write(custom_wkt)
        else:
            print(f"[SKIP] {table}: 데이터 없음")
    except Exception as e:
        print(f"[FAIL] {table}: {e}")

[OK] 2025_경상남도_함안군 → c:\Users\Opt_AI\Desktop\Tool\입력 및 검수\팜맵_검수_2026-02-09\production_db\2025_경상남도_함안군.shp


In [9]:
db_info = {
    'host': '192.168.49.171',
    'port': 62484,
    'database': 'farmmap_demo',
    'user': 'Opt-Ai',
    'password': 'opt-ai'
}
schema_name = 'validation_db'

engine = create_engine(
    f"postgresql+psycopg2://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
)


# 원하는 WKT 문자열
custom_wkt = '''PROJCS["Transverse_Mercator",GEOGCS["GCS_Geographic_Coordinate_System",DATUM["D_GRS_80",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["False_Easting",1000000.0],PARAMETER["False_Northing",2000000.0],PARAMETER["Central_Meridian",127.5],PARAMETER["Scale_Factor",0.9996],PARAMETER["Latitude_Of_Origin",38.0],UNIT["Meter",1.0]]'''

# 테이블 목록 추출
conn = psycopg2.connect(**db_info)
cur = conn.cursor()
cur.execute(f"""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = '{schema_name}'
""")
tables = [row[0] for row in cur.fetchall()]
cur.close()
conn.close()

for table in tables:
    shp_path = os.path.join(folder_path, f"{table}.shp")

    # geometry 컬럼명 추출
    try:
        conn = psycopg2.connect(**db_info)
        cur = conn.cursor()
        cur.execute(f"""
            SELECT f_geometry_column
            FROM geometry_columns
            WHERE f_table_schema = '{schema_name}' AND f_table_name = '{table}'
        """)
        geom_col = cur.fetchone()
        if geom_col is not None:
            geom_col = geom_col[0]
        else:
            print(f"[SKIP] {table}: geometry 컬럼 없음")
            cur.close()
            conn.close()
            continue
        cur.close()
        conn.close()
    except Exception as e:
        print(f"[SKIP] {table}: geometry 컬럼 추출 오류: {e}")
        continue

    # 좌표계 변환 없이 5186로 지정
    sql = f'''
        SELECT *, 
            ST_SetSRID("{geom_col}", 5186) AS geom_5186
        FROM "{schema_name}"."{table}"
    '''

    try:
        gdf = gpd.read_postgis(sql, engine, geom_col="geom_5186")
        gdf.set_crs(epsg=5186, inplace=True)
        
        # ===== geometry 컬럼을 항상 맨 뒤로 배치 =====
        non_geom_cols = [c for c in gdf.columns if c != 'geometry']
        if 'geometry' in gdf.columns:
            gdf = gdf[non_geom_cols + ['geometry']]

        if not gdf.empty:
            gdf.to_file(shp_path, driver='ESRI Shapefile', encoding='cp949')
            print(f"[OK] {table} → {shp_path}")

            # .prj 강제 덮어쓰기
            prj_path = shp_path.replace('.shp', '.prj')
            with open(prj_path, 'w', encoding='utf-8') as f:
                f.write(custom_wkt)
        else:
            print(f"[SKIP] {table}: 데이터 없음")
    except Exception as e:
        print(f"[FAIL] {table}: {e}")

[OK] 2025_경상남도_함안군_변경 → c:\Users\Opt_AI\Desktop\Tool\입력 및 검수\팜맵_검수_2026-02-09\production_db\2025_경상남도_함안군_변경.shp
[OK] 2025_경상남도_함안군_삭제 → c:\Users\Opt_AI\Desktop\Tool\입력 및 검수\팜맵_검수_2026-02-09\production_db\2025_경상남도_함안군_삭제.shp
[OK] 2025_경상남도_함안군_신규 → c:\Users\Opt_AI\Desktop\Tool\입력 및 검수\팜맵_검수_2026-02-09\production_db\2025_경상남도_함안군_신규.shp


In [10]:
file_list = glob.glob(folder_path+'/**/*.shp', recursive = True)

shp_path_list = []

for file_path in file_list:
    
    file_path = file_path.replace('\\','/')
    
    if '신규' in file_path.split('/')[-1] or '변경' in file_path.split('/')[-1] or '삭제' in file_path.split('/')[-1]:
        continue
    if '생성' in file_path.split('/')[-1] or '수정' in file_path.split('/')[-1] or '폐기' in file_path.split('/')[-1]:
        continue
    
    shp_path = file_path

    shp_path_list.append(shp_path)

#list체크
print('shp 파일 총 개수: ', len(shp_path_list), '개')
shp_path_list[:5]

shp 파일 총 개수:  1 개


['c:/Users/Opt_AI/Desktop/Tool/입력 및 검수/팜맵_검수_2026-02-09/production_db/2025_경상남도_함안군.shp']

In [11]:
# 특정 필드가 몇 번째 위치에 있는지 반환하는 함수.
def get_field_index(field_name):
    with fiona.open(shp_path, 'r') as src:
        properties = src.schema['properties']
        
        # 필드명 위치 찾기
        field_index = list(properties.keys()).index(field_name) if field_name in properties else None
        
        return field_index

# 한글 외 값 유무 체크
def contains_non_korean_or_non_space(text):
    # 한글(가-힣)과 공백만 허용 → 그 외가 있으면 True
    return bool(re.search(r'[^\uAC00-\uD7A3\s]', text))

# True False 역 변환
# def contains_only_korean_or_space(text):
#     return not bool(re.search(r'[^\uAC00-\uD7A3\s]', text))

# 숫자 외 값 여부 체크 
def contains_non_digit(text):
    # 숫자(0-9)가 아닌 문자가 하나라도 있으면 True
    return bool(re.search(r'[^\d]', text))

In [12]:
# field index
UID_index = get_field_index('UID')
CAD_CON_RA_index = get_field_index('CAD_CON_RA')
SOURCE_NM_index = get_field_index('SOURCE_NM')
SOURCE_CD_index = get_field_index('SOURCE_CD')
FLIGHT_YMD_index = get_field_index('FLIGHT_YMD')
EXEPT_MTTR_index = get_field_index('EXEPT_MTTR')
UPDT_YMD_index = get_field_index('UPDT_YMD')
OPRTR_NM_index = get_field_index('OPRTR_NM')

UPDT_NM_index = get_field_index('UPDT_NM')
UPDT_TP_NM_index = get_field_index('UPDT_TP_NM')
UPDT_TP_CD_index = get_field_index('UPDT_TP_CD')
CHG_TP_NM_index = get_field_index('CHG_TP_NM')
CHG_TP_CD_index = get_field_index('CHG_TP_CD')
CHG_RSN_NM_index = get_field_index('CHG_RSN_NM')
CHG_RSN_CD_index = get_field_index('CHG_RSN_CD')

FL_ARMT_YN_index = get_field_index('FL_ARMT_YN')
AI_YN_index = get_field_index('AI_YN')
COV_INFO_index = get_field_index('COV_INFO')
O_AREA_index = get_field_index('O_AREA')
O_UID_index = get_field_index('O_UID')
O_CLSF_NM_index = get_field_index('O_CLSF_NM')

ER_DCLR_TP_index = get_field_index('ER_DCLR_TP')

In [13]:
print(CHG_TP_NM_index)
print(CHG_TP_CD_index)

39
40


In [14]:
# 법정경계 SHP Load
bjd = gpd.read_file(shp_bjd_path)

In [15]:
# 1. bjd에서 비교용 주소 리스트 만들기
def build_bjd_address(row):
    sido = row.get("SIDO_NM", "")
    sgg  = row.get("SGG_NM", "")
    emd  = row.get("EMD_NM", "")
    ri   = row.get("RI_NM", None)

    if pd.notna(ri):
        if sido == sgg:
            return f"{sido} {emd} {ri}"
        else:
            return f"{sido} {sgg} {emd} {ri}"
    else:
        if sido == sgg:
            return f"{sido} {emd}"
        else:
            return f"{sido} {sgg} {emd}"
        

# 주소 문자열 집합 (중복 제거 + 빠른 조회용)
bjd_addr_set = list(set(bjd.apply(build_bjd_address, axis=1)))

In [16]:
# # 테스트용 shp
# shp = gpd.read_file('/media/FarmMap/02. DB/01. SHP DB/29. 지적정보기입_2024팜맵전국_결과_r3_ID재부여/2024_경기도_광명시/2024_경기도_광명시.shp')
# shp.columns
# shp.loc[0, 'ID'] = 'xxxx'
# shp.loc[0, 'ID']

# 팜맵

In [17]:
shp_path_list = shp_path_list[:3]

In [18]:
#전체 루프

# 조건 변수
# 적용할 list numb
location_numb = 1

bjd = gpd.read_file(shp_bjd_path)
bjd_indexed = bjd.set_index("RI_CD")

check_results = []

for shp_path in shp_path_list:

    if location_numb >= 0 and location_numb <= 200:

        shp_name = os.path.splitext(os.path.basename(shp_path))[0]    #확장자 완전 제거
        # shp_root = shp_path.split('/2024')[0]

        # 저장 폴더 + 지역별 저장 폴더 경로
        save_path = os.path.join(save_folder, shp_name)
        print(f'검사 지역: {shp_name}')
        print(f'저장 경로: {save_path}')

        # 팜맵 SHP Load
        shp = gpd.read_file(shp_path, encoding = 'cp949')

        # 갱신 전(전차년도) 팜맵
        shp_bf = pd.DataFrame()

        # 법정경계 SHP Load
        bjd = gpd.read_file(shp_bjd_path)
        
        #########################################################################
        # # 이력 파일 Load
        # # 변경 및 삭제 경로
        shp_chg = pd.DataFrame()
        shp_del = pd.DataFrame()

        # try:
        #     try:
        #         shp_chg = gpd.read_file(shp_path.split('.shp')[0] + '_수정.shp', encoding = 'cp949')
        #     except:
        #         shp_chg = gpd.read_file(shp_path.split('.shp')[0] + '_변경.shp', encoding = 'cp949')
        # except:
        #     pass
        
        # try:
        #     try:
        #         shp_del = gpd.read_file(shp_path.split('.shp')[0] + '_폐기.shp', encoding = 'cp949')
        #     except:
        #         shp_del = gpd.read_file(shp_path.split('.shp')[0] + '_삭제.shp', encoding = 'cp949')
        # except:
        #     pass

        #########################################################################
        #사전 Error Check list setting
        shp_list = shp.values.tolist() 
        Error_Code_list = [] #Error_Code 추가할 list

        for N in range(len(shp_list)):
            Error_Code_list.append([N])

        #########################################################################
        #법정동주소 SHP load
        
        bjd_list = bjd.values.tolist()

        bjd_cd_list = []
        bjd_cd_nm_list = []

        bjd_layer_cd_list = bjd['RI_CD'].values.tolist()

        for N in shp_list:
            STDG_CD = N[4] #법정동코드
            STDG_ADDR = N[5] #법정동주소
            bjd_cd_list.append(STDG_CD)
            bjd_cd_nm_list.append([STDG_CD, STDG_ADDR])

        #########################################################################
        total_bjd_cd_list = list(set(bjd_cd_list))
        double_bjd_cd_list = [] #법정동 리스트에 없는 코드들만 모은 list 

        for N in total_bjd_cd_list:
            if N not in bjd_cd_list:
                double_bjd_cd_list.append(N)

        for N in shp_list:
            STDG_CD = N[4] #법정동코드
            STDG_ADDR = N[5] #법정동주소
            PNU = N[6] # PNU

            # 법정동코드 및 PNU[:10]의 법정동코드shp 파일 안에 존재 여부
            # if STDG_CD not in bjd_layer_cd_list:
            #     Error_Code_list[shp_list.index(N)].append('FA06-01')
            if STDG_CD not in bjd_layer_cd_list:
                Error_Code_list[shp_list.index(N)].append('FA06-02')
            # if PNU != None:
            if str(PNU)[:10] not in bjd_layer_cd_list:
                Error_Code_list[shp_list.index(N)].append('FA08-01')

        # 법정동코드 같을 때, 법정동주소 다른 경우
            if STDG_CD not in double_bjd_cd_list:
                # if len(str(STDG_ADDR)) > 8:
                    for bjd1 in bjd_list:
                        if STDG_CD == bjd1[6]:
                            bjd_name = bjd1[1] +' ' + bjd1[3] + ' ' + bjd1[5]
                            if bjd1[1] == bjd1[3]:
                                bjd_name = bjd1[1] + ' ' + bjd1[5]
                            if str(bjd1[7]) != 'None':
                                bjd_name = bjd1[1] +' '+ bjd1[3] +' '+ bjd1[5] +' '+ bjd1[7]
                                if bjd1[1] == bjd1[3]:
                                    bjd_name = bjd1[1] + ' ' + bjd1[5] +' '+ bjd1[7]

                            if STDG_ADDR != bjd_name:
                                Error_Code_list[shp_list.index(N)].append('FA07-01')
                                print(N[0], ':',STDG_ADDR, '>>', bjd_name)

        # bjd = gpd.read_file(shp_bjd_path)
        # bjd_indexed = bjd.set_index("RI_CD")
        if pd.notna(STDG_CD) and STDG_CD in bjd_indexed.index:
            bjd_row = bjd_indexed.loc[STDG_CD]
            
            sido = bjd_row.get("SIDO_NM", "")
            sgg  = bjd_row.get("SGG_NM", "")
            emd  = bjd_row.get("EMD_NM", "")
            ri   = bjd_row.get("RI_NM", None)
            
            # 주소 구성
            if pd.notna(ri):
                full_addr = f"{sido} {sgg} {emd} {ri}"
                if sido == sgg:
                    full_addr = f"{sido} {emd} {ri}"
            else:
                full_addr = f"{sido} {sgg} {emd}"
                if sido == sgg:
                    full_addr = f"{sido} {emd}"

            if STDG_ADDR != full_addr:
                Error_Code_list[shp_list.index(N)].append('FA07-02')

        if STDG_ADDR not in  bjd_addr_set:
            Error_Code_list[shp_list.index(N)].append('FA07-03')
        #########################################################################

        for idx, N in enumerate(shp_list):
            # 필드 값 추출
            ID = N[0]
            STDG_CD = N[4]  # 법정동코드
            STDG_ADDR = N[5]  # 법정동주소
            PNU = N[6]  # 대표PNU
            LDCG_CD = N[7]  # 대표지목
            SB_LDCG_CD = N[9]  # 부지목
            SB_PNU = N[8]  # 부PNU
                    
            # 11. 부지목 오류
            if SB_LDCG_CD == None:
                Error_Code_list[idx].append('AC01-11')

            if SB_LDCG_CD not in SB_LDCG_CD_list:
                Error_Code_list[idx].append('CD-11')

            if LDCG_CD not in ['전', '답', '과', '목', '임', '잡']:
                if SB_LDCG_CD not in ['전', '답', '과', '목', '임', '잡', 'NULL']:
                    Error_Code_list[idx].append('FA11-01')
                
            if LDCG_CD in ['전', '답', '과', '목', '임', '잡']:
                if LDCG_CD != SB_LDCG_CD:
                    Error_Code_list[idx].append('FA11-02')
                    
            # 지목/부지목과 PNU/부PNU 비교
            if SB_LDCG_CD in ['전', '답', '과', '목', '임', '잡']:
                if SB_PNU == None:
                    Error_Code_list[idx].append('FA10-01')
            if LDCG_CD in ['전', '답', '과', '목', '임', '잡']:
                if SB_PNU != PNU:
                    Error_Code_list[idx].append('FA10-02')
                    
            # 19., 21. 업무규칙 (갱신유형, 변경사유)
            UPDT_TP_NM = N[UPDT_TP_NM_index]  # 갱신유형
            CAUSE_NM = N[CHG_RSN_NM_index]  # 변경사유 == CHG_RSN_NM

            if UPDT_TP_NM != None:
                if CAUSE_NM not in CAUSE_NM_list:
                    Error_Code_list[idx].append('CHG_NM')
            
            if CAUSE_NM != None:
                if UPDT_TP_NM == None:
                    Error_Code_list[idx].append('FA20-01')
                    
            if UPDT_TP_NM != None:
                if CAUSE_NM == None:
                    Error_Code_list[idx].append('FA22-01')

            

            # 전차UID, 전차농경지 분류명
            O_UID = N[O_UID_index]  # 전차 UID
            O_FL_NM = N[O_CLSF_NM_index]  # 전차 농경지 분류명 == O_CLSF_NM
            UPDT_YMD = N[UPDT_YMD_index]  # 갱신 일자
            CHG_RSN_NM = N[CHG_RSN_NM_index]

#!!!BSH 수정 - 여기부터
            # 갱신유형에 따른 오류 체크 / 
            #★★★ 갱신일자가 당해년도 ★★★
            # if UPDT_YMD != None and str(UPDT_YMD) != 'nan':
            #     if UPDT_YMD[:4] == YYYY:
            if True:
                    if UPDT_TP_NM == '변경':
                        if O_UID == None:
                            # Error_Code_list[idx].append('FA24-01')
                            Error_Code_list[idx].append('FA24-01-01')
                        if O_FL_NM == None:
                            Error_Code_list[idx].append('FA25-01')
                        if UPDT_YMD == None:
                            Error_Code_list[idx].append('FA32-01')
                            
                    if UPDT_TP_NM == '변경':
                        if CHG_RSN_NM == '분필':  # 생성에 분필이 포함됨 // O_UID가 필수적으로 생성됨
                            if O_UID == None:
                                Error_Code_list[idx].append('FA24-01-01')
                            if O_FL_NM == None:
                                Error_Code_list[idx].append('FA25-01')
                            # if UPDT_YMD == None:
                            #     Error_Code_list[idx].append('FA32-01')
                    if UPDT_TP_NM == '신규':
                        if CHG_RSN_NM != '분필' and CHG_RSN_NM == '개간':
                            if O_UID != None:
                                Error_Code_list[idx].append('FA24-02')
                            if O_FL_NM != None:
                                Error_Code_list[idx].append('FA25-02')
                            # if UPDT_YMD == None:
                            #     Error_Code_list[idx].append('FA32-01')

                    if UPDT_TP_NM == '변경':
                        if O_UID == None:
                            Error_Code_list[idx].append('FA24-01')
                        if O_FL_NM == None:
                            Error_Code_list[idx].append('FA25-01')
                        # if UPDT_YMD == None:
                        #     Error_Code_list[idx].append('FA32-01')

                    if UPDT_TP_NM == '변경' and CHG_RSN_NM == '분필':
                        if O_UID == None or len(O_UID) != 8:
                            Error_Code_list[idx].append('FA24-03')

                    if UPDT_TP_NM == '변경' and CHG_RSN_NM == '합필':
                        if O_UID == None or len(O_UID) == 8:
                            Error_Code_list[idx].append('FA24-04')

                        if O_UID != None: # 합필의 O_UID 요소들의 길이가 8이 아닐 때
                            O_UID_test = O_UID.replace(',', '')
                            O_UID_test = O_UID_test.replace(' ', '')

                            if len(O_UID_test) % 8 != 0:
                                Error_Code_list[idx].append('FA24-04')
#!!!BSH 수정 - 여기까지
            ###########################################################
            # 06. 법정동코드 오류
            STDG_CD = N[4]
            
            if STDG_CD == None:
                Error_Code_list[idx].append('AC01-06')
            
            if STDG_CD != None:
                if len(STDG_CD) != 10:  # 법정동코드 길이가 10 아닌 경우
                    Error_Code_list[idx].append('AC02-06')
                if STDG_CD.isdigit() == False:  # 숫자 유무 체크 함수
                    Error_Code_list[idx].append('AC03-06')
            ###########################################################
            # 07. 법정동주소 오류
            STDG_ADDR = N[5]
            
            if STDG_ADDR == None:  # 법정동 주소 값 없는 경우
                Error_Code_list[idx].append('AC01-07')

            # elif STDG_ADDR != None: # 한글 유무 체크 >> 한글 X 값 >> True
            #     if contains_non_korean_or_non_space(STDG_ADDR) == True: # 공백 점검 필요
            #         Error_Code_list[idx].append('AC04-07')
            ###########################################################
            # 02. ID 오류
            ID = N[0]
            UPDT_YMD = N[UPDT_YMD_index]
            
            ID_str  = '' if pd.isna(ID) else str(ID)
            PNU_str = '' if pd.isna(PNU) else str(PNU)

            if ID == None:
                Error_Code_list[idx].append('AC01-02')
            
            if ID != None:
                if len(ID) != 15:  # ID 길이 15 아닌 경우
                    Error_Code_list[idx].append('AC02-02')
                    
                if ID.isdigit() == False:  # 숫자 유무 체크 함수
                    Error_Code_list[idx].append('AC03-02')
#!!! BSH 수정
                # 생성 분필의 ID 10자리의 법정동코드와 동일 여부
                if UPDT_YMD != None and STDG_CD != None:
                    if UPDT_YMD.split('-')[0] == YYYY:
                        if UPDT_TP_NM == '신규' or CHG_RSN_NM == '분필':
                            if ID[:10] not in STDG_CD:
                                Error_Code_list[idx].append('FA02-01')
            
                # 미갱신 농경지 ID 변경 여부
                if len(shp_bf) > 0:
                    shp_bf_id_list = list(shp_bf['ID'])
                    if UPDT_YMD.split('-')[0] != YYYY:
                        if ID not in shp_bf_id_list:
                            Error_Code_list[idx].append('FA02-02')

                    if UPDT_TP_NM == '변경' and UPDT_YMD.split('-')[0] != YYYY and CHG_RSN_NM not in ['합필','분필']:
                        if ID not in shp_bf_id_list:
                            Error_Code_list[idx].append('FA02-03')
#!!! BSH 여기까지
            ###########################################################
            # 03. UID 오류
            UID = N[UID_index]

            # 납품데이터에서는 제외
            # if UID == None:
            #     Error_Code_list[idx].append('AC01-03') # UID 유무
            # 납품데이터에서는 제외
            if UID != None:
                if len(UID) != 8:  # ID 길이 15 아닌 경우
                    Error_Code_list[idx].append('AC02-03')
                if UID.isdigit() == False:  # 숫자 유무 체크 함수
                    Error_Code_list[idx].append('AC03-03')
            ###########################################################
            # 04. 분류명(농경지분류) 오류
            FL_NM = N[2]
            CLSF_NM = N[2]
            
            if FL_NM == None:
                Error_Code_list[idx].append('AC01-04')

            if FL_NM not in FL_NM_list:
                Error_Code_list[idx].append('AC04-04')
            ###########################################################
            # 05. 분류코드 오류
            FL_CD = N[3]
            
            if FL_CD == None:
                Error_Code_list[idx].append('AC01-05')
                
            if FL_CD != None:
                if FL_CD not in FL_CD_list:  # 농경지분류 코드 외 다른 값이 있을 경우
                    Error_Code_list[idx].append('CD-05')

            if FL_CD != None and FL_NM != None:
                if FL_NM == '논' and FL_CD != '01':
                    Error_Code_list[idx].append('FA05-01')
                if FL_NM == '밭' and FL_CD != '02':
                    Error_Code_list[idx].append('FA05-01')
                if FL_NM == '과수' and FL_CD != '03':
                    Error_Code_list[idx].append('FA05-01')
                if FL_NM == '시설' and FL_CD != '04':
                    Error_Code_list[idx].append('FA05-01')
                if FL_NM == '인삼' and FL_CD != '05':
                    Error_Code_list[idx].append('FA05-01')
                if FL_NM == '비경지' and FL_CD != '06':
                    Error_Code_list[idx].append('FA05-01')
            ###########################################################
            # 08. 대표 PNU 오류
            PNU = N[6]
            
            if PNU == None:
                Error_Code_list[idx].append('AC01-08')
            
            if PNU != None:
                if PNU.isdigit() == False:  # 숫자 유무 체크 함수
                    Error_Code_list[idx].append('AC03-08')

                elif len(str(PNU)) != 19:
                    Error_Code_list[idx].append('AC03-08')

            elif PNU != None: # 한글 포함 유무 체크 >> 한글 X 값 >> True
                if contains_non_digit(PNU) == True:
                    Error_Code_list[idx].append('AC04-08')

            if len(str(PNU)) != 19:#4377025334105940000
                Error_Code_list[idx].append('PNU_len')
            ###########################################################
            # 09. 대표지목 오류
            LDCG_CD = N[7]

            if LDCG_CD == None:  # 대표 지목 값 없는 경우
                Error_Code_list[idx].append('AC01-09')
            
            if LDCG_CD != None:
                if LDCG_CD not in LDCG_CD_list:  # 농경지분류 코드 외 다른 값이 있을 경우
                    Error_Code_list[idx].append('CD-09')
            ###########################################################
            # 10. 부PNU 오류
            SB_PNU = str(N[8])
                    
            if SB_PNU == None:  # 부PNU 값이 없을 경우
                Error_Code_list[idx].append('AC01-10')

            if SB_PNU != None:
                if SB_PNU != 'NULL':
                    if SB_PNU.isdigit() == False:  # 숫자 유무 체크 함수
                        Error_Code_list[idx].append('AC03-10')

                # if contains_non_digit(SB_PNU) == True:
                #     Error_Code_list[idx].append('AC04-10')
                    if SB_PNU.isdigit() == False:  # 숫자 유무 체크 함수
                        Error_Code_list[idx].append('AC04-10')
            ###########################################################
            # 12. 농경지면적 오류
            AREA = N[10]  # == FL_AR, AREA
            
            if AREA == None or str(AREA) == 'None' or str(AREA) == 'nan':  # 농경지 면적 값 없는 경우
                Error_Code_list[idx].append('AC01-12')

          #  if AREA != None:
           #     if AREA - N[-1].area > 1 or AREA - N[-1].area < -1:
            #        Error_Code_list[idx].append('AC07-12-1') # 확인 필요
                
             #   if float(AREA) <= 0:  # 농경지 면적 0 이하일 경우
              #      Error_Code_list[idx].append('FA12-01')

            # if UPDT_YMD.split('-')
            ###########################################################
            # 13. 지적일치율 오류
            CAD_CON_RA = N[CAD_CON_RA_index]

            if CAD_CON_RA == None:
                Error_Code_list[idx].append('AC01-13')
            ###########################################################
            # 14.판독영상명 오류
            SOURCE_NM = N[SOURCE_NM_index]

            if SOURCE_NM == None:
                Error_Code_list[idx].append('AC01-14')
            
            # elif SOURCE_NM != None: # 한글 유무 체크 >> 한글 X 값 >> True
            #     if contains_non_korean_or_non_space(SOURCE_NM) == True:
            #         Error_Code_list[idx].append('AC04-14')

            elif SOURCE_NM != None: # 한글 유무 체크 >> 한글 X 값 >> True
                if SOURCE_NM not in SOURCE_NM_list:
                    Error_Code_list[idx].append('AC04-14')
            ###########################################################
            # 15.판독영상 코드 오류
            SOURCE_CD = N[SOURCE_CD_index]

            if SOURCE_CD == None:
                Error_Code_list[idx].append('AC01-15')
            
            if SOURCE_CD != None:
                if SOURCE_CD not in SOURCE_CD_list:  # 판독 영상 코드가 아닌 다른 값이 있을 경우
                    Error_Code_list[idx].append('CD-15')

            if SOURCE_NM != None and SOURCE_CD != None:
                if SOURCE_NM == '항공정사영상' and SOURCE_CD != '01':
                    Error_Code_list[idx].append('FA15-01')
                if SOURCE_NM == '위성영상' and SOURCE_CD != '02':
                    Error_Code_list[idx].append('FA15-01')
                if SOURCE_NM == '드론영상' and SOURCE_CD != '03':
                    Error_Code_list[idx].append('FA15-01')
                if SOURCE_NM == '영상미수급' and SOURCE_CD != '04':
                    Error_Code_list[idx].append('FA15-01')
            ###########################################################
            # 16. 영상 촬영일자 오류
            t = N[FLIGHT_YMD_index]
            FLIGHT_YMD = t

            if FLIGHT_YMD == None:
                Error_Code_list[idx].append('A01-16') 

            if len(str(t)) == 10:
                for numb in range(10):  # PNU 숫자 아닐 경우
                    if numb < 4:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-16')
                            break
                    if numb == 4:
                        if t[numb] != '-':
                            Error_Code_list[idx].append('AC06-16')
                            break
                    if numb > 4 and numb <= 6:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-16')
                            break
                    if numb == 7:
                        if t[numb] != '-':
                            Error_Code_list[idx].append('AC06-16')
                            break
                    if numb > 7 and numb <= 9:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-16')
                            break
                            
            # 18. 갱신일자 오류
            UPDT_YMD = N[UPDT_YMD_index]  # 갱신 일자
            UPDT_TP_CD = N[UPDT_TP_CD_index]
            t = N[UPDT_YMD_index]
            if len(str(t)) == 10:
                for numb in range(10):  # 숫자 아닐 경우
                    if numb < 4:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-18')
                            break
                    if numb == 4:
                        if t[numb] != '-':
                            Error_Code_list[idx].append('AC06-18')
                            break
                    if numb > 4 and numb <= 6:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-18')
                            break
                    if numb == 7:
                        if t[numb] != '-':
                            Error_Code_list[idx].append('AC06-18')
                            break
                    if numb > 7 and numb <= 9:
                        if t[numb] not in spel_list:
                            Error_Code_list[idx].append('AC06-18')
                            break
                if t[5:6] == '00':
                    Error_Code_list[idx].append('AC06-18')

#!!! BSH 수정
            if UPDT_TP_NM == '신규' or UPDT_TP_NM == '변경':
                if UPDT_YMD == None:
                    Error_Code_list[idx].append('FA18-01')

            if UPDT_YMD != None:
                if int(UPDT_YMD.split('-')[0]) > int(YYYY):
                    Error_Code_list[idx].append('FA18-02')

                # if int(UPDT_YMD.replace('-', '')) > int(YYYY+'0101'):
                if int(UPDT_YMD.split('-')[0]) == int(YYYY):
                    if UPDT_TP_NM == None or UPDT_TP_CD == None:
                        Error_Code_list[idx].append('FA18-03')
#!!! BSH 수정까지
            ###########################################################
            # 17. 특이사항 오류 
            # ECT_CN >> EXEPT_MTTR
            EXEPT_MTTR = N[EXEPT_MTTR_index]
            
           # if EXEPT_MTTR != None:  # 코드성 정보 오류
            #    if EXEPT_MTTR not in EXEPT_MTTR_list:
            #        Error_Code_list[idx].append('CD-17')

            if CLSF_NM in ['과수','비경지','인삼']:
                if EXEPT_MTTR == '휴경':
                    Error_Code_list[idx].append('FA17-01')

            #if CLSF_NM =='비경지':
             #   if EXEPT_MTTR not in ['묘지','바위','송전탑','시설창고', '지번미정','간척지']:
               #     Error_Code_list[idx].append('FA17-02')

            if CLSF_NM in ['논','밭']:
                if EXEPT_MTTR not in ['휴경','비경작', '지번미정','간척지', '초지', None]:
                    Error_Code_list[idx].append('FA17-03')

            ###########################################################
            # 19. 갱신유형 오류
            # 20. 갱신유형코드 오류
            UPDT_TP_NM = N[UPDT_TP_NM_index]
            UPDT_TP_CD = N[UPDT_TP_CD_index]

            if UPDT_TP_NM != None: # 한글 유무 체크 >> 한글 X 값 >> True
                if contains_non_korean_or_non_space(UPDT_TP_NM) == True or UPDT_TP_NM not in UPDT_TP_NM_list:
                    Error_Code_list[idx].append('AC04-19')
            
            if UPDT_TP_CD != None:  # 좌우 공백, 특수문자 및 
                if UPDT_TP_CD not in UPDT_TP_CD_list: #정의된 코드 이외의 값이 들어가 있는 경우 오류
                    Error_Code_list[idx].append('CD-20')
                    
            if UPDT_TP_NM != None:
                if UPDT_TP_CD == None:
                    Error_Code_list[idx].append('FA20-01')
            ###########################################################
            # 21. 변경사유명 오류
            # 22. 변경사유코드 오류
            CAUSE_CD = N[CHG_RSN_CD_index]  # == CHG_RSN_CD
            CAUSE_NM = N[CHG_RSN_NM_index]  # == CHG_RSN_NM

            if CAUSE_NM != None: # 한글 유무 체크 >> 한글 X 값 >> True
                if contains_non_korean_or_non_space(CAUSE_NM) == True or CAUSE_NM not in CAUSE_NM_list:
                    Error_Code_list[idx].append('AC04-21')
            
            if CAUSE_CD != None:  # 코드성 정보 오류
                if CAUSE_CD not in CAUSE_CD_list:
                    Error_Code_list[idx].append('CD-22')

            if UPDT_TP_NM in UPDT_TP_NM_list:
                if CAUSE_NM == None:
                    Error_Code_list[idx].append('FA21-01')

            if CAUSE_NM != None:
                if CAUSE_CD == None:
                    Error_Code_list[idx].append('FA22-01')
            ###########################################################
            # 23. 경지정리여부 오류
            L3_YN = N[FL_ARMT_YN_index]  # == FL_ARMT_YN
            
            if L3_YN != None:
                if L3_YN not in L3_YN_list:
                    Error_Code_list[idx].append('CD-23')
            
            if CLSF_NM == '논' or CLSF_NM == '밭':
                if L3_YN == None:
                    Error_Code_list[idx].append('FA23-01')
                    
            if CLSF_NM == '인삼': 
                if L3_YN != 'Y':
                    Error_Code_list[idx].append('FA23-02')

            if CLSF_NM in ['과수','시설','비경지']:
                if L3_YN != None:
                    Error_Code_list[idx].append('FA23-03')

            # if CLSF_NM == '논' or CLSF_NM == '밭' or CLSF_NM == '인삼':
            #     if L3_YN == None:
            #         Error_Code_list[idx].append('FA23-01')
            ###########################################################
            # 24. 전차 UID 오류
            O_UID = N[O_UID_index]
            
            # if O_UID_null == False:
            if O_UID != None:
                if len(O_UID) < 8:
                    Error_Code_list[idx].append('AC02-24')
                    # print(idx, len(O_UID), O_UID)
                if len(O_UID) > 8:
                    length = len(O_UID.replace(',', ''))
                    if length % 8 != 0:
                        Error_Code_list[idx].append('AC02-24')
                        print(2)

            ###########################################################
            # 26. 갱신구분 오류
            if UPDT_NM_index != None:
                UPDT_NM = N[UPDT_NM_index]

                # if UPDT_NM not in ['우선', '정규']:
            #    if UPDT_YMD != None:
                #    if int(UPDT_YMD.split('-')[0]) == YYYY:
                 #       if UPDT_NM not in ['수시', '정기']: ######################################################################################################################
                  #          Error_Code_list[idx].append('FA26-01')
            ###########################################################
            # 32. 오류신고유형 오류
            ER_DCLR_TP = N[ER_DCLR_TP_index]

            if ER_DCLR_TP not in ['생성','형상','형상속성','속성','폐기',None]:
                Error_Code_list[idx].append('FA32-01')
            ############################################################
            # 41. 처리유형 오류 - 추가
            # 42. 처리유형코드 오류
            CHG_TP_NM, CHG_TP_CD = None, None
            if CHG_TP_NM_index is not None:
                CHG_TP_NM = N[CHG_TP_NM_index]
            
            if CHG_TP_CD_index is not None:
                CHG_TP_CD = N[CHG_TP_CD_index]

            if CHG_TP_NM != None: # 한글 유무 체크 >> 한글 X 값 >> True
                if contains_non_korean_or_non_space(CHG_TP_NM) == True or CHG_TP_NM not in CHG_TP_NM_list:
                    Error_Code_list[idx].append('AC04-20')
            
            if CHG_TP_CD != None:  # 좌우 공백, 특수문자 및 
                if CHG_TP_CD not in CHG_TP_CD_list: #정의된 코드 이외의 값이 들어가 있는 경우 오류
                    Error_Code_list[idx].append('CD-21')
                    
            if CHG_TP_NM != None:
                if CHG_TP_CD == None:
                    Error_Code_list[idx].append('FA20-02')
        #########################################################################
        # ID 중복성 체크
        first_value_indices = {}
        duplicate_indices = []

        for index, sub_list in enumerate(shp_list):
            first_value = sub_list[0]
            if pd.isna(first_value):
                continue
            if first_value in first_value_indices:
                duplicate_indices.append((first_value_indices[first_value], index))
            else:
                first_value_indices[first_value] = index

        # Error_Code_list에 중복 인덱스 추가
        for duplicate_indice in duplicate_indices:
            for N in duplicate_indice:
                # N은 이미 shp_list의 인덱스
                Error_Code_list[N].append('AC05-02')

        # UID 중복성 체크
        #first_value_indices = {}
        #duplicate_indices = []

        #for index, sub_list in enumerate(shp_list):
         #   first_value = sub_list[1]
          #  if pd.isna(first_value):
           ##if first_value in first_value_indices:
             #   duplicate_indices.append((first_value_indices[first_value], index))
            #else:
             #   first_value_indices[first_value] = index

        #납품데이터에서는 제외
        # # Error_Code_list에 중복 인덱스 추가
        # for duplicate_indice in duplicate_indices:
        #     for N in duplicate_indice:
        #         # N은 이미 shp_list의 인덱스
        #         Error_Code_list[N].append('AC05-03')
        #납품데이터에서는 제외

        #########################################################################
        # 팜맵 품질검사 Result

        Error_save_list2 = Error_Code_list
        count = 0
        for N in Error_save_list2:
            if len(N) > 1:
                # print(N)
                count += 1
        #########################################################################
        Error_save_list2 = []

        for N in Error_Code_list:
            if len(N) == 0 or len(N) == 1:
                Error_save_list2.append([])

            if len(N) >1:
                Error_save_list2.append(N[1:])
        #########################################################################

        # shp['Error'] = None
        shp['Error'] = Error_save_list2
        # #########################################################################
        # SAVE
        print('오류 개수: ' + str(count) + '개')
        check_results.append({
            "name": shp_name,
            "save_path": save_path,
            "error_count": str(count)
        })
        if count > 0:
            if not os.path.exists(save_path):
                os.makedirs(save_path)
            
            shp['Error'] = shp['Error'].apply(lambda x: ', '.join(map(str, x)) if isinstance(x, list) else x) # list 형태 변환
            # shp.to_file(save_path +'/'+shp_name+'_품질검수.shp', encoding='cp949') #shp convert

            # Error 값 체크
            shp_error_filtered = shp[shp['Error'].fillna('').str.len() >= 5]    
            if len(shp_error_filtered) > 0:
                shp_error_filtered['UPDT_YMD'] = shp_error_filtered['UPDT_YMD'].astype(str)
                shp_error_filtered.to_file(save_path +'/'+shp_name+'_검수결과.shp', encoding="cp949")
                shp_error_filtered.drop(columns='geometry').to_excel(save_path +'/'+shp_name+'_검수결과.xlsx', index=False)

            count_shp_list.append(shp_name)

        if count == 0:
            not_count_shp_list.append(shp_name)

        #########################################################################
    location_numb += 1

# Error_save_list2

검사 지역: 2025_경상남도_함안군
저장 경로: C:\Users\Opt_AI\Desktop\Tool\입력 및 검수\속성검수결과\2025_경상남도_함안군
오류 개수: 67959개


In [19]:
# DataFrame으로 변환
df_results = pd.DataFrame(check_results)
df_results

Unnamed: 0,name,save_path,error_count
0,2025_경상남도_함안군,C:\Users\Opt_AI\Desktop\Tool\입력 및 검수\속성검수결과\20...,67959


# 히스토리테이블 비교

In [20]:
# 필요 함수

# 연도 추출
def safe_extract_year(x):
    try:
        return int(str(x).split('-')[0])
    except:
        return np.nan

# 팜맵 - 수정 / O_UID - UID 이력비교 함수

def is_invalid_o_uid_chg(row, chg_uid_set):
    updt_ymd = pd.to_datetime(row['UPDT_YMD'], errors='coerce') 
#!!! BSH 수정
    if row['UPDT_TP_NM'] != '변경' or pd.isna(updt_ymd) or updt_ymd.year != YYYY:
        return None  # 검사 대상 아님
#!!! BSH 수정까지
    o_uid = row['O_UID']
    reason = row['CHG_RSN_NM']

    if pd.isna(o_uid):
        return None  # UID가 없으면 패스

    uid_list = [uid.strip() for uid in str(o_uid).split(',') if uid.strip()]
    
    if reason == '합필':
        return not (uid_list and uid_list[0] in chg_uid_set)

    elif pd.isna(reason):
        return not all(uid in chg_uid_set for uid in uid_list)

    return None  # 합필도 아니고 None도 아니면 검사 대상 아님

#*팜맵 - 폐기삭제 / O_UID - UID 이력비교

def is_invalid_o_uid_del(row, del_uid_set):
    updt_ymd = pd.to_datetime(row['UPDT_YMD'], errors='coerce')
    
#!!! BSH 수정
    if row['UPDT_TP_NM'] != '변경' or pd.isna(updt_ymd) or updt_ymd.year != YYYY:
        return None  # 검사 대상 아님
#!!! BSH 수정까지
    o_uid = row['O_UID']
    reason = row['CHG_RSN_NM']

    if pd.isna(o_uid):
        return None  # UID가 없으면 패스

    uid_list = [uid.strip() for uid in str(o_uid).split(',') if uid.strip()]
    
    if reason == '합필':
        return not all(uid in del_uid_set for uid in uid_list[1:])

    if reason == '분필':
        return not all(uid in del_uid_set for uid in uid_list)
        
    return None  # 합필도 아니고 None도 아니면 검사 대상 아님

#*팜맵 - 신규생성 / O_UID - UID 이력비교

def is_invalid_o_uid_new(row, new_uid_set):
    updt_ymd = pd.to_datetime(row['UPDT_YMD'], errors='coerce')
#!!! BSH 수정
    if row['UPDT_TP_NM'] != '신규' or pd.isna(updt_ymd) or updt_ymd.year != YYYY:
        return None  # 검사 대상 아님
#!!! BSH 수정까지
    o_uid = row['O_UID']
    reason = row['CHG_RSN_NM']

    if pd.isna(o_uid):
        return None  # UID가 없으면 패스

    uid_list = [uid.strip() for uid in str(o_uid).split(',') if uid.strip()]

    if reason == '분필':
        return not all(uid in new_uid_set for uid in uid_list)

    return None  # 분필 아니면 검사 대상 아님

#O_UID - UID 이력 비교

# 누적해서 Error를 업데이트
def append_error(row, error_code, condition_func):
    if condition_func(row):
        current = row.get('Error', '')
        return (current + ',' if current else '') + error_code
    return row.get('Error', '')

# # 1. 팜맵 - 수정
# chg_uid_set = set(shp_chg['UID'])

# # shp['O_UID_INVALID'] = shp.apply(lambda row: is_invalid_o_uid(row, chg_uid_set), axis=1)

# condition_func = lambda row: is_invalid_o_uid_chg(row, chg_uid_set)
# shp['Error'] = shp.apply(lambda row: append_error(row, 'RS04-03', condition_func), axis=1)

# # 2. 팜맵 - 폐기
# del_uid_set = set(shp_del['UID'])

# # shp['O_UID_INVALID'] = shp.apply(lambda row: is_invalid_o_uid(row, del_uid_set), axis=1)
# condition_func = lambda row: is_invalid_o_uid_del(row, del_uid_set)
# shp['Error'] = shp.apply(lambda row: append_error(row, 'RS05-03', condition_func), axis=1)

# # 3. 팜맵- 생성
# new_uid_set = set(shp_new['UID'])

# # shp['O_UID_INVALID'] = shp.apply(lambda row: is_invalid_o_uid(row, new_uid_set), axis=1)
# condition_func = lambda row: is_invalid_o_uid_new(row, new_uid_set)
# shp['Error'] = shp.apply(lambda row: append_error(row, 'RS03-03', condition_func), axis=1)

In [21]:
#전체 루프

# 조건 변수
# 적용할 list numb
location_numb = 1

bjd = gpd.read_file(shp_bjd_path)
bjd_indexed = bjd.set_index("RI_CD")

for shp_path in shp_path_list:

    if location_numb >= 0 and location_numb <= 24:

        shp_name = os.path.splitext(os.path.basename(shp_path))[0]    #확장자 완전 제거
        # shp_root = shp_path.split('/2024')[0]

        # 저장 폴더 + 지역별 저장 폴더 경로
        save_path = os.path.join(save_folder, shp_name)
        print(f'검사 지역: {shp_name}')
        print(f'저장 경로: {save_path}')

        # 팜맵 SHP Load
        shp = gpd.read_file(shp_path, encoding = 'cp949')

        # 갱신 전(전차년도) 팜맵
        shp_bf = pd.DataFrame()

        # 법정경계 SHP Load
        bjd = gpd.read_file(shp_bjd_path)
        
        #########################################################################
        # # 이력 파일 Load
        # # 변경 및 삭제 경로

        # df 초기화
        shp_chg = pd.DataFrame(columns=shp.columns)
        shp_del = pd.DataFrame(columns=shp.columns)
        shp_new = pd.DataFrame(columns=shp.columns)
        shp_pre = pd.DataFrame(columns=shp.columns)

        shp_hist_path_list = glob.glob(shp_path.split(shp_path.split('/')[-1])[0]+'**/*.shp', recursive = True)

        for shp_hist_path in shp_hist_path_list:

            shp_hist_path = shp_hist_path.replace('\\', '/')

            if '수정' in shp_hist_path.split('/')[-1] or '변경' in shp_hist_path.split('/')[-1]:
                shp_chg_path = shp_hist_path
                shp_chg = gpd.read_file(shp_chg_path, encoding = 'cp949')

            if '폐기' in shp_hist_path.split('/')[-1] or '삭제' in shp_hist_path.split('/')[-1]:
                shp_del_path = shp_hist_path
                shp_del = gpd.read_file(shp_del_path, encoding = 'cp949')

            if '생성' in shp_hist_path.split('/')[-1] or '신규' in shp_hist_path.split('/')[-1]:
                shp_new_path = shp_hist_path
                shp_new = gpd.read_file(shp_new_path, encoding = 'cp949')

        # 히스토리테이블 파일이 없을 경우 지역 스킵
        if len(shp_chg) == 0 or len(shp_del) == 0:
            print('수정 or 폐기에 대한 SHP파일이 없습니다')
            continue
        #########################################################################
        # 오류 체크 필드 추가
        shp['Error'] = None
        shp_chg['Error'] = None
        shp_del['Error'] = None
        shp_new['Error'] = None
        #########################################################################
        #법정동주소 SHP load-
        
        bjd_list = bjd.values.tolist()

        bjd_cd_list = []
        bjd_cd_nm_list = []

        bjd_layer_cd_list = bjd['RI_CD'].values.tolist()

        shp_list = shp.values.tolist()

        for N in shp_list:
            STDG_CD = N[4] #법정동코드
            STDG_ADDR = N[5] #법정동주소
            bjd_cd_list.append(STDG_CD)
            bjd_cd_nm_list.append([STDG_CD, STDG_ADDR])

        #########################################################################
        # 테스트용 오류 추가
        #201 - 형상
        #44,56 - 속성

        # shp.loc[201, 'O_UID'] = shp.loc[100301, 'O_UID'] # 08-07
        # shp.loc[44, 'O_UID'] = None # 08-08
        # shp.loc[56, 'O_UID'] = shp.loc[100301, 'O_UID'] # 08-09

        # #########################################################################
        # 히스토리테이블 관련

        # 파일 간ID 및 UID 중복 추출
        
        # 팜맵(shp)+폐기(shp_del)의 ID 중복 추출(파일 내 중복은 취급 X)
        # 1. 각 파일의 고유 ID 집합 만들기
        id_set_shp = set(shp['ID'])
        # id_set_chg = set(shp_chg['ID'])
        id_set_del = set(shp_del['ID'])

        # 2. 교차 중복되는 ID만 추출 (파일 간 중복만)
        duplicate_ids_all = (
            # (id_set_shp & id_set_chg) |
            # (id_set_chg & id_set_del) |
            (id_set_shp & id_set_del)
        )

        # 3. shp에서 해당 ID를 가진 행의 인덱스 찾기
        duplicate_indices = shp[shp['ID'].isin(duplicate_ids_all)].index
        # duplicate_indices_chg = shp_chg[shp_chg['ID'].isin(duplicate_ids_all)].index
        duplicate_indices_del = shp_del[shp_del['ID'].isin(duplicate_ids_all)].index

        # 4. 'Error' 컬럼 업데이트
        shp.loc[duplicate_indices, 'Error'] = shp.loc[duplicate_indices, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS01-01'
        )
        # shp_chg.loc[duplicate_indices_chg, 'Error'] = shp_chg.loc[duplicate_indices_chg, 'Error'].fillna('').apply(
        #     lambda x: (x + ',' if x else '') + 'RS01-01'
        # )
        shp_del.loc[duplicate_indices_del, 'Error'] = shp_del.loc[duplicate_indices_del, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS01-01'
        )

        # 팜맵(shp)+폐기(shp_del)의 UID 중복 추출(파일 내 중복은 취급 X)
        # 1. 각 파일의 고유 UID 집합 만들기
        uid_set_shp = set(shp['UID'])
        uid_set_del = set(shp_del['UID'])

        # 2. 교차 중복되는 ID만 추출 (파일 간 중복만)
        duplicate_uids_all = (
            (uid_set_shp & uid_set_del)
        )

        # 3. shp에서 해당 ID를 가진 행의 인덱스 찾기
        duplicate_indices = shp[shp['UID'].isin(duplicate_uids_all)].index
        duplicate_indices_del = shp_del[shp_del['UID'].isin(duplicate_uids_all)].index

        # 4. 'Error' 컬럼 업데이트
        shp.loc[duplicate_indices, 'Error'] = shp.loc[duplicate_indices, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS02-01'
        )
        shp_del.loc[duplicate_indices_del, 'Error'] = shp_del.loc[duplicate_indices_del, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS02-01'
        )

        ##############################################################################
        # 생성 파트

        if len(shp_new) > 0:
            # # 팜맵 내 생성 ID 추출
            # # 1. shp_new의 ID field
            # new_id_set = set(shp_new['ID'])

            # # 2. 겹치는 shp의 ID 행 추출
            # mask_overlap = shp['ID'].isin(new_id_set)
            # idx = shp.index[mask_overlap]

            # # 3. Error 필드에 'RS05-01' 누적 추가
            # shp.loc[idx, 'Error'] = shp.loc[idx, 'Error'].fillna('').apply(
            #     lambda x: (x + ',' if x else '') + 'RS03-01'
            # )
            # 중복 ID & UID 추출
            
            # 생성 ID 중복 추출
            # 1. 중복된 ID 추출
            duplicate_ids = shp_new.loc[shp_new['ID'].duplicated(keep=False), 'ID'].unique()

            # 2. shp에서 해당 ID를 가진 행의 인덱스 찾기
            duplicate_indices = shp_new[shp_new['ID'].isin(duplicate_ids)].index

            # 3. 'Error' 컬럼 업데이트
            shp_new.loc[duplicate_indices, 'Error'] = shp_new.loc[duplicate_indices, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS03-02'
            )
            #######################################
            # 팜맵 - 생성(O_UID - UID 체크)
            new_uid_set = set(shp_new['UID'])

            condition_func = lambda row: is_invalid_o_uid_new(row, new_uid_set)
            shp['Error'] = shp.apply(lambda row: append_error(row, 'RS03-03', condition_func), axis=1)
            #######################################
            # 히스토리테이블 처리유형 및 코드 누락 / 처리사유 및 코드 누락 / 작업자 누락
#!!! BSH 수정
            mask_error = (shp_new['UPDT_TP_NM'] != '신규') | (shp_new['UPDT_TP_CD'] != '01')
            mask_error2 = (shp_new['CHG_RSN_NM'].isna()) | (shp_new['CHG_RSN_CD'].isna())
            mask_error3 = shp_new['OPRTR_NM'].isna()
#!!! BSH 수정까지
            # 조건에 해당하는 shp_new의 ID 값 추출
            ids_error1 = shp_new.loc[mask_error, 'ID']
            ids_error2 = shp_new.loc[mask_error2, 'ID']
            ids_error3 = shp_new.loc[mask_error3, 'ID']

            # shp_new 해당 ID 값을 가진 행의 인덱스 추출
            idx1 = shp_new[shp_new['ID'].isin(ids_error1)].index
            idx2 = shp_new[shp_new['ID'].isin(ids_error2)].index
            idx3 = shp_new[shp_new['ID'].isin(ids_error3)].index

            # shp_new Error 컬럼 업데이트
            shp_new.loc[idx1, 'Error'] = shp_new.loc[idx1, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS03-04'
            )
            shp_new.loc[idx2, 'Error'] = shp_new.loc[idx2, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS03-05'
            )
            shp_new.loc[idx3, 'Error'] = shp_new.loc[idx3, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS03-06'
            )
        ##############################################################################
        # 수정 파트

        if len(shp_chg) > 0:
            # # 팜맵 내 수정ID 추출
            # # 1. shp_chg의 ID field
            # chg_id_set = set(shp_chg['ID'])

            # # 2. 겹치는 shp의 ID 행 추출
            # mask_overlap = shp['ID'].isin(chg_id_set)
            # idx = shp.index[mask_overlap]

            # # 3. Error 필드에 'RS05-01' 누적 추가
            # shp.loc[idx, 'Error'] = shp.loc[idx, 'Error'].fillna('').apply(
            #     lambda x: (x + ',' if x else '') + 'RS04-01'
            # )
            # 중복 ID & UID 추출
            
            # 수정 ID 중복 추출
            # 1. 중복된 ID 추출
            duplicate_ids = shp_chg.loc[shp_chg['ID'].duplicated(keep=False), 'ID'].unique()

            # 2. shp에서 해당 ID를 가진 행의 인덱스 찾기
            duplicate_indices = shp_chg[shp_chg['ID'].isin(duplicate_ids)].index

            # 3. 'Error' 컬럼 업데이트
            shp_chg.loc[duplicate_indices, 'Error'] = shp_chg.loc[duplicate_indices, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS04-02'
            )
            #######################################
            # 팜맵 - 수정(O_UID - UID 체크)
            chg_uid_set = set(shp_chg['UID'])

            condition_func = lambda row: is_invalid_o_uid_chg(row, chg_uid_set)
            shp['Error'] = shp.apply(lambda row: append_error(row, 'RS04-03', condition_func), axis=1)
            #######################################
            # 히스토리테이블 처리유형 및 코드 누락 / 처리사유 및 코드 누락 / 작업자 누락
#!!! BSH 수정
            mask_error = (shp_chg['UPDT_TP_NM'] != '변경') | (shp_chg['UPDT_TP_CD'] != '03')
            mask_error2 = (shp_del['CHG_RSN_NM'].isna()) | (shp_chg['CHG_RSN_CD'].isna())
            mask_error3 = shp_chg['OPRTR_NM'].isna()
#!!! BSH 수정까지
            # 조건에 해당하는 shp_chg의 ID 값 추출
            ids_error1 = shp_chg.loc[mask_error, 'ID']
            ids_error2 = shp_chg.loc[mask_error2, 'ID']
            ids_error3 = shp_chg.loc[mask_error3, 'ID']

            # shp_chg 해당 ID 값을 가진 행의 인덱스 추출
            idx1 = shp_chg[shp_chg['ID'].isin(ids_error1)].index
            idx2 = shp_chg[shp_chg['ID'].isin(ids_error2)].index
            idx3 = shp_chg[shp_chg['ID'].isin(ids_error3)].index

            # shp_chg Error 컬럼 업데이트
            shp_chg.loc[idx1, 'Error'] = shp_chg.loc[idx1, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS04-04'
            )
            shp_chg.loc[idx2, 'Error'] = shp_chg.loc[idx2, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS04-05'
            )
            shp_chg.loc[idx3, 'Error'] = shp_chg.loc[idx3, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS04-06'
            )
        ##############################################################################
        # 삭제 파트

        if len(shp_del) > 0:
            # 팜맵 내 삭제ID 추출
            # 1. shp_del의 ID field
            del_id_set = set(shp_del['ID'])

            # 2. 겹치는 shp의 ID 행 추출
            mask_overlap = shp['ID'].isin(del_id_set)
            idx = shp.index[mask_overlap]

            # 3. Error 필드에 'RS05-01' 누적 추가
            shp.loc[idx, 'Error'] = shp.loc[idx, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS05-01'
            )
            # 중복 ID & UID 추출
            
            # 폐기 ID 중복 추출
            # 1. 중복된 ID 추출
            duplicate_ids = shp_del.loc[shp_del['ID'].duplicated(keep=False), 'ID'].unique()

            # 2. shp에서 해당 ID를 가진 행의 인덱스 찾기
            duplicate_indices = shp_del[shp_del['ID'].isin(duplicate_ids)].index

            # 3. 'Error' 컬럼 업데이트
            shp_del.loc[duplicate_indices, 'Error'] = shp_del.loc[duplicate_indices, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS05-02'
            )
            #######################################
            # 팜맵 - 폐기(O_UID - UID 체크)
            del_uid_set = set(shp_del['UID'])

            condition_func = lambda row: is_invalid_o_uid_del(row, del_uid_set)
            shp['Error'] = shp.apply(lambda row: append_error(row, 'RS05-03', condition_func), axis=1)
            #######################################
            # 히스토리테이블 처리유형 및 코드 누락 / 처리사유 및 코드 누락 / 작업자 누락
#!!! BSH 수정
            mask_error = (shp_del['UPDT_TP_NM'] != '삭제') | (shp_del['UPDT_TP_CD'] != '02')
            mask_error2 = (shp_del['CHG_RSN_NM'].isna()) | (shp_del['CHG_RSN_CD'].isna())
            mask_error3 = shp_del['OPRTR_NM'].isna()
#!!! BSH 수정까지
            # 조건에 해당하는 shp_del의 ID 값 추출
            ids_error1 = shp_del.loc[mask_error, 'ID']
            ids_error2 = shp_del.loc[mask_error2, 'ID']
            ids_error3 = shp_del.loc[mask_error3, 'ID']

            # shp_del 해당 ID 값을 가진 행의 인덱스 추출
            idx1 = shp_del[shp_del['ID'].isin(ids_error1)].index
            idx2 = shp_del[shp_del['ID'].isin(ids_error2)].index
            idx3 = shp_del[shp_del['ID'].isin(ids_error3)].index

            # shp_del Error 컬럼 업데이트
            shp_del.loc[idx1, 'Error'] = shp_del.loc[idx1, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS05-04'
            )
            shp_del.loc[idx2, 'Error'] = shp_del.loc[idx2, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS05-05'
            )
            shp_del.loc[idx3, 'Error'] = shp_del.loc[idx3, 'Error'].fillna('').apply(
                lambda x: (x + ',' if x else '') + 'RS05-06'
            )
        ##############################################################################
        # 팜맵 테이블 기준

        # UPDT_YMD >> datetime 변경 적용 전 >> None일 경우 제외
        shp['UPDT_YMD'] = pd.to_datetime(shp['UPDT_YMD'], errors='coerce')

        YYYY = int(YYYY)
#!!! BSH 수정
        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '신규') &
            (shp['CHG_RSN_NM'] == '개간') &
            (shp['UID'].isna() | shp['O_UID'].notna())
        )

        # 에러 추가
        #shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
        #    lambda x: (x + ',' if x else '') + 'RS08-01'
        #)

        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (shp['CHG_RSN_NM'] == '합필') &
            (shp['O_UID'].astype(str).str.len() <= 8)
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-02'
        )

        mask = (
            (shp['UPDT_TP_NM'] == '변경') &
            (shp['CHG_RSN_NM'] == '합필') &
            (
                (shp.apply(lambda row: str(row['UID']) not in str(row['O_UID']), axis=1)) |
                (shp.apply(lambda row: str(row['O_UID']).count(',') != str(row['O_CLSF_NM']).count(','), axis=1))
            )
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-03'
        )

        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (shp['CHG_RSN_NM'] == '분필') &
            (shp['UID'].isna() | (shp['UID'].astype(str).str.len() != 8) | (shp['O_UID'].astype(str).str.len() != 8) | shp['O_UID'].isna())
        )


        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-04'
        )


        #mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
        #    (shp['UPDT_TP_NM'] == '변경') &
        #    (shp['CHG_RSN_NM'] == '분필') &
        #   (shp['UID'] == shp['O_UID'])
        #)

        # 에러 추가
        #shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
        #    lambda x: (x + ',' if x else '') + 'RS08-05'
        #)

        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] == shp['O_CLSF_NM']) &
            (shp['O_UID'].isna())
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-06'
        )

        mask = (
            (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] == shp['O_CLSF_NM']) &
            (shp['O_UID'].notna()) & # 해당 조건 없을 시, 08-06과 중복
            (shp['O_UID'] != shp['UID'])
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-07'
        )

        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] != shp['O_CLSF_NM']) &
            (shp['O_UID'].isna())
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-08'
        )

        mask = (
            (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] != shp['O_CLSF_NM']) &
            (shp['O_UID'].notna()) & # 해당 조건 없을 시, 08-08과 중복
            (shp['O_UID'] != shp['UID'])
        )

        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-09'
        )

        mask = (
            # (shp['UPDT_YMD'].dt.year == YYYY) &
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] == shp['O_CLSF_NM']) &
            (shp['O_UID'].isna())
        )
#!!! BSH 수정까지
        # 에러 추가
        shp.loc[mask, 'Error'] = shp.loc[mask, 'Error'].fillna('').apply(
            lambda x: (x + ',' if x else '') + 'RS08-10'
        )

        # # 연도가 다른 행 필터링
        mask_year = shp['UPDT_YMD'].dt.year != YYYY

        # shp_pre의 ID → O_UID 매핑 생성
        if len(shp_pre) > 0:
            shp_pre_o_uid_dict = shp_pre.set_index('ID')['O_UID'].to_dict()

            #조건 체크 및 에러 대상 인덱스 수집
            mismatched_indices = []

            for idx, row in shp[mask_year].iterrows():
                id_val = row['ID']
                o_uid_shp = row['O_UID']
                o_uid_shp_pre = shp_pre_o_uid_dict.get(id_val)

                if o_uid_shp_pre is not None and o_uid_shp != o_uid_shp_pre:
                    mismatched_indices.append(idx)

            # Error 필드에 'RS08-11' 추가 (누적 방식)
            for idx in mismatched_indices:
                current = shp.loc[idx, 'Error']
                shp.loc[idx, 'Error'] = (current + ',' if pd.notna(current) and current != '' else '') + 'RS08-11'

        #########################################################################
        # shp에서 UPDT_TP_NM이 None인 행 추출
        if len(shp_pre) > 0:
            mask_none = shp['UPDT_TP_NM'].isna()
            target_rows = shp[mask_none][['ID', 'AREA']]

            # shp_pre에서 ID 기준으로 AREA 정보 가져오기
            shp_pre_area_dict = shp_pre.set_index('ID')['AREA'].to_dict()

            # AREA 값 비교 → 다른 경우만 추출
            mismatched_indices = []

            for idx, row in target_rows.iterrows():
                id_val = row['ID']
                shp_area = row['AREA']
                pre_area = shp_pre_area_dict.get(id_val)

                # shp_pre에 ID가 있고, AREA 값이 다르면 에러
                if pre_area is not None and shp_area != pre_area:
                    mismatched_indices.append(idx)

            # 에러 추가
            for idx in mismatched_indices:
                current_error = shp.loc[idx, 'Error']
                shp.loc[idx, 'Error'] = (current_error + ',' if pd.notna(current_error) and current_error != '' else '') + 'RS09-01'
#!!! BSH 수정
        # 조건 필터링: shp에서 대상 행 추출
        mask = (
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] != shp['O_CLSF_NM']) &
            (shp['UP_DTP1_NM'] != '형상속성')
        )
#!!! BSH 수정까지
        target_rows = shp[mask][['ID', 'AREA']]

        # shp_chg에서 ID → AREA 딕셔너리 생성
        chg_area_dict = shp_chg.set_index('ID')['AREA'].to_dict()

        # AREA 비교
        mismatched_indices = []

        for idx, row in target_rows.iterrows():
            id_val = row['ID']
            shp_area = row['AREA']
            chg_area = chg_area_dict.get(id_val)

            if chg_area is not None and int(shp_area) != int(chg_area):
                mismatched_indices.append(idx)

        # Error
        for idx in mismatched_indices:
            current = shp.loc[idx, 'Error']
            shp.loc[idx, 'Error'] = (current + ',' if pd.notna(current) and current != '' else '') + 'RS09-02'

#!!! BSH 수정
        # 1. 조건 필터링: 대상 행 추출
        mask = (
            (shp['UPDT_TP_NM'] == '변경') &
            (~shp['CHG_RSN_NM'].isin(['합필', '분필', '기타'])) &
            (shp['CLSF_NM'] == shp['O_CLSF_NM'])
        )
#!!! BSH 수정까지
        target_rows = shp[mask][['ID', 'CLSF_NM']]

        # 2. shp_chg에서 ID → CLSF_NM 딕셔너리 생성
        chg_clsf_dict = shp_chg.set_index('ID')['CLSF_NM'].to_dict()

        # 3. CLSF_NM 비교
        mismatched_indices = []

        for idx, row in target_rows.iterrows():
            id_val = row['ID']
            clsf_shp = row['CLSF_NM']
            clsf_chg = chg_clsf_dict.get(id_val)

            if clsf_chg is not None and clsf_shp != clsf_chg:
                mismatched_indices.append(idx)

        # 4. Error 컬럼에 'RS09-03' 누적 추가
        for idx in mismatched_indices:
            current = shp.loc[idx, 'Error']
            shp.loc[idx, 'Error'] = (current + ',' if pd.notna(current) and current != '' else '') + 'RS09-03'


        #########################################################################
        # ID 중복성 체크
        first_value_indices = {}
        duplicate_indices = []

        for index, sub_list in enumerate(shp_list):
            first_value = sub_list[0]

            if pd.isna(first_value):
                continue

            if first_value in first_value_indices:
                duplicate_indices.append((first_value_indices[first_value], index))
            else:
                first_value_indices[first_value] = index

        # 중복된 인덱스에 'AC05-03' 에러코드 추가
        for pair in duplicate_indices:
            for idx in pair:
                shp.loc[idx, 'Error'] = (shp.loc[idx, 'Error'] + ',' if shp.loc[idx, 'Error'] else '') + 'ID_중복'

        # UID 중복성 체크
        #first_value_indices = {}
        #duplicate_indices = []

        #for index, sub_list in enumerate(shp_list):
         #   first_value = sub_list[1]
#
 #           if pd.isna(first_value):
  #              continue

   #         if first_value in first_value_indices:
    #            duplicate_indices.append((first_value_indices[first_value], index))
     #       else:
      #          first_value_indices[first_value] = index

        # 중복된 인덱스에 'AC05-03' 에러코드 추가
        for pair in duplicate_indices:
            for idx in pair:
                shp.loc[idx, 'Error'] = (shp.loc[idx, 'Error'] + ',' if shp.loc[idx, 'Error'] else '') + 'UID_중복'
        #########################################################################

        # 오류 개수 count
        shp['Error'] = shp['Error'].replace('', np.nan)
        count = (shp['Error'].notna() & (shp['Error'] != '')).sum()

        # SAVE
        print('오류 개수: ' + str(count) + '개')
        if count > 0:
            if not os.path.exists(save_path):
                os.makedirs(save_path)
            
            shp['Error'] = shp['Error'].apply(lambda x: ', '.join(map(str, x)) if isinstance(x, list) else x) # list 형태 변환
            # shp.to_file(save_path +'/'+shp_name+'_품질검수.shp', encoding='cp949') #shp convert
            # shp.to_file(save_path +'/'+shp_name+'_품질검수.shp', encoding='cp949') #shp convert

            # Error 컬럼의 문자열 길이가 5 이상인 행만 추출
            # shp['체크'] = shp['체크'].replace(['3', '9', '10'], None)
            shp_error_filtered = shp[shp['Error'].fillna('').str.len() >= 5]    
            shp_error_filtered['UPDT_YMD'] = shp_error_filtered['UPDT_YMD'].astype(str)
            shp_error_filtered.to_file(save_path +'/'+shp_name+'_검수결과.shp', encoding="cp949")
            shp_error_filtered.drop(columns='geometry').to_excel(save_path +'/'+shp_name+'_검수결과.xlsx', index=False)
#!!! BSH 수정
            # '생성'의 에러 데이터가 존재할 경우 저장
            shp_error_filtered = shp_new[shp_new['Error'].fillna('').str.len() >= 5]
            if len(shp_error_filtered) > 0:
                shp_error_filtered['UPDT_YMD'] = shp_error_filtered['UPDT_YMD'].astype(str)
                shp_error_filtered.to_file(save_path +'/'+shp_name+'_신규_검수결과.shp', encoding="cp949")
                shp_error_filtered.drop(columns='geometry').to_excel(save_path +'/'+shp_name+'_신규_검수결과.xlsx', index=False)

            # '수정'의 에러 데이터가 존재할 경우 저장
            shp_error_filtered = shp_chg[shp_chg['Error'].fillna('').str.len() >= 5]
            if len(shp_error_filtered) > 0:
                shp_error_filtered['UPDT_YMD'] = shp_error_filtered['UPDT_YMD'].astype(str)
                shp_error_filtered.to_file(save_path +'/'+shp_name+'_변경_검수결과.shp', encoding="cp949")
                shp_error_filtered.drop(columns='geometry').to_excel(save_path +'/'+shp_name+'_변경_검수결과.xlsx', index=False)

            # '폐기'의 에러 데이터가 존재할 경우 저장
            shp_error_filtered = shp_del[shp_del['Error'].fillna('').str.len() >= 5]    
            if len(shp_error_filtered) > 0:
                shp_error_filtered['UPDT_YMD'] = shp_error_filtered['UPDT_YMD'].astype(str)
                shp_error_filtered.to_file(save_path +'/'+shp_name+'_삭제_검수결과.shp', encoding="cp949")
                shp_error_filtered.drop(columns='geometry').to_excel(save_path +'/'+shp_name+'_삭제_검수결과.xlsx', index=False)
#!!! BSH 수정까지
            count_shp_list.append(shp_name)

        if count == 0:
            not_count_shp_list.append(shp_name)

        #########################################################################
    
    location_numb += 1

검사 지역: 2025_경상남도_함안군
저장 경로: C:\Users\Opt_AI\Desktop\Tool\입력 및 검수\속성검수결과\2025_경상남도_함안군
오류 개수: 0개
