## iNatAtor Data Extractor

(also provided as .py)

This notebook walks you through gathering data for fine-tuning.

In [None]:
import pandas as pd
import sqlalchemy
from dotenv import load_dotenv
import os
import h3
import shapely
import pyproj
import numpy as np
import json

import datetime

load_dotenv()

### Parameters

You need to supply a `.env` file that contains the necessary database secrets, or hardcode them here.
`h3_max_res` determines how many points are sampled from each hexagon.
The number of points sampled is `sampling_amount = h3_max_res - hexagon_resolution`, so coarser hexagons (lower H3 resolution, larger area) receive more points, and hexagons at or above `h3_max_res` receive none.
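
As a small illustration of the rule above, the sketch below computes the per-hexagon point count for a few resolutions. The function name `points_per_hexagon` is hypothetical and not part of the notebook; clamping at zero is an assumption that mirrors how the sampling loops behave when the difference is zero or negative.

```python
def points_per_hexagon(h3_max_res: int, hex_resolution: int) -> int:
    """Number of pseudo-points drawn for one hexagon (never negative)."""
    return max(0, h3_max_res - hex_resolution)


# With h3_max_res = 7, coarse (large) hexagons get more points than fine ones.
for res in (3, 5, 7):
    print(f"resolution {res}: {points_per_hexagon(7, res)} points")
```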

`sampling_mode` selects the sampling strategy. `polygon` draws random points from the hexagon's bounding box and keeps only those that fall inside the hexagon. `circle` computes a radius from the center of the hexagon to its boundary and samples points inside that circle, which eliminates the repeated point-in-polygon checks. `circle` is slightly faster than `polygon` but cannot sample from the outermost corners of the hexagon, and because the radius is measured to a boundary vertex, some points can land slightly outside the hexagon's edges.
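
To give a feel for the cost of the `polygon` strategy, here is a rough sketch that estimates how many bounding-box candidates survive the inside-the-hexagon check. It assumes an idealized regular hexagon on a flat plane (real H3 cells in lat/lng are slightly distorted) and uses a hand-written ray-casting test instead of shapely so that it runs without extra dependencies; both function names are illustrative only.

```python
import math
import random


def point_in_polygon(x, y, vertices):
    """Ray-casting test: True if (x, y) lies inside the polygon."""
    inside = False
    n = len(vertices)
    for i in range(n):
        x1, y1 = vertices[i]
        x2, y2 = vertices[(i + 1) % n]
        # Does this edge straddle the horizontal line through y?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def acceptance_rate(n_trials=20000, seed=0):
    """Fraction of bounding-box candidates that fall inside a regular hexagon."""
    rng = random.Random(seed)
    hexagon = [
        (math.cos(math.radians(60 * k)), math.sin(math.radians(60 * k)))
        for k in range(6)
    ]
    xs = [v[0] for v in hexagon]
    ys = [v[1] for v in hexagon]
    accepted = 0
    for _ in range(n_trials):
        x = rng.uniform(min(xs), max(xs))
        y = rng.uniform(min(ys), max(ys))
        if point_in_polygon(x, y, hexagon):
            accepted += 1
    return accepted / n_trials


print(acceptance_rate())
```

For a regular hexagon the ratio of hexagon area to bounding-box area is 3/4, so roughly one candidate in four is expected to be rejected under these assumptions.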

In [None]:
params = {
    'db': os.getenv('POSTGRES_DB'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD'),
    'url': os.getenv('DATABASE_URL'),
    'sampling_mode': 'circle', # polygon | circle
    'h3_max_res': 7
}

params
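
The connection cell further down uses only `params['url']`. If `DATABASE_URL` is not set, one option is to assemble a URL from the other secrets. The sketch below is an assumption, not part of the notebook: `build_postgres_url` is a hypothetical helper, and the `host` and `port` defaults are placeholders you would replace with your own values.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, db, host="localhost", port=5432):
    """Assemble a PostgreSQL connection URL (host and port are assumed defaults)."""
    # quote_plus escapes characters such as '@' or ':' that would break the URL
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


print(build_postgres_url("user", "p@ss", "mydb"))
```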

You can run any query you like, but to use the result with the fine-tuner it must contain the `taxa_id`, `hex_id`, and `type` columns. These three fields are what the model uses for fine-tuning.

An example query:

`SELECT an."taxa_id", ah."hex_id", ah."type" FROM "annotation" AS an INNER JOIN "annotation_hexagon" AS ah ON an."annotation_id"=ah."annotation_id" WHERE an."taxa_id" = 12345`
- This query gets annotations for a single taxon.
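
Since the fine-tuner depends on those three columns, it can help to check a query result before sampling. The following is a minimal sketch; `missing_columns` is a hypothetical helper, and in the notebook you would pass it `df.columns` after running the query.

```python
REQUIRED_COLUMNS = ("taxa_id", "hex_id", "type")


def missing_columns(columns):
    """Return the required column names that are absent from `columns`."""
    present = set(columns)
    return [name for name in REQUIRED_COLUMNS if name not in present]


# Example with a result that lacks the `type` column
print(missing_columns(["taxa_id", "hex_id"]))
```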

In [None]:
QUERY = 'SELECT an."taxa_id", ah."hex_id", ah."type" FROM "annotation" AS an INNER JOIN "annotation_hexagon" AS ah ON an."annotation_id"=ah."annotation_id"'

This block connects to the database, executes the query, and loads the result into a DataFrame.

In [None]:
engine = sqlalchemy.engine.create_engine(url=params['url'])
df = pd.read_sql(QUERY, engine)
df.head()

In [None]:
len(df)

This block provides functions to sample points from a hexagon.

In [None]:
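def generate_random_points_in_polygon(boundary, N):
    # Rejection sampling: draw candidates from the polygon's bounding box
    # and keep only those that fall inside the polygon.
    # `boundary` holds (lat, lng) vertices, so x is latitude and y is longitude here.
    polygon = shapely.Polygon(boundary)
    min_x, min_y, max_x, max_y = polygon.bounds

    random_points = []
    while len(random_points) < N:
        x = np.random.uniform(min_x, max_x)
        y = np.random.uniform(min_y, max_y)

        point = shapely.Point(x, y)
        if polygon.contains(point):
            random_points.append((x, y))

    return random_points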

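def generate_random_points_in_circle(lat, lng, R, N):
    # R is the circle radius in meters; returns N (lat, lng) tuples.
    random_points = []
    while len(random_points) < N:
        # The square root makes the points uniform over the circle's area
        r = R * np.sqrt(np.random.uniform(0, 1)) # random distance from center (meters)
        theta = np.random.uniform(0, 2 * np.pi) # random angle in radians

        # Convert meter offsets to degrees: about 111,320 m per degree of latitude;
        # a degree of longitude shrinks by cos(latitude)
        x = lng + r * np.cos(theta) / (111320 * np.cos(lat * np.pi / 180))
        y = lat + r * np.sin(theta) / 111320

        random_points.append((y, x))

    return random_points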
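
The square root in the circle sampler is what keeps the points uniform over the circle's area. The sketch below illustrates this on a unit circle: with the square root, about a quarter of the points should fall within half the radius (matching the area ratio), whereas using the raw uniform value would put about half of them there. `fraction_within_half_radius` is an illustrative name, and the standard library is used here in place of NumPy to keep the example dependency-free.

```python
import math
import random


def fraction_within_half_radius(use_sqrt, n=20000, seed=0):
    """Share of sampled radii (unit circle) that are at most 0.5."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        u = rng.uniform(0, 1)
        r = math.sqrt(u) if use_sqrt else u
        if r <= 0.5:
            inside += 1
    return inside / n


print(fraction_within_half_radius(True))   # expected to be near 0.25
print(fraction_within_half_radius(False))  # expected to be near 0.5
```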

In [None]:
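# Geodesic calculator on the WGS84 ellipsoid
geod = pyproj.Geod(ellps='WGS84')

def calculate_geo_distance(p1, p2):
    # NOTE: this helper is not defined in the original notebook. This is an assumed
    # implementation: distance in meters between two (lat, lng) points.
    _, _, distance = geod.inv(p1[1], p1[0], p2[1], p2[0])
    return distance

df['hex_resolution'] = [h3.h3_get_resolution(hex_id) for hex_id in df['hex_id']]

# Boundary vertices as (lat, lng) tuples
df['hex_boundary'] = [h3.h3_to_geo_boundary(hex_id, geo_json=False) for hex_id in df['hex_id']]

if params['sampling_mode'] != 'polygon':
    # Hexagon centers as (lat, lng) tuples
    df['center_point'] = [h3.h3_to_geo(hex_id) for hex_id in df['hex_id']]

    # Radius in meters: shortest distance from the center to a boundary vertex
    df['R'] = [
        min(calculate_geo_distance(center, vertex) for vertex in boundary)
        for center, boundary in zip(df['center_point'], df['hex_boundary'])
    ]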
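
The radius computation above needs a distance in meters between two (lat, lng) points. A minimal, dependency-free sketch using the haversine formula is shown here; it assumes a spherical Earth with the mean radius given in the code, so results differ slightly from an ellipsoidal calculation. `haversine_m` is an illustrative name, not a function from the notebook.

```python
import math

EARTH_RADIUS_M = 6371008.8  # mean Earth radius in meters (spherical approximation)


def haversine_m(p1, p2):
    """Great-circle distance in meters between two (lat, lng) points in degrees."""
    lat1, lng1 = map(math.radians, p1)
    lat2, lng2 = map(math.radians, p2)
    dlat = lat2 - lat1
    dlng = lng2 - lng1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


# One degree of longitude along the equator
print(haversine_m((0.0, 0.0), (0.0, 1.0)))
```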

In [None]:
df.head()

In [None]:
start_time = datetime.datetime.now()

psuedo_points = []
for i, r in df.iterrows():
    random_n_points = None
    N = params['h3_max_res'] - r['hex_resolution']
    if params['sampling_mode'] == 'polygon':
        random_n_points = generate_random_points_in_polygon(r['hex_boundary'], N)
    else:
        lat, lng = r['center_point']
        random_n_points = generate_random_points_in_circle(lat, lng, r['R'], N)

    for random_lat, random_lng in random_n_points:
        psuedo_point = {
            'taxon_id': r['taxa_id'],
            'type': r['type'],
            'latitude': random_lat,
            'longitude': random_lng
        }

        psuedo_points.append(psuedo_point)

df_psuedo_points = pd.DataFrame(psuedo_points)

end_time = datetime.datetime.now()
print('Executed in: ', (end_time - start_time))

In [None]:
df_psuedo_points.head(), len(df_psuedo_points)

Your annotations will be saved to a timestamped .csv file.

You can use the extracted annotation data for fine-tuning. Head to `fine_tune_main.py` for instructions on how to set parameters and start fine-tuning a geomodel.

In [None]:
with open("paths.json", 'r') as f:
    paths = json.load(f)

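# Use a filesystem-safe timestamp (no spaces or colons) for the file name
date_now = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
df_psuedo_points.to_csv(os.path.join(paths['annotation'], date_now + '.csv'))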
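
The cell above reads `paths.json` and uses its `annotation` key as the output directory. The sketch below shows one way such a file could be created; only the `annotation` key comes from the notebook, while the `annotations` directory name and the `write_example_paths` helper are hypothetical.

```python
import json
import os
import tempfile


def write_example_paths(directory):
    """Create an output folder and a paths.json that points to it."""
    annotation_dir = os.path.join(directory, "annotations")  # hypothetical folder name
    os.makedirs(annotation_dir, exist_ok=True)
    paths_file = os.path.join(directory, "paths.json")
    with open(paths_file, "w") as f:
        json.dump({"annotation": annotation_dir}, f, indent=2)
    return paths_file


# Demonstrate in a throwaway directory
example_dir = tempfile.mkdtemp()
with open(write_example_paths(example_dir)) as f:
    print(json.load(f))
```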