In [None]:
import re
import requests
import pandas as pd
import xml.etree.ElementTree as ET
import csv
from urllib.parse import urlparse, parse_qs

# === SCRAPER ===
def extract_mid(url_or_iframe: str) -> str:
    mid_match = re.search(r"mid=([a-zA-Z0-9\-_]+)", url_or_iframe)
    if not mid_match:
        raise ValueError("Nie udało się znaleźć mid w podanym URL/iframe.")
    return mid_match.group(1)

def scrape_kml(mid: str) -> pd.DataFrame:
    url = f"https://www.google.com/maps/d/u/0/kml?forcekml=1&mid={mid}"
    r = requests.get(url)
    r.raise_for_status()

    root = ET.fromstring(r.content)
    ns = {"kml": "http://www.opengis.net/kml/2.2"}

    records = []
    for pm in root.findall(".//kml:Placemark", ns):
        name = pm.findtext("kml:name", default="", namespaces=ns)
        desc = pm.findtext("kml:description", default="", namespaces=ns)
        coords = pm.find(".//kml:coordinates", ns)

        lon, lat, *_ = coords.text.strip().split(",") if coords is not None else (None, None)

        record = {
            "Title": name,
            "Description": desc,
            "Longitude": lon,
            "Latitude": lat
        }

        # ExtendedData
        ext = pm.find(".//kml:ExtendedData", ns)
        if ext is not None:
            for data in ext.findall("kml:Data", ns):
                key = data.attrib.get("name")
                val = data.findtext("kml:value", default="", namespaces=ns)
                record[key] = val

        records.append(record)

    return pd.DataFrame(records)

# === CLEANER ===
def clean_csv(input_filename: str, output_filename: str):
    try:
        with open(input_filename, 'r', encoding='utf-8') as csvfile:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            csvfile.seek(0)
            separator = dialect.delimiter
            print(f"Wykryto separator kolumn: '{separator}'")

        df = pd.read_csv(input_filename, sep=separator)
        print("\nPoprawnie wczytano plik. Nagłówki kolumn:")
        print(list(df.columns))

        # Kolumny do dropa
        columns_to_drop = ['Description', 'Longitude', 'Latitude']
        existing_columns_to_drop = [col for col in columns_to_drop if col in df.columns]

        if existing_columns_to_drop:
            df = df.drop(columns=existing_columns_to_drop)
            print(f"\nUsunięto kolumny: {', '.join(existing_columns_to_drop)}")
        else:
            print("\nBrak kolumn do usunięcia.")

        # Drop pustych kolumn
        initial_column_count = len(df.columns)
        df = df.dropna(axis=1, how='all')
        final_column_count = len(df.columns)
        if initial_column_count > final_column_count:
            print("Usunięto puste kolumny.")
        else:
            print("Nie było całkowicie pustych kolumn.")

        df.to_csv(output_filename, index=False, encoding='utf-8-sig')
        print(f"\n✅ Oczyszczone dane zapisane w '{output_filename}' ({len(df)} rekordów).")

        print("\nPodgląd 5 pierwszych wierszy:")
        print(df.head())

    except FileNotFoundError:
        print(f"Błąd: Plik '{input_filename}' nie został znaleziony.")
    except Exception as e:
        print(f"Wystąpił nieoczekiwany błąd: {e}")

# === ENTRYPOINT COLAB ===
if __name__ == "__main__":
    INPUT = "https://www.google.com/maps/d/embed?mid=1GDQK9SeuuqI07MFnngUMASRKvb0ZG4U&ehbc=2E312F"
    MAP_ID = extract_mid(INPUT)
    print(f"Using MAP_ID: {MAP_ID}")

    # Scraping
    df = scrape_kml(MAP_ID)
    raw_file = "edge_scraped.csv"
    df.to_csv(raw_file, index=False, encoding="utf-8-sig")
    print(f"✅ Surowe dane zapisane do '{raw_file}' ({len(df)} rekordów).")

    # Cleaning
    clean_csv(raw_file, "oczyszczone_dane.csv")


In [None]:
# main_run.py
import sys
import os
import subprocess

# --- Konfiguracja ---
INPUT_CSV_PATH = "oczyszczone_dane.csv"
OUTPUT_CSV_PATH = "wyniki_analizy.csv"  
SCRIPT_TO_RUN = "Agent_Run.py"

# --- Logika skryptu ---

