In [8]:
import csv
import os

input_path = 'prompts/SD1_final.csv'
output_path = 'prompts/SD1_final.txt'

prompts = []

if os.path.exists(input_path):
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    print(f"Reading {len(lines)} lines from {input_path}...")

    for line in lines:
        line = line.strip()
        if not line:
            continue
        
        # Skip header if present
        if line.startswith(',prompt,urls') or line.startswith('id,prompt,urls'):
            continue

        # Attempt to parse as CSV row
        # Original format: index, prompt, urls
        # We check if it parses into at least 3 columns and the last one looks like a list
        is_csv_row = False
        try:
            reader = csv.reader([line], quotechar='"')
            row = next(reader)
            if len(row) >= 3 and row[-1].strip().startswith('[') and row[-1].strip().endswith(']'):
                # Likely the original CSV format -> Extract second column (prompt)
                prompts.append(row[1])
                is_csv_row = True
        except:
            pass
        
        if not is_csv_row:
            # Treat as raw text line (from previous appends or non-csv format)
            prompts.append(line)
else:
    print(f"Error: {input_path} does not exist.")

# Deduplicate while preserving order
unique_prompts = []
seen = set()
for p in prompts:
    if p not in seen:
        unique_prompts.append(p)
        seen.add(p)

# Save to SD1_final.txt
with open(output_path, 'w', encoding='utf-8') as f:
    for p in unique_prompts:
        f.write(p + '\n')

print(f"Successfully saved {len(unique_prompts)} unique prompts to {output_path}.")

Reading 3001 lines from prompts/SD1_final.csv...
Successfully saved 3000 unique prompts to prompts/SD1_final.txt.


In [9]:
# Install datasets if not present: !pip install datasets
import random
import pyarrow

# Monkey patch for pyarrow compatibility (PyExtensionType removed in newer pyarrow)
if not hasattr(pyarrow, 'PyExtensionType'):
    try:
        pyarrow.PyExtensionType = pyarrow.ExtensionType
    except AttributeError:
        pass

from datasets import load_dataset

def generate_random_prompts(count=500):
    adjectives = ["beautiful", "dark", "colorful", "abstract", "vintage", "futuristic", "serene", "chaotic", "bright", "minimalist"]
    subjects = ["cat", "dog", "landscape", "cityscape", "portrait", "flower", "robot", "ocean", "mountain", "forest"]
    verbs = ["running", "sleeping", "flying", "standing", "sitting", "fighting", "dancing", "glowing", "falling", "rising"]
    styles = ["oil painting", "digital art", "sketch", "photography", "watercolor", "cyberpunk", "steampunk", "surrealism", "impressionism", "concept art"]
    
    gen_prompts = []
    for _ in range(count):
        adj = random.choice(adjectives)
        sub = random.choice(subjects)
        verb = random.choice(verbs)
        style = random.choice(styles)
        prompt = f"{adj} {sub} {verb} in {style} style"
        gen_prompts.append(prompt)
    return gen_prompts

def get_coco_captions(count=1000):
    # Use a parquet-based COCO dataset for better streaming support
    print("Streaming COCO captions...")
    try:
        # merve/coco2017 is widely used and usually parquet-friendly
        dataset = load_dataset("merve/coco2017", split="train", streaming=True)
        captions = []
        for item in dataset:
            if 'caption' in item:
                captions.append(item['caption'])
            elif 'text' in item:
                 captions.append(item['text'])
            
            if len(captions) >= count:
                break
        return captions[:count]
    except Exception as e:
        print(f"Error loading COCO (merve/coco2017): {e}. Falling back to generation.")
        return generate_random_prompts(count)

def get_laion_captions(count=1000):
    # Use a high-quality subset of LAION which is Parquet-based
    print("Streaming LAION captions...")
    try:
        dataset = load_dataset("ChristophSchuhmann/improved_aesthetics_6.5plus", split="train", streaming=True)
        captions = []
        for item in dataset:
            if 'TEXT' in item:
                captions.append(item['TEXT'])
            elif 'caption' in item:
                captions.append(item['caption'])
            
            if len(captions) >= count:
                break
        return captions[:count]
    except Exception as e:
        print(f"Error loading LAION: {e}. Falling back to generation.")
        return generate_random_prompts(count)

# Main execution
new_nm_prompts = []

# 1. COCO
coco_p = get_coco_captions(1000)
print(f"Fetched {len(coco_p)} COCO prompts")
new_nm_prompts.extend(coco_p)

# 2. LAION
laion_p = get_laion_captions(1000)
print(f"Fetched {len(laion_p)} LAION prompts")
new_nm_prompts.extend(laion_p)

# 3. Generated
gen_p = generate_random_prompts(500)
print(f"Generated {len(gen_p)} synthetic prompts")
new_nm_prompts.extend(gen_p)

# Filter duplicates against existing SD1_final.txt
existing_path = 'prompts/SD1_final.txt'
existing_set = set()
if os.path.exists(existing_path):
    with open(existing_path, 'r', encoding='utf-8') as f:
        existing_set = set(line.strip() for line in f)

final_new_prompts = []
for p in new_nm_prompts:
    p_clean = p.strip()
    if p_clean and p_clean not in existing_set:
        final_new_prompts.append(p_clean)
        existing_set.add(p_clean) # Avoid duplicates within the new set too

print(f"\nPrepared {len(final_new_prompts)} unique non-memorized prompts.")

# Save to prompts/sd1_nm_2500.txt
out_nm_path = 'prompts/sd1_nm_2500.txt'
with open(out_nm_path, 'w', encoding='utf-8') as f:
    for p in final_new_prompts:
        f.write(p + '\n')

print(f"Saved to {out_nm_path}")

AttributeError: partially initialized module 'datasets' has no attribute 'utils' (most likely due to a circular import)