In [1]:
import json
import os
from uuid import uuid4
from cuid2 import Cuid

In [10]:
def classify_language(text):
    """
    Classifies a string into 'en', 'zh', or 'mixed' based on its characters, ignoring punctuation.
    - 'en': Contains only English letters (A-Z, a-z).
    - 'zh': Contains only Chinese characters (including Cantonese-specific ones).
    - 'mixed': Contains both, neither, or other characters (e.g., numbers, symbols).
    """
    if not text:
        return "mixed"
    
    # 定义英文和中文标点
    english_punctuation = r'[.,!?;:\'\"()\[\]{}\-\\/@#$%^&*+=|~`]'
    chinese_punctuation = r'[，。！？；：、“”‘’（）【】{}—…《》〈〉]'
    
    # 去除英文和中文标点
    import re
    cleaned_text = re.sub(english_punctuation, '', text)
    cleaned_text = re.sub(chinese_punctuation, '', cleaned_text)
    
    has_english = False
    has_chinese = False
    
    for char in cleaned_text:
        # Check for English letters (A-Z, a-z)
        if ('\u0041' <= char <= '\u005A') or ('\u0061' <= char <= '\u007A'):
            has_english = True
        # Check for Chinese characters (CJK Unified Ideographs and extensions)
        elif ('\u4E00' <= char <= '\u9FFF') or ('\u3400' <= char <= '\u4DBF') or ('\u20000' <= char <= '\u2A6DF'):
            has_chinese = True
        # Other characters (e.g., numbers, spaces) don't affect en/zh but contribute to mixed
        
    if has_english and has_chinese:
        return "mixed"
    elif has_english:
        return "en"
    elif has_chinese:
        return "zh"
    else:
        return "mixed"

In [11]:
def separate_languages(text):
    """
    Separates Chinese and English text into two strings, preserving punctuation and numbers
    in their original language context. If Chinese is dominant (>70% of alphabetic chars),
    the entire text is assigned to chinese_text.
    Args:
        text (str): Input string containing mixed Chinese and English text.
    Returns:
        list: [chinese_text, english_text], where each includes respective language segments
              with punctuation and numbers preserved.
    """
    if not text:
        return ["", ""]
    
    # Define Chinese Unicode ranges (CJK Unified Ideographs and Extensions)
    def is_chinese(char):
        return (0x4E00 <= ord(char) <= 0x9FFF or  # CJK Unified Ideographs
                0x3400 <= ord(char) <= 0x4DBF or  # CJK Extension A
                0x20000 <= ord(char) <= 0x2A6DF)  # CJK Extensions B-F
    
    # Count Chinese and English alphabetic characters
    chinese_count = 0
    english_count = 0
    for char in text:
        if is_chinese(char):
            chinese_count += 1
        elif char.isalpha() and not is_chinese(char):
            english_count += 1
    
    # If Chinese dominates (>70% of alphabetic characters), return full text as Chinese
    total_alphabetic = chinese_count + english_count
    if total_alphabetic == 0 or (chinese_count / total_alphabetic) > 0.65:
        return [text, ""]
    
    # Separate languages, keeping punctuation and numbers with the current language
    chinese_chars = []
    english_chars = []
    current_is_chinese = None
    
    for char in text:
        is_char_chinese = is_chinese(char)
        
        # Initialize or switch language context based on alphabetic characters
        if char.isalpha():
            if current_is_chinese is None or current_is_chinese != is_char_chinese:
                current_is_chinese = is_char_chinese
        
        # Append to the current language context (including punctuation and numbers)
        if current_is_chinese is None:
            # If no alphabetic chars yet, default to English for non-alphabetic chars
            english_chars.append(char)
        elif current_is_chinese:
            chinese_chars.append(char)
        else:
            english_chars.append(char)
    
    # Convert character lists to strings
    chinese_text = "".join(chinese_chars)
    english_text = "".join(english_chars)
    
    return [chinese_text, english_text]

In [16]:
import nltk
from nltk.tokenize import sent_tokenize

nltk.download('punkt')

def split_paragraph(paragraph):
    sentences = sent_tokenize(paragraph)
    return sentences