def main():
    """Główna funkcja uruchomieniowa."""

    print("--- PRZYGOTOWANIE DO URUCHOMIENIA ANALIZY ---", flush=True)

    # 1. Sprawdzenia plików
    if not os.path.exists(INPUT_CSV_PATH):
        print(f"❌ BŁĄD: Plik wejściowy '{INPUT_CSV_PATH}' nie został znaleziony.")
        print("     Upewnij się, że plik istnieje i jest w tym samym katalogu.")
        sys.exit(1)

    if not os.path.exists(SCRIPT_TO_RUN):
        print(f"❌ BŁĄD: Główny skrypt '{SCRIPT_TO_RUN}' nie został znaleziony.")
        print("     Sprawdź ścieżkę do skryptu run_analysis.py.")
        sys.exit(1)

    # 2. Przygotowanie polecenia
    command = [
        sys.executable,
        "-u",  # Wymusza tryb bez buforowania
        SCRIPT_TO_RUN,
        "--input-file",
        INPUT_CSV_PATH,
        "--output-file",
        OUTPUT_CSV_PATH
    ]

    print("✅ Konfiguracja poprawna. Rozpoczynam analizę...")
    print(f"▶️ Uruchamiam polecenie: {' '.join(command)}\n")

    # 3. Uruchomienie procesu i przechwytywanie wyjścia na żywo
    try:
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            bufsize=1
        ) as process:
            for line in iter(process.stdout.readline, ''):
                print(line, end='')

        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode, process.args)

        print(f"\n--- Analiza zakończona pomyślnie ---")
        if os.path.exists(OUTPUT_CSV_PATH):
            print(f"🎉 Wyniki zostały zapisane w pliku: {os.path.abspath(OUTPUT_CSV_PATH)}")
        else:
            print(f"🤔 OSTRZEŻENIE: Proces zakończył się, ale plik wyjściowy '{OUTPUT_CSV_PATH}' nie został utworzony.")

    except subprocess.CalledProcessError:
        print(f"\n❌ BŁĄD: Wystąpił problem podczas wykonywania skryptu '{SCRIPT_TO_RUN}'.")
        print("     Sprawdź powyższe logi, aby znaleźć przyczynę błędu.")
    except KeyboardInterrupt:
        print("\n⏹️ Analiza przerwana przez użytkownika.")
    except Exception as e:
        print(f"\n❌ BŁĄD: Wystąpił nieoczekiwany błąd: {e}")
    
    print("\n--- SKRYPT GŁÓWNY ZAKOŃCZONY ---", flush=True)

if __name__ == "__main__":
    main()

## llama.cpp ROCm -->TEST

In [None]:
"""
Ulepszona wersja skryptu do symulacji rozmowy między modelami LLM.

Główne zmiany w wersji 8.0 (Lepsze Prompty i Parser):
1.  **Inteligentny Parser Odpowiedzi (`_parse_and_validate_response`)**:
    Zastępuje prostą funkcję czyszczącą. Potrafi teraz aktywnie usuwać
    całe bloki "myśli" (np. <think>...</think>) i wyodrębniać właściwą
    odpowiedź z potencjalnie zaszumionego wyjścia modelu.
2.  **Nowa Technika Promptowania ("Role-playing with examples")**:
    Prompt został całkowicie przebudowany. Zamiast listy suchych instrukcji,
    teraz jasno definiuje rolę agenta i, co najważniejsze, zawiera krótki
    przykład idealnej interakcji (tzw. few-shot prompting). Jest to
    znacznie skuteczniejsza metoda instruowania modeli.
3.  **Zwiększona Odporność**: Połączenie lepszego promptu na wejściu
    i inteligentniejszego parsera na wyjściu powinno radykalnie zwiększyć
    odporność symulacji na "lekkie" błędy modeli i zapewnić płynniejszy
    przebieg rozmowy.
"""
import logging
import asyncio
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
import sys
import os
import numpy as np
import re
import time
import nest_asyncio

# --- Sprawdzenie dostępności llama_cpp i definicje klas pozornych ---
try:
    from llama_cpp import Llama, LlamaCache, LLAMA_SPLIT_MODE_NONE
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    logging.warning("Biblioteka 'llama_cpp' nie znaleziona. Używam obiektów pozornych (mock).")
    LLAMA_CPP_AVAILABLE = False
    
    class Llama:
        def __init__(self, model_path: str = "mock_path", **kwargs):
            self._model_name = Path(model_path).name
            logging.info(f"[MOCK] Inicjalizacja Llama dla modelu: {self._model_name}")
        def create_completion(self, prompt: str, **kwargs) -> Dict[str, Any]:
            response_text = f"<think>Model {self._model_name} myśli...</think> A oto dynamiczna, pozorowana odpowiedź po polsku."
            return {'choices': [{'text': response_text}]}
        def embed(self, text: str) -> List[float]:
            logging.info(f"[MOCK] Tworzenie osadzeń dla tekstu: '{text[:40]}...'")
            np.random.seed(sum(ord(c) for c in text))
            return np.random.randn(128).tolist()

    class LlamaCache: pass
    LLAMA_SPLIT_MODE_NONE = 0

