In [None]:
"""
Google Places API Enrichment for LocalEats
Enriches existing Yelp restaurant data with Google Places attributes
"""

import json
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# =====================================================
# CONFIGURATION
# =====================================================

# Secrets and connection settings are read from environment variables when
# they are set, so real credentials never have to be pasted into this file.
# The literals are kept as fallbacks, so behavior is unchanged when the
# variables are absent.
GOOGLE_PLACES_API_KEY = os.environ.get('GOOGLE_PLACES_API_KEY', '')  # Or replace with your actual key

# Keyword arguments passed verbatim to snowflake.connector.connect().
SNOWFLAKE_CONFIG = {
    'user': os.environ.get('SNOWFLAKE_USER', 'FERRET'),
    'password': os.environ.get('SNOWFLAKE_PASSWORD', ''),
    'account': os.environ.get('SNOWFLAKE_ACCOUNT', ''),
    'warehouse': os.environ.get('SNOWFLAKE_WAREHOUSE', ''),
    'database': os.environ.get('SNOWFLAKE_DATABASE', ''),
    'schema': os.environ.get('SNOWFLAKE_SCHEMA', ''),
    'role': os.environ.get('SNOWFLAKE_ROLE', ''),
}

# Fields to request from Google Places API
PLACE_DETAILS_FIELDS = [
    # Core identity, location and rating
    'id',
    'displayName',
    'formattedAddress',
    'location',
    'rating',
    'userRatingCount',
    'priceLevel',
    'businessStatus',
    'types',

    # Hours & Availability
    'regularOpeningHours',
    'currentOpeningHours',

    # Dietary Options
    'servesCoffee',
    'servesDessert',
    'servesBreakfast',
    'servesLunch',
    'servesDinner',
    'servesBeer',
    'servesWine',
    'servesVegetarianFood',

    # Service Options
    'dineIn',
    'takeout',
    'delivery',
    'reservable',
    'outdoorSeating',

    # Accessibility
    'accessibilityOptions',

    # Group/Family
    'goodForChildren',
    'goodForGroups',
    'allowsDogs',

    # Atmosphere
    'liveMusic',
    'menuForChildren',
    'restroom',
]

# =====================================================
# GOOGLE PLACES API FUNCTIONS
# =====================================================

def _build_search_query(restaurant_name, address, city, state) -> str:
    """Join the non-empty location parts into one comma-separated text query."""
    parts = (restaurant_name, address, city, state)
    # Rows loaded through pandas can carry None/NaN for missing columns;
    # skip those instead of sending the literal text "None" or "nan".
    return ", ".join(
        str(part).strip()
        for part in parts
        if not pd.isna(part) and str(part).strip()
    )


def text_search_place(
    restaurant_name: str,
    address: str,
    city: str,
    api_key: str,
    state: str = "MA",
    timeout: float = 10.0,
) -> Optional[str]:
    """
    Search Google Places using Text Search to find place_id.

    Args:
        restaurant_name: Name of the restaurant.
        address: Street address of the restaurant.
        city: City of the restaurant.
        api_key: Google Places API key, sent in the X-Goog-Api-Key header.
        state: State appended to the query; defaults to "MA".
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``id`` of the top match, or None when nothing matched, the API
        returned a non-200 status, or the request failed.
    """
    query = _build_search_query(restaurant_name, address, city, state)
    if not query:
        # Nothing to search for; avoid a pointless API call.
        return None

    url = "https://places.googleapis.com/v1/places:searchText"

    headers = {
        'Content-Type': 'application/json',
        'X-Goog-Api-Key': api_key,
        'X-Goog-FieldMask': 'places.id,places.displayName,places.formattedAddress'
    }

    payload = {
        "textQuery": query,
        "maxResultCount": 1  # We only want the top match
    }

    try:
        # Without a timeout a single stalled connection blocks forever.
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)

        if response.status_code != 200:
            print(f"  ‚ö†Ô∏è  Text Search Error {response.status_code}: {response.text[:200]}")
            return None

        places = response.json().get('places', [])
        if not places:
            return None
        return places[0].get('id')

    except Exception as e:
        # Deliberately broad: this is a best-effort per-restaurant lookup, and
        # one bad response must not abort the caller's whole batch.
        print(f"  ‚ùå Error in text_search: {e}")
        return None


