# In [3]:  (notebook cell prompt left over from the export)
import pandas as pd
import numpy as np
import requests
import xml.etree.ElementTree as ET
import gzip
from io import BytesIO
import os
import concurrent.futures
import ipywidgets as widgets
from IPython.display import display, clear_output
from tqdm.notebook import tqdm
import warnings
import gc
import csv
warnings.filterwarnings('ignore')


# Worker function (Must be outside)
def parse_gz_sitemap_worker(gz_url):
    """Download one sitemap file and return the URL entries it lists.

    This is a best-effort worker: any failure (network error, non-200
    response, corrupt gzip data, malformed XML) yields an empty list so that
    one bad file does not abort the caller's whole run.

    Args:
        gz_url: Address of a sitemap ``<urlset>`` file, normally gzip-compressed.

    Returns:
        A list of ``{'url': str, 'last_modified': str | None}`` dicts, one per
        ``<url>`` element that has a non-empty ``<loc>``.
    """
    ns = {'sm': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
    try:
        # No stream=True: the whole body is needed anyway, and a non-streamed
        # request releases its connection even on the early return below.
        response = requests.get(gz_url, timeout=60)
        if response.status_code != 200:
            return []
        payload = response.content
        # Only gunzip when the gzip magic number is present; the body is
        # already plain XML if the transport layer decoded it for us.
        if payload[:2] == b'\x1f\x8b':
            payload = gzip.decompress(payload)
        root = ET.fromstring(payload)
    except Exception:
        # Deliberately broad (but no longer a bare except, which would also
        # trap KeyboardInterrupt/SystemExit): a failed file counts as empty.
        return []

    urls_data = []
    for url_elem in root.findall('sm:url', ns):
        loc = url_elem.find('sm:loc', ns)
        # Strip padding so the URL can match the log-derived URLs exactly.
        address = (loc.text or '').strip() if loc is not None else ''
        if not address:
            continue
        lastmod = url_elem.find('sm:lastmod', ns)
        modified = lastmod.text.strip() if lastmod is not None and lastmod.text else None
        urls_data.append({
            'url': address,
            'last_modified': modified
        })
    return urls_data


def detect_stale_pages_bouncer(log_folder_path, sitemap_urls_text, output_dir='stale_pages_output',
                               max_records_per_file=500000, max_gz_per_index=None,
                               log_batch_size=5, workers=10, base_url='https://www.alamy.com'):
    """Cross-check sitemap URLs against crawl logs and export the stale ones.

    A sitemap URL is reported as "Orphan" when it never appears in the logs
    and as "Low Activity" when its total crawl count is at or below the 10th
    percentile of all logged URLs.  Each reported row gets a priority score
    based on the sitemap ``lastmod`` age: more than 180 days -> 100, 91-180
    days -> 70, 90 days or less -> 40, unknown age -> 0.  The result is
    written as CSV (every field quoted) into ``output_dir`` and returned.

    Args:
        log_folder_path: Folder holding the crawl-log ``.csv`` files.  Each
            file must provide the columns ``request_uri``,
            ``http_user_agent``, ``time_iso8601`` and ``status``.
        sitemap_urls_text: Sitemap index URLs separated by commas and/or
            newlines.
        output_dir: Folder for the CSV output; created if missing.
        max_records_per_file: Maximum rows per CSV file; larger results are
            split into ``stale_pages_part<N>.csv`` files.
        max_gz_per_index: If truthy, only the first N sitemap files of each
            index are fetched.
        log_batch_size: Number of log files loaded into memory per batch
            (values below 1 are treated as 1).
        workers: Thread count for downloading sitemap files (values below 1
            are treated as 1; ``None`` keeps the executor default).
        base_url: Scheme and host prepended to each logged ``request_uri``
            so that log URLs are comparable with sitemap URLs.

    Returns:
        A DataFrame of stale pages, or ``None`` when input is missing or no
        stale page is found.

    Raises:
        ValueError: If ``max_records_per_file`` is smaller than 1.
    """
    # Validate up front so a bad setting fails before hours of processing.
    if max_records_per_file < 1:
        raise ValueError("max_records_per_file must be at least 1")
    if workers is not None:
        workers = max(1, int(workers))
    log_batch_size = max(1, int(log_batch_size))

    os.makedirs(output_dir, exist_ok=True)
    sitemap_urls = [url.strip() for url in sitemap_urls_text.replace(',', '\n').split('\n') if url.strip()]

    if not sitemap_urls:
        print("‚ùå No sitemap URLs provided")
        return None

    print("="*80)
    print(f"üï∫ STALE PAGE DETECTION: THE PARTY BOUNCER (Parallel: {workers} workers)")
    print("="*80)
    print(f"üìÅ Log folder: {log_folder_path}")
    print(f"üó∫Ô∏è  Sitemap indexes: {len(sitemap_urls)}")

    # ============================================================================
    # STEP 1: BUILD THE GUEST LIST (LOGS)
    # ============================================================================
    print("\nüìñ Step 1: Building the Guest List (Processing Logs)...")

    if not os.path.exists(log_folder_path):
        print(f"‚ùå Folder not found: {log_folder_path}")
        return None

    # Sorted so that the 'first' aggregations below do not depend on the
    # arbitrary order in which the OS lists the directory.
    log_files = sorted(f for f in os.listdir(log_folder_path) if f.endswith('.csv'))

    if not log_files:
        print("‚ùå No CSV files found.")
        return None

    def most_common(values, fallback):
        """Return the most frequent value of a Series, or ``fallback`` if it has none."""
        modes = values.mode()
        return modes.iloc[0] if len(modes) > 0 else fallback

    log_columns = ['request_uri', 'http_user_agent', 'time_iso8601', 'status']
    site_root = base_url.rstrip('/')
    all_log_stats = []
    # The observation window is tracked across ALL batches; taking it per
    # batch would divide whole-period crawl counts by one batch's day span.
    earliest_seen = None
    latest_seen = None

    for i in tqdm(range(0, len(log_files), log_batch_size), desc="   Processing Logs"):
        batch_files = log_files[i:i+log_batch_size]
        dfs = []
        for file in batch_files:
            try:
                df = pd.read_csv(os.path.join(log_folder_path, file),
                                 usecols=log_columns,
                                 encoding='utf-8-sig', low_memory=False)
            except Exception as exc:
                # Best effort: one unreadable file must not stop the run,
                # but the user should know its data is missing.
                print(f"   Skipped log file {file}: {exc}")
                continue
            dfs.append(df)

        if not dfs:
            continue

        batch_df = pd.concat(dfs)
        # Drop the query string so URL variants collapse onto one page.
        batch_df['url'] = site_root + batch_df['request_uri'].str.split('?').str[0].fillna('')

        # utc=True keeps timestamps comparable across files and offsets.
        timestamps = pd.to_datetime(batch_df['time_iso8601'], errors='coerce', utc=True)
        batch_min = timestamps.min()
        batch_max = timestamps.max()
        if pd.notnull(batch_min):
            earliest_seen = batch_min if earliest_seen is None else min(earliest_seen, batch_min)
            latest_seen = batch_max if latest_seen is None else max(latest_seen, batch_max)

        batch_stats = batch_df.groupby('url').agg({
            'request_uri': 'count',
            'http_user_agent': lambda x: most_common(x, x.iloc[0]),
            'status': lambda x: most_common(x, 200)
        }).reset_index()
        batch_stats.columns = ['url', 'crawl_count', 'user_agent', 'status_code']

        all_log_stats.append(batch_stats)
        del batch_df, dfs, timestamps
        gc.collect()

    if not all_log_stats:
        print("‚ùå No log data processed.")
        return None

    print("   ‚îú‚îÄ Finalizing Guest List...")
    full_log_stats = pd.concat(all_log_stats).groupby('url').agg({
        'crawl_count': 'sum',
        'user_agent': 'first',
        'status_code': 'first'
    }).reset_index()

    # Falls back to one day when no timestamp could be parsed at all.
    days_active = (latest_seen - earliest_seen).days + 1 if earliest_seen is not None else 1
    full_log_stats['days_active'] = days_active

    full_log_stats['crawl_frequency'] = full_log_stats['crawl_count'] / full_log_stats['days_active']
    # NOTE(review): frequency * days equals crawl_count, so this weighted
    # sum is numerically just crawl_count; formula kept as originally written.
    full_log_stats['authority_score'] = (full_log_stats['crawl_count'] * 0.7 +
                                         full_log_stats['crawl_frequency'] * full_log_stats['days_active'] * 0.3)

    threshold = full_log_stats['crawl_count'].quantile(0.10)
    print(f"   ‚îú‚îÄ Bottom 10% Threshold: <= {threshold} crawls")

    invited_guests = set(full_log_stats['url'])
    wallflowers_df = full_log_stats[full_log_stats['crawl_count'] <= threshold]
    wallflowers = set(wallflowers_df['url'])
    # Only low-activity rows are ever merged, so join against this subset.
    wallflower_stats = wallflowers_df[['url', 'crawl_count', 'authority_score',
                                       'crawl_frequency', 'user_agent', 'status_code']]

    print(f"‚úÖ Guest List Ready: {len(invited_guests):,} total, {len(wallflowers):,} low activity")

    # ============================================================================
    # STEP 2: AT THE GATE (SITEMAP PARSING)
    # ============================================================================
    print("\nüì• Step 2: Checking Sitemaps at the Gate (Parallel)...")

    def parse_sitemap_index(index_url):
        """Return the sitemap file URLs listed in a sitemap index ([] on failure)."""
        try:
            response = requests.get(index_url, timeout=60)
            root = ET.fromstring(response.content)
        except Exception as exc:
            print(f"   Could not read sitemap index: {exc}")
            return []
        ns = {'sm': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        return [loc.text for loc in root.findall('sm:sitemap/sm:loc', ns)]

    def check_at_gate(batch_df):
        """Keep only the orphan and low-activity rows of one sitemap batch."""
        invited_mask = batch_df['url'].isin(invited_guests)
        wallflower_mask = batch_df['url'].isin(wallflowers)

        orphans = batch_df[~invited_mask].copy()
        orphans['page_type'] = 'Orphan'
        orphans['crawl_count'] = 0
        orphans['authority_score'] = 0.0
        orphans['crawl_frequency'] = 0.0
        orphans['user_agent'] = 'Not Crawled'
        orphans['status_code'] = 0  # 0 marks "never crawled"

        low_act = batch_df[wallflower_mask].copy()
        low_act['page_type'] = 'Low Activity'

        if not low_act.empty:
            low_act = low_act.merge(wallflower_stats, on='url', how='left')
            low_act['status_code'] = low_act['status_code'].fillna(200)  # Default 200 for crawled pages

        return pd.concat([orphans, low_act])

    stale_party_list = []
    total_processed_urls = 0
    # Buffered sitemap entries are classified once this many have piled up,
    # which bounds memory while the downloads are still running.
    flush_size = 200000

    def flush(records):
        """Classify one buffer of sitemap entries and store its stale rows."""
        nonlocal total_processed_urls
        batch_df = pd.DataFrame(records)
        total_processed_urls += len(batch_df)
        processed_batch = check_at_gate(batch_df)
        if not processed_batch.empty:
            stale_party_list.append(processed_batch)
        del batch_df
        gc.collect()

    for idx_num, index_url in enumerate(sitemap_urls, 1):
        print(f"\n   ‚îú‚îÄ Index {idx_num}/{len(sitemap_urls)}: {index_url.split('/')[-1]}")
        gz_urls = parse_sitemap_index(index_url)
        if not gz_urls:
            continue
        if max_gz_per_index:
            gz_urls = gz_urls[:max_gz_per_index]

        print(f"   ‚îÇ  ‚îú‚îÄ Spawning {workers} workers for {len(gz_urls)} files...")

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(parse_gz_sitemap_worker, url) for url in gz_urls]
            batch_results = []

            for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="   ‚îÇ  ‚îî‚îÄ Parsing", leave=False):
                result = future.result()
                if result:
                    batch_results.extend(result)

                if len(batch_results) > flush_size:
                    flush(batch_results)
                    batch_results = []

            if batch_results:
                flush(batch_results)

    # ============================================================================
    # STEP 3: SAVE OUTPUT (CSV with Proper Quoting)
    # ============================================================================
    print("\nüíæ Step 3: Consolidating and Saving...")

    if not stale_party_list:
        print("‚ùå No stale pages found.")
        return None

    final_df = pd.concat(stale_party_list, ignore_index=True)

    # Sitemap lastmod values usually carry a UTC offset.  Parse everything as
    # UTC and drop the tz on both sides so the subtraction never mixes
    # tz-aware and tz-naive values (which raises TypeError).
    current_date = pd.Timestamp.now(tz='UTC').tz_convert(None)
    final_df['last_modified'] = pd.to_datetime(
        final_df['last_modified'], errors='coerce', utc=True
    ).dt.tz_convert(None)
    final_df['days_since_modified'] = (current_date - final_df['last_modified']).dt.days

    # Rows with an unknown modification date match no condition and stay at 0.
    final_df['priority_score'] = 0
    final_df.loc[final_df['days_since_modified'] > 180, 'priority_score'] = 100
    final_df.loc[(final_df['days_since_modified'] > 90) & (final_df['days_since_modified'] <= 180), 'priority_score'] = 70
    final_df.loc[final_df['days_since_modified'] <= 90, 'priority_score'] = 40

    final_df = final_df.sort_values(['page_type', 'priority_score'], ascending=[False, False])

    output_cols = ['url', 'crawl_count', 'authority_score', 'crawl_frequency', 'user_agent',
                   'status_code', 'page_type', 'priority_score', 'days_since_modified', 'last_modified']

    final_cols = [c for c in output_cols if c in final_df.columns]
    final_df = final_df[final_cols]

    total_rows = len(final_df)
    if total_rows <= max_records_per_file:
        output_path = os.path.join(output_dir, 'stale_pages.csv')
        final_df.to_csv(output_path, index=False, encoding='utf-8-sig', quoting=csv.QUOTE_ALL)
        print(f"   ‚îî‚îÄ Saved: {output_path}")
    else:
        # Ceiling division: "// + 1" would write an empty extra file when
        # the row count is an exact multiple of the per-file limit.
        num_parts = (total_rows + max_records_per_file - 1) // max_records_per_file
        for part_no in range(num_parts):
            start_idx = part_no * max_records_per_file
            part_df = final_df.iloc[start_idx:start_idx + max_records_per_file]
            output_path = os.path.join(output_dir, f'stale_pages_part{part_no+1}.csv')
            part_df.to_csv(output_path, index=False, encoding='utf-8-sig', quoting=csv.QUOTE_ALL)
        print(f"   ‚îî‚îÄ Saved {num_parts} files")

    print("\n" + "="*80)
    print("üìä FINAL STATS")
    print("="*80)
    print(f"‚úÖ Total Analyzed: {total_processed_urls:,}")
    print(f"‚úÖ Stale Found: {len(final_df):,}")
    print(f"   ‚Ä¢ Orphans: {len(final_df[final_df['page_type']=='Orphan']):,}")
    print(f"   ‚Ä¢ Low Activity: {len(final_df[final_df['page_type']=='Low Activity']):,}")
    print(f"üìÅ Output Location: {os.path.abspath(output_dir)}")
    print("="*80)

    return final_df