# --- Aliasy i Konfiguracja (bez zmian) ---
ConversationHistory = List[Tuple[str, str]]
MemoryEntry = Tuple[str, str, List[float]]

@dataclass
class LlamaParams:
    n_gpu_layers: int = -1; main_gpu: int = 0; n_ctx: int = 4096
    n_threads: int = 8; embedding: bool = False; verbose: bool = False

@dataclass
class ModelConfig:
    path: str; is_embedding: bool = False

@dataclass
class ModelsConfig:
    """Konfiguracja wszystkich modeli."""
    models: Dict[str, ModelConfig] = field(default_factory=lambda: {
        "base": ModelConfig(path=os.getenv("BASE_MODEL_PATH", "/run/media/dominik/F2B452FDB452C42F/LLM models/Qwen3-Esper3-Reasoning-Instruct-6B-Brainstorm20x-Enhanced-E32-192k-ctx.i1-Q4_K_M.gguf")),
        "finance_instruct": ModelConfig(path=os.getenv("FINANCE_MODEL_PATH", "/run/media/dominik/F2B452FDB452C42F/LLM models/qwen3-4B-Claude-Sonnet-4-Reasoning-Distill_Q4_K_M.gguf")),
        "embedding": ModelConfig(path=os.getenv("EMBEDDING_MODEL_PATH", "/run/media/dominik/F2B452FDB452C42F/LLM models/nomic-embed-text-v1.5-Q4_K_M.gguf"), is_embedding=True)
    })

@dataclass
class AppConfig:
    models: ModelsConfig = field(default_factory=ModelsConfig)
    llama_params: LlamaParams = field(default_factory=LlamaParams)
    max_tokens: int = 256
    embedding_cache_size: int = 100
    memory_retrieval_threshold: float = 0.65
    repeat_penalty: float = 1.15

logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(name)s][%(levelname)s] %(message)s", force=True)
logger = logging.getLogger(__name__)

def cosine_similarity(v1: List[float], v2: List[float]) -> float:
    vec1, vec2 = np.array(v1), np.array(v2)
    dot = np.dot(vec1, vec2); norm = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return dot / norm if norm > 0 else 0.0

def preprocess_text(text: str) -> str:
    return text.strip().replace("\n", " ")[:512]

# --- Zarządzanie Modelami (bez zmian) ---
class ModelLoader:
    def __init__(self, config: AppConfig):
        self.config = config; self.models: Dict[str, Llama] = {}
        self.locks: Dict[str, asyncio.Lock] = {}; self.embedding_cache: Dict[str, List[float]] = {}

    def _setup_single_model(self, model_name: str, model_config: ModelConfig) -> Optional[Llama]:
        path = Path(model_config.path)
        if not model_config.path or (LLAMA_CPP_AVAILABLE and not path.is_file()):
            logger.warning(f"Plik modelu '{model_name}' nie istnieje. Używam mocka.")
            return Llama(model_path=model_config.path, embedding=model_config.is_embedding)
        try:
            params = self.config.llama_params
            llm = Llama(model_path=model_config.path, n_gpu_layers=params.n_gpu_layers,
                        main_gpu=params.main_gpu, n_ctx=params.n_ctx, n_threads=params.n_threads,
                        embedding=model_config.is_embedding, split_mode=LLAMA_SPLIT_MODE_NONE,
                        cache=LlamaCache() if LLAMA_CPP_AVAILABLE else None, verbose=params.verbose)
            logger.info(f"Model '{model_name}' wczytany poprawnie.")
            return llm
        except Exception as e:
            logger.error(f"Błąd ładowania modelu '{model_name}': {e}"); return None

    async def load_all_models(self) -> bool:
        load_tasks = {name: asyncio.to_thread(self._setup_single_model, name, mc) for name, mc in self.config.models.models.items()}
        results = await asyncio.gather(*load_tasks.values())
        all_loaded = True
        for (name, _), model in zip(load_tasks.items(), results):
            if model: self.models[name], self.locks[name] = model, asyncio.Lock()
            else: all_loaded = False
        return all_loaded

    def unload_all_models(self):
        self.models.clear(); self.locks.clear(); self.embedding_cache.clear()

    def get_model_and_lock(self, name: str) -> Optional[Tuple[Llama, asyncio.Lock]]:
        return self.models.get(name), self.locks.get(name)

# --- Pamięć Wektorowa (bez zmian) ---
class VectorMemory:
    def __init__(self):
        self.entries: List[MemoryEntry] = []

    def add_entry(self, speaker: str, text: str, embedding: List[float]):
        self.entries.append((speaker, text, embedding))
        logger.info(f"[Pamięć] Dodano wpis od '{speaker}'. Całkowita liczba wpisów: {len(self.entries)}.")

    def search_similar(self, query_embedding: List[float], top_k: int = 2, threshold: float = 0.65) -> List[MemoryEntry]:
        if not self.entries: return []
        similarities = [cosine_similarity(query_embedding, entry[2]) for entry in self.entries]
        sorted_indices = np.argsort(similarities)[::-1]
        results = []
        for i in sorted_indices:
            if len(results) >= top_k: break
            if similarities[i] >= threshold and i < len(self.entries) - 1:
                results.append(self.entries[i])
        return results