def get_place_details(
    place_id: str,
    api_key: str,
    fields: Optional[List[str]] = None,
    timeout: float = 10.0,
) -> Optional[Dict]:
    """
    Get detailed information about a place using Place Details API.

    Args:
        place_id: Either a bare place id or a ``places/<id>`` resource name.
        api_key: Google Places API key, sent in the X-Goog-Api-Key header.
        fields: Field names to request. Defaults to the module-level
            PLACE_DETAILS_FIELDS list.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON response as a dict, or None when place_id is empty,
        the API returned a non-200 status, or the request failed.
    """
    if not place_id:
        return None

    # Accept both forms: add the "places/" prefix only when it is missing.
    resource_name = place_id if place_id.startswith('places/') else f'places/{place_id}'
    url = f"https://places.googleapis.com/v1/{resource_name}"

    # Single source of truth for the requested fields is the module constant.
    requested_fields = PLACE_DETAILS_FIELDS if fields is None else fields

    headers = {
        'Content-Type': 'application/json',
        'X-Goog-Api-Key': api_key,
        'X-Goog-FieldMask': ','.join(requested_fields)
    }

    try:
        # Without a timeout a single stalled connection blocks forever.
        response = requests.get(url, headers=headers, timeout=timeout)

        if response.status_code == 200:
            return response.json()

        print(f"  ‚ö†Ô∏è  Place Details Error {response.status_code}: {response.text[:200]}")
        return None

    except Exception as e:
        # Deliberately broad: this is a best-effort per-restaurant lookup, and
        # one bad response must not abort the caller's whole batch.
        print(f"  ‚ùå Error getting details: {e}")
        return None


def extract_enrichment_data(place_details: Dict, place_id: str) -> Dict:
    """
    Flatten a Google Places details response into one enrichment record.

    Args:
        place_details: Decoded Place Details JSON.
        place_id: Identifier stored under 'google_place_id'.

    Returns:
        A dict of flat columns. Fields absent from the response come back as
        None, except 'place_types', which is an empty string when there are
        no types. 'enriched_at' is the current local time in ISO format.
    """

    def dig(node, *path, default=None):
        """Follow `path` through nested dicts; fall back to `default` when a
        non-dict is met on the way or the final value is None."""
        for step in path:
            if not isinstance(node, dict):
                return default
            node = node.get(step)
        return default if node is None else node

    def top(field):
        """Read a single top-level field of the response."""
        return dig(place_details, field)

    # Nested sections; an empty dict stands in when a section is missing.
    regular_hours = dig(place_details, 'regularOpeningHours', default={})
    live_hours = dig(place_details, 'currentOpeningHours', default={})
    access = dig(place_details, 'accessibilityOptions', default={})

    # Weekday lines (e.g., "Monday: 11:00 AM - 10:00 PM") joined into one string
    day_lines = regular_hours.get('weekdayDescriptions', [])
    hours_summary = None
    if day_lines:
        hours_summary = ' | '.join(day_lines)

    # Keys are inserted in a fixed order; callers turn records into table
    # columns, so the order is part of the contract.
    record = {
        'google_place_id': place_id,
        'google_name': dig(place_details, 'displayName', 'text'),
    }

    for column, field in (
        ('google_address', 'formattedAddress'),
        ('google_rating', 'rating'),
        ('google_review_count', 'userRatingCount'),
        ('google_price_level', 'priceLevel'),
        ('business_status', 'businessStatus'),
    ):
        record[column] = top(field)

    record['place_types'] = '|'.join(dig(place_details, 'types', default=[]))

    # Hours
    record['opening_hours_text'] = hours_summary
    record['open_now'] = dig(live_hours, 'openNow')

    # Dietary, then service options
    for column, field in (
        ('serves_coffee', 'servesCoffee'),
        ('serves_dessert', 'servesDessert'),
        ('serves_breakfast', 'servesBreakfast'),
        ('serves_lunch', 'servesLunch'),
        ('serves_dinner', 'servesDinner'),
        ('serves_beer', 'servesBeer'),
        ('serves_wine', 'servesWine'),
        ('serves_vegetarian', 'servesVegetarianFood'),
        ('dine_in', 'dineIn'),
        ('takeout', 'takeout'),
        ('delivery', 'delivery'),
        ('reservable', 'reservable'),
        ('outdoor_seating', 'outdoorSeating'),
    ):
        record[column] = top(field)

    # Accessibility (read from the nested accessibilityOptions section)
    for column, field in (
        ('wheelchair_accessible_entrance', 'wheelchairAccessibleEntrance'),
        ('wheelchair_accessible_parking', 'wheelchairAccessibleParking'),
        ('wheelchair_accessible_restroom', 'wheelchairAccessibleRestroom'),
        ('wheelchair_accessible_seating', 'wheelchairAccessibleSeating'),
    ):
        record[column] = dig(access, field)

    # Atmosphere
    for column, field in (
        ('good_for_children', 'goodForChildren'),
        ('good_for_groups', 'goodForGroups'),
        ('allows_dogs', 'allowsDogs'),
        ('live_music', 'liveMusic'),
        ('has_restroom', 'restroom'),
    ):
        record[column] = top(field)

    record['enriched_at'] = datetime.now().isoformat()

    return record

