In [1]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Literal, Optional, Tuple

import pandas as pd


# =========================
# Logging (simple)
# =========================

def log(msg: str) -> None:
    """Write *msg* to stdout, prefixed with the ``[build]`` tag."""
    line = "[build] {}".format(msg)
    print(line)


# =========================
# Schema / constants
# =========================

# Allowed codes for the "@cat02" dimension of the raw records.
Cat02 = Literal["000", "001"]  # 000 = total, 001 = privately operated


@dataclass(frozen=True)
class ECConfig:
    """Fixed parameters shared by the build functions in this file."""

    # statsDataId passed to the metadata lookup at the start of the pipeline.
    stats_data_id: str = "0003353931"
    tab_est: str = "003"  # establishments
    tab_emp: str = "189"  # employees
    # "@cat02" code for the total; the pipeline's default filter value.
    cat02_total: Cat02 = "000"
    # "@cat02" code for privately operated establishments.
    cat02_private: Cat02 = "001"
    # Width that area codes are left-padded with zeros to.
    area_width: int = 5


# Module-level configuration instance read by the functions below.
CFG = ECConfig()


class Col:
    """Column-name constants for the raw records and for the tidy output frames."""

    # raw keys
    TAB = "@tab"
    CAT01 = "@cat01"
    CAT02 = "@cat02"
    AREA = "@area"
    TIME = "@time"
    DOLLAR = "$"  # cell value as delivered; converted to numeric into VALUE

    # tidy keys
    AREA2 = "area"  # tidy name that "@area" is renamed to
    SIC = "sicCode"  # tidy name that "@cat01" is renamed to
    SIC_NAME = "sicName"
    AREA_NAME = "areaName"
    VALUE = "value"
    POP = "population"
    EST = "establishments"
    EMP = "employees"
    EST_DEN = "est_density"  # establishments per 10,000 population
    EMP_DEN = "emp_density"  # employees per 10,000 population


# Columns the raw record frame must contain before it is normalised.
REQUIRED_EC_RAW = {Col.TAB, Col.CAT01, Col.CAT02, Col.AREA, Col.TIME, Col.DOLLAR}
# Columns the population frame must contain before it is joined.
REQUIRED_POP = {Col.AREA2, Col.POP}


def require_columns(df: pd.DataFrame, required: Iterable[str], df_name: str) -> None:
    """Check that *df* has every column named in *required*.

    Raises:
        ValueError: if any required column is absent; the message lists the
            missing names in sorted order, labelled with *df_name*.
    """
    present = set(df.columns)
    missing = sorted(col for col in set(required) if col not in present)
    if not missing:
        return
    raise ValueError(f"{df_name} is missing required columns: {missing}")


def zfill_area(series: pd.Series, width: int = CFG.area_width) -> pd.Series:
    """Return *series* cast to strings and left-padded with zeros to *width*."""
    as_text = series.astype(str)
    padded = as_text.str.zfill(width)
    return padded


def to_numeric(series: pd.Series) -> pd.Series:
    """Convert *series* to numbers; values that cannot be parsed become NaN."""
    converted = pd.to_numeric(series, errors="coerce")
    return converted


# =========================
# Meta: industry classification
# =========================

def fetch_cat01_codes(stats_data_id: str) -> pd.DataFrame:
    """Fetch the cat01 code table for a statistics table via ``getMetaInfo``.

    Args:
        stats_data_id: statsDataId of the table whose metadata is requested.

    Returns:
        A DataFrame built from the cat01 ``CLASS`` entries. The columns
        '@code', '@name' and '@level' are expected to be present.

    Raises:
        ValueError: if the metadata contains no class object with id "cat01".
    """
    meta = _get("getMetaInfo", {"statsDataId": stats_data_id})
    class_objs = meta["GET_META_INFO"]["METADATA_INF"]["CLASS_INF"]["CLASS_OBJ"]

    # A lone class object arrives as a dict rather than a one-element list.
    if isinstance(class_objs, dict):
        class_objs = [class_objs]

    cat01_obj = next((obj for obj in class_objs if obj.get("@id") == "cat01"), None)
    if cat01_obj is None:
        raise ValueError(
            f"metadata for statsDataId={stats_data_id} has no 'cat01' class object"
        )

    # Same shape quirk one level down: a single CLASS entry is a plain dict,
    # which pd.DataFrame cannot build rows from unless it is wrapped in a list.
    classes = cat01_obj["CLASS"]
    if isinstance(classes, dict):
        classes = [classes]

    return pd.DataFrame(classes)