# --- Agent Konwersacyjny (ZMODYFIKOWANY) ---
class ConversationalAgent:
    def __init__(self, name: str, model_loader: ModelLoader, completion_model: str, memory: VectorMemory):
        self.name = name; self.model_loader = model_loader
        self.completion_model = completion_model; self.memory = memory

    async def _get_embedding(self, text: str) -> Optional[List[float]]:
        model, lock = self.model_loader.get_model_and_lock("embedding")
        if not model: return None
        preprocessed = preprocess_text(text)
        if preprocessed in self.model_loader.embedding_cache:
            return self.model_loader.embedding_cache[preprocessed]
        try:
            async with lock:
                embedding = await asyncio.to_thread(model.embed, preprocessed)
                if len(self.model_loader.embedding_cache) < self.model_loader.config.embedding_cache_size:
                    self.model_loader.embedding_cache[preprocessed] = embedding
                return embedding
        except Exception as e:
            logger.error(f"Błąd tworzenia osadzenia dla '{text[:30]}...': {e}"); return None

    def _parse_and_validate_response(self, text: str) -> Optional[str]:
        """Inteligentnie parsuje i waliduje odpowiedź modelu."""
        # 1. Usuń bloki <think>
        cleaned_text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL).strip()
        
        # 2. Podziel na linie i weź ostatnią niepustą - często to jest właściwa odpowiedź
        lines = [line.strip() for line in cleaned_text.split('\n') if line.strip()]
        if not lines:
            logger.warning("Odpowiedź odrzucona (pusta po parsowaniu).")
            return None
        
        final_response = lines[-1]
        
        # 3. Dodatkowe czyszczenie artefaktów
        final_response = re.sub(r'^\s*(Użytkownik|Agent Ogólny|Agent Finansowy):\s*', '', final_response, flags=re.IGNORECASE).strip()
        
        # 4. Walidacja
        if len(final_response) < 15:
            logger.warning(f"Odpowiedź odrzucona (za krótka po parsowaniu): '{final_response}'")
            return None
        
        return final_response

    async def _generate_response(self, history: ConversationHistory, retrieved_memories: List[MemoryEntry]) -> Optional[str]:
        model, lock = self.model_loader.get_model_and_lock(self.completion_model)
        if not model: return None
        
        formatted_history = "\n".join([f"{s}: {t}" for s, t in history[-4:]])
        
        memory_section = "Brak."
        if retrieved_memories:
            memory_texts = [f"- {s} wcześniej powiedział: '{t}'" for s, t, _ in retrieved_memories]
            memory_section = "\n".join(memory_texts)

        prompt = (
            f"Twoim jedynym zadaniem jest odegrać rolę asystenta AI o nazwie '{self.name}' w rozmowie.\n"
            f"Odpowiedź MUSI być JEDYNIE tekstem wypowiedzi w języku polskim. NIE dodawaj swoich myśli, tagów ani niczego innego.\n\n"
            f"### PRZYKŁAD POPRAWNEJ ODPOWIEDZI:\n"
            f"Użytkownik: Myślę o nowym projekcie.\n"
            f"{self.name}: Brzmi interesująco, czy możesz opowiedzieć o nim coś więcej?\n\n"
            f"### KONIEC PRZYKŁADU ###\n\n"
            f"--- AKTUALNA ROZMOWA ---\n"
            f"### Kluczowe fakty z tej rozmowy:\n{memory_section}\n\n"
            f"### Ostatnie wiadomości:\n{formatted_history}\n\n"
            f"--- KONIEC ROZMOWY ---\n\n"
            f"{self.name}:"
        )

        try:
            async with lock:
                stop = [f"{name}:" for name in ["Użytkownik", "Agent Ogólny", "Agent Finansowy"]]
                output = await asyncio.to_thread(model.create_completion, prompt,
                    max_tokens=self.model_loader.config.max_tokens,
                    temperature=0.7,
                    top_p=0.9,
                    stop=stop,
                    repeat_penalty=self.model_loader.config.repeat_penalty
                )
                raw_text = output['choices'][0]['text'].strip()
                return self._parse_and_validate_response(raw_text)
        except Exception as e:
            logger.error(f"Błąd generowania odpowiedzi: {e}"); return None

    async def respond(self, history: ConversationHistory) -> Optional[str]:
        if not history: return None
        last_utterance = history[-1][1]
        query_embedding = await self._get_embedding(last_utterance)
        if not query_embedding:
            logger.warning("Nie udało się stworzyć wektora zapytania. Odpowiadam bez pamięci.")
            return await self._generate_response(history, [])
        retrieved_memories = self.memory.search_similar(query_embedding, top_k=2, threshold=self.model_loader.config.memory_retrieval_threshold)
        if retrieved_memories:
            logger.info(f"[{self.name}] Odnaleziono {len(retrieved_memories)} istotnych wspomnień.")
        else:
            logger.info(f"[{self.name}] Brak istotnych wspomnień, odpowiadam na podstawie bieżącego kontekstu.")
        return await self._generate_response(history, retrieved_memories)