# UI Widgets
# Path of the folder that holds the crawl-log CSV files.
log_folder_input = widgets.Text(
    value='',
    placeholder='D:\\path\\to\\log\\files',
    description='Log Folder:',
    layout=widgets.Layout(width='600px'),
)

# Sitemap index URLs, pre-filled with a default index.
sitemap_input = widgets.Textarea(
    value='https://www.alamy.com/sitemaps/image_daily_index_s_1_10000000.xml',
    placeholder='Enter sitemap URLs',
    description='Sitemap URLs:',
    layout=widgets.Layout(width='600px', height='150px'),
)

# Ticked by default.
test_mode_checkbox = widgets.Checkbox(
    value=True,
    description='Test Mode',
)

# Numeric settings; note that gz_batch_input is the one labelled 'Workers:'.
log_batch_input = widgets.IntText(
    value=5,
    description='Log Batch:',
)
gz_batch_input = widgets.IntText(
    value=20,
    description='Workers:',
)

# Start button and the area that receives the run's printed output.
run_button = widgets.Button(
    description='üöÄ Run Party Bouncer',
    button_style='success',
    icon='check',
    layout=widgets.Layout(width='300px'),
)
output_area = widgets.Output()


def on_run_clicked(b):
    """Button callback: run the detection with the current widget values.

    Everything printed during the run is captured by ``output_area``, which
    is cleared first.  While "Test Mode" is ticked only 10 sitemap files per
    index are processed; otherwise there is no limit.

    Args:
        b: The clicked button, as passed in by ipywidgets (unused).
    """
    with output_area:
        clear_output()

        if test_mode_checkbox.value:
            gz_limit = 10
        else:
            gz_limit = None

        run_options = {
            'max_gz_per_index': gz_limit,
            'log_batch_size': log_batch_input.value,
            'workers': gz_batch_input.value,
        }
        detect_stale_pages_bouncer(
            log_folder_input.value,
            sitemap_input.value,
            **run_options,
        )


# Hook the button up to its handler, then render the form top to bottom.
run_button.on_click(on_run_clicked)
display(
    widgets.VBox(
        [
            widgets.HTML("<h2>üï∫ Stale Page Detection: Party Bouncer (Updated)</h2>"),
            log_folder_input,
            sitemap_input,
            # The two numeric settings share one row.
            widgets.HBox(
                [
                    log_batch_input,
                    gz_batch_input,
                ]
            ),
            test_mode_checkbox,
            run_button,
            output_area,
        ]
    )
)


# Cell output: VBox(children=(HTML(value='<h2>üï∫ Stale Page Detection: Party Bouncer (Updated)</h2>'), Text(value='', descript‚Ä¶