In [10]:
import os
import time
import csv
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import chromedriver_autoinstaller

# CONFIG
BASE_URL = "https://indiankanoon.org"
BROWSE_URL = f"{BASE_URL}/browse/"
DOWNLOAD_DIR = os.path.join(os.getcwd(), "indiankanoon-case-dump")
YEARS = list(range(1946, 2026))
USERNAME = "sabu.s.alan@gmail.com"
PASSWORD = "Alan@123"

chromedriver_autoinstaller.install()
Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)

# Setup Chrome
options = Options()
options.add_argument("--start-maximized")
options.add_experimental_option("prefs", {"download.default_directory": DOWNLOAD_DIR})
driver = webdriver.Chrome(options=options)
wait = WebDriverWait(driver, 20)

def login():
    driver.get(f"{BASE_URL}/members/login/?nextpage=/")
    wait.until(EC.presence_of_element_located((By.NAME, "email"))).send_keys(USERNAME)
    driver.find_element(By.NAME, "passwd").send_keys(PASSWORD)
    driver.find_element(By.XPATH, "//input[@type='submit']").click()
    time.sleep(2)

def get_court_category_links():
    driver.get(BROWSE_URL)
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    info_blocks = soup.find_all('div', class_='info_indian_kanoon')
    court_links = {}
    for block in info_blocks:
        for link in block.find_all('a'):
            label = link.text.strip()
            court_links[label] = BASE_URL + link['href']
    return court_links

def get_case_text(url):
    try:
        driver.get(url)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        content = soup.find('div', class_='judgments') or soup.find('pre')
        return content.get_text(separator='\n', strip=True) if content else "N/A"
    except Exception:
        return "N/A"

def get_cases_by_year(court_url, year):
    all_cases = []
    page = 0
    while True:
        paginated_url = f"{court_url}?year={year}&p={page}"
        driver.get(paginated_url)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        results = soup.find_all('div', class_='result')
        if not results:
            break

        for div in results:
            title_tag = div.find("a")
            if not title_tag:
                continue
            title = title_tag.text.strip()
            url = BASE_URL + title_tag['href']
            snippet = div.find("p", class_="snippet")
            snippet_text = snippet.text.strip() if snippet else ""
            date_span = div.find("span", class_="result_date")
            date_text = date_span.text.strip() if date_span else ""

            print(f"   📄 {year} | {title}")
            full_text = get_case_text(url)

            all_cases.append({
                "Title": title,
                "URL": url,
                "Snippet": snippet_text,
                "Date": date_text,
                "Full_Text": full_text
            })
        page += 1
        time.sleep(0.5)
    return all_cases