# --- Symulator Rozmowy (bez zmian) ---
class ConversationSimulator:
    def __init__(self, agents: List[ConversationalAgent], memory: VectorMemory, initial_prompt: Tuple[str, str], max_turns: int = 4):
        self.agents = agents; self.memory = memory
        self.history: ConversationHistory = []
        self.initial_prompt = initial_prompt
        self.max_turns = max_turns

    async def initialize(self):
        speaker, text = self.initial_prompt
        embedding = await self.agents[0]._get_embedding(text)
        if embedding:
            self.history.append((speaker, text))
            self.memory.add_entry(speaker, text, embedding)
        else:
            raise RuntimeError("Nie udało się zwektoryzować początkowego promptu.")

    async def run(self):
        await self.initialize()
        logger.info("--- Rozpoczynam symulację rozmowy (v8 Lepsze Prompty i Parser) ---")
        logger.info(f"\033[1;33m{self.history[0][0]}: {self.history[0][1]}\033[0m")
        
        current_agent_idx = 0
        for _ in range(self.max_turns * len(self.agents)):
            agent = self.agents[current_agent_idx]
            response = await agent.respond(self.history)
            
            if response:
                embedding = await agent._get_embedding(response)
                if embedding:
                    self.history.append((agent.name, response))
                    self.memory.add_entry(agent.name, response, embedding)
                    color = "\033[1;34m" if current_agent_idx == 0 else "\033[1;32m"
                    logger.info(f"{color}{agent.name}: {response}\033[0m")
                else:
                    logger.warning(f"Nie udało się zwektoryzować ZWALIDOWANEJ odpowiedzi od {agent.name}.")
            else:
                logger.warning(f"{agent.name} nie wygenerował poprawnej odpowiedzi. Koniec tury.")
            
            current_agent_idx = (current_agent_idx + 1) % len(self.agents)
            await asyncio.sleep(1)
        logger.info("--- Koniec symulacji rozmowy ---")

# --- Główna Funkcja Aplikacji (bez zmian) ---
async def main():
    config = AppConfig()
    loader = ModelLoader(config)
    try:
        if await loader.load_all_models():
            vector_memory = VectorMemory()
            agent_a = ConversationalAgent("Agent Ogólny", loader, "base", vector_memory)
            agent_b = ConversationalAgent("Agent Finansowy", loader, "finance_instruct", vector_memory)
            
            initial_prompt = ("Użytkownik", "Nasz nowy system logistyczny oparty na AI zredukował koszty o 20%, ale wymaga drogiej infrastruktury chmurowej. Zastanawiam się nad jego rentownością.")
            simulator = ConversationSimulator([agent_a, agent_b], vector_memory, initial_prompt, max_turns=3)
            await simulator.run()
    finally:
        loader.unload_all_models()

if __name__ == "__main__":
    if 'ipykernel' in sys.modules:
        nest_asyncio.apply()
    asyncio.run(main())


