In [1]:
import os
from dotenv import load_dotenv
from google import genai

# Merge variables from a local .env file into the process environment.
load_dotenv()

# The API key is mandatory for every later cell; stop immediately when absent.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    raise ValueError("GOOGLE_API_KEY not found...")

# Shared client and model name used by the interactive loop in the next cell.
client = genai.Client(api_key=API_KEY)
MODEL_TO_USE = "gemini-1.5-flash"

In [2]:
# Minimal interactive loop: read a prompt, send it to the model, print the reply.
while True:
    try:
        # Strip surrounding whitespace so "exit " / " exit" still quit the loop.
        prompt = input("\nEnter your prompt (or type 'exit' to quit): ").strip()
    except (EOFError, KeyboardInterrupt):
        # Input stream closed or the cell was interrupted: leave the loop cleanly.
        break
    if prompt.lower() == "exit":
        break
    if not prompt:
        # Nothing to send; ask again instead of spending an API request.
        continue
    try:
        response = client.models.generate_content(
            model=MODEL_TO_USE,
            contents=prompt
        )
        print("\nResponse:\n", response.text)
    except Exception as e:
        # Keep the session alive on API errors (quota, network, ...).
        print(f"An error occurred: {e}")


Enter your prompt (or type 'exit' to quit):  hello there



Response:
 Hello there! How can I help you today?




Enter your prompt (or type 'exit' to quit):  xit



Response:
 "xit" isn't a command or a word with a standard meaning.  It might be:

* **A typo:**  Perhaps you meant something else?  Could you clarify what you're trying to do or say?
* **An abbreviation:**  In a specific context (like a game or program), it might have a particular meaning.  If so, please provide more context.
* **A made-up word:**  It might be a newly coined term or slang.

Please provide more information so I can understand what you need.




Enter your prompt (or type 'exit' to quit):  exit


In [3]:
import os
import json
import pandas as pd
import re
from dotenv import load_dotenv
from google import genai
from datetime import datetime

