In [None]:
## This is my code for getting the nearest address from coordinates. Good luck!

In [None]:
import pandas as pd
from bs4 import BeautifulSoup
import re
import lxml
# --- Configuration ---
# Path to the KML file that is passed to parse_kml_to_dataframe() in the
# main-execution block of this cell.
# NOTE(review): this is an absolute, machine-specific Windows path; point it at
# your own copy of the KML file before running.
KML_FILE_PATH = r"C:\Users\basde\Desktop\Address from coordinates\All Pylons - Circuits.kml"

def parse_description(description_html):
    """
    Turn the HTML body of a KML <description> into a flat dict of known fields.

    The HTML is reduced to plain text (one text fragment per line), every line
    of the form "label: value" is inspected, and labels listed in the mapping
    below are stored under their column name. Unknown labels are ignored.
    Returns an empty dict when the description is empty or missing.
    """
    if not description_html:
        return {}

    # Text label as it appears in the description -> DataFrame column name
    label_to_column = {
        'Pyloon nr.': 'pylon_nr',
        'Lijn': 'line',
        'Gemeente': 'municipality',
        'sc': 'sc_type',
        'x': 'lambert_x',
        'y': 'lambert_y',
        'PerceelNr.': 'parcel_nr',
        'Hoogte': 'height',
        'Technical Type': 'technical_type'
    }

    # Flatten the HTML; a newline separator keeps each text fragment on its own line
    plain_text = BeautifulSoup(description_html, 'html.parser').get_text(separator='\n', strip=True)

    extracted = {}
    for text_line in plain_text.split('\n'):
        # partition() cuts at the first colon only, so colons inside the value survive
        label, colon, remainder = text_line.partition(':')
        if not colon:
            continue

        column = label_to_column.get(label.strip())
        if column is not None:
            extracted[column] = remainder.strip()

    return extracted

def _first_coordinate_tuple(coords_text):
    """Return (longitude, latitude, altitude) strings from a KML <coordinates> value.

    A KML <coordinates> element holds one or more "lon,lat[,alt]" tuples that are
    separated from each other by whitespace. Only the first tuple is used; missing
    components are returned as None.
    """
    # Tolerate stray whitespace around commas so "lon, lat, alt" stays one tuple.
    normalized = re.sub(r'\s*,\s*', ',', coords_text.strip())
    tuples = normalized.split()
    if not tuples:
        return None, None, None

    parts = tuples[0].split(',')[:3]
    parts += [None] * (3 - len(parts))
    return tuple(parts)


def parse_kml_to_dataframe(file_path):
    """
    Parses a KML file and converts its placemark data into a pandas DataFrame.

    Each <Placemark> becomes one row holding its name ('pylon_id'), the first
    coordinate tuple ('longitude', 'latitude', 'altitude') and whatever fields
    parse_description() extracts from its <description>.

    Args:
        file_path: Path to the KML file.

    Returns:
        None if the file does not exist, an empty DataFrame if the file contains
        no <Placemark> elements, otherwise a DataFrame with the columns that were
        found, in a fixed readable order. Numeric columns are converted with
        errors='coerce', so unparsable values become NaN.
    """
    print(f"Reading KML file from: {file_path}")
    try:
        # 'utf-8-sig' reads plain UTF-8 unchanged and drops a leading BOM if present,
        # so the BOM can never end up in front of the XML declaration.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            kml_content = f.read()
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
        return None

    # Remove the default namespace definition to simplify parsing
    kml_content_cleaned = re.sub(r'xmlns=".*?"', '', kml_content, count=1, flags=re.IGNORECASE)

    # Use the 'lxml-xml' parser for proper XML parsing on the cleaned content
    soup = BeautifulSoup(kml_content_cleaned, 'lxml-xml')

    # Find all Placemark elements in the document
    placemarks = soup.find_all('Placemark')
    print(f"Found {len(placemarks)} placemarks.")

    if not placemarks:
        print("Warning: No <Placemark> tags were found. The resulting DataFrame will be empty.")
        return pd.DataFrame()

    all_pylons_data = []
    for placemark in placemarks:
        pylon_data = {}

        # 1. Extract name (pylon_id)
        name_tag = placemark.find('name')
        pylon_data['pylon_id'] = name_tag.text.strip() if name_tag is not None else None

        # 2. Extract coordinates (first tuple only, see _first_coordinate_tuple)
        coords_tag = placemark.find('coordinates')
        coords_text = coords_tag.text if coords_tag is not None else ""
        (pylon_data['longitude'],
         pylon_data['latitude'],
         pylon_data['altitude']) = _first_coordinate_tuple(coords_text)

        # 3. Parse the key/value pairs embedded in the description HTML
        description_tag = placemark.find('description')
        description_html = description_tag.text if description_tag is not None else ""
        pylon_data.update(parse_description(description_html))

        all_pylons_data.append(pylon_data)

    # Create the DataFrame from the list of dictionaries
    df = pd.DataFrame(all_pylons_data)

    # --- Data Cleaning and Type Conversion ---
    print("Cleaning and converting data types...")

    numeric_cols = [
        'longitude', 'latitude', 'altitude',
        'height', 'lambert_x', 'lambert_y'
    ]

    for col in numeric_cols:
        if col in df.columns:
            # 'coerce' will turn any parsing errors into NaN
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Reorder columns for better readability
    ordered_columns = [
        'pylon_id', 'pylon_nr', 'line', 'municipality', 'technical_type',
        'longitude', 'latitude', 'altitude', 'height',
        'lambert_x', 'lambert_y', 'parcel_nr', 'sc_type'
    ]

    # Filter to only include columns that were actually found in the file
    final_columns = [col for col in ordered_columns if col in df.columns]
    df = df[final_columns]

    return df

