#Install Required Dependencies

In [None]:
!pip install -q -U transformers datasets peft accelerate bitsandbytes
!pip install -q sentence-transformers scikit-learn
!pip install -q beautifulsoup4 requests PyPDF2 pdfplumber
!pip install -q rouge-score nltk sacrebleu
!pip install -q fastapi uvicorn python-multipart
!pip install -q wandb mlflow
!pip install -q pyngrok nest_asyncio

print("Installation complete!")

#Import Libraries

In [None]:
import os
import json
import re
import time
from datetime import datetime
from typing import List, Dict, Tuple, Optional
import logging

# Data processing
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Web scraping and PDF
import requests
from bs4 import BeautifulSoup
import PyPDF2
import pdfplumber

# ML and NLP
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    BitsAndBytesConfig,
    DataCollatorForLanguageModeling,
    pipeline
)
from datasets import Dataset, DatasetDict
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Evaluation
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

import gc

print("Libraries imported successfully!")

Libraries imported successfully!
Configuration initialized!
Base Model: Qwen/Qwen2.5-3B-Instruct
Data Directory: ./data
GPU Available: True
GPU Name: Tesla T4
GPU Memory: 15.8 GB


#Initialize Configuration

In [None]:
class Config:
    """Central configuration for the entire pipeline"""

    # Domain and task
    DOMAIN = "electric vehicle charging stations"
    USE_CASE = "question_answering"

    # Directories
    DATA_DIR = "./data"
    OUTPUT_DIR = "./outputs"
    MODEL_DIR = "./models"
    RAW_PDF_DIR = "./data/raw"

    # Model configuration
    BASE_MODEL = "Qwen/Qwen2.5-3B-Instruct"
    QA_GENERATOR_MODEL = "Qwen/Qwen2.5-3B-Instruct"

    # Training parameters
    LORA_R = 16
    LORA_ALPHA = 32
    LORA_DROPOUT = 0.05
    LEARNING_RATE = 2e-4
    NUM_EPOCHS = 3
    BATCH_SIZE = 4  
    GRADIENT_ACCUMULATION_STEPS = 4
    MAX_SEQ_LENGTH = 512

    # Data processing
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    DEDUP_THRESHOLD = 0.85

    # QA generation
    QUESTIONS_PER_TEXT = 5
    MIN_ANSWER_LENGTH = 20

    # Evaluation
    NUM_EVAL_SAMPLES = 50

config = Config()

