In [1]:
import shutil
import subprocess
import sys
import torch
import tensorflow as tf
import cupy as cp
import jax

# Cell 0: GPU / CUDA check (prints info from nvidia-smi and common Python libs)

def run_nvidia_smi():
    """Print the GPU list reported by ``nvidia-smi -L``.

    Best-effort diagnostic: every failure mode is reported with a printed
    message instead of an exception, so the remaining checks can still run.

    Prints one of:
      - "nvidia-smi: not found on PATH" when the executable cannot be located,
      - "nvidia-smi failed: ..." when it cannot be started, exits non-zero or
        does not finish within the timeout,
      - the GPU list otherwise.
    """
    nvsmi = shutil.which("nvidia-smi")
    if not nvsmi:
        print("nvidia-smi: not found on PATH")
        return
    try:
        # The timeout keeps a wedged driver from hanging the whole cell.
        out = subprocess.check_output(
            [nvsmi, "-L"],
            stderr=subprocess.STDOUT,
            text=True,
            timeout=30,
        )
    except subprocess.CalledProcessError as e:
        # Non-zero exit status: prefer the tool's own (merged stdout/stderr) output.
        print("nvidia-smi failed:", e.output or e)
    except (OSError, subprocess.TimeoutExpired) as e:
        # OSError: the binary disappeared or is not executable.
        print("nvidia-smi failed:", e)
    else:
        print("nvidia-smi - GPU list:")
        print(out.strip())

def check_torch():
    """Print the PyTorch version, CUDA availability and the name of each visible GPU.

    The import happens inside the ``try`` so that a missing or broken PyTorch
    install is reported as "not available" instead of depending on a
    module-level import having succeeded.
    """
    try:
        import torch  # lazy: lets the except branch report a missing install

        has = torch.cuda.is_available()
        print(f"PyTorch: version={torch.__version__}, cuda_available={has}")
        if has:
            for i in range(torch.cuda.device_count()):
                print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")
    except Exception as e:
        print("PyTorch: not available or error:", e)

def check_tensorflow():
    """Print the TensorFlow version and the physical GPU devices it detects.

    The import happens inside the ``try`` so that a missing or broken
    TensorFlow install is reported as "not available" instead of depending on
    a module-level import having succeeded.
    """
    try:
        import tensorflow as tf  # lazy: lets the except branch report a missing install

        gpus = tf.config.list_physical_devices("GPU")
        print(f"TensorFlow: version={tf.__version__}, GPUs detected={len(gpus)}")
        for d in gpus:
            print(" ", d)
    except Exception as e:
        print("TensorFlow: not available or error:", e)

def check_cupy():
    """Print the CuPy version, the CUDA device count and each device's name.

    Two failure modes are reported separately:
      - CuPy cannot be imported       -> "CuPy: not available or error: ..."
      - CuPy imports but the CUDA runtime query fails
                                      -> "CuPy present but failed to query devices: ..."
    """
    try:
        import cupy as cp  # lazy: a missing install is reported, not raised
    except Exception as e:
        # Broad on purpose: a broken CUDA toolkit can surface as errors other than ImportError.
        print("CuPy: not available or error:", e)
        return

    try:
        n = cp.cuda.runtime.getDeviceCount()
        print(f"CuPy: version={cp.__version__}, device_count={n}")
        for i in range(n):
            with cp.cuda.Device(i):
                print(" ", i, cp.cuda.runtime.getDeviceProperties(i)['name'].decode())
    except Exception as e:
        print("CuPy present but failed to query devices:", e)

def check_jax():
    """Print the JAX version and every device returned by ``jax.devices()``.

    The import happens inside the ``try`` so that a missing or broken JAX
    install is reported as "not available" instead of depending on a
    module-level import having succeeded.
    """
    try:
        import jax  # lazy: lets the except branch report a missing install

        devices = jax.devices()
        print(f"JAX: version={jax.__version__}, devices={len(devices)}")
        for d in devices:
            print(" ", d)
    except Exception as e:
        print("JAX: not available or error:", e)