# paragraph = "This is the first sentence. \n\n This is the second sentence! \nIs this the third sentence?"
paragraph = "Tang dynasty 618–907   \nFive Dynasties and Ten Kingdoms 907–979   \nLiao dynasty 916–1125   \nSong dynasty 960–1279   \n- Northern Song 960–1127   \n- Southern Song 1127–1279"
sentences = split_paragraph(paragraph)
print(sentences)

['Tang dynasty 618–907   \nFive Dynasties and Ten Kingdoms 907–979   \nLiao dynasty 916–1125   \nSong dynasty 960–1279   \n- Northern Song 960–1127   \n- Southern Song 1127–1279']


[nltk_data] Downloading package punkt to D:\AppData\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [13]:
def preprocess_pipe(content_list_path, output_dir):
    # === LOG BASIC INFOMATION ===
    print(f"=== Starting preprocessing pipeline ===")
    print(f"Input file: {content_list_path}")
    print(f"Output directory: {output_dir}")


    # === CREATE CUID GENERATOR ===
    print(f"=== Creating CUID Generator ===")
    cuid = Cuid(length=8)
    

    # === LOAD CONTENT LIST ===
    contents = json.load(open(content_list_path, 'r', encoding='utf-8'))
    print(f"Loaded {len(contents)} total content items")
    

    # === GET TEXT AND IMAGE CONTENTS ===
    text_contents = [content for content in contents if content['type'] == 'text']
    image_contents = [content for content in contents if content['type'] == 'image']
    print(f"Content breakdown: {len(text_contents)} text items, {len(image_contents)} image items")


    # === GET EN AND ZH CONTENTS ===
    print("\n--- Language Classification ---")
    en_contents = [content for content in text_contents if classify_language(content['text']) == 'en']
    zh_contents = [content for content in text_contents if classify_language(content['text']) == 'zh']
    mixed_contents = [content for content in text_contents if classify_language(content['text']) == 'mixed']
    print(f"Classification results:")
    print(f"  • English-only content: {len(en_contents)} items")
    print(f"  • Chinese-only content: {len(zh_contents)} items")  
    print(f"  • Mixed-language content: {len(mixed_contents)} items")

    # === PROCESS MIXED LANGUAGE CONTENT ===
    print("\n--- Processing Mixed Language Content ---")
    chinese_separated_contents = []
    english_separated_contents = []
    
    if mixed_contents:
        print(f"Processing {len(mixed_contents)} mixed-language items...")
        for i, content in enumerate(mixed_contents):
            chinese_text, english_text = separate_languages(content['text'])
            if chinese_text.strip():
                chinese_separated_contents.append({
                    **content,
                    'text': chinese_text,
                })
            
            if english_text.strip():
                english_separated_contents.append({
                    **content,
                    'text': english_text,
                })
            # Progress indicator for large datasets
            if (i + 1) % 100 == 0 or (i + 1) == len(mixed_contents):
                print(f"  Progress: {i + 1}/{len(mixed_contents)} items processed")
        print(f"Separation results:")
        print(f"  • Chinese portions extracted: {len(chinese_separated_contents)} items")
        print(f"  • English portions extracted: {len(english_separated_contents)} items")
    else:
        print("No mixed-language content to process")

    # Add separated contents back to the original lists
    print("\n--- Merging Separated Content ---")
    original_en_count = len(en_contents)
    original_zh_count = len(zh_contents)
    en_contents.extend(english_separated_contents)
    zh_contents.extend(chinese_separated_contents)
    
    print(f"English content: {original_en_count} → {len(en_contents)} (+{len(english_separated_contents)})")
    print(f"Chinese content: {original_zh_count} → {len(zh_contents)} (+{len(chinese_separated_contents)})")


    # === ADD IMAGE CONTENTS ===
    print("\n--- Adding Image Content ---")
    # convert image path to absolute path if necessary
    for content in image_contents:
        content['img_path'] = os.path.join(os.path.abspath(output_dir), "../auto/", content['img_path'])
    en_contents.extend(image_contents)
    zh_contents.extend(image_contents)
    
    print(f"After adding {len(image_contents)} images to both languages:")
    print(f"  • Total English content: {len(en_contents)} items")
    print(f"  • Total Chinese content: {len(zh_contents)} items")


    # === SORT CONTENTS BY PAGE INDEX ===
    def sort_contents_by_page(contents):
        return sorted(contents, key=lambda x: x.get('page_idx', 0))

    print("\n--- Sorting by Page Index ---")
    sorted_en_contents = sort_contents_by_page(en_contents)
    sorted_zh_contents = sort_contents_by_page(zh_contents)
    print("Content sorted by page index")


    # === CLEAN EMPTY CONTENTS ===
    def clear_empty_contents(contents, lang_name):
        original_count = len(contents)
        cleared_contents = [content for content in contents if content.get('text') or content.get('img_path')]
        removed_count = original_count - len(cleared_contents)
        print(f"  {lang_name}: Removed {removed_count} empty items, kept {len(cleared_contents)} items")
        return cleared_contents

    print("\n--- Cleaning Empty Content ---")
    clean_en_contents = clear_empty_contents(sorted_en_contents, "English")
    clean_zh_contents = clear_empty_contents(sorted_zh_contents, "Chinese")


    # === CHUNKING CONTENTS ===
    def chunk_contents(contents):
        new_contents = []
        for content in contents:
            if content['type'] == 'image':
                new_contents.append({
                    **content,
                    'id': str(cuid.generate()),
                })

            elif content['type'] == 'text':
                # Split text into chunks
                chunks = split_paragraph(content['text'])
                for chunk in chunks:
                    new_contents.append({
                        **content,
                        'text': chunk,
                        'id': str(cuid.generate()),
                    })
        
        return new_contents
    
    print("\n--- Chunking Text Content ---")
    chunked_en_contents = chunk_contents(clean_en_contents)
    chunked_zh_contents = chunk_contents(clean_zh_contents)
    print(f"  English content chunked into {len(chunked_en_contents)} items")
    print(f"  Chinese content chunked into {len(chunked_zh_contents)} items")

    
    # === GROUPING CONTENTS INTO SECTIONS ===
    def group_contents(contents, lang_name):
        print(f"  Grouping {lang_name} content by topics...")
        final_contents = {}
        grouped_contents = []
        topic_count = 0
        
        for content in contents:
            if 'text_level' in content and grouped_contents:
                section_id = str(cuid.generate())
                final_contents[section_id] = grouped_contents
                topic_count += 1
                grouped_contents = []
            
            grouped_contents.append(content)
        
        if grouped_contents:
            section_id = str(cuid.generate())
            final_contents[section_id] = grouped_contents
            topic_count += 1
        
        print(f"  {lang_name} grouped into {topic_count} topic sections")
        
        # Print statistics for each group
        for i, group in list(final_contents.items())[:5]:  # Show first 5 groups to avoid spam
            text_count = len([c for c in group if c.get('type') == 'text'])
            img_count = len([c for c in group if c.get('type') == 'image'])
            print(f"    Topic {i}: {text_count} text + {img_count} images")
        
        if len(final_contents) > 5:
            print(f"    ... and {len(final_contents) - 5} more topics")
        
        return final_contents

    print("\n--- Grouping by Topics ---")
    grouped_en_contents = group_contents(chunked_en_contents, "English")
    grouped_zh_contents = group_contents(chunked_zh_contents, "Chinese")


    # === SAVE RESULTS ===
    print("\n--- Saving Results ---")
    os.makedirs(output_dir, exist_ok=True)
    
    en_output_path = os.path.join(output_dir, 'en_contents.json')
    zh_output_path = os.path.join(output_dir, 'zh_contents.json')
    
    with open(en_output_path, 'w', encoding='utf-8') as f:
        json.dump(grouped_en_contents, f, ensure_ascii=False, indent=2)
    print(f"English content saved to: {en_output_path}")
    
    with open(zh_output_path, 'w', encoding='utf-8') as f:
        json.dump(grouped_zh_contents, f, ensure_ascii=False, indent=2)
    print(f"Chinese content saved to: {zh_output_path}")
    
    print(f"\n=== Preprocessing completed successfully ===")
    print(f"Final results:")
    print(f"  • English: {len(grouped_en_contents)} topic groups")
    print(f"  • Chinese: {len(grouped_zh_contents)} topic groups")
    print("-" * 50)

