In [5]:
from google_maps_geocoder.geocoder import GoogleGeocoder
import pandas as pd

# Initialize the GoogleGeocoder
api_key = "AIzaSyD1yJXIQeY8oucCAAo0ekWXS-Jp0QjwEUQ"
geocoder = GoogleGeocoder(api_key)

# Load the dataset
input_data = pd.read_csv('./example_address.csv')

# Clean and prepare the data
destinations, needs_geocoding = geocoder.cleanup_pd(input_data)

# Perform geocoding
if needs_geocoding:
    final_dest_df = geocoder.geocode_addresses(destinations, needs_geocoding)
    final_dest_df.to_csv('geocoded_results.csv', index=False)
else:
    print("Data already contains coordinates.")


Error processing address '123 Maple St,Boston,MA,2110': 'results'
Error processing address '456 Oak Ave,New York,NY,10001': 'results'
Error processing address '789 Pine Rd,Chicago,IL,60601': 'results'
Error processing address '101 Birch Blvd,San Francisco,CA,94105': 'results'
Error processing address '202 Cedar Ln,Los Angeles,CA,90001': 'results'
Error processing address '303 Elm St,Seattle,WA,98101': 'results'
Error processing address '404 Walnut Dr,Denver,CO,80202': 'results'
Error processing address '505 Redwood Way,Austin,TX,73301': 'results'
Error processing address '606 Ash St,Miami,FL,33101': 'results'
Error processing address '707 Palm Ave,San Diego,CA,92101': 'results'


In [None]:
# google_geocoder.py
import pandas as pd
import time
import re
import requests
import logging

# Configure logging to capture WARNING level messages
logging.basicConfig(
    level=logging.WARNING,  # Set the minimum logging level to WARNING
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log format
)