def build_major_codes(
    cat01_codes: pd.DataFrame,
    level: str = "2",
) -> Tuple[List[str], Dict[str, str]]:
    """Extract the major industry divisions from the cat01 code table.

    In the cat01 metadata inspected in this notebook, ``@level == "1"`` holds
    only five aggregate rows (all industries, non-agriculture, ...) while
    ``@level == "2"`` holds the 19 lettered divisions, and the raw data is
    fetched for the level-2 codes. Selecting level 1 therefore leaves nothing
    to match against, so level 2 is the default here.

    Args:
        cat01_codes: code table with '@code', '@name' and '@level' columns.
        level: value of '@level' that identifies the rows to keep.

    Returns:
        ``(major_codes, sic_map)`` where ``major_codes`` is the list of codes
        at the requested level and ``sic_map`` maps code -> name.

    Raises:
        ValueError: if a required column is missing from *cat01_codes*.
    """
    required = {"@code", "@name", "@level"}
    require_columns(cat01_codes, required, "cat01_codes")

    # Compare as stripped text so numeric or padded level values still match.
    level_text = cat01_codes["@level"].astype(str).str.strip()
    major = cat01_codes.loc[level_text == str(level), ["@code", "@name"]].copy()
    major["@code"] = major["@code"].astype(str).str.strip()

    major_codes = major["@code"].tolist()
    sic_map = major.set_index("@code")["@name"].to_dict()
    return major_codes, sic_map


# =========================
# e-Stat raw -> tidy
# =========================

def normalize_ec_raw(ec_raw2: pd.DataFrame, cat02: Cat02) -> pd.DataFrame:
    """Add a numeric value column to the raw records and keep one cat02 code.

    Returns a new frame; the input is not modified.
    """
    require_columns(ec_raw2, REQUIRED_EC_RAW, "ec_raw2")

    # Convert on the full column first, then filter, so the resulting dtype
    # does not depend on which rows survive the filter.
    with_value = ec_raw2.assign(**{Col.VALUE: to_numeric(ec_raw2[Col.DOLLAR])})
    selected = with_value.loc[with_value[Col.CAT02] == cat02].copy()

    # Force the key columns to str so later merges do not silently miss.
    for key in (Col.AREA, Col.CAT01, Col.TIME, Col.TAB):
        selected[key] = selected[key].astype(str)

    return selected


def extract_measure(df: pd.DataFrame, tab: str, out_col: str) -> pd.DataFrame:
    """Return the rows for one tab code, keyed by (area, sicCode, @time).

    The value column is renamed to *out_col*.
    """
    wanted = [Col.AREA, Col.CAT01, Col.TIME, Col.VALUE]
    renames = {Col.AREA: Col.AREA2, Col.CAT01: Col.SIC, Col.VALUE: out_col}

    picked = df.loc[df[Col.TAB] == tab, wanted].rename(columns=renames)
    for key in (Col.AREA2, Col.SIC, Col.TIME):
        picked[key] = picked[key].astype(str)
    return picked


def build_base_ec(
    ec_raw2: pd.DataFrame,
    major_codes: List[str],
    sic_map: Dict[str, str],
    cat02: Cat02,
    *,
    keep_only_major: bool = True,
) -> pd.DataFrame:
    """Outer-join the establishment and employee measures and attach industry names.

    When *keep_only_major* is true, only rows whose industry code is in
    *major_codes* are kept before joining.
    """
    tidy = normalize_ec_raw(ec_raw2, cat02=cat02)

    measures = [
        extract_measure(tidy, CFG.tab_est, Col.EST),
        extract_measure(tidy, CFG.tab_emp, Col.EMP),
    ]
    if keep_only_major:
        measures = [m.loc[m[Col.SIC].isin(major_codes)].copy() for m in measures]
    est, emp = measures

    merged = pd.merge(est, emp, on=[Col.AREA2, Col.SIC, Col.TIME], how="outer")
    merged[Col.SIC_NAME] = merged[Col.SIC].map(sic_map)
    return merged


# =========================
# Join population -> density
# =========================