[2025-09-05 16:40:23,009][__main__][INFO] Model 'embedding' wczytany poprawnie.
llama_context: n_ctx_per_seq (4096) < n_ctx_train (40960) -- the full capacity of the model will not be utilized
[2025-09-05 16:40:23,260][__main__][INFO] Model 'finance_instruct' wczytany poprawnie.
llama_context: n_ctx_per_seq (4096) < n_ctx_train (196608) -- the full capacity of the model will not be utilized
[2025-09-05 16:40:23,419][__main__][INFO] Model 'base' wczytany poprawnie.
init: embeddings required but some input tokens were not marked as outputs -> overriding
[2025-09-05 16:40:23,460][__main__][INFO] [Pamięć] Dodano wpis od 'Użytkownik'. Całkowita liczba wpisów: 1.
[2025-09-05 16:40:23,460][__main__][INFO] --- Rozpoczynam symulację rozmowy (v7 Higiena Pamięci) ---
[2025-09-05 16:40:23,461][__main__][INFO] [1;33mUżytkownik: Nasz nowy system logistyczny oparty na AI zredukował koszty o 20%, ale wymaga drogiej infrastruktury chmurowej. Zastanawiam się nad jego rentownością.[0m
[2025-09-05 16:40

## Web Scraper - TEST

In [None]:
import asyncio
import logging
import os
import random
import time
from datetime import datetime
from typing import Optional, List, Dict, Any
from urllib.parse import urlparse, quote_plus, parse_qs, unquote

import httpx
import cloudscraper
from selectolax.parser import HTMLParser
from trafilatura import extract
import nest_asyncio

# Nowy, potężniejszy arsenał
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
from playwright_stealth import Stealth

# Zakładamy, że plik Agent_Config.py z klasą AppConfig istnieje w tym samym folderze
from Agent_Config import AppConfig

# --- Konfiguracja ---
config = AppConfig()

# --- Ustawienia Logowania ---
logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s")
for logger_name in ["httpx", "selenium", "urllib3", "playwright"]:
    logging.getLogger(logger_name).setLevel(logging.WARNING)
logger = logging.getLogger("ULTIMATE_SCRAPER")

# --- Katalog na Zrzuty Ekranu ---
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)


class UltimateScraper:
    def __init__(self, app_config: AppConfig):
        self.config = app_config
        self._client: Optional[httpx.AsyncClient] = None
        self._semaphore = asyncio.Semaphore(self.config.network.CONCURRENCY_LIMIT)
        self.last_ip_rotation = 0
        self.search_engines = [
            {
                "name": "DuckDuckGo HTML",
                "url": "https://html.duckduckgo.com/html/?q={query}",
                "selector": "a.result__a",
                "decode": True
            }
        ]

    async def _rotate_ip(self) -> bool:
        """Zmienia adres IP Tora, wysyłając sygnał NEWNYM."""
        if not self.config.tor.USE_TOR or time.time() - self.last_ip_rotation < 10:
            return False

        logger.info("🧅 Wysyłam żądanie nowej tożsamości Tor (NEWNYM)...")
        writer = None
        try:
            reader, writer = await asyncio.open_connection(
                self.config.tor.TOR_CONTROL_HOST, self.config.tor.TOR_CONTROL_PORT
            )
            auth_command = f'AUTHENTICATE "{self.config.tor.TOR_CONTROL_PASSWORD}"\r\n'.encode()
            writer.write(auth_command)
            await writer.drain()
            if b"250 OK" not in await reader.read(4096):
                logger.error("❌ Błąd autoryzacji w ControlPort Tora.")
                return False

            writer.write(b"SIGNAL NEWNYM\r\n")
            await writer.drain()
            if b"250 OK" in await reader.read(4096):
                logger.info("✅ Nowa tożsamość Tora przyznana. Resetuję sesję klienta.")
                if self._client and not self._client.is_closed:
                    await self._client.aclose()
                self._client = None
                self.last_ip_rotation = time.time()
                await asyncio.sleep(self.config.tor.NEWNYM_INTERVAL)
                return True
            else:
                logger.error("❌ Nie udało się wysłać sygnału NEWNYM.")
                return False
        except Exception as e:
            logger.error(f"❌ Krytyczny błąd podczas rotacji IP Tora: {e}")
            return False
        finally:
            if writer:
                writer.close()
                await writer.wait_closed()

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None or self._client.is_closed:
            proxy = self.config.tor.TOR_SOCKS_PROXY if self.config.tor.USE_TOR else None
            transport = httpx.AsyncHTTPTransport(proxy=proxy) if proxy else None
            self._client = httpx.AsyncClient(transport=transport, verify=False, timeout=self.config.network.REQ_TIMEOUT, follow_redirects=True, http2=True)
            logger.info(f"Zainicjowano nową sesję klienta httpx. Proxy: {proxy if transport else 'Brak'}")
        return self._client

    def _decode_ddg_url(self, url: str) -> str:
        if "duckduckgo.com/l/" in url:
            try:
                qs = parse_qs(urlparse(url).query); decoded = qs.get('uddg', [''])[0]
                if decoded: return unquote(decoded)
            except Exception: return url
        return url

    async def search(self, query: str) -> List[str]:
        client = await self._get_client()
        for engine in self.search_engines:
            logger.info(f"Wyszukuję '{query}' używając {engine['name']}...")
            search_url = engine['url'].format(query=quote_plus(query))
            try:
                response = await client.get(search_url, headers={"User-Agent": random.choice(self.config.network.USER_AGENTS)})
                if response.status_code == 200:
                    links = set()
                    parser = HTMLParser(response.text)
                    for node in parser.css(engine['selector']):
                        href = node.attributes.get('href')
                        if href:
                            url = self._decode_ddg_url(href) if engine['decode'] else href
                            if url.startswith('http'): links.add(url)
                    if links:
                        logger.info(f"✅ {engine['name']} znalazł {len(links)} linków.")
                        return list(links)
                else:
                    logger.warning(f"{engine['name']} zwrócił status {response.status_code}. Próbuję następny silnik...")
            except Exception as e:
                logger.error(f"Błąd podczas wyszukiwania w {engine['name']}: {e}. Próbuję następny silnik...")
        logger.error(f"Wszystkie silniki wyszukiwania zawiodły dla zapytania: '{query}'.")
        return []

    def _extract_content(self, html: str) -> Optional[str]:
        content = extract(html, include_comments=False, include_tables=True)
        if not content:
            logger.warning("Ekstrakcja treści zwróciła pusty wynik.")
        else:
            logger.info(f"Ekstrakcja treści udana, długość: {len(content)} znaków.")
        return content

    def _fetch_playwright_sync(self, url: str) -> Optional[str]:
        """Ostateczna metoda pobierania, używająca Playwright w trybie stealth zgodnie z dokumentacją."""
        
        with sync_playwright() as p:
            # Konfigurujemy ustawienia proxy dla przeglądarki
            proxy_settings = { "server": self.config.tor.TOR_SOCKS_PROXY } if self.config.tor.USE_TOR else None
            
            browser = p.chromium.launch(headless=True, proxy=proxy_settings)
            context = browser.new_context(
                user_agent=random.choice(self.config.network.USER_AGENTS),
                viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()
            
            stealth = Stealth()
            stealth.apply_stealth_sync(page)

            try:
                logger.info(f"Playwright (Stealth) nawiguje do: {url}")
                page.goto(url, timeout=90000, wait_until='networkidle')
                
                html = page.content()
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                safe_url = urlparse(url).netloc.replace('.', '_')
                page.screenshot(path=os.path.join(SCREENSHOT_DIR, f"playwright_{safe_url}_{timestamp}.png"))
                
                return html if html and len(html) > 500 else None

            except PlaywrightTimeoutError as e:
                logger.error(f"Playwright TimeoutError: {e}")
                return None
            except Exception as e:
                logger.error(f"Playwright ogólny błąd: {e}")
                return None
            finally:
                browser.close()

    async def fetch(self, url: str, max_retries: int = 3) -> Optional[Dict[str, Any]]:
        for retry in range(max_retries):
            logger.info(f"Próba fetch {retry + 1}/{max_retries} dla {url}")
            async with self._semaphore:
                try:
                    # Etap 1: httpx
                    logger.info(f"[1/3] Próba pobrania (httpx): {url}")
                    client = await self._get_client()
                    response = await client.get(url, headers={"User-Agent": random.choice(self.config.network.USER_AGENTS)})
                    if response.status_code == 200 and "js-challenge" not in response.text:
                        content = self._extract_content(response.text)
                        if content:
                            logger.info(f"✅ Sukces (httpx): {url}")
                            return {"url": url, "content": content, "source": "httpx"}
                    else:
                        logger.warning(f"httpx zwrócił status {response.status_code} dla {url}")
                except (httpx.ProxyError, httpx.ConnectTimeout) as e:
                    logger.error(f"httpx błąd sieci/proxy dla {url}: {e}. Rotuję IP i ponawiam.")
                    if self.config.tor.USE_TOR:
                        await self._rotate_ip()
                    continue
                except Exception as e:
                    logger.error(f"httpx napotkał błąd dla {url}: {e}")

                # Etap 2: cloudscraper
                logger.warning(f"[2/3] httpx nie powiódł się. Próba (ulepszony cloudscraper): {url}")
                try:
                    loop = asyncio.get_running_loop()
                    scraper = cloudscraper.create_scraper(
                        browser={'browser': 'chrome', 'platform': 'windows', 'desktop': True},
                        delay=10, 
                        enable_stealth=True, 
                        stealth_options={'human_like_delays': True, 'randomize_headers': True, 'browser_quirks': True},
                        auto_refresh_on_403=True,
                        max_403_retries=2,
                        rotate_tls_ciphers=True
                    )
                    proxies = {"http": self.config.tor.TOR_SOCKS_PROXY, "https": self.config.tor.TOR_SOCKS_PROXY} if self.config.tor.USE_TOR else None
                    response = await loop.run_in_executor(None, lambda: scraper.get(url, proxies=proxies, timeout=self.config.network.REQ_TIMEOUT))
                    if response.status_code == 200:
                        content = self._extract_content(response.text)
                        if content:
                            logger.info(f"✅ Sukces (cloudscraper): {url}")
                            return {"url": url, "content": content, "source": "cloudscraper_v2"}
                    else:
                        logger.warning(f"cloudscraper zwrócił status {response.status_code} dla {url}")
                except (httpx.ProxyError, httpx.ConnectTimeout) as e:
                    logger.error(f"cloudscraper błąd sieci/proxy dla {url}: {e}. Rotuję IP i ponawiam.")
                    if self.config.tor.USE_TOR:
                        await self._rotate_ip()
                    continue
                except Exception as e:
                    logger.error(f"Ulepszony cloudscraper również zawiódł dla {url}: {e}")

                # Etap 3: Playwright
                logger.warning(f"[3/3] cloudscraper nie powiódł się. Ostateczna próba (Playwright): {url}")
                try:
                    loop = asyncio.get_running_loop()
                    html = await asyncio.wait_for(
                        loop.run_in_executor(None, self._fetch_playwright_sync, url),
                        timeout=180.0  # Zwiększony timeout dla wolnych połączeń Tor
                    )
                    if html:
                        content = self._extract_content(html)
                        if content:
                            logger.info(f"✅ Sukces (Playwright): {url}")
                            return {"url": url, "content": content, "source": "playwright"}
                except asyncio.TimeoutError:
                    logger.error(f"❌ Ostateczna próba (playwright) dla {url} przekroczyła limit czasu 180s. Rotuję IP i ponawiam.")
                    if self.config.tor.USE_TOR:
                        await self._rotate_ip()
                    continue
                except Exception as e:
                    logger.error(f"❌ Ostateczna próba (playwright) dla {url} nie powiodła się: {e}")

            logger.warning(f"Wszystkie etapy nie powiodły się w próbie {retry + 1}. Ponawiam...")

        logger.critical(f"Pobranie {url} nie powiodło się po {max_retries} próbach.")
        return None
            
    async def run(self, query: str, max_links: int = 7, failure_threshold: int = 4):
        start_time = time.time()
        await self.get_public_ip()
        
        links = await self.search(query)
        if not links:
            logger.critical("Nie udało się znaleźć żadnych linków. Zatrzymuję pracę.")
            return

        ROTATE_EVERY_N_REQUESTS = 2
        fetch_counter = 0
        failed_count = 0
        tasks = [self.fetch(url) for url in links[:max_links]]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        valid_results = []
        for i, res in enumerate(results):
            fetch_counter += 1
            if isinstance(res, Exception):
                logger.error(f"Pobranie linku nr {i+1} ({links[i]}) nie powiodło się: {res}")
                failed_count += 1
            elif res is None:
                logger.error(f"Pobranie linku nr {i+1} ({links[i]}) zwróciło None.")
                failed_count += 1
            else:
                logger.info(f"Dodano wynik dla {links[i]} (Źródło: {res['source']}, Długość treści: {len(res['content'])})")
                valid_results.append(res)
            
            if failed_count >= failure_threshold:
                logger.critical(f"Przekroczono próg błędów ({failed_count}/{len(tasks)}). Rotuję IP.")
                if self.config.tor.USE_TOR:
                    await self._rotate_ip()
                break

            if fetch_counter % ROTATE_EVERY_N_REQUESTS == 0 and fetch_counter < len(tasks):
                logger.info(f"Osiągnięto próg {ROTATE_EVERY_N_REQUESTS} zapytań. Prewencyjna rotacja IP.")
                if self.config.tor.USE_TOR:
                    await self._rotate_ip()

        logger.info(f"\n--- ZAKOŃCZONO W {time.time() - start_time:.2f}s ---")
        logger.info(f"Pomyślnie pobrano {len(valid_results)} z {len(tasks)} stron.")
        
        if not valid_results:
            logger.warning("Brak wyników do wyświetlenia.")
        else:
            logger.info(f"Liczba ważnych wyników: {len(valid_results)}")
            for i, doc in enumerate(valid_results):
                logger.info(f"Wyświetlam dokument {i+1} (Źródło: {doc['source']})")
                logger.info(f"\n===== DOKUMENT {i+1} (Źródło: {doc['source']}) =====\nURL: {doc['url']}\nTreść: {doc['content'][:500]}...\n")
        
        if self._client and not self._client.is_closed:
            await self._client.aclose()
            
    # Metoda get_public_ip bez zmian...
    async def get_public_ip(self) -> Optional[str]:
        try:
            client = await self._get_client()
            response = await client.get("https://check.torproject.org/api/ip")
            response.raise_for_status()
            data = response.json()
            logger.info(f"Kontrola IP: Twój publiczny adres IP to {data.get('IP')} (Używasz Tora: {data.get('IsTor', False)})")
            return data.get('IP')
        except Exception as e:
            logger.error(f"Błąd podczas kontroli IP: {e}")
            return None

async def main():
    app_config = AppConfig()
    scraper = UltimateScraper(app_config)
    await scraper.run(query="CLEOPATRA PREMIUM SPÓŁKA Z OGRANICZONĄ ODPOWIEDZIALNOŚCIĄ")

if __name__ == "__main__":
    nest_asyncio.apply()
    asyncio.run(main())