In [None]:
# ✅ Install all required libraries
!pip install transformers gradio diffusers torch accelerate evaluate rouge_score bert_score matplotlib sentence-transformers --quiet

# -------- Library Imports --------
import re
import io
import gc
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import gradio as gr
from transformers import (
    pipeline,
    GPT2LMHeadModel,
    GPT2TokenizerFast,
    BlenderbotTokenizer,
    BlenderbotForConditionalGeneration,
    CLIPProcessor,
    CLIPModel
)

from diffusers import StableDiffusionPipeline
import evaluate
from sentence_transformers import SentenceTransformer, util

# -------- Device Detection --------
# Pick the accelerator once; every model below is moved to it.
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# `device_id` is the value handed to the transformers `pipeline(...)` calls.
# An integer is documented there as a GPU ordinal rank, so MPS is passed by
# name instead of as ordinal 0; CUDA keeps ordinal 0 and CPU keeps -1.
if device == "cuda":
    device_id = 0
elif device == "mps":
    device_id = "mps"
else:
    device_id = -1
pipe = None  # For image generation (created lazily by generate_image)

# -------- Load Evaluation Metrics --------
rouge = evaluate.load("rouge")
bertscore = evaluate.load("bertscore")
meteor = evaluate.load("meteor")
# Sentence embeddings for the chatbot's semantic-similarity score.
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
# GPT-2 is used only to score fluency (perplexity) of chatbot replies.
fluency_model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
fluency_tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
# CLIP scores how well a generated image matches its prompt.
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# Chat model itself (its tokenizer is loaded with the NLP pipelines further down).
chat_model = BlenderbotForConditionalGeneration.from_pretrained("facebook/blenderbot-400M-distill").to(device)

def compute_clip_score(image: Image.Image, prompt: str) -> float:
    """Score how well *image* matches *prompt* with CLIP.

    The score follows the CLIPScore definition, ``2.5 * max(cos, 0)``, where
    ``cos`` is the cosine similarity between the CLIP image and text
    embeddings. It is capped at 1.0 so it fits the 0-1 axis of the chart.

    A softmax over ``logits_per_image`` is deliberately not used: with a
    single prompt there is only one logit, so the softmax is always 1.0.

    Args:
        image: The generated PIL image.
        prompt: The text prompt the image was generated from.

    Returns:
        A float in [0, 1]; 0.0 if scoring fails for any reason.
    """
    try:
        inputs = clip_processor(
            text=[prompt],
            images=image,
            return_tensors="pt",
            padding=True,
            truncation=True,  # over-long prompts would otherwise fail in the text encoder
        ).to(device)
        with torch.no_grad():
            outputs = clip_model(**inputs)
        cosine = torch.nn.functional.cosine_similarity(
            outputs.image_embeds, outputs.text_embeds
        ).item()
        return min(1.0, 2.5 * max(cosine, 0.0))
    except Exception as e:
        # Best-effort metric: keep the UI working, but do not hide the reason.
        print(f"CLIP score computation failed: {e}")
        return 0.0

def plot_clip_score(score):
    """Render *score* as a one-bar chart and return it as a PIL image.

    Args:
        score: The value to plot; the y-axis is fixed to the range 0-1.

    Returns:
        A PIL image decoded from the PNG rendering of the chart.
    """
    fig, ax = plt.subplots(figsize=(4, 2.5))
    buf = io.BytesIO()
    try:
        ax.bar(["CLIPScore"], [score], color='green')
        ax.set_ylim(0, 1)
        ax.set_ylabel("Score")
        ax.set_title("Image-Text Similarity (CLIPScore)")
        ax.grid(axis='y')
        # Work on this figure explicitly rather than pyplot's "current" one.
        fig.tight_layout()
        fig.savefig(buf, format='png')
    finally:
        # Without this, every request leaves an open figure behind.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)

def compute_perplexity(text):
    """Return the perplexity of *text* under the fluency language model.

    Perplexity is ``exp(loss)``, where ``loss`` is the model's language
    modelling loss with the input ids used as labels.

    Args:
        text: The text to score.

    Returns:
        The perplexity as a float, or ``float("inf")`` when the text has
        fewer than two tokens, since no next-token loss can be computed.
    """
    enc = fluency_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        # Keep the input within the model's context window.
        max_length=fluency_model.config.n_positions,
    ).to(device)
    # The loss needs at least one (token, next token) pair.
    if enc.input_ids.shape[-1] < 2:
        return float("inf")
    with torch.no_grad():
        output = fluency_model(**enc, labels=enc.input_ids)
    return np.exp(output.loss.item())