def add_population_and_density(
    base_ec: pd.DataFrame,
    pop: pd.DataFrame,
    area_map: Dict[str, str],
) -> pd.DataFrame:
    """Join population onto the economic data and derive per-10,000 densities.

    Args:
        base_ec: frame with 'area', 'establishments' and 'employees' columns.
        pop: frame with 'area' and 'population' columns.
        area_map: area code (zero-padded) -> area name.

    Returns:
        A new frame with 'population', 'areaName', 'est_density' and
        'emp_density' added. Densities are missing wherever population is
        missing or zero.

    Raises:
        ValueError: if *pop* lacks a required column.
    """
    require_columns(pop, REQUIRED_POP, "pop")

    base = base_ec.copy()
    base[Col.AREA2] = zfill_area(base[Col.AREA2])

    pop2 = pop[[Col.AREA2, Col.POP]].copy()
    pop2[Col.AREA2] = zfill_area(pop2[Col.AREA2])

    out = base.merge(pop2, on=Col.AREA2, how="left")
    out[Col.AREA_NAME] = out[Col.AREA2].map(area_map)

    # Dividing by a zero population yields +/-inf, which a NaN-based
    # missing-rate check does not count. Mask zeros so those rows come out
    # as missing instead.
    denominator = out[Col.POP].where(out[Col.POP] != 0)
    out[Col.EST_DEN] = out[Col.EST] / denominator * 10000
    out[Col.EMP_DEN] = out[Col.EMP] / denominator * 10000

    return out


def quality_report(df: pd.DataFrame) -> Dict[str, float]:
    """Return the fraction of missing values for each tracked column present in *df*."""
    tracked = (Col.EST, Col.EMP, Col.POP, Col.EST_DEN, Col.EMP_DEN)
    return {
        name: float(df[name].isna().mean())
        for name in tracked
        if name in df.columns
    }


# =========================
# Pipeline
# =========================

def build_base_dataset(
    ec_raw2: pd.DataFrame,
    pop: pd.DataFrame,
    area_map: Dict[str, str],
    *,
    cat02: Cat02 = CFG.cat02_total,
) -> pd.DataFrame:
    """Run the full build: metadata lookup, measure join, population join, quality log."""
    # 1) metadata -> major industry codes
    codes_table = fetch_cat01_codes(CFG.stats_data_id)
    major_codes, sic_map = build_major_codes(codes_table)
    log(f"major industries: {len(major_codes)}")

    # 2) split by tab and join the two measures
    economic = build_base_ec(ec_raw2, major_codes, sic_map, cat02=cat02)

    # 3) join population and compute densities
    dataset = add_population_and_density(economic, pop, area_map)

    # 4) log the missing-value rates
    rates = quality_report(dataset)
    summary = ", ".join(f"{k}={v:.6f}" for k, v in rates.items())
    log("missing rates: " + summary)

    return dataset


def save_parquet(df: pd.DataFrame, path: str | Path) -> Path:
    """Write *df* to *path* as parquet (without the index) and return the path.

    Missing parent directories are created first.
    """
    target = Path(path)
    folder = target.parent
    folder.mkdir(parents=True, exist_ok=True)
    df.to_parquet(target, index=False)
    log(f"saved: {target}")
    return target


# =========================
# Usage
# =========================
# base2_total = build_base_dataset(ec_raw2, pop, area_map, cat02=CFG.cat02_total)
# save_parquet(base2_total, "data/base_2014_ec_2020_pop.parquet")
#
# To build the privately-operated ("001") version instead:
# base2_private = build_base_dataset(ec_raw2, pop, area_map, cat02=CFG.cat02_private)
# save_parquet(base2_private, "data/base_2014_ec_2020_pop_private.parquet")


In [3]:
import requests

import os
import warnings

# The e-Stat application ID is a credential, so it is read from the
# ESTAT_APP_ID environment variable instead of being stored in the notebook.
# NOTE(review): an ID was previously committed on this line; treat it as
# exposed and regenerate it.
ESTAT_APP_ID = os.environ.get("ESTAT_APP_ID", "")
if not ESTAT_APP_ID:
    warnings.warn(
        "ESTAT_APP_ID environment variable is not set; "
        "e-Stat API requests need a valid appId.",
        stacklevel=2,
    )

BASE_URL = "https://api.e-stat.go.jp/rest/3.0/app/json/"