In [14]:
preprocess_pipe(r'output\Objectifying_China\auto\Objectifying_China_content_list.json', r'output\Objectifying_China\preprocessed')
preprocess_pipe(r'output\Pictorial_Silk\auto\Pictorial_Silk_content_list.json', r'output\Pictorial_Silk\preprocessed')
preprocess_pipe(r'output\Tradition to Contemporary\auto\Tradition to Contemporary_content_list.json', r'output\Tradition to Contemporary\preprocessed')

=== Starting preprocessing pipeline ===
Input file: output\Objectifying_China\auto\Objectifying_China_content_list.json
Output directory: output\Objectifying_China\preprocessed
=== Creating CUID Generator ===
Loaded 820 total content items
Content breakdown: 745 text items, 74 image items

--- Language Classification ---
Classification results:
  • English-only content: 286 items
  • Chinese-only content: 228 items
  • Mixed-language content: 231 items

--- Processing Mixed Language Content ---
Processing 231 mixed-language items...
  Progress: 100/231 items processed
  Progress: 200/231 items processed
  Progress: 231/231 items processed
Separation results:
  • Chinese portions extracted: 131 items
  • English portions extracted: 103 items

--- Merging Separated Content ---
English content: 286 → 389 (+103)
Chinese content: 228 → 359 (+131)