# Create directories
os.makedirs(config.DATA_DIR, exist_ok=True)
os.makedirs(config.OUTPUT_DIR, exist_ok=True)
os.makedirs(config.MODEL_DIR, exist_ok=True)
os.makedirs(config.RAW_PDF_DIR, exist_ok=True)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print(f"Configuration initialized!")
print(f"Base Model: {config.BASE_MODEL}")
print(f"Data Directory: {config.DATA_DIR}")
print(f"GPU Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

#Data Sources

In [None]:
# Web URLs for scraping
EV_URLS = {
    # Wikipedia sources - high quality, well-structured
    "wiki_charging_station": "https://en.wikipedia.org/wiki/Charging_station",
    "wiki_ev": "https://en.wikipedia.org/wiki/Electric_vehicle",
    "wiki_chademo": "https://en.wikipedia.org/wiki/CHAdeMO",
    "wiki_ccs": "https://en.wikipedia.org/wiki/Combined_Charging_System",
    "wiki_tesla_supercharger": "https://en.wikipedia.org/wiki/Tesla_Supercharger",
    "wiki_j1772": "https://en.wikipedia.org/wiki/SAE_J1772",
    "wiki_ev_battery": "https://en.wikipedia.org/wiki/Electric_vehicle_battery",
    "iea_report": "https://www.iea.org/reports/global-ev-outlook-2025/electric-vehicle-charging",
    "electrek_guide": "https://electrek.co/guides/dc-fast-charging/",
}

PDF_PATHS = [
    "/content/data/raw/CEER-Report-on-Electric-Vehicles_v2.pdf",
    "/content/data/raw/Charging_ahead_Accelerating_the_roll-out_of_EU_electric_vehicle_charging_infrastructure.pdf",
    "/content/data/raw/EV4EU-Deliverable-1.1.pdf",
    "/content/data/raw/EV_Charger_Selection_Guide_2018-01-112.pdf",
    "/content/data/raw/GlobalEVOutlook2025.pdf",
    "/content/data/raw/Research-Whitepaper-A-European-EV-Charging-Infrastructure-Masterplan.pdf",
    "/content/data/raw/charging-infrastructure-masterplan-ii.pdf",
    "/content/data/raw/roland_berger_ev_charging_index_deep_dive_germany.pdf",
    "/content/data/raw/97fff524db.pdf"
]

print(f"Configured {len(EV_URLS)} web sources")
print(f"Configured {len(PDF_PATHS)} PDF sources")
print("\nWeb sources:")
for name, url in EV_URLS.items():
    print(f"  - {name}")
print("\nPDF sources:")
for path in PDF_PATHS:
    exists = "EXISTS" if os.path.exists(path) else "NOT FOUND"
    print(f"  - {os.path.basename(path)} [{exists}]")

Configured 9 web sources
Configured 9 PDF sources

Web sources:
  - wiki_charging_station
  - wiki_ev
  - wiki_chademo
  - wiki_ccs
  - wiki_tesla_supercharger
  - wiki_j1772
  - wiki_ev_battery
  - iea_report
  - electrek_guide

PDF sources:
  - CEER-Report-on-Electric-Vehicles_v2.pdf [EXISTS]
  - Charging_ahead_Accelerating_the_roll-out_of_EU_electric_vehicle_charging_infrastructure.pdf [EXISTS]
  - EV4EU-Deliverable-1.1.pdf [EXISTS]
  - EV_Charger_Selection_Guide_2018-01-112.pdf [EXISTS]
  - GlobalEVOutlook2025.pdf [EXISTS]
  - Research-Whitepaper-A-European-EV-Charging-Infrastructure-Masterplan.pdf [EXISTS]
  - charging-infrastructure-masterplan-ii.pdf [EXISTS]
  - roland_berger_ev_charging_index_deep_dive_germany.pdf [EXISTS]
  - 97fff524db.pdf [EXISTS]


#Data Collection Module - Web Scraper

In [None]:
class RealDataCollector:
    """Collect real data from web and PDF sources"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()

        # Enhanced headers to avoid blocking
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
        })

    def scrape_webpage(self, url: str, source_name: str = "") -> Optional[Dict]:
        """Extract content from a webpage"""
        try:
            self.logger.info(f"Scraping: {source_name} - {url}")

            # Delay to avoid rate limiting
            time.sleep(2)

            response = self.session.get(url, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer',
                                'header', 'aside', 'form', 'iframe',
                                'noscript', 'meta', 'link', 'button']):
                element.decompose()

            # Extract content
            content = self._extract_content(soup, url)

            if len(content) < 200:
                self.logger.warning(f"Very short content from {url} ({len(content)} chars)")
                return None

            self.logger.info(f"Success: {len(content)} characters extracted")

            return {
                "source": url,
                "source_name": source_name,
                "content": content,
                "scraped_at": datetime.now().isoformat(),
                "type": "web",
                "content_length": len(content)
            }

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 403:
                self.logger.error(f"Blocked (403): {url}")
            elif e.response.status_code == 404:
                self.logger.error(f"Not found (404): {url}")
            else:
                self.logger.error(f"HTTP error {e.response.status_code}: {url}")
            return None

        except Exception as e:
            self.logger.error(f"Error scraping {url}: {e}")
            return None

    def _extract_content(self, soup: BeautifulSoup, url: str) -> str:
        """Extract content intelligently based on site type"""

        # Wikipedia has special handling
        if "wikipedia.org" in url:
            return self._extract_wikipedia(soup)

        # General websites
        return self._extract_general(soup)

    def _extract_wikipedia(self, soup: BeautifulSoup) -> str:
        """Extract Wikipedia content"""
        content_div = soup.find('div', {'id': 'mw-content-text'})

        if not content_div:
            return ""

        paragraphs = []

        # Extract paragraphs
        for p in content_div.find_all('p'):
            text = p.get_text(strip=True)
            # Ignore short paragraphs and references
            if len(text) > 50 and not text.startswith('['):
                # Remove reference numbers [1], [2], etc.
                text = re.sub(r'\[\d+\]', '', text)
                paragraphs.append(text)

        # Extract headings and important lists
        for heading in content_div.find_all(['h2', 'h3']):
            title = heading.get_text(strip=True)
            if title and len(title) > 3:
                paragraphs.append(f"\n{title}:")

        for li in content_div.find_all('li'):
            text = li.get_text(strip=True)
            if 30 < len(text) < 500:
                text = re.sub(r'\[\d+\]', '', text)
                paragraphs.append(text)

        content = ' '.join(paragraphs)
        content = re.sub(r'\s+', ' ', content)

        return content.strip()

    def _extract_general(self, soup: BeautifulSoup) -> str:
        """Extract general website content"""
        # Look for main content
        main_selectors = [
            'article', 'main', '.content', '.main-content',
            '#content', '#main', '.post-content', '.entry-content',
            '.article-body', '[role="main"]', '.page-content'
        ]

        main_content = None
        for selector in main_selectors:
            main_content = soup.select_one(selector)
            if main_content:
                break

        if not main_content:
            main_content = soup.body if soup.body else soup

        paragraphs = []
        for element in main_content.find_all(['p', 'li', 'h1', 'h2', 'h3', 'h4']):
            text = element.get_text(strip=True)
            if len(text) > 40:
                paragraphs.append(text)

        content = ' '.join(paragraphs)
        content = re.sub(r'\s+', ' ', content)

        return content.strip()

    def extract_pdf(self, pdf_path: str) -> Optional[Dict]:
        """Extract text from PDF file"""
        try:
            if not os.path.exists(pdf_path):
                self.logger.error(f"File not found: {pdf_path}")
                return None

            self.logger.info(f"Extracting PDF: {pdf_path}")

            text_content = []

            with pdfplumber.open(pdf_path) as pdf:
                total_pages = len(pdf.pages)
                self.logger.info(f"  Pages: {total_pages}")

                for page_num, page in enumerate(pdf.pages):
                    try:
                        # Extract text
                        text = page.extract_text()
                        if text:
                            text_content.append(text)

                        # Extract tables
                        tables = page.extract_tables()
                        for table in tables:
                            table_text = self._table_to_text(table)
                            if table_text:
                                text_content.append(table_text)

                    except Exception as e:
                        self.logger.warning(f"Error on page {page_num}: {e}")
                        continue

            full_content = "\n".join(text_content)

            # Clean up
            full_content = re.sub(r'\s+', ' ', full_content)

            self.logger.info(f"Extracted {len(full_content)} characters from {total_pages} pages")

            return {
                "source": pdf_path,
                "source_name": os.path.basename(pdf_path),
                "content": full_content.strip(),
                "extracted_at": datetime.now().isoformat(),
                "type": "pdf",
                "num_pages": total_pages,
                "content_length": len(full_content)
            }

        except Exception as e:
            self.logger.error(f"PDF error: {e}")
            return None

    def _table_to_text(self, table: List) -> str:
        """Convert table to text"""
        if not table:
            return ""
        rows = []
        for row in table:
            if row:
                cells = [str(cell).strip() if cell else "" for cell in row]
                row_text = " | ".join(c for c in cells if c)
                if row_text:
                    rows.append(row_text)
        return " ".join(rows)

    def collect_from_urls(self, urls_dict: Dict[str, str]) -> List[Dict]:
        """Collect data from list of URLs"""
        results = []

        self.logger.info(f"\nCollecting data from {len(urls_dict)} URLs")

        for name, url in urls_dict.items():
            result = self.scrape_webpage(url, name)
            if result:
                results.append(result)
            time.sleep(1)

        self.logger.info(f"\nSuccess: {len(results)}/{len(urls_dict)} URLs")
        return results

    def collect_from_pdfs(self, pdf_paths: List[str]) -> List[Dict]:
        """Collect data from list of PDF files"""
        results = []

        self.logger.info(f"\nCollecting data from {len(pdf_paths)} PDFs")

        for pdf_path in pdf_paths:
            result = self.extract_pdf(pdf_path)
            if result:
                results.append(result)

        self.logger.info(f"\nSuccess: {len(results)}/{len(pdf_paths)} PDFs")
        return results

    def collect_all(self, urls: Dict[str, str], pdfs: List[str]) -> List[Dict]:
        """Collect all data from web and PDFs"""
        all_data = []

        # Collect from web
        web_data = self.collect_from_urls(urls)
        all_data.extend(web_data)

        # Collect from PDFs
        pdf_data = self.collect_from_pdfs(pdfs)
        all_data.extend(pdf_data)

        # Save raw data
        output_path = os.path.join(config.DATA_DIR, "raw_collected_data.json")
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(all_data, f, ensure_ascii=False, indent=2)

        self.logger.info(f"\nTotal collected sources: {len(all_data)}")
        self.logger.info(f"Saved to: {output_path}")

        return all_data

print("RealDataCollector class defined successfully!")

RealDataCollector class defined successfully!


#Data Processing Module

In [None]:
class DataProcessor:
    """Process and clean collected data"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.embedder = None

    def clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        if not text:
            return ""

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s.,!?;:\'\"-]', ' ', text)

        # Remove extra whitespace again
        text = re.sub(r'\s+', ' ', text)

        return text.strip()

    def chunk_text(self, text: str,
                   chunk_size: int = None,
                   overlap: int = None) -> List[str]:
        """Split text into chunks with overlap"""
        if not text:
            return []

        # Use config defaults if not provided
        chunk_size = chunk_size or config.CHUNK_SIZE
        overlap = overlap or config.CHUNK_OVERLAP

        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            words = len(sentence.split())

            if current_length + words > chunk_size and current_chunk:
                chunk_text = ' '.join(current_chunk)
                if len(chunk_text) > 100:  # Ignore very small chunks
                    chunks.append(chunk_text)

                # Keep last 1-2 sentences for overlap
                overlap_sentences = current_chunk[-2:] if len(current_chunk) > 2 else []
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
            else:
                current_chunk.append(sentence)
                current_length += words

        # Add last chunk
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            if len(chunk_text) > 100:
                chunks.append(chunk_text)

        return chunks

    def deduplicate(self, texts: List[str],
                   threshold: float = None) -> List[str]:
        """Remove similar texts using semantic similarity"""
        if not texts or len(texts) < 2:
            return texts

        threshold = threshold or config.DEDUP_THRESHOLD

        self.logger.info(f"Deduplicating {len(texts)} texts...")

        try:
            # Load embedder if not already loaded
            if self.embedder is None:
                self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

            # Generate embeddings
            embeddings = self.embedder.encode(texts, show_progress_bar=True)

            # Find unique texts
            unique_indices = [0]
            unique_embeddings = [embeddings[0]]

            for i in range(1, len(texts)):
                similarities = cosine_similarity([embeddings[i]], unique_embeddings)[0]
                if max(similarities) < threshold:
                    unique_indices.append(i)
                    unique_embeddings.append(embeddings[i])

            unique_texts = [texts[i] for i in unique_indices]
            self.logger.info(f"Reduced from {len(texts)} to {len(unique_texts)} unique texts")

            return unique_texts

        except Exception as e:
            self.logger.warning(f"Deduplication failed: {e}")
            return texts

    def process_data(self, raw_data: List[Dict]) -> pd.DataFrame:
        """Process raw collected data"""
        if not raw_data:
            self.logger.error("No data to process!")
            return pd.DataFrame()

        self.logger.info(f"\nProcessing {len(raw_data)} sources")

        all_chunks = []

        for item in raw_data:
            content = item.get('content', '')
            if not content:
                continue

            # Clean text
            cleaned = self.clean_text(content)

            # Split into chunks
            chunks = self.chunk_text(cleaned)

            for chunk in chunks:
                all_chunks.append({
                    'source': item.get('source', 'unknown'),
                    'source_name': item.get('source_name', 'unknown'),
                    'type': item.get('type', 'unknown'),
                    'text': chunk,
                    'text_length': len(chunk),
                    'processed_at': datetime.now().isoformat()
                })

        self.logger.info(f"Total chunks before deduplication: {len(all_chunks)}")

        # Deduplicate
        if all_chunks:
            texts = [c['text'] for c in all_chunks]
            unique_texts = self.deduplicate(texts)

            # Filter to keep only unique texts
            unique_set = set(unique_texts)
            all_chunks = [c for c in all_chunks if c['text'] in unique_set]

        df = pd.DataFrame(all_chunks)

        self.logger.info(f"Final chunks: {len(df)}")

        return df