def _get(api_name: str, params: dict):
    """Call one e-Stat API endpoint and return the decoded JSON body.

    The application ID is sent as ``appId``; entries in *params* are merged
    on top of it. An HTTP error status raises via ``raise_for_status``.
    """
    query = {"appId": ESTAT_APP_ID, **params}
    response = requests.get(BASE_URL + api_name, params=query, timeout=60)
    response.raise_for_status()
    return response.json()


In [9]:
import pandas as pd

EC_ID = "0003353931"

meta = _get("getMetaInfo", {"statsDataId": EC_ID})
mi = meta["GET_META_INFO"]["METADATA_INF"]

# CLASS_OBJ can be a single dict instead of a list; normalise it to a list.
class_inf = mi["CLASS_INF"]["CLASS_OBJ"]
class_list = [class_inf] if isinstance(class_inf, dict) else class_inf

cat01_obj = next(x for x in class_list if x["@id"] == "cat01")
cat01_codes = pd.DataFrame(cat01_obj["CLASS"]).copy()

# Normalise the three columns to stripped strings.
for c in ("@code", "@name", "@level"):
    cat01_codes[c] = cat01_codes[c].astype(str).str.strip()

print("==== cat01 level別件数 ====")
level_counts = cat01_codes["@level"].value_counts().sort_index()
print(level_counts)

# List the codes at one level.
def show_level(lv: str, n: int = 200):
    """Display up to *n* cat01 rows at level *lv* (sorted by code) and return them all."""
    at_level = cat01_codes["@level"] == lv
    sub = (
        cat01_codes.loc[at_level, ["@code", "@name", "@level"]]
        .sort_values("@code")
        .reset_index(drop=True)
    )
    print(f"\n==== level={lv}（件数={len(sub)}）上位{min(n, len(sub))}件 ====")
    display(sub.head(n))
    return sub

# Show (and keep) the level-1 and level-2 listings, in that order.
lv1, lv2 = (show_level(level_code) for level_code in ("1", "2"))


==== cat01 level別件数 ====
@level
1     5
2    19
Name: count, dtype: int64

==== level=1（件数=5）上位5件 ====


Unnamed: 0,@code,@name,@level
0,0,A～S 全産業,1
1,10,A～R 全産業（S公務を除く）,1
2,20,A～B 農林漁業,1
3,260,C～S 非農林漁業,1
4,270,C～R 非農林漁業（S公務を除く）,1



==== level=2（件数=19）上位19件 ====


Unnamed: 0,@code,@name,@level
0,30,A 農業，林業,2
1,170,B 漁業,2
2,2700,F 電気・ガス・熱供給・水道業,2
3,280,C 鉱業，採石業，砂利採取業,2
4,2850,G 情報通信業,2
5,3140,H 運輸業，郵便業,2
6,3560,I 卸売業，小売業,2
7,370,D 建設業,2
8,4480,J 金融業，保険業,2
9,4790,K 不動産業，物品賃貸業,2


In [16]:
import pandas as pd

# Pull just the population column out of the existing parquet (which already has it).
pop_src = pd.read_parquet("data/base_2014_ec_2020_pop.parquet")

pop = pop_src.loc[:, ["area", "population"]].drop_duplicates()
pop = pop.assign(area=pop["area"].astype(str).str.zfill(5))

print("pop shape:", pop.shape)
pop.head()


pop shape: (1963, 2)


Unnamed: 0,area,population
0,0,126146099.0
5,1000,5224614.0
10,1100,1973395.0
15,1101,248680.0
20,1102,289323.0


In [18]:
import pandas as pd

# Build the area -> areaName lookup from the existing parquet.
area_src = pd.read_parquet("data/base_2014_ec_2020_pop.parquet")

area_pairs = area_src[["area", "areaName"]].drop_duplicates()
area_map = dict(
    zip(area_pairs["area"].astype(str).str.zfill(5), area_pairs["areaName"])
)

print("area_map size:", len(area_map))
list(area_map.items())[:5]


area_map size: 1963


[('00000', '全国'),
 ('01000', '北海道'),
 ('01100', '札幌市'),
 ('01101', '札幌市中央区'),
 ('01102', '札幌市北区')]

In [10]:
res = _get("getStatsData", {"statsDataId": EC_ID, "limit": 1})
# Quick look at the keys that came back (each level printed once).
print(res["GET_STATS_DATA"].keys())
print(res["GET_STATS_DATA"]["STATISTICAL_DATA"].keys())

