In [1]:
import datetime
import functools
import os
import re
from pathlib import Path
from typing import List

import pandas as pd
from loguru import logger
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from jhra.data.data_schema import create_pyspark_schema, load_schema, parse_line

# from jhra.data.file_downloader import download_and_extract_files

# Download files from JRDB

In [None]:
username = os.getenv("JRDB_USERNAME")
password = os.getenv("JRDB_PASSWORD")
download_dir = "path/to/data/jrdb"

In [None]:
target_dataset_urls = [
    # Taken from http://www.jrdb.com/member/dataindex.html
    # Comment out the ones you don't want to download.
    # Downloading all of them will take about ?
    "http://www.jrdb.com/member/datazip/Kab/index.html",
    "http://www.jrdb.com/member/datazip/Bac/index.html",
    "http://www.jrdb.com/member/datazip/Kyi/index.html",
    "http://www.jrdb.com/member/datazip/Ukc/index.html",
    "http://www.jrdb.com/member/datazip/Oz/index.html",
    # "http://www.jrdb.com/member/datazip/Oz/index2.html",  # OW data
    # "http://www.jrdb.com/member/datazip/Ou/index.html",
    # "http://www.jrdb.com/member/datazip/Ot/index.html",
    # "http://www.jrdb.com/member/datazip/Ov/index.html",
    "http://www.jrdb.com/member/datazip/Cyb/index.html",
    "http://www.jrdb.com/member/datazip/Cha/index.html",
    "http://www.jrdb.com/member/datazip/Sed/index.html",
    "http://www.jrdb.com/member/datazip/Skb/index.html",
    "http://www.jrdb.com/member/datazip/Tyb/index.html",
    "http://www.jrdb.com/member/datazip/Hjc/index.html",
]

for webpage_url in target_dataset_urls:
    download_and_extract_files(
        webpage_url, username, password, download_dir, start_date=datetime.date(2023, 12, 1)
    )

# Ingest data into warehouse

In [2]:
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", os.environ.get("SPARK_WAREHOUSE_DIR"))
    .config("javax.jdo.option.ConnectionURL", "jdbc:postgresql://postgres/metastore")
    .config("javax.jdo.option.ConnectionDriverName", "org.postgresql.Driver")
    .config("javax.jdo.option.ConnectionUserName", "admin")
    .config("javax.jdo.option.ConnectionPassword", "admin")

    # .config("spark.hadoop.datanucleus.autoCreateTables", "true")
    # .config("spark.hadoop.datanucleus.schema.autoCreateTables", "true")
    .config("datanucleus.schema.autoCreateTables", "true")

    .config("hive.metastore.schema.verification", "false")
    .config("spark.driver.extraClassPath", os.environ.get("POSTGRES_JDBC_JAR"))
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/10/13 12:01:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# spark.sql("SHOW DATABASES").show()
spark.sql("set -v").filter("key rlike 'warehouse.dir$'").show(truncate=False)

+-----------------------+---------------------+------------------------------------------------------+-------------+
|key                    |value                |meaning                                               |Since version|
+-----------------------+---------------------+------------------------------------------------------+-------------+
|spark.sql.warehouse.dir|file:/spark-warehouse|The default location for managed databases and tables.|2.0.0        |
+-----------------------+---------------------+------------------------------------------------------+-------------+



In [7]:
spark.sql("SHOW SCHEMAS").show()

+------------+
|   namespace|
+------------+
|     default|
|jrdb_staging|
| raw_staging|
+------------+



In [6]:
spark.sql("DROP SCHEMA IF EXISTS raw CASCADE").show()

25/10/13 12:02:28 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
25/10/13 12:02:28 WARN FileUtils: File file:/spark-warehouse/raw.db/raw_jrdb__bac does not exist; Force to delete it.
25/10/13 12:02:28 ERROR FileUtils: Failed to delete file:/spark-warehouse/raw.db/raw_jrdb__bac
25/10/13 12:02:28 WARN FileUtils: File file:/spark-warehouse/raw.db does not exist; Force to delete it.
25/10/13 12:02:28 ERROR FileUtils: Failed to delete file:/spark-warehouse/raw.db
25/10/13 12:02:28 WARN TxnHandler: Cannot perform cleanup since metastore table does not exist
++
||
++
++



