In [None]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, MultiLineString
from shapely.wkt import loads
from shapely.ops import nearest_points
from shapely.geometry import box

# Locate the point lying a given fraction of the way along a line
def get_point_from_prc(geom, prc):
    """Return the point at fraction ``prc`` of the length of ``geom``.

    Parameters
    ----------
    geom : geometry exposing ``geom_type``, ``length``, ``centroid`` and
        ``interpolate``. For a ``MultiLineString`` only its longest member
        is measured.
    prc : number multiplied directly by the line length to obtain the
        distance passed to ``interpolate``.
        NOTE(review): this assumes a 0-1 fraction - confirm that the
        PERCFRREF values fed in are not expressed as 0-100.

    Returns
    -------
    The interpolated point, or the centroid when the line has zero length.
    """
    line = geom
    if line.geom_type == 'MultiLineString':
        # Measure along the longest member only
        line = max(line.geoms, key=lambda part: part.length)

    total = line.length
    if total == 0:
        # Nothing to walk along on a degenerate line
        return line.centroid

    return line.interpolate(total * prc)

# Input files
# POI table; columns read by this notebook: 'LINK_ID', 'POI_ID', 'POI_NAME', 'PERCFRREF', 'POI_ST_SD'
csv = pd.read_csv('/home/bingus/Guadalahacks_here/POIs/POI_4815075.csv')
# Street links; columns read by this notebook: 'link_id', 'MULTIDIGIT', 'DIR_TRAVEL', 'geometry'
geo1 = gpd.read_file('/home/bingus/Guadalahacks_here/STREETS_NAV/SREETS_NAV_4815075.geojson')
# Street naming; columns read by this notebook: 'link_id', 'ST_NAME'
geo2 = gpd.read_file('/home/bingus/Guadalahacks_here/STREETS_NAMING_ADDRESSING/SREETS_NAMING_ADDRESSING_4815075.geojson')

# Keep only MULTIDIGIT == 'Y' links, then attach the POI rows and the street
# name of each link. Both merges are inner joins on 'link_id', so the result
# has one row per POI located on a kept link.
geo1 = (
    geo1.loc[geo1['MULTIDIGIT'] == 'Y']
    .copy()
    .merge(csv.rename(columns={'LINK_ID': 'link_id'}), on='link_id')
    .merge(geo2[['link_id', 'ST_NAME']], on='link_id', suffixes=('', '_json2'))
)
# Convert strings to geometry
def parse_multilinestring(geom_str):
    """Turn a WKT string into a geometry; pass non-strings through untouched.

    Two string layouts are accepted:

    * a standard ``MULTILINESTRING (...)`` WKT, which is handed to ``loads``
      as a whole (splitting it on ``'), '`` would cut it into fragments that
      are not valid WKT);
    * one or more complete WKT geometries joined by ``', '`` (for example
      ``"LINESTRING (...), LINESTRING (...)"``), which are parsed one by one.

    Returns
    -------
    The input itself when it is not a ``str``; the single parsed geometry
    when the string holds one; otherwise a ``MultiLineString`` built from
    the parsed parts.
    """
    if not isinstance(geom_str, str):
        return geom_str

    text = geom_str.strip()
    if text.upper().startswith('MULTILINESTRING'):
        return loads(text)

    # Splitting on '), ' strips the closing parenthesis from every part
    # except the last one, so it is put back before parsing.
    parts = geom_str.split('), ')
    parts = [p if p.endswith(')') else p + ')' for p in parts]
    lines = [loads(p) for p in parts]
    return lines[0] if len(lines) == 1 else MultiLineString(lines)

geo1['geometry'] = geo1['geometry'].apply(parse_multilinestring)

# Spatial index over the street geometries and the margin added around each
# row's bounding box when looking for neighbouring rows
sindex = geo1.sindex
buffer_dist = 0.0005

resultados = []

# The spatial index reports row *positions*, so rows are addressed by position
# everywhere below instead of mixing positions with index labels.
filas = [fila for _, fila in geo1.iterrows()]

# (lat, lon) of every row's POI point. Built once, the first time a street
# pair passes all filters, instead of being re-interpolated for every pair.
puntos_poi = None

