In [None]:
# Instalacja wymaganych pakietów
%pip install -q sentence-transformers transformers pillow sqlalchemy torch numpy pandas

## 1. Inicjalizacja i Konfiguracja

In [None]:
import os
import torch
import numpy as np
from pathlib import Path
from typing import List, Optional, Dict
from dataclasses import dataclass
from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, String, Float, JSON, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, Session, relationship
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer

# ========== KONFIGURACJA ==========
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DATABASE_URL = "sqlite:///./metal_parts.db"
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
LLM_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

# Disable TensorFlow in transformers
os.environ["TRANSFORMERS_NO_TF"] = "1"

print(f"✓ Urządzenie: {DEVICE}")
print(f"✓ Baza danych: {DATABASE_URL}")

# BAZA DANYCH 
Base = declarative_base()
engine = create_engine(DATABASE_URL, echo=False)
SessionLocal = sessionmaker(bind=engine)

## 2. Modele Danych

In [None]:
@dataclass
class MetalPart:
    """Reprezentacja części metalowej"""
    part_id: str
    description: str
    material: str
    category: str  # fasteners, bearings, springs, shafts, etc.
    dimensions: Dict  # {"diameter_mm": 8, "length_mm": 20, ...}
    tags: List[str]
    image_path: Optional[str] = None
    image_embedding: Optional[List[float]] = None
    text_embedding: Optional[List[float]] = None


class PartDB(Base):
    """SQLAlchemy model dla bazy części"""
    __tablename__ = "metal_parts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    part_id = Column(String, unique=True, nullable=False)
    description = Column(String, nullable=False)
    material = Column(String, nullable=True)
    category = Column(String, nullable=True)
    dimensions = Column(JSON, nullable=True)  # {"diameter_mm": 8, ...}
    tags = Column(JSON, nullable=True)  # ["śruba", "metalowa", ...]
    image_path = Column(String, nullable=True)
    image_embedding = Column(String, nullable=True)  # Zapisane jako string
    text_embedding = Column(String, nullable=True)  # Zapisane jako string
    created_at = Column(String, nullable=True)


class SearchLog(Base):
    """Historia wyszukiwań"""
    __tablename__ = "search_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    query = Column(String, nullable=False)
    query_type = Column(String, nullable=False)  # "text", "image", "hybrid"
    top_k = Column(Integer, nullable=True)
    results_count = Column(Integer, nullable=True)
    timestamp = Column(String, nullable=True)


def init_database():
    """Utwórz tabele w bazie"""
    Base.metadata.create_all(engine)
    print("✓ Baza danych zainicjalizowana")

init_database()

## 3. Embeddingi i Ekstrakcja Cech

In [None]:
# Załaduj modele embeddingów
print("Ładowanie modelu embeddingów...")
embedding_model = SentenceTransformer(EMBEDDING_MODEL, device=DEVICE)
print(f"✓ Model embeddingów załadowany: {EMBEDDING_MODEL}")

def get_text_embedding(text: str) -> List[float]:
    """Konwertuj tekst na embedding"""
    embedding = embedding_model.encode(text, normalize_embeddings=True)
    return embedding.tolist()


def get_image_embedding(image_path: str) -> List[float]:
    """Konwertuj obraz na embedding"""
    from PIL import Image
    try:
        img = Image.open(image_path).convert('RGB')
        img_embedding = embedding_model.encode(img, normalize_embeddings=True)
        return img_embedding.tolist()
    except Exception as e:
        print(f"Błąd wczytywania obrazu {image_path}: {e}")
        return None


def build_part_text(part: MetalPart) -> str:
    """Zbuduj tekstowy opis części do embeddingu"""
    dims_str = ", ".join([f"{k}: {v}" for k, v in (part.dimensions or {}).items()])
    tags_str = ", ".join(part.tags)
    
    text = f"""
    Część metalowa: {part.description}
    ID: {part.part_id}
    Kategoria: {part.category}
    Materiał: {part.material}
    Wymiary: {dims_str}
    Tagi: {tags_str}
    """
    return text


# Test
test_text = "Śruba sześciokątna M8 ze stali nierdzewnej"
test_emb = get_text_embedding(test_text)
print(f"\n✓ Test embeddingu tekstowego:")
print(f"  Tekst: {test_text}")
print(f"  Długość embeddingu: {len(test_emb)}D")
print(f"  Pierwsze 5 wartości: {test_emb[:5]}")

## 4. Funkcje do Indeksowania Części

