# Read Fixed-Width File

修正版本：解決換行符問題，按行讀取而非按字節切分

## 問題說明
原始程式碼按 190 bytes 固定切分，但實際每行是 `190 bytes + \n`，導致數據錯位。
修正方案：按行讀取後移除換行符，確保每筆記錄都是正確的 190 bytes。


In [9]:
# 修正版本的程式碼 - 解決換行符問題

from __future__ import annotations

import gzip
import io
import struct
from pathlib import Path
from typing import Iterator, List
import json

import pyarrow as pa
import pyarrow.parquet as pq
import polars as pl

# --- 1. 定義 Schema (190 bytes per line) -----------------------------------

FMT = struct.Struct("6s12s1s1s1s1s6s8s1s1s70s1s1s70s8s2s")  # 共 190 bytes

COLUMNS: List[tuple[str, pa.DataType]] = [
    ("securities_code", pa.string()),
    ("display_time", pa.string()),
    ("remark", pa.string()),
    ("trend_flag", pa.string()),
    ("match_flag", pa.string()),
    ("trade_ul_flag", pa.string()),
    ("trade_price", pa.float64()),        # 除以 100 得到實際價格
    ("trade_volume", pa.int64()),
    ("buy_tick_size", pa.int8()),
    ("buy_ul_flag", pa.string()),
    ("buy_5_price_volume", pa.string()),  # JSON 格式的五檔資料
    ("sell_tick_size", pa.int8()),
    ("sell_ul_flag", pa.string()),
    ("sell_5_price_volume", pa.string()), # JSON 格式的五檔資料
    ("display_date", pa.int32()),
    ("match_staff", pa.string()),
]
ARROW_SCHEMA = pa.schema(COLUMNS)

# --- 2. 解析 5 檔價量資料 ---------------------------------------------------

def parse_5_price_volume(raw_bytes: bytes) -> str:
    """
    解析 70 bytes 的 5 檔價量資料成 JSON 字串
    每檔：6 bytes 價格 + 8 bytes 數量 = 14 bytes，共 5 檔 = 70 bytes
    """
    prices_volumes = []
    for i in range(5):
        offset = i * 14
        price_bytes = raw_bytes[offset:offset+6]
        volume_bytes = raw_bytes[offset+6:offset+14]
        
        try:
            price_str = price_bytes.decode().strip()
            volume_str = volume_bytes.decode().strip()
            
            price = int(price_str) / 100.0 if price_str else 0.0
            volume = int(volume_str) if volume_str else 0
        except (ValueError, UnicodeDecodeError):
            price = 0.0
            volume = 0
            
        prices_volumes.append({"price": price, "volume": volume})
    
    return json.dumps(prices_volumes)

# --- 3. 安全的數值轉換 ------------------------------------------------------

def safe_int(value: bytes, default: int = 0) -> int:
    """安全地將 bytes 轉換為 int"""
    try:
        decoded = value.decode().strip()
        return int(decoded) if decoded else default
    except (ValueError, UnicodeDecodeError):
        return default

def safe_float(value: bytes, divisor: float = 1.0, default: float = 0.0) -> float:
    """安全地將 bytes 轉換為 float"""
    try:
        decoded = value.decode().strip()
        return float(decoded) / divisor if decoded else default
    except (ValueError, UnicodeDecodeError):
        return default

# --- 4. 按行解析函數 --------------------------------------------------------