for idx, row in enumerate(filas):
    minx, miny, maxx, maxy = row.geometry.bounds
    ventana = (minx - buffer_dist, miny - buffer_dist,
               maxx + buffer_dist, maxy + buffer_dist)

    for idx2 in sindex.intersection(ventana):
        # Visit each unordered pair once and never pair a row with itself
        if idx >= idx2:
            continue

        row2 = filas[idx2]

        # Different directions
        if row['DIR_TRAVEL'] == row2['DIR_TRAVEL']:
            continue
        # Same name
        if row['ST_NAME'] != row2['ST_NAME']:
            continue

        # Left or right: the row whose nearest point has the lower latitude
        # (lower longitude on a tie) is taken as the left one
        pt1, pt2 = nearest_points(row.geometry, row2.geometry)
        lat1, lon1 = pt1.y, pt1.x
        lat2, lon2 = pt2.y, pt2.x

        if (lat1 < lat2) or (lat1 == lat2 and lon1 < lon2):
            izq, der = row, row2
            pt_izq, pt_der = pt1, pt2
        else:
            izq, der = row2, row
            pt_izq, pt_der = pt2, pt1

        if izq['POI_ST_SD'] != 'L' or der['POI_ST_SD'] != 'R':
            continue

        min_lat, max_lat = sorted([pt_izq.y, pt_der.y])
        min_lon, max_lon = sorted([pt_izq.x, pt_der.x])

        if puntos_poi is None:
            puntos_poi = []
            for fila in filas:
                pt = get_point_from_prc(fila.geometry, fila['PERCFRREF'])
                puntos_poi.append((pt.y, pt.x))

        # POIs lying strictly inside the box spanned by the two nearest points
        for k, (lat_c, lon_c) in enumerate(puntos_poi):
            if k == idx or k == idx2:
                continue

            if min_lat < lat_c < max_lat and min_lon < lon_c < max_lon:
                fila_medio = filas[k]
                resultados.append({
                    'obj_id': fila_medio['POI_ID'],
                    'id_calle': fila_medio['link_id'],
                    'entre_izq': izq['link_id'],
                    'entre_der': der['link_id'],
                    'name': fila_medio['POI_NAME'],  # Use the POI's own name
                    'street': fila_medio['ST_NAME'],  # Use the POI's own street name
                })

# Explicit columns keep the frame usable (and 'obj_id' present) when nothing matched
df_resultados = pd.DataFrame(
    resultados,
    columns=['obj_id', 'id_calle', 'entre_izq', 'entre_der', 'name', 'street'],
).drop_duplicates()
df_resultados


In [None]:

# Flagged POI ids as a plain list. A POI can appear in df_resultados once per
# street pair, so repeats are dropped (first occurrence kept) to avoid looking
# up the same POI several times in the API cell below.
ids_errores = df_resultados['obj_id'].drop_duplicates().tolist()
print(ids_errores)

In [None]:
#Geocoding and Search API
import os

# The key is read from the environment so the secret is not stored in the
# notebook. A key that was previously committed in this cell should be treated
# as leaked and rotated.
api_key = os.environ.get('HERE_API_KEY')
if not api_key:
    print("HERE_API_KEY is not set; export it before running the API cells.")

In [None]:
import requests
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

# URL every Discover query is sent to, and the key passed along as 'apiKey'
discover_endpoint = "https://discover.search.hereapi.com/v1/discover"
API_KEY = api_key