# --- Main execution ---
if __name__ == "__main__":
    pylons_df = parse_kml_to_dataframe(KML_FILE_PATH)

    # A None result means the parser could not read the file and has already
    # reported that, so there is nothing further to print in that case.
    if pylons_df is not None:
        if pylons_df.empty:
            print("\nScript finished, but the DataFrame is empty.")
        else:
            print("\nSuccessfully created DataFrame.")
            print("DataFrame Info:")
            pylons_df.info()

            print("\nDataFrame Head:")
            print(pylons_df.head())

In [None]:


import requests
import pandas as pd
import io
import os
import math
import osmium
from rtree import index
from tqdm.auto import tqdm

# --- Configuration ---
# Update this path to where you downloaded the 'belgium-latest.osm.pbf' file.
# NOTE(review): absolute, machine-specific Windows path; adjust before running.
BELGIUM_PBF_PATH = r"C:\Users\basde\Desktop\Address from coordinates\belgium-latest.osm.pbf"
# Base URL of an OSRM routing server.
# NOTE(review): no code visible in this notebook reads this constant; the
# nearest-address lookup uses straight-line distance only. Confirm before removing.
OSRM_API_URL = "http://router.project-osrm.org"

# --- Initialize TQDM for Pandas ---
# This line adds a .progress_apply() method to pandas DataFrames,
# which we will use to show a progress bar.
tqdm.pandas(desc="Processing Coordinates")

# Confirmation that the imports and configuration above executed without error.
print("Setup Complete. Libraries imported and configuration is set.")

In [None]:
# === CELL 2: CORE LOGIC AND HELPER FUNCTIONS ===

class AddressIndexHandler(osmium.SimpleHandler):
    """Collects every OSM node and way that carries a street address.

    An element counts as an address when it has both an 'addr:street' and an
    'addr:housenumber' tag. Matching elements are appended to ``self.addresses``
    as dicts with the keys 'type', 'id', 'lat', 'lon', 'street', 'housenumber',
    'city' and 'zip_code'. Nodes use their own location; ways use the centre of
    the bounding box of their nodes.
    """

    def __init__(self):
        super().__init__()
        # One dict per address found while the file is being read.
        self.addresses = []

    @staticmethod
    def _has_address(tags):
        """Return True when the tags contain both a street and a house number."""
        return 'addr:street' in tags and 'addr:housenumber' in tags

    def _process_tags(self, tags, location, element_type, element_id):
        """Record the element as an address if it has the required tags."""
        if self._has_address(tags):
            self.addresses.append({
                'type': element_type, 'id': element_id, 'lat': location.lat, 'lon': location.lon,
                'street': tags.get('addr:street'), 'housenumber': tags.get('addr:housenumber'),
                'city': tags.get('addr:city'), 'zip_code': tags.get('addr:postcode')
            })

    def node(self, n):
        """Callback invoked for every node in the file."""
        self._process_tags(n.tags, n.location, 'node', n.id)

    def way(self, w):
        """Callback invoked for every way in the file."""
        # Most ways are not addresses; check the tags first so the bounding box
        # below is only computed for ways that will actually be recorded.
        if not self._has_address(w.tags):
            return

        try:
            if not w.nodes or len(w.nodes) < 1:
                return

            # Start with an inverted, out-of-range box so any real node shrinks it.
            min_lon, min_lat, max_lon, max_lat = 181, 91, -181, -91
            for node in w.nodes:
                min_lon, max_lon = min(min_lon, node.lon), max(max_lon, node.lon)
                min_lat, max_lat = min(min_lat, node.lat), max(max_lat, node.lat)
            if min_lon > 180:
                return

            center_loc = osmium.osm.Location(min_lon + (max_lon - min_lon) / 2,
                                             min_lat + (max_lat - min_lat) / 2)
            self._process_tags(w.tags, center_loc, 'way', w.id)
        except osmium.InvalidLocationError:
            # A node of this way has no usable location; skip the way.
            pass