# Contents of DATA_INF (how VALUE is laid out)
data_inf = res["GET_STATS_DATA"]["STATISTICAL_DATA"]["DATA_INF"]
print("DATA_INF keys:", list(data_inf.keys()))
print("VALUE sample:", data_inf["VALUE"])


dict_keys(['RESULT', 'PARAMETER', 'STATISTICAL_DATA'])
dict_keys(['RESULT_INF', 'TABLE_INF', 'CLASS_INF', 'DATA_INF'])
dict_keys(['RESULT_INF', 'TABLE_INF', 'CLASS_INF', 'DATA_INF'])
DATA_INF keys: ['NOTE', 'VALUE']
VALUE sample: {'@tab': '003', '@cat01': '000', '@cat02': '000', '@area': '00000', '@time': '2014000000', '$': '5689366'}


In [12]:
EC_ID = "0003353931"

test_params = dict(
    statsDataId=EC_ID,
    cdTab="003",      # number of establishments
    cdCat02="000",    # total
    cdCat01="640",    # a level-2 example (manufacturing)
    limit=10,
)

res = _get("getStatsData", test_params)

# Always check RESULT
print("RESULT:", res["GET_STATS_DATA"]["RESULT"])

data_inf = res["GET_STATS_DATA"]["STATISTICAL_DATA"].get("DATA_INF", {})
values = data_inf.get("VALUE", [])
# A single record comes back as a dict; wrap it so it can be counted and indexed.
values = [values] if isinstance(values, dict) else values

print("VALUE count:", len(values))
print("VALUE sample:", values[0] if values else None)


RESULT: {'STATUS': 0, 'ERROR_MSG': '正常に終了しました。', 'DATE': '2026-01-18T16:05:16.761+09:00'}
VALUE count: 10
VALUE sample: {'@tab': '003', '@cat01': '640', '@cat02': '000', '@area': '00000', '@time': '2014000000', '$': '487191'}


In [13]:
import pandas as pd
from pathlib import Path

EC_ID = "0003353931"

meta = _get("getMetaInfo", {"statsDataId": EC_ID})
mi = meta["GET_META_INFO"]["METADATA_INF"]
class_list = mi["CLASS_INF"]["CLASS_OBJ"]
# A lone class object is a dict; wrap it so the search below can iterate.
class_list = [class_list] if isinstance(class_list, dict) else class_list

cat01_obj = next(x for x in class_list if x["@id"] == "cat01")
cat01_codes = pd.DataFrame(cat01_obj["CLASS"]).copy()
for c in ("@code", "@name", "@level"):
    cat01_codes[c] = cat01_codes[c].astype(str).str.strip()

# Level 2 is used as the set of major industry codes from here on.
major = cat01_codes.loc[cat01_codes["@level"] == "2"].copy()
major_codes = major["@code"].tolist()
sic_map = dict(zip(major["@code"], major["@name"]))

print("major count(level=2):", len(major_codes))
print("major codes sample:", major_codes[:10])


major count(level=2): 19
major codes sample: ['030', '170', '280', '370', '640', '2700', '2850', '3140', '3560', '4480']


In [14]:
import pandas as pd

CAT02_TOTAL = "000"      # total (use "001" for privately operated)
TABS = ["003", "189"]    # 003 = number of establishments, 189 = number of employees

