In [1]:
!pip install  huggingface_hub



In [1]:
from huggingface_hub import notebook_login
# Authenticate this session with the Hugging Face Hub (renders an interactive login widget)
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
pip install accelerate

In [None]:
pip install bitsandbytes

In [None]:
!pip install auto-gptq
!pip install --upgrade accelerate optimum transformers

In [None]:
# !pip install git+https://github.com/huggingface/transformers

In [None]:
# !pip3 install torch --index-url https://download.pytorch.org/whl/cu118

In [5]:
from transformers import pipeline, set_seed
import random
import os
import json
import time  # Import the time module
import torch  # Import torch to check for GPU availability

# Generation settings
MODEL_NAME = 'Qwen/Qwen2-1.5B-Instruct'  # model id handed to the text-generation pipeline
DEVICE_MAP = 'auto'                      # forwarded as device_map when the pipeline is built
MAX_LENGTH = 200                         # default max_length for each generation call
NUM_RETURN_SEQUENCES = 500               # default number of letters to generate

# Filesystem locations
OUTPUT_DIR = '/kaggle/working/LETTERS_batch02'  # generated letters are written here
TEXTDATA_DIR = '/kaggle/input/lines0012'        # holds topics.txt and closing_lines.txt

# Batch size intended for saving letters (1000 letters per batch)
SAVE_INTERVAL = 1000

# Make sure the output directory is there before anything is written to it
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Report whether torch can see a CUDA device
device = "GPU" if torch.cuda.is_available() else "CPU"
print(f"Running on: {device}")

# Build the text-generation pipeline for the configured model
generator = pipeline(
    'text-generation',
    model=MODEL_NAME,
    torch_dtype=torch.float16,
    device_map=DEVICE_MAP,
    trust_remote_code=True,
)

def read_lines_from_file(file_path):
    """Read a UTF-8 text file and return its non-blank lines.

    Each line is stripped of leading/trailing whitespace. Lines that are empty
    after stripping are dropped, so a stray blank line in the file can never be
    returned as an (empty) entry.

    :param file_path: Path of the text file to read.
    :return: A list of stripped, non-empty lines in file order.
    """
    # Explicit encoding keeps the result independent of the platform default.
    with open(file_path, 'r', encoding='utf-8') as file:
        stripped_lines = (line.strip() for line in file)
        return [line for line in stripped_lines if line]

def generate_custom_letter(prompt, max_length=MAX_LENGTH, num_return_sequences=NUM_RETURN_SEQUENCES, topics=None, closing_lines=None):
    """
    Generates synthetic formal letters using a pre-trained language model.

    One generation call is made per letter. When ``topics`` is given, a topic is
    drawn at random for each letter and everything after its first ": " is used
    as the subject line. A drawn topic without ": " is skipped with a warning,
    so fewer than ``num_return_sequences`` letters may be returned.

    :param prompt: The initial text to start generating from.
    :param max_length: Value forwarded as ``max_length`` to the generator.
    :param num_return_sequences: The number of generation attempts (letters) to make.
    :param topics: A list of topics to randomly choose from for each letter generation.
    :param closing_lines: A list of closing lines to randomly choose from for the letter.
    :return: A list of ``(prompt_used, letter_text)`` tuples; the letter text is
             prefixed with three newlines.
    """
    letters = []
    for _ in range(num_return_sequences):
        # Set a new random seed for each letter generation
        seed = random.randint(1, 100000000)
        set_seed(seed)

        if topics:
            topic = random.choice(topics)
            # Split on the FIRST ": " only, so a subject that itself contains
            # ": " (e.g. "Topic: Re: Invoice") is kept whole instead of truncated.
            _, separator, subject = topic.partition(": ")
            if not separator:
                print(f"Warning: Topic '{topic}' does not contain a valid format. Skipping.")
                continue  # Skip this iteration if the topic format is invalid
            # Prepare the prompt with subject and salutation
            prompt_with_subject = f"{prompt}\n\nSubject: {subject}\n\nDear [Recipient's Name],\n\n"
        else:
            prompt_with_subject = f"{prompt}\n\nDear [Recipient's Name],\n\n"

        # Generate a single letter for this prompt
        letter = generator(
            prompt_with_subject,
            max_length=max_length,
            num_return_sequences=1,
            do_sample=True,
            truncation=True
        )[0]
        text = letter['generated_text']

        # Add a random closing line
        if closing_lines:
            closing_line = random.choice(closing_lines)
            # Append the closing line and sender's name after the main content
            text += f"\n\n{closing_line}\n\nSincerely,\n\n[Your Name]"

        # Store the prompt and the letter; the letter is prefixed with blank lines
        letters.append((prompt_with_subject.strip(), f"\n\n\n{text.strip()}"))

    return letters

