In [28]:
import pandas as pd
import requests
import time
import re
from datetime import datetime, timedelta
import os

In [29]:
"""
Red Alert ETL Pipeline

This script implements an ETL (Extract, Transform, Load) pipeline for processing
Israeli Red Alert (air raid siren) data. It fetches data from the Oref API,
adds geographic region information, and filters by specific regions and dates."""


'\nRed Alert ETL Pipeline\n\nThis script implements an ETL (Extract, Transform, Load) pipeline for processing\nIsraeli Red Alert (air raid siren) data. It fetches data from the Oref API,\nadds geographic region information, and filters by specific regions and dates.'

In [30]:
def fetch_high_volume_days(start_date, end_date, hours_per_chunk=6):
    """
    Special function to fetch data for high-volume days like October 7-8, 2023
    using very small time chunks to avoid API limitations.
    
    Args:
        start_date (datetime): Start date
        end_date (datetime): End date  
        hours_per_chunk (int): Hours per chunk (default: 6 hours)
        
    Returns:
        DataFrame: Combined DataFrame with all alerts from the high-volume period
    """
    all_alerts = []
    all_unique_locations = set()
    
    # Create hourly chunks for these critical days
    current_time = start_date
    chunk_times = []
    
    while current_time < end_date:
        chunk_end = current_time + timedelta(hours=hours_per_chunk)
        if chunk_end > end_date:
            chunk_end = end_date
        chunk_times.append((current_time, chunk_end))
        current_time = chunk_end
    
    print(f"\n=== Special Processing for High Volume Period: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} ===")
    print(f"Breaking into {len(chunk_times)} smaller chunks of {hours_per_chunk} hours each")
    
    for chunk_idx, (chunk_start, chunk_end) in enumerate(chunk_times):
        # Add small delay between requests
        if chunk_idx > 0:
            delay = 3  # Slightly longer delay for high-volume requests
            print(f"Waiting {delay} seconds before next high-volume request...")
            time.sleep(delay)
        
        # Format with both date and time for logging
        start_str = chunk_start.strftime('%Y-%m-%d %H:%M')
        end_str = chunk_end.strftime('%Y-%m-%d %H:%M')
        print(f"Fetching chunk {chunk_idx+1}/{len(chunk_times)}: {start_str} to {end_str}...")
        
        # Use the existing fetch function but with date only (API limitation)
        # We'll filter by time later if needed
        chunk_df = fetch_alerts(chunk_start.replace(hour=0, minute=0), 
                               chunk_end.replace(hour=23, minute=59))
        
        if not chunk_df.empty:
            # Filter to the exact time window since API only accepts date granularity
            if 'date' in chunk_df.columns and 'time' in chunk_df.columns:
                print(f"  Before filtering: {len(chunk_df)} records")
                
                # Convert string times to datetime for proper filtering
                try:
                    # Create a datetime column for filtering
                    chunk_df['datetime'] = pd.to_datetime(
                        chunk_df['date'] + ' ' + chunk_df['time'], 
                        format='%d.%m.%Y %H:%M:%S',
                        errors='coerce'  # This keeps invalid dates as NaT instead of raising errors
                    )
                    
                    # Display sample datetimes for debugging
                    print(f"  Sample datetimes: {chunk_df['datetime'].head(3).tolist()}")
                    print(f"  Filter range: {chunk_start} to {chunk_end}")
                    
                    # Filter by time range
                    filtered_df = chunk_df[(chunk_df['datetime'] >= chunk_start) & 
                                          (chunk_df['datetime'] < chunk_end)]
                    
                    print(f"  After filtering: {len(filtered_df)} records")
                    
                    # If filtering removed all records, check for date format issues
                    if filtered_df.empty and not chunk_df.empty:
                        print("  Warning: Time filtering removed all records, checking date formats...")
                        # Try alternative approach - use hour extraction
                        chunk_hour = pd.to_datetime(chunk_df['time'], format='%H:%M:%S', errors='coerce').dt.hour
                        start_hour = chunk_start.hour
                        end_hour = chunk_end.hour
                        
                        # Filter by hour range
                        filtered_df = chunk_df[(chunk_hour >= start_hour) & (chunk_hour < end_hour)]
                        print(f"  After hour-based filtering: {len(filtered_df)} records")
                        
                        # If still empty, keep all records for this chunk as a fallback
                        if filtered_df.empty:
                            print("  Warning: Hour filtering also failed. Keeping all records for this day.")
                            filtered_df = chunk_df.copy()
                    
                    # Drop the temporary datetime column
                    if 'datetime' in filtered_df.columns:
                        filtered_df = filtered_df.drop('datetime', axis=1)
                    
                except Exception as e:
                    print(f"  Error during time filtering: {str(e)}")
                    print("  Keeping all records for this day instead.")
                    filtered_df = chunk_df.copy()
                
                # Only continue if we have data after filtering
                if not filtered_df.empty:
                    # Debug: Log unique locations in this chunk
                    chunk_locations = set()
                    for loc_str in filtered_df['data'].dropna():
                        locations = [loc.strip() for loc in loc_str.split(',')]
                        chunk_locations.update(locations)
                    all_unique_locations.update(chunk_locations)
                    
                    print(f"  Found {len(filtered_df)} alerts with {len(chunk_locations)} unique locations in this time chunk.")
                    all_alerts.append(filtered_df)
                else:
                    print(f"  No alerts found in this specific time window after filtering.")
            else:
                # If we can't filter by time, just use the whole chunk
                chunk_locations = set()
                for loc_str in chunk_df['data'].dropna():
                    locations = [loc.strip() for loc in loc_str.split(',')]
                    chunk_locations.update(locations)
                all_unique_locations.update(chunk_locations)
                
                print(f"  Found {len(chunk_df)} alerts with {len(chunk_locations)} unique locations in this chunk.")
                all_alerts.append(chunk_df)
        else:
            print(f"  No data found in this time range.")
    
    if all_alerts:
        combined_df = pd.concat(all_alerts, ignore_index=True)
        print(f"\nHigh volume period summary:")
        print(f"Total alerts: {len(combined_df)}")
        print(f"Total unique locations: {len(all_unique_locations)}")
        
        # Save unique locations from this period to a separate file
        period_name = f"{start_date.strftime('%Y%m%d')}to{end_date.strftime('%Y%m%d')}"
        locations_file = f"locations_{period_name}.txt"
        with open(locations_file, 'w', encoding='utf-8') as f:
            for loc in sorted(all_unique_locations):
                f.write(f"{loc}\n")
        print(f"Unique locations for this period saved to {locations_file}")
        
        return combined_df
    else:
        print("No alerts data found for the high-volume period.")
        return pd.DataFrame(columns=['data', 'date', 'time', 'category', 'title'])

In [31]:
# =============================================================================
# Configuration
# =============================================================================

# API settings
OREF_API_URL = "https://alerts-history.oref.org.il/Shared/Ajax/GetAlarmsHistory.aspx"
NOMINATIM_API_URL = "https://nominatim.openstreetmap.org/search"

# Date range settings
START_DATE = datetime(2023, 10, 1)
END_DATE = datetime(2023, 12, 31)

# API request settings
MAX_RETRIES = 3
RETRY_DELAY = 5  # seconds

# Encoding settings
CSV_ENCODING = 'utf-8-sig'  # UTF-8 with BOM for Hebrew
EXCEL_ENGINE = 'openpyxl'   # Better support for Hebrew

# Specific dates to filter (66 days)
FILTER_DATES = [
    '2023-10-01', '2023-10-02', '2023-10-03', '2023-10-04', '2023-10-05',
    '2023-10-08', '2023-10-09', '2023-10-10', '2023-10-11', '2023-10-12',
    '2023-10-15', '2023-10-16', '2023-10-17', '2023-10-18', '2023-10-19',
    '2023-10-22', '2023-10-23', '2023-10-24', '2023-10-25', '2023-10-26',
    '2023-10-29', '2023-10-30', '2023-10-31', '2023-11-01', '2023-11-02',
    '2023-11-05', '2023-11-06', '2023-11-07', '2023-11-08', '2023-11-09',
    '2023-11-12', '2023-11-13', '2023-11-14', '2023-11-15', '2023-11-16',
    '2023-11-19', '2023-11-20', '2023-11-21', '2023-11-22', '2023-11-23',
    '2023-11-26', '2023-11-27', '2023-11-28', '2023-11-29', '2023-11-30',
    '2023-12-03', '2023-12-04', '2023-12-05', '2023-12-06', '2023-12-07',
    '2023-12-10', '2023-12-11', '2023-12-12', '2023-12-13', '2023-12-14',
    '2023-12-17', '2023-12-18', '2023-12-19', '2023-12-20', '2023-12-21',
    '2023-12-24', '2023-12-25', '2023-12-26', '2023-12-27', '2023-12-28',
    '2023-12-31'
]

# Region definitions
CENTRAL_REGIONS = [
    'CENTRAL_DISTRICT',
    'TEL_AVIV_DISTRICT',
    'DAN_DISTRICT',
    'SHARON_DISTRICT',
    'SHFELA_DISTRICT'
]