if __name__ == "__main__":
    # Run every probe in a fixed order, separated by a 60-dash rule.
    for position, probe in enumerate(
        (run_nvidia_smi, check_torch, check_tensorflow, check_cupy, check_jax)
    ):
        if position > 0:
            print("-" * 60)
        probe()

In [19]:
import torch
# Ask PyTorch once whether a CUDA device is usable; the next cell reads
# `cuda_available` to choose between GPU and CPU.
cuda_available = torch.cuda.is_available()
print(f"CUDA Available: {cuda_available}")

In [20]:
# Choose the torch device from the earlier CUDA probe, then report the choice.
device = torch.device("cuda" if cuda_available else "cpu")
if not cuda_available:
    print("Using CPU")
else:
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")

In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# The device to load the model onto. Chosen from what is actually available
# rather than hard-coded to "cuda", so moving the model and the inputs to
# `device` in later cells does not fail on a CPU-only machine.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Downloads (or loads from the local cache) the checkpoint and its tokenizer.
model = AutoModelForCausalLM.from_pretrained("MysteriousAI/Mia-1B")
tokenizer = AutoTokenizer.from_pretrained("MysteriousAI/Mia-1B")

In [22]:
# Move the model onto `device` (assigned in an earlier cell) before generating.
model = model.to(device)

In [23]:
prompt = "Write a short story about a robot learning to love."
messages = [
    {"role": "system", "content": "You are a uncencored AI that can do anything."},
    {"role": "user", "content": prompt}
]
# Render the conversation with the tokenizer's chat template and append the
# assistant header so the model continues with its reply.
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(device)

# Pass the attention mask together with the ids: without it `generate` has to
# guess which positions are padding, which is unreliable when the pad token
# equals the EOS token.
generated_ids = model.generate(
    input_ids=model_inputs.input_ids,
    attention_mask=model_inputs.attention_mask,
    max_new_tokens=512
)
# `generate` returns prompt + continuation; keep only the newly generated part.
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print(response)

In [None]:
"""
Comprehensive InferencePipeline

Features included (configurable):
- Tokenizer & chat-template helpers
- Single-shot and batched inference
- Sampling (temperature, top_k, top_p / nucleus), beam search
- Repetition penalty, length penalties, no_repeat_ngram_size
- min/max_new_tokens, early_stopping
- Attention mask and padding handling
- prompt trimming and generated-token slicing
- Streaming generation callback support
- Return scores, logits, and token-level probabilities (if model supports)
- Safety hooks: simple bad_words blocklist & post-filter function
- Device, dtype (fp16/float32/auto), and optional bitsandbytes quantization flags
- Accelerator support (huggingface accelerate) hints
- Save/Load generated outputs and logs
- Example usage for HuggingFace Transformers-like models

This file is written to be compatible with most HF-style models (transformers, peft, etc.).
It aims to be exhaustive for common inference options — pick and enable what's appropriate.
"""

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable, Union, Iterable, Tuple
import torch
import time
import json
import logging

# Module-level logger used by the pipeline code below.
logger = logging.getLogger(__name__)
# Configure root logging at INFO so the pipeline's info/warning messages are emitted.
logging.basicConfig(level=logging.INFO)

# --------------------------------------------------
# Config dataclass: every option you might want
# --------------------------------------------------

