In [34]:
import zipfile
import os
import tempfile
import shutil

def filter_zip(zip_path, should_delete):
    """
    zip_path: 원본 zip 파일 경로
    should_delete: 파일명을 인자로 받아, 삭제할 파일이면 True를 반환하는 함수
    """
    # 임시 파일 생성
    tmp_fd, tmp_path = tempfile.mkstemp(suffix='.zip')
    os.close(tmp_fd)
    
    with zipfile.ZipFile(zip_path, 'r') as zin, \
         zipfile.ZipFile(tmp_path, 'w') as zout:
        for item in zin.infolist():
            # should_delete 함수가 True를 반환하면 해당 파일은 새 압축파일에 추가하지 않음
            if not should_delete(item.filename):
                print(f"Deleting {item.filename}")
                continue
            # 파일 읽어와서 새 압축파일에 기록
            file_data = zin.read(item.filename)
            zout.writestr(item, file_data)
    
    # 원본 파일을 백업하거나 새 파일로 교체 (여기서는 원본 파일을 대체)
    shutil.move(tmp_path, zip_path)
    print("압축파일 업데이트 완료!")

# 예시: 파일명이 '.txt' 확장자로 끝나면 삭제하도록 필터링 함수 정의
def delete_txt_files(filename):
    return filename.endswith('00.csv')

# 사용 예시
zip_file_path = 'D:\\PROJECT\\2025\\01.CarbonMap\\01.Data\\02.Weather\\01_Weather_202307.zip'
filter_zip(zip_file_path, delete_txt_files)

Deleting WEATHER_202307010005.csv
Deleting WEATHER_202307010010.csv
Deleting WEATHER_202307010015.csv
Deleting WEATHER_202307010020.csv
Deleting WEATHER_202307010025.csv
Deleting WEATHER_202307010030.csv
Deleting WEATHER_202307010035.csv
Deleting WEATHER_202307010040.csv
Deleting WEATHER_202307010045.csv
Deleting WEATHER_202307010050.csv
Deleting WEATHER_202307010055.csv
Deleting WEATHER_202307010105.csv
Deleting WEATHER_202307010110.csv
Deleting WEATHER_202307010115.csv
Deleting WEATHER_202307010120.csv
Deleting WEATHER_202307010125.csv
Deleting WEATHER_202307010130.csv
Deleting WEATHER_202307010135.csv
Deleting WEATHER_202307010140.csv
Deleting WEATHER_202307010145.csv
Deleting WEATHER_202307010150.csv
Deleting WEATHER_202307010155.csv
Deleting WEATHER_202307010205.csv
Deleting WEATHER_202307010210.csv
Deleting WEATHER_202307010215.csv
Deleting WEATHER_202307010220.csv
Deleting WEATHER_202307010225.csv
Deleting WEATHER_202307010230.csv
Deleting WEATHER_202307010235.csv
Deleting WEATH

In [35]:
import math

# -----------------------------------
# 상수 및 맵핑
# -----------------------------------
X0 = 700000
Y0 = 1300000
Unit100_000m = 100000
IDX08 = 8

# Java 코드에서 사용한 char[] Names = {'가', '나', '다', '라', '마', '바', '사', '아', 'a', 'b'};
Names = ['가', '나', '다', '라', '마', '바', '사', '아', 'a', 'b']

# Java의 hashNames 초기화 로직
hashNames = {}
k = 0
for i, ch in enumerate(Names):
    if i >= IDX08:
        # i=8부터 'a','b'에 대해 0,1 할당
        hashNames[ch] = k
        k += 1
    else:
        # i=0~7에 대해 '가'=0, '나'=1, ... '아'=7
        hashNames[ch] = i