print("DataProcessor class defined successfully!")

DataProcessor class defined successfully!


#Execute Data Collection and Processing

In [None]:
print("-"*40)
print("Starting Data Collection Pipeline")
print("-"*40)

# Initialize collector
collector = RealDataCollector()

# Check which PDFs exist
existing_pdfs = [p for p in PDF_PATHS if os.path.exists(p)]
missing_pdfs = [p for p in PDF_PATHS if not os.path.exists(p)]

if missing_pdfs:
    print(f"\nWarning: {len(missing_pdfs)} PDF files not found:")
    for p in missing_pdfs:
        print(f"  - {p}")
    print()

print(f"\nCollecting from:")
print(f"  - {len(EV_URLS)} web sources")
print(f"  - {len(existing_pdfs)} PDF files")
print()

# Collect all data
raw_data = collector.collect_all(EV_URLS, existing_pdfs)

if not raw_data:
    print("ERROR: No data was collected!")
else:
    print(f"\n{'-'*70}")
    print(f"Collection Summary:")
    print(f"{'-'*70}")
    print(f"Total sources collected: {len(raw_data)}")

    # Show breakdown by type
    types = {}
    for item in raw_data:
        t = item.get('type', 'unknown')
        types[t] = types.get(t, 0) + 1

    for t, count in types.items():
        print(f"  - {t}: {count}")

    total_chars = sum(item.get('content_length', 0) for item in raw_data)
    print(f"Total characters: {total_chars:,}")

# Process the collected data
print(f"\n{'-'*70}")
print("Processing Data")
print("-"*70)

processor = DataProcessor()
processed_df = processor.process_data(raw_data)

if processed_df.empty:
    print("ERROR: Data processing failed!")
else:
    # Save processed data
    output_path = os.path.join(config.DATA_DIR, "processed_chunks.csv")
    processed_df.to_csv(output_path, index=False)
    print(f"\nProcessed data saved to: {output_path}")

    # Show statistics
    print(f"\n{'-'*70}")
    print("Processing Summary:")
    print("-"*70)
    print(f"Total chunks: {len(processed_df)}")
    print(f"Average chunk length: {processed_df['text_length'].mean():.0f} characters")
    print(f"Total words: {sum(len(t.split()) for t in processed_df['text']):,}")

    print(f"\nDistribution by source type:")
    for source_type, count in processed_df['type'].value_counts().items():
        print(f"  - {source_type}: {count} chunks")

    # Show sample
    print(f"\n{'-'*70}")
    print("Sample Chunks:")
    print("-"*70)
    for idx, row in processed_df.head(3).iterrows():
        print(f"\nChunk {idx+1}:")
        print(f"Source: {row['source_name']}")
        print(f"Text: {row['text'][:200]}...")
        print("-"*50)

print("\nData collection and processing complete!")

----------------------------------------
Starting Data Collection Pipeline
----------------------------------------

Collecting from:
  - 9 web sources
  - 9 PDF files



ERROR:__main__:Blocked (403): https://www.iea.org/reports/global-ev-outlook-2025/electric-vehicle-charging



----------------------------------------------------------------------
Collection Summary:
----------------------------------------------------------------------
Total sources collected: 15
  - web: 6
  - pdf: 9
Total characters: 1,462,223

----------------------------------------------------------------------
Processing Data
----------------------------------------------------------------------


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/19 [00:00<?, ?it/s]


Processed data saved to: ./data/processed_chunks.csv

----------------------------------------------------------------------
Processing Summary:
----------------------------------------------------------------------
Total chunks: 510
Average chunk length: 3011 characters
Total words: 240,365

Distribution by source type:
  - pdf: 504 chunks
  - web: 6 chunks

----------------------------------------------------------------------
Sample Chunks:
----------------------------------------------------------------------

Chunk 1:
Source: wiki_ev
Text: Anelectric vehicle EV is anymotorizedvehiclewhosepropulsionis provided fully or mostly byelectric power, viagrid electricityor from onboardrechargeable batteries.EVs encompass a wide range of transpor...
--------------------------------------------------

Chunk 2:
Source: wiki_chademo
Text: CHAdeMOis afast-charging systemforbattery electric vehicles, developed in 2010 by the CHAdeMO Association, formed by theTokyo Electric Power Companyand five

#QA Generation

In [None]:
def clear_memory():
    """Clear GPU and system memory"""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

class QAGenerator:
    """Generate question-answer pairs using a language model"""

    def __init__(self, model_name: str = None):
        self.model_name = model_name or config.QA_GENERATOR_MODEL
        self.model = None
        self.tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.logger = logging.getLogger(__name__)

    def load_model(self):
        """Load the QA generation model"""
        print(f"Loading QA model: {self.model_name}")
        print(f"Device: {self.device}")

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map="auto"
        )

        print("Model loaded successfully!")

    def generate_questions(self, context: str,
                          num_questions: int = None) -> List[str]:
        """Generate questions from context"""
        num_questions = num_questions or config.QUESTIONS_PER_TEXT

        # Prompts for diverse questions
        prompts = [
            f"Generate a specific factual question about EV charging based on this text: {context[:800]}",
            f"What is an important technical question about electric vehicle charging that can be answered from: {context[:800]}",
            f"Create a 'what is' question about EV charging infrastructure based on: {context[:800]}",
            f"Generate a 'how does' question about electric vehicle charging from: {context[:800]}",
            f"Ask a question about charging standards or protocols based on: {context[:800]}",
            f"Create a comparison question about EV charging levels from: {context[:800]}",
            f"Generate a question about charging costs or time based on: {context[:800]}",
        ]

        questions = []

        for prompt in prompts[:num_questions]:
            try:
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    max_length=512,
                    truncation=True
                ).to(self.device)

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=100,
                        num_beams=4,
                        temperature=0.8,
                        do_sample=True,
                        #top_p=0.9,
                        no_repeat_ngram_size=2,
                        pad_token_id=self.tokenizer.eos_token_id
                    )

                question = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

                # Extract generated question after the prompt
                if prompt in question:
                    question = question.split(prompt, 1)[-1].strip()

                # Clean and validate
                question = question.strip()
                if question and len(question) > 10:
                    # Add question mark if missing
                    if not question.endswith('?') and len(question.split()) > 3:
                        question += '?'
                    questions.append(question)

            except Exception as e:
                self.logger.warning(f"Error generating question: {e}")
                continue

        return list(set(questions))  # Remove duplicates

    def generate_answer(self, context: str, question: str) -> str:
        """Generate answer for a question based on context"""

        # Use consistent format with fine-tuning
        prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer questions accurately and concisely based on the given context.</s>
<|user|>
Context: {context[:1200]}

