In [23]:
import os
import sys
import csv
import re
import ssl
import asyncio
import traceback
import aiohttp
import backoff
import certifi
import pkg_resources
from tqdm.notebook import tqdm
from opencage.geocoder import OpenCageGeocode, OpenCageGeocodeError
import pandas as pd

# Use certificates from the certifi package instead of those of the operating system
sslcontext = ssl.create_default_context(cafile=certifi.where())

API_KEY = 'f015dd07c27a46afb460d86008c997c2'  # replace with your actual OpenCage Data API key
FORWARD_OR_REVERSE = 'reverse'  # 'forward' (address -> coordinates) or 'reverse' (coordinates -> address)
API_DOMAIN = 'api.opencagedata.com'
MAX_ITEMS = 100  # How many lines to read from the input file. Set to 0 for unlimited
NUM_WORKERS = 3  # For 10 requests per second try 2-5
REQUEST_TIMEOUT_SECONDS = 5  # For individual HTTP requests. Default is 1
RETRY_MAX_TRIES = 10  # How often to retry if a HTTP request times out
RETRY_MAX_TIME = 60  # Limit in seconds for retries
SHOW_PROGRESS = True  # Show progress bar

# Add the 'scripts' directory to the Python path
sys.path.append(os.path.abspath(os.path.join('..', 'scripts')))

import extract_to_mongodb as etm
import db_utils as dbu

collection_name = os.getenv('COLLECTION_NAME_CLEANED')
naturalearth_lowres = os.getenv('NATURALEARTH_SHAPEFILE_PATH')

In [24]:
print(f'Collection name: {collection_name}')

Collection name: wildfire_cleaned_data


In [25]:


# Load the cleaned data from MongoDB
geo_wfp = dbu.load_all_data_from_mongodb(collection_name)

In [None]:


# Check OpenCage geocoder is the latest version
minimum_required_version = '2.3.1'
package_version = pkg_resources.get_distribution('opencage').version
if pkg_resources.parse_version(package_version) < pkg_resources.parse_version(minimum_required_version):
    sys.stderr.write(f"At least version {minimum_required_version} of opencage geocoder package required. ")
    sys.stderr.write(f"Try upgrading by running 'pip install --upgrade opencage'.\n")
    sys.exit(1)

# Check API key present
if len(API_KEY) < 32:
    sys.stderr.write(f"API_KEY '{API_KEY}' does not look valid.\n")
    sys.exit(1)

PROGRESS_BAR = SHOW_PROGRESS and tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True)


ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html

In [None]:

def guess_text_is_coordinate_pair(text):
    coordinate_pattern = r'^(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)$'
    return bool(re.search(coordinate_pattern, text))

async def write_one_geocoding_result(geocoding_result, address, address_id, results):
    if geocoding_result is not None:
        result = {
            'id': address_id,
            'lat': geocoding_result['geometry']['lat'],
            'lng': geocoding_result['geometry']['lng'],
            '_type': geocoding_result['components'].get('_type', ''),
            'country': geocoding_result['components'].get('country', ''),
            'county': geocoding_result['components'].get('county', ''),
            'city': geocoding_result['components'].get('city', ''),
            'postcode': geocoding_result['components'].get('postcode', ''),
            'road': geocoding_result['components'].get('road', ''),
            'house_number': geocoding_result['components'].get('house_number', ''),
            'confidence': geocoding_result['confidence'],
            'formatted': geocoding_result['formatted']
        }
    else:
        sys.stderr.write(f"not found, writing empty result: {address}\n")
        result = {
            'id': address_id,
            'lat': 0,
            'lng': 0,
            '_type': '',
            'country': '',
            'county': '',
            'city': '',
            'postcode': '',
            'road': '',
            'house_number': '',
            'confidence': -1,
            'formatted': ''
        }
    results.append(result)