def parse_line(line: bytes) -> list:
    """
    解析單行 190 bytes 的記錄
    """
    if len(line) != 190:
        raise ValueError(f"Line length should be 190 bytes, got {len(line)}")
        
    rec = FMT.unpack(line)
    
    return [
        rec[0].decode().strip(),                    # securities_code
        rec[1].decode().strip(),                    # display_time
        rec[2].decode().strip(),                    # remark
        rec[3].decode().strip(),                    # trend_flag
        rec[4].decode().strip(),                    # match_flag
        rec[5].decode().strip(),                    # trade_ul_flag
        safe_float(rec[6], 100.0),                  # trade_price (除以100)
        safe_int(rec[7]),                          # trade_volume
        safe_int(rec[8]),                          # buy_tick_size
        rec[9].decode().strip(),                    # buy_ul_flag
        parse_5_price_volume(rec[10]),              # buy_5_price_volume
        safe_int(rec[11]),                          # sell_tick_size
        rec[12].decode().strip(),                   # sell_ul_flag
        parse_5_price_volume(rec[13]),              # sell_5_price_volume
        safe_int(rec[14]),                          # display_date
        rec[15].decode().strip(),                   # match_staff
    ]

# --- 5. 批次處理函數 --------------------------------------------------------

def process_lines_batch(lines: List[bytes]) -> pa.RecordBatch:
    """
    處理一批行記錄，轉換為 Arrow RecordBatch
    """
    records = []
    for line_num, line in enumerate(lines, 1):
        try:
            # 移除換行符 - 這是關鍵修正！
            clean_line = line.rstrip(b'\n\r')
            if len(clean_line) == 190:
                record = parse_line(clean_line)
                records.append(record)
            elif len(clean_line) > 0:  # 跳過空行
                print(f"警告：第 {line_num} 行長度異常: {len(clean_line)} bytes")
        except Exception as e:
            print(f"錯誤：第 {line_num} 行解析失敗: {e}")
            continue
    
    if not records:
        # 回傳空的 RecordBatch
        arrays = [pa.array([], type=t) for t in ARROW_SCHEMA.types]
        return pa.RecordBatch.from_arrays(arrays, schema=ARROW_SCHEMA)
    
    # 轉 Arrow
    cols = list(zip(*records))
    arrays = [pa.array(col, type=t) for col, t in zip(cols, ARROW_SCHEMA.types)]
    return pa.RecordBatch.from_arrays(arrays, schema=ARROW_SCHEMA)

# --- 6. 按行讀取 gzip 檔案 --------------------------------------------------

def stream_batches_by_lines(path_gz: Path, *, batch_lines: int = 50_000) -> Iterator[pa.RecordBatch]:
    """
    按行讀取 gzip 檔案，每次處理 batch_lines 行
    """
    with gzip.open(path_gz, "rb") as fh:
        lines_batch = []
        for line in fh:
            lines_batch.append(line)
            
            if len(lines_batch) >= batch_lines:
                batch = process_lines_batch(lines_batch)
                if batch.num_rows > 0:
                    yield batch
                lines_batch = []
        
        # 處理剩餘的行
        if lines_batch:
            batch = process_lines_batch(lines_batch)
            if batch.num_rows > 0:
                yield batch

# --- 7. 轉換到 Parquet -----------------------------------------------------

def convert_to_parquet(src: Path, dst: Path) -> None:
    """
    將 gzip 檔案轉換為 Parquet 格式，使用 ZSTD 壓縮
    """
    dst.parent.mkdir(parents=True, exist_ok=True)
    
    print(f"開始轉換: {src} -> {dst}")
    
    with pq.ParquetWriter(
        dst,
        schema=ARROW_SCHEMA,
        compression="zstd",
        compression_level=7,
        use_dictionary=True,
    ) as writer:
        total_rows = 0
        total_batches = 0
        
        for batch in stream_batches_by_lines(src):
            writer.write_batch(batch)
            total_rows += batch.num_rows
            total_batches += 1
            
            if total_batches % 10 == 0:
                print(f"已處理 {total_batches} 個 batches，共 {total_rows:,} 筆記錄")
    
    # 統計結果
    file_size = dst.stat().st_size / 1024 / 1024
    print(f"轉換完成！總共 {total_rows:,} 筆記錄")
    print(f"Parquet 檔案大小: {file_size:.2f} MB")
    print(f"平均每筆記錄: {file_size * 1024 / total_rows:.2f} bytes")

