<a href="https://colab.research.google.com/github/Peagledor/ai-projects/blob/main/DataForge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch transformers gradio pandas
!python your_script.py

Collecting gradio
  Downloading gradio-5.12.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.6-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.5.4 (from gradio)
  Downloading gradio_client-1.5.4-py3-none-any.whl.metadata (7.1 kB)
Collecting markupsafe~=2.0 (from gradio)
  Downloading MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.2.2 (from gradio)
  Downloading ruff-0.9.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.meta

In [None]:
import os
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TextStreamer,
    BitsAndBytesConfig,
    pipeline
)
import gradio as gr
import pandas as pd
import json
from datetime import datetime, timedelta
import random
from google.colab import userdata
from huggingface_hub import login

# Read the Hugging Face access token from the Colab secret named HF_TOKEN
# and log in with it so later hub downloads are authenticated.
hf_token = userdata.get('HF_TOKEN')
login(token=hf_token, add_to_git_credential=True)

class DataGenerator:
    """Generate synthetic tabular business data.

    Two strategies are offered: keyword-driven templates (always available)
    and a chat LLM that is asked to emit JSON records. Whenever the model is
    unavailable or its answer cannot be parsed, the template path is used.
    """

    # NOTE(review): the previous hard-coded id "meta-llama/Llama-2-3.1b-chat-hf"
    # is not a published checkpoint; the docstring's "LLaMA 3.1B" suggests the
    # Llama 3.1 8B instruct model was intended - confirm before relying on it.
    DEFAULT_MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"

    # (generator method name, keywords that select it); the first match wins.
    # "financ" is a stem so that both "finance" and "financial" are recognised.
    _TEMPLATE_KEYWORDS = (
        ("_generate_retail_data", ("retail", "product", "store")),
        ("_generate_healthcare_data", ("health", "medical", "patient")),
        ("_generate_finance_data", ("financ", "bank", "transaction")),
        ("_generate_tech_data", ("tech", "software", "user")),
    )

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = None
        self.tokenizer = None

    def load_llama_model(self, model_name=None):
        """Load the tokenizer and a 4-bit quantized causal LM.

        Args:
            model_name: Hugging Face repo id to load; defaults to
                ``DEFAULT_MODEL_NAME``.

        Returns:
            True when both tokenizer and model loaded, False otherwise (the
            error and traceback are printed). ``self.model`` and
            ``self.tokenizer`` are only replaced after a fully successful load.
        """
        if model_name is None:
            model_name = self.DEFAULT_MODEL_NAME

        try:
            print("Starting model loading process...")
            print(f"Attempting to load tokenizer for {model_name}")

            tokenizer = AutoTokenizer.from_pretrained(model_name)
            # Reuse EOS as the padding token so generate() has a pad id.
            tokenizer.pad_token = tokenizer.eos_token
            print("Tokenizer loaded successfully")

            # 4-bit NF4 weights, double quantization, bfloat16 compute.
            quant_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_quant_type="nf4"
            )

            print("Starting model loading...")
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                quantization_config=quant_config
            )
        except Exception as e:
            print(f"Detailed error loading LLaMA model: {e}")
            print(f"Error type: {type(e)}")
            import traceback
            print(f"Full traceback: {traceback.format_exc()}")
            return False

        self.tokenizer = tokenizer
        self.model = model
        print("Model loaded successfully")
        return True

    @staticmethod
    def _parse_records(text):
        """Return a DataFrame built from the outermost JSON array in *text*.

        Returns None when no ``[...]`` span exists, when the span is not valid
        JSON, when pandas cannot tabulate it, or when it yields no rows.
        """
        start_idx = text.find('[')
        end_idx = text.rfind(']')
        if start_idx == -1 or end_idx <= start_idx:
            return None
        try:
            frame = pd.DataFrame(json.loads(text[start_idx:end_idx + 1]))
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError; pandas raises ValueError or
            # TypeError for shapes it cannot turn into a table.
            return None
        return None if frame.empty else frame

    def generate_with_model(self, prompt, num_records):
        """Ask the loaded model for JSON records and return them as a DataFrame.

        Args:
            prompt: Free-text description of the data wanted.
            num_records: Number of records to request (coerced to int).

        Returns:
            A DataFrame parsed from the model's answer, or the template-based
            result when the model is not loaded, generation fails, or no
            usable JSON array is found in the answer.
        """
        num_records = int(num_records)

        if self.model is None or self.tokenizer is None:
            print("Model not loaded; falling back to template-based generation")
            return self.generate_template_based(prompt, num_records)

        frame = None
        try:
            messages = [
                {"role": "system", "content": "You are a data generation assistant. Generate realistic test data in JSON format."},
                {"role": "user", "content": f"Generate {num_records} records of {prompt} in JSON format."}
            ]

            # add_generation_prompt opens the assistant turn so the model
            # answers instead of continuing the user message; return_dict also
            # yields the attention mask, needed because pad and EOS share an id.
            inputs = self.tokenizer.apply_chat_template(
                messages,
                add_generation_prompt=True,
                return_dict=True,
                return_tensors="pt"
            ).to(self.model.device)
            prompt_length = inputs["input_ids"].shape[-1]

            outputs = self.model.generate(
                **inputs,
                max_new_tokens=2000,
                do_sample=True,  # temperature only has an effect when sampling
                temperature=0.7,
                pad_token_id=self.tokenizer.pad_token_id
            )

            # Decode only the newly generated tokens so brackets in the prompt
            # or template cannot be mistaken for the start of the JSON payload.
            response = self.tokenizer.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            )
            frame = self._parse_records(response)
        except Exception as e:
            print(f"Error in model generation: {e}")

        if frame is not None:
            return frame
        return self.generate_template_based(prompt, num_records)

    def generate_template_based(self, description, num_records):
        """Generate data from the template whose keywords appear in *description*.

        Args:
            description: Free-text request; matched case-insensitively against
                the keyword table. None is treated as an empty string.
            num_records: Number of rows to produce (coerced to int).

        Returns:
            A DataFrame from the first matching template, or the generic
            custom template when nothing matches.
        """
        description = description or ""
        num_records = int(num_records)
        desc_lower = description.lower()

        for method_name, keywords in self._TEMPLATE_KEYWORDS:
            if any(keyword in desc_lower for keyword in keywords):
                return getattr(self, method_name)(num_records, description)
        return self._generate_custom_data(num_records, description)

    def _generate_retail_data(self, num_records, description):
        """Build retail transactions; "inventory"/"customer" add extra columns."""
        desc_lower = (description or "").lower()
        now = datetime.now()  # single reference time for every row
        rows = range(num_records)

        data = {
            'transaction_id': [f'TRX{i:06d}' for i in rows],
            'date': [(now - timedelta(days=i)).strftime('%Y-%m-%d') for i in rows],
            'product_id': [f'PRD{i:04d}' for i in rows],
            'product_name': [f'Product {i}' for i in rows],
            'quantity': [random.randint(1, 100) for _ in rows],
            'unit_price': [round(random.uniform(10.0, 1000.0), 2) for _ in rows]
        }

        if "inventory" in desc_lower:
            data['stock_level'] = [random.randint(0, 1000) for _ in rows]
            data['reorder_point'] = [random.randint(10, 100) for _ in rows]

        if "customer" in desc_lower:
            data['customer_id'] = [f'CUST{random.randint(1000, 9999)}' for _ in rows]
            data['customer_segment'] = [random.choice(['Regular', 'Premium', 'VIP'])
                                        for _ in rows]

        return pd.DataFrame(data)

    def _generate_healthcare_data(self, num_records, description):
        """Build appointment rows; "insurance" adds provider and coverage columns."""
        departments = ['Cardiology', 'Neurology', 'Pediatrics', 'Orthopedics', 'Internal Medicine']
        now = datetime.now()
        rows = range(num_records)

        data = {
            'patient_id': [f'PAT{i:06d}' for i in rows],
            # Appointments are dated forward from today, one day apart.
            'appointment_date': [(now + timedelta(days=i)).strftime('%Y-%m-%d') for i in rows],
            'doctor_id': [f'DOC{random.randint(100, 999)}' for _ in rows],
            'department': [random.choice(departments) for _ in rows]
        }

        if "insurance" in (description or "").lower():
            data['insurance_provider'] = [f'INS{random.randint(100, 999)}' for _ in rows]
            data['coverage_type'] = [random.choice(['Full', 'Partial', 'Basic'])
                                     for _ in rows]

        return pd.DataFrame(data)

    def _generate_finance_data(self, num_records, description):
        """Build financial transaction rows (*description* is currently unused)."""
        transaction_types = ['deposit', 'withdrawal', 'transfer', 'payment']
        now = datetime.now()
        rows = range(num_records)

        data = {
            'transaction_id': [f'FIN{i:06d}' for i in rows],
            'date': [(now - timedelta(days=i)).strftime('%Y-%m-%d') for i in rows],
            'type': [random.choice(transaction_types) for _ in rows],
            'amount': [round(random.uniform(10.0, 10000.0), 2) for _ in rows]
        }
        return pd.DataFrame(data)

    def _generate_tech_data(self, num_records, description):
        """Build hourly event-log rows (*description* is currently unused)."""
        status_options = ['active', 'inactive', 'pending', 'completed']
        now = datetime.now()
        rows = range(num_records)

        data = {
            'event_id': [f'TECH{i:06d}' for i in rows],
            'timestamp': [(now - timedelta(hours=i)).strftime('%Y-%m-%d %H:%M:%S') for i in rows],
            'status': [random.choice(status_options) for _ in rows],
            'user_id': [f'USR{random.randint(1000, 9999)}' for _ in rows]
        }
        return pd.DataFrame(data)

    def _generate_custom_data(self, num_records, description):
        """Build generic id/timestamp/value rows (*description* is currently unused)."""
        now = datetime.now()
        rows = range(num_records)

        data = {
            'id': [f'CUSTOM_{i:06d}' for i in rows],
            'timestamp': [(now - timedelta(hours=i)).strftime('%Y-%m-%d %H:%M:%S') for i in rows],
            'value': [round(random.uniform(0, 1000), 2) for _ in rows]
        }
        return pd.DataFrame(data)