def evaluate_chitchat_similarity(response, reference):
    """Score a chatbot *response* against a *reference* reply and chart the result.

    Three scores are produced:
      * Semantic: cosine similarity of the sentence embeddings.
      * Fluency: ``1 - ln(perplexity) / 10``, floored at 0.
      * Length: whitespace-separated word count divided by 30, capped at 1.

    Args:
        response: The generated reply.
        reference: The expected reply; if blank, nothing is evaluated.

    Returns:
        ``(summary_text, chart_image)``, or ``("", None)`` when *reference*
        is empty or whitespace only.
    """
    if not reference.strip():
        return "", None

    # Semantic similarity
    emb1 = embed_model.encode(response, convert_to_tensor=True)
    emb2 = embed_model.encode(reference, convert_to_tensor=True)
    similarity = util.cos_sim(emb1, emb2).item()

    # Fluency via perplexity
    fluency = compute_perplexity(response)
    fluency_score = max(0.0, 1 - np.log(fluency) / 10)  # Normalize to 0–1

    # Length score
    token_len = len(response.split())
    length_score = min(1.0, token_len / 30)  # Full score at 30 or more words

    # Plot chart
    fig, ax = plt.subplots(figsize=(6, 2.5))
    buf = io.BytesIO()
    try:
        labels = ["Semantic", "Fluency", "Length"]
        values = [similarity, fluency_score, length_score]
        ax.bar(labels, values, color='teal')
        ax.set_ylim(0, 1)
        ax.set_title("Chitchat Evaluation Metrics")
        ax.set_ylabel("Score")
        ax.grid(axis='y')
        # Work on this figure explicitly rather than pyplot's "current" one.
        fig.tight_layout()
        fig.savefig(buf, format='png')
    finally:
        # Without this, every evaluated reply leaves an open figure behind.
        plt.close(fig)
    buf.seek(0)
    image = Image.open(buf)

    return f"🧠 Semantic: {similarity:.2f} | Fluency: {fluency_score:.2f} | Length: {length_score:.2f}", image

def compute_metrics(prediction, reference):
    """Compute ROUGE-L, BERTScore-F1 and METEOR for one prediction and chart them.

    Args:
        prediction: The generated text.
        reference: The expected text to compare against.

    Returns:
        ``(summary_text, chart_image)`` on success, or
        ``("Evaluation error: ...", None)`` if any metric or the plotting fails.
    """
    try:
        rouge_score = rouge.compute(predictions=[prediction], references=[reference])["rougeL"]
        bert_score = bertscore.compute(predictions=[prediction], references=[reference], lang="en")["f1"][0]
        meteor_score = meteor.compute(predictions=[prediction], references=[reference])["meteor"]

        # Generate chart
        fig, ax = plt.subplots(figsize=(6, 3))
        buf = io.BytesIO()
        try:
            labels = ["ROUGE-L", "BERTScore-F1", "METEOR"]
            values = [rouge_score, bert_score, meteor_score]
            ax.plot(labels, values, marker='o', linestyle='-', color='blue')
            ax.set_ylim(0, 1)
            ax.set_title("Evaluation Metrics")
            ax.set_ylabel("Score")
            ax.grid(True)
            # Work on this figure explicitly rather than pyplot's "current" one.
            fig.tight_layout()
            fig.savefig(buf, format='png')
        finally:
            # Close the figure even if plotting fails, so requests do not leak figures.
            plt.close(fig)
        buf.seek(0)
        image = Image.open(buf)
        return f"📊 ROUGE-L: {rouge_score:.4f} | BERTScore-F1: {bert_score:.4f} | METEOR: {meteor_score:.4f}", image
    except Exception as e:
        return f"Evaluation error: {e}", None

# -------- Load All NLP Pipelines --------
# One transformers pipeline per task, each placed on `device_id`.
summarizer = pipeline(
    task="summarization",
    model="facebook/bart-large-cnn",
    device=device_id,
)
sentiment_analyzer = pipeline(
    task="sentiment-analysis",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
    device=device_id,
)
qa_pipeline = pipeline(
    task="question-answering",
    model="distilbert/distilbert-base-cased-distilled-squad",
    device=device_id,
)
next_word_generator = pipeline(
    task="text-generation",
    model="distilgpt2",
    device=device_id,
)
story_generator = pipeline(
    task="text-generation",
    model="aspis/gpt2-genre-story-generation",
    device=device_id,
)
# Tokenizer for the Blenderbot chat model used by chat_response().
chat_tokenizer = BlenderbotTokenizer.from_pretrained("facebook/blenderbot-400M-distill")