def fetch_statsdata_one_cat(stats_id: str, tab: str, cat02: str, cat01: str, limit: int = 50000) -> list[dict]:
    """Fetch every record for one (tab, cat02, cat01) combination, page by page.

    Paging uses the API's ``startPosition`` request parameter and continues
    from the ``NEXT_KEY`` reported in ``RESULT_INF``. Sending ``offset`` is not
    a paging parameter of getStatsData, so a full page would otherwise be
    requested again and again.

    Args:
        stats_id: statsDataId of the table.
        tab: tab code (``cdTab``).
        cat02: cat02 code (``cdCat02``).
        cat01: cat01 code (``cdCat01``).
        limit: maximum number of records requested per call.

    Returns:
        The VALUE records of all pages, in the order received.

    Raises:
        RuntimeError: if the API reports a STATUS other than 0.
    """
    all_values: list[dict] = []
    start_position = 1

    while True:
        res = _get(
            "getStatsData",
            {
                "statsDataId": stats_id,
                "cdTab": tab,
                "cdCat02": cat02,
                "cdCat01": cat01,
                "limit": limit,
                "startPosition": start_position,
            },
        )

        stats = res["GET_STATS_DATA"]
        result = stats["RESULT"]
        if str(result.get("STATUS")) != "0":
            raise RuntimeError(f"getStatsData failed: {result}")

        statistical_data = stats["STATISTICAL_DATA"]
        values = statistical_data.get("DATA_INF", {}).get("VALUE", [])
        # A single record is returned as a dict rather than a list.
        if isinstance(values, dict):
            values = [values]

        all_values.extend(values)

        # NEXT_KEY is only present while more records remain.
        next_key = statistical_data.get("RESULT_INF", {}).get("NEXT_KEY")
        if not values or not next_key:
            break
        next_position = int(next_key)
        # Guard against a response that does not advance, to avoid looping forever.
        if next_position <= start_position:
            break
        start_position = next_position

    return all_values


# ===== Run: collect every level-2 code (19) x tab (2) combination =====
records: list[dict] = []
for tab, code in [(t, c) for t in TABS for c in major_codes]:
    records.extend(
        fetch_statsdata_one_cat(EC_ID, tab, CAT02_TOTAL, code, limit=50000)
    )

ec_raw2 = pd.DataFrame(records)
print("ec_raw2 shape:", ec_raw2.shape)
print("columns:", ec_raw2.columns.tolist())
ec_raw2.head()


ec_raw2 shape: (68422, 6)
columns: ['@tab', '@cat01', '@cat02', '@area', '@time', '$']


Unnamed: 0,@tab,@cat01,@cat02,@area,@time,$
0,3,30,0,0,2014000000,30662
1,3,30,0,1000,2014000000,3983
2,3,30,0,1100,2014000000,103
3,3,30,0,1101,2014000000,33
4,3,30,0,1102,2014000000,8


In [19]:
# "@tab" codes selected below: 003 for establishments, 189 for employees.
TAB_EST = "003"
TAB_EMP = "189"

def build_tab_df(df_raw: pd.DataFrame, tab: str, value_name: str) -> pd.DataFrame:
    """Return the rows of one tab as (area, sicCode, @time, *value_name*).

    The "$" column is converted to numbers (unparseable entries become NaN),
    area codes are zero-padded to five characters and industry codes are
    stripped of surrounding whitespace. The input frame is left untouched.
    """
    tidy = (
        # Convert the whole "$" column before filtering, as the dtype depends on it.
        df_raw.assign(value=pd.to_numeric(df_raw["$"], errors="coerce"))
        .loc[lambda d: d["@tab"] == tab]
        .rename(columns={"@area": "area", "@cat01": "sicCode"})
        [["area", "sicCode", "@time", "value"]]
        .rename(columns={"value": value_name})
    )
    return tidy.assign(
        area=lambda d: d["area"].astype(str).str.zfill(5),
        sicCode=lambda d: d["sicCode"].astype(str).str.strip(),
    )

ec_est = build_tab_df(ec_raw2, TAB_EST, "establishments")
ec_emp = build_tab_df(ec_raw2, TAB_EMP, "employees")

base_ec = pd.merge(ec_est, ec_emp, on=["area", "sicCode", "@time"], how="outer")
base_ec = base_ec.assign(sicName=base_ec["sicCode"].map(sic_map))

# Join population
pop2 = pop.assign(area=pop["area"].astype(str).str.zfill(5))

base2 = base_ec.merge(pop2[["area", "population"]], on="area", how="left")
base2 = base2.assign(
    areaName=base2["area"].map(area_map),
    # Densities are per 10,000 population
    est_density=lambda d: d["establishments"] / d["population"] * 10000,
    emp_density=lambda d: d["employees"] / d["population"] * 10000,
)

# Save
out = Path("data")
out.mkdir(exist_ok=True)
fn = out / "base_2014_ec_2020_pop_level2.parquet"
base2.to_parquet(fn, index=False)

print("saved:", fn.resolve())
print("employees missing rate:", float(base2["employees"].isna().mean()))


saved: /Users/hamadatakahito/cliate2/data/base_2014_ec_2020_pop_level2.parquet
employees missing rate: 0.00023381557796288177