In [None]:
def add_part_to_db(db: Session, part: MetalPart) -> bool:
    """Dodaj część do bazy danych z embeddingami"""
    try:
        # Generuj embeddingi
        text_desc = build_part_text(part)
        text_emb = get_text_embedding(text_desc)
        
        image_emb = None
        if part.image_path and Path(part.image_path).exists():
            image_emb = get_image_embedding(part.image_path)
        
        # Konwertuj embedding na string dla SQLite
        text_emb_str = ";".join(str(x) for x in text_emb) if text_emb else None
        image_emb_str = ";".join(str(x) for x in image_emb) if image_emb else None
        
        # Zapisz do bazy
        db_part = PartDB(
            part_id=part.part_id,
            description=part.description,
            material=part.material,
            category=part.category,
            dimensions=part.dimensions,
            tags=part.tags,
            image_path=part.image_path,
            text_embedding=text_emb_str,
            image_embedding=image_emb_str,
            created_at=datetime.now().isoformat()
        )
        db.add(db_part)
        db.commit()
        return True
    except Exception as e:
        print(f"Błąd dodawania części: {e}")
        return False


def load_parts_from_db(db: Session) -> List[PartDB]:
    """Załaduj wszystkie części z bazy"""
    return db.query(PartDB).all()


def parse_embedding_from_db(emb_str: str) -> List[float]:
    """Konwertuj string embeddingu ze stanu do listy"""
    if not emb_str:
        return None
    return [float(x) for x in emb_str.split(";")]


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Oblicz cosine similarity między dwoma wektorami"""
    a = np.array(a)
    b = np.array(b)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8)

print("Funkcje indeksowania załadowane")

## 5. Retriever - Wyszukiwanie Podobnych Części

In [None]:
def search_parts_by_text(db: Session, query: str, top_k: int = 5, category_filter: Optional[str] = None) -> List[tuple]:
    """
    Wyszukaj części na podstawie tekstu (opis, materiał, tagi)
    Zwraca: [(część, similarity_score), ...]
    """
    # Zkonwertuj zapytanie na embedding
    query_emb = get_text_embedding(query)
    
    # Załaduj wszystkie części z bazy
    all_parts = load_parts_from_db(db)
    
    # Oblicz similarity
    results = []
    for part in all_parts:
        # Filtr po kategorii
        if category_filter and part.category != category_filter:
            continue
        
        # Pobierz embedding
        part_emb = parse_embedding_from_db(part.text_embedding)
        if not part_emb:
            continue
        
        # Oblicz similarity
        score = cosine_similarity(query_emb, part_emb)
        results.append((part, score))
    
    # Sortuj i zwróć top-k
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:top_k]


def search_parts_by_image(db: Session, image_path: str, top_k: int = 5, category_filter: Optional[str] = None) -> List[tuple]:
    """
    Wyszukaj części na podstawie obrazu
    """
    # Zkonwertuj obraz na embedding
    query_emb = get_image_embedding(image_path)
    if not query_emb:
        return []
    
    # Załaduj wszystkie części
    all_parts = load_parts_from_db(db)
    
    # Oblicz similarity
    results = []
    for part in all_parts:
        if category_filter and part.category != category_filter:
            continue
        
        part_emb = parse_embedding_from_db(part.image_embedding)
        if not part_emb:
            continue
        
        score = cosine_similarity(query_emb, part_emb)
        results.append((part, score))
    
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:top_k]

def search_parts_hybrid(db: Session, text_query: str, image_path: Optional[str] = None, 
                       top_k: int = 5, category_filter: Optional[str] = None) -> List[tuple]:
    """
    Wyszukaj części hybrydowo (tekst + obraz)
    Kombinuje wyniki z wyszukiwania tekstowego i obrazowego
    """
    text_results = search_parts_by_text(db, text_query, top_k=top_k*2, category_filter=category_filter)
    
    if image_path and Path(image_path).exists():
        image_results = search_parts_by_image(db, image_path, top_k=top_k*2, category_filter=category_filter)
        
        # Połącz wyniki (średnia z obu wyszukiwań)
        combined = {}
        for part, score in text_results:
            combined[part.id] = {"part": part, "score": score * 0.6}  # 60% wagi dla tekstu
        
        for part, score in image_results:
            if part.id in combined:
                combined[part.id]["score"] += score * 0.4  # 40% wagi dla obrazu
            else:
                combined[part.id] = {"part": part, "score": score * 0.4}
        
        results = [(v["part"], v["score"]) for v in combined.values()]
    else:
        results = text_results
    
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:top_k]

print("Retriever załadowany")

## 6. LLM - Generacja Raportu

In [None]:
# Załaduj LLM
print("Ładowanie modelu LLM...")
try:
    tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
    llm_model = AutoModelForCausalLM.from_pretrained(LLM_MODEL, low_cpu_mem_usage=True)
    llm_model.to(DEVICE)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print(f"✓ LLM załadowany: {LLM_MODEL}")
except Exception as e:
    print(f"✗ Błąd ładowania LLM: {e}")
    llm_model = None
    tokenizer = None


def generate_report(results: List[tuple], query: str) -> str:
    """
    Wygeneruj raport o znalezionych częściach za pomocą LLM
    """
    if not llm_model or not tokenizer:
        # Fallback: zwróć prosty raport tekstowy
        return generate_simple_report(results, query)
    
    # Zbuduj kontekst
    context = "\n\n".join([
        f"Część {i+1}:\n"
        f"  ID: {part.part_id}\n"
        f"  Opis: {part.description}\n"
        f"  Materiał: {part.material}\n"
        f"  Kategoria: {part.category}\n"
        f"  Wymiary: {part.dimensions}\n"
        f"  Tagi: {', '.join(part.tags or [])}\n"
        f"  Dopasowanie: {score*100:.1f}%"
        for i, (part, score) in enumerate(results)
    ])
    
    prompt = f"""
    Użytkownik szuka części metalowych. Zapytanie: "{query}"
    
    Znalezione części:
    {context}
    
    Na podstawie wyników wyszukiwania, wygeneruj krótki, rzeczowy raport o znalezionych częściach.
    Wskaż, które części najlepiej pasują do zapytania i dlaczego.
    """
    
    # Tokenizuj i wygeneruj
    inputs = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True).to(DEVICE)
    
    with torch.no_grad():
        outputs = llm_model.generate(
            **inputs,
            max_new_tokens=256,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id
        )
    
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response.split("Na podstawie wyników")[-1].strip() if "Na podstawie wyników" in response else response


def generate_simple_report(results: List[tuple], query: str) -> str:
    """
    Fallback: prosty raport tekstowy bez LLM
    """
    report = f"RAPORT WYSZUKIWANIA\n"
    report += f"Zapytanie: {query}\n"
    report += f"Znaleziono: {len(results)} części\n"
    report += "\n" + "="*60 + "\n\n"
    
    for i, (part, score) in enumerate(results, 1):
        report += f"#{i} (Dopasowanie: {score*100:.1f}%)\n"
        report += f"  ID: {part.part_id}\n"
        report += f"  Opis: {part.description}\n"
        report += f"  Materiał: {part.material}\n"
        report += f"  Kategoria: {part.category}\n"
        if part.dimensions:
            report += f"  Wymiary: {part.dimensions}\n"
        if part.tags:
            report += f"  Tagi: {', '.join(part.tags)}\n"
        report += "\n"
    
    return report

print("Funkcje generacji raportu załadowane")

## 7. Główna Funkcja RAG

In [None]:
def rag_search_metal_parts(query: str, query_type: str = "text", image_path: Optional[str] = None,
                          category_filter: Optional[str] = None, top_k: int = 5) -> Dict:
    """
    Główna funkcja RAG do wyszukiwania części metalowych
    
    Args:
        query: Tekstowe zapytanie lub opis części
        query_type: "text", "image", lub "hybrid"
        image_path: Ścieżka do obrazu (jeśli query_type="image" lub "hybrid")
        category_filter: Filtruj po kategorii (np. "fasteners")
        top_k: Ile części zwrócić
    
    Returns:
        {
            "query": str,
            "results": [(part_dict, score), ...],
            "report": str,
            "timestamp": str
        }
    """
    with SessionLocal() as db:
        # Wyszukiwanie
        if query_type == "text":
            results = search_parts_by_text(db, query, top_k=top_k, category_filter=category_filter)
        elif query_type == "image":
            results = search_parts_by_image(db, image_path, top_k=top_k, category_filter=category_filter)
        elif query_type == "hybrid":
            results = search_parts_hybrid(db, query, image_path=image_path, top_k=top_k, category_filter=category_filter)
        else:
            return {"error": f"Nieznany typ wyszukiwania: {query_type}"}
        
        # Konwertuj części na słowniki
        results_dicts = [
            ({
                "part_id": part.part_id,
                "description": part.description,
                "material": part.material,
                "category": part.category,
                "dimensions": part.dimensions,
                "tags": part.tags
            }, score)
            for part, score in results
        ]
        
        # Wygeneruj raport
        report = generate_simple_report(results, query)
        
        # Zapisz do logu
        log = SearchLog(
            query=query,
            query_type=query_type,
            top_k=top_k,
            results_count=len(results),
            timestamp=datetime.now().isoformat()
        )
        db.add(log)
        db.commit()
        
        return {
            "query": query,
            "query_type": query_type,
            "results": results_dicts,
            "report": report,
            "timestamp": datetime.now().isoformat()
        }

print("✓ Główna funkcja RAG załadowana")

## 8. Przykładowe Dane i Test

In [None]:
# Syntetyczne dane części metalowych
SAMPLE_PARTS = [
    MetalPart(
        part_id="SCR-M8-1.25-20",
        description="Śruba sześciokątna M8 ze stali nierdzewnej",
        material="Stal nierdzewna A2-70",
        category="fasteners",
        dimensions={"diameter_mm": 8.0, "length_mm": 20.0, "pitch_mm": 1.25},
        tags=["śruba", "metalowa", "nierdzewna", "heksagonalna", "M8"]
    ),
    MetalPart(
        part_id="SCR-M6-1.0-16",
        description="Śruba sześciokątna M6 ze stali zwykłej",
        material="Stal zwykła ocynkowana",
        category="fasteners",
        dimensions={"diameter_mm": 6.0, "length_mm": 16.0, "pitch_mm": 1.0},
        tags=["śruba", "metalowa", "ocynkowana", "heksagonalna", "M6"]
    ),
    MetalPart(
        part_id="BRG-6205-2RS",
        description="Łożysko kulkowe 6205-2RS",
        material="Stal chromowa",
        category="bearings",
        dimensions={"bore_mm": 25.0, "outer_mm": 52.0, "width_mm": 15.0},
        tags=["łożysko", "kulkowe", "metalowe", "6205"]
    ),
    MetalPart(
        part_id="SHF-12mm-300mm",
        description="Wał stalowy chromowany 12mm x 300mm",
        material="Stal chromowana",
        category="shafts",
        dimensions={"diameter_mm": 12.0, "length_mm": 300.0},
        tags=["wał", "stalowy", "chromowany", "okrągły"]
    ),
    MetalPart(
        part_id="SPN-1.2mm-500mm",
        description="Sprężyna naciągowa ze stali nierdzewnej",
        material="Stal nierdzewna A2-70",
        category="springs",
        dimensions={"wire_diameter_mm": 1.2, "free_length_mm": 500.0},
        tags=["sprężyna", "naciągowa", "nierdzewna", "metalowa"]
    )
]

# Dodaj przykładowe dane do bazy
with SessionLocal() as db:
    # Sprawdź, czy baza jest pusta
    existing = db.query(PartDB).count()
    if existing == 0:
        print(f"Dodawanie {len(SAMPLE_PARTS)} części do bazy...")
        for part in SAMPLE_PARTS:
            success = add_part_to_db(db, part)
            if success:
                print(f"  ✓ {part.part_id}: {part.description}")
        print(f"\n✓ Wszystkie przykładowe części dodane")
    else:
        print(f"✓ Baza zawiera już {existing} części")

## 9. Testy Wyszukiwania RAG

In [None]:
# Test 1: Wyszukiwanie tekstowe
print("\n" + "="*70)
print("TEST 1: Wyszukiwanie tekstowe")
print("="*70)

result1 = rag_search_metal_parts(
    query="Szukam śruby metalowej do połączenia części",
    query_type="text",
    top_k=3
)

print(f"\nQuery: {result1['query']}")
print(f"Typ: {result1['query_type']}")
print(f"\nRaport:\n{result1['report']}")

In [None]:
# Test 2: Wyszukiwanie ze filtrem kategorii
print("\n" + "="*70)
print("TEST 2: Wyszukiwanie ze filtrem kategorii (bearings)")
print("="*70)

result2 = rag_search_metal_parts(
    query="Łożysko do maszyny obrotowej",
    query_type="text",
    category_filter="bearings",
    top_k=2
)

print(f"\nQuery: {result2['query']}")
print(f"Filtr kategorii: bearings")
print(f"\nRaport:\n{result2['report']}")

In [None]:
# Test 3: Wyszukiwanie po konkretnych wymiarach
print("\n" + "="*70)
print("TEST 3: Wyszukiwanie po wymiarach")
print("="*70)

result3 = rag_search_metal_parts(
    query="Wał o średnicy 12mm ze stali chromowanej",
    query_type="text",
    top_k=3
)

print(f"\nQuery: {result3['query']}")
print(f"\nRaport:\n{result3['report']}")

## 10. Statystyki i Podsumowanie

In [None]:
# Wyświetl statystyki bazy
with SessionLocal() as db:
    total_parts = db.query(PartDB).count()
    by_category = {}
    for part in db.query(PartDB).all():
        cat = part.category or "unknown"
        by_category[cat] = by_category.get(cat, 0) + 1
    
    search_logs = db.query(SearchLog).all()
    
    print("\n" + "="*70)
    print("STATYSTYKI BAZY DANYCH")
    print("="*70)
    print(f"Łączna liczba części: {total_parts}")
    print(f"\nPodzielenie po kategoriach:")
    for cat, count in by_category.items():
        print(f"  - {cat}: {count}")
    print(f"\nLiczba przeprowadzonych wyszukiwań: {len(search_logs)}")
    
    if search_logs:
        print(f"\nOstatnie wyszukiwania:")
        for log in search_logs[-5:]:
            print(f"  - {log.query_type}: '{log.query}' → {log.results_count} wyników")