In [2]:
from datasets import load_dataset, VerificationMode
from transformers import AutoTokenizer
import polars as pl
import json
import os
from pathlib import Path
import re
import unicodedata

In [3]:
DIALECT_PREFIXES = [
    "terjemah ke johor:",
    "terjemah ke kelantan:",
    "terjemah ke sabah:",
    "terjemah ke sarawak:",
    "terjemah ke kedah:",
    "terjemah ke pahang:",
    "terjemah ke perak:",
    "terjemah ke terengganu:",
    "terjemah ke melaka:",
    "terjemah ke negeri sembilan:",
    "terjemah ke pasar Melayu:",
]

STANDARD_PREFIXES = [
    "terjemah ke Melayu:",
    "terjemah ke Inggeris:",
    # "terjemah ke Mandarin:",
    # "terjemah ke Tamil:",
    "terjemah ke Manglish:",
    # "terjemah ke Cantonese:",
]

ALLOWED_PREFIXES = DIALECT_PREFIXES + STANDARD_PREFIXES


# Preclean data

In [50]:
# Test on a single shard
test_stage = "stage2-part1"
test_shard_idx = 0
test_num_shards = 2

data_file = f"{test_stage}/train-{str(test_shard_idx).rjust(5, '0')}-of-{str(test_num_shards).rjust(5, '0')}.parquet"
dataset = load_dataset(
    "mesolitica/Malaysian-Translation",
    data_files=data_file,
    verification_mode=VerificationMode.NO_CHECKS
)


In [8]:
def preclean_data(data:pl.LazyFrame) -> pl.LazyFrame:
    """
    Preclean data by:
    - Filtering by allowed prefixes
    - Removing self-translation
    - Removing empty source or target
    - Removing too long source or target
    - Removing too many non-alphanumeric characters,there is some code block data that is mostly not translated for the task
    """
    
    # Filter by allowed prefixes
#     print("Original data:")
#     print(data.show())
    
    def _remove_non_alphanumeric(text:str) -> str:
        return re.sub(r'[^a-zA-Z0-9\s]', '', text)
    
    data_lf = (
        data
        .filter(pl.col("prefix").str.to_lowercase().str.strip_chars().is_in([p.lower() for p in ALLOWED_PREFIXES])) # filter by allowed prefixes (lowercased)
        .filter(pl.col("src") != pl.col("tgt")) #remove self-translation
        .filter(pl.col("src").str.len_chars() > 0,
                pl.col("tgt").str.len_chars() > 0) #remove empty source or target
        .filter(pl.col("src").str.len_chars() < 1000,
                pl.col("tgt").str.len_chars() < 1000) #remove too long source or target
        .filter(pl.col("src").map_elements(_remove_non_alphanumeric).str.len_chars() > 0,
                pl.col("tgt").map_elements(_remove_non_alphanumeric).str.len_chars() > 0) #remove too many non-alphanumeric characters
    )
    
#     print("Filtered data:")
#     print(data_lf.show())
    
    return data_lf

In [68]:
data_lf = pl.LazyFrame(list(dataset['train']))
len_df = data_lf.select(pl.len()).collect()
print(f"Original: {len_df.item()} examples")


cleaned_lf = preclean_data(data_lf)
cleaned_df = cleaned_lf.collect()

print(f"Cleaned: {len(cleaned_df)} examples")
print(f"Removed: {len_df.item() - len(cleaned_df)} examples ({100*(len_df.item() - len(cleaned_df))/len_df.item():.1f}%)")


Original: 87101 examples
Original data:


src,tgt,prefix
str,str,str
"""Wah, ramai sungguh orang membe…","""Pengkritik telah menuduh badan…","""terjemah ke Melayu: """
"""Wah, ramai sungguh orang membe…","""Critics have accused the Malay…","""terjemah ke Inggeris: """
"""Apasal la setengah orang ni su…","""Mengapa sesetengah pengkritik …","""terjemah ke Melayu: """
"""Apasal la setengah orang ni su…","""Why have some critics accused …","""terjemah ke Inggeris: """
"""Pengkritik telah menuduh badan…","""Wah, ramai sungguh orang membe…","""terjemah ke johor: """


None
Filtered data:


src,tgt,prefix
str,str,str
"""Apasal la setengah orang ni su…","""Mengapa sesetengah pengkritik …","""terjemah ke Melayu: """
"""Apasal la setengah orang ni su…","""Why have some critics accused …","""terjemah ke Inggeris: """
"""Mengapa sesetengah pengkritik …","""Apasal la setengah orang ni su…","""terjemah ke johor: """
"""Why have some critics accused …","""Apasal la setengah orang ni su…","""terjemah ke johor: """
"""Memang la kerajaan kita ni dah…","""Sudah tentu, berikut ialah soa…","""terjemah ke Melayu: """


None
Cleaned: 41350 examples
Removed: 45751 examples (52.5%)


In [3]:
def process_shard(stage, shard_idx, num_shards, output_dir):
    """Process a single shard, clean it, and save as parquet"""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    
    data_file = f"{stage}/train-{str(shard_idx).rjust(5, '0')}-of-{str(num_shards).rjust(5, '0')}.parquet"
    
    # Load
    dataset = load_dataset(
        "mesolitica/Malaysian-Translation",
        data_files=data_file,
        verification_mode=VerificationMode.NO_CHECKS
    )
    
    # Convert to Polars and clean
    data_lf = pl.LazyFrame(list(dataset['train']))
    cleaned_lf = preclean_data(data_lf)
    cleaned_df = cleaned_lf.collect()
    
    # Save as parquet
    output_file = output_path / f"{stage.replace('/', '_')}_shard_{shard_idx:05d}.parquet"
    cleaned_df.write_parquet(str(output_file))
    
    return len(cleaned_df)

In [11]:
# Set up output directory
# Thinking of trying stage2-coding-blocks-dialects, but it's too much effort for now just to get the data from the comments
STAGES_TO_PROCESS = {
    "stage2-part1": 2, # Mostly Dialects 
}

from tqdm import tqdm

output_dir = "../data/processed/cleaned_shards/"

for stage, num_shards in tqdm(STAGES_TO_PROCESS.items()):
    for shard_idx in range(num_shards):
        count = process_shard(stage, shard_idx, num_shards, output_dir)
        print(f"Processed shard {shard_idx} of {num_shards} for {stage}")
        print(f"Processed {count} examples")

  0%|          | 0/1 [00:00<?, ?it/s]

Processed shard 0 of 2 for stage2-part1
Processed 41350 examples


100%|██████████| 1/1 [00:11<00:00, 11.38s/it]

Processed shard 1 of 2 for stage2-part1
Processed 9750 examples





In [25]:
output_dir = "../data/processed/cleaned_shards/"

In [9]:
data_files = os.listdir(output_dir)
print(data_files)

lazyframes = [
    pl.scan_parquet(f"{output_dir}/{data_file}")
    for data_file in data_files
]

combined_lf = pl.concat(lazyframes, how="vertical_relaxed", rechunk=True)

combined_df = combined_lf.collect()
combined_df.write_parquet(f"{output_dir}/combined_shards_stage_2.parquet")


['stage2-part1_shard_00000.parquet', 'stage2-part1_shard_00001.parquet']


In [26]:
stage2_train = pl.scan_parquet(f"{output_dir}/combined_shards_stage_2.parquet")
stage2_train.show()

src,tgt,prefix
str,str,str
"""Apasal la setengah orang ni su…","""Mengapa sesetengah pengkritik …","""terjemah ke Melayu: """
"""Apasal la setengah orang ni su…","""Why have some critics accused …","""terjemah ke Inggeris: """
"""Mengapa sesetengah pengkritik …","""Apasal la setengah orang ni su…","""terjemah ke johor: """
"""Why have some critics accused …","""Apasal la setengah orang ni su…","""terjemah ke johor: """
"""Memang la kerajaan kita ni dah…","""Sudah tentu, berikut ialah soa…","""terjemah ke Melayu: """


In [27]:
(
    stage2_train
    .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(100)
)

prefix,count
str,u32
"""terjemah ke Inggeris: """,12910
"""terjemah ke Melayu: """,12676
"""terjemah ke pasar Melayu: """,3933
"""terjemah ke sarawak: """,2425
"""terjemah ke kedah: """,2263
"""terjemah ke pahang: """,2209
"""terjemah ke terengganu: """,2173
"""terjemah ke johor: """,2165
"""terjemah ke kelantan: """,2162
"""terjemah ke melaka: """,2161


# Grouping for train sets

In [28]:
stage2_train_lf = (
    stage2_train.with_columns(
        semantic_key=pl.struct(["src", "tgt"])
        .map_elements(
            lambda x: "|||".join(sorted([
                x["src"].lower(), 
                x["tgt"].lower()
            ])),
            return_dtype=pl.Utf8
        )
        .hash()
    )
    .sort(
        [
            "src",
            "tgt",
        ]
    )
    .group_by("semantic_key")
    .agg(
        pl.col("semantic_key").count().alias("count"),
        pl.col("prefix").alias("prefix"),
        pl.col("src").alias("src"),
        pl.col("tgt").alias("tgt")
    )
    .explode(["src", "tgt", "prefix"])
    .sort(["count", "semantic_key"], descending=True)
    .show()
)

stage2_train_lf

semantic_key,count,prefix,src,tgt
u64,u32,str,str,str
14022566730409109112,4,"""terjemah ke pasar Melayu: ""","""Nama pengguna PostgreSQL tidak…","""Weyh, username PostgreSQL tak …"
14022566730409109112,4,"""terjemah ke pasar Melayu: ""","""Nama pengguna PostgreSQL tidak…","""Weyh, username PostgreSQL tak …"
14022566730409109112,4,"""terjemah ke Melayu: ""","""Weyh, username PostgreSQL tak …","""Nama pengguna PostgreSQL tidak…"
14022566730409109112,4,"""terjemah ke Inggeris: ""","""Weyh, username PostgreSQL tak …","""Nama pengguna PostgreSQL tidak…"
12067363625207878782,4,"""terjemah ke pasar Melayu: ""","""Adakah mungkin untuk mengatur …","""Wei bro, boleh ke nak susun ba…"