print("✅ 程式碼載入完成！")


✅ 程式碼載入完成！


In [10]:
# 測試轉換 Sample_new.gz
gz_file = Path("../snapshot/Sample_new.gz")
pq_file = Path("./parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet")

print(f"輸入檔案: {gz_file}")
print(f"輸出檔案: {pq_file}")
print(f"檔案存在: {gz_file.exists()}")

# 執行轉換
convert_to_parquet(gz_file, pq_file)


輸入檔案: ../snapshot/Sample_new.gz
輸出檔案: parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet
檔案存在: True
開始轉換: ../snapshot/Sample_new.gz -> parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet
轉換完成！總共 40 筆記錄
Parquet 檔案大小: 0.01 MB
平均每筆記錄: 0.16 bytes


In [11]:
# 驗證轉換結果：讀取 Parquet 檔案並檢視資料
import polars as pl
import json

# 讀取剛才產生的 Parquet 檔案
pq_file = Path("./parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet")

if pq_file.exists():
    print("✅ Parquet 檔案存在，開始讀取...")
    
    # 使用 Polars 讀取
    df = pl.read_parquet(pq_file)
    
    print(f"\n📊 資料形狀: {df.shape}")
    print(f"   總筆數: {df.shape[0]:,}")
    print(f"   欄位數: {df.shape[1]}")
    
    print("\n📋 Schema:")
    for col, dtype in df.schema.items():
        print(f"   {col}: {dtype}")
    
    print("\n🔍 前 3 筆記錄:")
    print(df.head(3))
    
    print("\n📈 檢查證券代號是否正確:")
    securities = df.select("securities_code").unique().sort("securities_code")
    print(f"   不重複證券代號: {securities.to_series().to_list()}")
    
    print("\n💰 價格範圍檢查:")
    price_stats = df.select([
        pl.col("trade_price").min().alias("最低價"),
        pl.col("trade_price").max().alias("最高價"),
        pl.col("trade_price").mean().alias("平均價格"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).count().alias("有成交價筆數")
    ])
    print(price_stats)
    
    print("\n🎯 範例：解析 buy_5_price_volume (第一筆記錄):")
    first_buy_5 = df.select("buy_5_price_volume").item(0, 0)
    print(f"   原始 JSON: {first_buy_5}")
    parsed_buy_5 = json.loads(first_buy_5)
    print("   解析後五檔買盤:")
    for i, level in enumerate(parsed_buy_5, 1):
        if level['volume'] > 0:
            print(f"     {i}檔: 價格 {level['price']:.2f}, 數量 {level['volume']}")
    
    print("\n✅ 資料格式驗證完成！")
    
else:
    print("❌ Parquet 檔案不存在！請先執行轉換。")


✅ Parquet 檔案存在，開始讀取...

📊 資料形狀: (40, 16)
   總筆數: 40
   欄位數: 16

📋 Schema:
   securities_code: String
   display_time: String
   remark: String
   trend_flag: String
   match_flag: String
   trade_ul_flag: String
   trade_price: Float64
   trade_volume: Int64
   buy_tick_size: Int8
   buy_ul_flag: String
   buy_5_price_volume: String
   sell_tick_size: Int8
   sell_ul_flag: String
   sell_5_price_volume: String
   display_date: Int32
   match_staff: String

🔍 前 3 筆記錄:
shape: (3, 16)
┌────────────┬────────────┬────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ securities ┆ display_ti ┆ remark ┆ trend_fla ┆ … ┆ sell_ul_f ┆ sell_5_pr ┆ display_d ┆ match_sta │
│ _code      ┆ me         ┆ ---    ┆ g         ┆   ┆ lag       ┆ ice_volum ┆ ate       ┆ ff        │
│ ---        ┆ ---        ┆ str    ┆ ---       ┆   ┆ ---       ┆ e         ┆ ---       ┆ ---       │
│ str        ┆ str        ┆        ┆ str       ┆   ┆ str       ┆ ---       ┆ i32       ┆ str       │
│       