# =====================================================
# MAIN ENRICHMENT PIPELINE
# =====================================================

def _enrich_one(row, match_stats: Dict) -> Optional[Dict]:
    """Look up one restaurant row on Google Places and return its enrichment.

    Increments the matching counter in `match_stats` and pauses briefly after
    the API calls as conservative rate limiting (0.1s after a miss or error,
    0.2s after a success). Returns None when no place was matched or its
    details could not be fetched.
    """
    # Step 1: Find place_id using Text Search
    place_id = text_search_place(
        row['RESTAURANT_NAME'],
        row['STREET_ADDRESS'],
        row['CITY'],
        GOOGLE_PLACES_API_KEY,
    )
    if not place_id:
        match_stats['not_found'] += 1
        time.sleep(0.1)
        return None

    # Step 2: Get detailed information
    details = get_place_details(place_id, GOOGLE_PLACES_API_KEY)
    if not details:
        match_stats['errors'] += 1
        time.sleep(0.1)
        return None

    # Step 3: Extract enrichment data
    enrichment = extract_enrichment_data(details, place_id)
    enrichment['restaurant_id'] = row['RESTAURANT_ID']
    match_stats['found'] += 1
    time.sleep(0.2)
    return enrichment


def enrich_restaurants():
    """
    Main function to enrich all restaurants.

    Reads restaurants from the Gold table, skips those already present in the
    enrichment table, looks each remaining one up on Google Places, and writes
    results to Snowflake in batches of 100 via save_to_snowflake. Progress is
    printed every 50 restaurants and a summary at the end. The Snowflake
    connection is closed even if the run fails part-way.
    """
    # Fully qualified so the "already enriched" check reads the same table
    # that save_to_snowflake writes to, whatever the session's default schema.
    enrichment_table = "LOCEATS_DB.BRONZE.BRONZE_GOOGLE_PLACES_ENRICHMENT"

    print("üöÄ Starting Google Places Enrichment Pipeline\n")
    start_time = datetime.now()

    # Connect to Snowflake
    print("üìä Connecting to Snowflake...")
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)

    try:
        cursor = conn.cursor()

        # Fetch restaurants from Gold table
        print("üì• Fetching restaurants from Gold table...")
        query = """
    SELECT 
        RESTAURANT_ID,
        RESTAURANT_NAME,
        STREET_ADDRESS,
        CITY,
        STATE,
        POSTAL_CODE,
        LATITUDE,
        LONGITUDE
    FROM LOCEATS_DB.DBT_SKASIREDDY_MARTS.GOLD_RESTAURANTS_MASTER
    """

        cursor.execute(query)
        restaurants_df = cursor.fetch_pandas_all()

        print(f"‚úÖ Found {len(restaurants_df)} restaurants to enrich\n")

        # Check for existing enrichments. Only SQL-level failures (such as the
        # table not existing yet) are treated as "nothing enriched so far";
        # anything else propagates instead of being silently swallowed.
        try:
            cursor.execute(f"SELECT RESTAURANT_ID FROM {enrichment_table}")
            existing = {record[0] for record in cursor.fetchall()}
        except snowflake.connector.ProgrammingError:
            print("üìã No existing enrichments found (table may not exist yet)\n")
        else:
            print(f"üìã Found {len(existing)} already enriched restaurants")
            restaurants_df = restaurants_df[~restaurants_df['RESTAURANT_ID'].isin(existing)]
            print(f"üéØ Will enrich {len(restaurants_df)} new restaurants\n")

        total = len(restaurants_df)
        if total == 0:
            print("‚úÖ All restaurants already enriched!")
            return

        # Enrichment loop
        enriched_data = []
        match_stats = {'found': 0, 'not_found': 0, 'errors': 0}

        print("=" * 60)
        print("üîç Starting Enrichment Process")
        print("=" * 60)

        # Count rows with enumerate: after the filter above the DataFrame
        # index labels are no longer 0..n-1, so they cannot measure progress.
        for progress, (_, row) in enumerate(restaurants_df.iterrows(), start=1):
            enrichment = _enrich_one(row, match_stats)
            if enrichment is not None:
                enriched_data.append(enrichment)

            # Progress update every 50 restaurants (attempted, not only matched)
            if progress % 50 == 0:
                success_rate = (match_stats['found'] / progress * 100)
                print(f"üìä Progress: {progress}/{total} | Enriched: {match_stats['found']} | Success Rate: {success_rate:.1f}%")

            # Save checkpoint every 100 enrichments
            if len(enriched_data) >= 100:
                print(f"üíæ Checkpoint: Saving {len(enriched_data)} enrichments to Snowflake...")
                save_to_snowflake(enriched_data, conn)
                enriched_data = []

        # Save remaining data
        if enriched_data:
            print(f"\nüíæ Saving final {len(enriched_data)} enrichments...")
            save_to_snowflake(enriched_data, conn)

        # Print summary
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds() / 60

        print("\n" + "=" * 60)
        print("‚úÖ ENRICHMENT COMPLETE!")
        print("=" * 60)
        print(f"Time taken: {duration:.1f} minutes")
        print(f"Successfully enriched: {match_stats['found']}")
        print(f"Not found in Google: {match_stats['not_found']}")
        print(f"Errors: {match_stats['errors']}")
        print(f"Success rate: {match_stats['found'] / total * 100:.1f}%")
        print(f"\nData loaded into: {enrichment_table}")
    finally:
        # Release the connection on every exit path, including errors.
        conn.close()