# -------- Memory Cleanup --------
def clear_memory():
    """Run the garbage collector, then empty the accelerator's cache if one is in use."""
    gc.collect()
    if device == "mps":
        # Imported lazily so the MPS helper is only touched on MPS machines.
        from torch.mps import empty_cache as release_mps_cache
        release_mps_cache()
        return
    if device == "cuda":
        torch.cuda.empty_cache()

# -------- Utility Functions --------
def clean_incomplete_output(text):
    """Drop fragments from generated text, keeping only complete sentences.

    ``<n>`` markers are replaced with spaces first. A sentence is kept when it
    starts with an uppercase letter, ends with ``.``, ``!`` or ``?`` and has at
    least four whitespace-separated words. If nothing survives, the text (with
    the markers replaced) is returned unchanged.
    """
    text = text.replace("<n>", " ")
    kept = []
    for sentence in re.split(r'(?<=[.!?]) +', text.strip()):
        if not sentence:
            continue
        if not sentence[0].isupper():
            continue
        if not sentence.endswith((".", "!", "?")):
            continue
        if len(sentence.split()) < 4:
            continue
        kept.append(sentence)
    joined = ' '.join(kept)
    return joined if joined else text

def clean_at_least_two_sentences(text):
    """Return the first two complete sentences of *text*.

    A sentence is complete when it starts with an uppercase letter, ends with
    ``.``, ``!`` or ``?`` and has at least four whitespace-separated words.
    If only one sentence qualifies, that one is returned; if none do, *text*
    is returned unchanged.
    """
    def _is_complete(sentence):
        return (
            bool(sentence)
            and sentence[0].isupper()
            and sentence.endswith((".", "!", "?"))
            and len(sentence.split()) >= 4
        )

    complete = list(filter(_is_complete, re.split(r'(?<=[.!?]) +', text.strip())))
    if not complete:
        return text
    return ' '.join(complete[:2])

# -------- NLP Task Functions --------
def summarize(text, reference=None):
    """Summarize *text* and optionally evaluate the summary against *reference*.

    Inputs shorter than 10 words are returned unchanged without calling the
    model.

    Args:
        text: The text to summarize.
        reference: Optional reference summary. Evaluation is skipped when it
            is ``None``, empty or whitespace only.

    Returns:
        ``(summary, chart, metrics)``. ``chart`` is ``None`` and ``metrics``
        is ``""`` when no evaluation ran. On failure the first element is a
        ``"Summarization error: ..."`` message.
    """
    word_count = len(text.split())
    if word_count < 10:
        return text, None, ""

    # 70% of the input length, clamped to 50-60; the minimum is half of that, at least 20.
    max_len = min(60, max(50, int(word_count * 0.7)))
    min_len = max(20, int(max_len * 0.5))

    try:
        try:
            result = summarizer(
                text,
                max_length=max_len,
                min_length=min_len,
                do_sample=True,
                top_k=50,
                top_p=0.92,
                temperature=0.8,
                truncation=True
            )
        finally:
            # Release cached memory on failure too, not only on success.
            clear_memory()
        raw_summary = result[0]['summary_text']
        cleaned = clean_incomplete_output(raw_summary)
        # Treat a whitespace-only reference as "no reference", as the other tasks do.
        has_reference = bool(reference and reference.strip())
        metrics, chart = compute_metrics(cleaned, reference) if has_reference else ("", None)
        return cleaned, chart, metrics
    except Exception as e:
        return f"Summarization error: {e}", None, ""

def predict_next_word(prompt):
    """Continue *prompt* with up to 15 newly generated tokens.

    Only the first 20 whitespace-separated words of the prompt are sent to the
    generator. The generated text is passed through
    ``clean_at_least_two_sentences`` before being returned.

    Args:
        prompt: The text to continue.

    Returns:
        The cleaned continuation, a request to enter a prompt when *prompt* is
        blank, or a ``"Prediction error: ..."`` message on failure.
    """
    if not prompt.strip():
        return "Please enter a valid prompt."
    trimmed = ' '.join(prompt.split()[:20])
    try:
        try:
            result = next_word_generator(
                trimmed,
                max_new_tokens=15,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=0.9
            )
        finally:
            # Release cached memory after inference, like the other task helpers.
            clear_memory()
        return clean_at_least_two_sentences(result[0]['generated_text'])
    except Exception as e:
        return f"Prediction error: {e}"