In [12]:
# 完整資料分析範例
def analyze_tick_data(parquet_path: Path):
    """
    示範如何使用 Polars 分析轉換後的 tick 資料
    """
    if not parquet_path.exists():
        print(f"❌ 檔案不存在: {parquet_path}")
        return
    
    print(f"🔎 分析檔案: {parquet_path}")
    
    # 使用 Polars 的 lazy evaluation 提高效能
    df = pl.scan_parquet(parquet_path)
    
    # 1. 基本統計
    print("\n=== 📊 基本統計 ===")
    basic_stats = df.select([
        pl.count().alias("總筆數"),
        pl.col("securities_code").n_unique().alias("不重複證券數"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).min().alias("最低成交價"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).max().alias("最高成交價"),
        pl.col("trade_volume").sum().alias("總成交量"),
    ]).collect()
    print(basic_stats)
    
    # 2. 時間分佈分析
    print("\n=== ⏰ 時間分佈分析 ===")
    time_analysis = df.with_columns([
        pl.col("display_time").str.slice(0, 2).alias("hour"),
    ]).group_by("hour").agg([
        pl.count().alias("tick數"),
        pl.col("trade_volume").sum().alias("成交量"),
    ]).sort("hour").collect()
    print(time_analysis)
    
    # 3. 依證券代號統計
    print("\n=== 📈 證券統計 ===")
    securities_stats = df.group_by("securities_code").agg([
        pl.count().alias("tick數"),
        pl.col("trade_volume").sum().alias("總成交量"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).mean().alias("平均成交價"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).min().alias("最低價"),
        pl.col("trade_price").filter(pl.col("trade_price") > 0).max().alias("最高價"),
    ]).sort("總成交量", descending=True).collect()
    print(securities_stats)
    
    # 4. 展示解析五檔資料
    print("\n=== 💹 五檔資料解析範例 ===")
    sample_data = df.select([
        "securities_code",
        "display_time", 
        "trade_price",
        "buy_5_price_volume",
        "sell_5_price_volume"
    ]).limit(2).collect()
    
    for idx, row in enumerate(sample_data.iter_rows(named=True), 1):
        print(f"\n📋 範例 {idx}:")
        print(f"   證券: {row['securities_code']}")
        print(f"   時間: {row['display_time']}")
        print(f"   成交價: {row['trade_price']}")
        
        # 解析買盤五檔
        buy_5 = json.loads(row['buy_5_price_volume'])
        print("   📉 買盤五檔:")
        buy_count = 0
        for i, level in enumerate(buy_5, 1):
            if level['volume'] > 0:
                print(f"      {i}檔: 價格 {level['price']:8.2f}, 數量 {level['volume']:>8}")
                buy_count += 1
        if buy_count == 0:
            print("      (無買盤)")
        
        # 解析賣盤五檔  
        sell_5 = json.loads(row['sell_5_price_volume'])
        print("   📈 賣盤五檔:")
        sell_count = 0
        for i, level in enumerate(sell_5, 1):
            if level['volume'] > 0:
                print(f"      {i}檔: 價格 {level['price']:8.2f}, 數量 {level['volume']:>8}")
                sell_count += 1
        if sell_count == 0:
            print("      (無賣盤)")

# 執行完整分析
print("🚀 開始執行完整資料分析...")
pq_file = Path("./parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet")
analyze_tick_data(pq_file)


🚀 開始執行完整資料分析...
🔎 分析檔案: parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet

=== 📊 基本統計 ===
shape: (1, 5)
┌────────┬──────────────┬────────────┬────────────┬──────────┐
│ 總筆數 ┆ 不重複證券數 ┆ 最低成交價 ┆ 最高成交價 ┆ 總成交量 │
│ ---    ┆ ---          ┆ ---        ┆ ---        ┆ ---      │
│ u32    ┆ u32          ┆ f64        ┆ f64        ┆ i64      │
╞════════╪══════════════╪════════════╪════════════╪══════════╡
│ 40     ┆ 2            ┆ 184.5      ┆ 185.0      ┆ 190165   │
└────────┴──────────────┴────────────┴────────────┴──────────┘

=== ⏰ 時間分佈分析 ===
shape: (2, 3)
┌──────┬────────┬────────┐
│ hour ┆ tick數 ┆ 成交量 │
│ ---  ┆ ---    ┆ ---    │
│ str  ┆ u32    ┆ i64    │
╞══════╪════════╪════════╡
│ 08   ┆ 20     ┆ 0      │
│ 13   ┆ 20     ┆ 190165 │
└──────┴────────┴────────┘

=== 📈 證券統計 ===
shape: (2, 6)
┌─────────────────┬────────┬──────────┬────────────┬────────┬────────┐
│ securities_code ┆ tick數 ┆ 總成交量 ┆ 平均成交價 ┆ 最低價 ┆ 最高價 │
│ ---             ┆ ---    ┆ ---      ┆ ---        ┆ ---    ┆ ---    │
│ st

(Deprecated in version 0.20.5)
  pl.count().alias("總筆數"),
(Deprecated in version 0.20.5)
  pl.count().alias("tick數"),
(Deprecated in version 0.20.5)
  pl.count().alias("tick數"),


In [13]:
# 大檔案處理範例 (可選執行)

def process_large_file_optimized(src: Path, dst: Path, *, batch_lines: int = 500_000) -> None:
    """
    針對大檔案優化的版本：
    1. 更大的 batch size
    2. 詳細的進度回報
    3. 檔案大小統計和壓縮率計算
    """
    if not src.exists():
        print(f"❌ 輸入檔案不存在: {src}")
        return
        
    dst.parent.mkdir(parents=True, exist_ok=True)
    
    # 取得原始檔案大小
    original_size = src.stat().st_size / 1024 / 1024  # MB
    print(f"📁 原始檔案大小: {original_size:.2f} MB")
    print(f"🔄 開始轉換: {src} -> {dst}")
    print(f"📦 Batch 大小: {batch_lines:,} 筆記錄")
    
    import time
    start_time = time.time()
    
    with pq.ParquetWriter(
        dst,
        schema=ARROW_SCHEMA,
        compression="zstd",
        compression_level=7,
        use_dictionary=True,
        row_group_size=batch_lines,  # 明確設定 row group 大小
    ) as writer:
        total_rows = 0
        total_batches = 0
        
        for batch in stream_batches_by_lines(src, batch_lines=batch_lines):
            writer.write_batch(batch)
            total_rows += batch.num_rows
            total_batches += 1
            
            if total_batches % 5 == 0:  # 每 5 個 batch 印一次進度
                elapsed = time.time() - start_time
                rate = total_rows / elapsed
                print(f"⏳ 已處理 {total_batches:,} 個 batches，共 {total_rows:,} 筆記錄 ({rate:.0f} 筆/秒)")
    
    # 統計結果
    end_time = time.time()
    elapsed_time = end_time - start_time
    final_size = dst.stat().st_size / 1024 / 1024  # MB
    compression_ratio = (1 - final_size / original_size) * 100
    
    print(f"\n=== 🎉 轉換完成 ===")
    print(f"⏱️  處理時間: {elapsed_time:.1f} 秒")
    print(f"📊 總筆數: {total_rows:,}")
    print(f"📁 原始大小: {original_size:.2f} MB")
    print(f"💾 Parquet 大小: {final_size:.2f} MB")
    print(f"📉 壓縮率: {compression_ratio:.1f}%")
    print(f"⚡ 處理速度: {total_rows / elapsed_time:.0f} 筆/秒")
    print(f"💿 平均每筆記錄: {final_size * 1024 / total_rows:.2f} bytes")