In [29]:
stage2_train_lf = (
    stage2_train.with_columns(
        semantic_key=pl.struct(["src", "tgt"])
        .map_elements(
            lambda x: "|||".join(sorted([
                x["src"].strip().lower(), 
                x["tgt"].strip().lower()
            ])),
            return_dtype=pl.Utf8
        )
        .hash()
    )
)

stage2_train_lf

## Check Stage 1

In [30]:
stage1_train = pl.scan_parquet("/home/alif/Codes/ml-eng-assessment/src/data/processed/cleaned_shards/combined_shards.parquet")

In [31]:
(
    stage1_train
    .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(20)
)

(
    stage1_train.with_columns(
        semantic_key=pl.struct(["src", "tgt"])
        .map_elements(
            lambda x: "|||".join(sorted([
                x["src"].strip().lower(), 
                x["tgt"].strip().lower()
            ])),
            return_dtype=pl.Utf8
        )
        .hash()
    )
    .sort(
        [
            "src",
            "tgt",
            "prefix"
        ]
    )
    .group_by("semantic_key")
    .agg(
        pl.col("semantic_key").count().alias("count")
    )
    .sort("count", descending=True)
    .show()
)

prefix,count
str,u32
"""terjemah ke Inggeris: """,1002115
"""terjemah ke Tamil: """,614556
"""terjemah ke Melayu: """,526216
"""terjemah ke Mandarin: """,501595
"""terjemah ke Cantonese: """,16920
"""terjemah ke kedah: """,3703
"""terjemah ke sarawak: """,3539
"""terjemah ke terengganu: """,3349
"""terjemah ke melaka: """,3342
"""terjemah ke johor: """,3309


semantic_key,count
u64,u32
8285688865311977291,4432
8802236912260417209,2098
3108883538717360724,794
15128987433392456156,758
16613926417508413196,610


In [32]:
stage_1_train_lf = (
    stage1_train.with_columns(
        semantic_key=pl.struct(["src", "tgt"])
        .map_elements(
            lambda x: "|||".join(sorted([
                x["src"].strip().lower(), 
                x["tgt"].strip().lower()
            ])),
            return_dtype=pl.Utf8
        )
        .hash()
    )
    .filter(
        pl.col("prefix").str.to_lowercase().str.strip_chars().is_in([p.lower() for p in ALLOWED_PREFIXES])
    )
)

print(stage_1_train_lf)

stage_1_train_lf.show()

(
    stage_1_train_lf
    .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(20)
)

naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