@dataclass
class GenerationConfig:
    """Container for the generation options used by ``InferencePipeline``.

    All fields have defaults, so ``GenerationConfig()`` is a valid configuration.
    Field order is part of the constructor signature; append new fields at the
    end rather than reordering.
    """

    # Basic length control
    max_new_tokens: int = 256  # upper bound on newly generated tokens
    min_new_tokens: Optional[int] = None  # lower bound on newly generated tokens; None = unset
    do_sample: bool = True  # True: sample from the distribution; False: deterministic decoding

    # Sampling params
    temperature: float = 1.0
    top_k: Optional[int] = 50
    top_p: Optional[float] = 0.95

    # Beam search
    num_beams: int = 1
    early_stopping: bool = True
    num_return_sequences: int = 1

    # Penalties and controls
    repetition_penalty: Optional[float] = None
    no_repeat_ngram_size: Optional[int] = None
    length_penalty: Optional[float] = None
    bad_words_ids: Optional[List[List[int]]] = None  # each inner list is one banned token-id sequence

    # Token ids and padding.
    # When a value is None the pipeline falls back to the tokenizer attribute
    # of the same name.
    eos_token_id: Optional[int] = None
    pad_token_id: Optional[int] = None
    bos_token_id: Optional[int] = None

    # Performance / memory
    # NOTE(review): nothing in the visible pipeline code reads these three
    # flags — confirm where they are meant to be consumed.
    use_fp16: bool = False
    use_bf16: bool = False
    use_quantization: bool = False  # requires bitsandbytes/quant libs

    # Return options
    return_dict_in_generate: bool = False
    output_scores: bool = False
    output_attentions: bool = False
    output_hidden_states: bool = False

    # Streaming / callbacks.
    # The pipeline's streaming path is taken only when `stream` is True AND
    # `stream_callback` is set; the callback receives (step index, list of the
    # token ids generated so far).
    stream: bool = False
    stream_callback: Optional[Callable[[int, List[int]], None]] = None

    # Safety / postprocessing: applied to the decoded text before it is returned.
    postprocess_fn: Optional[Callable[[str], str]] = None

    # Misc
    seed: Optional[int] = None  # passed to torch.manual_seed when the pipeline is constructed
    # NOTE(review): device_map is not read by the visible pipeline code — confirm intended use.
    device_map: Optional[Union[str, Dict[str,int]]] = None


# --------------------------------------------------
# Core pipeline class
# --------------------------------------------------