# -----------------------------------
# (1) UTM-K → 500m 그리드 ID
# -----------------------------------
def nationalGridIDx500m(ux, uy):
    """
    UTM-K 좌표 (ux, uy)를 입력받아 500m 단위 그리드 ID를 생성한다.
    """
    # 1) 100km 단위
    unit = Unit100_000m

    xn = int(ux - X0)
    yn = int(uy - Y0)

    xn0 = xn // unit  # x축 100km 블록 인덱스
    yn0 = yn // unit  # y축 100km 블록 인덱스

    if xn0 < 0 or xn0 > 7 or yn0 < 0 or yn0 > 7:
        return None

    xn1 = xn - xn0 * unit  # 100km 블록 내부 x잔여
    yn1 = yn - yn0 * unit  # 100km 블록 내부 y잔여

    # 2) 1km 단위 (1000m)
    unit = unit // 100
    xr0 = xn1 // unit  # x축 1km 블록 인덱스
    yr0 = yn1 // unit  # y축 1km 블록 인덱스

    xn2 = xn1 - xr0 * unit  # 1km 블록 내부 x잔여
    yn2 = yn1 - yr0 * unit  # 1km 블록 내부 y잔여

    # 3) 500m 단위
    unit = unit // 2
    xr1 = xn2 // unit  # 500m 세분화(0 or 1)
    yr1 = yn2 // unit  # 500m 세분화(0 or 1)

    # 4) 그리드 ID 문자열 구성
    #    - 첫 2글자: 100km 인덱스 (가,나,다,라,마,바,사,아)
    #    - 그 뒤 2자리: 1km 인덱스
    #    - 500m: a or b
    #    - y축도 동일 구조
    result = []
    result.append(Names[xn0])   # ex) '다'
    result.append(Names[yn0])   # ex) '사'

    # x축 1km 인덱스 2자리
    if xr0 > 9:
        result.append(str(xr0))
    else:
        result.append("0")
        result.append(str(xr0))

    # 500m 인덱스 (a/b)
    result.append(Names[IDX08 + xr1])   # ex) a or b

    # y축 1km 인덱스 2자리
    if yr0 > 9:
        result.append(str(yr0))
    else:
        result.append("0")
        result.append(str(yr0))

    # 500m 인덱스 (a/b)
    result.append(Names[IDX08 + yr1])   # ex) a or b

    return "".join(result)

# -----------------------------------
# (2) 500m 그리드 ID → UTM-K
# -----------------------------------
def utmKFrom500mGridID(nationalID, center=False):
    """
    500m 단위 그리드 ID를 입력받아 UTM-K 좌표(x, y)를 반환한다.
    center=True 면 해당 500m 셀의 중앙좌표를 반환
    """
    # Java 코드는 정규식 패턴 검사(patterns[3]) 후 진행하지만,
    # 여기서는 단순 길이 체크나 생략 가능. 필요 시 re 모듈로 보강.
    if len(nationalID) != 8:
        raise ValueError(f"Invalid 500m Grid ID: {nationalID}")

    # 1) 100km 블록 인덱스
    xn0 = hashNames[nationalID[0]]  # '다'
    yn0 = hashNames[nationalID[1]]  # '사'

    # 2) 1km 단위
    unit_1km = Unit100_000m // 100
    xr0 = int(nationalID[2:4]) * unit_1km  # x축 1km
    yr0 = int(nationalID[5:7]) * unit_1km  # y축 1km

    # 3) 500m 단위
    unit_500m = unit_1km // 2
    xn1 = hashNames[nationalID[4]]  # 'a' or 'b' → 0 or 1
    yn1 = hashNames[nationalID[7]]  # 'a' or 'b' → 0 or 1

    x = X0 + Unit100_000m * xn0 + xr0 + xn1 * unit_500m
    y = Y0 + Unit100_000m * yn0 + yr0 + yn1 * unit_500m

    # 중심좌표를 원한다면 500m 절반(=250m)만큼 이동
    if center:
        x += unit_500m * 0.5
        y += unit_500m * 0.5

    return (x, y)