#API query function
def find_poi_with_discover(lat, lng, poi_name, limit=1, timeout=10):
    """Query the Discover endpoint for ``poi_name`` around ``lat``/``lng``.

    Parameters
    ----------
    lat, lng : coordinates sent as the ``at`` parameter (``"lat,lng"``).
    poi_name : free-text query sent as ``q``.
    limit : maximum number of items requested.
    timeout : seconds to wait for the HTTP response before giving up, so a
        stalled connection cannot block the notebook indefinitely.

    Returns
    -------
    list of dict with the keys ``RESULT_NAME``, ``address``, ``latitude`` and
    ``longitude``, one per returned item. A field missing from an item is
    reported as ``None`` instead of discarding every result. An empty list is
    returned when the request fails or yields no items; the failure is
    printed rather than silently swallowed.
    """
    params = {
        'at': f'{lat},{lng}',
        'q': poi_name,
        'limit': limit,
        'apiKey': API_KEY
    }

    try:
        response = requests.get(discover_endpoint, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Best effort: report the problem and let the caller carry on
        print(f"Discover request failed for {poi_name!r}: {e}")
        return []

    found = []
    for item in data.get('items') or []:
        address = item.get('address') or {}
        position = item.get('position') or {}
        found.append({
            'RESULT_NAME': item.get('title'),
            'address': address.get('label'),
            'latitude': position.get('lat'),
            'longitude': position.get('lng'),
        })
    return found

#Main function
def get_point_from_obj_id(obj_id, pois=None, streets=None):
    """Return the point of POI ``obj_id`` along the street link it refers to.

    Parameters
    ----------
    obj_id : value looked up in the ``POI_ID`` column of ``pois``.
    pois : table with ``POI_ID``, ``PERCFRREF`` and ``LINK_ID`` columns.
        Defaults to the notebook-level ``csv`` frame.
    streets : table with ``link_id`` and ``geometry`` columns. Defaults to
        the notebook-level ``geo1`` frame (only names that exist in this
        notebook are used, so the function runs on a fresh kernel).

    Returns
    -------
    ``Point`` located ``PERCFRREF * total length`` along the vertices of the
    link. For a MultiLineString the vertices of all parts are chained in
    order and measured as one path.
    NOTE(review): PERCFRREF is used as a 0-1 fraction - confirm it is not
    expressed as 0-100.

    Raises
    ------
    ValueError : unknown ``obj_id``, unknown link, or fewer than two vertices.
    TypeError : geometry that is neither a LineString nor a MultiLineString.
    """
    import math

    if pois is None:
        pois = csv
    if streets is None:
        streets = geo1

    row = pois[pois['POI_ID'] == obj_id]
    if row.empty:
        raise ValueError(f"Object ID {obj_id} not found.")

    row = row.iloc[0]
    prc = float(row['PERCFRREF'])
    path_id = row['LINK_ID']

    # Matching geometry
    geo_row = streets[streets['link_id'] == path_id]
    if geo_row.empty:
        raise ValueError(f"Path ID {path_id} not found in GeoJSON.")

    geom = geo_row.iloc[0]['geometry']

    # Get all coordinates (checked through geom_type, like the other helpers
    # in this notebook)
    geom_type = getattr(geom, 'geom_type', None)
    if geom_type in ('LineString', 'LinearRing'):
        coords = list(geom.coords)
    elif geom_type == 'MultiLineString':
        coords = [pt for line in geom.geoms for pt in line.coords]
    else:
        raise TypeError("Geometry must be LineString or MultiLineString.")

    if len(coords) < 2:
        raise ValueError("Not enough coordinates in geometry to interpolate.")

    # Planar length of every segment between consecutive vertices
    seg_lengths = [
        math.hypot(end[0] - start[0], end[1] - start[1])
        for start, end in zip(coords, coords[1:])
    ]
    target_distance = prc * sum(seg_lengths)

    cumulative = 0
    for start, end, seg_len in zip(coords, coords[1:], seg_lengths):
        # Zero-length segments (repeated vertices) cannot contain the target
        # and would otherwise cause a division by zero below.
        if seg_len > 0 and cumulative + seg_len >= target_distance:
            ratio = (target_distance - cumulative) / seg_len
            return Point(start[0] + ratio * (end[0] - start[0]),
                         start[1] + ratio * (end[1] - start[1]))

        cumulative += seg_len

    return Point(coords[-1])

all_results = []

for i in ids_errores:
    try:
        row = csv[csv['POI_ID'] == i]
        poi_name = row.iloc[0]['POI_NAME']
        point = get_point_from_obj_id(i)
        latitude = point.y
        longitude = point.x

        # Call API
        results = find_poi_with_discover(latitude, longitude, poi_name, limit=1)

        # Attach original ID and input info
        for res in results:
            if isinstance(res, dict):
                res['input_name'] = poi_name
                res['input_lat'] = latitude
                res['input_lon'] = longitude
                res['POI_ID'] = i
                all_results.append(res)

    except Exception as e:
        # Best effort: report the failing POI and move on to the next one
        print(f"Error with ID {i}: {e}")

# Data frame. Columns are listed explicitly so the frame keeps its schema when
# no lookup succeeded; otherwise the dropna below (and the merge in the next
# cell) would fail on missing columns.
df = pd.DataFrame(
    all_results,
    columns=['RESULT_NAME', 'address', 'latitude', 'longitude',
             'input_name', 'input_lat', 'input_lon', 'POI_ID'],
)

df = df.dropna(subset=['latitude', 'longitude'])

# Create geometry
geometry = [Point(xy) for xy in zip(df['longitude'], df['latitude'])]
gdf_coordinates_poi = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")


gdf_coordinates_poi


In [None]:
import pandas as pd
import numpy as np
import geopandas as gpd
import random


# Attach the API position of each flagged POI to its result rows (left join,
# so POIs without an API hit keep a missing geometry). The right side is
# reduced to one row per POI_ID so repeated lookups cannot multiply the rows.
df = df_resultados.merge(
    gdf_coordinates_poi[['POI_ID', 'geometry']].drop_duplicates(subset='POI_ID'),
    left_on='obj_id',
    right_on='POI_ID',
    how='left',
)


def get_start_end_points(geom):
    """Return the first and last coordinate of a line geometry.

    For a ``LineString`` these are its own end vertices. For a
    ``MultiLineString`` the first vertex of the first part and the last vertex
    of the last part are used. Any other geometry type yields two
    ``(nan, nan)`` pairs.
    """
    kind = geom.geom_type

    if kind == 'LineString':
        first_part = last_part = geom
    elif kind == 'MultiLineString':
        # Take first line start and last line end
        first_part, last_part = geom.geoms[0], geom.geoms[-1]
    else:
        missing = (np.nan, np.nan)
        return missing, missing

    return list(first_part.coords)[0], list(last_part.coords)[-1]

# Ids handed out by process_row so far. The function does not write into `df`
# while `df.apply` is iterating over it, so issued ids are remembered here to
# keep them unique.
_assigned_obj_ids = set()


def process_row(row):
    """Check one result row against its two streets; re-label it when outside.

    The row's ``geometry`` (the API position) is compared with the bounding
    range spanned by the start/end coordinates of the streets referenced by
    ``entre_izq`` and ``entre_der`` in ``geo1``.

    Returns
    -------
    The row unchanged when it has no geometry, when either street is missing
    from ``geo1``, or when the position is inside the range. Otherwise a copy
    of the row with a new random four-digit ``obj_id`` and a ``posicion`` of
    ``'L'`` or ``'R'``. The global ``df`` is only read, never modified:
    mutating the frame that ``DataFrame.apply`` is iterating over is not
    supported by pandas.
    """
    if pd.isna(row['geometry']):
        print(f"Object {row['obj_id']} has no geometry. Skipping.")
        return row

    # Get left street geometry
    left_street = geo1[geo1['link_id'] == row['entre_izq']]
    if left_street.empty:
        print(f"Left street id {row['entre_izq']} not found. Skipping.")
        return row

    # Get right street geometry
    right_street = geo1[geo1['link_id'] == row['entre_der']]
    if right_street.empty:
        print(f"Right street id {row['entre_der']} not found. Skipping.")
        return row

    left_geom = left_street.iloc[0].geometry
    right_geom = right_street.iloc[0].geometry

    left_start, left_end = get_start_end_points(left_geom)
    right_start, right_end = get_start_end_points(right_geom)

    obj_lon, obj_lat = row['geometry'].x, row['geometry'].y

    # Longitude / latitude range covered by the four street end points
    ends = (left_start, left_end, right_start, right_end)
    min_lon = min(pt[0] for pt in ends)
    max_lon = max(pt[0] for pt in ends)
    min_lat = min(pt[1] for pt in ends)
    max_lat = max(pt[1] for pt in ends)

    in_lon_range = (min_lon <= obj_lon <= max_lon)
    in_lat_range = (min_lat <= obj_lat <= max_lat)

    if in_lon_range and in_lat_range:
        print(f"[OK] Object {row['obj_id']} is correctly positioned between streets {row['entre_izq']} and {row['entre_der']}.")
        return row

    # Draw a new random obj_id that collides neither with the ids already in
    # `df` nor with the ones issued earlier in this run
    new_id = random.randint(1000, 9999)
    while new_id in df['obj_id'].values or new_id in _assigned_obj_ids:
        new_id = random.randint(1000, 9999)
    _assigned_obj_ids.add(new_id)

    # Longitude decides the side first; latitude only when the longitude is
    # inside the range
    if obj_lon < min_lon:
        new_pos = 'L'
    elif obj_lon > max_lon:
        new_pos = 'R'
    else:
        new_pos = 'R' if obj_lat > max_lat else 'L'

    print(f"[CHANGE] Object {row['obj_id']} is out of range.")
    print(f"Assigning new obj_id: {new_id} and position: {new_pos}")

    updated = row.copy()
    updated['obj_id'] = new_id
    updated['posicion'] = new_pos
    return updated


# Run the position check on every row; process_row returns each row, possibly
# carrying a new obj_id and a 'posicion' value.
df = df.apply(process_row, axis=1)
# NOTE(review): nothing is written to disk here - add an explicit df.to_csv(...) call if the result should be saved.

print("Processing complete.")
df