def save_letters_to_json(letters, output_dir=OUTPUT_DIR, start_index=0):
    """
    Saves each letter to a separate JSON file in the specified output directory.

    Files are named ``letter_<n>.json`` where ``n`` starts at ``start_index + 1``.
    With the default ``start_index`` of 0 every call numbers its files from 1,
    so saving several batches into the same directory overwrites earlier files
    unless each call passes the number of letters already written.

    :param letters: A list of ``(prompt, letter)`` tuples.
    :param output_dir: The directory where the JSON files will be saved; it is
                       created if it does not exist.
    :param start_index: Number of letters already saved; file numbering continues
                        after it.
    """
    # The directory may have been removed since the notebook's setup cell ran.
    os.makedirs(output_dir, exist_ok=True)

    for number, (prompt, letter) in enumerate(letters, start=start_index + 1):
        filename = f"letter_{number}.json"
        file_path = os.path.join(output_dir, filename)

        # Create a dictionary with the desired key order
        letter_data = {
            "prompt": prompt,
            "output": letter
        }

        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(letter_data, file, indent=4)  # Use indent for pretty printing

# Prompt header laid out like the sender/recipient block of a formal letter
new_prompt = """
[Your Name]
[Your Address]
[City, State, ZIP Code]
[Email Address]
[Phone Number]

[Date]

[Recipient's Name]
[Recipient's Address]
[City, State, ZIP Code]
"""

# Locate the topic and closing-line lists inside the text-data directory
topics_file_path, closing_lines_file_path = (
    os.path.join(TEXTDATA_DIR, name) for name in ('topics.txt', 'closing_lines.txt')
)

# Load both lists from their files
topics = read_lines_from_file(topics_file_path)
closing_lines = read_lines_from_file(closing_lines_file_path)



Running on: GPU


In [None]:
# Generate synthetic letters with randomly chosen topics
synthetic_letters = generate_custom_letter(new_prompt, topics=topics, closing_lines=closing_lines)

# Save everything in ONE call. By default save_letters_to_json numbers its files
# letter_1.json, letter_2.json, ... from 1 on every call, so saving in
# SAVE_INTERVAL-sized slices would make each slice overwrite the previous one
# as soon as more than SAVE_INTERVAL letters are generated.
if synthetic_letters:
    save_letters_to_json(synthetic_letters)
    print(f"Saved letters 1 to {len(synthetic_letters)}")
else:
    print("No letters were generated; nothing to save.")

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


In [None]:
import os
import json

# Directory holding the letter JSON files to validate
LETTER_DIR = '/kaggle/working/LETTERS_batch02'

