In [1]:
import functools
import os
from typing import List
from pathlib import Path
import re
import pandas as pd
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from JRDBDataParsingTools.data_schema import load_schema, create_pyspark_schema
from JRDBDataParsingTools.data_parser import parse_line
from JRDBDataParsingTools.file_downloader import download_and_extract_files
from JRDBDataParsingTools.structured_logger import logger

In [2]:
%load_ext dotenv

# Download files from the web

In [3]:
# JRDB credentials, read from the process environment.
# Each value is None when the corresponding variable is not set.
username = os.getenv("JRDB_USERNAME")
password = os.getenv("JRDB_PASSWORD")
# The directory where you want to download the files.
# Set JRDB_DOWNLOAD_DIR to override the default below. The value is passed
# through abspath because the path must be absolute.
download_dir = os.path.abspath(
    os.getenv(
        "JRDB_DOWNLOAD_DIR",
        "/Users/hankehly/Projects/JRDBDataParsingTools/downloads",
    )
)

In [4]:
# Index pages of the JRDB datasets to download. Only the uncommented entries
# are fetched by the loop below.
target_dataset_urls = [
    # Taken from http://www.jrdb.com/member/dataindex.html
    # Comment out the ones you don't want to download.
    # Downloading all of them will take about 2-3 hours.
    # "http://www.jrdb.com/member/datazip/Kab/index.html",
    # "http://www.jrdb.com/member/datazip/Bac/index.html",
    "http://www.jrdb.com/member/datazip/Kyi/index.html",
    # "http://www.jrdb.com/member/datazip/Ukc/index.html",
    # "http://www.jrdb.com/member/datazip/Oz/index.html",
    # "http://www.jrdb.com/member/datazip/Oz/index2.html",  # OW data
    # "http://www.jrdb.com/member/datazip/Ou/index.html",
    # "http://www.jrdb.com/member/datazip/Ot/index.html",
    # "http://www.jrdb.com/member/datazip/Ov/index.html",
    # "http://www.jrdb.com/member/datazip/Cyb/index.html",
    # "http://www.jrdb.com/member/datazip/Cha/index.html",
    # "http://www.jrdb.com/member/datazip/Sed/index.html",
    # "http://www.jrdb.com/member/datazip/Skb/index.html",
    # "http://www.jrdb.com/member/datazip/Tyb/index.html",
    # "http://www.jrdb.com/member/datazip/Hjc/index.html",
]

# Fetch every selected dataset into download_dir using the JRDB credentials.
# NOTE(review): skip_year_files=True presumably skips the annual pack archives
# linked from each index page; confirm against file_downloader.
for webpage_url in target_dataset_urls:
    download_and_extract_files(
        webpage_url, username, password, download_dir, skip_year_files=True
    )