Question: {question}</s>
<|assistant|>
"""

        try:
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                max_length=512,
                truncation=True
            ).to(self.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=200,
                    num_beams=4,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    repetition_penalty=1.2,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

            answer = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Extract only assistant's response
            if "<|assistant|>" in answer:
                answer = answer.split("<|assistant|>", 1)[-1].strip()

            # Clean up
            answer = answer.replace("<|eot_id|>", "").strip()

            return answer

        except Exception as e:
            self.logger.warning(f"Error generating answer: {e}")
            return ""

    def create_qa_dataset(self, texts: List[str],
                         questions_per_text: int = None) -> List[Dict]:
        """Create QA dataset from texts"""
        if self.model is None:
            self.load_model()

        questions_per_text = questions_per_text or config.QUESTIONS_PER_TEXT

        qa_pairs = []
        total_texts = len(texts)

        print(f"\n{'-'*60}")
        print(f"Generating QA from {total_texts} texts")
        print(f"{'-'*60}\n")

        for idx, text in enumerate(texts):
            print(f"Processing text {idx+1}/{total_texts}...")

            # Generate questions
            questions = self.generate_questions(text, questions_per_text)
            print(f"  Generated {len(questions)} questions")

            # Generate answers
            for q_idx, question in enumerate(questions):
                answer = self.generate_answer(text, question)

                if answer and len(answer) > config.MIN_ANSWER_LENGTH:
                    qa_pairs.append({
                        'context': text,
                        'question': question,
                        'answer': answer,
                        'source_idx': idx
                    })
                    print(f"    Q{q_idx+1}: {question[:60]}...")

            # Clear memory every 3 texts
            if (idx + 1) % 3 == 0:
                clear_memory()

        print(f"\n{'-'*60}")
        print(f"Generated {len(qa_pairs)} QA pairs")
        print(f"{'-'*60}\n")

        return qa_pairs

    def add_manual_qa(self, qa_pairs: List[Dict]) -> List[Dict]:
        """Add manually curated QA pairs for quality"""

        manual_qa = [
            {
                "context": "EV charging levels and standards",
                "question": "What are the main differences between Level 1, Level 2, and Level 3 EV charging?",
                "answer": "Level 1 charging uses standard 120V household outlets and provides 2-5 miles of range per hour, suitable for overnight charging. Level 2 uses 240V and delivers 10-30 miles per hour, ideal for home or workplace charging. Level 3, also called DC Fast Charging, provides 60-100 miles in just 20 minutes, designed for quick public charging stops."
            },
            {
                "context": "EV charging standards",
                "question": "What is CHAdeMO and which vehicles use it?",
                "answer": "CHAdeMO is a DC fast charging standard developed in Japan. It can deliver up to 62.5 kW of power and is primarily used by Japanese electric vehicles such as the Nissan Leaf, Mitsubishi Outlander PHEV, and earlier models of the Kia Soul EV. The name comes from 'Charge de Move', which is a play on the Japanese phrase 'O cha demo ikaga desuka' meaning 'How about a cup of tea?', suggesting the time it takes to charge."
            },
            {
                "context": "EV charging standards",
                "question": "What is CCS (Combined Charging System)?",
                "answer": "CCS is a fast-charging standard that combines AC and DC charging capabilities in a single connector. There are two variants: CCS1 used in North America and CCS2 used in Europe. It can deliver up to 350 kW of power and is supported by most European and American automakers including BMW, Volkswagen, Ford, GM, and Hyundai."
            },
            {
                "context": "EV charging time",
                "question": "How long does it typically take to fully charge an electric vehicle?",
                "answer": "Charging time varies significantly based on the charger type and battery size. With Level 1 charging, expect 8-24 hours for a full charge. Level 2 charging typically takes 4-8 hours. DC Fast Charging can charge most EVs to 80% in just 20-40 minutes, though charging slows down after 80% to protect the battery. The exact time also depends on the vehicle's battery capacity and its current charge level."
            },
            {
                "context": "AC vs DC charging",
                "question": "What is the difference between AC and DC charging for electric vehicles?",
                "answer": "AC (Alternating Current) charging uses the vehicle's onboard charger to convert AC power from the grid to DC power for the battery. This conversion process limits charging speed but is more common and less expensive. DC (Direct Current) charging bypasses the onboard charger and delivers DC power directly to the battery, enabling much faster charging speeds of up to 350 kW. DC charging requires more sophisticated and expensive charging equipment."
            },
            {
                "context": "Tesla Supercharger network",
                "question": "What is Tesla's Supercharger network?",
                "answer": "Tesla Supercharger is Tesla's proprietary DC fast charging network with over 45,000 charging stations worldwide. Superchargers can deliver up to 250 kW of power, enabling Tesla vehicles to charge to 80% in approximately 20-30 minutes. The network was initially exclusive to Tesla vehicles but is now opening to other EV brands in select regions through the use of adapters."
            },
            {
                "context": "J1772 standard",
                "question": "What is the J1772 charging connector?",
                "answer": "J1772, officially known as SAE J1772, is the North American standard for AC charging. It supports both Level 1 and Level 2 charging with power delivery up to 19.2 kW. Almost all electric vehicles sold in North America, except Tesla (which uses an adapter), use this connector for AC charging. The connector has five pins: two for AC power, one for ground, and two for communication between the vehicle and charger."
            },
            {
                "context": "EV charging costs",
                "question": "How much does it cost to charge an electric vehicle?",
                "answer": "Home charging typically costs between $0.10-0.20 per kWh depending on local electricity rates, resulting in $8-15 for a full charge for most EVs. Public Level 2 charging ranges from free to $0.50 per kWh. DC fast charging is usually more expensive at $0.25-0.50 per kWh, and some networks charge by the minute instead of by energy delivered. Overall, charging an EV costs significantly less than filling a gas tank for equivalent range."
            },
            {
                "context": "Range anxiety",
                "question": "What is range anxiety and how is it being addressed?",
                "answer": "Range anxiety is the fear that an electric vehicle will run out of battery power before reaching a charging station. This concern is diminishing as modern EVs achieve ranges exceeding 300 miles on a single charge, charging infrastructure expands rapidly, and mobile apps help drivers locate nearby charging stations and plan routes. Features like real-time range estimation and charging station availability have further reduced range anxiety."
            },
            {
                "context": "Home EV charger installation",
                "question": "Can I install a home EV charger and what does it involve?",
                "answer": "Yes, you can install a Level 2 home charger, which requires a 240V outlet similar to what's used for electric dryers. The total cost typically ranges from $500-2000 including equipment and professional installation. Many utility companies offer rebates or incentives for home charger installation. It's essential to have a licensed electrician perform the installation to ensure safety and compliance with local electrical codes. The installation usually takes 3-4 hours."
            },
        ]

        qa_pairs.extend(manual_qa)
        print(f"Added {len(manual_qa)} manually curated QA pairs")

        return qa_pairs

print("QAGenerator class defined successfully!")

QAGenerator class defined successfully!


#Execute QA Generation

In [None]:
print("-"*70)
print("Starting QA Generation")
print("-"*70)

# Check if we have processed data
if 'processed_df' not in dir() or processed_df is None or processed_df.empty:
    # Try to load from file
    csv_path = os.path.join(config.DATA_DIR, "processed_chunks.csv")
    if os.path.exists(csv_path):
        processed_df = pd.read_csv(csv_path)
        print(f"Loaded processed data: {len(processed_df)} chunks")
    else:
        print("ERROR: No processed data found!")
        print("Please run the data collection pipeline first (Cell 7)")
        raise ValueError("No processed data available")

# Get texts for QA generation
# For faster testing, you can limit the number of texts
# For production, use all texts
USE_SAMPLE = True  # Set to False to use all texts
SAMPLE_SIZE = 20   # Number of texts to use for testing

if USE_SAMPLE:
    print(f"\nUsing sample of {SAMPLE_SIZE} texts for faster generation")
    print("Set USE_SAMPLE = False to use all texts")
    texts = processed_df['text'].tolist()[:SAMPLE_SIZE]
else:
    texts = processed_df['text'].tolist()
    print(f"\nUsing all {len(texts)} texts")

# Initialize QA generator
qa_gen = QAGenerator(model_name=config.QA_GENERATOR_MODEL)

# Generate QA pairs
print(f"\nGenerating {config.QUESTIONS_PER_TEXT} questions per text...")
qa_pairs = qa_gen.create_qa_dataset(texts, questions_per_text=config.QUESTIONS_PER_TEXT)

# Add manual QA pairs
qa_pairs = qa_gen.add_manual_qa(qa_pairs)

# Save QA dataset
qa_path = os.path.join(config.DATA_DIR, "qa_dataset.json")
with open(qa_path, 'w', encoding='utf-8') as f:
    json.dump(qa_pairs, f, ensure_ascii=False, indent=2)

print(f"\n{'-'*70}")
print("QA Generation Summary:")
print("-"*70)
print(f"Total QA pairs: {len(qa_pairs)}")
print(f"Saved to: {qa_path}")

# Show statistics
avg_question_len = np.mean([len(qa['question']) for qa in qa_pairs])
avg_answer_len = np.mean([len(qa['answer']) for qa in qa_pairs])

print(f"\nStatistics:")
print(f"  Average question length: {avg_question_len:.0f} characters")
print(f"  Average answer length: {avg_answer_len:.0f} characters")

# Show samples
print(f"\n{'-'*70}")
print("Sample QA Pairs:")
print("-"*70)

for i, qa in enumerate(qa_pairs[:5], 1):
    print(f"\nQA Pair {i}:")
    print(f"Q: {qa['question']}")
    print(f"A: {qa['answer'][:200]}..." if len(qa['answer']) > 200 else f"A: {qa['answer']}")
    print("-"*50)

# Clear memory
del qa_gen
clear_memory()

print("\nQA generation complete!")

----------------------------------------------------------------------
Starting QA Generation
----------------------------------------------------------------------

Using sample of 20 texts for faster generation
Set USE_SAMPLE = False to use all texts

Generating 5 questions per text...
Loading QA model: Qwen/Qwen2.5-3B-Instruct
Device: cuda


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/661 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/3.97G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/242 [00:00<?, ?B/s]

Model loaded successfully!

------------------------------------------------------------
Generating QA from 20 texts
------------------------------------------------------------

Processing text 1/20...
  Generated 5 questions
    Q1: eroad vehicles of the time, which were powered by internal c...
    Q2: eroad locomotivesof the time. In the early 20thcentury, the ...
    Q3: eroad vehicles of the time, which were powered by internal c...
    Q4: eroad locomotivesof the time. In 1881, the French engineer G...
    Q5: eroad vehicles of the time, which were powered by internal c...
Processing text 2/20...
  Generated 5 questions
    Q1: andue to its compatibility with a large number of vehicles a...
    Q2: andue to its widespread adoption and compatibility with a la...
    Q3: andue to its compatibility with many existing electric vehic...
    Q4: andue to its widespread adoption and compatibility with exis...
    Q5: andue to its widespread adoption and compatibility with exis...
Proce

#Model Fine-tuning

In [None]:
class ModelFineTuner:
    """Fine-tune language model using QLoRA (Quantized LoRA)"""

    def __init__(self,
                 base_model: str = None,
                 output_dir: str = None):
        self.base_model = base_model or config.BASE_MODEL
        self.output_dir = output_dir or config.MODEL_DIR
        self.model = None
        self.tokenizer = None
        self.logger = logging.getLogger(__name__)

        os.makedirs(self.output_dir, exist_ok=True)

    def load_model(self):
        """Load model with 4-bit quantization"""

        print(f"\n{'='*60}")
        print(f"Loading model: {self.base_model}")
        print(f"{'='*60}\n")

        # Configure 4-bit quantization
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )

        # Load model
        self.model = AutoModelForCausalLM.from_pretrained(
            self.base_model,
            quantization_config=bnb_config,
            device_map="auto",
            trust_remote_code=True
        )

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_model)

        # Configure tokenizer
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.tokenizer.padding_side = "right"

        print("Model loaded successfully!")
        print(f"Model size: {self.model.num_parameters():,} parameters")

    def prepare_lora(self):
        """Prepare LoRA for training"""

        print("\nPreparing LoRA configuration...")

        # Enable gradient checkpointing for memory efficiency
        self.model.gradient_checkpointing_enable()
        self.model = prepare_model_for_kbit_training(self.model)

        # LoRA configuration
        lora_config = LoraConfig(
            r=config.LORA_R,
            lora_alpha=config.LORA_ALPHA,
            target_modules=[
                "q_proj", "k_proj", "v_proj", "o_proj",
                "gate_proj", "up_proj", "down_proj"
            ],
            lora_dropout=config.LORA_DROPOUT,
            bias="none",
            task_type="CAUSAL_LM"
        )

        # Apply LoRA
        self.model = get_peft_model(self.model, lora_config)

        # Count parameters
        trainable, total = 0, 0
        for _, param in self.model.named_parameters():
            total += param.numel()
            if param.requires_grad:
                trainable += param.numel()

        print("LoRA configuration applied!")
        print(f"Total parameters: {total:,}")
        print(f"Trainable parameters: {trainable:,} ({100*trainable/total:.2f}%)")

    def format_prompt(self, question: str, answer: str, context: str = "") -> str:
        """Format data for training using instruction format"""

        if context:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer questions accurately based on the given context.</s>
<|user|>
Context: {context[:600]}

Question: {question}</s>
<|assistant|>
{answer}</s>"""
        else:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer questions accurately and helpfully.</s>