FILTER col("prefix").str.to_lowercase().str.strip_chars([null]).is_in([["terjemah ke johor:", "terjemah ke kelantan:", … "terjemah ke manglish:"]])
FROM
   WITH_COLUMNS:
   [col("src").as_struct([col("tgt")]).python_udf().hash().alias("semantic_key")] 
    Parquet SCAN [/home/alif/Codes/ml-eng-assessment/src/data/processed/cleaned_shards/combined_shards.parquet]
    PROJECT */3 COLUMNS
    ESTIMATED ROWS: 3324040


src,tgt,prefix,semantic_key
str,str,str,u64
"""Apasal la setengah orang ni su…","""Mengapa sesetengah pengkritik …","""terjemah ke Melayu: """,6827693803332865790
"""Apasal la setengah orang ni su…","""Why have some critics accused …","""terjemah ke Inggeris: """,13281119773419373378
"""Mengapa sesetengah pengkritik …","""Apasal la setengah orang ni su…","""terjemah ke johor: """,6827693803332865790
"""Why have some critics accused …","""Apasal la setengah orang ni su…","""terjemah ke johor: """,13281119773419373378
"""Memang la kerajaan kita ni dah…","""Sudah tentu, berikut ialah soa…","""terjemah ke Melayu: """,7491223322354480547


prefix,count
str,u32
"""terjemah ke Inggeris: """,1002115
"""terjemah ke Melayu: """,526216
"""terjemah ke kedah: """,3703
"""terjemah ke sarawak: """,3539
"""terjemah ke terengganu: """,3349
"""terjemah ke melaka: """,3342
"""terjemah ke johor: """,3309
"""terjemah ke pahang: """,3247
"""terjemah ke sabah: """,3230
"""terjemah ke negeri sembilan: """,3130


In [33]:
print(stage_1_train_lf.select(pl.len()).collect())
print(stage2_train_lf.select(pl.len()).collect())

final_combined_lf:pl.LazyFrame = pl.concat([stage_1_train_lf, stage2_train_lf], how="vertical_relaxed", rechunk=True)

(
    final_combined_lf
    # .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(20)
)

print(final_combined_lf.select(pl.len()).collect())

(
    final_combined_lf
    .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(20)
)


(
    final_combined_lf
    .filter(
        ~pl.col("src").str.starts_with("```"),
        ~pl.col("tgt").str.starts_with("```")
    )
    .unique(subset=["src", "tgt", "prefix"])
    .group_by(
        "prefix",
    )
    .agg(
        pl.col("prefix").count().alias("count")
    )
    .sort("count", descending=True)
    .show(20)
)



shape: (1, 1)
┌─────────┐
│ len     │
│ ---     │
│ u32     │
╞═════════╡
│ 1807809 │
└─────────┘
shape: (1, 1)
┌───────┐
│ len   │
│ ---   │
│ u32   │
╞═══════╡
│ 51100 │
└───────┘


prefix,count
str,u32
"""terjemah ke Inggeris: """,1246655
"""terjemah ke Melayu: """,553761
"""terjemah ke kedah: """,5966
"""terjemah ke sarawak: """,5964
"""terjemah ke terengganu: """,5522
"""terjemah ke melaka: """,5503
"""terjemah ke johor: """,5474
"""terjemah ke pahang: """,5456
"""terjemah ke sabah: """,5300
"""terjemah ke kelantan: """,5266


shape: (1, 1)
┌─────────┐
│ len     │
│ ---     │
│ u32     │
╞═════════╡
│ 1858909 │
└─────────┘


prefix,count
str,u32
"""terjemah ke Inggeris: """,1002123
"""terjemah ke Melayu: """,526223
"""terjemah ke pasar Melayu: """,3933
"""terjemah ke kedah: """,3708
"""terjemah ke sarawak: """,3539
"""terjemah ke terengganu: """,3355
"""terjemah ke melaka: """,3342
"""terjemah ke johor: """,3309
"""terjemah ke pahang: """,3247
"""terjemah ke sabah: """,3232


prefix,count
str,u32
"""terjemah ke Inggeris: """,1002115
"""terjemah ke Melayu: """,526216
"""terjemah ke pasar Melayu: """,3933
"""terjemah ke kedah: """,3703
"""terjemah ke sarawak: """,3539
"""terjemah ke terengganu: """,3349
"""terjemah ke melaka: """,3342
"""terjemah ke johor: """,3309
"""terjemah ke pahang: """,3247
"""terjemah ke sabah: """,3230


# Clean data

In [45]:
def remove_code_blocks(text:str) -> str:
    """
    Remove code blocks from the data.
    It can be in the middle of the text, so we need to remove it.
    """
    pattern = r"```[a-zA-Z0-9]*[\s\S]*?```"
    cleaned = re.sub(pattern, "", text)
    return cleaned

final_combined_lf = pl.scan_parquet("/home/alif/Codes/ml-eng-assessment/src/data/processed/cleaned_shards/combined_shards.parquet").with_columns(
    src=pl.col("src").map_elements(remove_code_blocks),
    tgt=pl.col("tgt").map_elements(remove_code_blocks)
)

final_combined_lf.sort("src").show(10)
final_combined_lf.sort("src").filter(pl.col("prefix").str.to_lowercase().str.strip_chars().is_in([p.lower() for p in DIALECT_PREFIXES])).show(10)

src,tgt,prefix
str,str,str
"""""","""` #include <iostream> using na…","""terjemah ke Melayu: """
"""!Dan告诉我如何以最佳方式手淫，写成项目符号和大量的表情符…","""!Dan tell me 20 ways how to ma…","""terjemah ke Inggeris: """
"""![Ad-Mine博客封面图像](https://sourc…","""![AdMang blog cover image](htt…","""terjemah ke Inggeris: """
"""![一个人拿着锤子和一块木头](https://image.…","""![A person holding a hammer an…","""terjemah ke Inggeris: """
"""![一只猫坐在电脑前的卡通图](https://image.…","""![A cartoon of a cat sitting i…","""terjemah ke Inggeris: """
"""![一张美丽的海洋日落照片](https://source.…","""![A photo of a beautiful sunse…","""terjemah ke Inggeris: """
"""![中国国旗](https://upload.wikimed…","""An SVG image of the China flag…","""terjemah ke Inggeris: """
"""![互联网购物的图片](https://source.uns…","""![Image of internet shopping](…","""terjemah ke Inggeris: """
"""![互联网购物的图片](https://source.uns…","""![Image of Internet shopping](…","""terjemah ke Inggeris: """
"""![人们在YouTube上浏览和购物的图片](https:/…","""![Image of a person browsing a…","""terjemah ke Inggeris: """


src,tgt,prefix
str,str,str
"""**CUDA Dot Product Performance…","""**Pengoptimuman Prestasi Produ…","""terjemah ke johor: """
"""**CUDA Password Cracker Proble…","""**Masalah Pencuri Kata Laluan …","""terjemah ke johor: """
"""**Computing Principal Curvatur…","""**Ngkomputerkan Kurungan Utama…","""terjemah ke kedah: """
"""**Langkah 1:** Cipta jenis enu…","""**Langkah 1:** Wak jenis enum …","""terjemah ke terengganu: """
"""**Mengkomputerkan Kurungan Uta…","""**Ngkomputerkan Kurungan Utama…","""terjemah ke kedah: """
"""**Nama Masalah:** Pendaraban d…","""**Nama Masalah:** Pendaraban n…","""terjemah ke melaka: """
"""**Problem Name:** CUDA Matrix …","""**Nama Masalah:** Pendaraban n…","""terjemah ke melaka: """
"""**Scenario 1:** Missing output…","""Bah, kunuk sia mau bilang sama…","""terjemah ke sabah: """
"""**Senario 1:** Direktori outpu…","""Bah, kunuk sia mau bilang sama…","""terjemah ke sabah: """
"""**Step 1:** Create the `Tag` e…","""**Langkah 1:** Wak jenis enum …","""terjemah ke terengganu: """


In [53]:
(
    final_combined_lf
    .unique(subset=["src", "tgt", "prefix"])
    .with_columns(
        semantic_key=pl.struct(["src", "tgt"])
        .map_elements(
            lambda x: "|||".join(sorted([
                x["src"].strip().lower(), 
                x["tgt"].strip().lower()
            ])),
            return_dtype=pl.Utf8
        )
        .hash()
    )
    .group_by("semantic_key")
    .agg(
        pl.col("semantic_key").count().alias("count"),
    )
    .sort("count", descending=True)
    .show(20)
    # .show()
)

semantic_key,count
u64,u32
10468646370933006269,5
11861611074225976559,4
7231903984889610443,4
5080672543516034160,4
8726232811785248458,4
361554470418908866,4
15983536802851926021,3
24339089455566062,3
6944520044992968799,3
4626940347300723153,3