# Region translation mapping
REGION_TRANSLATION = {
    'מחוז המרכז': 'CENTRAL_DISTRICT',
    'מחוז הצפון': 'NORTHERN_DISTRICT',
    'מחוז הדרום': 'SOUTHERN_DISTRICT',
    'מחוז ירושלים': 'JERUSALEM_DISTRICT',
    'מחוז תל אביב': 'TEL_AVIV_DISTRICT',
    'מחוז חיפה': 'HAIFA_DISTRICT',
    'יהודה ושומרון': 'JUDEA_AND_SAMARIA',
    'עוטף עזה': 'GAZA_ENVELOPE',
    'DAN': 'DAN_DISTRICT',
    'SHARON': 'SHARON_DISTRICT',
    'CENTRAL': 'CENTRAL_DISTRICT',
    'NORTH': 'NORTHERN_DISTRICT',
    'SOUTH': 'SOUTHERN_DISTRICT',
    'JERUSALEM': 'JERUSALEM_DISTRICT',
    'WEST_BANK': 'JUDEA_AND_SAMARIA',
    'SHFELA': 'SHFELA_DISTRICT',
    'JORDAN_VALLEY': 'JORDAN_VALLEY_DISTRICT',
    'GAZA_ENVELOPE': 'GAZA_ENVELOPE',
    "CENTRAL_DISTRICT": 'CENTRAL_DISTRICT',
    "JUDEA_AND_SAMARIA": 'JUDEA_AND_SAMARIA',
    "NORTHERN_DISTRICT": 'NORTHERN_DISTRICT',
    "JERUSALEM_DISTRICT": 'JERUSALEM_DISTRICT',
    "SOUTHERN_DISTRICT": 'SOUTHERN_DISTRICT',
    "HAIFA_DISTRICT": 'HAIFA_DISTRICT',
}

# File paths
MERGED_ALARMS_PATH = 'merged_alarms_data.csv'
CITIES_REGIONS_PATH = 'israel_cities_regions.xlsx'
CITIES_REGIONS_UPDATED_PATH = 'israel_cities_regions_updated.xlsx'
ALARMS_WITH_REGIONS_PATH = 'alarms_with_regions.xlsx'
ALARMS_WITH_ENGLISH_REGIONS_PATH = 'alarms_with_english_regions.xlsx'
CENTRAL_REGIONS_ALARMS_PATH = 'central_regions_alarms.xlsx'
FINAL_FILTERED_ALARMS_PATH = 'central_regions_66days_sorted.xlsx'

# Encoding settings
CSV_ENCODING = 'utf-8'
EXCEL_ENGINE = 'openpyxl'

# Manual region mappings for locations that couldn't be geocoded
# Manual region mappings for locations that couldn't be geocoded
MANUAL_REGION_MAPPINGS = {
    # Industrial zones in the south
    'אזור תעשייה הדרומי אשקלון': 'SOUTH',
    'אזור תעשייה ברוש': 'SOUTH',
    'אזור תעשייה תימורים': 'SOUTH',
    'אזור תעשייה עידן הנגב': 'SOUTH',
    'אזור תעשייה עד הלום': 'SOUTH',
    'אזור תעשייה קריית גת': 'SOUTH',
    "אזור תעשייה נ.ע.מ": 'SOUTH',
    
    # Industrial zones in the center
    'אזור תעשייה נשר - רמלה': 'CENTRAL',
    'אזור תעשייה חבל מודיעין': 'CENTRAL',
    'אזור תעשייה אפק ולב הארץ': 'CENTRAL',
    'אזור תעשייה כנות': 'CENTRAL',
    'פארק תעשייה ראם': 'CENTRAL',
    
    # Industrial zones in the north
    'אזור תעשייה מבואות הגלבוע': 'NORTH',
    'אזור תעשייה אלון התבור': 'NORTH',
    'אזור תעשייה רמת דלתון': 'NORTH',
    'אזור תעשייה קדמת גליל': 'NORTH',
    'אזור תעשייה אכזיב מילואות': 'NORTH',
    'אזור תעשייה צמח': 'NORTH',
    'אזור תעשייה צ.ח.ר': 'NORTH',
    'אזור תעשייה תרדיון': 'NORTH',
    'עכו - אזור תעשייה': 'NORTH',
    "מרכז חבר": 'HAIFA_DISTRICT',
    
    # City parts
    'הרצליה - מרכז וגליל ים': 'SHARON',
    'הרצליה - מערב': 'SHARON',
    'באר שבע - מזרח': 'SOUTH',
    'באר שבע - מערב': 'SOUTH',
    'תל אביב - דרום העיר ויפו': 'DAN',
    'רמת גן - מזרח': 'DAN',
    'ראשון לציון - מזרח': 'CENTRAL',
    'אשדוד - איזור תעשייה צפוני': 'SOUTH',
    
    # NEW MAPPINGS FOR PREVIOUSLY UNKNOWN LOCATIONS
    
    # Judea and Samaria
    'אלון שבות': 'JUDEA_AND_SAMARIA',
    'אלפי מנשה': 'JUDEA_AND_SAMARIA',
    'ברוכין': 'JUDEA_AND_SAMARIA',
    'גבע בנימין': 'JUDEA_AND_SAMARIA',
    'גבעון החדשה': 'JUDEA_AND_SAMARIA',
    'מודיעין עילית': 'JUDEA_AND_SAMARIA',
    'מעלה אפרים': 'JUDEA_AND_SAMARIA',
    'מעלה מכמש': 'JUDEA_AND_SAMARIA',
    'נבי סמואל': 'JUDEA_AND_SAMARIA',
    'נווה דניאל': 'JUDEA_AND_SAMARIA',
    'נופי פרת': 'JUDEA_AND_SAMARIA',
    'עלי זהב': 'JUDEA_AND_SAMARIA',
    'ראש צורים': 'JUDEA_AND_SAMARIA',
    'אזור תעשייה אריאל': 'JUDEA_AND_SAMARIA',
    'אזור תעשייה ברקן': 'JUDEA_AND_SAMARIA',
    'פארק תעשיות מגדל עוז': 'JUDEA_AND_SAMARIA',
    "ענב": 'JUDEA_AND_SAMARIA',
    "סלעית": 'JUDEA_AND_SAMARIA',

    # Northern District
    'בית סוהר צלמון': 'NORTHERN_DISTRICT',
    'בני יהודה וגבעת יואב': 'NORTHERN_DISTRICT',
    'גורנות הגליל': 'NORTHERN_DISTRICT',
    'ח\'וואלד': 'NORTHERN_DISTRICT',
    'אל-ח\'וואלד מערב': 'NORTHERN_DISTRICT',
    'כאוכב אבו אלהיג\'א': 'NORTHERN_DISTRICT',
    'כורזים ורד הגליל': 'NORTHERN_DISTRICT',
    'מנחת מחניים': 'NORTHERN_DISTRICT',
    'מצוק עורבים': 'NORTHERN_DISTRICT',
    'מרכז אזורי מבואות חרמון': 'NORTHERN_DISTRICT',
    'מרכז אזורי מרום גליל': 'NORTHERN_DISTRICT',
    'סואעד חמירה': 'NORTHERN_DISTRICT',
    'קצרין - אזור תעשייה': 'NORTHERN_DISTRICT',
    'צפת - נוף כנרת': 'NORTHERN_DISTRICT',
    'צפת - עיר': 'NORTHERN_DISTRICT',
    'צפת - עכברה': 'NORTHERN_DISTRICT',
    'קבוצת גבע': 'NORTHERN_DISTRICT',
    'ישובי אומן': 'NORTHERN_DISTRICT',
    'ישובי יעל': 'NORTHERN_DISTRICT',
    'אתר ההנצחה גולני': 'NORTHERN_DISTRICT',

    # Central District
    'אתר חירייה': 'CENTRAL_DISTRICT',
    'כפר נוער בן שמן': 'CENTRAL_DISTRICT',
    'מודיעין - ישפרו סנטר': 'CENTRAL_DISTRICT',
    'מודיעין - ליגד סנטר': 'CENTRAL_DISTRICT',
    'מרכז אזורי דרום השרון': 'CENTRAL_DISTRICT',
    'מתחם פי גלילות': 'CENTRAL_DISTRICT',
    'אזור תעשיה רגם': 'CENTRAL_DISTRICT',
    "כפר הרי''ף וצומת ראם": 'CENTRAL_DISTRICT',

    # Jerusalem District
    'פנימיית עין כרם': 'JERUSALEM_DISTRICT',
    'אזור תעשייה הר טוב - צרעה': 'JERUSALEM_DISTRICT',

    # Southern District
    'ואדי אל נעם דרום': 'SOUTHERN_DISTRICT',
    'חוות שיקמים': 'SOUTHERN_DISTRICT',
    'כפר הרי"ף וצומת ראם': 'SOUTHERN_DISTRICT',
    'כפר מימון ותושיה': 'SOUTHERN_DISTRICT',
    'מטווח ניר עם': 'SOUTHERN_DISTRICT',
    'מתחם בני דרום': 'SOUTHERN_DISTRICT',
    'סעייה-מולדה': 'SOUTHERN_DISTRICT',
    'קריית חינוך מרחבים': 'SOUTHERN_DISTRICT',
    'תחנת רכבת קריית מלאכי - יואב': 'SOUTHERN_DISTRICT',
    'תעשיון חצב': 'SOUTHERN_DISTRICT',
    'תקומה וחוות יזרעם': 'SOUTHERN_DISTRICT',
    'אזור תעשייה שחק': 'SOUTHERN_DISTRICT',
}

