In [5]:
# Mount Google Drive at /content/drive so later cells can read files stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
import json
import os
import pandas as pd
from sklearn.model_selection import train_test_split

# 仅在 Colab 环境下使用，下载生成的文件
from google.colab import files

def merge_ingredient_with_ner(ingredient: str, ner: list) -> str:
    """
    Replace an ingredient with the first NER phrase that shares a word with it.

    Both the ingredient and each NER phrase are lower-cased and split on
    whitespace. The first NER phrase (in list order) that has at least one
    word in common with the ingredient is returned unchanged.
    Example: ingredient "chicken" with NER ["chicken tender"] gives
    "chicken tender".

    Args:
        ingredient: Raw ingredient text.
        ner: NER food phrases (strings) belonging to the same recipe.

    Returns:
        The matching NER phrase, or ``ingredient`` itself when no phrase
        shares a word with it or when the ingredient contains no words.
    """
    ingredient_tokens = set(ingredient.lower().split())
    # An empty set is a subset of every set, so without this guard a blank
    # ingredient would be "matched" to the first NER phrase.
    if not ingredient_tokens:
        return ingredient
    for ner_item in ner:
        # A non-empty subset always intersects, so a single overlap test covers
        # both the fully-contained and the partially-matching case.
        if not ingredient_tokens.isdisjoint(ner_item.lower().split()):
            return ner_item  # prefer the phrase recognised by NER
    return ingredient

def get_ingredient_entry(ingredient: str, title: str, ner: list) -> dict:
    """
    Build the output record for one ingredient, flagging it when it is "core".

    The ingredient is first merged with the NER phrases through
    ``merge_ingredient_with_ner``. When any lower-cased, whitespace-separated
    word of the merged name also appears among the title's words, the record
    is ``{'name': <merged name>, 'type': 'core'}``; otherwise it is
    ``{'name': <original ingredient>}``.
    """
    merged_name = merge_ingredient_with_ner(ingredient, ner)
    words_in_title = title.lower().split()
    shares_title_word = not set(merged_name.lower().split()).isdisjoint(words_in_title)
    if not shares_title_word:
        # No overlap with the title: keep the ingredient text as given.
        return {'name': ingredient}
    return {'name': merged_name, 'type': 'core'}

def get_dietary_tags(ingredients: list) -> list:
    """
    Derive basic dietary tags (vegetarian / gluten-free) from ingredient text.

    The ingredients are joined into one lower-cased string and searched for
    keyword substrings. A tag is emitted only when none of its blocking
    keywords occurs anywhere in that string. Tags come back in the fixed
    order 'vegetarian', 'gluten-free'.
    """
    joined = ' '.join(ingredients).lower()
    # Each rule: (tag to emit, substrings whose presence rules the tag out).
    keyword_rules = (
        ('vegetarian', ('chicken', 'beef', 'pork', 'fish')),
        ('gluten-free', ('flour', 'wheat', 'bread')),
    )
    return [
        tag
        for tag, blockers in keyword_rules
        if all(word not in joined for word in blockers)
    ]

def get_course_type(title: str) -> str:
    """
    Classify a recipe as 'dessert', 'side' or 'main' from keywords in its title.

    The title is converted with ``str`` and lower-cased, then checked for
    keyword substrings. Dessert keywords are tested before side keywords, and
    'main' is the fallback when nothing matches.
    """
    lowered = str(title).lower()
    # Order matters: the first course whose keyword appears in the title wins.
    for course, keywords in (
        ('dessert', ('dessert', 'cake', 'pie')),
        ('side', ('salad', 'soup', 'side')),
    ):
        for keyword in keywords:
            if keyword in lowered:
                return course
    return 'main'

def safe_eval(data, default):
    """
    Parse a string that holds a Python literal (typically a list) into a value.

    The text is parsed with ``ast.literal_eval`` rather than ``eval``, so only
    literals (strings, numbers, tuples, lists, dicts, sets, booleans, None)
    are accepted and no code embedded in the input is ever executed.

    Args:
        data: The value to parse. Strings are parsed; any other non-None value
            is returned unchanged.
        default: Value returned when ``data`` is None or cannot be parsed.

    Returns:
        The parsed literal, ``data`` itself for non-string input, or
        ``default`` when ``data`` is None or is not a valid literal.
    """
    import ast  # local import keeps this function self-contained

    if data is None:
        return default
    if not isinstance(data, str):
        return data
    try:
        return ast.literal_eval(data)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # These are the errors literal_eval raises for malformed input; other
        # exceptions (e.g. KeyboardInterrupt) are deliberately not swallowed.
        return default