class LocalAddressFinder:
    """Spatial lookup of OSM addresses backed by an on-disk R-tree.

    On construction the finder either loads a previously saved address list and
    R-tree index, or builds both from the OSM PBF file and saves them.

    Args:
        pbf_file: Path to the OSM PBF extract used when the index must be built.
        data_path: Path of the pickle file holding the list of address dicts.
        index_path_prefix: Path prefix of the R-tree files ('<prefix>.dat' and
            '<prefix>.idx').

    Raises:
        FileNotFoundError: If the index must be built and ``pbf_file`` does not exist.

    NOTE(review): this class uses the module-level name ``pickle``, which in this
    notebook is imported in a later cell than the one defining the class.
    """

    def __init__(self, pbf_file, data_path, index_path_prefix):
        self.pbf_file = pbf_file
        self.data_path = data_path
        self.index_path = index_path_prefix
        self.address_data = []

        if self._saved_files_exist():
            print("Loading pre-built index from files...")
            self._load_from_disk()
            print(f"Successfully loaded {len(self.address_data)} addresses and index.")
        else:
            print("No pre-built index found.")
            self._build_from_pbf_and_save()

    def _saved_files_exist(self):
        """Return True when the pickle and both R-tree files are present."""
        required = (self.data_path, self.index_path + ".dat", self.index_path + ".idx")
        return all(os.path.exists(path) for path in required)

    def _load_from_disk(self):
        """Load the saved address list and open the saved R-tree index."""
        # SECURITY: pickle.load can execute arbitrary code; only load files that
        # this class wrote itself.
        with open(self.data_path, 'rb') as f:
            self.address_data = pickle.load(f)
        self.idx = index.Index(self.index_path)

    def _build_from_pbf_and_save(self):
        """Parse the PBF file, save the addresses and build the R-tree index."""
        if not os.path.exists(self.pbf_file):
            raise FileNotFoundError(f"OSM PBF file not found: '{self.pbf_file}'")

        print(f"Building address index from '{self.pbf_file}'...")
        print("Step 1/2: Parsing OSM data file (this may take a minute)...")
        handler = AddressIndexHandler()
        handler.apply_file(self.pbf_file, locations=True)
        self.address_data = handler.addresses

        print(f"Saving {len(self.address_data)} addresses to '{self.data_path}'...")
        with open(self.data_path, 'wb') as f:
            pickle.dump(self.address_data, f)

        print(f"Step 2/2: Indexing addresses and saving to '{self.index_path}.dat/idx'...")
        p = index.Property()
        p.overwrite = True
        build_idx = index.Index(self.index_path, properties=p)

        # Each address is a point, stored as a degenerate box; its id is its
        # position in self.address_data.
        for i, addr in enumerate(self.address_data):
            build_idx.insert(i, (addr['lon'], addr['lat'], addr['lon'], addr['lat']))

        # close() flushes the index to disk but makes that object unusable, so
        # reopen the persisted index; otherwise find_candidates() would fail on
        # the run that built the index.
        build_idx.close()
        self.idx = index.Index(self.index_path)
        print("Index build and save complete.")

    def find_candidates(self, lat, lon, radius_m=2000):
        """Return the address dicts inside a square box around (lat, lon).

        The box extends ``radius_m`` metres in each direction, converted to
        degrees with ~111,111 m per degree of latitude and the same figure scaled
        by cos(latitude) for longitude. Because the search area is a box, results
        near its corners can be farther away than ``radius_m``.
        """
        lat_offset = radius_m / 111111.0
        lon_offset = radius_m / (111111.0 * math.cos(math.radians(lat)))
        search_bbox = (lon - lon_offset, lat - lat_offset, lon + lon_offset, lat + lat_offset)
        candidate_indices = self.idx.intersection(search_bbox)
        return [self.address_data[i] for i in candidate_indices]