{"event": "Downloading and extracting files from http://www.jrdb.com/member/datazip/Kyi/index.html", "level": "info", "timestamp": "2023-12-24T14:04:29.999899Z", "logger": "JRDBDataParsingTools.file_downloader"}
{"event": "Downloading http://www.jrdb.com/member/datazip/Kyi/2023/KYI231224.zip", "level": "info", "timestamp": "2023-12-24T14:04:32.707460Z", "logger": "JRDBDataParsingTools.file_downloader"}
{"event": "Downloading http://www.jrdb.com/member/datazip/Kyi/2023/KYI231223.zip", "level": "info", "timestamp": "2023-12-24T14:04:32.708082Z", "logger": "JRDBDataParsingTools.file_downloader"}
{"event": "Downloading http://www.jrdb.com/member/datazip/Kyi/2023/KYI231217.zip", "level": "info", "timestamp": "2023-12-24T14:04:32.708230Z", "logger": "JRDBDataParsingTools.file_downloader"}
{"event": "Downloading http://www.jrdb.com/member/datazip/Kyi/2023/KYI231216.zip", "level": "info", "timestamp": "2023-12-24T14:04:32.708416Z", "logger": "JRDBDataParsingTools.file_downloader"}
{"event": "D

# Handle edge cases in TYB files

The following TYB file in the annual pack contains null byte characters. Its daily file counterpart does not, so the annual pack copy must be replaced with the daily file before it can be processed.
* TYB060121.txt

Starting 2021-09-04, TYB files are duplicated in the annual pack. One file name contains a "_t" while the other does not. The daily file counterpart contains the same information as the annual pack file whose name does not contain a "_t" in it. In addition, some of the "_t" files contain null byte characters. The following files are affected. All files with "_t" in the name are ignored when parsing.
* TYB210904_t.txt
* TYB210905_t.txt
* TYB210911_t.txt

# Import data into Postgres

In [7]:
# Dataset codes whose files follow the regular "<CODE>*.txt" naming and can
# therefore be loaded with a single glob per code.
datasets = [
    "KAB", "BAC", "KYI", "UKC",
    "OZ", "OW", "OU", "OT", "OV",
    "CYB", "CHA", "SED", "SKB", "HJC",
    # "TYB" is left out on purpose: TYB is a special case because the file
    # names are not consistent, so it is loaded separately.
]


def etl(
    spark,
    schema_path: str,
    data_path: str | List[str],
    dbtable: str,
    *,
    jdbc_url: str = "jdbc:postgresql://localhost:5432/jrdb",
    db_user: str = "admin",
    db_password: str = "admin",
    driver: str = "org.postgresql.Driver",
    mode: str = "overwrite",
):
    """Parse raw JRDB text files and write them to a JDBC table.

    The files are read with Spark's ``binaryFile`` source, each file's content
    is split into lines, every line is parsed with ``parse_line`` against the
    loaded schema, and the resulting DataFrame is saved through JDBC.

    Args:
        spark: Active SparkSession used for reading.
        schema_path: Path of the schema file passed to ``load_schema``.
        data_path: A path/glob, or a list of paths, handed to ``load``.
        dbtable: Target table name (e.g. ``"jrdb_raw.kyi"``).
        jdbc_url: JDBC connection URL of the target database.
        db_user: Database user name.
        db_password: Database password. Prefer passing a value read from the
            environment over relying on the development default.
        driver: Fully qualified JDBC driver class name.
        mode: Spark save mode; the default ``"overwrite"`` replaces the table.
    """
    logger.info(f"Processing dataset {dbtable}")
    schema = load_schema(schema_path)
    logger.info("Creating PySpark DataFrame")
    df = (
        spark.read.format("binaryFile")
        .load(data_path)
        # Only the raw file content is kept; it is split into lines and each
        # line is parsed on its own.
        .select("content")
        .rdd.flatMap(lambda x: x[0].splitlines())
        .map(functools.partial(parse_line, schema=schema))
        .toDF(create_pyspark_schema(schema))
        # NOTE(review): input_file_name() is evaluated after the RDD round-trip;
        # verify the column is actually populated. If it is empty, carry the
        # binaryFile "path" column through the parsing step instead.
        .withColumn("input_file_name", f.input_file_name())
    )
    logger.info("Writing to data warehouse")
    (
        df.write.mode(mode)
        .format("jdbc")
        .options(
            url=jdbc_url,
            user=db_user,
            password=db_password,
            driver=driver,
            dbtable=dbtable,
        )
        .save()
    )


# One Spark session is shared by every ETL run. The PostgreSQL JDBC driver jar
# is referenced by a relative path.
spark = (
    SparkSession.builder
    .config("spark.jars", "postgresql-42.7.1.jar")
    .getOrCreate()
)
schema_name = "jrdb_raw"


# Regularly named datasets: one glob per dataset code, one table per dataset.
for dataset in datasets:
    etl(
        spark,
        schema_path=f"schemas/{dataset}.yaml",
        data_path=str(Path(download_dir) / f"{dataset}*.txt"),
        dbtable=f"{schema_name}.{dataset.lower()}",
    )


# TYB is a special case because the file names are not consistent.
# Only names of the form TYB<6 digits>.txt are kept, so any other TYB*.txt
# name (such as the "_t" variants) is excluded.
tyb_pattern = re.compile(r"TYB\d{6}\.txt$")
tyb_files_glob = Path(download_dir).glob("TYB*.txt")
tyb_files = [
    str(candidate)
    for candidate in tyb_files_glob
    if tyb_pattern.match(candidate.name)
]
etl(
    spark,
    schema_path="schemas/TYB.yaml",
    data_path=tyb_files,
    dbtable=f"{schema_name}.tyb",
)

{"event": "Processing dataset jrdb_raw.kab", "level": "info", "timestamp": "2023-12-25T12:24:44.612154Z", "logger": "__main__"}
{"event": "Creating PySpark DataFrame", "level": "info", "timestamp": "2023-12-25T12:24:44.621469Z", "logger": "__main__"}
{"event": "Writing to data warehouse", "level": "info", "timestamp": "2023-12-25T12:24:48.086182Z", "logger": "__main__"}
{"event": "Processing dataset jrdb_raw.bac", "level": "info", "timestamp": "2023-12-25T12:24:49.237583Z", "logger": "__main__"}
{"event": "Creating PySpark DataFrame", "level": "info", "timestamp": "2023-12-25T12:24:49.246777Z", "logger": "__main__"}
{"event": "Writing to data warehouse", "level": "info", "timestamp": "2023-12-25T12:24:52.592021Z", "logger": "__main__"}
{"event": "Processing dataset jrdb_raw.kyi", "level": "info", "timestamp": "2023-12-25T12:24:54.065179Z", "logger": "__main__"}
{"event": "Creating PySpark DataFrame", "level": "info", "timestamp": "2023-12-25T12:24:54.090643Z", "logger": "__main__"}
{"e

# Convert codes to CSV format

Copy and paste the text from one of the code web pages below into the following block, run the cell, and save the printed output as a CSV file in the `seeds` directory.

* [ＪＲＤＢデータコード表](http://www.jrdb.com/program/jrdb_code.txt)
* [脚元コード表（2017.02.20）](http://www.jrdb.com/program/ashimoto_code.txt)
* [馬具コード表（2017.07.02）](http://www.jrdb.com/program/bagu_code.txt)
* [特記コード表（2008.02.23）](http://www.jrdb.com/program/tokki_code.txt)
* [系統コード表（2003.05.15）](http://www.jrdb.com/program/keito_code.txt)
* [調教コースコード表（2009.10.09）](http://www.jrdb.com/program/cyokyo_course_code.txt)
* [追い状態コード表（2008.09.28）](http://www.jrdb.com/program/oi_code.txt)

In [16]:
# Paste the raw code table here: one "<code> <description>" entry per line.
code_text = """
01      流す
02      余力あり
03      終い抑え
04      一杯
05      バテる
06      伸びる
07      テンのみ
08      鋭く伸び
09      強目
10      終い重点
11      ８分追い
12      追って伸
13      向正面
14      ゲート
15      障害練習
16      中間軽め
17      キリ
21      引っ張る
22      掛かる
23      掛リバテ
24      テン掛る
25      掛り一杯
26      ササル
27      ヨレル
28      バカつく
29      手間取る
99      その他
"""


def parse_code_table(text: str) -> List[List[str]]:
    """Split a pasted code table into ``[code, description]`` rows.

    Each non-blank line is split on its first run of whitespace only, so a
    description that itself contains spaces stays in one column. Blank lines
    are skipped instead of producing empty rows.
    """
    rows = []
    for line in text.strip().splitlines():
        line = line.strip()
        if not line:
            continue
        rows.append(line.split(maxsplit=1))
    return rows


result = parse_code_table(code_text)

print(pd.DataFrame(result).to_csv(index=False, header=False))

01,流す
02,余力あり
03,終い抑え
04,一杯
05,バテる
06,伸びる
07,テンのみ
08,鋭く伸び
09,強目
10,終い重点
11,８分追い
12,追って伸
13,向正面
14,ゲート
15,障害練習
16,中間軽め
17,キリ
21,引っ張る
22,掛かる
23,掛リバテ
24,テン掛る
25,掛り一杯
26,ササル
27,ヨレル
28,バカつく
29,手間取る
99,その他