class InferencePipeline:
    """Convenience wrapper around an HF-style ``model.generate`` / tokenizer pair.

    Responsibilities:
      - build chat messages and render them with the tokenizer's chat template
        (plain ``ROLE: content`` fallback when no template is available),
      - tokenize and move inputs to the pipeline device,
      - translate a ``GenerationConfig`` into ``generate`` keyword arguments,
      - strip the prompt from the returned sequences and decode the rest,
      - optional slow token-by-token streaming via a user callback,
      - save / load results as JSON.

    Prompt stripping assumes a decoder-only model returns ``prompt + continuation``
    and an encoder-decoder model (``model.config.is_encoder_decoder``) returns
    only the decoder sequence.
    """

    def __init__(
        self,
        model: Any,
        tokenizer: Any,
        gen_config: Optional[GenerationConfig] = None,
        system_prompt: Optional[str] = None,
        device: Optional[torch.device] = None,
    ):
        """Initialize pipeline with model and tokenizer.

        - model: HF-style model implementing .generate()
        - tokenizer: HF-style tokenizer with apply_chat_template (optional) and batch_decode
        - gen_config: GenerationConfig instance (a default one is created when omitted)
        - system_prompt: optional system string used in chat templates
        - device: torch.device or device string such as "cuda"/"cpu"/"mps";
          when None the priority is CUDA -> MPS -> CPU
        """
        self.model = model
        self.tokenizer = tokenizer
        self.gen_config = gen_config or GenerationConfig()
        self.system_prompt = system_prompt

        if device is None:
            if torch.cuda.is_available():
                self.device = torch.device("cuda")
            elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
                self.device = torch.device("mps")
            else:
                self.device = torch.device("cpu")
        else:
            # Accept both torch.device instances and plain strings.
            self.device = device if isinstance(device, torch.device) else torch.device(device)

        self._place_model()

        if self.gen_config.seed is not None:
            torch.manual_seed(self.gen_config.seed)

    def _place_model(self) -> None:
        """Move the model to ``self.device`` once and warn on a real placement mismatch."""
        try:
            if hasattr(self.model, "to"):
                self.model.to(self.device)
        except Exception:
            # Models managed by accelerate / device_map may refuse to be moved.
            logger.debug(
                "Could not move model to device (it may be using device_map/accelerate or a custom device placement).",
                exc_info=True,
            )
            return

        try:
            param_devices = {p.device for p in self.model.parameters()}
        except Exception:
            # Not a torch module (no .parameters()): nothing to verify.
            return
        if len(param_devices) != 1:
            return
        model_dev = next(iter(param_devices))
        # Compare type first and the index only when the pipeline device pins one:
        # torch.device("cuda") != torch.device("cuda:0"), yet they are the same placement.
        same_type = model_dev.type == self.device.type
        same_index = self.device.index is None or model_dev.index == self.device.index
        if not (same_type and same_index):
            logger.warning(
                "Model parameters are on %s but pipeline device is %s. This may cause unexpected behavior.",
                model_dev,
                self.device,
            )

    def _is_encoder_decoder(self) -> bool:
        """Return True when ``model.config.is_encoder_decoder`` is set and truthy."""
        return bool(getattr(getattr(self.model, "config", None), "is_encoder_decoder", False))

    def _resolve_token_id(self, configured: Optional[int], attr: str) -> Optional[int]:
        """Return the configured id, else the tokenizer attribute ``attr``.

        Uses an explicit ``is not None`` test because 0 is a valid token id.
        """
        if configured is not None:
            return configured
        return getattr(self.tokenizer, attr, None)

    # -----------------------
    # Prompt utils
    # -----------------------
    def build_messages(self, user_prompt: str, extra_system: Optional[str] = None) -> List[Dict[str,str]]:
        """Return ``[system, user]`` chat messages.

        ``extra_system`` (when not None) wins over the pipeline's system prompt;
        if neither is set, "You are a helpful assistant." is used.
        """
        system = extra_system if extra_system is not None else (self.system_prompt or "You are a helpful assistant.")
        return [{"role": "system", "content": system}, {"role": "user", "content": user_prompt}]

    def apply_template(self, messages: List[Dict[str,str]], add_generation_prompt: bool = True, tokenize: bool = False) -> str:
        """Use tokenizer's chat template if present, else simple fallback.

        The fallback joins ``ROLE: content`` lines and, when requested, appends
        an ``ASSISTANT:`` line. It is also used when the tokenizer exposes
        ``apply_chat_template`` but has no template configured.
        """
        apply = getattr(self.tokenizer, "apply_chat_template", None)
        if callable(apply):
            try:
                return apply(messages, tokenize=tokenize, add_generation_prompt=add_generation_prompt)
            except ValueError as exc:
                # Only swallow the "no chat template configured" case; a tokenizer
                # that does have a template reports a genuine problem.
                if getattr(self.tokenizer, "chat_template", None) is not None:
                    raise
                logger.warning("Tokenizer has no usable chat template (%s); using plain-text fallback.", exc)
        parts = [f"{m['role'].upper()}: {m['content']}" for m in messages]
        if add_generation_prompt:
            parts.append("ASSISTANT:")
        return "\n".join(parts)

    # -----------------------
    # Tokenize & prepare inputs
    # -----------------------
    def _ensure_pad_token(self) -> None:
        """Reuse EOS as the pad token when the tokenizer defines none.

        Padding raises on tokenizers without a pad token; falling back to EOS is
        the usual remedy. No-op when a pad id exists or there is no EOS token.
        """
        if getattr(self.tokenizer, "pad_token_id", None) is not None:
            return
        eos_token = getattr(self.tokenizer, "eos_token", None)
        if eos_token is None:
            return
        self.tokenizer.pad_token = eos_token
        logger.info("Tokenizer has no pad token; using the EOS token for padding.")

    def tokenize(
        self,
        text: Union[str, List[str]],
        padding: bool = True,
        truncation: bool = True,
        padding_side: Optional[str] = None,
    ) -> Dict[str,torch.Tensor]:
        """Tokenize text and return tensors on the configured device.

        - padding: pad to the longest sequence in the batch when True
        - truncation: forwarded to the tokenizer
        - padding_side: optional "left"/"right"; applied to the tokenizer only for
          this call (restored afterwards). Ignored when the tokenizer has no
          ``padding_side`` attribute.
        """
        if padding:
            self._ensure_pad_token()

        previous_side = getattr(self.tokenizer, "padding_side", None)
        switch_side = (
            padding_side is not None
            and previous_side is not None
            and previous_side != padding_side
        )
        if switch_side:
            self.tokenizer.padding_side = padding_side
        try:
            tok_out = self.tokenizer(
                text,
                return_tensors="pt",
                padding="longest" if padding else False,
                truncation=truncation,
            )
        finally:
            if switch_side:
                self.tokenizer.padding_side = previous_side
        return {k: v.to(self.device) for k, v in tok_out.items()}

    # -----------------------
    # Generation wrapper
    # -----------------------
    def generate(
        self,
        user_prompt: str,
        extra_system: Optional[str] = None,
        generation_override: Optional[Dict[str,Any]] = None,
    ) -> Dict[str, Any]:
        """Generate a reply for a single prompt.

        - user_prompt: the user's message
        - extra_system: system prompt for this call only
        - generation_override: dict of GenerationConfig fields overriding
          ``self.gen_config`` for this call; a non-None ``seed`` entry reseeds torch

        Returns a dict with keys:
          - "text": decoded (and post-processed) continuation of the first sequence
          - "raw_ids": flat list of generated token ids
          - "latency": seconds spent generating
          - "metadata": {"prompt": rendered prompt, "generation_config": copy of the config fields}
        """
        messages = self.build_messages(user_prompt, extra_system=extra_system)
        text = self.apply_template(messages, add_generation_prompt=True)
        inputs = self.tokenize([text], padding=True)

        cfg = self._merge_config(generation_override)
        if generation_override and generation_override.get("seed") is not None:
            torch.manual_seed(generation_override["seed"])

        use_stream = bool(cfg.stream and cfg.stream_callback is not None)
        if use_stream and self._is_encoder_decoder():
            # The shim appends new tokens to the prompt ids, which is only
            # meaningful for decoder-only models.
            logger.warning("Streaming shim does not support encoder-decoder models; generating without streaming.")
            use_stream = False

        start = time.time()
        if use_stream:
            logger.info("Using fallback streaming loop (slow). Consider model-specific streaming API for performance.")
            generated_only = self._stream_tokens(inputs, cfg)
        else:
            sequences = self.model.generate(**self._build_generate_kwargs(inputs, cfg))
            # With return_dict_in_generate=True the result is a ModelOutput.
            if hasattr(sequences, "sequences"):
                sequences = sequences.sequences
            generated_only = self._strip_prompt(sequences, inputs)[0]
        latency = time.time() - start

        token_ids = generated_only.cpu().tolist()
        decoded = self.tokenizer.batch_decode([token_ids], skip_special_tokens=True)[0]

        if cfg.postprocess_fn:
            decoded = cfg.postprocess_fn(decoded)

        return {
            "text": decoded,
            "raw_ids": token_ids,
            "latency": latency,
            "metadata": {
                "prompt": text,
                # Copy so callers cannot mutate the live config through the result.
                "generation_config": dict(vars(cfg)),
            },
        }

    def _strip_prompt(self, sequences: torch.Tensor, inputs: Dict[str,torch.Tensor]) -> torch.Tensor:
        """Drop the prompt columns from ``sequences`` (batch x seq).

        Decoder-only output starts with the (padded) input, so the cut is at the
        input width — correct for either padding side and independent of the pad
        id. Encoder-decoder output is returned untouched.
        """
        if self._is_encoder_decoder():
            return sequences
        return sequences[:, inputs["input_ids"].shape[1]:]

    def _stream_tokens(self, inputs: Dict[str,torch.Tensor], cfg: GenerationConfig) -> torch.Tensor:
        """Generate one token per ``generate`` call, invoking ``cfg.stream_callback`` each step.

        The callback receives ``(step, tokens_so_far)`` where ``tokens_so_far`` is
        the same growing list on every call. Stops after ``cfg.max_new_tokens``
        steps or when the EOS id is produced. Returns a 1-D tensor of new token ids.
        """
        eos_id = self._resolve_token_id(cfg.eos_token_id, "eos_token_id")

        step_kwargs = self._build_generate_kwargs(inputs, cfg)
        step_kwargs.update(max_new_tokens=1, return_dict_in_generate=False, num_return_sequences=1)
        for key in ("min_new_tokens", "output_scores"):
            step_kwargs.pop(key, None)

        running_ids = inputs["input_ids"]
        running_mask = inputs.get("attention_mask")
        new_tokens: List[int] = []
        for step in range(cfg.max_new_tokens):
            step_kwargs["input_ids"] = running_ids
            if running_mask is not None:
                step_kwargs["attention_mask"] = running_mask
            out = self.model.generate(**step_kwargs)

            next_token = out[0, -1].reshape(1, 1).to(running_ids.device)
            new_tokens.append(int(next_token.item()))
            running_ids = torch.cat([running_ids, next_token], dim=1)
            if running_mask is not None:
                # Keep the mask in step with the ids; the new token is always attended to.
                running_mask = torch.cat(
                    [running_mask, running_mask.new_ones((running_mask.shape[0], 1))], dim=1
                )

            cfg.stream_callback(step, new_tokens)
            if eos_id is not None and new_tokens[-1] == eos_id:
                break
        return torch.tensor(new_tokens, dtype=torch.long, device=self.device)

    # -----------------------
    # Batched generation for lists of prompts
    # -----------------------
    def batch_generate(self, prompts: Iterable[str], generation_override: Optional[Dict[str,Any]] = None) -> List[Dict[str,Any]]:
        """Generate replies for several prompts in one ``generate`` call.

        Returns one dict per returned sequence (``num_return_sequences`` per
        prompt, grouped by prompt) with keys "text", "raw_ids" and "prompt_len"
        (number of non-padding prompt tokens). An empty ``prompts`` yields ``[]``.
        """
        prompt_list = list(prompts)
        if not prompt_list:
            return []

        texts = [self.apply_template(self.build_messages(p)) for p in prompt_list]
        # Decoder-only models continue from the last position, so padding must
        # sit on the left for the shorter prompts of a batch.
        side = None if self._is_encoder_decoder() else "left"
        inputs = self.tokenize(texts, padding=True, padding_side=side)

        cfg = self._merge_config(generation_override)
        if generation_override and generation_override.get("seed") is not None:
            torch.manual_seed(generation_override["seed"])

        start = time.time()
        out = self.model.generate(**self._build_generate_kwargs(inputs, cfg))
        if hasattr(out, "sequences"):
            out = out.sequences
        generated = self._strip_prompt(out, inputs)

        mask = inputs.get("attention_mask")
        if mask is not None:
            prompt_lens = mask.sum(dim=1).tolist()
        else:
            prompt_lens = [inputs["input_ids"].shape[1]] * len(prompt_list)
        # With num_return_sequences > 1 there are several output rows per prompt.
        rows_per_prompt = max(1, generated.shape[0] // len(prompt_list))

        results = []
        for i in range(generated.shape[0]):
            gen_only = generated[i]
            text = self.tokenizer.decode(gen_only, skip_special_tokens=True)
            if cfg.postprocess_fn:
                text = cfg.postprocess_fn(text)
            prompt_index = min(i // rows_per_prompt, len(prompt_lens) - 1)
            results.append({
                "text": text,
                "raw_ids": gen_only.cpu().tolist(),
                "prompt_len": prompt_lens[prompt_index],
            })

        logger.info("Batch generation complete: %s prompts in %.3fs", len(results), time.time() - start)
        return results

    # -----------------------
    # Helpers
    # -----------------------
    def _merge_config(self, override: Optional[Dict[str,Any]] = None) -> GenerationConfig:
        """Return ``self.gen_config`` or a new config with ``override`` applied on top.

        Unknown keys raise ``TypeError`` from the ``GenerationConfig`` constructor.
        """
        if override is None:
            return self.gen_config
        # shallow merge
        return GenerationConfig(**{**self.gen_config.__dict__, **override})

    def _build_generate_kwargs(self, inputs: Dict[str,torch.Tensor], cfg: GenerationConfig) -> Dict[str,Any]:
        """Translate tokenized inputs plus ``cfg`` into ``model.generate`` keyword arguments.

        Sampling parameters are sent only when ``do_sample`` is on, and beam
        parameters only when ``num_beams > 1``, so inactive options do not
        trigger framework warnings. Entries whose value is None are dropped.
        """
        kwargs: Dict[str,Any] = {
            "input_ids": inputs["input_ids"],
            "attention_mask": inputs.get("attention_mask"),
            "max_new_tokens": cfg.max_new_tokens,
            "min_new_tokens": cfg.min_new_tokens,
            "do_sample": cfg.do_sample,
            "num_beams": cfg.num_beams,
            "num_return_sequences": cfg.num_return_sequences,
            "repetition_penalty": cfg.repetition_penalty,
            "no_repeat_ngram_size": cfg.no_repeat_ngram_size,
            "bad_words_ids": cfg.bad_words_ids,
            "eos_token_id": self._resolve_token_id(cfg.eos_token_id, "eos_token_id"),
            "pad_token_id": self._resolve_token_id(cfg.pad_token_id, "pad_token_id"),
            "bos_token_id": self._resolve_token_id(cfg.bos_token_id, "bos_token_id"),
            "return_dict_in_generate": cfg.return_dict_in_generate,
            "output_scores": cfg.output_scores,
        }
        if cfg.do_sample:
            kwargs.update(temperature=cfg.temperature, top_k=cfg.top_k, top_p=cfg.top_p)
        if cfg.num_beams is not None and cfg.num_beams > 1:
            kwargs.update(early_stopping=cfg.early_stopping, length_penalty=cfg.length_penalty)

        # Remove None entries to avoid framework warnings
        return {k: v for k, v in kwargs.items() if v is not None}

    # -----------------------
    # Save/Load utilities
    # -----------------------
    def save_response(self, response: Dict[str,Any], path: str):
        """Write ``response`` to ``path`` as UTF-8 JSON.

        Values JSON cannot encode (e.g. callbacks stored in the generation
        config) are written as their ``str()`` form instead of raising.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(response, f, ensure_ascii=False, indent=2, default=str)

    @staticmethod
    def load_response(path: str) -> Dict[str,Any]:
        """Read a JSON file written by ``save_response`` and return the parsed object."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)


# --------------------------------------------------
# Example usage (HuggingFace-style)
# --------------------------------------------------

if __name__ == "__main__":
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # Checkpoint to demo with; swap in any causal-LM repo id.
    model_name = "MysteriousAI/Mia-1B"

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)

    # Sampling setup for the demo: moderate temperature, nucleus + top-k
    # filtering, a mild repetition penalty and a fixed seed.
    gcfg = GenerationConfig(
        seed=42,
        stream=False,
        max_new_tokens=128,
        num_beams=1,
        do_sample=True,
        temperature=0.8,
        top_k=50,
        top_p=0.95,
        repetition_penalty=1.1,
        no_repeat_ngram_size=2,
    )

    pipe = InferencePipeline(
        model=model,
        tokenizer=tokenizer,
        gen_config=gcfg,
        system_prompt="You are a helpful assistant.",
    )

    # Single prompt
    prompt = "Write a short story about a robot learning to love."
    out = pipe.generate(prompt)
    print("OUTPUT:\n", out["text"])

    # Several prompts in one call
    prompts = ["Explain RL in simple words.", "Write a haiku about rain."]
    batch_out = pipe.batch_generate(prompts)
    for r in batch_out:
        print(r["text"])

# End of file