def predict_story(prompt):
    """Generate a short story opening from *prompt*.

    The generator is seeded with ``"Once upon a time, "`` followed by the
    stripped, capitalized prompt, and asked for up to 80 new tokens. The
    generated text is passed through ``clean_at_least_two_sentences``.

    Args:
        prompt: The story idea.

    Returns:
        The cleaned story text, a request to enter a prompt when *prompt* is
        blank, or a ``"Story generation error: ..."`` message on failure.
    """
    if not prompt.strip():
        return "Please enter a story prompt."
    # NOTE(review): str.capitalize() also lowercases the rest of the prompt,
    # so proper nouns lose their capitals — confirm this is intended.
    seed = "Once upon a time, " + prompt.strip().capitalize()
    try:
        try:
            result = story_generator(
                seed,
                max_new_tokens=80,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=0.9
            )
        finally:
            # Release cached memory after inference, like the other task helpers.
            clear_memory()
        return clean_at_least_two_sentences(result[0]['generated_text'])
    except Exception as e:
        return f"Story generation error: {e}"

def chat_response(text):
    """Generate a chatbot reply to *text*.

    Args:
        text: The user's message.

    Returns:
        The decoded reply string, with special tokens removed.

    Raises:
        Any exception from tokenization or generation is propagated to the
        caller; cached memory is released either way.
    """
    try:
        inputs = chat_tokenizer(
            [text],
            return_tensors="pt",
            truncation=True,
            # Cap the input at the model's configured number of positions so
            # long messages are cut instead of failing inside the model.
            max_length=chat_model.config.max_position_embeddings,
        ).to(device)
        reply_ids = chat_model.generate(**inputs)
        return chat_tokenizer.batch_decode(reply_ids, skip_special_tokens=True)[0]
    finally:
        clear_memory()

def analyze_sentiment(text):
    """Classify the sentiment of *text*.

    Args:
        text: The text to classify.

    Returns:
        A string of the form ``"LABEL (score)"`` with the score rounded to two
        decimals.

    Raises:
        Any exception from the pipeline is propagated to the caller; cached
        memory is released either way.
    """
    try:
        # truncation=True asks the tokenizer to cut over-long inputs to its
        # maximum length instead of handing the model more than it accepts.
        result = sentiment_analyzer(text, truncation=True)[0]
    finally:
        clear_memory()
    return f"{result['label']} ({result['score']:.2f})"

def answer_question(question, context):
    """Answer *question* from *context* and return the answer text."""
    prediction = qa_pipeline(question=question, context=context)
    answer = prediction['answer']
    clear_memory()
    return answer

def generate_image(prompt):
    """Generate one image for *prompt* with Stable Diffusion.

    The pipeline is created on first use and kept in the module-level ``pipe``
    for later calls.
    """
    global pipe
    if pipe is None:
        loaded = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
        pipe = loaded.to(device)
    generation = pipe(prompt)
    picture = generation.images[0]
    clear_memory()
    return picture

# -------- Gradio UI --------
def _chart_update(chart):
    """Build the Gradio update for the chart slot: show *chart*, or hide the slot when it is None."""
    if chart is None:
        return gr.update(visible=False)
    return gr.update(value=chart, visible=True)


