In [1]:
import zstandard as zstd
import pandas as pd
from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
# Storage URIs used by this notebook.
# NOTE(review): STAGE contains a "postgre" host segment and a ":5432" port, unlike
# PROCESSED -- confirm this is the intended storage address.
STAGE = "abfss://chessstage@chess-insights.postgre.azure.dfs.core.windows.net:5432/"
PROCESSED = "abfss://processed@chessinsights.dfs.core.windows.net/"
# Current year and month formatted as "YYYY-MM"; used in the default file name below.
YEAR_MONTH = datetime.now().strftime("%Y-%m")
# Must be an f-string: a plain string would keep the literal "{YEAR_MONTH}" placeholder.
MONTHLY_GAME_FILE = f"lichess_db_standard_rated_{YEAR_MONTH}.pgn.zst"


target = dbutils.widgets.get("fileName")
# If this notebook is executed from monthly ETL, no fileName value will be passed
target = target or MONTHLY_GAME_FILE
# Output name: everything before the first "." in the target, with a ".parquet" suffix.
OUT = PROCESSED + target.split(".")[0] + ".parquet"

In [16]:
# PGN header-tag prefixes paired with the field name each one is stored under.
_PGN_TAG_FIELDS = (
    ('[White "', "white_name"),
    ('[Black "', "black_name"),
    ('[UTCDate "', "date"),
    ('[WhiteElo "', "white_elo"),
    ('[BlackElo "', "black_elo"),
    ('[ECO "', "eco"),
    ('[Opening "', "opening"),
    ('[TimeControl "', "time_control"),
    ('[Termination "', "termination"),
)


def extract_kv_from_line(line):
    """Classify one PGN text line and return a ``(key, value)`` pair.

    Args:
        line: A single line of PGN text (str).

    Returns:
        * ``(field_name, tag_value)`` when the line starts with one of the
          recognised header tags; the value is the text between the first
          pair of double quotes.
        * ``("moves", line)`` when the line starts with ``1.``.
        * ``("outofscope", None)`` for every other line.
    """
    for tag_prefix, field_name in _PGN_TAG_FIELDS:
        if line.startswith(tag_prefix):
            # The tag value sits between the first two double quotes.
            return field_name, line.split('"')[1]

    # Moves start after metadata, capture them
    if line.startswith('1.'):
        return "moves", line

    return "outofscope", None

def process_batch(batch):
    """Build a typed DataFrame from a list of parsed game dicts.

    Args:
        batch: List of dicts, one per game. Each dict is expected to carry the
            ``white_elo``, ``black_elo`` and ``date`` keys (a missing column
            raises ``KeyError``).

    Returns:
        pandas.DataFrame in which:
        * ``white_elo`` / ``black_elo`` are ``int64``; values that cannot be
          parsed as numbers (or are missing) become ``-1``.
        * ``date`` is a datetime column; values that cannot be parsed become
          ``NaT``.
    """
    df = pd.DataFrame(batch)

    # Convert white_elo and black_elo to int, using -1 for unparseable ratings.
    for elo_col in ('white_elo', 'black_elo'):
        elo = pd.to_numeric(df[elo_col], errors='coerce')
        df[elo_col] = elo.fillna(-1).astype('int64')

    # PGN writes dates dot-separated ("2015.08.01"); normalise the separator so
    # both "YYYY.MM.DD" and "YYYY-MM-DD" parse. Unknown dates such as
    # "????.??.??" are coerced to NaT instead of aborting the whole batch.
    normalized_dates = df['date'].astype(str).str.replace('.', '-', regex=False)
    df['date'] = pd.to_datetime(normalized_dates, format='%Y-%m-%d', errors='coerce')

    return df

def write_batch_to_parquet(processed_df, parquet_writer=None, path=OUT):
    """Write one DataFrame batch through a Parquet writer and return the writer.

    Args:
        processed_df: DataFrame to write.
        parquet_writer: Writer returned by a previous call, or ``None`` for
            the first batch.
        path: Destination used only when a new writer has to be created.

    Returns:
        The writer that was used, so the caller can pass it back in for the
        next batch.
    """
    arrow_table = pa.Table.from_pandas(processed_df)

    # The first batch creates the writer; its schema comes from that batch.
    writer = (
        parquet_writer
        if parquet_writer is not None
        else pq.ParquetWriter(path, arrow_table.schema)
    )
    writer.write_table(arrow_table)

    return writer

# Stream decompression and line handling
def _iter_pgn_lines(reader, chunk_size=65536):
    """Yield each non-blank line of a binary stream as stripped UTF-8 text."""
    pending = b""  # Bytes after the last newline seen so far (a partial line)
    for chunk in iter(lambda: reader.read(chunk_size), b""):
        # Everything but the last piece is a complete line; keep the tail for
        # the next chunk because it may be cut mid-line.
        *complete_lines, pending = (pending + chunk).split(b'\n')
        for raw_line in complete_lines:
            if raw_line.strip():
                yield raw_line.decode('utf-8').strip()
    # The stream may end without a trailing newline; emit what is left.
    if pending.strip():
        yield pending.decode('utf-8').strip()


def decompress_and_process_pgn(file_path, batch_size = 10000, out_path="./2015-08.parquet"):
    """Stream a zstd-compressed PGN file into a Parquet file, batch by batch.

    Lines are classified with ``extract_kv_from_line``. A game is considered
    complete when its moves line is seen. Every ``batch_size`` completed games
    are converted with ``process_batch`` and appended to the Parquet output
    with ``write_batch_to_parquet``. Games still buffered at end of input are
    written as a final, smaller batch.

    Args:
        file_path: Path of the ``.zst`` file, opened in binary mode.
        batch_size: Number of games per written batch.
        out_path: Destination Parquet file. The default is the location that
            was previously hard-coded here, so existing calls behave the same.

    Returns:
        None.
    """
    games = []
    current_game = {}
    parquet_writer = None
    batch_n = 1

    try:
        with open(file_path, 'rb') as compressed_file:
            dctx = zstd.ZstdDecompressor()

            # Stream the decompressed content
            with dctx.stream_reader(compressed_file) as reader:
                for line in _iter_pgn_lines(reader):
                    if line.startswith('[Event "'):
                        # A new game begins: drop tags left over from a game
                        # that never produced a moves line, so they cannot
                        # leak into this record.
                        current_game = {}
                        continue

                    key, value = extract_kv_from_line(line)
                    if key == "outofscope":
                        continue

                    current_game[key] = value
                    if key != "moves":
                        continue

                    # The moves line is the end of the game info.
                    games.append(current_game)
                    current_game = {}
                    if len(games) == batch_size:
                        processed_batch = process_batch(games)
                        games.clear()
                        parquet_writer = write_batch_to_parquet(processed_batch, parquet_writer, path=out_path)
                        print(f"Processed batch {batch_n}")
                        batch_n += 1

        # Flush the games that did not fill a whole batch.
        if games:
            processed_batch = process_batch(games)
            parquet_writer = write_batch_to_parquet(processed_batch, parquet_writer, path=out_path)
    finally:
        # Close explicitly so the Parquet file is finalised even if a batch
        # fails, instead of relying on garbage collection.
        if parquet_writer is not None:
            parquet_writer.close()
              

In [None]:
# Convert the selected dump: `target` is the widget value, or the monthly default.
decompress_and_process_pgn(file_path=target)