In [32]:
# =============================================================================
# Utility Functions
# =============================================================================

def clean_text(text):
    """
    Clean and normalize text.
    
    Args:
        text (str): Input text
        
    Returns:
        str: Cleaned text
    """
    if not isinstance(text, str):
        return text
    
    # Remove quotes and extra spaces
    text = text.replace('"', '').strip()
    
    # Normalize spaces
    text = re.sub(r'\s+', ' ', text)
    
    return text


def get_date_range_chunks(start_date, end_date, chunk_days=14):
    """
    Split date range into chunks of specified days.
    
    Args:
        start_date (datetime): Start date
        end_date (datetime): End date
        chunk_days (int): Number of days in each chunk
        
    Returns:
        list: List of (chunk_start, chunk_end) tuples
    """
    chunks = []
    current_start = start_date
    
    while current_start < end_date:
        current_end = current_start + timedelta(days=chunk_days)
        if current_end > end_date:
            current_end = end_date
        
        chunks.append((current_start, current_end))
        current_start = current_end + timedelta(days=1)
    
    return chunks


def optimize_excel_columns(worksheet, dataframe):
    """
    Optimize column widths in Excel worksheet based on content.
    
    Args:
        worksheet: Excel worksheet object
        dataframe (DataFrame): DataFrame with the data
    """
    for idx, col in enumerate(dataframe.columns):
        max_length = max(
            dataframe[col].astype(str).str.len().max(),
            len(str(col))
        ) + 2
        worksheet.column_dimensions[chr(65 + idx)].width = max_length


In [33]:
# =============================================================================
# Extract Functions
# =============================================================================

def fetch_alerts(from_date, to_date, lang="he", retries=0):
    """
    Fetch Red Alert data from the Oref API for a given date range with retry capability.
    
    Args:
        from_date (datetime): Start date
        to_date (datetime): End date
        lang (str): Language code ('he' for Hebrew, 'en' for English)
        retries (int): Current retry attempt
        
    Returns:
        DataFrame: DataFrame containing the alert data
    """
    params = {
        "lang": lang,
        "fromDate": from_date.strftime("%d.%m.%Y"),
        "toDate": to_date.strftime("%d.%m.%Y"),
        "mode": "0"
    }
    
    # Add custom user agent to avoid potential blocks
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "application/json, text/javascript, /; q=0.01",
        "Accept-Language": "he-IL,he;q=0.9,en-US;q=0.8,en;q=0.7",
        "X-Requested-With": "XMLHttpRequest",
        "Referer": "https://alerts-history.oref.org.il/",
        "Content-Type": "application/json; charset=utf-8"
    }
    
    try:
        # Skip SSL verification with a warning
        import warnings
        warnings.filterwarnings('ignore', message='Unverified HTTPS request')
        
        # Use session to maintain cookies
        session = requests.Session()
        
        # First visit the main site to get cookies
        session.get("https://alerts-history.oref.org.il/", verify=False, headers=headers)
        
        # Now make the actual API request
        response = session.get(
            OREF_API_URL, 
            params=params, 
            headers=headers,
            verify=False,
            timeout=15  # Set timeout to avoid hanging
        )
        
        response.raise_for_status()
        
        # Explicitly set encoding to utf-8 before getting JSON content
        response.encoding = 'utf-8'
        data = response.json()
        
        if isinstance(data, list):
            df = pd.DataFrame(data)
            if not df.empty:
                print(f"  Successfully fetched {len(df)} alerts.")
                
                # Ensure proper encoding for text columns
                for col in df.columns:
                    if df[col].dtype == 'object':  # Only process string columns
                        df[col] = df[col].apply(lambda x: x if not isinstance(x, str) else x.encode('latin1').decode('utf-8') if '\\u' in repr(x) else x)
                
                return df
            else:
                print("  API returned empty list.")
        else:
            print(f"  Unexpected API response format: {type(data)}")
        
        # If we reached here, we didn't get valid data
        if retries < MAX_RETRIES:
            print(f"  Retrying ({retries+1}/{MAX_RETRIES}) after {RETRY_DELAY} seconds...")
            time.sleep(RETRY_DELAY)
            return fetch_alerts(from_date, to_date, lang, retries+1)
        
    except requests.exceptions.RequestException as e:
        print(f"  Error fetching data: {e}")
        if retries < MAX_RETRIES:
            print(f"  Retrying ({retries+1}/{MAX_RETRIES}) after {RETRY_DELAY} seconds...")
            time.sleep(RETRY_DELAY)
            return fetch_alerts(from_date, to_date, lang, retries+1)
    
    # Return empty DataFrame with expected columns if all retries failed
    return pd.DataFrame(columns=['data', 'date', 'time', 'category', 'title'])


def fetch_alerts_in_chunks(start_date, end_date, initial_chunk_days=7, retry_days=3, delay_seconds=2):
    """
    Fetch alerts in small chunks to avoid timeouts and API limitations.
    Includes retry mechanism with smaller date ranges if initial chunks fail.
    
    Args:
        start_date (datetime): Start date
        end_date (datetime): End date
        initial_chunk_days (int): Initial number of days in each chunk (default: 7 days)
        retry_days (int): Number of days in each chunk for retry attempts (default: 3 days)
        delay_seconds (int): Delay between API requests in seconds
        
    Returns:
        DataFrame: Combined DataFrame with all alerts
    """
    all_alerts = []
    chunks = get_date_range_chunks(start_date, end_date, initial_chunk_days)
    all_unique_locations = set()
    
    for chunk_idx, (chunk_start, chunk_end) in enumerate(chunks):
        # Add delay between requests to avoid overwhelming the API
        if chunk_idx > 0:
            print(f"Waiting {delay_seconds} seconds before next request...")
            time.sleep(delay_seconds)
        
        print(f"Fetching data from {chunk_start.strftime('%Y-%m-%d')} to {chunk_end.strftime('%Y-%m-%d')}...")
        chunk_df = fetch_alerts(chunk_start, chunk_end)
        
        # If initial chunk failed, try with smaller chunks
        if chunk_df.empty and (chunk_end - chunk_start).days > retry_days:
            print(f"  No data found. Retrying with smaller chunks of {retry_days} days...")
            smaller_chunks = get_date_range_chunks(chunk_start, chunk_end, retry_days)
            
            for small_idx, (small_start, small_end) in enumerate(smaller_chunks):
                # Add delay between smaller chunk requests
                if small_idx > 0:
                    time.sleep(delay_seconds)
                
                print(f"  Fetching smaller chunk: {small_start.strftime('%Y-%m-%d')} to {small_end.strftime('%Y-%m-%d')}...")
                small_df = fetch_alerts(small_start, small_end)
                
                if not small_df.empty:
                    # Debug: Log unique locations in this chunk
                    chunk_locations = set()
                    for loc_str in small_df['data'].dropna():
                        locations = [loc.strip() for loc in loc_str.split(',')]
                        chunk_locations.update(locations)
                    all_unique_locations.update(chunk_locations)
                    
                    print(f"    Found {len(small_df)} alerts with {len(chunk_locations)} unique locations in this chunk.")
                    all_alerts.append(small_df)
                else:
                    print(f"    No data found in smaller chunk.")
        
        elif not chunk_df.empty:
            # Debug: Log unique locations in this chunk
            chunk_locations = set()
            for loc_str in chunk_df['data'].dropna():
                locations = [loc.strip() for loc in loc_str.split(',')]
                chunk_locations.update(locations)
            all_unique_locations.update(chunk_locations)
            
            print(f"  Found {len(chunk_df)} alerts with {len(chunk_locations)} unique locations in this chunk.")
            all_alerts.append(chunk_df)
        else:
            print(f"  No data found in this date range.")
    
    if all_alerts:
        # Ensure we're concatenating and getting the unique locations correctly
        combined_df = pd.concat(all_alerts, ignore_index=True)
        
        # Show alert counts by date for verification
        if not combined_df.empty and 'date' in combined_df.columns:
            date_counts = combined_df.groupby('date').size()
            print("\nAlerts by date:")
            for date, count in date_counts.items():  # Changed from iteritems() to items()
                print(f"  {date}: {count} alerts")
                
        # Count the unique locations in the final dataset
        print(f"\nTotal combined unique locations across all chunks: {len(all_unique_locations)}")
        print(f"Total alerts fetched: {len(combined_df)}")
        
        # Save unique locations to file for inspection
        with open('all_unique_locations.txt', 'w', encoding='utf-8') as f:
            for loc in sorted(all_unique_locations):
                f.write(f"{loc}\n")
        print("All unique locations saved to all_unique_locations.txt")
        
        return combined_df
    else:
        print("No alerts data found for the entire date range.")
        return pd.DataFrame(columns=['data', 'date', 'time', 'category', 'title'])