<|user|>
{question}</s>
<|assistant|>
{answer}</s>"""

        return prompt

    def prepare_dataset(self, qa_pairs: List[Dict]) -> Dataset:
        """Prepare dataset for training"""

        print(f"\nPreparing dataset from {len(qa_pairs)} QA pairs...")

        formatted_data = []

        for qa in qa_pairs:
            prompt = self.format_prompt(
                question=qa['question'],
                answer=qa['answer'],
                context=qa.get('context', '')[:400]
            )
            formatted_data.append({"text": prompt})

        dataset = Dataset.from_list(formatted_data)

        # Tokenization function
        def tokenize(examples):
            return self.tokenizer(
                examples["text"],
                truncation=True,
                padding="max_length",
                max_length=config.MAX_SEQ_LENGTH,
                return_tensors=None
            )

        tokenized_dataset = dataset.map(
            tokenize,
            batched=True,
            remove_columns=["text"],
            desc="Tokenizing"
        )

        # Add labels
        def add_labels(examples):
            examples["labels"] = examples["input_ids"].copy()
            return examples

        tokenized_dataset = tokenized_dataset.map(add_labels, batched=True)

        print(f"Dataset prepared: {len(tokenized_dataset)} samples")

        return tokenized_dataset

    def train(self,
              train_dataset: Dataset,
              num_epochs: int = None,
              batch_size: int = None,
              learning_rate: float = None):
        """Train the model"""

        num_epochs = num_epochs or config.NUM_EPOCHS
        batch_size = batch_size or config.BATCH_SIZE
        learning_rate = learning_rate or config.LEARNING_RATE

        print(f"\n{'='*60}")
        print("Starting Training")
        print(f"{'='*60}")
        print(f"Samples: {len(train_dataset)}")
        print(f"Epochs: {num_epochs}")
        print(f"Batch size: {batch_size}")
        print(f"Learning rate: {learning_rate}")
        print(f"{'='*60}\n")

        # Training arguments
        training_args = TrainingArguments(
            output_dir=self.output_dir,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=batch_size,
            gradient_accumulation_steps=config.GRADIENT_ACCUMULATION_STEPS,
            learning_rate=learning_rate,
            weight_decay=0.01,
            warmup_ratio=0.1,
            logging_steps=5,
            save_steps=50,
            save_total_limit=2,
            fp16=True,
            optim="paged_adamw_8bit",
            lr_scheduler_type="cosine",
            report_to="none",
            remove_unused_columns=False,
        )

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )

        # Trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            data_collator=data_collator,
        )

        # Train
        print("Training started...")
        trainer.train()

        # Save final model
        final_path = os.path.join(self.output_dir, "final_model")
        self.model.save_pretrained(final_path)
        self.tokenizer.save_pretrained(final_path)

        print(f"\nTraining complete!")
        print(f"Model saved to: {final_path}")

        return trainer

    def test_model(self, questions: List[str], max_new_tokens: int = 150):
        """Test the fine-tuned model"""

        print(f"\n{'='*60}")
        print("Testing Fine-tuned Model")
        print(f"{'='*60}\n")

        self.model.eval()

        for question in questions:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations.</s>
<|user|>
{question}</s>
<|assistant|>
"""

            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    repetition_penalty=1.2,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Extract assistant's response
            if "<|assistant|>" in response:
                response = response.split("<|assistant|>", 1)[-1].strip()

            # Clean up
            response = response.replace("<|eot_id|>", "").strip()

            print(f"Question: {question}")
            print(f"Answer: {response}")
            print("-"*50)

print("ModelFineTuner class defined successfully!")

ModelFineTuner class defined successfully!


#Run Fine-Tuning Pipeline

In [None]:
print("="*70)
print("Starting Model Fine-Tuning Pipeline")
print("="*70)

# Check GPU availability
if not torch.cuda.is_available():
    print("\nWARNING: No GPU detected!")
    print("Training will be very slow on CPU.")
    print("Please enable GPU in Runtime > Change runtime type > GPU")
    response = input("Continue anyway? (yes/no): ")
    if response.lower() != 'yes':
        raise RuntimeError("GPU required for training")