In [None]:
schema_name = "raw"

In [None]:
# Necessary
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}").show()

25/10/13 12:03:07 WARN ObjectStore: Failed to get database raw, returning NoSuchObjectException
25/10/13 12:03:07 WARN ObjectStore: Failed to get database raw, returning NoSuchObjectException
25/10/13 12:03:07 WARN ObjectStore: Failed to get database raw, returning NoSuchObjectException
++
||
++
++



In [11]:
dataset = "BAC"
dbtable = f"{schema_name}.raw_jrdb__{dataset.lower()}"
data_path = str(Path("/workspace/data/jrdb").joinpath(f"{dataset}*.txt"))
dataset_schema = load_schema("schemas/BAC.yaml")
column_names = [field.name for field in dataset_schema]

df = (
    spark.read.format("binaryFile")
    .load(data_path)
    .select("content")
    .rdd
    .flatMap(lambda x: x[0].splitlines())
    .map(functools.partial(parse_line, schema=dataset_schema))
    .toDF(create_pyspark_schema(dataset_schema))
)

df.write.mode("overwrite").saveAsTable(dbtable)

[Stage 2:>                                                         (0 + 8) / 84]

25/10/13 12:03:14 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

25/10/13 12:03:17 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/10/13 12:03:17 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/10/13 12:03:17 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/10/13 12:03:17 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


In [12]:
spark.sql("DESCRIBE EXTENDED raw.raw_jrdb__BAC")

DataFrame[col_name: string, data_type: string, comment: string]

In [None]:
# Optional helper: filter or log malformed lines that do not match expected total byte span
# Expected total bytes (approx) = max(relative + repeat_factor*byte_length - 1) across schema
expected_max_end = 0
for field in schema_models:
    total_len = field.byte_length * field.repeat_factor
    end_pos = field.relative - 1 + total_len
    expected_max_end = max(expected_max_end, end_pos)

print("Approx expected record byte length:", expected_max_end)

def is_malformed(line: bytes) -> bool:
    return len(line) < expected_max_end

malformed = raw_lines_rdd.filter(is_malformed).take(5)
if malformed:
    print("Found malformed line samples (byte lengths):", [len(m) for m in malformed])
else:
    print("No malformed lines detected in first pass")

### Parsing logic explanation
We read each BAC*.txt file via the binaryFile source. The `content` column contains the full file as bytes. We `splitlines()` to obtain individual record lines (still bytes), then apply `parse_line`, which:
1. Slices fixed-width byte segments using `FieldModel.relative` and `byte_length`.
2. Decodes each slice with cp932 and strips whitespace.
3. Returns a `Row` whose fields align with the StructType from `create_pyspark_schema` (arrays for repeated fields, strings otherwise).
Finally we build `parsed_df` with the explicit schema to avoid inference and ensure array fields are preserved.

### Note: Fix for PicklingError
Passing a list of column names into `spark.createDataFrame(rdd, schema=column_names)` caused Spark to misinterpret the schema argument. A list of strings is not a valid StructType specification, leading to internal serialization (pickling) errors. We now:
1. Build a proper `StructType` via `create_pyspark_schema`.
2. Map raw lines to `Row` objects with matching arity/order.
3. Call `spark.createDataFrame(rdd, schema=struct_schema)`.
If only renaming columns were needed, `toDF(*column_names)` would suffice without constructing a StructType.

In [None]:
# Debug helper cell: validate schema alignment on a small subset
sample = rdd.take(1)
print("Sample row lens:", [len(r) for r in sample])
print("Schema field count:", len(struct_schema))
print(struct_schema.json())