def clean_recipe(recipe: dict) -> dict:
    """
    Normalize one raw recipe record into the cleaned output structure.

    Records whose ``source`` is not "Recipes1M" are filtered out (``None``).
    For kept records the title is stripped (or "" when missing per
    ``pd.notna``), and the ``ingredients``, ``NER`` and ``directions`` fields
    are parsed with ``safe_eval`` and reduced to stripped, non-blank strings.
    The result also carries per-ingredient entries, dietary tags and a
    course type derived from the title.

    Any exception raised while cleaning is reported with ``print`` and the
    record is dropped by returning ``None``.
    """
    def _stripped_items(raw):
        # Parse the stored list text, keeping only truthy entries with non-blank text.
        return [str(item).strip() for item in safe_eval(raw, []) if item and str(item).strip()]

    try:
        # Source filter (the data is assumed to use "Recipes1M" or "Gathered").
        if str(recipe.get('source')) != "Recipes1M":
            return None

        raw_title = recipe['title']
        title = str(raw_title).strip() if pd.notna(raw_title) else ""
        ingredients = _stripped_items(recipe['ingredients'])
        # NER field: the list of recognised food phrases.
        ner_list = _stripped_items(recipe['NER'])

        # Built key by key so the output keeps its fixed field order.
        cleaned = {'id': int(recipe['id']), 'title': title}
        cleaned['ingredients'] = [get_ingredient_entry(ing, title, ner_list) for ing in ingredients]
        cleaned['directions'] = _stripped_items(recipe['directions'])
        cleaned['dietary'] = get_dietary_tags(ingredients)
        cleaned['course_type'] = get_course_type(title)
        cleaned['ner'] = ner_list
        return cleaned
    except Exception as err:
        print(f"Skipping recipe {recipe.get('id')}: {str(err)}")
        return None

def process_dataset(input_csv: str, output_files: dict):
    """
    Clean a recipe CSV, split it into train/dev/test and export each as JSON.

    Args:
        input_csv: Path of the CSV file to read.
        output_files: Mapping with keys 'train', 'dev' and 'test' giving the
            file name each split is written to.

    Rows for which ``clean_recipe`` returns None are discarded. 30% of the
    remaining records are held out and then halved into dev and test, using a
    fixed random_state of 42. Each split is written as indented UTF-8 JSON and
    then passed to ``files.download``.
    """
    records = pd.read_csv(input_csv).to_dict('records')

    # Clean every row and keep only those that survived filtering.
    cleaned_data = []
    for record in records:
        cleaned = clean_recipe(record)
        if cleaned is not None:
            cleaned_data.append(cleaned)

    # Two-stage split: hold out 30%, then divide the held-out part in half.
    train, held_out = train_test_split(cleaned_data, test_size=0.3, random_state=42)
    dev, test = train_test_split(held_out, test_size=0.5, random_state=42)

    for split_name, split_data in (('train', train), ('dev', dev), ('test', test)):
        file_name = output_files[split_name]
        with open(file_name, 'w', encoding='utf-8') as handle:
            json.dump(split_data, handle, indent=2, ensure_ascii=False)
        # Hand the written file to the browser for download.
        files.download(file_name)

# Entry point: describe the input and output locations, then run the pipeline.
if __name__ == "__main__":
    config = {
        'input_csv': '/content/drive/MyDrive/Spring 2025/CS469/RecipeNLG_dataset.csv',  # path to the source CSV; make sure it is correct
        'output_files': {
            # Relative names: files are saved in the current working directory, not a fixed drive path.
            'train': 'train.json',
            'dev': 'dev.json',
            'test': 'test.json'
        }
    }

    process_dataset(config['input_csv'], config['output_files'])
    print("Processing completed. Files are ready for download.")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Processing completed. Files are ready for download.


In [None]:
import math
import re
from collections import Counter

def tokenize(text: str) -> list:
    """Lower-case ``text`` and return its runs of word characters, in order."""
    lowered = text.lower()
    word_pattern = re.compile(r'\b\w+\b')
    return word_pattern.findall(lowered)

def compute_tf(tokens: list) -> dict:
    """
    Compute term frequencies for one tokenized document.

    Each term maps to its occurrence count divided by the total number of
    tokens. An empty token list yields an empty dict.
    """
    total = len(tokens)
    frequencies = {}
    for term, occurrences in Counter(tokens).items():
        frequencies[term] = occurrences / total
    return frequencies

def compute_idf(documents_tokens: list) -> dict:
    """
    Compute inverse document frequency for every term in a tokenized corpus.

    For each term, the score is ``log(N / df)``, where N is the number of
    documents and df is the number of documents containing the term at least
    once. An empty corpus yields an empty dict.
    """
    doc_count = len(documents_tokens)

    # Count each term once per document, regardless of repeats inside it.
    doc_freq = Counter()
    for tokens in documents_tokens:
        doc_freq.update(set(tokens))

    return {term: math.log(doc_count / hits) for term, hits in doc_freq.items()}

def compute_tfidf(documents: list) -> list:
    """
    Compute TF-IDF weights for every document in a corpus.

    Each document is tokenized with ``tokenize``. IDF is computed once over
    the whole corpus, and every document's term frequencies are multiplied by
    the matching IDF score. Returns one ``{term: weight}`` dict per document,
    in input order.
    """
    tokenized_docs = list(map(tokenize, documents))
    idf = compute_idf(tokenized_docs)

    def _weigh(tokens):
        # Scale each term frequency by its corpus-wide IDF (0 if the term is unknown).
        return {term: freq * idf.get(term, 0) for term, freq in compute_tf(tokens).items()}

    return [_weigh(tokens) for tokens in tokenized_docs]