def check_letter_format(letter_data):
    """
    Checks if the letter follows the expected format.

    Malformed records are reported as issues instead of raising: a record that
    is not a dict, or whose 'prompt' / 'output' value is missing or not a
    string, yields a corresponding issue and the checks that need that field
    are skipped.

    :param letter_data: The letter data dictionary containing 'prompt' and 'output'.
    :return: A list of format issues found in the letter (empty when none).
    """
    if not isinstance(letter_data, dict):
        return ["Letter data is not a JSON object."]

    issues = []
    prompt = letter_data.get('prompt')
    output = letter_data.get('output')
    has_prompt = isinstance(prompt, str)
    has_output = isinstance(output, str)

    if not has_prompt:
        issues.append("Letter has no 'prompt' text.")
    if not has_output:
        issues.append("Letter has no 'output' text.")

    # Check if the output starts with a newline
    if has_output and not output.startswith('\n'):
        issues.append("Output does not start with a newline.")

    if has_prompt:
        # Check if the prompt contains the necessary components
        prompt_lines = prompt.strip().split('\n')
        if len(prompt_lines) < 6:
            issues.append("Prompt is missing sender or recipient details.")

        # Check for the subject line
        if "Subject:" not in prompt:
            issues.append("Prompt is missing a subject line.")

    if has_output:
        # Check for the salutation
        if "Dear [Recipient's Name]" not in output:
            issues.append("Output is missing a salutation.")

        # Check for the closing line
        if "Sincerely," not in output:
            issues.append("Output is missing a closing line.")

    return issues

def check_all_letters(directory):
    """
    Checks all letter files in the specified directory for format compliance.

    Only entries whose name ends in '.json' are examined, in sorted name order
    so the report is deterministic. A file that cannot be decoded or parsed as
    JSON, or whose top-level value is not an object, is listed in the report
    rather than aborting the whole run.

    :param directory: The directory containing the letter files.
    :return: A dict mapping each problematic filename to its list of issues;
             files without issues are omitted.
    """
    report = {}

    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(directory, filename)
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                letter_data = json.load(file)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            report[filename] = [f"File is not valid JSON: {exc}"]
            continue

        if not isinstance(letter_data, dict):
            report[filename] = ["File does not contain a JSON object."]
            continue

        # Check the letter format
        issues = check_letter_format(letter_data)
        if issues:
            report[filename] = issues

    return report

# Validate every letter file, then summarise the findings
format_report = check_all_letters(LETTER_DIR)

if not format_report:
    print("All letter files follow the correct format.")
else:
    print("The following files do not follow the correct format:")
    # One heading per offending file, followed by its issues as a bullet list
    for file, issues in format_report.items():
        print(f"File: {file}")
        for issue in issues:
            print(f" - {issue}")

In [None]:
import os

# Folder whose regular files are counted
directory = '/kaggle/working/LETTERS_batch02'

# Tally only regular files; sub-directories are not counted
num_files = sum(
    1
    for entry in os.listdir(directory)
    if os.path.isfile(os.path.join(directory, entry))
)

print(f"Number of files in {directory}: {num_files}")

In [None]:
import os
import shutil

# Directory whose contents go into the archive
working_dir = '/kaggle/working/LETTERS_batch02'
# Name of the zip file
zip_file_name = 'letters1.zip'
# Full path of the zip file
output_path = f'/kaggle/working/{zip_file_name}'

# make_archive appends ".zip" itself, so hand it the path without the extension.
# Only a trailing ".zip" is removed; str.replace would strip every ".zip"
# occurring anywhere in the path.
if output_path.endswith('.zip'):
    archive_base = output_path[:-len('.zip')]
else:
    archive_base = output_path

# make_archive returns the path of the archive it actually wrote
archive_path = shutil.make_archive(archive_base, 'zip', working_dir)

print(f"Zip file created: {archive_path}")

# Now you can download the zip file from the Kaggle output section

In [4]:
import os
import shutil

# WARNING: this cell empties the whole working directory, including any
# generated letters and zip archives stored there.
kaggle_dir = '/kaggle/working/'

if not os.path.exists(kaggle_dir):
    print(f"{kaggle_dir} does not exist.")
else:
    # Remove every top-level entry: files and symlinks are unlinked,
    # real directories are removed recursively.
    for filename in os.listdir(kaggle_dir):
        file_path = os.path.join(kaggle_dir, filename)
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
    print(f"Contents of {kaggle_dir} have been deleted.")

Contents of /kaggle/working/ have been deleted.