def save_to_snowflake(data: List[Dict], conn) -> int:
    """Save enriched data to Snowflake.

    Writes the records to LOCEATS_DB.BRONZE.BRONZE_GOOGLE_PLACES_ENRICHMENT
    with write_pandas, creating the table if needed. Column names are
    upper-cased before writing.

    Args:
        data: Enrichment records, one dict per restaurant.
        conn: Open Snowflake connection.

    Returns:
        Number of rows written; 0 when `data` is empty or the write failed.
        Failures are printed rather than raised so a long-running caller can
        carry on.
    """
    if not data:
        # Nothing to write; an empty frame has no string column labels to
        # upper-case and nothing worth sending to Snowflake.
        return 0

    df = pd.DataFrame(data)
    df.columns = df.columns.str.upper()

    try:
        success, _nchunks, nrows, _ = write_pandas(
            conn,
            df,
            'BRONZE_GOOGLE_PLACES_ENRICHMENT',
            database='LOCEATS_DB',
            schema='BRONZE',
            auto_create_table=True,
            quote_identifiers=False
        )
    except Exception as e:
        print(f"  ‚ùå Error saving to Snowflake: {e}")
        return 0

    if not success:
        # write_pandas can finish without raising and still report failure.
        print(f"  ‚ùå Error saving to Snowflake: {'write_pandas reported failure'}")
        return 0

    print(f"  ‚úÖ Saved {nrows} records to Snowflake")
    return nrows


# =====================================================
# RUN
# =====================================================

# Run the enrichment pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    enrich_restaurants()

üöÄ Starting Google Places Enrichment Pipeline

üìä Connecting to Snowflake...
üì• Fetching restaurants from Gold table...
‚úÖ Found 3227 restaurants to enrich

üìã No existing enrichments found (table may not exist yet)

üîç Starting Enrichment Process
üìä Progress: 50/3227 | Enriched: 50 | Success Rate: 100.0%
üìä Progress: 100/3227 | Enriched: 100 | Success Rate: 100.0%
üíæ Checkpoint: Saving 100 enrichments to Snowflake...
  ‚úÖ Saved 100 records to Snowflake
üìä Progress: 150/3227 | Enriched: 150 | Success Rate: 100.0%
üìä Progress: 200/3227 | Enriched: 200 | Success Rate: 100.0%
üíæ Checkpoint: Saving 100 enrichments to Snowflake...
  ‚úÖ Saved 100 records to Snowflake
üìä Progress: 250/3227 | Enriched: 249 | Success Rate: 99.6%
üìä Progress: 300/3227 | Enriched: 299 | Success Rate: 99.7%
üíæ Checkpoint: Saving 100 enrichments to Snowflake...
  ‚úÖ Saved 100 records to Snowflake
üìä Progress: 350/3227 | Enriched: 349 | Success Rate: 99.7%
üìä Progress: 400/3227 | 