--- Adding Image Content ---
After adding 74 images to both languages:
  • Total English content: 463 items
  • Total Chinese content: 433 items


In [17]:
import json
import os
from uuid import uuid4

def propositionize(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        contents = json.load(f)

    res = {}
    for section_id, section_contents in contents.items():
        res[section_id] = []
        for content in section_contents:
            if content['type'] == "image":
                res[section_id].append(content)
            elif content['type'] == "text":
                if "chunks" not in content or content["chunks"] is None:
                    content.pop("chunks", None)
                    res[section_id].append(content)
                    continue
            
                for chunk in content.get("chunks", []):
                    if chunk.strip():
                        res[section_id].append({"type": content["type"], "page_idx": content["page_idx"], "text": chunk, "id": str(uuid4())[:8]})

    with open(os.path.dirname(file_path) + "/propositionized.json", 'w', encoding='utf-8') as f:
        json.dump(res, f, ensure_ascii=False, indent=2)

In [18]:
propositionize(r"output\Objectifying_China\en_objectifying_contents_propositionized.json")

In [19]:
propositionize(r"output\Pictorial_Silk\en_contents_pictorial_silk_propositionized.json")

In [37]:
propositionize(r"output\Tradition to Contemporary\en_contents_tradition_contemporary.json")

In [35]:
def process_third_party(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        contents = json.load(f)

    contents = contents["Tradition_Contemporary_removed"]
    res = {}
    groups = []
    for i in range(2, 94):
        group = []
        for (idx, content) in enumerate(contents):
            # print(i, content["metadata"]["page"])
            if content["metadata"]["page"] == i:
                group.append(content)
        
        groups.append(group)

    for idx, group in enumerate(groups):
        section_id = str(uuid4())[:8]
        res[section_id] = []
        for content in group:
            if content["metadata"]["type"] == "image":
                res[section_id].append({
                    "type": "image",
                    "img_path": content["image_path"],
                    "id": str(uuid4())[:8],
                    "page_idx": content["metadata"]["page"],
                    "img_caption": content["image_caption"]
                })
            elif content["metadata"]["type"] == "text" or content["metadata"]["type"] == "header":
                text = content["text"]
                if text.strip():
                    res[section_id].append({
                        "type": "text",
                        "text": text,
                        "id": str(uuid4())[:8],
                        "page_idx": content["metadata"]["page"],
                        "chunks": content.get("chunks", None)
                    })

    with open(os.path.dirname(file_path) + "/en_contents_tradition_contemporary.json", 'w', encoding='utf-8') as f:
        json.dump(res, f, ensure_ascii=False, indent=2)
    

In [36]:
process_third_party(r"output\Tradition to Contemporary\Tradition_Contemporary_removed_propositionized.json")