In [30]:
def test_single_patent(jsonl_file_path: str, patent_index: int = 0):
    """
    Test function to process a single patent and display the results
    
    Args:
        jsonl_file_path: Path to your JSONL file
        patent_index: Index of the patent to test (0-based)
    """
    
    # Setup API
    load_dotenv()
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY not found in environment variables")
    
    client = genai.Client(api_key=api_key)
    model = "gemini-1.5-flash"
    
    # Load single patent
    print(f"Loading patent at index {patent_index}...")
    with open(jsonl_file_path, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i == patent_index:
                patent_data = json.loads(line.strip())
                break
        else:
            print(f"Patent index {patent_index} not found in file")
            return
    
    print(f"Loaded patent: {patent_data.get('lens_id', '000-152-677-120-075')}")
    print(f"Title: {patent_data.get('invention_title_text', 'No title')[:100]}...")
    
    # Preprocess text function
    def preprocess_text(text):
        if pd.isna(text) or text == '':
            return ''
        text = str(text)
        text = re.sub(r'\b(fig\.|figure)\s*\d+\b', 'figure', text, flags=re.IGNORECASE)
        text = re.sub(r'\bclaim\s*\d+\b', 'claim', text, flags=re.IGNORECASE)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
    
    # Prepare patent text
    def prepare_patent_text(patent_data, max_tokens=3000):
        title = preprocess_text(patent_data.get('invention_title_text', ''))
        abstract = preprocess_text(patent_data.get('abstract_text', ''))
        
        claims = patent_data.get('claims', [])
        if isinstance(claims, list):
            claims_text = ' '.join([str(claim) for claim in claims])
        else:
            claims_text = str(claims)
        claims_text = preprocess_text(claims_text)
        
        # Estimate tokens (rough: 1 token ≈ 4 characters)
        def estimate_tokens(text):
            return len(text) // 4
        
        combined_parts = []
        total_tokens = 0
        
        if title:
            combined_parts.append(f"TITLE: {title}")
            total_tokens += estimate_tokens(title)
        
        if abstract and (total_tokens + estimate_tokens(abstract)) <= max_tokens:
            combined_parts.append(f"ABSTRACT: {abstract}")
            total_tokens += estimate_tokens(abstract)
        
        remaining_tokens = max_tokens - total_tokens
        if claims_text and remaining_tokens > 100:
            if estimate_tokens(claims_text) <= remaining_tokens:
                combined_parts.append(f"CLAIMS: {claims_text}")
                total_tokens += estimate_tokens(claims_text)
            else:
                max_chars = remaining_tokens * 4
                truncated_claims = claims_text[:max_chars]
                combined_parts.append(f"CLAIMS: {truncated_claims}")
                total_tokens += estimate_tokens(truncated_claims)
        
        return "\n\n".join(combined_parts), total_tokens
    
    # Prepare the patent text
    patent_text, token_count = prepare_patent_text(patent_data)
    
    print(f"\nPrepared text (≈{token_count} tokens):")
    print("-" * 50)
    print(patent_text[:500] + "..." if len(patent_text) > 500 else patent_text)
    print("-" * 50)
    
    # Create analysis prompt
    prompt = f"""
You are an expert patent analyst specializing in autonomous vehicle technologies. Analyze the following patent and provide a structured analysis.

PATENT CONTENT:
{patent_text}

Please provide your analysis in the following EXACT JSON format (ensure valid JSON syntax):

{{
    "core_innovation": {{
        "problem_addressed": "Brief description of the main problem this patent addresses",
        "proposed_solution": "Brief description of the key solution or innovation",
        "novelty_aspect": "What makes this innovation novel or unique",
        "technical_approach": "Brief description of the technical approach used"
    }},
    "conceptual_categories": {{
        "primary_category": "The main category this patent falls into",
        "secondary_categories": ["List of additional relevant categories"],
        "confidence_score": "High/Medium/Low confidence in categorization"
    }},
    "av_technology_areas": [
        "List of relevant AV technology areas from: perception_sensing, localization_mapping, path_planning_control, ai_ml_architecture, v2x_communication, safety_validation, simulation_testing, cybersecurity, human_machine_interface, hardware_sensors, software_algorithms, data_processing, vehicle_control_systems, other"
    ]
}}

IMPORTANT: 
- Respond ONLY with valid JSON - no additional text or explanations
- Use the exact field names shown above
- Keep descriptions concise but informative (max 2-3 sentences each)
- For av_technology_areas, select from the provided list only
- If uncertain about a field, use "Not clearly specified" rather than leaving empty
"""
    
    # Call API
    print("\nCalling Gemini API...")
    try:
        response = client.models.generate_content(
            model=model,
            contents=prompt
        )
        
        # Parse response
        response_text = response.text.strip()
        print(f"\nRaw API Response:")
        print("-" * 50)
        print(response_text)
        print("-" * 50)
        
        # Try to extract JSON if wrapped in markdown
        if "```json" in response_text:
            start = response_text.find("```json") + 7
            end = response_text.find("```", start)
            if end > start:
                response_text = response_text[start:end].strip()
        elif "```" in response_text:
            start = response_text.find("```") + 3
            end = response_text.find("```", start)
            if end > start:
                response_text = response_text[start:end].strip()
        
        # Parse JSON
        analysis_result = json.loads(response_text)
        
        print(f"\nParsed JSON Result:")
        print("-" * 50)
        print(json.dumps(analysis_result, indent=2))
        print("-" * 50)
        
        # Show structured output
        print(f"\nStructured Analysis for Patent: {patent_data.get('lens_id', '000-152-677-120-075')}")
        print("=" * 60)
        
        core_innovation = analysis_result.get('core_innovation', {})
        print(f"PROBLEM ADDRESSED: {core_innovation.get('problem_addressed', 'N/A')}")
        print(f"PROPOSED SOLUTION: {core_innovation.get('proposed_solution', 'N/A')}")
        print(f"NOVELTY ASPECT: {core_innovation.get('novelty_aspect', 'N/A')}")
        print(f"TECHNICAL APPROACH: {core_innovation.get('technical_approach', 'N/A')}")
        
        categories = analysis_result.get('conceptual_categories', {})
        print(f"\nPRIMARY CATEGORY: {categories.get('primary_category', 'N/A')}")
        print(f"SECONDARY CATEGORIES: {categories.get('secondary_categories', [])}")
        print(f"CONFIDENCE: {categories.get('confidence_score', 'N/A')}")
        
        print(f"\nAV TECHNOLOGY AREAS: {analysis_result.get('av_technology_areas', [])}")
        
        # Create final result structure
        final_result = {
            # Original patent metadata
            'lens_id': patent_data.get('lens_id', ''),
            'invention_title_text': patent_data.get('invention_title_text', ''),
            'abstract_text': patent_data.get('abstract_text', ''),
            'applicant_name': patent_data.get('applicant_name', ''),
            'date_published': patent_data.get('date_published', ''),
            'earliest_claim_date': patent_data.get('earliest_claim_date', ''),
            'cpc_symbols': patent_data.get('cpc_symbols', []),
            
            # Analysis results
            'problem_addressed': core_innovation.get('problem_addressed', ''),
            'proposed_solution': core_innovation.get('proposed_solution', ''),
            'novelty_aspect': core_innovation.get('novelty_aspect', ''),
            'technical_approach': core_innovation.get('technical_approach', ''),
            'primary_category': categories.get('primary_category', ''),
            'secondary_categories': categories.get('secondary_categories', []),
            'categorization_confidence': categories.get('confidence_score', ''),
            'av_technology_areas': analysis_result.get('av_technology_areas', []),
            
            # Processing metadata
            'processing_timestamp': datetime.now().isoformat(),
            'estimated_tokens': token_count
        }
        
        print(f"\nFinal DataFrame Row Structure:")
        print("=" * 60)
        for key, value in final_result.items():
            if isinstance(value, str) and len(value) > 100:
                print(f"{key}: {value[:100]}...")
            else:
                print(f"{key}: {value}")
        
        return final_result, analysis_result
        
    except json.JSONDecodeError as e:
        print(f"JSON parsing error: {e}")
        print(f"Raw response: {response_text}")
        return None, None
        
    except Exception as e:
        print(f"Error: {e}")
        return None, None

In [31]:
# Smoke test: run the single-patent flow on the first record (index 0)
if __name__ == "__main__":
    result, raw_analysis = test_single_patent("av_patentdata.jsonl", patent_index=0)

    # A falsy result means one of the failure paths was taken.
    if not result:
        print("\n❌ Test failed. Check the error messages above.")
    else:
        print("\n✅ Test completed successfully!")
        print("You can now run the full pipeline with confidence.")

2025-06-16 20:58:25,302 - INFO - AFC is enabled with max remote calls: 10.


Loading patent at index 0...
Loaded patent: 143-105-704-034-927
Title: Group driving style learning framework for autonomous vehicles...

Prepared text (≈3000 tokens):
--------------------------------------------------
TITLE: Group driving style learning framework for autonomous vehicles

ABSTRACT: A social driving style learning framework or system for autonomous vehicles is utilized, which can dynamically learn the social driving styles from surrounding vehicles and adopt the driving style as needed. Each of the autonomous vehicles within a particular driving area is equipped with the driving style learning system to perceive the driving behaviors of the surrounding vehicles to derive a set of driving style ...
--------------------------------------------------

Calling Gemini API...


2025-06-16 20:58:25,765 - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent "HTTP/1.1 429 Too Many Requests"


Error: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits.', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.QuotaFailure', 'violations': [{'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_requests', 'quotaId': 'GenerateRequestsPerDayPerProjectPerModel-FreeTier', 'quotaDimensions': {'location': 'global', 'model': 'gemini-1.5-flash'}, 'quotaValue': '500'}]}, {'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}, {'@type': 'type.googleapis.com/google.rpc.RetryInfo', 'retryDelay': '33s'}]}}

❌ Test failed. Check the error messages above.


In [4]:
import os
import sys
import json
import pandas as pd
import re
import csv
import time
from typing import Dict, List, Optional, Any
from dotenv import load_dotenv
from google import genai
import logging
from datetime import datetime

In [5]:
class PatentAnalysisPipeline:
    """
    Pipeline for analyzing patent data using Gemini API to extract core innovations
    and categorize patents into conceptual categories.
    """
    
    def __init__(self, jsonl_file_path: str, output_dir: str = "output"):
        """
        Initialize the pipeline
        
        Args:
            jsonl_file_path: Path to the JSONL file containing patent data
            output_dir: Directory to save output files
        """
        self.jsonl_file_path = jsonl_file_path
        self.output_dir = output_dir
        self.data = None
        self.processed_data = []
        self.failed_patents = []
        
        # Load environment and setup API
        self.setup_api()
        
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)
        
        # Token limits and processing parameters
        self.max_tokens_per_patent = 3000  # Conservative limit for Gemini
        self.base_delay = 2  # Base delay between requests (seconds)
        self.max_delay = 300  # Maximum delay cap (5 minutes)
    
    def setup_api(self):
        """Setup Gemini API client"""
        load_dotenv()
        self.api_key = os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError("GOOGLE_API_KEY not found in environment variables")
        
        self.client = genai.Client(api_key=self.api_key)
        self.model = "gemini-1.5-flash"
    
    def load_data(self) -> pd.DataFrame:
        """Load and parse JSONL file, keeping only records missing from the JSON checkpoint"""
    
        # Define the file paths
        json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output\processed_patents_checkpoint_650_20250616_193652.json"
        csv_path = "lens-export.csv"
    
        # Step 1: Load existing JSON and collect lens_ids
        try:
            with open(json_path, 'r', encoding='utf-8') as f_json:
                json_data = json.load(f_json)
                json_lens_ids = {patent.get("lens_id") for patent in json_data if "lens_id" in patent}
        except Exception as e:
            raise Exception(f"Error reading JSON checkpoint: {e}")
    
        # Step 2: Load CSV and collect all lens_ids
        csv_lens_ids = set()
        try:
            with open(csv_path, 'r', encoding='utf-8') as f_csv:
                reader = csv.DictReader(f_csv)
                for row in reader:
                    lens_id = row.get("Lens ID")
                    if lens_id:
                        csv_lens_ids.add(lens_id)
        except Exception as e:
            raise Exception(f"Error reading CSV file: {e}")
    
        # Step 3: Identify lens_ids missing from JSON
        missing_lens_ids = csv_lens_ids - json_lens_ids
        print(f"Number of patents in JSON: {len(json_lens_ids)}")
        print(f"Number of patents in CSV: {len(csv_lens_ids)}")
        print(f"Number of missing patents in JSON: {len(missing_lens_ids)}")
    
        # Step 4: Read only missing patents from JSONL
        data = []
        try:
            with open(self.jsonl_file_path, 'r', encoding='utf-8') as f_jsonl:
                for line in f_jsonl:
                    try:
                        record = json.loads(line.strip())
                        if record.get("lens_id") in missing_lens_ids:
                            data.append(record)
                    except json.JSONDecodeError:
                        continue
    
            self.data = pd.DataFrame(data)
            return self.data
    
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found: {self.jsonl_file_path}")
        except Exception as e:
            raise Exception(f"Error loading data from JSONL: {e}")
    
    def preprocess_text(self, text: str) -> str:
        """Clean and preprocess text for better LLM processing"""
        if pd.isna(text) or text == '':
            return ''
        
        # Convert to string if not already
        text = str(text)
        
        # Clean figure and claim references
        text = re.sub(r'\b(fig\.|figure)\s*\d+\b', 'figure', text, flags=re.IGNORECASE)
        text = re.sub(r'\bclaim\s*\d+\b', 'claim', text, flags=re.IGNORECASE)
        
        # Remove excessive whitespace and normalize
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        
        return text

    def extract_retry_delay(self, error_details: str) -> int:
        """Extract retry delay from API error response"""
        try:
            # Look for retryDelay in the error message
            delay_match = re.search(r"'retryDelay':\s*'(\d+)s'", error_details)
            if delay_match:
                return int(delay_match.group(1))
            
            # Fallback: look for other delay patterns
            delay_match = re.search(r"retry.*?(\d+)\s*s", error_details, re.IGNORECASE)
            if delay_match:
                return int(delay_match.group(1))
                
        except Exception:
            pass
        
        return 60  # Default fallback
    
    def prepare_patent_text(self, patent_row: pd.Series) -> tuple[str, int]:
        """
        Prepare combined text for a single patent with token management
        
        Returns:
            tuple: (combined_text, estimated_tokens)
        """
        # Extract and clean components
        title = self.preprocess_text(patent_row.get('invention_title_text', ''))
        abstract = self.preprocess_text(patent_row.get('abstract_text', ''))
        
        # Handle claims - convert list to string
        claims = patent_row.get('claims', [])
        if isinstance(claims, list):
            claims_text = ' '.join([str(claim) for claim in claims])
        else:
            claims_text = str(claims)
        claims_text = self.preprocess_text(claims_text)
        
        # Estimate tokens (rough approximation: 1 token ≈ 4 characters)
        def estimate_tokens(text):
            return len(text) // 4
        
        title_tokens = estimate_tokens(title)
        abstract_tokens = estimate_tokens(abstract)
        claims_tokens = estimate_tokens(claims_text)
        
        # Build combined text with priority: title (always include) -> abstract -> claims
        combined_parts = []
        total_tokens = 0
        
        # Always include title
        if title:
            combined_parts.append(f"TITLE: {title}")
            total_tokens += title_tokens
        
        # Include abstract if space allows
        if abstract and (total_tokens + abstract_tokens) <= self.max_tokens_per_patent:
            combined_parts.append(f"ABSTRACT: {abstract}")
            total_tokens += abstract_tokens
        
        # Include claims if space allows (truncate if necessary)
        remaining_tokens = self.max_tokens_per_patent - total_tokens
        if claims_text and remaining_tokens > 100:  # Keep some buffer
            if claims_tokens <= remaining_tokens:
                combined_parts.append(f"CLAIMS: {claims_text}")
                total_tokens += claims_tokens
            else:
                # Truncate claims to fit
                max_chars = remaining_tokens * 4
                truncated_claims = claims_text[:max_chars]
                combined_parts.append(f"CLAIMS: {truncated_claims}")
                total_tokens += estimate_tokens(truncated_claims)
        
        combined_text = "\n\n".join(combined_parts)
        return combined_text, total_tokens
    
    def create_analysis_prompt(self, patent_text: str) -> str:
        """Create the analysis prompt for Gemini API"""
        prompt = f"""
You are an expert patent analyst specializing in autonomous vehicle technologies. Analyze the following patent and provide a structured analysis.

PATENT CONTENT:
{patent_text}

Please provide your analysis in the following EXACT JSON format (ensure valid JSON syntax):

{{
    "core_innovation": {{
        "problem_addressed": "Brief description of the main problem this patent addresses",
        "proposed_solution": "Brief description of the key solution or innovation",
        "novelty_aspect": "What makes this innovation novel or unique",
        "technical_approach": "Brief description of the technical approach used"
    }},
    "conceptual_categories": {{
        "primary_category": "The main category this patent falls into",
        "secondary_categories": ["List of additional relevant categories"],
        "confidence_score": "High/Medium/Low confidence in categorization"
    }},
    "av_technology_areas": [
        "List of relevant AV technology areas from: perception_sensing, localization_mapping, path_planning_control, ai_ml_architecture, v2x_communication, safety_validation, simulation_testing, cybersecurity, human_machine_interface, hardware_sensors, software_algorithms, data_processing, vehicle_control_systems, other"
    ]
}}

IMPORTANT: 
- Respond ONLY with valid JSON - no additional text or explanations
- Use the exact field names shown above
- Keep descriptions concise but informative (max 2-3 sentences each)
- For av_technology_areas, select from the provided list only
- If uncertain about a field, use "Not clearly specified" rather than leaving empty
"""
        return prompt
    
    def call_gemini_api(self, prompt: str) -> Optional[Dict[str, Any]]:
        """
        Call Gemini API with robust retry logic that respects API limits
        """
        attempt = 0
        
        while True:
            attempt += 1
            
            try:
                # Always wait before making a request (except first attempt)
                if attempt > 1:
                    time.sleep(self.base_delay)
                
                response = self.client.models.generate_content(
                    model=self.model,
                    contents=prompt
                )
                
                # Parse JSON response
                response_text = response.text.strip()
                
                # Try to extract JSON if wrapped in markdown code blocks
                if "```json" in response_text:
                    start = response_text.find("```json") + 7
                    end = response_text.find("```", start)
                    if end > start:
                        response_text = response_text[start:end].strip()
                elif "```" in response_text:
                    start = response_text.find("```") + 3
                    end = response_text.find("```", start)
                    if end > start:
                        response_text = response_text[start:end].strip()
                
                # Parse JSON
                parsed_response = json.loads(response_text)
                return parsed_response
                
            except json.JSONDecodeError:
                # For JSON decode errors, try a few times then give up
                if attempt >= 3:
                    return None
                time.sleep(5)
                continue
                
            except Exception as e:
                error_str = str(e)
                
                # Handle 429 RESOURCE_EXHAUSTED - respect the API's suggested delay
                if "429" in error_str and "RESOURCE_EXHAUSTED" in error_str:
                    retry_delay = self.extract_retry_delay(error_str)
                    # Add 50% buffer to the suggested delay to be extra safe
                    actual_delay = min(int(retry_delay * 1.5), self.max_delay)
                    print(f"Rate limit hit (429). Waiting {actual_delay} seconds...")
                    time.sleep(actual_delay)
                    continue
                
                # Handle server errors (5xx) - use exponential backoff
                elif any(code in error_str for code in ["500", "502", "503", "504", "520", "521", "522", "523", "524"]):
                    delay = min(self.base_delay * (2 ** min(attempt - 1, 6)), self.max_delay)
                    print(f"Server error (attempt {attempt}). Waiting {delay} seconds...")
                    time.sleep(delay)
                    continue
                
                # Handle client errors (4xx except 429) - these are permanent
                elif any(code in error_str for code in ["400", "401", "403", "404"]):
                    return None
                
                # Unknown errors - try with exponential backoff
                else:
                    if attempt <= 5:
                        delay = min(self.base_delay * (2 ** min(attempt - 1, 4)), self.max_delay)
                        print(f"Unknown error (attempt {attempt}). Waiting {delay} seconds...")
                        time.sleep(delay)
                        continue
                    else:
                        return None
    
    def process_single_patent(self, idx: int, patent_row: pd.Series) -> Optional[Dict[str, Any]]:
        """Process a single patent and return the analysis"""
        lens_id = patent_row.get('lens_id', f'patent_{idx}')
        
        try:
            # Prepare patent text
            patent_text, token_count = self.prepare_patent_text(patent_row)
            
            if not patent_text.strip():
                return None
            
            print(f"Processing patent {idx + 1}: {lens_id} (~{token_count} tokens)")
            
            # Create prompt and call API
            prompt = self.create_analysis_prompt(patent_text)
            analysis_result = self.call_gemini_api(prompt)
            
            if analysis_result is None:
                return None
            
            # Combine original data with analysis
            result = {
                # Original patent metadata
                'lens_id': lens_id,
                'invention_title_text': patent_row.get('invention_title_text', ''),
                'abstract_text': patent_row.get('abstract_text', ''),
                'applicant_name': patent_row.get('applicant_name', ''),
                'date_published': patent_row.get('date_published', ''),
                'earliest_claim_date': patent_row.get('earliest_claim_date', ''),
                'cpc_symbols': patent_row.get('cpc_symbols', []),
                'claims': patent_row.get('claims', []),
                'description': patent_row.get('description', ''),
                
                # Analysis results
                'problem_addressed': analysis_result.get('core_innovation', {}).get('problem_addressed', ''),
                'proposed_solution': analysis_result.get('core_innovation', {}).get('proposed_solution', ''),
                'novelty_aspect': analysis_result.get('core_innovation', {}).get('novelty_aspect', ''),
                'technical_approach': analysis_result.get('core_innovation', {}).get('technical_approach', ''),
                'primary_category': analysis_result.get('conceptual_categories', {}).get('primary_category', ''),
                'secondary_categories': analysis_result.get('conceptual_categories', {}).get('secondary_categories', []),
                'categorization_confidence': analysis_result.get('conceptual_categories', {}).get('confidence_score', ''),
                'av_technology_areas': analysis_result.get('av_technology_areas', []),
                
                # Processing metadata
                'processing_timestamp': datetime.now().isoformat(),
                'estimated_tokens': token_count
            }
            
            return result
            
        except Exception:
            return None
    
    def process_all_patents(self, start_idx: int = 0, batch_size: int = 50):
        """
        Process all patents in the dataset with conservative rate limiting
        
        Args:
            start_idx: Index to start processing from (for resuming)
            batch_size: Save progress every N patents
        """
        if self.data is None:
            self.load_data()
        
        total_patents = len(self.data)
        print(f"Starting to process {total_patents - start_idx} patents (starting from index {start_idx})")
        
        for idx in range(start_idx, total_patents):
            patent_row = self.data.iloc[idx]
            lens_id = patent_row.get('lens_id', f'patent_{idx}')
            
            print(f"Processing patent {idx + 1}/{total_patents}: {lens_id}")
            
            # Process single patent
            result = self.process_single_patent(idx, patent_row)
            
            if result is not None:
                self.processed_data.append(result)
                print(f"✓ Successfully processed patent {idx + 1}/{total_patents}")
            else:
                self.failed_patents.append({
                    'index': idx,
                    'lens_id': lens_id,
                    'reason': 'Processing failed'
                })
                print(f"✗ Failed to process patent {idx + 1}/{total_patents}")
            
            # Always add delay between patents to be conservative
            if idx < total_patents - 1:  # Don't delay after the last patent
                time.sleep(self.base_delay)
            
            # Save progress periodically
            if (idx + 1) % batch_size == 0:
                self.save_progress(f"checkpoint_{idx + 1}")
                print(f"📁 Saved progress at patent {idx + 1}")
        
        # Final save
        self.save_results()
        print(f"Processing complete! Successfully processed {len(self.processed_data)} patents")
        print(f"Failed patents: {len(self.failed_patents)}")
    
    def save_progress(self, filename_suffix: str = ""):
        """Save current progress to files"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        if filename_suffix:
            progress_file = f"{self.output_dir}/processed_patents_{filename_suffix}_{timestamp}.json"
        else:
            progress_file = f"{self.output_dir}/processed_patents_{timestamp}.json"
        
        # Save processed data
        with open(progress_file, 'w', encoding='utf-8') as f:
            json.dump(self.processed_data, f, indent=2, ensure_ascii=False)
        
        # Save failed patents log
        failed_file = f"{self.output_dir}/failed_patents_{timestamp}.json"
        with open(failed_file, 'w', encoding='utf-8') as f:
            json.dump(self.failed_patents, f, indent=2, ensure_ascii=False)
        
        print(f"Progress saved to {progress_file}")
    
    def save_results(self):
        """Save final results as DataFrame and various formats"""
        if not self.processed_data:
            print("No processed data to save")
            return
        
        # Create DataFrame
        df = pd.DataFrame(self.processed_data)
        
        # Save as different formats
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # CSV
        csv_file = f"{self.output_dir}/patent_analysis_results_{timestamp}.csv"
        df.to_csv(csv_file, index=False, encoding='utf-8')
        
        # Excel
        excel_file = f"{self.output_dir}/patent_analysis_results_{timestamp}.xlsx"
        df.to_excel(excel_file, index=False, engine='openpyxl')
        
        # JSON
        json_file = f"{self.output_dir}/patent_analysis_results_{timestamp}.json"
        df.to_json(json_file, orient='records', indent=2, force_ascii=False)
        
        # Pickle for future use
        pickle_file = f"{self.output_dir}/patent_analysis_results_{timestamp}.pkl"
        df.to_pickle(pickle_file)
        
        print(f"Results saved in multiple formats with timestamp {timestamp}")
        
        # Print summary statistics
        self.print_summary_stats(df)
        
        return df
    
    def print_summary_stats(self, df: pd.DataFrame):
        """Print summary statistics of the processing results"""
        print("\n" + "="*50)
        print("PROCESSING SUMMARY")
        print("="*50)
        print(f"Total patents processed: {len(df)}")
        print(f"Total patents failed: {len(self.failed_patents)}")
        print(f"Success rate: {len(df)/(len(df) + len(self.failed_patents))*100:.1f}%")
        
        if len(df) > 0:
            print(f"\nTop Primary Categories:")
            primary_cats = df['primary_category'].value_counts().head(10)
            for cat, count in primary_cats.items():
                print(f"  {cat}: {count}")
            
            print(f"\nTop AV Technology Areas:")
            # Flatten the list of lists
            all_areas = []
            for areas in df['av_technology_areas']:
                if isinstance(areas, list):
                    all_areas.extend(areas)
            area_counts = pd.Series(all_areas).value_counts().head(10)
            for area, count in area_counts.items():
                print(f"  {area}: {count}")
        
        print("="*50)

In [6]:
# Run the full pipeline on the patents that are not in the checkpoint yet
if __name__ == "__main__":
    # Build the pipeline (creates the output folder and the API client)
    pipeline = PatentAnalysisPipeline("av_patentdata.jsonl", "patent_analysis_output1")

    # Select the records to process
    pipeline.load_data()

    # Process everything from the first record, writing a checkpoint every
    # 10 patents; pass a larger start_idx to resume an interrupted run
    pipeline.process_all_patents(start_idx=0, batch_size=10)

    # Note: batch_size only sets the checkpoint interval, it does not limit
    # how many patents are processed
    # pipeline.process_all_patents(start_idx=0, batch_size=5)  # checkpoint every 5 patents

Number of patents in JSON: 493
Number of patents in CSV: 667
Number of missing patents in JSON: 174
Starting to process 174 patents (starting from index 0)
Processing patent 1/174: 196-557-021-684-98X
Processing patent 1: 196-557-021-684-98X (~746 tokens)
✓ Successfully processed patent 1/174
Processing patent 2/174: 097-230-147-819-508
Processing patent 2: 097-230-147-819-508 (~2568 tokens)
✓ Successfully processed patent 2/174
Processing patent 3/174: 165-884-370-654-504
Processing patent 3: 165-884-370-654-504 (~2386 tokens)
✓ Successfully processed patent 3/174
Processing patent 4/174: 175-337-774-629-516
Processing patent 4: 175-337-774-629-516 (~1716 tokens)
✓ Successfully processed patent 4/174
Processing patent 5/174: 185-160-791-286-190
Processing patent 5: 185-160-791-286-190 (~1774 tokens)
✓ Successfully processed patent 5/174
Processing patent 6/174: 158-879-448-169-219
Processing patent 6: 158-879-448-169-219 (~3000 tokens)
✓ Successfully processed patent 6/174
Processing 

In [23]:
import json
import csv

# Cross-check: report every Lens ID that appears in the CSV export but is
# absent from the processed-patents JSON checkpoint.

# File paths
# NOTE(review): json_path is an absolute, machine-specific path; consider
# making it relative to the notebook directory so the cell runs elsewhere.
json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output\processed_patents_checkpoint_650_20250616_193652.json"
csv_path = "lens-export.csv"

# Read JSON and collect lens_ids (records without a "lens_id" key are skipped)
with open(json_path, 'r', encoding='utf-8') as f:
    json_data = json.load(f)
json_lens_ids = {patent.get("lens_id") for patent in json_data if "lens_id" in patent}

# Read CSV and collect the non-empty values of the "Lens ID" column.
# newline='' is required by the csv module so that line breaks embedded in
# quoted fields are parsed as part of the field rather than as row breaks.
with open(csv_path, 'r', encoding='utf-8', newline='') as f:
    reader = csv.DictReader(f)
    csv_lens_ids = {row["Lens ID"] for row in reader if row.get("Lens ID")}

# Find missing IDs: present in the CSV, absent from the JSON
missing_in_json = csv_lens_ids - json_lens_ids

# Print results (IDs are listed in sorted order for stable output)
print(f"Number of patents in JSON: {len(json_lens_ids)}")
print(f"Number of patents in CSV: {len(csv_lens_ids)}")
print(f"Number of missing patents in JSON: {len(missing_in_json)}")
print("Missing Lens IDs:")
for lens_id in sorted(missing_in_json):
    print(lens_id)

Number of patents in JSON: 493
Number of patents in CSV: 667
Number of missing patents in JSON: 174
Missing Lens IDs:
000-152-677-120-075
004-109-111-936-894
004-203-580-589-363
005-666-763-856-839
006-624-306-595-099
007-126-905-721-573
007-846-758-300-992
009-098-732-754-779
009-208-440-609-259
010-090-759-416-162
014-143-781-289-974
014-417-462-061-441
017-138-129-876-230
019-860-269-598-923
019-910-078-190-629
020-498-612-514-202
021-382-421-165-451
021-883-642-933-923
022-953-362-574-29X
025-635-593-546-830
026-040-568-655-473
027-652-790-587-92X
029-248-311-267-194
030-386-153-783-88X
033-164-148-639-82X
035-513-494-375-092
035-961-451-891-408
037-124-820-840-257
038-556-992-896-279
039-521-201-094-074
039-861-614-932-225
041-668-086-786-996
043-605-959-100-979
045-603-964-182-773
046-637-369-926-530
047-148-638-280-176
047-534-075-006-290
047-751-888-209-301
051-651-442-641-868
051-786-376-412-976
053-601-310-222-917
054-801-105-273-208
054-823-927-077-588
056-572-752-855-040
05

In [8]:
import json

# Count how many records in the JSON file at json_path carry a "lens_id" key.
json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output1\patent_analysis_results_20250617_092207.json"

with open(json_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Collect the lens_id of every record that has one; duplicates are kept,
# so the printed number is a record count rather than a unique-ID count.
lens_ids = []
for patent in data:
    if "lens_id" in patent:
        lens_ids.append(patent.get("lens_id"))

print(f"Number of patent objects (lens_ids): {len(lens_ids)}")

Number of patent objects (lens_ids): 174


In [9]:
import json

# Count how many records in the checkpoint file at json_path carry a "lens_id" key.
json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output\processed_patents_checkpoint_650_20250616_193652.json"

# The file only needs to stay open while it is being parsed.
with open(json_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# One entry per record that has the key (duplicates are not collapsed).
lens_ids = [
    record.get("lens_id")
    for record in data
    if "lens_id" in record
]

print(f"Number of patent objects (lens_ids): {len(lens_ids)}")

Number of patent objects (lens_ids): 493


In [10]:
import json

# Merge two processed-patent JSON files (each a list of records) into one
# combined file, refusing to write anything if a lens_id occurs twice.

# Input file paths
# NOTE(review): these are absolute, machine-specific paths; consider making
# them relative to the notebook directory so the cell runs elsewhere.
file1 = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output\processed_patents_checkpoint_650_20250616_193652.json"
file2 = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\patent_analysis_output1\patent_analysis_results_20250617_092207.json"

# Output file path
output_file = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\av_patent_data.json"

# Load both files
with open(file1, 'r', encoding='utf-8') as f1:
    data1 = json.load(f1)

with open(file2, 'r', encoding='utf-8') as f2:
    data2 = json.load(f2)

# Combine them (file1 records first, then file2 records)
combined_data = data1 + data2

# Confirm no duplicate lens_ids before writing the output.
# Records without a "lens_id" key are not part of this check.
lens_ids = [item.get("lens_id") for item in combined_data if "lens_id" in item]
seen_ids = set()
duplicate_ids = set()
for lens_id in lens_ids:
    if lens_id in seen_ids:
        duplicate_ids.add(lens_id)
    seen_ids.add(lens_id)

# An explicit raise (rather than `assert`) keeps the guard active even when
# Python runs with -O, and the message names the offending IDs.
if duplicate_ids:
    duplicate_list = ", ".join(sorted(str(dup) for dup in duplicate_ids))
    raise ValueError(f"Duplicate lens_ids detected! {duplicate_list}")

# Save to output
with open(output_file, 'w', encoding='utf-8') as out:
    json.dump(combined_data, out, indent=2)

print(f"Combined file saved to: {output_file}")
print(f"Total patents combined: {len(combined_data)}")

Combined file saved to: C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\av_patent_data.json
Total patents combined: 667


In [11]:
import json
import csv

# Verify that every Lens ID in the CSV export is present in the combined JSON.

# File paths
# NOTE(review): combined_json_path is an absolute, machine-specific path;
# consider making it relative to the notebook directory.
csv_path = "lens-export.csv"
combined_json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\av_patent_data.json"

# Step 1: Load CSV Lens IDs (non-empty values of the "Lens ID" column).
# newline='' is required by the csv module so that line breaks embedded in
# quoted fields are parsed as part of the field rather than as row breaks.
with open(csv_path, 'r', encoding='utf-8', newline='') as f_csv:
    reader = csv.DictReader(f_csv)
    csv_lens_ids = {row["Lens ID"] for row in reader if row.get("Lens ID")}

# Step 2: Load Combined JSON Lens IDs (records without "lens_id" are skipped)
with open(combined_json_path, 'r', encoding='utf-8') as f_json:
    json_data = json.load(f_json)
json_lens_ids = {patent.get("lens_id") for patent in json_data if "lens_id" in patent}

# Step 3: Compare -- IDs present in the CSV but absent from the JSON
missing_ids = csv_lens_ids - json_lens_ids

# Results
if not missing_ids:
    print("✅ All lens IDs in lens-export.csv are present in the combined JSON.")
else:
    print(f"❌ {len(missing_ids)} lens IDs are missing in the combined JSON.")
    print("Missing Lens IDs:")
    for lens_id in sorted(missing_ids):
        print(lens_id)

✅ All lens IDs in lens-export.csv are present in the combined JSON.


In [12]:
import json

# Inspect the combined JSON file: list the keys of its first record.

# Location of the combined JSON file
combined_json_path = r"C:\Users\Aniket Shinde\Desktop\Main\Jupyter Notebooks\av_patent_data.json"

# Parse the whole file into memory
with open(combined_json_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

if data:
    # Only the first record is inspected; the other records are not
    # checked for having the same keys.
    print("Top-level keys in each patent object:")
    for key in data[0].keys():
        print("-", key)
else:
    # Nothing to inspect when the parsed content is empty
    print("❌ The JSON file is empty.")

Top-level keys in each patent object:
- lens_id
- invention_title_text
- abstract_text
- applicant_name
- date_published
- earliest_claim_date
- cpc_symbols
- claims
- description
- problem_addressed
- proposed_solution
- novelty_aspect
- technical_approach
- primary_category
- secondary_categories
- categorization_confidence
- av_technology_areas
- processing_timestamp
- estimated_tokens