def backoff_hdlr(details):
    sys.stderr.write("Backing off {wait:0.1f} seconds afters {tries} tries "
                     "calling function {target} with args {args} and kwargs "
                     "{kwargs}\n".format(**details))

@backoff.on_exception(backoff.expo, (asyncio.TimeoutError),
                      max_time=RETRY_MAX_TIME,
                      max_tries=RETRY_MAX_TRIES,
                      on_backoff=backoff_hdlr)
async def geocode_one_address(address, address_id, results):
    async with OpenCageGeocode(API_KEY, domain=API_DOMAIN, sslcontext=sslcontext) as geocoder:
        global FORWARD_OR_REVERSE
        geocoding_results = None
        try:
            if FORWARD_OR_REVERSE == 'reverse' or \
                (FORWARD_OR_REVERSE == 'guess' and guess_text_is_coordinate_pair(address)):
                lon_lat = address.split(',')
                geocoding_results = await geocoder.reverse_geocode_async(lon_lat[0], lon_lat[1], no_annotations=1)
            else:
                geocoding_results = await geocoder.geocode_async(address, no_annotations=1)
        except OpenCageGeocodeError as exc:
            sys.stderr.write(str(exc) + "\n")
        except Exception as exc:
            traceback.print_exception(exc, file=sys.stderr)

        try:
            if geocoding_results is not None and len(geocoding_results):
                geocoding_result = geocoding_results[0]
            else:
                geocoding_result = None
            await write_one_geocoding_result(geocoding_result, address, address_id, results)
        except Exception as exc:
            traceback.print_exception(exc, file=sys.stderr)

async def run_worker(worker_name, queue, results):
    global PROGRESS_BAR
    sys.stderr.write(f"Worker {worker_name} starts...\n")
    while True:
        work_item = await queue.get()
        address_id = work_item['id']
        address = work_item['address']
        await geocode_one_address(address, address_id, results)
        if SHOW_PROGRESS:
            PROGRESS_BAR.update(1)
        queue.task_done()

async def main():
    global PROGRESS_BAR
    assert sys.version_info >= (3, 7), "Script requires Python 3.7 or newer"

    queue = asyncio.Queue(maxsize=MAX_ITEMS)
    results = []

    # Load data from MongoDB into the queue
    for index, row in geo_wfp.iterrows():
        address_id = row['id']
        address = row['address']  # Adjust according to your data structure
        work_item = {'id': address_id, 'address': address}
        await queue.put(work_item)
        if queue.full():
            break

    sys.stderr.write(f"{queue.qsize()} work_items in queue\n")

    if SHOW_PROGRESS:
        PROGRESS_BAR.total = queue.qsize()
        PROGRESS_BAR.refresh()

    sys.stderr.write(f"Creating {NUM_WORKERS} task workers...\n")
    tasks = []
    for i in range(NUM_WORKERS):
        task = asyncio.create_task(run_worker(f'worker {i}', queue, results))
        tasks.append(task)

    sys.stderr.write("Now waiting for workers to finish processing queue...\n")
    await queue.join()

    for task in tasks:
        task.cancel()

    if SHOW_PROGRESS:
        PROGRESS_BAR.close()

    sys.stderr.write("All done.\n")

    # Save the results back to MongoDB
    results_df = pd.DataFrame(results)
    dbu.insert_df_only_to_mongodb(results_df, os.getenv('COLLECTION_NAME_CLEANED_WITH_CITY'))

asyncio.run(main())


Could not connect to MongoDB: c.wildfirecluster.mongocluster.cosmos.azure.com:10260: timed out, Timeout: 30s, Topology Description: <TopologyDescription id: 66994a47ba2ad26730a7eb66, topology_type: Unknown, servers: [<ServerDescription ('c.wildfirecluster.mongocluster.cosmos.azure.com', 10260) server_type: Unknown, rtt: None, error=NetworkTimeout('c.wildfirecluster.mongocluster.cosmos.azure.com:10260: timed out')>]>


ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html