In [16]:
import io
import re
import zipfile
from pathlib import Path
from tempfile import TemporaryDirectory, TemporaryFile
from typing import Mapping

import dask.dataframe as dd
import pandas as pd
from dask.diagnostics import ProgressBar
from tqdm import tqdm

from openeais import get_numpy_type, openeais_to_parquet

In [7]:
# List every zip archive under data/ whose name starts with "국토교통부_".
data_dir = Path("data/")
for archive in data_dir.glob("국토교통부_*.zip"):
    print(archive)

data\국토교통부_건축인허가_가설건축물+(2022년+12월).zip
data\국토교통부_건축인허가_공작물관리대장+(2022년+12월).zip
data\국토교통부_건축인허가_기본개요+(2022년+12월).zip
data\국토교통부_건축인허가_대수선+(2022년+12월).zip
data\국토교통부_건축인허가_대지위치+(2022년+12월).zip
data\국토교통부_건축인허가_도로대장+(2022년+12월).zip
data\국토교통부_건축인허가_동별개요+(2022년+12월).zip
data\국토교통부_건축인허가_부설주차장+(2022년+12월).zip
data\국토교통부_건축인허가_오수정화시설+(2022년+12월).zip
data\국토교통부_건축인허가_전유공용면적+(2022년+12월).zip
data\국토교통부_건축인허가_주차장+(2022년+12월).zip
data\국토교통부_건축인허가_주택유형+(2022년+12월).zip
data\국토교통부_건축인허가_지역지구구역+(2022년+12월).zip
data\국토교통부_건축인허가_철거멸실관리대장+(2022년+12월).zip
data\국토교통부_건축인허가_층별개요+(2022년+12월).zip
data\국토교통부_건축인허가_호별개요+(2022년+12월).zip
data\국토교통부_건축인허가_호별전유공용면적+(2022년+12월).zip
data\국토교통부_주택인허가_관리공동부대복리시설+(2022년+12월).zip
data\국토교통부_주택인허가_관리공동형별개요+(2022년+12월).zip
data\국토교통부_주택인허가_기본개요+(2022년+12월).zip
data\국토교통부_주택인허가_대지위치+(2022년+12월).zip
data\국토교통부_주택인허가_동별개요+(2022년+12월).zip
data\국토교통부_주택인허가_복리분양시설+(2022년+12월).zip
data\국토교통부_주택인허가_부대시설+(2022년+12월).zip
data\국토교통부_주택인허가_부설주차장+(2022년+12월).zip
data\국토교통부_주택인허가_오수정

In [14]:
# Pick the archive whose path sorts last and show the first line of its
# first member, decoded as cp949.
zpath = sorted(Path("data/").glob("국토교통부_*.zip"), reverse=True)[0]
print(zpath)

with zipfile.ZipFile(zpath) as archive:
    # [:1] visits at most one member and tolerates an empty archive.
    for member in archive.namelist()[:1]:
        with archive.open(member) as raw, io.TextIOWrapper(raw, encoding="cp949") as text:
            print(text.readline())


data\국토교통부_주택인허가_호별개요+(2022년+12월).zip
26410-100003037|26410-100004542|부산광역시 금정구 장전동 521번지|장전동 금정산 SK VIEW|26410|10800|0|0521|||||102|13|20|지상|0|1304|160|||20100828



In [13]:
# Scratch check: a TemporaryFile can be written, rewound and read back.
with TemporaryFile() as scratch:
    scratch.write(b"Hello world!")
    scratch.seek(0)
    payload = scratch.read()
print(payload)


b'Hello world!'


In [11]:
# Scratch check: the context manager hands back the directory path, and we
# print its type followed by the path itself.
with TemporaryDirectory() as tmp_path:
    print(type(tmp_path), tmp_path, sep="\n")


<class 'str'>
C:\Users\USER\AppData\Local\Temp\tmpy5t1hxke


In [15]:
print(zpath)

# Inspect the archive's member list; nothing is extracted in this cell.
with TemporaryDirectory() as tdir, zipfile.ZipFile(zpath) as zf:
    print(zf.infolist())


data\국토교통부_주택인허가_호별개요+(2022년+12월).zip
[<ZipInfo filename='mart_jty_04.txt' compress_type=deflate filemode='-rw-r--r--' file_size=884446270 compress_size=58499197>]


In [34]:
# Locate the schema CSV that matches the archive: tokens 1-2 of the file name
# give "schema_<token1>_<token2>.csv".
schema_dir = Path("data/schema")
substrings_filename = re.split(r"[_ +\.]", zpath.name)
schema_filename = "_".join(["schema", *substrings_filename[1:3]]) + ".csv"