# -----------------------------------
# 간단 테스트
# -----------------------------------
if __name__ == "__main__":
    # 예: Java 코드에서 "다사54b44b" -> (x, y) 변환 후 다시 500m ID를 찍어보기
    grid_id = "나b75a78a"
    xy = utmKFrom500mGridID(grid_id, center=True)
    print(f"{grid_id} -> {xy}")

    # 다시 UTM-K -> 500m ID
    back_id = nationalGridIDx500m(xy[0], xy[1])
    print(f"{xy} -> {back_id}")

나b75a78a -> (875250.0, 1478250.0)
(875250.0, 1478250.0) -> 나나75a78a


In [41]:
from shapely.geometry import Point
from shapely import speedups

# Shapely speedups 활성화
if speedups.available:
    speedups.enable()

def create_prepared_dict(gdf):
    """
    GeoDataFrame의 인덱스와 'prep_geom' 열의 prepared geometry를 딕셔너리로 변환합니다.
    """
    return {idx: geom for idx, geom in zip(gdf.index, gdf['prep_geom'])}

def flagContains(utm_x, utm_y, prep_dict, sindex):
    """
    주어진 UTM 좌표 (utm_x, utm_y)가 공간 인덱스(sindex)를 활용해,
    딕셔너리(prep_dict)에 저장된 준비된(prepared) 지오메트리 중 하나에 포함되는지 검사합니다.
    """
    pt = Point(utm_x, utm_y)
    candidate_indexes = list(sindex.intersection(pt.bounds))
    for idx in candidate_indexes:
        # 딕셔너리에서 준비된 지오메트리를 빠르게 조회
        if prep_dict[idx].contains(pt):
            return True
    return False

  speedups.enable()


In [None]:
import zipfile
import csv
import io
import os
import tempfile
import shutil
from pyproj import CRS, Transformer

import pandas as pd
import geopandas as gpd
from shapely.prepared import prep