In [None]:
def etl(
    spark,
    schema_path: str,
    data_path: str | List[str],
    dbtable: str,
    surrogate_key_name: str,
):
    logger.info(f"Processing dataset {dbtable}")
    schema = load_schema(schema_path)
    logger.info("Creating PySpark DataFrame")
    df = (
        spark.read.format("binaryFile")
        .load(data_path)
        .select("content")
        .rdd.flatMap(lambda x: x[0].splitlines())
        .map(functools.partial(parse_line, schema=schema))
        .toDF(create_pyspark_schema(schema))
        # Todo: monotonic increasing id does not mean files with lower dates will have lower ids!
        .withColumn(surrogate_key_name, F.monotonically_increasing_id())
        # Returns the wrong file name..
        # .withColumn("input_file_name", F.input_file_name())
    )
    logger.info("Writing to data warehouse")
    df.write.mode("overwrite").saveAsTable(dbtable)

The following TYB file in the annual pack contains null byte characters. Its daily file counterpart does not, so we must replace it before the file can be processed.
* TYB060121.txt

Starting 2021-09-04, TYB files are duplicated in the annual pack. One file name contains a "_t" while the other does not. The daily file counterpart contains the same information as the annual pack file whose name does not contain a "_t" in it. In addition, some of the "_t" files contain null byte characters. The following files are affected. All files with "_t" in the name are ignored when parsing.
* TYB210904_t.txt
* TYB210905_t.txt
* TYB210911_t.txt

In [None]:
datasets = [
    "KAB",
    "BAC",
    "KYI",
    "UKC",
    "OZ",
    "OW",
    "OU",
    "OT",
    "OV",
    "CYB",
    "CHA",
    "SKB",
    "SRB",
    "HJC",
    "SED",  # Run remove_sed_duplicates.sql after loading this dataset
    "TYB",
]

schema_name = "jhra_raw"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

for dataset in datasets:
    schema_path = f"schemas/{dataset}.yaml"
    dbtable = f"{schema_name}.raw_jrdb__{dataset.lower()}"
    surrogate_key_name = f"{dataset.lower()}_sk"
    # TYB is a special case because the file names are not consistent
    if dataset == "TYB":
        tyb_pattern = re.compile(r"TYB\d{6}\.txt$")
        tyb_files_glob = Path(download_dir).glob("TYB*.txt")
        tyb_files = [
            str(file) for file in tyb_files_glob if tyb_pattern.match(file.name)
        ]
        etl(
            spark,
            schema_path=schema_path,
            data_path=tyb_files,
            dbtable=dbtable,
            surrogate_key_name=surrogate_key_name,
        )
    else:
        etl(
            spark,
            schema_path=schema_path,
            data_path=str(Path(download_dir).joinpath(f"{dataset}*.txt")),
            dbtable=dbtable,
            surrogate_key_name=surrogate_key_name,
        )

# Convert codes to CSV format

Copy and paste text from the code web pages into the following block, run cell, and save as a CSV file in the `seeds` directory.

* [ＪＲＤＢデータコード表](http://www.jrdb.com/program/jrdb_code.txt)
* [脚元コード表（2017.02.20）](http://www.jrdb.com/program/ashimoto_code.txt)
* [馬具コード表（2017.07.02）](http://www.jrdb.com/program/bagu_code.txt)
* [特記コード表（2008.02.23）](http://www.jrdb.com/program/tokki_code.txt)
* [系統コード表（2003.05.15）](http://www.jrdb.com/program/keito_code.txt)
* [調教コースコード表（2009.10.09）](http://www.jrdb.com/program/cyokyo_course_code.txt)
* [追い状態コード表（2008.09.28）](http://www.jrdb.com/program/oi_code.txt)

In [None]:
code_text = """
01      流す
02      余力あり
03      終い抑え
04      一杯
05      バテる
06      伸びる
07      テンのみ
08      鋭く伸び
09      強目
10      終い重点
11      ８分追い
12      追って伸
13      向正面
14      ゲート
15      障害練習
16      中間軽め
17      キリ
21      引っ張る
22      掛かる
23      掛リバテ
24      テン掛る
25      掛り一杯
26      ササル
27      ヨレル
28      バカつく
29      手間取る
99      その他
"""

result = []
for line in code_text.strip().splitlines():
    result.append(line.strip().split())

print(pd.DataFrame(result).to_csv(index=False, header=False))