def save_to_csv(court_name, cases):
    if not cases:
        return
    safe_name = court_name.replace(" ", "_").replace("/", "-")
    file_path = os.path.join(DOWNLOAD_DIR, f"{safe_name}.csv")
    with open(file_path, "w", newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Title", "URL", "Snippet", "Date", "Full_Text"])
        writer.writeheader()
        writer.writerows(cases)

def scrape_all():
    login()
    court_links = get_court_category_links()
    print("✅ Found court categories:", list(court_links.keys()))

    for court_name, court_url in court_links.items():
        print(f"\n📂 Scraping {court_name}")
        court_cases = []
        for year in YEARS:
            yearly_cases = get_cases_by_year(court_url, year)
            if yearly_cases:
                court_cases.extend(yearly_cases)
        print(f"   ✅ Total {len(court_cases)} cases fetched.")
        save_to_csv(court_name, court_cases)

# RUN
scrape_all()
driver.quit()


✅ Found court categories: ['Supreme Court of India', 'Supreme Court - Daily Orders', 'Allahabad High Court', 'Andhra HC (Pre-Telangana)', 'Andhra Pradesh High Court - Amravati', 'Bombay High Court', 'Calcutta High Court', 'Calcutta High Court (Appellete Side)', 'Chattisgarh High Court', 'Delhi High Court', 'Delhi High Court - Orders', 'Gauhati High Court', 'Gujarat High Court', 'Himachal Pradesh High Court', 'Jammu & Kashmir High Court', 'Jammu & Kashmir High Court - Srinagar Bench', 'Jharkhand High Court', 'Karnataka High Court', 'Kerala High Court', 'Madhya Pradesh High Court', 'Manipur High Court', 'Meghalaya High Court', 'Madras High Court', 'Orissa High Court', 'Patna High Court', 'Patna High Court - Orders', 'Punjab-Haryana High Court', 'Rajasthan High Court - Jaipur', 'Rajasthan High Court - Jodhpur', 'Sikkim High Court', 'Uttarakhand High Court', 'Tripura High Court', 'Telangana High Court', 'Delhi District Court', 'Bangalore District Court', 'Appellate Tribunal For Electricity

KeyboardInterrupt: 

In [2]:
import os
import time
import csv
import json
import logging
from datetime import datetime
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from bs4 import BeautifulSoup
import chromedriver_autoinstaller
import argparse
import random

class IndianKanoonScraper:
    def __init__(self, config):
        # Configuration
        self.base_url = config.get('base_url', "https://indiankanoon.org")
        self.browse_url = f"{self.base_url}/browse/"
        self.download_dir = config.get('download_dir', os.path.join(os.getcwd(), "indiankanoon-case-dump"))
        self.start_year = config.get('start_year', 1946)
        self.end_year = config.get('end_year', datetime.now().year)
        self.username = config.get('username')
        self.password = config.get('password')
        self.delay_min = config.get('delay_min', 1)
        self.delay_max = config.get('delay_max', 3)
        self.checkpoint_file = os.path.join(self.download_dir, "checkpoint.json")
        self.checkpoint_interval = config.get('checkpoint_interval', 10)
        self.retry_limit = config.get('retry_limit', 3)
        self.include_full_text = config.get('include_full_text', True)
        
        # Setup logging
        log_file = os.path.join(self.download_dir, "scraper.log")
        self.setup_logging(log_file)
        
        # Setup directories
        Path(self.download_dir).mkdir(parents=True, exist_ok=True)
        
        # Checkpoint data
        self.checkpoint = self.load_checkpoint()
        
        # Stats
        self.stats = {
            "cases_scraped": 0,
            "courts_completed": 0,
            "years_completed": 0,
            "errors": 0,
            "start_time": time.time(),
        }
        
        # Initialize browser
        self._setup_browser()
    
    def setup_logging(self, log_file):
        """Set up logging to both file and console"""
        self.logger = logging.getLogger('indiankanoon_scraper')
        self.logger.setLevel(logging.INFO)
        
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(file_handler)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(console_handler)
    
    def _setup_browser(self):
        """Set up the Chrome browser with appropriate options"""
        self.logger.info("Setting up Chrome browser")
        chromedriver_autoinstaller.install()
        
        options = Options()
        options.add_argument("--start-maximized")
        options.add_argument("--disable-notifications")
        options.add_argument("--disable-popup-blocking")
        # Uncomment the next line for headless mode if needed
        # options.add_argument("--headless")
        options.add_experimental_option("prefs", {
            "download.default_directory": self.download_dir,
            "download.prompt_for_download": False,
            "plugins.always_open_pdf_externally": True
        })
        
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 20)
    
    def load_checkpoint(self):
        """Load checkpoint from file if exists"""
        if os.path.exists(self.checkpoint_file):
            try:
                with open(self.checkpoint_file, 'r') as f:
                    checkpoint = json.load(f)
                self.logger.info(f"Loaded checkpoint: {checkpoint}")
                return checkpoint
            except Exception as e:
                self.logger.error(f"Error loading checkpoint: {e}")
        
        # Default checkpoint data
        return {
            "courts_completed": [],
            "current_court": None,
            "current_court_years_completed": [],
            "current_year": None,
            "current_year_page": 0
        }
    
    def save_checkpoint(self):
        """Save current progress to checkpoint file"""
        try:
            with open(self.checkpoint_file, 'w') as f:
                json.dump(self.checkpoint, f)
            self.logger.info(f"Checkpoint saved: {self.checkpoint}")
        except Exception as e:
            self.logger.error(f"Error saving checkpoint: {e}")
    
    def random_delay(self):
        """Sleep for a random time to avoid detection"""
        delay = random.uniform(self.delay_min, self.delay_max)
        time.sleep(delay)
    
    def login(self):
        """Log into Indian Kanoon"""
        if not (self.username and self.password):
            self.logger.warning("No login credentials provided, skipping login")
            return False
            
        try:
            self.logger.info("Logging in...")
            self.driver.get(f"{self.base_url}/members/login/?nextpage=/")
            
            # Wait for login form
            self.wait.until(EC.presence_of_element_located((By.NAME, "email")))
            
            # Enter credentials
            self.driver.find_element(By.NAME, "email").send_keys(self.username)
            self.driver.find_element(By.NAME, "passwd").send_keys(self.password)
            
            # Submit form
            self.driver.find_element(By.XPATH, "//input[@type='submit']").click()
            
            # Wait for redirect
            self.random_delay()
            
            # Check if login was successful
            if "members/login" in self.driver.current_url:
                self.logger.error("Login failed. Check credentials.")
                return False
                
            self.logger.info("Login successful")
            return True
            
        except Exception as e:
            self.logger.error(f"Error during login: {e}")
            return False
    
    def get_court_category_links(self):
        """Get links to all courts"""
        self.logger.info("Getting court category links")
        
        # Retry logic for robustness
        for attempt in range(self.retry_limit):
            try:
                self.driver.get(self.browse_url)
                self.random_delay()
                
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                info_blocks = soup.find_all('div', class_='info_indian_kanoon')
                
                court_links = {}
                for block in info_blocks:
                    for link in block.find_all('a'):
                        label = link.text.strip()
                        if label:  # Ensure label is not empty
                            court_links[label] = self.base_url + link['href']
                
                self.logger.info(f"Found {len(court_links)} court categories")
                return court_links
                
            except Exception as e:
                self.logger.error(f"Error getting court links (attempt {attempt+1}): {e}")
                self.random_delay()
                
        # If we get here, all attempts failed
        self.logger.critical("Failed to get court categories after multiple attempts")
        return {}
    
    def get_case_text(self, url):
        """Extract full text of a case"""
        if not self.include_full_text:
            return "Full text extraction disabled"
            
        self.logger.debug(f"Getting case text from {url}")
        
        for attempt in range(self.retry_limit):
            try:
                self.driver.get(url)
                self.random_delay()
                
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                
                # Try different selectors for the judgment text
                content_selectors = [
                    ('div', {'class': 'judgments'}),
                    ('pre', {}),
                    ('div', {'class': 'docsnippet'}),
                    ('div', {'id': 'doc_content'})
                ]
                
                for tag, attrs in content_selectors:
                    content = soup.find(tag, attrs)
                    if content:
                        return content.get_text(separator='\n', strip=True)
                
                # If no content found with any selector
                self.logger.warning(f"No content found for {url}")
                return "No content found"
                
            except Exception as e:
                self.logger.error(f"Error getting case text (attempt {attempt+1}): {e}")
                self.random_delay()
        
        # If all attempts failed
        self.stats["errors"] += 1
        return "Error retrieving content"
    
    def extract_doc_id(self, url):
        """Extract the document ID from a URL"""
        # URLs are typically in format: /doc/123456/
        parts = url.strip('/').split('/')
        for i, part in enumerate(parts):
            if part == 'doc' and i+1 < len(parts):
                return parts[i+1]
        return None
    
    def get_cases_by_year_and_page(self, court_url, year, page):
        """Get cases for a specific court, year and page"""
        cases = []
        paginated_url = f"{court_url}?year={year}&p={page}"
        self.logger.info(f"Scraping: {paginated_url}")
        
        for attempt in range(self.retry_limit):
            try:
                self.driver.get(paginated_url)
                self.random_delay()
                
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                results = soup.find_all('div', class_='result')
                
                if not results:
                    self.logger.info(f"No results found for {court_url} in {year} page {page}")
                    return [], False  # No more pages
                
                for div in results:
                    try:
                        title_tag = div.find("a")
                        if not title_tag:
                            continue
                            
                        title = title_tag.text.strip()
                        url = self.base_url + title_tag['href']
                        
                        # Extract docid from URL
                        docid = self.extract_doc_id(title_tag['href'])
                        
                        # Extract snippet
                        snippet = div.find("p", class_="snippet")
                        snippet_text = snippet.text.strip() if snippet else ""
                        
                        # Extract date
                        date_span = div.find("span", class_="result_date")
                        date_text = date_span.text.strip() if date_span else ""
                        
                        # Get court name from the result
                        court_span = div.find("span", class_="docsource")
                        court_name = court_span.text.strip() if court_span else ""
                        
                        self.logger.info(f"Found case: {title} ({docid})")
                        
                        case_data = {
                            "DocID": docid,
                            "Title": title,
                            "URL": url,
                            "Snippet": snippet_text,
                            "Date": date_text,
                            "Year": year,
                            "Court": court_name
                        }
                        
                        # Get full text if enabled
                        if self.include_full_text:
                            self.logger.debug(f"Fetching full text for case {docid}")
                            case_data["Full_Text"] = self.get_case_text(url)
                        
                        cases.append(case_data)
                        self.stats["cases_scraped"] += 1
                        
                    except Exception as e:
                        self.logger.error(f"Error processing case: {e}")
                        self.stats["errors"] += 1
                
                return cases, True  # More pages may exist
                
            except Exception as e:
                self.logger.error(f"Error scraping page (attempt {attempt+1}): {e}")
                self.random_delay()
        
        # If all attempts failed
        self.stats["errors"] += 1
        return [], False
    
    def process_court_year(self, court_name, court_url, year):
        """Process all pages for a specific court and year"""
        self.logger.info(f"Processing {court_name} for year {year}")
        
        # Set checkpoint data
        self.checkpoint["current_court"] = court_name
        self.checkpoint["current_year"] = year
        
        # Start from the saved page or 0
        start_page = self.checkpoint.get("current_year_page", 0)
        
        page = start_page
        all_cases = []
        has_more_pages = True
        
        while has_more_pages:
            self.checkpoint["current_year_page"] = page
            self.save_checkpoint()
            
            cases, has_more_pages = self.get_cases_by_year_and_page(court_url, year, page)
            all_cases.extend(cases)
            
            # Save periodically
            if page % self.checkpoint_interval == 0 and all_cases:
                safe_court_name = court_name.replace(" ", "_").replace("/", "-")
                self.save_to_csv(all_cases, f"{safe_court_name}_{year}_partial_{page}")
                
                # Log progress
                elapsed = time.time() - self.stats["start_time"]
                self.logger.info(f"Progress: {len(all_cases)} cases scraped for {court_name} {year} (elapsed: {elapsed:.1f}s)")
            
            # Move to next page if more exist
            if has_more_pages:
                page += 1
            else:
                break
        
        # Reset page counter for next year
        self.checkpoint["current_year_page"] = 0
        
        # Mark year as completed
        if year not in self.checkpoint["current_court_years_completed"]:
            self.checkpoint["current_court_years_completed"].append(year)
            self.stats["years_completed"] += 1
        
        self.save_checkpoint()
        
        # Save final results for this year
        if all_cases:
            safe_court_name = court_name.replace(" ", "_").replace("/", "-")
            self.save_to_csv(all_cases, f"{safe_court_name}_{year}")
            self.logger.info(f"Completed {court_name} for {year}: {len(all_cases)} cases")
        else:
            self.logger.info(f"No cases found for {court_name} in {year}")
        
        return all_cases
    
    def process_court(self, court_name, court_url):
        """Process all years for a specific court"""
        self.logger.info(f"Processing court: {court_name}")
        
        # Skip if court already completed
        if court_name in self.checkpoint["courts_completed"]:
            self.logger.info(f"Skipping completed court: {court_name}")
            return []
        
        # Set current court in checkpoint
        self.checkpoint["current_court"] = court_name
        if not self.checkpoint.get("current_court_years_completed"):
            self.checkpoint["current_court_years_completed"] = []
        
        all_court_cases = []
        
        # Process each year
        years_to_process = range(self.start_year, self.end_year + 1)
        for year in years_to_process:
            # Skip if year already completed for this court
            if year in self.checkpoint["current_court_years_completed"]:
                self.logger.info(f"Skipping completed year {year} for {court_name}")
                continue
                
            yearly_cases = self.process_court_year(court_name, court_url, year)
            all_court_cases.extend(yearly_cases)
        
        # Mark court as completed
        self.checkpoint["courts_completed"].append(court_name)
        self.checkpoint["current_court_years_completed"] = []
        self.save_checkpoint()
        
        self.stats["courts_completed"] += 1
        
        # Save all court results
        if all_court_cases:
            safe_court_name = court_name.replace(" ", "_").replace("/", "-")
            self.save_to_csv(all_court_cases, f"{safe_court_name}_all")
            self.logger.info(f"Completed court {court_name}: {len(all_court_cases)} total cases")
        
        return all_court_cases
    
    def save_to_csv(self, cases, filename_prefix):
        """Save case data to CSV file"""
        if not cases:
            self.logger.warning(f"No cases to save for {filename_prefix}")
            return
            
        # Create a clean filename
        safe_name = filename_prefix.replace(" ", "_").replace("/", "-")
        file_path = os.path.join(self.download_dir, f"{safe_name}.csv")
        
        self.logger.info(f"Saving {len(cases)} cases to {file_path}")
        
        try:
            with open(file_path, "w", newline='', encoding='utf-8') as f:
                # Get all possible keys from all dictionaries
                fieldnames = set()
                for case in cases:
                    fieldnames.update(case.keys())
                    
                writer = csv.DictWriter(f, fieldnames=sorted(list(fieldnames)))
                writer.writeheader()
                writer.writerows(cases)
                
            self.logger.info(f"Successfully saved to {file_path}")
            
        except Exception as e:
            self.logger.error(f"Error saving to CSV: {e}")
            
            # Try saving as JSON as backup
            backup_path = os.path.join(self.download_dir, f"{safe_name}_backup.json")
            try:
                with open(backup_path, 'w', encoding='utf-8') as f:
                    json.dump(cases, f, ensure_ascii=False, indent=2)
                self.logger.info(f"Backup saved to {backup_path}")
            except Exception as e2:
                self.logger.critical(f"Failed to save backup: {e2}")
    
    def print_stats(self):
        """Print statistics about the scraping session"""
        elapsed = time.time() - self.stats["start_time"]
        hours, remainder = divmod(elapsed, 3600)
        minutes, seconds = divmod(remainder, 60)
        
        self.logger.info("-" * 40)
        self.logger.info("SCRAPING STATISTICS")
        self.logger.info("-" * 40)
        self.logger.info(f"Total cases scraped: {self.stats['cases_scraped']}")
        self.logger.info(f"Courts completed: {self.stats['courts_completed']}")
        self.logger.info(f"Years completed: {self.stats['years_completed']}")
        self.logger.info(f"Errors encountered: {self.stats['errors']}")
        self.logger.info(f"Total time: {int(hours)}h {int(minutes)}m {int(seconds)}s")
        self.logger.info(f"Cases per minute: {(self.stats['cases_scraped'] / (elapsed/60)):.2f}")
        self.logger.info("-" * 40)
    
    def scrape_all(self):
        """Main function to scrape all courts and years"""
        self.logger.info("Starting comprehensive scraping of Indian Kanoon")
        self.login()
        
        court_links = self.get_court_category_links()
        if not court_links:
            self.logger.critical("Failed to get court links. Exiting.")
            return
        
        self.logger.info(f"Will scrape {len(court_links)} courts from {self.start_year} to {self.end_year}")
        
        # Process each court
        for court_name, court_url in court_links.items():
            self.process_court(court_name, court_url)
        
        self.logger.info("Scraping completed!")
        self.print_stats()
        self.driver.quit()

def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description="Comprehensive Indian Kanoon Scraper")
    parser.add_argument("--username", "-u", help="Indian Kanoon username")
    parser.add_argument("--password", "-p", help="Indian Kanoon password")
    parser.add_argument("--output", "-o", default="indiankanoon-case-dump", help="Output directory")
    parser.add_argument("--start-year", "-s", type=int, default=1946, help="Start year")
    parser.add_argument("--end-year", "-e", type=int, default=datetime.now().year, help="End year")
    parser.add_argument("--no-full-text", action="store_true", help="Skip downloading full text of cases")
    parser.add_argument("--delay-min", type=float, default=1.0, help="Minimum delay between requests")
    parser.add_argument("--delay-max", type=float, default=3.0, help="Maximum delay between requests")
    parser.add_argument("--checkpoint-interval", type=int, default=10, help="Save checkpoint every N pages")
    parser.add_argument("--retry-limit", type=int, default=3, help="Number of retry attempts for failed requests")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    
    # Set up configuration
    config = {
        'base_url': "https://indiankanoon.org",
        'download_dir': args.output,
        'username': args.username,
        'password': args.password,
        'start_year': args.start_year,
        'end_year': args.end_year,
        'include_full_text': not args.no_full_text,
        'delay_min': args.delay_min,
        'delay_max': args.delay_max,
        'checkpoint_interval': args.checkpoint_interval,
        'retry_limit': args.retry_limit
    }
    
    # Initialize and run scraper
    scraper = IndianKanoonScraper(config)
    scraper.scrape_all()

usage: ipykernel_launcher.py [-h] [--username USERNAME] [--password PASSWORD]
                             [--output OUTPUT] [--start-year START_YEAR]
                             [--end-year END_YEAR] [--no-full-text]
                             [--delay-min DELAY_MIN] [--delay-max DELAY_MAX]
                             [--checkpoint-interval CHECKPOINT_INTERVAL]
                             [--retry-limit RETRY_LIMIT]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\alene\AppData\Roaming\jupyter\runtime\kernel-v3d0c322dd4788d27bf559ead74fc8b2a189f7a15d.json


SystemExit: 2