def handle_tasks(task_type, input_text_val, reference_text_val):
    """Run the selected task and build the four UI outputs.

    Args:
        task_type: The label chosen in the task dropdown.
        input_text_val: The main input text.
        reference_text_val: Optional reference text used for evaluation. For
            "Question Answering" it is also passed as the context.

    Returns:
        A 4-tuple ``(output_text, generated_image_update, chart_update,
        metrics_text)``. Any exception becomes a ``"<task> error: ..."`` text
        with both image slots hidden.
    """
    # Tasks that share one flow: run the model, then score the output against
    # the reference if one was given. Each entry is (runner, evaluator).
    text_tasks = {
        "Next Word Prediction": (lambda: predict_next_word(input_text_val), compute_metrics),
        "Story Prediction": (lambda: predict_story(input_text_val), compute_metrics),
        "Chatbot": (lambda: chat_response(input_text_val), evaluate_chitchat_similarity),
        "Sentiment Analysis": (lambda: analyze_sentiment(input_text_val), compute_metrics),
        "Question Answering": (
            lambda: answer_question(input_text_val, reference_text_val),
            compute_metrics,
        ),
    }

    try:
        if task_type == "Text Summarization":
            # summarize() decides on its own whether to evaluate.
            summary, chart, metrics = summarize(input_text_val, reference_text_val)
            return summary, gr.update(visible=False), _chart_update(chart), metrics

        if task_type == "Image Generation":
            image = generate_image(input_text_val)
            clip_score = compute_clip_score(image, input_text_val)
            chart = plot_clip_score(clip_score)
            return "", gr.update(value=image, visible=True), gr.update(value=chart, visible=True), f"CLIPScore: {clip_score:.2f}"

        if task_type not in text_tasks:
            return "❌ Unsupported task.", gr.update(visible=False), gr.update(visible=False), ""

        run_task, evaluate_output = text_tasks[task_type]
        output = run_task()
        if not (reference_text_val or "").strip():
            return output, gr.update(visible=False), gr.update(visible=False), ""

        metrics, chart = evaluate_output(output, reference_text_val)
        # The evaluators return chart=None on failure; hide the slot in that
        # case instead of showing an empty image.
        return output, gr.update(visible=False), _chart_update(chart), metrics

    except Exception as e:
        return f"{task_type} error: {e}", gr.update(visible=False), gr.update(visible=False), ""

def clear_all():
    """Reset the UI: blank every text field and hide both image slots."""
    blank = ""
    hide_generated = gr.update(visible=False)
    hide_chart = gr.update(visible=False)
    return blank, blank, blank, hide_generated, hide_chart, blank

# Build the UI: a task selector, two text inputs, four output slots and two buttons.
# NOTE(review): Gradio presumably lays components out in creation order —
# keep the order below when editing.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 Multifunctional NLP & Image Generation Tool")

    # Task selector; the chosen label is the `task_type` argument of handle_tasks.
    task = gr.Dropdown(
        choices=[
            "Text Summarization",
            "Next Word Prediction",
            "Story Prediction",
            "Chatbot",
            "Sentiment Analysis",
            "Question Answering",
            "Image Generation"
        ],
        label="Choose Task",
        interactive=True
    )

    # Inputs: the main text plus an optional reference used for evaluation.
    input_text = gr.Textbox(label="Enter Input")
    reference_input = gr.Textbox(label="Reference Output (Optional)")
    # Outputs: both image slots start hidden and are shown through the
    # gr.update(...) values returned by handle_tasks.
    output_text = gr.Textbox(label="Output (Text)", lines=4)
    generated_image = gr.Image(label="Generated Image", visible=False)
    graph_image = gr.Image(label="Evaluation Chart", visible=False)
    metrics_output = gr.Textbox(label="Evaluation Metrics", interactive=False)

    clear_button = gr.Button("Clear All")
    generate_button = gr.Button("Generate Output")

    gr.Markdown("### 📌 Notes:\n- Reference is optional but used for evaluation chart.")

    # handle_tasks returns four values, matched positionally to `outputs`.
    generate_button.click(
        fn=handle_tasks,
        inputs=[task, input_text, reference_input],
        outputs=[output_text, generated_image, graph_image, metrics_output]
    )

    # clear_all returns six values, matched positionally to `outputs`.
    clear_button.click(
        fn=clear_all,
        inputs=[],
        outputs=[input_text, reference_input, output_text, generated_image, graph_image, metrics_output]
    )

# Start the app. Per the launch log, share=True produced a public gradio.live
# URL and debug=True keeps the notebook cell running so errors and logs show.
demo.launch(share=True, inline=False, debug=True)

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
Device set to use cuda:0
Device set to use cuda:0
Device set to use cuda:0
Device set to use cuda:0
Device set to use cuda:0


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://b0e6dbfff8daac44f9.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


model.safetensors:   0%|          | 0.00/1.42G [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
`loss_type=None` was set in the config but it is unrecognised.Using the default loss: `ForCausalLMLoss`.


model_index.json:   0%|          | 0.00/541 [00:00<?, ?B/s]

Fetching 15 files:   0%|          | 0/15 [00:00<?, ?it/s]

config.json:   0%|          | 0.00/617 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

scheduler_config.json:   0%|          | 0.00/308 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/472 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/492M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/743 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/547 [00:00<?, ?B/s]

diffusion_pytorch_model.safetensors:   0%|          | 0.00/3.44G [00:00<?, ?B/s]

diffusion_pytorch_model.safetensors:   0%|          | 0.00/335M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/806 [00:00<?, ?B/s]

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]