# The schema file has no header row: column 0 is the column name and column 1
# is a type label that get_numpy_type converts.
schema_raw = pd.read_csv(schema_dir / schema_filename, header=None)
schema_raw[1] = schema_raw[1].apply(get_numpy_type)
df_schema = schema_raw.set_index(0)
df_schema

Unnamed: 0_level_0,1
0,Unnamed: 1_level_1
관리_호별_명세_PK,string
관리_동별_개요_PK,string
대지_위치,string
건물_명,string
시군구_코드,string
법정동_코드,string
대지_구분_코드,string
번,string
지,string
특수지_명,string


In [38]:
# Column name -> dtype label mapping; this is what gets passed as read_csv's dtype.
df_schema[1].to_dict()

{'관리_호별_명세_PK': 'string',
 '관리_동별_개요_PK': 'string',
 '대지_위치': 'string',
 '건물_명': 'string',
 '시군구_코드': 'string',
 '법정동_코드': 'string',
 '대지_구분_코드': 'string',
 '번': 'string',
 '지': 'string',
 '특수지_명': 'string',
 '블록': 'string',
 '로트': 'string',
 '동명': 'string',
 '층_번호': 'Int64',
 '층_구분_코드': 'string',
 '층_구분_코드_명': 'string',
 '호_번호': 'Int64',
 '호_명': 'string',
 '평형_구분_명': 'string',
 '변경_구분_코드': 'string',
 '변경_구분_코드_명': 'string',
 '생성_일자': 'string'}

In [37]:
print(zpath)

# Column names and dtypes both come from the schema frame.
column_names = df_schema.index.to_list()
column_dtypes = df_schema[1].to_dict()

with TemporaryDirectory() as tdir:  # type: str
    with zipfile.ZipFile(zpath) as zf:
        zf.extractall(path=tdir, members=tqdm(zf.infolist(), desc="Extracting zip"))

    paths = sorted(Path(tdir).glob("*.txt"))
    # NOTE(review): tdir is removed when this block exits, while the dask frame
    # is presumably still lazy at that point -- computing `ddf` in a later cell
    # would then fail because the extracted files are gone. Confirm before reuse.
    ddf = dd.read_csv(
        paths,
        encoding="cp949",
        header=None,
        sep="|",
        names=column_names,
        dtype=column_dtypes,
    )


data\국토교통부_주택인허가_호별개요+(2022년+12월).zip


Extracting zip: 100%|██████████| 1/1 [00:01<00:00,  1.93s/it]


In [39]:
# Display the dask frame's structure (columns, dtypes, partition count).
ddf

Unnamed: 0_level_0,관리_호별_명세_PK,관리_동별_개요_PK,대지_위치,건물_명,시군구_코드,법정동_코드,대지_구분_코드,번,지,특수지_명,블록,로트,동명,층_번호,층_구분_코드,층_구분_코드_명,호_번호,호_명,평형_구분_명,변경_구분_코드,변경_구분_코드_명,생성_일자
npartitions=13,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
,string,string,string,string,string,string,string,string,string,string,string,string,string,Int64,string,string,Int64,string,string,string,string,string
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [48]:
# Treat any run of "_", " ", "+", ".", "(" or ")" as a single separator.
substrings_filename = re.compile(r"[_ +.()]+").split(zpath.name)
substrings_filename

['국토교통부', '주택인허가', '호별개요', '2022년', '12월', 'zip']

In [49]:
# Tokens 1-4 of the file name; zip_to_parquet joins these to name the output directory.
substrings_filename[1:5]

['주택인허가', '호별개요', '2022년', '12월']