class DataForgeUI:
    """Gradio front end for DataGenerator."""

    def __init__(self):
        self.generator = DataGenerator()
        # Set once load_llama_model() has reported success.
        self.model_loaded = False

    def create_interface(self):
        """Build the Blocks layout, wire the button callbacks, and return it.

        Returns:
            The ``gr.Blocks`` app; the caller is responsible for launching it.
        """
        with gr.Blocks() as interface:
            gr.Markdown("# Advanced Data Forge: Synthetic Business Data Generator")

            with gr.Row():
                with gr.Column():
                    model_choice = gr.Radio(
                        choices=["Template Based", "LLaMA Model"],
                        label="Generation Method",
                        value="Template Based"
                    )

                    load_model_btn = gr.Button("Load LLaMA Model")
                    model_status = gr.Markdown("Model Status: Not Loaded")

            with gr.Row():
                description = gr.Textbox(
                    label="Describe the data you need",
                    placeholder="Example: Generate retail data with inventory levels and customer segments",
                    lines=3
                )

                num_records = gr.Slider(
                    minimum=5,
                    maximum=100,
                    value=10,
                    step=5,
                    label="Number of Records"
                )

            with gr.Row():
                generate_btn = gr.Button("Generate Data")

            with gr.Row():
                output_table = gr.DataFrame()

            # Example templates
            gr.Markdown("### Example Templates")
            gr.Examples(
                examples=[
                    ["Generate retail data with inventory levels and customer segments", 10],
                    ["Create healthcare records with insurance information", 15],
                    ["Generate financial transaction data with different account types", 20],
                    ["Create technology usage logs with user activity", 25]
                ],
                inputs=[description, num_records]
            )

            def load_model():
                """Load the model once and return the status line to display."""
                # A repeated click would otherwise run the whole load again.
                if self.model_loaded:
                    return "Model Status: Loaded Successfully"
                success = self.generator.load_llama_model()
                self.model_loaded = success
                return "Model Status: Loaded Successfully" if success else "Model Status: Loading Failed"

            def generate_data(description, num_records, model_choice):
                """Return a DataFrame from the selected generation method."""
                # The generators feed this value to range(), which needs an int.
                num_records = int(num_records)
                if model_choice == "LLaMA Model":
                    if self.model_loaded:
                        return self.generator.generate_with_model(description, num_records)
                    # Tell the user why template data is being shown instead.
                    gr.Warning("LLaMA model is not loaded; using template-based generation instead.")
                return self.generator.generate_template_based(description, num_records)

            load_model_btn.click(
                fn=load_model,
                outputs=[model_status]
            )

            generate_btn.click(
                fn=generate_data,
                inputs=[description, num_records, model_choice],
                outputs=[output_table]
            )

        return interface

if __name__ == "__main__":
    # Build the UI and start the Gradio server. `ui` and `interface` are kept
    # as module-level names so they remain reachable after this cell runs.
    ui = DataForgeUI()
    interface = ui.create_interface()
    interface.launch()

Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://80d9a7fcbe42383175.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
