In [2]:
import sys
import os

os.chdir(os.path.abspath(".."))
sys.path.append(os.path.abspath(".."))

In [4]:
import os
import json
from pathlib import Path
from typing import List, Dict

def load_risk_data_from_json(base_dir: str, min_year: int = 2018) -> List[Dict]:
    """
    指定されたディレクトリ配下からリスクJSONファイルを読み込み、
    辞書リスト（records）として返す。

    Parameters:
        base_dir (str): ベースディレクトリ（例: 'data/processed/risk_sections'）
        min_year (int): 対象とする最小年度（例: 2018以降）

    Returns:
        List[Dict]: 各JSONの辞書を格納したリスト
    """
    records = []
    error_files = []

    for year_folder in os.listdir(base_dir):
        try:
            year = int(year_folder)
        except ValueError:
            continue  # 年度名でないフォルダはスキップ

        if year < min_year:
            continue  # API対象外

        year_dir = os.path.join(base_dir, year_folder)
        if not os.path.isdir(year_dir):
            continue

        for filename in os.listdir(year_dir):
            if not filename.endswith(".json"):
                continue

            filepath = os.path.join(year_dir, filename)
            try:
                if os.stat(filepath).st_size == 0:
                    error_files.append((filepath, "Empty file"))
                    continue

                with open(filepath, "r", encoding="utf-8") as f:
                    data = json.load(f)

                # risk_count を補完
                data.setdefault("risk_count", None)
                records.append(data)

            except json.JSONDecodeError as e:
                error_files.append((filepath, f"JSON decode error: {e}"))
            except Exception as e:
                error_files.append((filepath, f"Other error: {e}"))

    print(f"✅ Loaded: {len(records)} JSON records")
    print(f"⚠️  Skipped: {len(error_files)} files due to errors")

    if error_files:
        for path, reason in error_files[:5]:  # 最初の5件だけ表示
            print(f"[ERROR] {path} - {reason}")

    return records

In [7]:
from sqlalchemy import create_engine
from dotenv import load_dotenv

load_dotenv()

# 環境変数 or 直接指定で接続情報を設定
DB_USER = os.getenv("POSTGRES_USER")
DB_PASS = os.getenv("POSTGRES_PASSWORD")
DB_HOST = os.getenv("POSTGRES_HOST")
DB_PORT = os.getenv("POSTGRES_PORT")
DB_NAME = os.getenv("POSTGRES_DB")

# SQLAlchemyエンジン作成
engine = create_engine(f"postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}")


from sqlalchemy import text

# UPSERT用のSQL文を用意
upsert_sql = text("""
    INSERT INTO edinet_documents 
        (doc_id, company_id, edinet_code, doc_type_code, submit_date, fiscal_year, description, risk_text, risk_count)
    VALUES 
        (:doc_id, :company_id, :edinet_code, :doc_type_code, :submit_date, :fiscal_year, :description, :risk_text, :risk_count)
    ON CONFLICT (doc_id)
    DO UPDATE 
    SET 
        risk_text  = EXCLUDED.risk_text,
        risk_count = EXCLUDED.risk_count;
""")

# 準備した辞書データのリストをUPSERT実行
with engine.begin() as conn:
    conn.execute(upsert_sql, risk_data_records)

StatementError: (sqlalchemy.exc.InvalidRequestError) A value is required for bind parameter 'doc_id'
[SQL: 
    INSERT INTO edinet_documents 
        (doc_id, company_id, edinet_code, doc_type_code, submit_date, fiscal_year, description, risk_text, risk_count)
    VALUES 
        (%(doc_id)s, %(company_id)s, %(edinet_code)s, %(doc_type_code)s, %(submit_date)s, %(fiscal_year)s, %(description)s, %(risk_text)s, %(risk_count)s)
    ON CONFLICT (doc_id)
    DO UPDATE 
    SET 
        risk_text  = EXCLUDED.risk_text,
        risk_count = EXCLUDED.risk_count;
]
(Background on this error at: https://sqlalche.me/e/20/cd3x)