def extract_unique_locations(alerts_df):
    """
    Extract unique locations from the alerts dataframe.
    
    Args:
        alerts_df (DataFrame): DataFrame containing alert data
        
    Returns:
        list: List of unique location names
    """
    # Check if 'data' column exists
    if 'data' not in alerts_df.columns:
        print("Warning: 'data' column not found in alerts DataFrame")
        return []
        
    unique_locations = set()
    processed_count = 0
    
    # Debug: Count total rows and non-null data entries
    total_rows = len(alerts_df)
    non_null_data = alerts_df['data'].count()
    print(f"Processing {non_null_data} non-null entries out of {total_rows} total rows")
    
    # Process in smaller batches for better debugging
    batch_size = 1000
    for i in range(0, len(alerts_df), batch_size):
        batch = alerts_df.iloc[i:i+batch_size]
        batch_locations = set()
        
        for location_str in batch['data'].dropna():
            if not isinstance(location_str, str):
                # Skip non-string values
                continue
                
            # Split by comma and clean whitespace
            try:
                locations = [loc.strip() for loc in location_str.split(',')]
                batch_locations.update(locations)
                processed_count += 1
            except Exception as e:
                print(f"Error processing location string '{location_str}': {e}")
        
        unique_locations.update(batch_locations)
        print(f"Processed batch {i//batch_size + 1}, found {len(batch_locations)} unique locations in this batch")
    
    print(f"Processed {processed_count} location entries, found {len(unique_locations)} total unique locations")
    return sorted(unique_locations)