def add_gid_to_csv_in_zip(zip_path, output_zip_path=None, sido_path=None):
    """
    ZIP 파일 내부의 CSV 파일들을 읽어, 각 CSV 파일에 대해
    - lon, lat 값을 vectorized하게 UTM-K 좌표로 변환하고,
    - GeoPandas의 spatial join을 이용해 shapefile(행정구역) 내에 포함되는 점만 남긴 후,
    - 각 점에 대해 gid 값을 nationalGridIDx500m를 통해 생성하여 'gid' 열에 추가합니다.
    처리 후 결과 CSV 파일들을 새 ZIP 파일에 저장합니다.
    """
    crs_utmk_str = "+proj=tmerc +lat_0=38 +lon_0=127.5 +k=0.9996 +x_0=1000000 +y_0=2000000 +ellps=GRS80 +units=m +no_defs"

    # 행정구역 shapefile이 없으면 처리하지 않음
    if not sido_path:
        print("행정구역 shapefile이 주어지지 않았습니다. gid 필드를 추가하지 않습니다.")
        return

    sido_gdf = gpd.read_file(sido_path)
    
    if sido_gdf.crs is None:
        sido_gdf = sido_gdf.set_crs(crs_utmk_str, allow_override=True)
    else:
        sido_gdf = sido_gdf.to_crs(crs_utmk_str)

    # output_zip_path가 지정되지 않았다면, 임시 파일을 생성하고 이후 원본과 교체
    if not output_zip_path:
        tmp_fd, tmp_zip = tempfile.mkstemp(suffix='.zip')
        os.close(tmp_fd)
        output_zip_path = tmp_zip
        replace_original = True
    else:
        replace_original = False

    # 좌표 변환 설정: WGS84 -> UTM-K
    crs_wsg84 = CRS.from_epsg(4326)
    crs_utmk = CRS.from_proj4(crs_utmk_str)
    transformer = Transformer.from_crs(crs_wsg84, crs_utmk, always_xy=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_in, \
         zipfile.ZipFile(output_zip_path, 'w') as zip_out:

        for item in zip_in.infolist():
            filename = item.filename
            print(f"Processing {filename}...")

            # CSV 파일 처리
            if filename.lower().endswith('.csv'):
                csv_data = zip_in.read(filename)
                text_data = csv_data.decode('utf-8', errors='replace')
                # pandas를 이용해 CSV를 읽음
                try:
                    df = pd.read_csv(io.StringIO(text_data))
                except Exception as e:
                    print(f"[ERROR] {filename} CSV 읽기 실패: {e}")
                    zip_out.writestr(filename, csv_data)
                    continue

                # lon, lat 열이 없는 경우 원본 그대로 기록
                if 'lon' not in df.columns or 'lat' not in df.columns:
                    print(f"[WARNING] '{filename}'에서 'lon' 또는 'lat' 열을 찾을 수 없습니다. 원본 그대로 복사합니다.")
                    zip_out.writestr(filename, csv_data)
                    continue

                # 좌표 변환: vectorized 처리
                try:
                    # 위경도 값들을 float로 변환 후 numpy arrays로 받음
                    lon_vals = df['lon'].astype(float).values
                    lat_vals = df['lat'].astype(float).values
                except Exception as e:
                    print(f"[ERROR] {filename}의 위경도 값 파싱 오류: {e}")
                    zip_out.writestr(filename, csv_data)
                    continue

                utm_x, utm_y = transformer.transform(lon_vals, lat_vals)

                # UTM 좌표를 기반으로 포인트 생성 (vectorized)
                gdf_points = gpd.GeoDataFrame(df, 
                                              geometry=gpd.points_from_xy(utm_x, utm_y),
                                              crs=crs_utmk.to_string())
                # 공간 조인을 통해, sido_gdf 내에 포함되는 점만 선택
                # sjoin은 좌표계가 동일해야 하므로, sido_gdf의 CRS가 UTM-K라고 가정합니다.
                try:
                    # inner join: 점이 polygon 내부에 있어야만 남김
                    joined = gpd.sjoin(gdf_points, sido_gdf, how='inner', predicate='within')
                except Exception as e:
                    print(f"[ERROR] {filename} spatial join 실패: {e}")
                    zip_out.writestr(filename, csv_data)
                    continue

                if joined.empty:
                    # 모든 점이 필터링되어 결과가 없으면, 빈 CSV 파일로 기록
                    new_csv = ""
                else:
                    # gid 값 생성: vectorized 또는 apply 방식으로 각 행에 대해 gid를 생성
                    joined['gid'] = joined.apply(lambda row: nationalGridIDx500m(row.geometry.x, row.geometry.y), axis=1)
                    # 필요한 열만 선택해서 CSV로 저장 (원본 열 순서를 유지하거나, 필요에 따라 조정)
                    # 여기서는 원본 df의 열 + 'gid'
                    # 만약 'gid' 열이 이미 존재했다면 덮어씌우게 됩니다.
                    output_cols = list(df.columns)
                    if 'gid' not in output_cols:
                        output_cols.append('gid')
                    # sjoin 결과에는 추가 정보(인덱스_right 등)가 있으므로, 선택적으로 제거
                    new_csv = joined[output_cols].to_csv(index=False)

                # 기록할 CSV 데이터를 bytes로 인코딩
                zip_out.writestr(filename, new_csv.encode('utf-8'))
            else:
                # CSV 외의 파일은 그대로 복사
                zip_out.writestr(filename, zip_in.read(filename))

    if replace_original:
        shutil.move(output_zip_path, zip_path)
        print(f"'{zip_path}'에 gid 필드가 추가된 버전을 덮어썼습니다.")
    else:
        print(f"'{zip_path}' -> '{output_zip_path}'로 gid 필드를 추가한 새 zip을 생성했습니다.")


def process_zip_files_in_folder(input_folder, sido_path, output_folder):
    # output 폴더가 존재하지 않으면 생성
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    # input_folder 내의 모든 파일에 대해 반복
    for filename in os.listdir(input_folder):
        if filename.lower().endswith('.zip'):
            input_zip_path = os.path.join(input_folder, filename)
            base, ext = os.path.splitext(filename)
            new_filename = f"{base}_with_gid{ext}"
            output_zip_path = os.path.join(output_folder, new_filename)
            print(f"Processing {input_zip_path} -> {output_zip_path} ...")
            add_gid_to_csv_in_zip(input_zip_path, output_zip_path, sido_path)

# 사용 예시
if __name__ == "__main__":
    sido_path = "D:\\PROJECT\\2025\\01.CarbonMap\\01.Data\\03.행정구역\\sido_2023.shp"
    original_zip = "D:\\PROJECT\\2025\\01.CarbonMap\\01.Data\\02.Weather"               # 원본 ZIP 파일 경로
    updated_zip = "D:\\PROJECT\\2025\\01.CarbonMap\\01.Data\\02.Weather\\add_grid"      # gid 필드 추가 후 저장할 ZIP 파일 경로
    
    process_zip_files_in_folder(original_zip, sido_path, updated_zip)
    print("모든 작업이 완료되었습니다.")

Processing D:\PROJECT\2025\01.CarbonMap\01.Data\02.Weather\01_Weather_202301.zip -> D:\PROJECT\2025\01.CarbonMap\01.Data\02.Weather\add_grid\01_Weather_202301_with_gid.zip ...
Processing WEATHER_202301010000.csv...
Processing WEATHER_202301010100.csv...
Processing WEATHER_202301010200.csv...
Processing WEATHER_202301010300.csv...
Processing WEATHER_202301010400.csv...
Processing WEATHER_202301010500.csv...
Processing WEATHER_202301010600.csv...
Processing WEATHER_202301010700.csv...
Processing WEATHER_202301010800.csv...
Processing WEATHER_202301010900.csv...
Processing WEATHER_202301011000.csv...
Processing WEATHER_202301011100.csv...
Processing WEATHER_202301011200.csv...
Processing WEATHER_202301011300.csv...
Processing WEATHER_202301011400.csv...
Processing WEATHER_202301011500.csv...
Processing WEATHER_202301011600.csv...
Processing WEATHER_202301011700.csv...
Processing WEATHER_202301011800.csv...
Processing WEATHER_202301011900.csv...
Processing WEATHER_202301012000.csv...
Proce

In [33]:
from shapely.prepared import prep
from pyproj import CRS, Transformer
import geopandas as gpd

# 좌표 변환
crs_wsg84 = CRS.from_epsg(4326)
crs_utmk = CRS.from_proj4("+proj=tmerc +lat_0=38 +lon_0=127.5 +k=0.9996 + x_0=1000000 + y_0=2000000 + ellps=GRS80 +units=m +no_defs")
transformer = Transformer.from_crs(crs_wsg84, crs_utmk, always_xy=True)

lon, lat = 126.18531,31.484838
utm_x, utm_y = transformer.transform(lon, lat)
print(utm_x, utm_y)

print(nationalGridIDx500m(utm_x, utm_y))

sido_path = "D:\\PROJECT\\2025\\01.CarbonMap\\01.Data\\03.행정구역\\sido_2023.shp"
sido_gdf = gpd.read_file(sido_path)
# 각 폴리곤을 준비된 지오메트리로 변환 (속도 향상 효과)
sido_gdf["prep_geom"] = sido_gdf.geometry.apply(prep)
sindex = sido_gdf.sindex  # 공간 인덱스 생성

# 폴리곤 내부 포함 여부 검사하여 gid 결정
if flagContains(utm_x, utm_y, sido_gdf, sindex):
    gid_value = nationalGridIDx500m(utm_x, utm_y)
else:
    gid_value = None

print(gid_value)

875126.1959198355 1278269.4860866773
None
None