# 範例：處理更大檔案的使用方式
print("📝 大檔案處理範例：")
print("""# 如果要處理更大的檔案，可以這樣使用：
large_gz_file = Path("../snapshot/dsp20241101.gz")  # 假設有大檔案
large_pq_file = Path("./parquet/YY=2024/MM=11/DD=01/dsp20241101.parquet")
process_large_file_optimized(large_gz_file, large_pq_file, batch_lines=1_000_000)
""")

print("\n📋 使用建議：")
print("• 小檔案 (<100MB): batch_lines=50,000")
print("• 中檔案 (100MB-1GB): batch_lines=200,000")
print("• 大檔案 (>1GB): batch_lines=500,000 或更大")
print("• 記憶體有限時，降低 batch_lines 參數")


📝 大檔案處理範例：
# 如果要處理更大的檔案，可以這樣使用：
large_gz_file = Path("../snapshot/dsp20241101.gz")  # 假設有大檔案
large_pq_file = Path("./parquet/YY=2024/MM=11/DD=01/dsp20241101.parquet")
process_large_file_optimized(large_gz_file, large_pq_file, batch_lines=1_000_000)


📋 使用建議：
• 小檔案 (<100MB): batch_lines=50,000
• 中檔案 (100MB-1GB): batch_lines=200,000
• 大檔案 (>1GB): batch_lines=500,000 或更大
• 記憶體有限時，降低 batch_lines 參數


----

In [14]:
import pandas as pd

In [15]:
import pandas as pd

df = pd.read_parquet("./parquet/YY=2024/MM=11/DD=11/sample_corrected.parquet")
df

Unnamed: 0,securities_code,display_time,remark,trend_flag,match_flag,trade_ul_flag,trade_price,trade_volume,buy_tick_size,buy_ul_flag,buy_5_price_volume,sell_tick_size,sell_ul_flag,sell_5_price_volume,display_date,match_staff
0,50,83004446448,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 29}, {""price"": 199...",1,,"[{""price"": 203.0, ""volume"": 2}, {""price"": 0.0,...",20241111,AA
1,50,83009462008,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 27}, {""price"": 199...",5,,"[{""price"": 200.0, ""volume"": 1}, {""price"": 200....",20241111,AA
2,50,83014478574,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 17}, {""price"": 199...",5,,"[{""price"": 200.0, ""volume"": 2}, {""price"": 200....",20241111,AA
3,50,83019493222,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 14}, {""price"": 199...",5,,"[{""price"": 200.0, ""volume"": 2}, {""price"": 200....",20241111,AA
4,50,83024508770,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 12}, {""price"": 199...",5,,"[{""price"": 200.0, ""volume"": 13}, {""price"": 200...",20241111,AA
5,50,83029525440,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 11}, {""price"": 199...",5,,"[{""price"": 200.0, ""volume"": 34}, {""price"": 200...",20241111,AA
6,50,83034542076,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 11}, {""price"": 199...",5,,"[{""price"": 199.8, ""volume"": 1}, {""price"": 200....",20241111,AA
7,50,83039555650,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 12}, {""price"": 199...",5,,"[{""price"": 199.8, ""volume"": 1}, {""price"": 200....",20241111,AA
8,50,83044571226,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 4}, {""price"": 199....",5,,"[{""price"": 199.8, ""volume"": 1}, {""price"": 200....",20241111,AA
9,50,83049585802,T,,,,0.0,0,5,,"[{""price"": 199.5, ""volume"": 4}, {""price"": 199....",5,,"[{""price"": 199.6, ""volume"": 1}, {""price"": 199....",20241111,AA


In [20]:
df['buy_5_price_volume'][0]

'[{"price": 199.5, "volume": 29}, {"price": 199.0, "volume": 20}, {"price": 191.5, "volume": 3}, {"price": 190.0, "volume": 1}, {"price": 188.0, "volume": 1}]'