def get_location_info(city, country="Israel"):
    """
    Get location information from OpenStreetMap for a given city.
    
    Args:
        city (str): City name
        country (str): Country name
        
    Returns:
        dict: Location information including coordinates and region
    """
    # Remove quotes if they exist and add country to the search
    city_clean = clean_text(city)
    
    params = {
        'q': f"{city_clean}, {country}",
        'format': 'json',
        'addressdetails': 1,
        'limit': 1
    }
    
    headers = {
        'User-Agent': 'RedAlertETL/1.0'
    }
    
    try:
        # Disable SSL verification with a warning
        import warnings
        warnings.filterwarnings('ignore', message='Unverified HTTPS request')
        
        response = requests.get(NOMINATIM_API_URL, params=params, headers=headers, verify=False, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        if data:
            result = data[0]
            address = result.get('address', {})
            
            # Try to get the most specific region information
            region = address.get('state',
                     address.get('region',
                     address.get('county',
                     address.get('district', 'Unknown'))))
            
            return {
                'city': city_clean,
                'lat': float(result['lat']),
                'lon': float(result['lon']),
                'region': region,
                'full_address': result.get('display_name', '')
            }
    except Exception as e:
        print(f"Error processing {city}: {str(e)}")
    
    # Use region mapping if available for this location
    if city_clean in MANUAL_REGION_MAPPINGS:
        return {
            'city': city_clean,
            'lat': None,
            'lon': None,
            'region': MANUAL_REGION_MAPPINGS[city_clean],
            'full_address': f"{city_clean}, {country}"
        }
    
    # Return default values if geocoding fails
    return {
        'city': city_clean,
        'lat': None,
        'lon': None,
        'region': 'UNKNOWN',
        'full_address': None
    }


def batch_geocode_locations(locations, delay=1, max_errors=20):
    """
    Geocode a batch of locations with rate limiting.
    
    Args:
        locations (list): List of location names
        delay (int): Delay between requests in seconds
        max_errors (int): Maximum consecutive errors before falling back to mappings
        
    Returns:
        DataFrame: DataFrame with geocoding results
    """
    results = []
    total_locations = len(locations)
    consecutive_errors = 0
    use_manual_mapping = False
    
    # Add progress tracking
    print(f"Starting geocoding for {total_locations} locations...")
    
    for idx, location in enumerate(locations, 1):
        # Check if we've hit too many errors and should switch to manual mapping
        if consecutive_errors >= max_errors and not use_manual_mapping:
            print(f"\nWARNING: Encountered {max_errors} consecutive geocoding errors.")
            print("Switching to manual mapping for remaining locations.")
            use_manual_mapping = True
        
        # Print progress every 10 items
        if idx % 10 == 0 or idx == 1 or idx == total_locations:
            print(f"Processing {idx}/{total_locations}: {location}")
        
        # If we're in manual mapping mode, try to use that first
        if use_manual_mapping and location in MANUAL_REGION_MAPPINGS:
            result = {
                'city': location,
                'lat': None,
                'lon': None,
                'region': MANUAL_REGION_MAPPINGS[location],
                'full_address': f"{location}, Israel"
            }
            results.append(result)
            continue
            
        # Otherwise try geocoding
        try:
            result = get_location_info(location)
            
            # Check if geocoding failed (no lat/lon)
            if result['lat'] is None or result['lon'] is None:
                # Try manual mapping if available
                if location in MANUAL_REGION_MAPPINGS:
                    result['region'] = MANUAL_REGION_MAPPINGS[location]
                    consecutive_errors = 0  # Reset error counter on successful manual mapping
                else:
                    consecutive_errors += 1
            else:
                consecutive_errors = 0  # Reset error counter on successful geocoding
                
            results.append(result)
            time.sleep(delay)  # Respect rate limiting
            
        except Exception as e:
            print(f"Unexpected error processing {location}: {str(e)}")
            consecutive_errors += 1
            
            # Add a fallback result
            if location in MANUAL_REGION_MAPPINGS:
                region = MANUAL_REGION_MAPPINGS[location]
            else:
                region = 'UNKNOWN'
                
            results.append({
                'city': location,
                'lat': None,
                'lon': None,
                'region': region,
                'full_address': None
            })
            
            # Add extra delay after an error
            time.sleep(delay * 2)
    
    # Print summary statistics
    df = pd.DataFrame(results)
    geocoded_count = df['lat'].notnull().sum()
    manual_count = df['lat'].isnull().sum()
    unknown_count = (df['region'] == 'UNKNOWN').sum()
    
    print(f"\nGeocoding summary:")
    print(f"  Total locations: {total_locations}")
    print(f"  Successfully geocoded: {geocoded_count} ({geocoded_count/total_locations*100:.1f}%)")
    print(f"  Used manual mapping: {manual_count} ({manual_count/total_locations*100:.1f}%)")
    print(f"  Unknown regions: {unknown_count} ({unknown_count/total_locations*100:.1f}%)")
    
    return df



In [34]:
# =============================================================================
# Transform Functions
# =============================================================================

def apply_manual_region_mapping(regions_df):
    """
    Apply manual region mapping for locations that couldn't be geocoded.
    
    Args:
        regions_df (DataFrame): DataFrame with location and region data
        
    Returns:
        DataFrame: Updated DataFrame with manual mappings
    """
    # Create a copy to avoid modifying the original
    result_df = regions_df.copy()
    
    # Update the region values in the DataFrame
    for location, region in MANUAL_REGION_MAPPINGS.items():
        result_df.loc[result_df['city'] == location, 'region'] = region
    
    return result_df


def map_location_to_region(location, regions_mapping):
    """
    Map a location to its region using the mapping dictionary.
    
    Args:
        location (str): Location name
        regions_mapping (DataFrame): DataFrame with city-region mappings
        
    Returns:
        str: Region name
    """
    # Split in case of multiple locations separated by comma
    locations = [loc.strip() for loc in location.split(',')]
    
    for loc in locations:
        # Try exact match
        exact_match = regions_mapping[regions_mapping['city'] == loc]
        if not exact_match.empty:
            return exact_match.iloc[0]['region']
        
        # Try partial match
        for city, region in regions_mapping[['city', 'region']].values:
            if city in loc or loc in city:
                return region
    
    return 'UNKNOWN'


def add_region_to_alerts(alerts_df, regions_mapping):
    """
    Add region information to the alerts dataframe.
    
    Args:
        alerts_df (DataFrame): DataFrame with alert data
        regions_mapping (DataFrame): DataFrame with city-region mappings
        
    Returns:
        DataFrame: Alert data with region information
    """
    # Create a copy to avoid modifying the original
    result_df = alerts_df.copy()
    
    # Apply the mapping function to each row
    result_df['region'] = result_df['data'].apply(
        lambda x: map_location_to_region(x, regions_mapping)
    )
    
    return result_df


def translate_regions_to_english(df):
    """
    Translate region names to English.
    
    Args:
        df (DataFrame): DataFrame with region column
        
    Returns:
        DataFrame: DataFrame with added English region names
    """
    # Create a copy to avoid modifying the original
    result_df = df.copy()
    
    # Add English region names
    result_df['region_en'] = result_df['region'].map(REGION_TRANSLATION)
    
    return result_df


def filter_by_regions(df, regions):
    """
    Filter dataframe by specified regions.
    
    Args:
        df (DataFrame): DataFrame to filter
        regions (list): List of region names
        
    Returns:
        DataFrame: Filtered DataFrame
    """
    return df[df['region_en'].isin(regions)]


def filter_by_dates(df, dates):
    """
    Filter dataframe by specified dates.
    
    Args:
        df (DataFrame): DataFrame to filter
        dates (list): List of date strings in 'YYYY-MM-DD' format
        
    Returns:
        DataFrame: Filtered DataFrame
    """
    # Ensure date is in datetime format
    if 'date' in df.columns and not pd.api.types.is_datetime64_any_dtype(df['date']):
        df['date'] = pd.to_datetime(df['date'], format='%d.%m.%Y')
    
    # Create a string version of the date for filtering
    df['date_str'] = df['date'].dt.strftime('%Y-%m-%d')
    
    # Filter and sort
    filtered_df = df[df['date_str'].isin(dates)].sort_values(['date', 'time'])
    
    return filtered_df



In [35]:
# =============================================================================
# Load Functions
# =============================================================================

def save_to_csv(df, filepath, encoding=None, index=False):
    """
    Save DataFrame to CSV file.
    
    Args:
        df (DataFrame): DataFrame to save
        filepath (str): Output file path
        encoding (str, optional): File encoding
        index (bool): Whether to include index in output
    """
    try:
        # Use UTF-8 with BOM for Hebrew text
        encoding = 'utf-8-sig'
        
        # Convert any problematic text in string columns
        for col in df.columns:
            if df[col].dtype == 'object':  # Only process string columns
                df[col] = df[col].apply(lambda x: x if not isinstance(x, str) else x)
        
        df.to_csv(filepath, index=index, encoding=encoding)
        print(f"Data saved to CSV: {filepath} with encoding {encoding}")
    except Exception as e:
        print(f"Error saving to CSV: {e}")
        
        # Try alternative encoding if primary fails
        try:
            df.to_csv(filepath, index=index, encoding='cp1255')  # Windows Hebrew encoding
            print(f"Data saved to CSV with alternative encoding (cp1255): {filepath}")
        except Exception as e2:
            print(f"Failed to save CSV with alternative encoding: {e2}")


def save_to_excel(df, filepath, sheet_name='Data', index=False, optimize_columns=True):
    """
    Save DataFrame to Excel file with optimized column widths.
    
    Args:
        df (DataFrame): DataFrame to save
        filepath (str): Output file path
        sheet_name (str): Name of the sheet
        index (bool): Whether to include index in output
        optimize_columns (bool): Whether to optimize column widths
    """
    try:
        # Fix Hebrew encoding issues in string columns
        for col in df.columns:
            if df[col].dtype == 'object':  # Only process string columns
                df[col] = df[col].apply(lambda x: x if not isinstance(x, str) else x)
        
        with pd.ExcelWriter(filepath, engine=EXCEL_ENGINE) as writer:
            df.to_excel(writer, index=index, sheet_name=sheet_name)
            
            if optimize_columns:
                try:
                    worksheet = writer.sheets[sheet_name]
                    optimize_excel_columns(worksheet, df)
                except Exception as e:
                    print(f"Warning: Could not optimize columns: {e}")
        
        print(f"Data saved to Excel: {filepath}")
    except Exception as e:
        print(f"Error saving to Excel: {e}")
        
        # Try alternative approach if primary fails
        try:
            # Simplified saving without optimization
            df.to_excel(filepath, index=index, sheet_name=sheet_name, engine=EXCEL_ENGINE)
            print(f"Data saved to Excel with simplified method: {filepath}")
        except Exception as e2:
            print(f"Failed to save Excel with alternative method: {e2}")

In [36]:
# =============================================================================
# Main ETL Pipeline
# =============================================================================

def extract_stage():
    """Extract data from sources"""
    print("\n=== Extract Stage ===")
    
    # Define a default locations list in case we can't get real data
    # This is a fallback to avoid errors when API calls fail
    default_locations = [
        'אשקלון', 'אשדוד', 'תל אביב', 'באר שבע', 'ירושלים',
        'חיפה', 'נתניה', 'רעננה', 'הרצליה', 'פתח תקווה'
    ]
    
    # Check if user wants to force fetch new data
    force_fetch = False
    user_input = input("Do you want to fetch fresh data from the API? (y/n, default: n): ").strip().lower()
    if user_input == 'y' or user_input == 'yes':
        force_fetch = True
        # Force delete old file to avoid any mixing with previous data
        if os.path.exists(MERGED_ALARMS_PATH):
            try:
                os.remove(MERGED_ALARMS_PATH)
                print(f"Deleted existing data file: {MERGED_ALARMS_PATH}")
            except Exception as e:
                print(f"Warning: Could not delete existing file: {e}")
    
    # Check if merged file already exists to avoid re-fetching
    if os.path.exists(MERGED_ALARMS_PATH) and not force_fetch:
        print(f"Loading existing data from {MERGED_ALARMS_PATH}")
        try:
            # Try multiple encodings for Hebrew text
            try:
                alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8-sig')
            except UnicodeDecodeError:
                try:
                    alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8')
                except UnicodeDecodeError:
                    alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='cp1255')
            
            # Fix any encoding issues in text columns
            for col in alerts_df.columns:
                if alerts_df[col].dtype == 'object':  # Only process string columns
                    alerts_df[col] = alerts_df[col].apply(lambda x: x if not isinstance(x, str) 
                                                        else x.encode('latin1').decode('utf-8') 
                                                        if '\\u' in repr(x) else x)
        
            # Verify the dataframe has the expected structure
            if 'data' not in alerts_df.columns:
                print("Warning: CSV file does not have expected structure")
                if not force_fetch:
                    user_input = input("CSV file has invalid structure. Fetch new data? (y/n, default: y): ").strip().lower()
                    if user_input != 'n' and user_input != 'no':
                        force_fetch = True
                
                if not force_fetch:
                    # Create a minimal dataframe with required columns
                    alerts_df = pd.DataFrame({
                        'data': default_locations,
                        'date': [datetime.now().strftime('%d.%m.%Y')] * len(default_locations),
                        'time': ['00:00:00'] * len(default_locations),
                        'category': [''] * len(default_locations),
                        'title': [''] * len(default_locations)
                    })
        except Exception as e:
            print(f"Error reading the CSV file: {e}")
            force_fetch = True
    else:
        force_fetch = True
    
    # Fetch new data if needed or requested
    if force_fetch:
        print("Fetching alerts data from API...")
        
        # Allow user to customize chunk size
        chunk_size = 7  # Default 7 days
        try:
            user_chunk = input("Enter chunk size in days (default: 7): ").strip()
            if user_chunk and user_chunk.isdigit():
                chunk_size = int(user_chunk)
                if chunk_size < 1 or chunk_size > 30:
                    print("Invalid chunk size. Using default (7 days).")
                    chunk_size = 7
        except:
            print("Invalid input. Using default chunk size (7 days).")
        
        # Custom date range if needed
        use_custom_dates = False
        custom_start = START_DATE
        custom_end = END_DATE
        
        user_input = input("Use custom date range instead of defaults? (y/n, default: n): ").strip().lower()
        if user_input == 'y' or user_input == 'yes':
            use_custom_dates = True
            try:
                start_date_str = input(f"Enter start date (YYYY-MM-DD, default: {START_DATE.strftime('%Y-%m-%d')}): ").strip()
                if start_date_str:
                    custom_start = datetime.strptime(start_date_str, '%Y-%m-%d')
                
                end_date_str = input(f"Enter end date (YYYY-MM-DD, default: {END_DATE.strftime('%Y-%m-%d')}): ").strip()
                if end_date_str:
                    custom_end = datetime.strptime(end_date_str, '%Y-%m-%d')
                
                print(f"Using custom date range: {custom_start.strftime('%Y-%m-%d')} to {custom_end.strftime('%Y-%m-%d')}")
            except ValueError as e:
                print(f"Invalid date format: {e}. Using default date range.")
                use_custom_dates = False
                
        # Special handling for Oct 7-8 high volume period
        special_processing = False
        user_input = input("Use special processing for high-volume days (Oct 7-8, 2023)? (y/n, default: n): ").strip().lower()
        if user_input == 'y' or user_input == 'yes':
            special_processing = True
            
        # Initialize with empty dataframe
        alerts_df = pd.DataFrame(columns=['data', 'date', 'time', 'category', 'title'])
        all_dfs = []
            
        if special_processing:
            # Oct 7-8 special processing with smaller time chunks
            oct7_start = datetime(2023, 10, 7, 0, 0)
            oct8_end = datetime(2023, 10, 9, 0, 0)  # End of Oct 8
            
            oct7_8_df = fetch_high_volume_days(oct7_start, oct8_end, hours_per_chunk=6)
            
            if not oct7_8_df.empty:
                all_dfs.append(oct7_8_df)
                
            # Modify the date range to exclude Oct 7-8 if we're fetching the full range
            if custom_start <= oct7_start and custom_end >= oct8_end:
                # Process before Oct 7
                if custom_start < oct7_start:
                    before_df = fetch_alerts_in_chunks(
                        custom_start, 
                        oct7_start - timedelta(days=1), 
                        initial_chunk_days=chunk_size, 
                        retry_days=max(1, chunk_size // 3)
                    )
                    if not before_df.empty:
                        all_dfs.append(before_df)
                
                # Process after Oct 8
                if custom_end > oct8_end:
                    after_df = fetch_alerts_in_chunks(
                        oct8_end + timedelta(days=1),
                        custom_end,
                        initial_chunk_days=chunk_size, 
                        retry_days=max(1, chunk_size // 3)
                    )
                    if not after_df.empty:
                        all_dfs.append(after_df)
            else:
                # If we're not covering Oct 7-8 with our custom range, process normally
                normal_df = fetch_alerts_in_chunks(
                    custom_start if use_custom_dates else START_DATE, 
                    custom_end if use_custom_dates else END_DATE, 
                    initial_chunk_days=chunk_size, 
                    retry_days=max(1, chunk_size // 3)
                )
                if not normal_df.empty:
                    all_dfs.append(normal_df)
        else:
            # Normal processing for the entire date range
            normal_df = fetch_alerts_in_chunks(
                custom_start if use_custom_dates else START_DATE, 
                custom_end if use_custom_dates else END_DATE, 
                initial_chunk_days=chunk_size, 
                retry_days=max(1, chunk_size // 3)
            )
            if not normal_df.empty:
                all_dfs.append(normal_df)
        
        # Combine all data chunks
        if all_dfs:
            alerts_df = pd.concat(all_dfs, ignore_index=True)
            
            # Remove any duplicates that might have occurred in overlapping fetches
            before_dedup = len(alerts_df)
            alerts_df = alerts_df.drop_duplicates()
            after_dedup = len(alerts_df)
            if before_dedup > after_dedup:
                print(f"Removed {before_dedup - after_dedup} duplicate alerts")
        
        # If we didn't get any data, create a minimal dataframe
        if alerts_df.empty or 'data' not in alerts_df.columns:
            print("Warning: No data retrieved from API, using sample data")
            alerts_df = pd.DataFrame({
                'data': default_locations,
                'date': [datetime.now().strftime('%d.%m.%Y')] * len(default_locations),
                'time': ['00:00:00'] * len(default_locations),
                'category': [''] * len(default_locations),
                'title': [''] * len(default_locations)
            })
        else:
            # Display debug information
            print("\nData statistics:")
            print(f"Total alerts fetched: {len(alerts_df)}")
            unique_locations_count = extract_unique_locations(alerts_df)
            print(f"Unique locations found: {len(unique_locations_count)}")
            print(f"Date range in data: {alerts_df['date'].min()} to {alerts_df['date'].max()}")
            
        # Save with a timestamp to ensure we don't mix data
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = f"merged_alarms_data_{timestamp}.csv"
        save_to_csv(alerts_df, backup_path)
        print(f"Backup saved to: {backup_path}")
        
        # Save to main file
        save_to_csv(alerts_df, MERGED_ALARMS_PATH)
    
    # Always extract and display unique locations count from current data
    unique_locs = extract_unique_locations(alerts_df)
    print(f"\nCurrent unique locations count: {len(unique_locs)}")
    
    # Check if city regions file already exists
    if os.path.exists(CITIES_REGIONS_PATH):
        print(f"Loading existing city region data from {CITIES_REGIONS_PATH}")
        city_regions_df = pd.read_excel(CITIES_REGIONS_PATH)
    else:
        print("Extracting unique locations...")
        unique_locations = extract_unique_locations(alerts_df)
        print(f"Found {len(unique_locations)} unique locations")
        
        print("Geocoding locations...")
        city_regions_df = batch_geocode_locations(unique_locations)
        save_to_excel(city_regions_df, CITIES_REGIONS_PATH)
    
    return alerts_df, city_regions_df


def transform_stage(alerts_df, city_regions_df):
    """Transform and enrich the data"""
    print("\n=== Transform Stage ===")
    
    # Apply manual region mapping for missing locations
    print("Applying manual region mappings...")
    updated_regions_df = apply_manual_region_mapping(city_regions_df)
    save_to_excel(updated_regions_df, CITIES_REGIONS_UPDATED_PATH)
    
    # Add region information to alerts
    print("Adding region information to alerts...")
    alerts_with_regions = add_region_to_alerts(alerts_df, updated_regions_df)
    
    # Translate regions to English
    print("Translating regions to English...")
    alerts_with_english = translate_regions_to_english(alerts_with_regions)
    
    # Filter by central regions
    print("Filtering by central regions...")
    central_alerts = filter_by_regions(alerts_with_english, CENTRAL_REGIONS)
    
    # Filter by specific dates
    print("Filtering by specific dates...")
    filtered_alerts = filter_by_dates(central_alerts, FILTER_DATES)
    
    # Remove temporary columns used for filtering
    if 'date_str' in filtered_alerts.columns:
        filtered_alerts = filtered_alerts.drop('date_str', axis=1)
    
    return alerts_with_regions, alerts_with_english, central_alerts, filtered_alerts


def load_stage(alerts_with_regions, alerts_with_english, central_alerts, filtered_alerts):
    """Load processed data to files"""
    print("\n=== Load Stage ===")
    
    print("Saving alerts with regions...")
    save_to_excel(alerts_with_regions, ALARMS_WITH_REGIONS_PATH)
    
    print("Saving alerts with English regions...")
    save_to_excel(alerts_with_english, ALARMS_WITH_ENGLISH_REGIONS_PATH)
    
    print("Saving central region alerts...")
    save_to_excel(central_alerts, CENTRAL_REGIONS_ALARMS_PATH)
    
    print("Saving final filtered alerts...")
    save_to_excel(
        filtered_alerts, 
        FINAL_FILTERED_ALARMS_PATH,
        sheet_name='Filtered Alarms'
    )


def analyze_results(filtered_alerts):
    """Perform basic analysis on the final dataset"""
    print("\n=== Analysis ===")
    
    # Count alerts by date
    date_counts = filtered_alerts.groupby(
        filtered_alerts['date'].dt.strftime('%Y-%m-%d')
    ).size().sort_index()
    
    print("Alert counts by date:")
    print(date_counts)
    
    # Count alerts by region
    region_counts = filtered_alerts['region_en'].value_counts()
    print("\nAlert counts by region:")
    print(region_counts)
    
    # Summary statistics
    total_alerts = len(filtered_alerts)
    total_days = len(date_counts)
    avg_alerts_per_day = total_alerts / total_days if total_days > 0 else 0
    
    print(f"\nTotal alerts: {total_alerts}")
    print(f"Days with alerts: {total_days} out of {len(FILTER_DATES)} filtered days")
    print(f"Average alerts per day: {avg_alerts_per_day:.2f}")
    
    # Find missing dates
    all_dates_set = set(FILTER_DATES)
    dates_with_alerts = set(date_counts.index)
    missing_dates = all_dates_set - dates_with_alerts
    
    if missing_dates:
        print("\nDates without alerts:")
        for date in sorted(missing_dates):
            print(date)


def main():
    """Main ETL pipeline execution"""
    print("=" * 60)
    print("          Red Alert ETL Pipeline")
    print("=" * 60)
    print("This pipeline extracts alerts data from פיקוד העורף (Oref)")
    print("processes it, and exports filtered data for analysis.")
    print("-" * 60)
    
    try:
        # Force clean start with diagnostics
        force_clean = False
        user_input = input("DIAGNOSTIC MODE: Force completely clean start? (y/n, default: n): ").strip().lower()
        if user_input == 'y' or user_input == 'yes':
            force_clean = True
            print("\n=== DIAGNOSTIC: Cleaning all data files ===")
            
            # List of files to potentially delete for a clean start
            data_files = [
                MERGED_ALARMS_PATH,
                CITIES_REGIONS_PATH,
                CITIES_REGIONS_UPDATED_PATH,
                ALARMS_WITH_REGIONS_PATH,
                ALARMS_WITH_ENGLISH_REGIONS_PATH,
                CENTRAL_REGIONS_ALARMS_PATH,
                FINAL_FILTERED_ALARMS_PATH
            ]
            
            for file_path in data_files:
                if os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                        print(f"  Deleted: {file_path}")
                    except Exception as e:
                        print(f"  Error deleting {file_path}: {e}")
                else:
                    print(f"  File does not exist: {file_path}")
            
            print("=== Diagnostic cleaning completed ===\n")
        
        # Allow skipping stages for faster debugging/reprocessing
        start_at_transform = False
        skip_to_analysis = False
        
        if not force_clean:
            user_input = input("Start at transform stage? (y/n, default: n): ").strip().lower()
            if user_input == 'y' or user_input == 'yes':
                start_at_transform = True
            
            user_input = input("Skip to analysis stage? (y/n, default: n): ").strip().lower()
            if user_input == 'y' or user_input == 'yes':
                skip_to_analysis = True
                start_at_transform = True
        
        # Define variables that might be needed later
        alerts_df = None
        city_regions_df = None
        alerts_with_regions = None
        alerts_with_english = None
        central_alerts = None
        filtered_alerts = None
        
        # Extract stage
        if not start_at_transform:
            print("\n=== Extract Stage ===")
            
            # Check if user wants to force fetch new data
            force_fetch = False
            user_input = input("Do you want to fetch fresh data from the API? (y/n, default: n): ").strip().lower()
            if user_input == 'y' or user_input == 'yes':
                force_fetch = True
                # Force delete old file to avoid any mixing with previous data
                if os.path.exists(MERGED_ALARMS_PATH):
                    try:
                        os.remove(MERGED_ALARMS_PATH)
                        print(f"Deleted existing data file: {MERGED_ALARMS_PATH}")
                    except Exception as e:
                        print(f"Warning: Could not delete existing file: {e}")
            
            # Check if merged file already exists to avoid re-fetching
            if os.path.exists(MERGED_ALARMS_PATH) and not force_fetch:
                print(f"Loading existing data from {MERGED_ALARMS_PATH}")
                try:
                    # Try multiple encodings for Hebrew text
                    try:
                        alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8-sig')
                    except UnicodeDecodeError:
                        try:
                            alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8')
                        except UnicodeDecodeError:
                            alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='cp1255')
                    
                    # Verify the dataframe has the expected structure
                    if 'data' not in alerts_df.columns:
                        print("Warning: CSV file does not have expected structure")
                        force_fetch = True
                except Exception as e:
                    print(f"Error reading the CSV file: {e}")
                    force_fetch = True
            else:
                force_fetch = True
            
            # Fetch new data if needed or requested
            if force_fetch:
                print("Fetching alerts data from API...")
                
                # Allow user to customize chunk size
                chunk_size = 7  # Default 7 days
                try:
                    user_chunk = input("Enter chunk size in days (default: 7): ").strip()
                    if user_chunk and user_chunk.isdigit():
                        chunk_size = int(user_chunk)
                        if chunk_size < 1 or chunk_size > 30:
                            print("Invalid chunk size. Using default (7 days).")
                            chunk_size = 7
                except:
                    print("Invalid input. Using default chunk size (7 days).")
                
                # Custom date range if needed
                use_custom_dates = False
                custom_start = START_DATE
                custom_end = END_DATE
                
                user_input = input("Use custom date range instead of defaults? (y/n, default: n): ").strip().lower()
                if user_input == 'y' or user_input == 'yes':
                    use_custom_dates = True
                    try:
                        start_date_str = input(f"Enter start date (YYYY-MM-DD, default: {START_DATE.strftime('%Y-%m-%d')}): ").strip()
                        if start_date_str:
                            custom_start = datetime.strptime(start_date_str, '%Y-%m-%d')
                        
                        end_date_str = input(f"Enter end date (YYYY-MM-DD, default: {END_DATE.strftime('%Y-%m-%d')}): ").strip()
                        if end_date_str:
                            custom_end = datetime.strptime(end_date_str, '%Y-%m-%d')
                        
                        print(f"Using custom date range: {custom_start.strftime('%Y-%m-%d')} to {custom_end.strftime('%Y-%m-%d')}")
                    except ValueError as e:
                        print(f"Invalid date format: {e}. Using default date range.")
                        use_custom_dates = False
                
                # Initialize with empty dataframe
                all_dfs = []
                
                # Special handling for Oct 7-8 high volume period
                process_high_volume = False
                user_input = input("Process high-volume days (Oct 7-8, 2023)? (y/n, default: n): ").strip().lower()
                if user_input == 'y' or user_input == 'yes':
                    process_high_volume = True
                    
                # Define high-volume period dates
                oct7_start = datetime(2023, 10, 7, 0, 0)
                oct8_end = datetime(2023, 10, 9, 0, 0)  # End of Oct 8
                
                # Determine effective date range to process
                effective_start = custom_start if use_custom_dates else START_DATE
                effective_end = custom_end if use_custom_dates else END_DATE
                
                # Process data in a way that ensures proper integration
                if process_high_volume and effective_start <= oct8_end and effective_end >= oct7_start:
                    print("\n--- Processing High Volume Period First ---")
                    
                    # Determine the overlap between requested range and high-volume period
                    hv_start = max(effective_start, oct7_start)
                    hv_end = min(effective_end, oct8_end)
                    
                    print(f"Fetching high-volume period: {hv_start.strftime('%Y-%m-%d %H:%M')} - {hv_end.strftime('%Y-%m-%d %H:%M')}")
                    high_volume_df = fetch_high_volume_days(hv_start, hv_end, hours_per_chunk=6)
                    
                    if not high_volume_df.empty:
                        print(f"Successfully fetched {len(high_volume_df)} alerts for high-volume period")
                        all_dfs.append(high_volume_df)
                    else:
                        print("No data found for high-volume period")
                    
                    # Process data before high-volume period if needed
                    if effective_start < oct7_start:
                        print("\n--- Processing Period Before High Volume Days ---")
                        print(f"Fetching data from {effective_start.strftime('%Y-%m-%d')} to {(oct7_start - timedelta(days=1)).strftime('%Y-%m-%d')}...")
                        
                        before_df = fetch_alerts_in_chunks(
                            effective_start,
                            oct7_start - timedelta(days=1),
                            initial_chunk_days=chunk_size,
                            retry_days=max(1, chunk_size // 3)
                        )
                        
                        if not before_df.empty:
                            print(f"Successfully fetched {len(before_df)} alerts for period before high-volume days")
                            all_dfs.append(before_df)
                        else:
                            print("No data found for period before high-volume days")
                    
                    # Process data after high-volume period if needed
                    if effective_end > oct8_end:
                        print("\n--- Processing Period After High Volume Days ---")
                        # MODIFIED LINE: Start from oct8_end which is 2023-10-09 00:00
                        # This includes Oct 9 in the fetch
                        print(f"Fetching data from {oct8_end.strftime('%Y-%m-%d')} to {effective_end.strftime('%Y-%m-%d')}...")
                        
                        # MODIFIED LINE: Removed the +timedelta(days=1) to start from 
                        # exactly where the high-volume period ended (Oct 9 00:00)
                        after_df = fetch_alerts_in_chunks(
                            oct8_end,  # This is 2023-10-09 00:00
                            effective_end,
                            initial_chunk_days=chunk_size,
                            retry_days=max(1, chunk_size // 3)
                        )
                        
                        if not after_df.empty:
                            print(f"Successfully fetched {len(after_df)} alerts for period after high-volume days")
                            all_dfs.append(after_df)
                        else:
                            print("No data found for period after high-volume days")
                
                else:
                    # Normal processing (no high-volume days or they're not in our range)
                    print("\n--- Processing Entire Date Range Normally ---")
                    print(f"Fetching data from {effective_start.strftime('%Y-%m-%d')} to {effective_end.strftime('%Y-%m-%d')}...")
                    
                    normal_df = fetch_alerts_in_chunks(
                        effective_start,
                        effective_end,
                        initial_chunk_days=chunk_size,
                        retry_days=max(1, chunk_size // 3)
                    )
                    
                    if not normal_df.empty:
                        print(f"Successfully fetched {len(normal_df)} alerts for the entire period")
                        all_dfs.append(normal_df)
                    else:
                        print("No data found for the requested period")
                
                # Combine all data chunks
                if all_dfs:
                    print("\n--- Combining All Data ---")
                    alerts_df = pd.concat(all_dfs, ignore_index=True)
                    
                    # Remove any duplicates that might have occurred in overlapping fetches
                    before_dedup = len(alerts_df)
                    alerts_df = alerts_df.drop_duplicates()
                    after_dedup = len(alerts_df)
                    if before_dedup > after_dedup:
                        print(f"Removed {before_dedup - after_dedup} duplicate alerts")
                    
                    # Display data statistics
                    print("\nData statistics:")
                    print(f"Total alerts fetched: {len(alerts_df)}")
                    
                    if 'date' in alerts_df.columns:
                        # Convert to datetime for easier manipulation if not already
                        if not pd.api.types.is_datetime64_any_dtype(alerts_df['date']):
                            try:
                                alerts_df['date_dt'] = pd.to_datetime(alerts_df['date'], format='%d.%m.%Y')
                                date_min = alerts_df['date_dt'].min().strftime('%Y-%m-%d')
                                date_max = alerts_df['date_dt'].max().strftime('%Y-%m-%d')
                                alerts_df = alerts_df.drop('date_dt', axis=1)
                            except:
                                date_min = alerts_df['date'].min()
                                date_max = alerts_df['date'].max()
                        else:
                            date_min = alerts_df['date'].min().strftime('%Y-%m-%d')
                            date_max = alerts_df['date'].max().strftime('%Y-%m-%d')
                            
                        print(f"Date range in data: {date_min} to {date_max}")
                    
                    # Count alerts by date
                    if 'date' in alerts_df.columns:
                        date_counts = alerts_df.groupby('date').size()
                        print("\nAlerts by date (top 10):")
                        for date, count in date_counts.sort_values(ascending=False).head(10).items():
                            print(f"  {date}: {count} alerts")
                    
                    # Save the combined data
                    print("\n--- Saving Combined Data ---")
                    # Save with a timestamp to ensure we don't mix data
                    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                    backup_path = f"merged_alarms_data_{timestamp}.csv"
                    save_to_csv(alerts_df, backup_path)
                    print(f"Backup saved to: {backup_path}")
                    
                    # Save to main file
                    save_to_csv(alerts_df, MERGED_ALARMS_PATH)
                    print(f"Combined data saved to: {MERGED_ALARMS_PATH}")
                else:
                    print("No data was fetched from any source. Cannot continue.")
                    return
            
            # Extract unique locations and geocode them
            if alerts_df is None or alerts_df.empty:
                print("Error: No alert data available. Cannot continue.")
                return
                
            # Check if city regions file already exists
            if os.path.exists(CITIES_REGIONS_PATH) and not force_fetch:
                print(f"Loading existing city region data from {CITIES_REGIONS_PATH}")
                city_regions_df = pd.read_excel(CITIES_REGIONS_PATH)
            else:
                print("\n--- Extracting and Geocoding Locations ---")
                print("Extracting unique locations...")
                unique_locations = extract_unique_locations(alerts_df)
                print(f"Found {len(unique_locations)} unique locations")
                
                print("Geocoding locations...")
                city_regions_df = batch_geocode_locations(unique_locations)
                save_to_excel(city_regions_df, CITIES_REGIONS_PATH)
                print(f"Location data saved to: {CITIES_REGIONS_PATH}")
        
        else:
            # If starting at transform stage, load existing files
            print("\n=== Extract Stage ===")
            print("Skipping extraction, loading existing files...")
            try:
                # Try multiple encodings for Hebrew text
                try:
                    alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8-sig')
                except UnicodeDecodeError:
                    try:
                        alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='utf-8')
                    except UnicodeDecodeError:
                        alerts_df = pd.read_csv(MERGED_ALARMS_PATH, encoding='cp1255')
                
                city_regions_df = pd.read_excel(CITIES_REGIONS_PATH)
                
                print(f"Loaded {len(alerts_df)} alerts and {len(city_regions_df)} locations")
            except Exception as e:
                print(f"Error loading existing files: {e}")
                print("Cannot continue without data files.")
                return
        
        # Transform stage
        if not skip_to_analysis:
            print("\n=== Transform Stage ===")
            
            # Apply manual region mapping for missing locations
            print("Applying manual region mappings...")
            updated_regions_df = apply_manual_region_mapping(city_regions_df)
            save_to_excel(updated_regions_df, CITIES_REGIONS_UPDATED_PATH)
            
            # Add region information to alerts
            print("Adding region information to alerts...")
            alerts_with_regions = add_region_to_alerts(alerts_df, updated_regions_df)
            
            # Translate regions to English
            print("Translating regions to English...")
            alerts_with_english = translate_regions_to_english(alerts_with_regions)
            
            # Filter by central regions
            print("Filtering by central regions...")
            central_alerts = filter_by_regions(alerts_with_english, CENTRAL_REGIONS)
            
            # Filter by specific dates
            print("Filtering by specific dates...")
            filtered_alerts = filter_by_dates(central_alerts, FILTER_DATES)
            
            # Remove temporary columns used for filtering
            if 'date_str' in filtered_alerts.columns:
                filtered_alerts = filtered_alerts.drop('date_str', axis=1)
        else:
            print("\n=== Transform Stage ===")
            print("Skipping transformation, loading existing files...")
            # Load necessary file for analysis
            if os.path.exists(FINAL_FILTERED_ALARMS_PATH):
                filtered_alerts = pd.read_excel(FINAL_FILTERED_ALARMS_PATH)
                print(f"Loaded {len(filtered_alerts)} filtered alerts for analysis")
            else:
                print(f"Error: {FINAL_FILTERED_ALARMS_PATH} not found. Cannot skip to analysis.")
                skip_to_analysis = False
                
                # Must perform transform stage
                # Apply manual region mapping for missing locations
                print("Applying manual region mappings...")
                updated_regions_df = apply_manual_region_mapping(city_regions_df)
                save_to_excel(updated_regions_df, CITIES_REGIONS_UPDATED_PATH)
                
                # Add region information to alerts
                print("Adding region information to alerts...")
                alerts_with_regions = add_region_to_alerts(alerts_df, updated_regions_df)
                
                # Translate regions to English
                print("Translating regions to English...")
                alerts_with_english = translate_regions_to_english(alerts_with_regions)
                
                # Filter by central regions
                print("Filtering by central regions...")
                central_alerts = filter_by_regions(alerts_with_english, CENTRAL_REGIONS)
                
                # Filter by specific dates
                print("Filtering by specific dates...")
                filtered_alerts = filter_by_dates(central_alerts, FILTER_DATES)
                
                # Remove temporary columns used for filtering
                if 'date_str' in filtered_alerts.columns:
                    filtered_alerts = filtered_alerts.drop('date_str', axis=1)
        
        # Load stage
        if not skip_to_analysis:
            print("\n=== Load Stage ===")
            
            print("Saving alerts with regions...")
            save_to_excel(alerts_with_regions, ALARMS_WITH_REGIONS_PATH)
            
            print("Saving alerts with English regions...")
            save_to_excel(alerts_with_english, ALARMS_WITH_ENGLISH_REGIONS_PATH)
            
            print("Saving central region alerts...")
            save_to_excel(central_alerts, CENTRAL_REGIONS_ALARMS_PATH)
            
            print("Saving final filtered alerts...")
            save_to_excel(
                filtered_alerts, 
                FINAL_FILTERED_ALARMS_PATH,
                sheet_name='Filtered Alarms'
            )
        
        # Analysis stage
        print("\n=== Analysis Stage ===")
        analyze_results(filtered_alerts)
        
        print("\nETL Pipeline completed successfully!")
    
    except Exception as e:
        print(f"\nError in ETL pipeline: {e}")
        import traceback
        traceback.print_exc()
        print("\nETL Pipeline completed with errors.")


# Run the pipeline if script is executed directly
if __name__ == "__main__":
    main()

          Red Alert ETL Pipeline
This pipeline extracts alerts data from פיקוד העורף (Oref)
processes it, and exports filtered data for analysis.
------------------------------------------------------------

=== Extract Stage ===
Skipping extraction, loading existing files...
Loaded 9791 alerts and 878 locations

=== Transform Stage ===
Skipping transformation, loading existing files...
Error: central_regions_66days_sorted.xlsx not found. Cannot skip to analysis.
Applying manual region mappings...
Data saved to Excel: israel_cities_regions_updated.xlsx
Adding region information to alerts...
Translating regions to English...
Filtering by central regions...
Filtering by specific dates...

=== Load Stage ===
Saving alerts with regions...


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['date'] = pd.to_datetime(df['date'], format='%d.%m.%Y')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['date_str'] = df['date'].dt.strftime('%Y-%m-%d')


Data saved to Excel: alarms_with_regions.xlsx
Saving alerts with English regions...
Data saved to Excel: alarms_with_english_regions.xlsx
Saving central region alerts...


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].apply(lambda x: x if not isinstance(x, str) else x)


Data saved to Excel: central_regions_alarms.xlsx
Saving final filtered alerts...
Data saved to Excel: central_regions_66days_sorted.xlsx

=== Analysis Stage ===

=== Analysis ===
Alert counts by date:
date
2023-10-08     26
2023-10-09     65
2023-10-10     46
2023-10-11     40
2023-10-12     14
2023-10-15     36
2023-10-16     69
2023-10-17    134
2023-10-18     18
2023-10-19     24
2023-10-22      3
2023-10-24    128
2023-10-25     25
2023-10-26    106
2023-10-29     16
2023-10-30     40
2023-10-31     47
2023-11-01     29
2023-11-02     30
2023-11-05     75
2023-11-06      1
2023-11-07     51
2023-11-09      2
2023-11-13     24
2023-11-14      5
2023-11-15      1
2023-11-20    137
2023-11-21     14
2023-12-03      1
2023-12-04     11
2023-12-05     36
2023-12-11     16
2023-12-13      4
2023-12-19     59
2023-12-21     86
dtype: int64

Alert counts by region:
region_en
CENTRAL_DISTRICT     1075
TEL_AVIV_DISTRICT     280
DAN_DISTRICT           44
SHARON_DISTRICT        20
Name: count,