else:
    print(f"\nGPU available: {torch.cuda.get_device_name(0)}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

# Load QA dataset
qa_path = os.path.join(config.DATA_DIR, "qa_dataset.json")
if not os.path.exists(qa_path):
    print(f"\nERROR: QA dataset not found at {qa_path}")
    print("Please run QA generation first (Cell 9)")
    raise FileNotFoundError("QA dataset not found")

with open(qa_path, 'r', encoding='utf-8') as f:
    qa_pairs = json.load(f)

print(f"\nLoaded {len(qa_pairs)} QA pairs")

# Initialize fine-tuner
finetuner = ModelFineTuner(
    base_model=config.BASE_MODEL,
    output_dir=config.MODEL_DIR
)

# Load and prepare model
print("\n" + "="*60)
print("Step 1: Loading Base Model")
print("="*60)
finetuner.load_model()

print("\n" + "="*60)
print("Step 2: Preparing LoRA")
print("="*60)
finetuner.prepare_lora()

# Prepare dataset
print("\n" + "="*60)
print("Step 3: Preparing Training Dataset")
print("="*60)
train_dataset = finetuner.prepare_dataset(qa_pairs)

# Train model
print("\n" + "="*60)
print("Step 4: Training Model")
print("="*60)

# You can adjust these parameters
TRAIN_EPOCHS = 3
TRAIN_BATCH_SIZE = 2 
TRAIN_LR = 2e-4

trainer = finetuner.train(
    train_dataset=train_dataset,
    num_epochs=TRAIN_EPOCHS,
    batch_size=TRAIN_BATCH_SIZE,
    learning_rate=TRAIN_LR
)

print("\n" + "-"*70)
print("Fine-tuning complete!")
print("-"*70)

Starting Model Fine-Tuning Pipeline

GPU available: Tesla T4
GPU memory: 15.8 GB

Loaded 110 QA pairs

Step 1: Loading Base Model

Loading model: Qwen/Qwen2.5-3B-Instruct



Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Model loaded successfully!
Model size: 3,085,938,688 parameters

Step 2: Preparing LoRA

Preparing LoRA configuration...
LoRA configuration applied!
Total parameters: 1,728,606,208
Trainable parameters: 29,933,568 (1.73%)

Step 3: Preparing Training Dataset

Preparing dataset from 110 QA pairs...


Tokenizing:   0%|          | 0/110 [00:00<?, ? examples/s]

Map:   0%|          | 0/110 [00:00<?, ? examples/s]

Dataset prepared: 110 samples

Step 4: Training Model

Starting Training
Samples: 110
Epochs: 3
Batch size: 2
Learning rate: 0.0002

Training started...


`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
  return fn(*args, **kwargs)


Step,Training Loss
5,1.8917
10,1.4613
15,1.2491
20,1.1112
25,0.9529
30,0.9019
35,0.8017
40,0.8437



Training complete!
Model saved to: ./models/final_model

----------------------------------------------------------------------
Fine-tuning complete!
----------------------------------------------------------------------


#Test Fine-tuned Model

In [None]:
print("-"*70)
print("Testing Fine-Tuned Model")
print("-"*70)

# Define test questions
test_questions = [
    "What is the difference between Level 1 and Level 2 charging?",
    "How fast is DC fast charging?",
    "What is CHAdeMO?",
    "How much does it cost to charge an EV at home?",
    "What is the CCS charging standard?",
    "What are the main components of an EV charging station?",
    "How does temperature affect EV charging?",
    "What is the typical range of an electric vehicle?",
    "Can I use a Tesla Supercharger with a non-Tesla vehicle?",
    "What safety features do EV chargers have?"
]

print(f"\nTesting with {len(test_questions)} questions\n")

# Test the model
finetuner.test_model(test_questions, max_new_tokens=200)

print("\n" + "-"*70)
print("Testing complete!")
print("-"*70)

# Additional: Interactive testing
print("\n" + "-"*70)
print("Interactive Testing")
print("-"*70)
print("Type your questions below (type 'quit' to exit):\n")

while True:
    user_question = input("Your question: ").strip()

    if user_question.lower() in ['quit', 'exit', 'q']:
        print("Exiting interactive mode.")
        break

    if not user_question:
        continue

    finetuner.test_model([user_question], max_new_tokens=200)
    print()

----------------------------------------------------------------------
Testing Fine-Tuned Model
----------------------------------------------------------------------

Testing with 10 questions


Testing Fine-tuned Model

Question: What is the difference between Level 1 and Level 2 charging?
Answer: Level 1 chargers use standard household outlets with a 120V power supply, typically delivering about 3.5-4.8A of current. This type of charger can take several hours to fully charge most EV batteries.

In contrast, Level 2 chargers provide higher voltages (typically 240V) and currents up to 62A or more, which allows them to deliver significantly faster charges than Level 1 chargers. A typical home installation might include a Level 2 circuit capable of supplying at least 7kW continuously from its connected device output terminals when drawing full load on that circuit under normal conditions as specified in UL 963/UL C1057 standards. These systems generally require professional installation

#Model Evaluation

In [None]:
import nltk
try:
    nltk.download('punkt', quiet=True)
except:
    pass

class ModelEvaluator:
    """Evaluate model performance"""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.device = model.device
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'],
                                                      use_stemmer=True)

    def generate_answer(self, question: str, context: str = "") -> str:
        """Generate answer for evaluation"""

        if context:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer based on the context.</s>
<|user|>
Context: {context[:600]}

Question: {question}</s>
<|assistant|>
"""
        else:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations.</s>
<|user|>
{question}</s>
<|assistant|>
"""

        inputs = self.tokenizer(prompt, return_tensors="pt",
                               max_length=512, truncation=True).to(self.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                pad_token_id=self.tokenizer.pad_token_id
            )

        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        if "<|assistant|>" in response:
            response = response.split("<|assistant|>", 1)[-1].strip()

        return response.replace("<|eot_id|>", "").strip()

    def calculate_rouge(self, prediction: str, reference: str) -> Dict:
        """Calculate ROUGE scores"""
        scores = self.rouge_scorer.score(reference, prediction)
        return {
            'rouge1': scores['rouge1'].fmeasure,
            'rouge2': scores['rouge2'].fmeasure,
            'rougeL': scores['rougeL'].fmeasure
        }

    def calculate_bleu(self, prediction: str, reference: str) -> float:
        """Calculate BLEU score"""
        reference_tokens = reference.split()
        prediction_tokens = prediction.split()

        smoothing = SmoothingFunction().method1
        return sentence_bleu([reference_tokens], prediction_tokens,
                           smoothing_function=smoothing)

    def evaluate_dataset(self, qa_pairs: List[Dict], sample_size: int = 50) -> Dict:
        """Evaluate on a sample of QA pairs"""

        print(f"Evaluating on {sample_size} samples...")

        # Sample if dataset is larger
        if len(qa_pairs) > sample_size:
            import random
            qa_sample = random.sample(qa_pairs, sample_size)
        else:
            qa_sample = qa_pairs

        rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}
        bleu_scores = []

        for idx, qa in enumerate(qa_sample):
            if idx % 10 == 0:
                print(f"Progress: {idx}/{len(qa_sample)}")

            # Generate prediction
            prediction = self.generate_answer(qa['question'], qa.get('context', ''))
            reference = qa['answer']

            # Calculate metrics
            rouge = self.calculate_rouge(prediction, reference)
            bleu = self.calculate_bleu(prediction, reference)

            rouge_scores['rouge1'].append(rouge['rouge1'])
            rouge_scores['rouge2'].append(rouge['rouge2'])
            rouge_scores['rougeL'].append(rouge['rougeL'])
            bleu_scores.append(bleu)

        # Calculate averages
        results = {
            'rouge1': np.mean(rouge_scores['rouge1']),
            'rouge2': np.mean(rouge_scores['rouge2']),
            'rougeL': np.mean(rouge_scores['rougeL']),
            'bleu': np.mean(bleu_scores),
            'num_samples': len(qa_sample)
        }

        return results

# Run evaluation
print("="*70)
print("Model Evaluation")
print("="*70)

# Load QA dataset
qa_path = os.path.join(config.DATA_DIR, "qa_dataset.json")
with open(qa_path, 'r', encoding='utf-8') as f:
    qa_pairs = json.load(f)

# Initialize evaluator
evaluator = ModelEvaluator(finetuner.model, finetuner.tokenizer)

# Evaluate
eval_sample_size = min(50, len(qa_pairs))
results = evaluator.evaluate_dataset(qa_pairs, sample_size=eval_sample_size)

# Display results
print("\n" + "-"*70)
print("Evaluation Results")
print("-"*70)
print(f"Samples evaluated: {results['num_samples']}")
print(f"\nROUGE Scores:")
print(f"  ROUGE-1: {results['rouge1']:.4f}")
print(f"  ROUGE-2: {results['rouge2']:.4f}")
print(f"  ROUGE-L: {results['rougeL']:.4f}")
print(f"\nBLEU Score: {results['bleu']:.4f}")
print("-"*70)

# Save results
results_path = os.path.join(config.OUTPUT_DIR, "evaluation_results.json")
with open(results_path, 'w') as f:
    json.dump(results, f, indent=2)
print(f"\nResults saved to: {results_path}")

Model Evaluation
Evaluating on 50 samples...
Progress: 0/50
Progress: 10/50
Progress: 20/50
Progress: 30/50
Progress: 40/50

----------------------------------------------------------------------
Evaluation Results
----------------------------------------------------------------------
Samples evaluated: 50

ROUGE Scores:
  ROUGE-1: 0.5489
  ROUGE-2: 0.3345
  ROUGE-L: 0.4036

BLEU Score: 0.2181
----------------------------------------------------------------------

Results saved to: ./outputs/evaluation_results.json


#Execute Model Evaluation

In [None]:
import shutil
from datetime import datetime

print("="*70)
print("Saving and Exporting Model")
print("="*70)

# Create export directory with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
export_dir = os.path.join(config.OUTPUT_DIR, f"ev_qa_model_{timestamp}")
os.makedirs(export_dir, exist_ok=True)

print(f"\nExport directory: {export_dir}")

# 1. Copy the fine-tuned model
print("\n1. Copying fine-tuned model...")
model_source = os.path.join(config.MODEL_DIR, "final_model")
model_dest = os.path.join(export_dir, "model")

if os.path.exists(model_source):
    shutil.copytree(model_source, model_dest)
    print(f"   Model copied to: {model_dest}")
else:
    print("   WARNING: Fine-tuned model not found!")

# 2. Copy QA dataset
print("\n2. Copying QA dataset...")
qa_source = os.path.join(config.DATA_DIR, "qa_dataset.json")
qa_dest = os.path.join(export_dir, "qa_dataset.json")

if os.path.exists(qa_source):
    shutil.copy(qa_source, qa_dest)
    print(f"   QA dataset copied to: {qa_dest}")
else:
    print("   WARNING: QA dataset not found!")

# 3. Copy evaluation results if available
print("\n3. Copying evaluation results...")
eval_source = os.path.join(config.OUTPUT_DIR, "evaluation_results.json")
eval_dest = os.path.join(export_dir, "evaluation_results.json")

if os.path.exists(eval_source):
    shutil.copy(eval_source, eval_dest)
    print(f"   Evaluation results copied to: {eval_dest}")
else:
    print("   No evaluation results found (optional)")

# 4. Create README
print("\n4. Creating README...")
readme_content = f"""# EV Charging Station QA Model

## Model Information
- Base Model: {config.BASE_MODEL}
- Fine-tuned on: {timestamp}
- Training Method: QLoRA (Quantized Low-Rank Adaptation)
- Domain: Electric Vehicle Charging Stations

## Model Configuration
- LoRA Rank: {config.LORA_R}
- LoRA Alpha: {config.LORA_ALPHA}
- Learning Rate: {config.LEARNING_RATE}
- Training Epochs: {config.NUM_EPOCHS}
- Batch Size: {config.BATCH_SIZE}

## Dataset
- Total QA pairs: {len(qa_pairs) if 'qa_pairs' in dir() else 'N/A'}
- Questions per text: {config.QUESTIONS_PER_TEXT}
- Chunk size: {config.CHUNK_SIZE} words

## Usage

### Loading the Model
```python
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel

# Load base model
base_model = AutoModelForCausalLM.from_pretrained("{config.BASE_MODEL}")
tokenizer = AutoTokenizer.from_pretrained("{config.BASE_MODEL}")

# Load LoRA weights
model = PeftModel.from_pretrained(base_model, "./model")
```

### Generating Answers
```python
question = "What is the difference between Level 1 and Level 2 charging?"

prompt = f'''<|system|>
You are an expert assistant for electric vehicle charging stations.</s>
<|user|>
{{question}}</s>
<|assistant|>
'''

inputs = tokenizer(prompt, return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=150)
answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(answer)
```

## Files
- `model/` - Fine-tuned model weights (LoRA adapters)
- `qa_dataset.json` - Training QA pairs
- `evaluation_results.json` - Model performance metrics
- `README.md` - This file

## Performance
See `evaluation_results.json` for detailed metrics.

## License
This model is based on {config.BASE_MODEL}. Please refer to the original model's license.

## Contact
For questions or issues, please contact the model creator.
"""

readme_path = os.path.join(export_dir, "README.md")
with open(readme_path, 'w', encoding='utf-8') as f:
    f.write(readme_content)
print(f"   README created: {readme_path}")

# 5. Create inference script
print("\n5. Creating inference script...")
inference_script = '''import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel

class EVChargingQA:
    """EV Charging Station QA Assistant"""

    def __init__(self, model_path="./model", base_model="''' + config.BASE_MODEL + '''"):
        print("Loading model...")
        self.tokenizer = AutoTokenizer.from_pretrained(base_model)
        self.base_model = AutoModelForCausalLM.from_pretrained(
            base_model,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        self.model = PeftModel.from_pretrained(self.base_model, model_path)
        self.model.eval()
        print("Model loaded successfully!")

    def answer(self, question, context="", max_tokens=150):
        """Generate answer for a question"""

        if context:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer based on context.</s>
<|user|>
Context: {context}

Question: {question}</s>
<|assistant|>
"""
        else:
            prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations.</s>
<|user|>
{question}</s>
<|assistant|>
"""

        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                pad_token_id=self.tokenizer.pad_token_id
            )

        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        if "<|assistant|>" in response:
            response = response.split("<|assistant|>", 1)[-1].strip()

        return response.replace("<|eot_id|>", "").strip()

if __name__ == "__main__":
    # Initialize model
    qa_bot = EVChargingQA()

    # Example questions
    questions = [
        "What is Level 2 charging?",
        "How long does DC fast charging take?",
        "What is CHAdeMO?"
    ]

    print("\\nTesting model:\\n")
    for q in questions:
        print(f"Q: {q}")
        answer = qa_bot.answer(q)
        print(f"A: {answer}\\n")
'''

inference_path = os.path.join(export_dir, "inference.py")
with open(inference_path, 'w', encoding='utf-8') as f:
    f.write(inference_script)
print(f"   Inference script created: {inference_path}")

# 6. Create requirements.txt
print("\n6. Creating requirements.txt...")
requirements = """transformers>=4.35.0
torch>=2.0.0
peft>=0.6.0
accelerate>=0.24.0
bitsandbytes>=0.41.0
sentencepiece>=0.1.99
"""

requirements_path = os.path.join(export_dir, "requirements.txt")
with open(requirements_path, 'w') as f:
    f.write(requirements)
print(f"   Requirements file created: {requirements_path}")

# 7. Create archive
print("\n7. Creating archive...")
archive_name = f"ev_qa_model_{timestamp}"
archive_path = shutil.make_archive(
    os.path.join(config.OUTPUT_DIR, archive_name),
    'zip',
    export_dir
)
print(f"   Archive created: {archive_path}")

# Summary
print("\n" + "="*70)
print("Export Summary")
print("="*70)
print(f"Export directory: {export_dir}")
print(f"Archive file: {archive_path}")
print(f"\nContents:")
print(f"  - model/           Fine-tuned model (LoRA adapters)")
print(f"  - qa_dataset.json  Training data")
print(f"  - inference.py     Ready-to-use inference script")
print(f"  - README.md        Documentation")
print(f"  - requirements.txt Dependencies")

print("\n" + "-"*70)
print("Export complete!")
print("-"*70)
print(f"\nTo use the model:")
print(f"1. Download the archive: {archive_path}")
print(f"2. Extract it")
print(f"3. Install requirements: pip install -r requirements.txt")
print(f"4. Run inference: python inference.py")
print("-"*70)

Saving and Exporting Model

Export directory: ./outputs/ev_qa_model_20260117_184257

1. Copying fine-tuned model...
   Model copied to: ./outputs/ev_qa_model_20260117_184257/model

2. Copying QA dataset...
   QA dataset copied to: ./outputs/ev_qa_model_20260117_184257/qa_dataset.json

3. Copying evaluation results...
   Evaluation results copied to: ./outputs/ev_qa_model_20260117_184257/evaluation_results.json

4. Creating README...
   README created: ./outputs/ev_qa_model_20260117_184257/README.md

5. Creating inference script...
   Inference script created: ./outputs/ev_qa_model_20260117_184257/inference.py

6. Creating requirements.txt...
   Requirements file created: ./outputs/ev_qa_model_20260117_184257/requirements.txt

7. Creating archive...
   Archive created: /content/outputs/ev_qa_model_20260117_184257.zip

Export Summary
Export directory: ./outputs/ev_qa_model_20260117_184257
Archive file: /content/outputs/ev_qa_model_20260117_184257.zip

Contents:
  - model/           Fine-

#API File Creation

In [None]:
%%writefile api.py

from fastapi import FastAPI, HTTPException, Depends, Security, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, Dict
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import time
import logging
from datetime import datetime
import os
from collections import deque

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
class ServerConfig:
    MODEL_PATH = "./models/final_model"

    BASE_MODEL = "Qwen/Qwen2.5-3B-Instruct"

    API_KEY = "my-secret-token-123"  # Tmp api key
    MAX_LENGTH = 512
    TEMPERATURE = 0.7
    TOP_P = 0.9

config = ServerConfig()

# Security
security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials
    if token != config.API_KEY:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )
    return token

# Monitoring
class RequestMonitor:
    def __init__(self, max_size=1000):
        self.requests = deque(maxlen=max_size)
        self.total_requests = 0
        self.total_errors = 0

    def log_request(self, endpoint: str, latency: float, success: bool, error: str = None):
        self.total_requests += 1
        if not success:
            self.total_errors += 1

        self.requests.append({
            'timestamp': datetime.now().isoformat(),
            'endpoint': endpoint,
            'latency': latency,
            'success': success,
            'error': error
        })

    def get_stats(self) -> Dict:
        recent_requests = list(self.requests)
        if not recent_requests:
            return {'total_requests': self.total_requests, 'total_errors': self.total_errors}

        latencies = [r['latency'] for r in recent_requests]
        success_rate = sum(1 for r in recent_requests if r['success']) / len(recent_requests)

        return {
            'total_requests': self.total_requests,
            'total_errors': self.total_errors,
            'success_rate': success_rate,
            'avg_latency': sum(latencies) / len(latencies)
        }

monitor = RequestMonitor()

# Model Manager
class ModelManager:
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def load_model(self):
        try:
            logger.info(f"Loading base model: {config.BASE_MODEL}...")
            self.tokenizer = AutoTokenizer.from_pretrained(config.BASE_MODEL)

            base_model = AutoModelForCausalLM.from_pretrained(
                config.BASE_MODEL,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                device_map="auto"
            )

            logger.info(f"Loading LoRA weights from: {config.MODEL_PATH}...")
            self.model = PeftModel.from_pretrained(base_model, config.MODEL_PATH)
            self.model.eval()

            logger.info("Model loaded successfully!")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return False

    def generate_response(self, question: str, max_length: int = None, temperature: float = None):
        if self.model is None:
            raise HTTPException(status_code=503, detail="Model not loaded")

        prompt = f"""<|system|>
You are an expert assistant for electric vehicle charging stations. Answer strictly based on your fine-tuned knowledge.</s>
<|user|>
{question}</s>
<|assistant|>
"""

        start_time = time.time()
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_length or config.MAX_LENGTH,
                    temperature=temperature or config.TEMPERATURE,
                    top_p=config.TOP_P,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )

            full_response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            if "<|assistant|>" in full_response:
                answer = full_response.split("<|assistant|>")[-1].strip()
            else:
                answer = full_response.replace(prompt, "").strip()

            inference_time = time.time() - start_time
            return {'answer': answer, 'inference_time': inference_time}

        except Exception as e:
            logger.error(f"Inference error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

model_manager = ModelManager()

# FastAPI App
app = FastAPI(title="EV Charging QA API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class QuestionRequest(BaseModel):
    question: str
    max_length: Optional[int] = 200
    temperature: Optional[float] = 0.7

@app.on_event("startup")
async def startup_event():
    model_manager.load_model()

@app.get("/")
def root():
    return {"message": "EV Charging QA Model Online", "docs": "/docs"}

@app.post("/ask")
async def ask(request: QuestionRequest, token: str = Depends(verify_token)):
    start_time = time.time()
    try:
        result = model_manager.generate_response(
            request.question, request.max_length, request.temperature
        )
        monitor.log_request("/ask", time.time() - start_time, True)
        return result
    except Exception as e:
        monitor.log_request("/ask", time.time() - start_time, False, str(e))
        raise

@app.get("/stats")
async def stats(token: str = Depends(verify_token)):
    return monitor.get_stats()

#Server Launch with Ngrok

In [None]:
import os
import threading
import uvicorn
from pyngrok import ngrok

NGROK_TOKEN = "NGROK_AUTHTOKEN"  
os.environ["NGROK_AUTHTOKEN"] = NGROK_TOKEN

try:
    public_url = ngrok.connect(8000).public_url
    print(f"\n================================================================")
    print(f"🚀 API IS LIVE AT: {public_url}")
    print(f"📄 Swagger Docs:   {public_url}/docs")
    print(f"🔑 API Key:        my-secret-token-123")
    print(f"================================================================\n")
except Exception as e:
    print(f"Error starting ngrok: {e}")

def run_server():
    uvicorn.run("api:app", host="0.0.0.0", port=8000, log_level="error")

thread = threading.Thread(target=run_server)
thread.start()


🚀 API IS LIVE AT: https://impervious-scotty-sloughy.ngrok-free.dev
📄 Swagger Docs:   https://impervious-scotty-sloughy.ngrok-free.dev/docs
🔑 API Key:        my-secret-token-123



#API Testing

In [None]:
import requests
import time

API_URL = public_url  
API_KEY = "my-secret-token-123"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

questions = [
    "What is the difference between AC and DC charging?",
    "How fast is a CHAdeMO charger?",
    "Can I charge a Tesla at a CCS station?"
]

print(f"Testing API at: {API_URL}\n")

for q in questions:
    payload = {
        "question": q,
        "max_length": 150,
        "temperature": 0.7
    }

    try:
        start = time.time()
        response = requests.post(f"{API_URL}/ask", json=payload, headers=headers)
        duration = time.time() - start

        if response.status_code == 200:
            print(f"✅ Question: {q}")
            print(f"💡 Answer: {response.json()['answer']}")
            print(f"⏱️ Time: {duration:.2f}s")
            print("-" * 50)
        else:
            print(f"❌ Error {response.status_code}: {response.text}")

    except Exception as e:
        print(f"Connection Error: {e}")

print("\n📊 Server Stats:")
try:
    stats = requests.get(f"{API_URL}/stats", headers=headers).json()
    print(stats)
except:
    pass

Testing API at: https://impervious-scotty-sloughy.ngrok-free.dev

✅ Question: What is the difference between AC and DC charging?
💡 Answer: Certainly! AC charging typically operates at lower voltages and higher currents than DC charging. For example, Level 1 charging uses 120V and can deliver up to 1.4 kW of power
⏱️ Time: 17.53s
--------------------------------------------------
✅ Question: How fast is a CHAdeMO charger?
💡 Answer: Certainly! Here are some details on the charging speeds for other common Electric Vehicle (EV) charging
⏱️ Time: 18.01s
--------------------------------------------------
✅ Question: Can I charge a Tesla at a CCS station?
💡 Answer: For detailed information on the compatibility of Tesla models with different charging standards such as CCS (Combined Charging System), you can refer to Tesla's official website or their app. Additionally, websites like EVCompare and PlugShare provide user-generated data on charging stations and their compatibility. You can also ch