class GoogleGeocoder:
    """
    A class for interacting with the Google Geocoding API and performing geocoding on datasets.
    """

    def __init__(self, api_key, return_full_results=False):
        """
        Initialize the GoogleGeocoder class.
        
        :param api_key: Google API key for accessing the Geocoding API.
        :param return_full_results: Boolean indicating if the full API response should be returned.
        """
        self.api_key = api_key
        self.return_full_results = return_full_results
        self.logger = logging.getLogger('google_maps_geocoder.geocoder')

        # Test the connection upon initialization
        self.test_connection()


    def test_connection(self):
        """
        Test the connection to the Google Geocoding API by handling various statuses,
        including the case where the request limit is exceeded.
        """
        test_address = "1600 Pennsylvania Ave, Washington, DC"
        try:
            response = self.get_google_results(test_address)
            status = response.get('status')

            if status == 'OVER_QUERY_LIMIT':
                # Log and raise a more specific error for query limit exceeded
                self.logger.warning(f"Google Geocode API returned an unexpected status: {status}")
                raise ConnectionError(f"Unexpected status from Google Geocode API: {status}")

            if status != 'OK':
                # Log other unexpected statuses
                self.logger.warning(f"Unexpected status from Google Geocode API: {status}")
                raise ConnectionError(f"Unexpected status from Google Geocode API: {status}")

            self.logger.info('Google Geocoder API connection successful!')

        except KeyError as e:
            # Handle the case where 'results' is missing
            self.logger.warning(f"Missing 'results' in the response: {e}")
            raise ConnectionError("There was an error with the response from Google Geocode.")
        except requests.exceptions.RequestException as e:
            # Handle general connection errors
            self.logger.warning(f"RequestException occurred: {e}")
            raise ConnectionError("There was an error with the request to the Google Geocoding API.")
        except Exception as e:
            # Catch any other unexpected errors
            self.logger.warning(f"Unexpected error occurred: {e}")
            raise ConnectionError("An unexpected error occurred while testing the connection to Google Geocoding API.")

        
    def cleanup_pd(self, destinations):
        """
        Cleans and preprocesses the input DataFrame for geocoding.
        
        :param destinations: DataFrame containing location data.
        :return: Tuple (cleaned DataFrame, boolean indicating if geocoding is needed).
        """
        try:
            destinations = destinations.dropna(how='all')
            filter_df_dest = destinations.filter(regex=re.compile(r"^lat.*|^Y$|^geo.*lat|^lon.*|^X$|^geo.*lon", re.IGNORECASE))
            dest_col_names = list(filter_df_dest.columns)
            if len(dest_col_names) > 0:
                destinations = destinations.rename(columns={dest_col_names[1]: 'Longitude', dest_col_names[0]: 'Latitude'})
                destinations['Coords'] = list(zip(destinations['Latitude'], destinations['Longitude']))
                if any(x[0] is None for x in destinations['Coords']):
                    destinations.drop(columns=['Latitude', 'Longitude', 'Coords'], inplace=True)
                    filter_df_dest = destinations.filter(regex=re.compile(r"address.*|city$|town$|state$|zip code.*|zipcode.*|zip*|Postal*|prov*", re.IGNORECASE))
                    destinations['ADDRESS_FULL'] = filter_df_dest.apply(lambda y: ','.join(y.dropna().astype(str)), axis=1)
        except Exception as e:
            print(f'Error cleaning destinations dataset: {e}')

        if 'Coords' not in destinations.columns:
            filter_df_dest = destinations.filter(regex=re.compile(r"address.*|city$|town$|state$|zip code.*|zipcode.*|zip*|Postal*|prov*", re.IGNORECASE))
            destinations['ADDRESS_FULL'] = filter_df_dest.apply(lambda y: ','.join(y.dropna().astype(str)), axis=1)
        return destinations, 'Coords' not in destinations.columns

    def get_google_results(self, address):
        """
        Fetch geocode results from the Google Maps Geocoding API.
        
        :param address: Address string to geocode.
        :return: Dictionary containing geocode information.
        """
        geocode_url = f"https://maps.googleapis.com/maps/api/geocode/json?address={address}&key={self.api_key}"
        
        try:
            response = requests.get(geocode_url)
            response.raise_for_status()  # Raise an error for bad responses (e.g., 4xx, 5xx)
            data = response.json()
            
            # Log the full response for debugging purposes
            self.logger.debug(f"Google API response: {data}")

            # Check if the response contains 'results'
            if 'results' not in data:
                self.logger.warning("Google Geocode API response does not contain 'results'.")
                raise KeyError("No 'results' in response from Google Geocode API")

            # If no results found, return a default response
            if not data['results']:
                return {
                    "formatted_address": None, "latitude": None, "longitude": None,
                    "accuracy": None, "google_place_id": None, "type": None, "postcode": None,
                    "input_string": address, "number_of_results": 0, "status": data.get('status')
                }

            # Extract the first result
            answer = data['results'][0]
            return {
                "formatted_address": answer.get('formatted_address'),
                "latitude": answer.get('geometry', {}).get('location', {}).get('lat'),
                "longitude": answer.get('geometry', {}).get('location', {}).get('lng'),
                "accuracy": answer.get('geometry', {}).get('location_type'),
                "google_place_id": answer.get("place_id"),
                "type": ",".join(answer.get('types', [])),
                "postcode": ",".join([x['long_name'] for x in answer.get('address_components', []) if 'postal_code' in x.get('types', [])]),
                "input_string": address,
                "number_of_results": len(data['results']),
                "status": data.get('status'),
                "response": data if self.return_full_results else None
            }
        
        except requests.exceptions.RequestException as e:
            # Handle network or API request errors
            self.logger.warning(f"Error occurred during the request: {e}")
            raise ConnectionError("Problem with request to Google Geocode API")

        except KeyError as e:
            # Handle missing 'results' or other unexpected response structures
            self.logger.warning(f"KeyError: {str(e)}")
            raise

        except Exception as e:
            # Catch any other unexpected errors
            self.logger.warning(f"Unexpected error: {str(e)}")
            raise

    def geocode_addresses(self, destinations, destinations_value):
        """
        Geocodes a list of addresses and appends results to the DataFrame.
        
        :param destinations: DataFrame containing location data.
        :param destinations_value: Boolean indicating if geocoding is needed.
        :return: Updated DataFrame with geocoded coordinates.
        """
        if not destinations_value:
            print("Destinations are pre-geocoded and the Coords column is present.")
            return destinations

        # Initialize columns
        destinations['latitude'] = None
        destinations['longitude'] = None
        destinations['Coords'] = None

        addresses = destinations['ADDRESS_FULL'].tolist()
        results = []

        for address in addresses:
            retries = 0
            while retries < 5:
                try:
                    result = self.get_google_results(address)
                    if result is None:
                        time.sleep(60)  # Simulate waiting for query limits
                        retries += 1
                        continue

                    location = result['results'][0]['geometry']['location']
                    results.append({
                        'latitude': location['lat'],
                        'longitude': location['lng']
                    })
                    break
                except (KeyError, IndexError):
                    print(f"Error processing address '{address}': 'results'")
                    results.append({'latitude': None, 'longitude': None})
                    break

            if retries >= 5:
                results.append({'latitude': None, 'longitude': None})

        # Ensure the length matches the DataFrame
        while len(results) < len(addresses):
            results.append({'latitude': None, 'longitude': None})

        # Update the DataFrame
        for i, result in enumerate(results):
            destinations.at[i, 'latitude'] = result['latitude']
            destinations.at[i, 'longitude'] = result['longitude']

        destinations['Coords'] = list(zip(destinations['latitude'], destinations['longitude']))
        return destinations