In [52]:
def zip_to_parquet(
    path,
    savedir="data",
    member_pattern="*.txt",
    encoding="cp949",
    header=None,
    sep="|",
    index=None,
    schema_dir="data/schema",
    **kwargs,
):
    """Convert the delimited text members of a zip archive into a Parquet dataset.

    The archive file name is tokenised on runs of ``_``, space, ``+``, ``.``,
    ``(`` and ``)``. Tokens 1-2 select the schema file
    ``schema_<token1>_<token2>.csv`` inside ``schema_dir``; tokens 1-4, joined
    with ``_``, name the output directory created under ``savedir``.

    If the output directory already exists and is newer than the archive, the
    conversion is skipped. If it exists but is older, its files are removed
    before the dataset is rewritten.

    Parameters
    ----------
    path : str or pathlib.Path
        Zip archive to convert.
    savedir : str or pathlib.Path
        Parent directory of the output dataset directory.
    member_pattern : str
        Glob pattern selecting which extracted members are read.
    encoding, header, sep
        Forwarded to ``dask.dataframe.read_csv``.
    index : int, str or None
        Column to set as the index before saving: a position (int) or a
        column label. ``None`` (or ``False``) leaves the index untouched.
    schema_dir : str or pathlib.Path
        Directory holding header-less schema CSVs whose column 0 is the column
        name and column 1 a type label converted with ``get_numpy_type``.
    **kwargs
        Extra keyword arguments forwarded to ``dask.dataframe.read_csv``.

    Returns
    -------
    ``savedir`` exactly as it was passed in, whether or not work was skipped.

    Raises
    ------
    ValueError
        If the archive name yields no dataset tokens, which would make the
        output directory coincide with ``savedir`` itself.
    FileNotFoundError
        If no extracted member matches ``member_pattern``.
    """
    # Normalise first so that plain strings are accepted; `.name` below needs a Path.
    path = Path(path)
    tokens = re.split(r"[_ +.()]+", path.name)

    dataset_name = "_".join(tokens[1:5])
    if not dataset_name:
        # An empty name would resolve to `savedir` itself, and the stale-output
        # cleanup below would then delete unrelated files in it.
        raise ValueError(
            f"cannot derive a dataset name from archive file name {path.name!r}"
        )
    results_dir = Path(savedir) / dataset_name

    if results_dir.exists():
        if results_dir.stat().st_mtime > path.stat().st_mtime:
            print("already done")
            return savedir
        # Output is older than the archive: drop it so old part files do not
        # mix with the ones written below.
        for stale in results_dir.iterdir():
            stale.unlink()

    schema_filename = "_".join(["schema"] + tokens[1:3]) + ".csv"
    df_schema = pd.read_csv(Path(schema_dir) / schema_filename, header=None)
    df_schema[1] = df_schema[1].apply(get_numpy_type)
    schema_dict = df_schema.set_index(0)[1].to_dict()

    with TemporaryDirectory() as tdir:  # type: str
        with zipfile.ZipFile(path) as zf:
            zf.extractall(path=tdir, members=tqdm(zf.infolist(), desc="Extracting zip"))

        paths = sorted(Path(tdir).glob(member_pattern))
        if not paths:
            raise FileNotFoundError(
                f"no member of {path.name!r} matches pattern {member_pattern!r}"
            )

        ddf: dd.DataFrame = dd.read_csv(
            paths,
            encoding=encoding,
            header=header,
            sep=sep,
            # A concrete list rather than a dict view, which is an ordered
            # collection that is safe to hand to pandas and dask.
            names=list(schema_dict),
            dtype=schema_dict,
            **kwargs,
        )

        # The write must happen while the temporary directory still exists,
        # because the extracted files are read during to_parquet.
        with ProgressBar():
            # `is not None` so that position 0 is honoured; False is still
            # treated as "no index", as it was with the old truthiness check.
            if index is not None and index is not False:
                print("Setting index...")

                if isinstance(index, int):
                    ddf = ddf.set_index(ddf.columns[index])
                else:
                    ddf = ddf.set_index(index)
            print("Saving...")
            ddf.to_parquet(results_dir)

    return savedir


In [53]:
# Convert the archive whose path sorts last among the matching zips in data/.
zpath = sorted(Path("data/").glob("국토교통부_*.zip"), reverse=True)[0]
zip_to_parquet(zpath)


Extracting zip: 100%|██████████| 1/1 [00:02<00:00,  2.01s/it]

Saving...
[                                        ] | 0% Completed | 106.01 ms




[########################################] | 100% Completed | 29.06 s


In [54]:
# Show which archive was just converted.
zpath

WindowsPath('data/국토교통부_주택인허가_호별개요+(2022년+12월).zip')

In [55]:
# Rebuild the output directory path the same way zip_to_parquet does
# (file-name tokens 1-4 joined with "_", under data/).
ppath = Path("data") / "_".join(substrings_filename[1:5])
ppath


WindowsPath('data/주택인허가_호별개요_2022년_12월')

In [5]:
# Inspect the archive's stat record; st_mtime is the field compared below.
zpath.stat()

os.stat_result(st_mode=33206, st_ino=18295873486342750, st_dev=239204461, st_nlink=1, st_uid=0, st_gid=0, st_size=703469167, st_atime=1663289397, st_mtime=1663289397, st_ctime=1663289015)

In [57]:
# True would mean the archive was modified after the output directory.
zpath.stat().st_mtime > ppath.stat().st_mtime

False

In [58]:
# Same freshness condition zip_to_parquet uses to skip work: the output
# directory exists and is newer than the archive.
if ppath.exists() and ppath.stat().st_mtime > zpath.stat().st_mtime:
    print("nope")

nope
