# tsl_loto_etl 実行ノートブック（PostgreSQL / Spark / SQLite）

このノートブックは、ロト系CSVを取得→整形→特徴量化（時系列変換 + 横持ち特徴量）→ **DB（PostgreSQL）/ Parquet / SQLite** へ書き込むETLを実行します。

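- 推奨の配置: `C:\lib_ana\src\dataset\base\`（このノートも同じ場所。`loto_y_ts_etl` を import するため、下のセルではここを作業ディレクトリ兼 `sys.path` に設定します）
- SQLite: `C:\lib_ana\src\dataset\base\data\loto.sqlite3`（`args.sqlite_path`。`backend='sqlite'` では `loto_y_ts` / `loto_hist_feat` がこのファイルに書き込まれます）
- 出力（Parquet）: `C:\lib_ana\src\dataset\base\spark_out`
- 一時ディレクトリ（COPY用CSV）: `C:\lib_ana\tmp\loto_ts_pg`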


## 1) セットアップ（パス・psql・表示補助）

- Windowsの `psql` 出力は文字コードが環境依存（CP932など）になりがちなので、**Decodeエラーを潰すラッパー**を使います。
- `PGPASSWORD` は `args.password` から環境変数で渡します（ノートに平文で残さない運用が理想）。


In [3]:
import os, sys, subprocess, textwrap

# === Folder this notebook is expected to live in (absolute path) ===
PROJECT_DIR = r"C:\lib_ana\src\dataset\base"
# Make relative paths and local imports resolve against the project folder.
os.chdir(PROJECT_DIR)
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)

print("cwd =", os.getcwd())
print("sys.path[0] =", sys.path[0])

# === Absolute path to psql.exe (adjust to your environment) ===
PSQL_BIN = r"C:\tools\pgsql\postgresql-16.11\pgsql\bin\psql.exe"
# Explicit raise instead of `assert`: asserts are stripped under `python -O`,
# and isfile() (unlike exists()) rejects a directory at this path.
if not os.path.isfile(PSQL_BIN):
    raise FileNotFoundError(f"psql.exe が見つかりません: {PSQL_BIN}")

def _decode_best(b: bytes) -> str:
    if not b:
        return ""
    for enc in ("cp932", "utf-8"):
        try:
            return b.decode(enc)
        except UnicodeDecodeError:
            pass
    return b.decode("cp932", errors="replace")

def psql(sql: str, *, host: str, port: str, db: str, user: str, password: "str | None",
         psql_bin: "str | None" = None):
    """Run one SQL command via the psql CLI and echo SQL, stdout and stderr into the cell.

    Output bytes are decoded with ``_decode_best`` so console encodings such as
    CP932 on Windows do not raise ``UnicodeDecodeError``.

    Args:
        sql: SQL text passed to ``psql -c``; it is dedented/stripped only for display.
        host, port, db, user: Connection parameters (``port`` may be int or str).
        password: Exported to psql as ``PGPASSWORD``. When ``None``, the inherited
            environment is left untouched so psql can use an existing
            ``PGPASSWORD`` or a password file instead.
        psql_bin: Path to the psql executable; defaults to the module-level ``PSQL_BIN``.

    Raises:
        RuntimeError: If psql exits with a non-zero status; the message carries
            the last line of psql's stderr.
    """
    env = dict(os.environ)
    # A None value in env would make subprocess raise TypeError, so only set it when given.
    if password is not None:
        env["PGPASSWORD"] = password
    cmd = [
        psql_bin or PSQL_BIN,
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-d", db,
        "-v", "ON_ERROR_STOP=1",   # abort on the first SQL error
        "-P", "pager=off",         # never wait on an interactive pager inside the notebook
        "-P", "footer=on",         # keep the row-count footer in the printed table
        "-c", sql,
    ]
    r = subprocess.run(cmd, capture_output=True, env=env)
    stdout = _decode_best(r.stdout).strip()
    stderr = _decode_best(r.stderr).strip()

    print("----- SQL -----")
    print(textwrap.dedent(sql).strip())
    print("----- STDOUT -----")
    print(stdout)
    if stderr:
        print("----- STDERR -----")
        print(stderr)
    if r.returncode != 0:
        # Put the most specific diagnostic in the exception itself, not only in the printed log.
        detail = stderr.splitlines()[-1] if stderr else "(no stderr)"
        raise RuntimeError(f"psql failed (returncode={r.returncode}): {detail}")

# Signals that the setup cell finished: the helpers above are defined and PSQL_BIN passed its existence check.
print("psql helper ready")


cwd = C:\lib_ana\src\dataset\base
sys.path[0] = C:\lib_ana\src\dataset\base
psql helper ready


## 2) ETL実行（引数を全部埋める。下のセルは `backend="sqlite"` で実行した例）

- `backend='psql'` の場合、`host/port/db/user/password/schema/maintenance_db/psql_bin/tmp_dir` を埋めます。
- `psql_bin` は `psql` コマンド名でもフルパスでもOKですが、WindowsはPATH事故が多いので **フルパス推奨**です。


In [7]:
# Cell: setup + run (C:\lib_ana 用)

import os, sys
from types import SimpleNamespace

BASE = r"C:\lib_ana\src\dataset\base"

# 1) Pin the working directory and import path (avoids import / relative-path mishaps).
os.chdir(BASE)
if BASE not in sys.path:
    sys.path.insert(0, BASE)

from loto_y_ts_etl import main

args = SimpleNamespace(
    # ===== backend =====
    backend="sqlite",  # "psql" | "spark" | "sqlite"

    # ===== SQLite database file =====
    sqlite_path=r"C:\lib_ana\src\dataset\base\data\loto.sqlite3",

    # ===== Output (Spark/Parquet) =====
    spark_out=r"C:\lib_ana\src\dataset\base\spark_out",
    spark_master=None,

    # ===== PostgreSQL (used by the psql backend) =====
    host="127.0.0.1",
    port="5432",
    db="loto_db",
    user="loto_user",
    # Read the secret from the standard PGPASSWORD variable so it does not have to
    # live in the notebook; "z" is only the previous placeholder used as a fallback.
    password=os.environ.get("PGPASSWORD", "z"),
    schema="loto",                 # a dedicated schema is recommended over public
    maintenance_db="postgres",
    psql_bin=r"C:\tools\pgsql\postgresql-16.11\pgsql\bin\psql.exe",

    # ===== Output tables =====
    output_table="loto_y_ts",
    hist_table="loto_hist_feat",

    # ===== Run mode =====
    mode="replace",
    lotos="mini,loto6,loto7,bingo5,numbers3,numbers4",

    # ===== Temp directory for COPY (CSV files are written here) =====
    tmp_dir=r"C:\lib_ana\tmp\loto_ts_pg",
)

rc = main(args)
print("return_code =", rc)
# Explicit raise instead of `assert`, so the failure check survives `python -O`.
if rc != 0:
    raise RuntimeError(f"ETL が失敗しました（return_code={rc}、ログを確認してください）")


2026-02-09 16:49:19,340 - INFO - SQLite接続成功: path=C:\lib_ana\src\dataset\base\data\loto.sqlite3
2026-02-09 16:49:19,342 - INFO - Start ETL backend=sqlite mode=replace ts_table=loto_y_ts hist_table=loto_hist_feat
Processing mini...
Processing loto6...
Processing loto7...
Processing bingo5...
Processing numbers3...
Processing numbers4...
2026-02-09 16:49:23,904 - INFO - 定義された時系列変換: ['raw', 'cumsum', 'roll3_sum', 'roll7_mean', 'diff1', 'is_odd']
2026-02-09 16:49:25,374 - INFO - 最終データフレーム: (455946, 39). 6種類の時系列を生成。
2026-02-09 16:49:25,568 - INFO - 時系列テーブルデータ行数: 455946
2026-02-09 16:49:25,569 - INFO - 横持ち特徴量テーブルデータ行数: 75991
2026-02-09 16:51:00,793 - INFO - [OK] loto_y_ts: UPSERT完了（rows=455946）
2026-02-09 16:52:11,676 - INFO - [OK] loto_hist_feat: UPSERT完了（rows=75991）
2026-02-09 16:52:11,739 - INFO - 処理完了（SQLite）。所要時間: 172.49秒。
return_code = 0


## 3) 書き込み確認（DB/スキーマ/テーブル概要・列・制約・件数・サンプル）

psqlコマンドで **現物** を確認します（GUI不要）。※ PostgreSQL に書き込んだ場合（`backend='psql'`）の確認用です。SQLite に書き込んだ場合は 4) を使ってください。


In [4]:
# Connection keywords shared by every psql() call in this cell (deleted again at the end).
_pg_conn = {
    "host": args.host,
    "port": args.port,
    "db": args.db,
    "user": args.user,
    "password": args.password,
}

# Connection check: database, role, current schema and search_path
psql(
    "SELECT current_database() AS db, current_user AS user, current_schema() AS current_schema, current_setting('search_path') AS search_path;",
    **_pg_conn,
)

# Non-system schemas
psql("""
SELECT nspname AS schema
FROM pg_namespace
WHERE nspname NOT LIKE 'pg_%' AND nspname <> 'information_schema'
ORDER BY 1;
""", **_pg_conn)

# Tables inside the target schema
psql(f"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '{args.schema}'
ORDER BY table_name;
""", **_pg_conn)

# Row counts of both output tables
psql(f"""
SELECT
  (SELECT COUNT(*) FROM {args.schema}.{args.output_table}) AS cnt_ts,
  (SELECT COUNT(*) FROM {args.schema}.{args.hist_table})   AS cnt_hist;
""", **_pg_conn)

# Column definitions
psql(f"""
SELECT table_name, ordinal_position, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '{args.schema}'
  AND table_name IN ('{args.output_table}', '{args.hist_table}')
ORDER BY table_name, ordinal_position;
""", **_pg_conn)

# Constraints
psql(f"""
SELECT
  rel.relname AS table,
  con.conname AS name,
  con.contype AS type,
  pg_get_constraintdef(con.oid) AS definition
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace n ON n.oid = rel.relnamespace
WHERE n.nspname = '{args.schema}'
  AND rel.relname IN ('{args.output_table}', '{args.hist_table}')
ORDER BY rel.relname, con.contype, con.conname;
""", **_pg_conn)

# Indexes
psql(f"""
SELECT schemaname, tablename, indexname, indexdef
FROM pg_indexes
WHERE schemaname = '{args.schema}'
  AND tablename IN ('{args.output_table}', '{args.hist_table}')
ORDER BY tablename, indexname;
""", **_pg_conn)

# Sample rows, newest ds first
psql(f"SELECT * FROM {args.schema}.{args.output_table} ORDER BY ds DESC LIMIT 20;", **_pg_conn)
psql(f"SELECT * FROM {args.schema}.{args.hist_table} ORDER BY ds DESC LIMIT 20;", **_pg_conn)

del _pg_conn


----- SQL -----
SELECT current_database() AS db, current_user AS user, current_schema() AS current_schema, current_setting('search_path') AS search_path;
----- STDOUT -----
db    |   user    | current_schema |   search_path   
---------+-----------+----------------+-----------------
 loto_db | loto_user | public         | "$user", public
(1 行)
----- SQL -----
SELECT nspname AS schema
FROM pg_namespace
WHERE nspname NOT LIKE 'pg_%' AND nspname <> 'information_schema'
ORDER BY 1;
----- STDOUT -----
schema 
--------
 loto
 public
(2 行)
----- SQL -----
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'loto'
ORDER BY table_name;
----- STDOUT -----
table_name   
----------------
 loto_hist_feat
 loto_y_ts
(2 行)
----- SQL -----
SELECT
  (SELECT COUNT(*) FROM loto.loto_y_ts) AS cnt_ts,
  (SELECT COUNT(*) FROM loto.loto_hist_feat)   AS cnt_hist;
----- STDOUT -----
cnt_ts | cnt_hist 
--------+----------
 455946 |    75991
(1 行)
----- SQL -----
SELECT table_name, ordinal_posi

## 4) SQLiteで確認したい場合（オプション）

PostgreSQL を使わず、`backend='sqlite'` で書き込んだローカルの SQLite ファイルを確認したいときの補助です。


In [8]:
import sqlite3
import pandas as pd

sqlite_path = r"C:\lib_ana\src\dataset\base\data\loto.sqlite3"
conn = sqlite3.connect(sqlite_path)

# Every table stored in the SQLite file.
df_tables = pd.read_sql_query("SELECT name AS table_name FROM sqlite_master WHERE type='table' ORDER BY name;", conn)
display(df_tables)


def sqlite_table_info(table: str) -> pd.DataFrame:
    """Return SQLite's PRAGMA table_info rows (cid, name, type, notnull, dflt_value, pk) for *table*."""
    pragma_sql = f"PRAGMA table_info('{table}');"
    return pd.read_sql_query(pragma_sql, conn)


display(sqlite_table_info("loto_y_ts"))
display(sqlite_table_info("loto_hist_feat"))

# Row count per output table, collected in the same order as the "table" column.
df_counts = pd.DataFrame({
    "table": ["loto_y_ts", "loto_hist_feat"],
    "rows": [
        pd.read_sql_query(f"SELECT COUNT(*) AS n FROM {t};", conn)["n"][0]
        for t in ("loto_y_ts", "loto_hist_feat")
    ],
})
display(df_counts)

# First rows of each table (no ORDER BY: whatever order SQLite returns).
display(pd.read_sql_query("SELECT * FROM loto_y_ts LIMIT 20;", conn))
display(pd.read_sql_query("SELECT * FROM loto_hist_feat LIMIT 20;", conn))


Unnamed: 0,table_name
0,loto_hist_feat
1,loto_y_ts


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,loto,TEXT,0,,0
1,1,ds,TEXT,0,,0
2,2,unique_id,TEXT,0,,0
3,3,ts_type,TEXT,0,,0
4,4,y,FLOAT,0,,0
5,5,exec_ts,DATETIME,0,,0
6,6,updated_ts,DATETIME,0,,0
7,7,proc_seconds,FLOAT,0,,0


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,loto,TEXT,0,,0
1,1,ds,TEXT,0,,0
2,2,unique_id,TEXT,0,,0
3,3,hist_b1,FLOAT,0,,0
4,4,hist_pn1,FLOAT,0,,0
5,5,hist_pm1,FLOAT,0,,0
6,6,hist_pn2,FLOAT,0,,0
7,7,hist_pm2,FLOAT,0,,0
8,8,hist_pn3,FLOAT,0,,0
9,9,hist_pm3,FLOAT,0,,0


Unnamed: 0,table,rows
0,loto_y_ts,455946
1,loto_hist_feat,75991


Unnamed: 0,loto,ds,unique_id,ts_type,y,exec_ts,updated_ts,proc_seconds
0,mini,1999-04-13,N1,raw,1.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
1,mini,1999-04-27,N1,raw,2.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
2,mini,1999-05-11,N1,raw,1.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
3,mini,1999-05-25,N1,raw,11.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
4,mini,1999-06-08,N1,raw,8.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
5,mini,1999-06-22,N1,raw,2.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
6,mini,1999-07-06,N1,raw,3.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
7,mini,1999-07-20,N1,raw,22.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
8,mini,1999-08-03,N1,raw,2.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
9,mini,1999-08-17,N1,raw,8.0,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153


Unnamed: 0,loto,ds,unique_id,hist_b1,hist_pn1,hist_pm1,hist_pn2,hist_pm2,hist_pn3,hist_pm3,...,hist_bxm,hist_ssc,hist_ssm,hist_sbc,hist_sbm,hist_mnc,hist_mnm,exec_ts,updated_ts,proc_seconds
0,mini,1999-04-13,N1,26.0,29.0,6090500.0,46.0,275800.0,2331.0,9400.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
1,mini,1999-04-27,N1,28.0,11.0,16868900.0,35.0,380800.0,1672.0,13800.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
2,mini,1999-05-11,N1,28.0,17.0,11481000.0,34.0,412400.0,2014.0,12000.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
3,mini,1999-05-25,N1,29.0,5.0,38866300.0,35.0,398800.0,1055.0,22900.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
4,mini,1999-06-08,N1,23.0,18.0,11107000.0,54.0,265900.0,2029.0,12200.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
5,mini,1999-06-22,N1,4.0,70.0,3181000.0,239.0,66900.0,4162.0,6600.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
6,mini,1999-07-06,N1,2.0,3.0,40000000.0,57.0,2381300.0,1289.0,23000.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
7,mini,1999-07-20,N1,21.0,16.0,13732500.0,77.0,205000.0,1677.0,16300.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
8,mini,1999-08-03,N1,15.0,27.0,7304600.0,93.0,152300.0,2856.0,8500.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153
9,mini,1999-08-17,N1,5.0,15.0,13787900.0,121.0,122700.0,2004.0,12800.0,...,,,,,,,,2026-02-09 16:49:19.248642,2026-02-09 16:49:19.248642,5.925153


In [9]:
import os
from pyspark.sql import SparkSession

BASE_OUT = r"C:\lib_ana\src\dataset\base\spark_out"
# Each table is stored under its own subdirectory of BASE_OUT.
TS_PATH, HIST_PATH = (os.path.join(BASE_OUT, table) for table in ("loto_y_ts", "loto_hist_feat"))

# Local-mode session used only for inspection; getOrCreate() reuses an existing session if one is running.
spark = SparkSession.builder.appName("inspect_parquet").master("local[*]").getOrCreate()

def inspect_parquet(path: str, name: str, show_rows: int = 20):
    """Print the row/column counts, schema and first rows of a Parquet dataset.

    Uses the notebook-level ``spark`` session.

    Args:
        path: Parquet file or directory to read.
        name: Label printed in the section header.
        show_rows: Number of rows passed to ``DataFrame.show`` (20, as before, by default).

    Returns:
        The loaded Spark DataFrame, so callers can reuse it without re-reading the files.
    """
    print(f"\n=== {name} ===")
    print("path:", path)
    df = spark.read.parquet(path)
    print("rows:", df.count(), "cols:", len(df.columns))
    df.printSchema()
    df.show(show_rows, truncate=False)
    return df

inspect_parquet(name="loto_y_ts", path=TS_PATH)
inspect_parquet(name="loto_hist_feat", path=HIST_PATH)

# Handy extra: register the time-series Parquet as a temp view and query it with SQL.
(spark.read.parquet(TS_PATH)
      .createOrReplaceTempView("loto_y_ts"))
(spark.sql(
    "SELECT loto, ts_type, COUNT(*) AS n FROM loto_y_ts "
    "GROUP BY loto, ts_type ORDER BY loto, ts_type"
).show(200, truncate=False))



=== loto_y_ts ===
path: C:\lib_ana\src\dataset\base\spark_out\loto_y_ts
rows: 455946 cols: 8
root
 |-- loto: string (nullable = true)
 |-- ds: date (nullable = true)
 |-- unique_id: string (nullable = true)
 |-- ts_type: string (nullable = true)
 |-- y: double (nullable = true)
 |-- exec_ts: timestamp (nullable = true)
 |-- updated_ts: timestamp (nullable = true)
 |-- proc_seconds: double (nullable = true)

+--------+----------+---------+-------+---+--------------------------+--------------------------+-----------------+
|loto    |ds        |unique_id|ts_type|y  |exec_ts                   |updated_ts                |proc_seconds     |
+--------+----------+---------+-------+---+--------------------------+--------------------------+-----------------+
|numbers4|2012-03-16|N4       |raw    |6.0|2026-02-09 16:47:28.189389|2026-02-09 16:47:28.189389|5.112370491027832|
|numbers4|2012-03-19|N4       |raw    |4.0|2026-02-09 16:47:28.189389|2026-02-09 16:47:28.189389|5.112370491027832|
|numbers