def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points given in degrees.

    Uses the haversine formula on a sphere of radius 6,371,000 m.
    """
    earth_radius_m = 6371000

    lat1_rad, lat2_rad = math.radians(lat1), math.radians(lat2)
    dlat_rad = math.radians(lat2 - lat1)
    dlon_rad = math.radians(lon2 - lon1)

    # Haversine of the central angle between the two points
    hav = (
        math.sin(dlat_rad / 2)**2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon_rad / 2)**2
    )
    central_angle = 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))

    return earth_radius_m * central_angle

def main_orchestrator(lat, lon, address_finder):
    """
    Finds the geometrically closest address to a point by calculating
    the straight-line distance.

    Args:
        lat: Latitude of the query point in degrees (None/NaN allowed).
        lon: Longitude of the query point in degrees (None/NaN allowed).
        address_finder: Object with a find_candidates(lat, lon) method returning
            address dicts that have at least 'lat' and 'lon' keys.

    Returns:
        A dict with the keys 'street', 'housenumber', 'city', 'zip_code' and
        'straight_line_distance_m'. All values are None when the coordinates are
        missing or no candidate address is found.
    """
    result_template = {'street': None, 'housenumber': None, 'city': None,
                       'zip_code': None, 'straight_line_distance_m': None}

    # Rows whose coordinates failed numeric conversion arrive as NaN; there is
    # nothing meaningful to search for, so skip the spatial query entirely.
    if lat is None or lon is None or math.isnan(lat) or math.isnan(lon):
        return result_template

    # Step 1: Find all potential candidates in a radius
    candidates = address_finder.find_candidates(lat, lon)
    if not candidates:
        return result_template

    # Step 2: Loop through candidates to find the one with the minimum straight-line distance
    best_candidate = None
    min_distance = float('inf')

    for candidate in candidates:
        distance = haversine_distance(lat, lon, candidate['lat'], candidate['lon'])
        if distance < min_distance:
            min_distance = distance
            best_candidate = candidate

    # Step 3: If a best candidate was found, populate the results
    if best_candidate:
        result_template.update({
            'street': best_candidate.get('street'),
            'housenumber': best_candidate.get('housenumber'),
            'city': best_candidate.get('city'),
            'zip_code': best_candidate.get('zip_code'),
            'straight_line_distance_m': min_distance
        })

    return result_template

# Confirmation in the cell output that the definitions above were executed.
print("Core logic functions and classes defined.")

In [None]:
import pickle
from rtree import index
import os
# === CELL 3: INITIALIZE ADDRESS FINDER (Builds or Loads Index) ===
SAVED_DATA_PATH = "address_data.pkl"
SAVED_INDEX_PATH_PREFIX = "address_index"
# This will now check for "address_data.pkl" and "address_index.dat/idx".
# If they exist, it will load them. If not, it will build them from the PBF
# file and save them for the next time.
try:
    address_finder = LocalAddressFinder(BELGIUM_PBF_PATH,
                                        data_path=SAVED_DATA_PATH,
                                        index_path_prefix=SAVED_INDEX_PATH_PREFIX)
except FileNotFoundError as e:
    print(e)
    # Re-raise instead of continuing: otherwise `address_finder` stays undefined
    # (or stale from an earlier run) and the processing cell fails or silently
    # uses old data.
    raise

In [None]:
# # === ONE-TIME CELL TO SAVE THE EXISTING IN-MEMORY INDEX ===

# import pickle
# from rtree import index
# import os

# print("Saving the data from the existing 'address_finder' object...")

# # 1. Define the filenames (must match the new code)
# SAVED_DATA_PATH = "address_data.pkl"
# SAVED_INDEX_PATH_PREFIX = "address_index"

# # 2. Save the address_data list using pickle
# print(f"Step 1/2: Saving {len(address_finder.address_data)} addresses to '{SAVED_DATA_PATH}'...")
# with open(SAVED_DATA_PATH, 'wb') as f:
#     pickle.dump(address_finder.address_data, f)
# print("...address data saved successfully.")

# # 3. Create a new, persistent R-tree index and populate it from the existing data
# print(f"Step 2/2: Saving the spatial index to '{SAVED_INDEX_PATH_PREFIX}.dat/idx'...")
# p = index.Property()
# p.overwrite = True # Overwrite index files if they already exist
# persistent_idx = index.Index(SAVED_INDEX_PATH_PREFIX, properties=p)

# for i, addr in enumerate(address_finder.address_data):
#     persistent_idx.insert(i, (addr['lon'], addr['lat'], addr['lon'], addr['lat']))

# persistent_idx.close() # Important: This flushes the index to disk
# print("...spatial index saved successfully.")

# print("\nSave complete! You can now use the new code to load this index instantly.")

In [None]:
# === CELL 5: RUN THE PROCESSING ===

# We use .progress_apply() on your pylons_df DataFrame so a progress bar is shown.
# For each pylon row, main_orchestrator returns the candidate address with the
# smallest straight-line (haversine) distance to the pylon's latitude/longitude;
# no routing or reachability check is involved.
results = pylons_df.progress_apply(
    lambda row: main_orchestrator(row['latitude'], row['longitude'], address_finder), 
    axis=1
)

In [None]:
# === CELL 6: FORMAT AND DISPLAY FINAL RESULTS ===

# `results` holds one address dict per pylon; turn them into a DataFrame
# with one column per dict key.
results_df = pd.DataFrame(list(results))

# Put the address columns next to the pylon columns. The pylon index is reset
# so both frames line up row by row on a fresh 0..n-1 index.
pylons_df_enriched = pd.concat(
    [pylons_df.reset_index(drop=True), results_df],
    axis="columns",
)

print("Processing complete. Enriched pylons_df:")
display(pylons_df_enriched)

In [None]:
import pandas as pd
import numpy as np


# --- Data Cleaning Steps ---
# Normalise the identifier to a stripped string before splitting.
# astype(str) turns missing ids into the text 'None'/'nan'; those contain no
# underscore and are therefore reported as incorrectly formatted below.
pylons_df_enriched['pylon_id_cleaned'] = pylons_df_enriched['pylon_id'].astype(str).str.strip()


# --- Splitting the Column ---
# Use .str.split() with expand=True to create a new DataFrame from the split parts.
split_df = pylons_df_enriched['pylon_id_cleaned'].str.split('_', expand=True)

# expand=True only creates as many columns as the longest split. If no id
# contains an underscore, column 1 does not exist, so reindex to guarantee
# columns 0 and 1 are present (missing ones are filled with NaN).
id_parts = split_df.reindex(columns=[0, 1])


# --- Assigning to New Columns ---
# Assign the first and second part of the split to 'UGE' and 'Nr.'
pylons_df_enriched['UGE'] = id_parts[0]
pylons_df_enriched['Nr.'] = id_parts[1]


# --- Identifying Incorrect Formats ---
# A format is considered incorrect if it doesn't split into exactly two parts,
# i.e. the id does not contain exactly one underscore. Counting underscores
# avoids looking up split column 2, which does not exist when every id is valid.
incorrect_mask = pylons_df_enriched['pylon_id_cleaned'].str.count('_') != 1
incorrect_format_df = pylons_df_enriched[incorrect_mask]


# We can drop the intermediate cleaned column if it's no longer needed.
pylons_df_enriched = pylons_df_enriched.drop(columns=['pylon_id_cleaned'])


print("DataFrame with new 'UGE' and 'Nr.' columns using split:")
print(pylons_df_enriched)
print("\nRows with incorrect format (did not split into 2 parts):")
print(incorrect_format_df.drop(columns=['UGE', 'Nr.'], errors='ignore'))

In [None]:
# 1. Define the name for your output file.
#    This is a relative path, so the file is created in the current working directory.
output_filename = 'lines_data2.csv'

# 2. Use the .to_csv() method to save the DataFrame
#    We use index=False because we typically don't need to save the
#    DataFrame's index numbers to the CSV file.
#    An existing file with the same name is overwritten.
pylons_df_enriched.to_csv(output_filename, index=False)

print(f"DataFrame has been successfully saved to '{output_filename}'")