## Jupyter notebook 01: Retrieving data from OpenStreetMap using OHSOME API and homogeneous grid cells

***Paper: Integração de Bases Toponímicas: Uma Abordagem Híbrida para Enriquecimento de Dados Oficiais com Topônimos Colaborativos do OpenStreetMap e Imagens ao Nível de Rua***

**Aims**

- To conduct a quantitative assessment of elements within OpenStreetMap (OSM) that have the 'name' attribute filled for potential categories of the Brazilian Authoritative Topographic Map; and

- To investigate the most significant intrinsic quality parameters that contribute to the reliability of toponyms in OSM.

**Brief Overview of the Proposed Methodology**

- Preliminary survey of potential OpenStreetMap (OSM) tags to provide relevant toponym information to categories of interest related to Brazilian Topographic Mapping;

- Execution of a quantitative analysis on collaboratively entered toponyms, utilizing homogeneous grid-based approaches; and

- Assessment of intrinsic quality parameters as indicators of the reliability of toponyms in a scientific context.

---

### Import the libraries

In [1]:
# Import library and some pre-installed modules
import os
import sys
import time
import json
import requests
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
import folium
import glob
import threading
import csv
import ipywidgets as widgets
from IPython.display import display, Markdown
from shapely.geometry import box, mapping
from scipy.optimize import curve_fit
from matplotlib import pyplot as plt
from tqdm import tqdm
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from copy import deepcopy

### Project setup

In [2]:
# Make the parent of the current directory the working directory, so that
# relative paths used later (e.g. 'data/input_code1') resolve from there.
# NOTE: the path is relative — every re-run of this cell moves one level
# further up. Restart the kernel before running it again.
os.chdir('..')
# (The custom module itself is imported in a later cell.)

In [3]:
# Get current working directory
os.getcwd()


'/Users/darlanmnunes/Dev/DSc_git/PhD_Thesis_Step3_OSM_Toponyms'

### Import the modules

In [5]:
# Import the custom module
from src import utils

In [6]:
# Reload the utils module to ensure any changes are reflected
import importlib
importlib.reload(utils)

<module 'src.utils' from '/Users/darlanmnunes/Dev/DSc_git/PhD_Thesis_Step3_OSM_Toponyms/src/utils.py'>

### Homogeneous Grid Cells
 - Statistical Grid (cell size of 200 x 200m) produced by Instituto Brasileiro de Geografia e Estatística (Brazilian Institute of Geography and Statistics)

  - https://geoftp.ibge.gov.br/recortes_para_fins_estatisticos/

#### Import Homogeneous Grid Cells

##### Import the grid with the aggregated data extracted from OSM via the OHSOME API

In [None]:
# Parsed GeoJSON grid; stays None until a file is chosen in the dropdown below
grid = None

# Directory (relative to the working directory) that is scanned for *.geojson grids
input_path = 'data/input_code1'

# Callback that loads the GeoJSON file picked in the dropdown
def select_file(change):
    """Load the GeoJSON file chosen in the dropdown into the global ``grid``.

    Args:
        change: change notification passed by the widget; only
            ``change['new']`` (the newly selected option) is read.

    Returns:
        None. On success the parsed file is stored in the module-level
        ``grid``. The placeholder option is ignored. A missing or malformed
        file is reported through ``display`` and leaves ``grid`` untouched.
    """
    global grid
    selected_file = change['new']

    # The first dropdown entry is only a prompt, not a file name
    if selected_file == "Select the GeoJSON file with grid cells:":
        return

    file_path = os.path.join(input_path, selected_file)
    try:
        # GeoJSON is UTF-8 by specification; do not depend on the locale default
        with open(file_path, 'r', encoding='utf-8') as file:
            loaded = json.load(file)
    except FileNotFoundError:
        display("File not found:", selected_file)
        return
    except json.JSONDecodeError as exc:
        # Report a broken file instead of raising inside the widget callback
        display("Invalid GeoJSON file:", selected_file, str(exc))
        return

    grid = loaded
    display("File selected with success:", selected_file)
    display("File path:", file_path)

# Collect the GeoJSON files found in the input directory
file_list = list(filter(lambda name: name.endswith('.geojson'), os.listdir(input_path)))

# The first entry is a prompt; select_file() ignores it when it is selected
options = ["Select the GeoJSON file with grid cells:", *file_list]

# Build the dropdown and load a file whenever its value changes
dropdown = widgets.Dropdown(options=options)
dropdown.observe(select_file, names='value')

# Render the widget as the cell output
display(dropdown)

Dropdown(options=('Select the GeoJSON file with grid cells:', 'grade_id36_bh_4cells_tests.geojson', 'step5_lot…

'File selected with success:'

'step5_lote163.geojson'

'File path:'

'data/input_code1/step5_lote163.geojson'

In [5]:
# Preview grid cells
grid

{'type': 'FeatureCollection',
 'name': 'step5_lote163',
 'crs': {'type': 'name', 'properties': {'name': 'urn:ogc:def:crs:EPSG::4674'}},
 'features': [{'type': 'Feature',
   'properties': {'fid': 3241,
    'id': '200ME60392N90904',
    'POP10': 432,
    'edif_ensino_total_count': 1.0,
    'edif_ensino_name_count': 1.0,
    'edif_ensino_name_ratio': 100.0,
    'edif_saude_total_count': 0.0,
    'edif_saude_name_count': 0.0,
    'edif_saude_name_ratio': 0.0,
    'edif_desenv_social_total_count': 0.0,
    'edif_desenv_social_name_count': 0.0,
    'edif_desenv_social_name_ratio': 0.0,
    'edif_constr_lazer_total_count': 0.0,
    'edif_constr_lazer_name_count': 0.0,
    'edif_constr_lazer_name_ratio': 0.0,
    'edif_pub_civil_total_count': 0.0,
    'edif_pub_civil_name_count': 0.0,
    'edif_pub_civil_name_ratio': 0.0,
    'edif_turistica_total_count': 0.0,
    'edif_turistica_name_count': 0.0,
    'edif_turistica_name_ratio': 0.0,
    'edif_metro_ferroviaria_total_count': 0.0,
    'edif_me

In [6]:
# Number of features (grid cells) in the loaded FeatureCollection.
# Requires a file to have been selected above; `grid` is None otherwise.
total_cells = len(grid['features'])
print(f"Total grid cells in GeoJSON: {total_cells}")

Total grid cells in GeoJSON: 20


##### Partition the original GeoJSON grid into subsets of up to `subset_size` cells each (20 in this run) to optimise the process

In [7]:
# Partition the original GeoJSON grid into batches of up to `subset_size` cells,
# so that each batch can be processed and saved on its own.

# Maximum number of cells per batch
subset_size = 20

# Split the original grid cells into consecutive slices of `subset_size`
subsets = [grid['features'][i:i + subset_size] for i in range(0, len(grid['features']), subset_size)]

# Wrap each slice in its own FeatureCollection and tag it with a batch ID ("lote_id")
grid_subsets = []
for index, subset in enumerate(subsets):
    grid_subset = {
        'type': 'FeatureCollection',
        'features': subset,
        'lote_id': f"lote{index + 1}",
    }
    # 'crs' is an optional member: copy it only when the input file has one.
    # The processing steps already check `'crs' in grid_subsets[0]` before use.
    if 'crs' in grid:
        grid_subset['crs'] = grid['crs']
    grid_subsets.append(grid_subset)

In [8]:
# Number of batches produced by the partition step
total_subsets = len(grid_subsets)
print(f"Total de subsets criados: {total_subsets}")

Total de subsets criados: 1


In [9]:
# Check the grids subsets
grid_subsets

[{'type': 'FeatureCollection',
  'features': [{'type': 'Feature',
    'properties': {'fid': 3241,
     'id': '200ME60392N90904',
     'POP10': 432,
     'edif_ensino_total_count': 1.0,
     'edif_ensino_name_count': 1.0,
     'edif_ensino_name_ratio': 100.0,
     'edif_saude_total_count': 0.0,
     'edif_saude_name_count': 0.0,
     'edif_saude_name_ratio': 0.0,
     'edif_desenv_social_total_count': 0.0,
     'edif_desenv_social_name_count': 0.0,
     'edif_desenv_social_name_ratio': 0.0,
     'edif_constr_lazer_total_count': 0.0,
     'edif_constr_lazer_name_count': 0.0,
     'edif_constr_lazer_name_ratio': 0.0,
     'edif_pub_civil_total_count': 0.0,
     'edif_pub_civil_name_count': 0.0,
     'edif_pub_civil_name_ratio': 0.0,
     'edif_turistica_total_count': 0.0,
     'edif_turistica_name_count': 0.0,
     'edif_turistica_name_ratio': 0.0,
     'edif_metro_ferroviaria_total_count': 0.0,
     'edif_metro_ferroviaria_name_count': 0.0,
     'edif_metro_ferroviaria_name_ratio': 0.0},

#### Visualize the spatial distribution of the homogeneous grid cell

In [10]:
import folium
import ipywidgets as widgets
from IPython.display import display

# Function to calculate the centre of a polygon ring (mean of its vertices)
def calculate_centroid(coordinates):
    """Return the mean vertex position of a ring as ``[mean_y, mean_x]``.

    Args:
        coordinates: sequence of positions. Element 0 of each position is
            taken as x and element 1 as y; further elements are ignored.

    Returns:
        ``[mean_y, mean_x]`` — y first, the order in which the result is
        passed to ``folium.Map(location=...)``.

    Raises:
        ValueError: if ``coordinates`` is empty.
    """
    points = list(coordinates)

    # A closed ring repeats its first vertex at the end. Counting that vertex
    # twice pulls the mean towards it, so the closing duplicate is dropped.
    if len(points) > 1 and list(points[0][:2]) == list(points[-1][:2]):
        points = points[:-1]

    if not points:
        raise ValueError("calculate_centroid() needs at least one coordinate")

    count = len(points)
    centroid_x = sum(p[0] for p in points) / count
    centroid_y = sum(p[1] for p in points) / count
    return [centroid_y, centroid_x]

# Centre the map on the first cell only: the first ring of the first polygon of
# the first feature. The [0][0] indexing assumes MultiPolygon geometries
# (coordinates -> polygon -> ring) — TODO confirm for other input files.
first_polygon = grid['features'][0]['geometry']['coordinates'][0][0]
centroid_coords = calculate_centroid(first_polygon)

# Render one batch of grid cells on an interactive map
def plot_subset(subset_index):
    """Display a Folium map with the cells of ``grid_subsets[subset_index]``.

    The map is centred on the module-level ``centroid_coords``. Each cell is
    drawn with one fixed style and shows its 'id' and 'POP10' properties as a
    tooltip.

    Args:
        subset_index: zero-based position of the batch in ``grid_subsets``.
    """
    batch = grid_subsets[subset_index]

    # One style dictionary shared by every cell
    cell_style = {'fillColor': '#8C8989', 'color': '#e31a1c', 'weight': 2}

    def style_for(_feature):
        return cell_style

    # Base map centred on the precomputed centroid
    base_map = folium.Map(location=centroid_coords, tiles='OpenStreetMap', zoom_start=14)

    # GeoJSON layer for the selected batch (the label is 1-based)
    layer = folium.GeoJson(
        batch,
        name=f'Grade Estatística 200m - Lote {subset_index+1}',
        tooltip=folium.GeoJsonTooltip(fields=['id', 'POP10']),
        style_function=style_for,
    )
    layer.add_to(base_map)

    # Show the map as the widget output
    display(base_map)

# Drop-down with one (label, value) pair per batch: 1-based label, 0-based index
dropdown = widgets.Dropdown(
    options=[(f'Lote {idx+1}', idx) for idx, _ in enumerate(grid_subsets)],
    description='Select a Batch:',
    disabled=False,
)

# Re-draw the map through plot_subset() whenever another batch is selected
widgets.interactive(plot_subset, subset_index=dropdown)

interactive(children=(Dropdown(description='Select a Batch:', options=(('Lote 1', 0),), value=0), Output()), _…

### **OHSOME API**

 - Access to features, attributes and OSM history edits using the OHSOME API (*OpenStreetMap History Data Analytics Platform*)

> - https://docs.ohsome.org/ohsome-api/v1/

In [11]:
# Query the ohsome API metadata endpoint and show what it returns.
# A non-200 status or a body that is not valid JSON is reported, not raised.
import requests

URL = 'https://api.ohsome.org/v1/metadata'
response = requests.get(URL)

if response.status_code != 200:
    # HTTP error: show the status code and the raw body
    display(f"Erro HTTP {response.status_code}")
    print("Resposta:")
    display(response.text)
else:
    try:
        data = response.json()
        print("Dados recebidos:")
        display(data)
    except ValueError:
        # The body could not be decoded as JSON: fall back to the raw text
        print("Erro ao decodificar JSON. Conteúdo bruto:")
        display(response.text)

Dados recebidos:


{'attribution': {'url': 'https://ohsome.org/copyrights',
  'text': '© OpenStreetMap contributors'},
 'apiVersion': '1.10.4',
 'timeout': 600.0,
 'extractRegion': {'spatialExtent': {'type': 'Polygon',
   'coordinates': [[[-180.0, -90.0],
     [180.0, -90.0],
     [180.0, 90.0],
     [-180.0, 90.0],
     [-180.0, -90.0]]]},
  'temporalExtent': {'fromTimestamp': '2007-10-08T00:00:00Z',
   'toTimestamp': '2025-04-06T13:00Z'},
  'replicationSequenceNumber': 110142}}

### Retrieving data from OpenStreetMap using OHSOME API and homogeneous grid cells

#### Define ET-EDGV class dictionary with respective OSM tags

In [12]:
# ET-EDGV class dictionary: maps each class name to its list of OSM (key, value) tags.
# The class name becomes the prefix of the properties written to each grid cell.
# A value of '*' is interpolated as-is into the request filter ("key=*").
# Note that ('building', 'public') is listed under both 'edif_desenv_social'
# and 'edif_pub_civil'.
classe_et_edgv_to_tags = {
    'edif_ensino': [
        ('amenity', 'school'), ('amenity', 'university'),
        ('building', 'school'), ('amenity', 'kindergarten')
    ],
    'edif_saude': [
        ('amenity', 'hospital'), ('amenity', 'clinic'),
        ('building', 'hospital'), ('amenity', 'doctors'),
        ('amenity', 'dentist'), ('healthcare', '*')
    ],
    'edif_desenv_social': [
        ('amenity', 'social_facility'), ('building', 'public'),
        ('social_facility', '*')
    ],
    'edif_constr_lazer': [
        ('leisure', 'park'), ('leisure', 'sports_centre'),
        ('leisure', 'stadium'), ('amenity', 'theatre'),
        ('amenity', 'library'), ('amenity', 'community_centre'),
        ('amenity', 'arts_centre'), ('amenity', 'planetarium'),
        ('building', 'grandstand'), ('building', 'stadium'),
        ('tourism', 'museum')
    ],
    'edif_pub_civil': [
        ('building', 'public'), ('amenity', 'townhall'),
        ('office', 'government')
    ],
    'edif_turistica': [
        ('tourism', 'attraction'), ('tourism', 'artwork'),
        ('tourism', 'viewpoint'), ('amenity', 'fountain'),
        ('building', 'hotel')
    ],
    'edif_metro_ferroviaria': [
        ('railway', 'station'), ('railway', 'halt'),
        ('building', 'train_station'), ('public_transport', 'station')
    ]
}

#### Step 1 (*API Endpoint: Elements Aggregation*): count the number of OSM features (elements) and calculate the proportion of features with the attribute "name" filled in by contributors, for each grid cell:


 - Determine the total number of OSM features for interest tags, grouped by grid cell;

 - Quantify the total number of features with attribute "name" filled in; and

 - Calculate the proportion of features with attribute "name" filled in for each grid cell.

 - Period of data retrieved: 2007-10-08 to 2025-04-06;

In [None]:
# Step 1: name_ratio — parallel execution, CSV log and batch checkpointing

# === GENERAL SETTINGS ===
url_tag = "https://api.ohsome.org/v1/elements/count/groupBy/boundary/groupBy/tag"

params_base = dict(time='2007-10-08/2025-04-06')

# All step 1 outputs (batch files, consolidated file, log, checkpoint) live here
output_dir = Path("results", "1_output_grid", "partial_results", "step1_name_ratio")
output_dir.mkdir(parents=True, exist_ok=True)

log_path = output_dir.joinpath("log_step1.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step1.txt")

# === LOG INITIALISATION ===
# Write the header row only when the log does not exist yet, so re-runs append
if not log_path.exists():
    with log_path.open('w', newline='') as log_file:
        csv.writer(log_file).writerow(["lote", "mensagem", "timestamp"])

def log_mensagem(lote, mensagem):
    """Append one row ``[lote, mensagem, timestamp]`` to the CSV at ``log_path``.

    Args:
        lote: value written to the first column (batch number, cell id or label).
        mensagem: free-text message.

    The timestamp is the local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    row = [lote, mensagem, time.strftime('%Y-%m-%d %H:%M:%S')]
    with open(log_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(row)

# === KEEP ALIVE ===
def keep_alive():
    """Print and log a heartbeat message every 300 seconds, forever.

    Meant to run in a daemon thread. ``log_mensagem`` is looked up on every
    iteration, so the heartbeat is written with whichever definition of that
    function is current when the timer fires.
    """
    while True:
        time.sleep(300)
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


# Start at most one heartbeat thread per kernel. Re-running this cell (or the
# other step cells, which define the same helper) would otherwise stack up
# additional never-ending threads that all print and log every five minutes.
if not any(t.name == "keep_alive" and t.is_alive() for t in threading.enumerate()):
    threading.Thread(target=keep_alive, name="keep_alive", daemon=True).start()

# === PROCESS ONE GRID CELL ===
def process_cell(feature):
    """Count OSM elements, and those with a name, per ET-EDGV class for one cell.

    For every class in ``classe_et_edgv_to_tags`` and every (key, value) tag of
    that class, two POST requests are sent to ``url_tag``: one filtered by
    ``key=value`` and one by ``key=value and name=*``. The per-tag values are
    added up into the class totals.

    Args:
        feature: GeoJSON Feature dict of the cell. ``properties['id']`` is used
            in messages. The dict is MODIFIED IN PLACE.

    Returns:
        ``(cell_id, feature)`` where ``feature`` is the same object, with three
        properties added per class: ``{classe}_total_count``,
        ``{classe}_name_count`` and ``{classe}_name_ratio`` (a percentage,
        0 when the total is 0).

    If any request of a class fails, the error is printed and logged and the
    three properties of THAT class are set to 0. Other classes of the same
    cell are not affected.
    """
    cell_geojson = json.dumps({"type": "FeatureCollection", "features": [feature]})
    cell_id = feature['properties']['id']
    # Slightly above the 600 s limit the API reports in its metadata response,
    # so a stalled connection cannot block a worker thread forever.
    request_timeout = 660

    def _count(tag, value, filter_expr):
        """Send one count request and return the summed values of the first group."""
        params = params_base.copy()
        params.update({
            'bpolys': cell_geojson,
            'filter': filter_expr,
            'groupByKey': tag,
            'groupByValues': value
        })
        response = requests.post(url_tag, data=params, timeout=request_timeout)
        response.raise_for_status()
        data = response.json()
        # Only the first entry of 'groupByResult' is read and all of its
        # 'result' values are summed.
        # NOTE(review): confirm that the first group, and a sum over every
        # returned timestamp, is the intended count.
        return sum(res.get('value', 0) for res in data.get('groupByResult', [])[0].get('result', []))

    for classe, tag_list in classe_et_edgv_to_tags.items():
        total_count_classe, name_count_classe = 0, 0
        # Tracked per class: a failure must not zero the classes that follow
        erro_na_classe = False

        for tag, value in tag_list:
            try:
                count = _count(tag, value, f'{tag}={value}')
                name_count = _count(tag, value, f'{tag}={value} and name=*')
            except Exception as e:
                # Worker boundary: record the failure and go on with the next tag
                erro_na_classe = True
                print(f"[ERRO] Célula {cell_id} | Tag: {tag}={value} | Erro: {e}")
                log_mensagem(cell_id, f"ERRO {tag}={value}: {e}")
                time.sleep(1)
                continue

            total_count_classe += count
            name_count_classe += name_count

        if erro_na_classe:
            # Partial sums are misleading; store zeros for counts AND ratio
            total_count_classe, name_count_classe = 0, 0

        properties = feature['properties']
        properties[f'{classe}_total_count'] = total_count_classe
        properties[f'{classe}_name_count'] = name_count_classe
        properties[f'{classe}_name_ratio'] = (
            (name_count_classe / total_count_classe) * 100 if total_count_classe > 0 else 0
        )

    return cell_id, feature

# === BATCH EXECUTION ===
# Resume support: the checkpoint file holds the number of batches already
# completed, which is also the zero-based index of the next batch to run.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with up to 5 worker threads.
    # Results are collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_cell, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1}"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # A cell whose worker raised is logged and left out of the output
                log_mensagem(lote_index + 1, f"FALHA: {e}")

    # FeatureCollection for this batch; the CRS is taken from the first subset
    fc = {
        "type": "FeatureCollection",
        "features": updated_features
    }
    if 'crs' in grid_subsets[0]:
      fc['crs'] = grid_subsets[0]['crs']

    # Batch files are numbered from 1
    out_path = output_dir / f"step1_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    print(f"[SALVO] {out_path}")
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # === PARTIAL CONSOLIDATION ===
    # Rebuild the consolidated file from every batch file currently on disk.
    # NOTE: sorted() compares file names as strings, so "lote10" comes before "lote2".
    arquivos = sorted(glob.glob(str(output_dir / "step1_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {
        "type": "FeatureCollection",
        "features": todas_features
    }
    if 'crs' in grid_subsets[0]:
      final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step1_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    print("[CONSOLIDADO] step1_consolidado.geojson atualizado")
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Advance the checkpoint only after the batch and the consolidated file are
    # written. It advances even when some cells of the batch failed.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    tempo_exec = timedelta(seconds=int(time.time() - start_time))
    print(f"Tempo do lote {lote_index + 1}: {tempo_exec}")
    log_mensagem(lote_index + 1, f"Tempo do lote {lote_index + 1}: {tempo_exec}")

# Reached even when the checkpoint left no batches to process
print("Step 1 (name_ratio) finalizado com sucesso.")
log_mensagem("step1", "Processamento finalizado")

#### Step 2 (*API Endpoint: Contributions Aggregation*): count the total number of contributions for features with and without the attribute "name" filled in:

- Count the **total number of contributions** to the *interest tags* for the total features in the grid cells, with and without the attribute "name" filled in.

- Period of data retrieved: 2007-10-08 to 2025-04-06.

In [None]:
# Step 2: contributions — parallel execution, CSV log, batch checkpointing and CRS

# === GENERAL SETTINGS ===
url_contributions = "https://api.ohsome.org/v1/contributions/count/groupBy/boundary"
params_contributions_base = dict(time='2007-10-08/2025-04-06')

# All step 2 outputs (batch files, consolidated file, log, checkpoint) live here
output_dir = Path("results", "1_output_grid", "partial_results", "step2_contributions")
output_dir.mkdir(parents=True, exist_ok=True)

log_path = output_dir.joinpath("log_step2.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step2.txt")

# === LOG INITIALISATION ===
# Write the header row only when the log does not exist yet, so re-runs append
if not log_path.exists():
    with log_path.open('w', newline='') as f:
        csv.writer(f).writerow(["lote", "mensagem", "timestamp"])

def log_mensagem(lote, mensagem):
    """Append one row ``[lote, mensagem, timestamp]`` to the CSV at ``log_path``.

    Args:
        lote: value written to the first column (batch number, cell id or label).
        mensagem: free-text message.

    The timestamp is the local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    row = [lote, mensagem, time.strftime('%Y-%m-%d %H:%M:%S')]
    with open(log_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(row)

# === KEEP ALIVE ===
def keep_alive():
    """Print and log a heartbeat message every 300 seconds, forever.

    Meant to run in a daemon thread. ``log_mensagem`` is looked up on every
    iteration, so the heartbeat is written with whichever definition of that
    function is current when the timer fires.
    """
    while True:
        time.sleep(300)
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


# Start at most one heartbeat thread per kernel. Re-running this cell (or the
# other step cells, which define the same helper) would otherwise stack up
# additional never-ending threads that all print and log every five minutes.
if not any(t.name == "keep_alive" and t.is_alive() for t in threading.enumerate()):
    threading.Thread(target=keep_alive, name="keep_alive", daemon=True).start()

# === CONTRIBUTION COUNTING ===
def contar_contribuicoes(cell_geojson, tag, value):
    """Count contributions for one tag inside one cell, with and without a name.

    Two POST requests are sent to ``url_contributions``: one filtered by
    ``tag=value`` and one by ``tag=value and name=*``.

    Args:
        cell_geojson: JSON string sent as the ``bpolys`` parameter.
        tag: OSM key.
        value: OSM value.

    Returns:
        ``(total, named)`` on success, each being the sum of the ``value``
        fields of the first ``groupByResult`` entry.
        ``(None, error_message)`` when anything fails; callers test the first
        element for None.
    """
    # Slightly above the 600 s limit the API reports in its metadata response,
    # so a stalled connection cannot block a worker thread forever.
    request_timeout = 660

    def _soma(filter_expr):
        """Send one request with the given filter and sum the returned values."""
        params = params_contributions_base.copy()
        params.update({'bpolys': cell_geojson, 'filter': filter_expr})
        resp = requests.post(url_contributions, data=params, timeout=request_timeout)
        resp.raise_for_status()
        payload = resp.json()
        return sum(r.get('value', 0) for r in payload.get('groupByResult', [])[0].get('result', []))

    try:
        total = _soma(f'{tag}={value}')
        named = _soma(f'{tag}={value} and name=*')
        return total, named
    except Exception as e:
        # Failures are returned, not raised, so the caller can retry
        return None, str(e)

# === PROCESS ONE GRID CELL ===
def process_contributions_cell(feature):
    """Add per-class contribution counts to a copy of one grid cell.

    For every tag of every class in ``classe_et_edgv_to_tags``,
    ``contar_contribuicoes`` is called up to 3 times, sleeping 1, 2 and 4
    seconds after each failed attempt. A tag that still fails is printed,
    logged and left out of the class sums.

    Args:
        feature: GeoJSON Feature dict of the cell; it is not modified.

    Returns:
        ``(cell_id, new_feature)`` where ``new_feature`` is a copy of
        ``feature`` with ``{classe}_total_contribs`` and
        ``{classe}_name_contribs`` added for every class.
    """
    # A JSON round trip yields an independent copy of the feature
    new_feature = json.loads(json.dumps(feature))
    cell_geojson = json.dumps({"type": "FeatureCollection", "features": [new_feature]})
    properties = new_feature['properties']
    cell_id = properties['id']

    for classe, tag_list in classe_et_edgv_to_tags.items():
        total_contribs = 0
        name_contribs = 0

        for tag, value in tag_list:
            for attempt in range(3):
                primeiro, segundo = contar_contribuicoes(cell_geojson, tag, value)
                if primeiro is not None:
                    # Success: (total, named)
                    total_contribs += primeiro
                    name_contribs += segundo
                    break
                # Failure: (None, message) — back off before the next attempt
                time.sleep(2 ** attempt)
            else:
                # Every attempt failed; `segundo` holds the last error message
                print(f"[ERRO CONTRIB] Célula {cell_id} | Tag: {tag}={value} | Erro: {segundo}")
                log_mensagem(cell_id, f"ERRO {tag}={value}: {segundo}")

        properties[f'{classe}_total_contribs'] = total_contribs
        properties[f'{classe}_name_contribs'] = name_contribs

    return cell_id, new_feature

# === BATCH EXECUTION ===
# Resume support: the checkpoint file holds the number of batches already
# completed, which is also the zero-based index of the next batch to run.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with up to 5 worker threads.
    # Results are collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_contributions_cell, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1} (Step 2)"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # A cell whose worker raised is logged and left out of the output
                log_mensagem(lote_index + 1, f"FALHA: {e}")

    # FeatureCollection for this batch; the CRS is taken from the first subset
    fc = {
        "type": "FeatureCollection",
        "features": updated_features
    }
    if 'crs' in grid_subsets[0]:
        fc['crs'] = grid_subsets[0]['crs']

    # Batch files are numbered from 1
    out_path = output_dir / f"step2_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    print(f"[SALVO STEP2] {out_path}")
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # === PARTIAL CONSOLIDATION ===
    # Rebuild the consolidated file from every batch file currently on disk.
    # NOTE: sorted() compares file names as strings, so "lote10" comes before "lote2".
    arquivos = sorted(glob.glob(str(output_dir / "step2_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {
        "type": "FeatureCollection",
        "features": todas_features
    }
    if 'crs' in grid_subsets[0]:
        final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step2_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    print("[CONSOLIDADO] step2_consolidado.geojson atualizado")
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Advance the checkpoint only after the batch and the consolidated file are
    # written. It advances even when some cells of the batch failed.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    tempo_msg = f"Tempo lote {lote_index + 1}: {str(timedelta(seconds=int(time.time() - start_time)))}"
    print(tempo_msg)
    log_mensagem(lote_index + 1, tempo_msg)

# Reached even when the checkpoint left no batches to process
print("Step 2 (Contributions) finalizado com sucesso.")
log_mensagem("step2", "Processamento finalizado")

#### Step 3 (*API Endpoint: Contributions Aggregation*): Count the number of contributions in the past five years for features with the attribute "name" filled in:

 - Count the number of contributions in the past five years for tags of interest, aggregated by grid cells, with the attribute "name" filled in;

 - Period of data retrieved: 2019-03-09 to 2024-03-10
 
 - `This step wasn't applied to this paper.`

In [None]:
# Step 3: latest 5 years with name=* — parallel execution, CSV log, batch checkpointing and CRS

url_latest = "https://api.ohsome.org/v1/contributions/latest/count"
params_latest_base = dict(time='2019-03-09/2024-03-10')

# All step 3 outputs (batch files, consolidated file, log, checkpoint) live here
output_dir = Path("results", "1_output_grid", "partial_results", "step3_latest5y_name")
output_dir.mkdir(parents=True, exist_ok=True)

log_path = output_dir.joinpath("log_step3.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step3.txt")

# === LOG INITIALISATION ===
# Write the header row only when the log does not exist yet, so re-runs append
if not log_path.exists():
    with log_path.open('w', newline='') as f:
        csv.writer(f).writerow(["lote", "mensagem", "timestamp"])

def log_mensagem(lote, mensagem):
    """Append one row ``[lote, mensagem, timestamp]`` to the CSV at ``log_path``.

    Args:
        lote: value written to the first column (batch number, cell id or label).
        mensagem: free-text message.

    The timestamp is the local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    row = [lote, mensagem, time.strftime('%Y-%m-%d %H:%M:%S')]
    with open(log_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(row)

# === KEEP ALIVE ===
def keep_alive():
    """Print and log a heartbeat message every 300 seconds, forever.

    Meant to run in a daemon thread. ``log_mensagem`` is looked up on every
    iteration, so the heartbeat is written with whichever definition of that
    function is current when the timer fires.
    """
    while True:
        time.sleep(300)
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


# Start at most one heartbeat thread per kernel. Re-running this cell (or the
# other step cells, which define the same helper) would otherwise stack up
# additional never-ending threads that all print and log every five minutes.
if not any(t.name == "keep_alive" and t.is_alive() for t in threading.enumerate()):
    threading.Thread(target=keep_alive, name="keep_alive", daemon=True).start()

# === COUNT RECENT CONTRIBUTIONS TO NAMED FEATURES ===
def contar_latest_name(cell_geojson, tag, value):
    """Count contributions to named features of one tag inside one cell.

    One POST request is sent to ``url_latest`` with the filter
    ``tag=value and name=*`` and the time range of ``params_latest_base``.

    Args:
        cell_geojson: JSON string sent as the ``bpolys`` parameter.
        tag: OSM key.
        value: OSM value.

    Returns:
        The ``value`` of the last entry of the response's ``result`` list
        (0 when the list is empty), or ``None`` when the request or the
        decoding of its response fails.
    """
    # Slightly above the 600 s limit the API reports in its metadata response,
    # so a stalled connection cannot block a worker thread forever.
    request_timeout = 660
    try:
        params = params_latest_base.copy()
        params.update({'bpolys': cell_geojson, 'filter': f'{tag}={value} and name=*'})
        resp = requests.post(url_latest, data=params, timeout=request_timeout)
        resp.raise_for_status()
        data = resp.json()
        resultado = data.get('result', [])
        return resultado[-1].get('value', 0) if resultado else 0
    except Exception as e:
        # Signal failure with a bare None: the caller's retry loop tests
        # `is None` and adds successful results to an integer, so a
        # (None, message) tuple would be mistaken for a count and crash the cell.
        print(f"[ERRO LATEST5Y] Tag: {tag}={value} | Erro: {e}")
        return None

# === PROCESS ONE GRID CELL ===
def process_latest_cell(feature):
    """Add per-class counts of recent contributions to a copy of one grid cell.

    For every tag of every class in ``classe_et_edgv_to_tags``,
    ``contar_latest_name`` is called up to 3 times. While it returns ``None``
    the call is repeated after sleeping 1, 2 and 4 seconds; any other result
    is added to the class sum. A tag for which every attempt returned ``None``
    is printed, logged and left out of the sum.

    Args:
        feature: GeoJSON Feature dict of the cell; it is not modified.

    Returns:
        ``(cell_id, new_feature)`` where ``new_feature`` is a copy of
        ``feature`` with ``{classe}_latest5_name_contribs`` added for every class.
    """
    # A JSON round trip yields an independent copy of the feature
    new_feature = json.loads(json.dumps(feature))
    cell_geojson = json.dumps({"type": "FeatureCollection", "features": [new_feature]})
    properties = new_feature['properties']
    cell_id = properties['id']

    for classe, tag_list in classe_et_edgv_to_tags.items():
        contribs_5y = 0

        for tag, value in tag_list:
            for attempt in range(3):
                contagem = contar_latest_name(cell_geojson, tag, value)
                if contagem is not None:
                    contribs_5y += contagem
                    break
                # None means failure: back off before the next attempt
                time.sleep(2 ** attempt)
            else:
                # No attempt produced a result for this tag
                print(f"[ERRO LATEST5Y] Célula {cell_id} | Tag: {tag}={value}")
                log_mensagem(cell_id, f"ERRO {tag}={value}")

        properties[f'{classe}_latest5_name_contribs'] = contribs_5y

    return cell_id, new_feature

# === BATCH EXECUTION ===
# Resume support: the checkpoint file holds the number of batches already
# completed, which is also the zero-based index of the next batch to run.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with up to 5 worker threads.
    # Results are collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_latest_cell, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1} (Step 3)"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # A cell whose worker raised is logged and left out of the output
                log_mensagem(lote_index + 1, f"FALHA: {e}")

    # FeatureCollection for this batch; the CRS is taken from the first subset
    fc = {"type": "FeatureCollection", "features": updated_features}
    if 'crs' in grid_subsets[0]:
        fc['crs'] = grid_subsets[0]['crs']

    # Batch files are numbered from 1
    out_path = output_dir / f"step3_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # Rebuild the consolidated file from every batch file currently on disk.
    # NOTE: sorted() compares file names as strings, so "lote10" comes before "lote2".
    arquivos = sorted(glob.glob(str(output_dir / "step3_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {"type": "FeatureCollection", "features": todas_features}
    if 'crs' in grid_subsets[0]:
        final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step3_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    print("[CONSOLIDADO] step3_consolidado.geojson atualizado")
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Advance the checkpoint only after the batch and the consolidated file are
    # written. It advances even when some cells of the batch failed.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    tempo_msg = f"Tempo lote {lote_index + 1}: {str(timedelta(seconds=int(time.time() - start_time)))}"
    print(tempo_msg)
    log_mensagem(lote_index + 1, tempo_msg)

# Reached even when the checkpoint left no batches to process
print("Step 3 (latest5 contributions with name) finalizado com sucesso.")
log_mensagem("step3", "Processamento finalizado")

#### Step 4 (*API Endpoint: Contributions Aggregation*): Count the total number of contributions to features with a filled-in name where a tagChange occurred:

- Count the total number of contributions to the tags of interest, aggregated by grid cell, with the attribute name filled in, considering the type of contribution (contributionType) tag change ('tagChange').

  - *contributionType values available: ‘creation’, ‘deletion’, ‘tagChange’, ‘geometryChange’, or a combination of these*

- Period of data retrieved: 2007-10-08 to 2025-04-06.

In [None]:
# Step 4: tagChange with name=* — parallel execution, CSV log and CRS

# === GENERAL SETTINGS ===
url_tagchange = "https://api.ohsome.org/v1/contributions/count/groupBy/boundary"
params_tagchange_base = dict(
    time='2007-10-08/2025-04-06',
    contributionType='tagChange',
)

# All step 4 outputs (batch files, log, checkpoint) live here
output_dir = Path("results", "1_output_grid", "partial_results", "step4_tagchange_name")
output_dir.mkdir(parents=True, exist_ok=True)

log_path = output_dir.joinpath("log_step4.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step4.txt")

# === LOG INITIALISATION ===
# Write the header row only when the log does not exist yet, so re-runs append
if not log_path.exists():
    with log_path.open('w', newline='') as f:
        csv.writer(f).writerow(["lote", "mensagem", "timestamp"])

def log_mensagem(lote, mensagem):
    """Append one row ``[lote, mensagem, timestamp]`` to the CSV at ``log_path``.

    Args:
        lote: value written to the first column (batch number, cell id or label).
        mensagem: free-text message.

    The timestamp is the local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    row = [lote, mensagem, time.strftime('%Y-%m-%d %H:%M:%S')]
    with open(log_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(row)

# === KEEP ALIVE ===
def keep_alive():
    """Print and log a heartbeat message every 300 seconds, forever.

    Meant to run in a daemon thread. ``log_mensagem`` is looked up on every
    iteration, so the heartbeat is written with whichever definition of that
    function is current when the timer fires.
    """
    while True:
        time.sleep(300)
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


# Start at most one heartbeat thread per kernel. Re-running this cell (or the
# other step cells, which define the same helper) would otherwise stack up
# additional never-ending threads that all print and log every five minutes.
if not any(t.name == "keep_alive" and t.is_alive() for t in threading.enumerate()):
    threading.Thread(target=keep_alive, name="keep_alive", daemon=True).start()

# === COUNTING FUNCTION ===
def contar_tagchange_name(cell_geojson, tag, value):
    """Count tag-change contributions to named features of one tag in one cell.

    One POST request is sent to ``url_tagchange`` with the filter
    ``tag=value and name=*`` on top of ``params_tagchange_base``.

    Args:
        cell_geojson: JSON string sent as the ``bpolys`` parameter.
        tag: OSM key.
        value: OSM value.

    Returns:
        The sum of the ``value`` fields of the first ``groupByResult`` entry,
        or ``None`` when the request or the decoding of its response fails.
    """
    # Slightly above the 600 s limit the API reports in its metadata response,
    # so a stalled connection cannot block a worker thread forever.
    request_timeout = 660
    try:
        params = params_tagchange_base.copy()
        params.update({'bpolys': cell_geojson, 'filter': f'{tag}={value} and name=*'})
        resp = requests.post(url_tagchange, data=params, timeout=request_timeout)
        resp.raise_for_status()
        data = resp.json()
        return sum(r.get('value', 0) for r in data.get('groupByResult', [])[0].get('result', []))
    except Exception as e:
        # Signal failure with a bare None: the caller's retry loop tests
        # `is None` and adds successful results to an integer, so a
        # (None, message) tuple would be mistaken for a count and crash the cell.
        print(f"[ERRO TAGCHANGE] Tag: {tag}={value} | Erro: {e}")
        return None

# === PROCESSAMENTO DE UMA CÉLULA ===
def process_tagchange_cell(feature):
    """Attach per-class ``tagChange`` totals (named features only) to one grid cell.

    Works on a JSON round-trip copy of ``feature``. For every class in
    ``classe_et_edgv_to_tags`` the counts returned by ``contar_tagchange_name``
    for its (tag, value) pairs are summed into the copy's
    ``<classe>_name_tagchange`` property. Each pair is requested up to three
    times, sleeping 1 s, 2 s and 4 s after a ``None`` result; a pair that is
    still ``None`` afterwards is printed, logged and left out of the sum.

    Returns:
        ``(cell_id, copied_feature)``.
    """
    celula = json.loads(json.dumps(feature))
    payload = json.dumps({"type": "FeatureCollection", "features": [celula]})
    propriedades = celula['properties']
    cell_id = propriedades['id']

    for classe, tag_list in classe_et_edgv_to_tags.items():
        acumulado = 0

        for tag, value in tag_list:
            for espera in (1, 2, 4):
                contagem = contar_tagchange_name(payload, tag, value)
                if contagem is not None:
                    break
                time.sleep(espera)
            else:
                # All three attempts returned None
                print(f"[ERRO TAGCHANGE] Célula {cell_id} | Tag: {tag}={value}")
                log_mensagem(cell_id, f"ERRO {tag}={value}")
                continue

            acumulado += contagem

        propriedades[f'{classe}_name_tagchange'] = acumulado

    return cell_id, celula

# === EXECUÇÃO DOS LOTES ===
# Resume point: index of the first batch still to be processed. It is read from
# the checkpoint file written at the end of every completed batch and stays 0
# when no checkpoint exists.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

# grid_subsets is defined in an earlier cell; each entry is used here as a
# mapping with a 'features' list (and optionally a 'crs' member).
for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with 5 worker threads; results are
    # collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_tagchange_cell, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1} (Step 4)"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # NOTE(review): a cell whose worker raised is only logged; it is
                # absent from this batch's output and the batch is still
                # checkpointed as done further down.
                log_mensagem(lote_index + 1, f"FALHA: {e}")

    # Write this batch as its own GeoJSON file, copying the CRS member of the
    # first subset when it has one.
    fc = {"type": "FeatureCollection", "features": updated_features}
    if 'crs' in grid_subsets[0]:
        fc['crs'] = grid_subsets[0]['crs']

    out_path = output_dir / f"step4_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # === PARTIAL CONSOLIDATION ===
    # Rebuild the consolidated file from every batch file present on disk.
    # NOTE(review): the paths are sorted as strings, so "lote10" comes before
    # "lote2"; this only affects the order of features in the merged file.
    arquivos = sorted(glob.glob(str(output_dir / "step4_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {"type": "FeatureCollection", "features": todas_features}
    if 'crs' in grid_subsets[0]:
        final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step4_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    print("[CONSOLIDADO] step4_consolidado.geojson atualizado")
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Checkpoint: store the number of batches completed, which is also the
    # index at which the next run resumes.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    # Report the wall-clock time of this batch as H:MM:SS.
    tempo_msg = f"Tempo lote {lote_index + 1}: {str(timedelta(seconds=int(time.time() - start_time)))}"
    print(tempo_msg)
    log_mensagem(lote_index + 1, tempo_msg)

print("Step 4 (tagChange com name) finalizado com sucesso.")
log_mensagem("step4", "Processamento finalizado")

#### Step 5 (API Endpoint: Users Aggregation): Count the number of users (contributors) who edited features with attribute name filled in:

- Count the number of users who edited features of the OSM tags of interest with the `name` attribute filled in, aggregated by grid cell.

- Period of data retrieved: 2007-10-08 to 2025-04-06.

In [None]:
# Step 5: Users with name=* — Versão final com paralelização, logs, retry, crs

# === CONFIGURAÇÕES GERAIS ===
# ohsome endpoint used by Step 5: user counts grouped by boundary polygon
url_users = "https://api.ohsome.org/v1/users/count/groupBy/boundary"
# Parameters shared by every Step 5 request (time span only)
params_users_base = {'time': '2007-10-08/2025-04-06'}

# Folder that receives the per-batch GeoJSON files, the CSV log and the checkpoint
output_dir = Path("results/1_output_grid/partial_results/step5_users_name")
output_dir.mkdir(parents=True, exist_ok=True)
log_path = output_dir.joinpath("log_step5.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step5.txt")

# === LOG INITIALISATION ===
# The header row is written only when the log does not exist yet, so a resumed
# run keeps appending to the same file.
if not log_path.exists():
    with log_path.open('w', newline='') as f:
        csv.writer(f).writerow(["lote", "mensagem", "timestamp"])

# Serialises appends: log_mensagem is called from the thread-pool workers and
# from the keep-alive thread at the same time.
_log_lock = threading.Lock()


def log_mensagem(lote, mensagem):
    """Append one ``(lote, mensagem, timestamp)`` row to the CSV log at ``log_path``.

    Args:
        lote: Identifier written to the first column (batch number, cell id or
            a step label).
        mensagem: Free-text message written to the second column.

    The file is opened as UTF-8 so that messages with non-ASCII characters
    cannot fail on a platform whose default encoding lacks them.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    with _log_lock, open(log_path, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow([lote, mensagem, timestamp])

# === KEEP ALIVE ===
# A heartbeat thread left running by an earlier step (or an earlier run of this
# cell) would keep printing forever and write into the current step's log, so
# ask it to stop before starting a new one.
_previous_stop = globals().get("keep_alive_stop")
if _previous_stop is not None:
    _previous_stop.set()

# Call keep_alive_stop.set() to silence the heartbeat started below.
keep_alive_stop = threading.Event()


def keep_alive(stop_event=None, interval=300):
    """Print and log a heartbeat message every ``interval`` seconds.

    Args:
        stop_event: ``threading.Event`` that ends the loop once it is set.
            When omitted the loop never ends, as in the original version.
        interval: Seconds to wait between two heartbeats.
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set: run until the process exits
    # Event.wait returns False on timeout (emit a heartbeat) and True once the
    # event has been set (leave the loop).
    while not stop_event.wait(interval):
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


threading.Thread(target=keep_alive, args=(keep_alive_stop,), daemon=True).start()

# === FUNÇÃO DE CONTAGEM DE USUÁRIOS COM NAME ===
def contar_usuarios_name(cell_geojson, tag, value):
    """Count the users who edited named ``tag=value`` features in one cell.

    Sends a single POST to ``url_users`` with ``params_users_base`` plus the
    cell polygon (``bpolys``) and the filter ``"<tag>=<value> and name=*"``.

    Args:
        cell_geojson: GeoJSON FeatureCollection string holding the cell polygon.
        tag: OSM key.
        value: OSM value.

    Returns:
        The sum of the ``value`` entries of the first ``groupByResult`` group,
        or ``None`` when the request or the parsing of its response failed.
    """
    try:
        params = {
            **params_users_base,
            'bpolys': cell_geojson,
            'filter': f'{tag}={value} and name=*',
        }
        resp = requests.post(url_users, data=params)
        resp.raise_for_status()
        grupos = resp.json().get('groupByResult', [])
        return sum(r.get('value', 0) for r in grupos[0].get('result', []))
    except Exception:
        # Failure must be a bare None: the caller's retry loop tests `is None`,
        # and the former (None, message) tuple was treated as a valid count,
        # which skipped the retries and then broke the summation.
        return None

# === PROCESSAMENTO DE UMA CÉLULA ===
def process_users_cell(feature):
    """Attach per-class user totals (named features only) to one grid cell.

    Works on a JSON round-trip copy of ``feature``. For every class in
    ``classe_et_edgv_to_tags`` the counts returned by ``contar_usuarios_name``
    for its (tag, value) pairs are summed into the copy's
    ``<classe>_users_name`` property. Each pair is requested up to three
    times, sleeping 1 s, 2 s and 4 s after a ``None`` result; a pair that is
    still ``None`` afterwards is printed, logged and left out of the sum.

    Returns:
        ``(cell_id, copied_feature)``.
    """
    celula = json.loads(json.dumps(feature))
    payload = json.dumps({"type": "FeatureCollection", "features": [celula]})
    propriedades = celula['properties']
    cell_id = propriedades['id']

    for classe, tag_list in classe_et_edgv_to_tags.items():
        acumulado = 0

        for tag, value in tag_list:
            for espera in (1, 2, 4):
                contagem = contar_usuarios_name(payload, tag, value)
                if contagem is not None:
                    break
                time.sleep(espera)
            else:
                # All three attempts returned None
                print(f"[ERRO USERS] Célula {cell_id} | Tag: {tag}={value}")
                log_mensagem(cell_id, f"ERRO {tag}={value}")
                continue

            acumulado += contagem

        propriedades[f'{classe}_users_name'] = acumulado

    return cell_id, celula

# === EXECUÇÃO DOS LOTES ===
# Resume point: index of the first batch still to be processed. It is read from
# the checkpoint file written at the end of every completed batch and stays 0
# when no checkpoint exists.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

# grid_subsets is defined in an earlier cell; each entry is used here as a
# mapping with a 'features' list (and optionally a 'crs' member).
for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with 5 worker threads; results are
    # collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_users_cell, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1} (Step 5)"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # NOTE(review): a cell whose worker raised is only logged; it is
                # absent from this batch's output and the batch is still
                # checkpointed as done further down.
                log_mensagem(lote_index + 1, f"FALHA: {e}")

    # Write this batch as its own GeoJSON file, copying the CRS member of the
    # first subset when it has one.
    fc = {"type": "FeatureCollection", "features": updated_features}
    if 'crs' in grid_subsets[0]:
        fc['crs'] = grid_subsets[0]['crs']

    out_path = output_dir / f"step5_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # === PARTIAL CONSOLIDATION ===
    # Rebuild the consolidated file from every batch file present on disk.
    # NOTE(review): the paths are sorted as strings, so "lote10" comes before
    # "lote2"; this only affects the order of features in the merged file.
    arquivos = sorted(glob.glob(str(output_dir / "step5_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {"type": "FeatureCollection", "features": todas_features}
    if 'crs' in grid_subsets[0]:
        final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step5_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    print("[CONSOLIDADO] step5_consolidado.geojson atualizado")
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Checkpoint: store the number of batches completed, which is also the
    # index at which the next run resumes.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    # Report the wall-clock time of this batch as H:MM:SS.
    tempo_msg = f"Tempo lote {lote_index + 1}: {str(timedelta(seconds=int(time.time() - start_time)))}"
    print(tempo_msg)
    log_mensagem(lote_index + 1, tempo_msg)

print("Step 5 (Users with name) finalizado com sucesso.")
log_mensagem("step5", "Processamento finalizado")

Lote 1 (Step 5):  25%|██▌       | 5/20 [04:57<06:49, 27.28s/it]   

Ainda trabalhando...


Lote 1 (Step 5):  50%|█████     | 10/20 [09:48<04:49, 28.96s/it]

Ainda trabalhando...


Lote 1 (Step 5):  75%|███████▌  | 15/20 [14:31<02:31, 30.31s/it]

Ainda trabalhando...


Lote 1 (Step 5): 100%|██████████| 20/20 [18:56<00:00, 56.83s/it]

[CONSOLIDADO] step5_consolidado.geojson atualizado
Tempo lote 1: 0:18:56
Step 5 (Users with name) finalizado com sucesso.





Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...
Ainda trabalhando...


#### Step 6 (*API Endpoint: Elements Aggregation*): Logistic regression (sigmoid)

  - Aggregation type: `count`
  - Logistic regression (sigmoid)
  - Marking sigmoid_fit_overflow (distribution of ‘non-adjustables’ by region)
  - Calculation of the days since the inflection point (not the last contribution)
  - Recording of the number of accumulated contributions (final_contributions) per class
  - Parallelisation with ThreadPoolExecutor
  - Intelligent retry per cell
  - Incremental file consolidation
  - .csv log + last batch resumption
  -  Saving results in properties for each cell

In [None]:
# Step 6: Logistic regression (sigmoid) — Versão final com paralelização, log CSV, controle de lotes

# === SUPRIMIR WARNINGS DE OVERFLOW ===
# Silence every RuntimeWarning for the rest of the session
warnings.filterwarnings("ignore", category=RuntimeWarning)

# === GENERAL SETTINGS ===
# Folder that receives the per-batch GeoJSON files, the CSV log and the checkpoint
output_dir = Path("results/1_output_grid/partial_results/step6_sigmoid_dias")
output_dir.mkdir(parents=True, exist_ok=True)
log_path = output_dir.joinpath("log_step6.csv")
ultimo_lote_path = output_dir.joinpath("ultimo_lote_step6.txt")

# === OHSOME URL AND PARAMETERS ===
# Element counts over the whole period, sampled at P1M (monthly) intervals
url_ohsome = "https://api.ohsome.org/v1/elements/count"
params_base = {'time': '2007-10-08/2025-04-06/P1M'}

# === LOG INITIALISATION ===
# The header row is written only when the log does not exist yet, so a resumed
# run keeps appending to the same file.
if not log_path.exists():
    with log_path.open('w', newline='') as f:
        csv.writer(f).writerow(["lote", "mensagem", "timestamp"])

# Serialises appends: log_mensagem is called from the thread-pool workers and
# from the keep-alive thread at the same time.
_log_lock = threading.Lock()


def log_mensagem(lote, mensagem):
    """Append one ``(lote, mensagem, timestamp)`` row to the CSV log at ``log_path``.

    Args:
        lote: Identifier written to the first column (batch number, cell id or
            a step label).
        mensagem: Free-text message written to the second column.

    The file is opened as UTF-8 so that messages with non-ASCII characters
    (this step logs arbitrary exception text) cannot fail on a platform whose
    default encoding lacks them.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    with _log_lock, open(log_path, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow([lote, mensagem, timestamp])

# === KEEP ALIVE ===
# A heartbeat thread left running by an earlier step (or an earlier run of this
# cell) would keep printing forever and write into the current step's log, so
# ask it to stop before starting a new one.
_previous_stop = globals().get("keep_alive_stop")
if _previous_stop is not None:
    _previous_stop.set()

# Call keep_alive_stop.set() to silence the heartbeat started below.
keep_alive_stop = threading.Event()


def keep_alive(stop_event=None, interval=300):
    """Print and log a heartbeat message every ``interval`` seconds.

    Args:
        stop_event: ``threading.Event`` that ends the loop once it is set.
            When omitted the loop never ends, as in the original version.
        interval: Seconds to wait between two heartbeats.
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set: run until the process exits
    # Event.wait returns False on timeout (emit a heartbeat) and True once the
    # event has been set (leave the loop).
    while not stop_event.wait(interval):
        print("Ainda trabalhando...")
        log_mensagem("keep_alive", "Ainda trabalhando...")


threading.Thread(target=keep_alive, args=(keep_alive_stop,), daemon=True).start()

# === FUNÇÃO SIGMOIDE ===
def sigmoid(x, a, b, c, d):
    """Evaluate the four-parameter logistic curve at ``x``.

    Computes ``a + (b - a) / (1 + exp((c - x) / d))``: the value is the
    midpoint of ``a`` and ``b`` at ``x == c``, and ``d`` scales the
    transition. ``x`` may be a scalar or a NumPy array.
    """
    expoente = (c - x) / d
    denominador = 1 + np.exp(expoente)
    return a + (b - a) / denominador

# === OBTÉM SÉRIE TEMPORAL VIA OHSOME ===
def fetch_ohsome_data(cell_geojson, tag, value):
    """Fetch the element-count time series of ``tag=value`` for one cell.

    POSTs ``params_base`` plus the cell polygon (``bpolys``) and the filter
    ``"<tag>=<value>"`` to ``url_ohsome``.

    Returns:
        A DataFrame built from the ``result`` list of the JSON response.

    Raises:
        requests.HTTPError: If the response status signals an error.
    """
    payload = {**params_base, 'bpolys': cell_geojson, 'filter': f"{tag}={value}"}
    resposta = requests.post(url_ohsome, data=payload)
    resposta.raise_for_status()
    return pd.DataFrame(resposta.json()['result'])

# === PROCESSA UMA FEATURE DA GRADE ===
def process_cell_sigmoid(feature):
    """Fit a sigmoid to the element-count time series of one grid cell.

    Works on a JSON round-trip copy of ``feature``. For every class in
    ``classe_et_edgv_to_tags`` the (tag, value) pairs are tried in order and
    the first pair with a non-zero series that can be fitted is used. Its
    results are stored in the copy's properties under ``<classe>_...`` keys:
    final count, RMSE, RMSE relative to the final count, the four sigmoid
    parameters, the inflection index and date, and
    ``<classe>_sigmoid_fit_overflow = False``.

    ``<classe>_sigmoid_fit_overflow`` is set to ``True`` only when at least
    one pair raised and no pair of the class could be fitted. A failure on an
    earlier pair no longer overrides a later successful fit.

    When an inflection date is available, ``<classe>_dias_desde_inflexao``
    holds the number of days between that date and today.

    Args:
        feature: GeoJSON feature of the cell; ``properties['id']`` is required.

    Returns:
        ``(cell_id, copied_feature)``.
    """
    new_feature = json.loads(json.dumps(feature))  # deep copy; the input stays untouched
    props = new_feature['properties']
    cell_id = props['id']
    cell_geojson = json.dumps({"type": "FeatureCollection", "features": [new_feature]})

    for classe, tag_list in classe_et_edgv_to_tags.items():
        ajuste_ok = False         # a pair of this class was fitted successfully
        falha_ocorrida = False    # at least one pair raised (request or fit)
        ponto_inflexao_data = None

        for tag, value in tag_list:
            try:
                # Query the API
                df = fetch_ohsome_data(cell_geojson, tag, value)
                df['timestamp'] = pd.to_datetime(df['timestamp'])
                df['value'] = df['value'].astype(int)

                if df['value'].sum() == 0:
                    continue  # nothing mapped for this pair: try the next one

                df['time_index'] = np.arange(len(df))
                X = df['time_index'].values
                Y = df['value'].values

                # Sigmoid fit
                popt, _ = curve_fit(sigmoid, X, Y, method="lm", maxfev=70000)
                a, b, c, d = popt
                y_fit = sigmoid(X, *popt)

                # Metrics
                rmse = np.sqrt(np.mean((y_fit - Y) ** 2))
                erro_pct = rmse / Y[-1] if Y[-1] != 0 else None
                inflexao_idx = int(round(c))
                # The fitted midpoint may fall outside the observed series
                inflexao_data = df['timestamp'].iloc[inflexao_idx] if 0 <= inflexao_idx < len(df) else None

                # Collected first and committed below, so a late error cannot
                # leave a half-written set of properties behind.
                resultados = {
                    f'{classe}_contribuicoes_finais': int(Y[-1]),
                    f'{classe}_sigmoid_rmse': float(rmse),
                    # `is not None` keeps a legitimate error of exactly 0.0
                    f'{classe}_sigmoid_pct_erro': round(float(erro_pct), 4) if erro_pct is not None else None,
                    f'{classe}_sigmoid_a': float(a),
                    f'{classe}_sigmoid_b': float(b),
                    f'{classe}_sigmoid_c': float(c),
                    f'{classe}_sigmoid_d': float(d),
                    f'{classe}_inflexao_idx': inflexao_idx,
                    f'{classe}_inflexao_data': inflexao_data.strftime('%Y-%m-%d') if inflexao_data is not None else None,
                    f'{classe}_sigmoid_fit_overflow': False,
                }
            except Exception as e:
                # Any failure (request, parsing or fit) is logged under this label
                log_mensagem(cell_id, f"[SIGMOID OVERFLOW] {classe} ({tag}={value}): {str(e)}")
                falha_ocorrida = True
                continue

            props.update(resultados)
            ponto_inflexao_data = inflexao_data
            ajuste_ok = True
            break  # success: no need to try another pair

        # Flag the class only if no pair could be fitted at all
        if falha_ocorrida and not ajuste_ok:
            props[f'{classe}_sigmoid_fit_overflow'] = True

        # === DAYS SINCE THE INFLECTION POINT ===
        if ponto_inflexao_data is not None:
            try:
                hoje = pd.Timestamp(datetime.today().date())
                # Drop any timezone and the time of day before subtracting
                inflexao = ponto_inflexao_data.tz_localize(None).normalize()
                props[f"{classe}_dias_desde_inflexao"] = (hoje - inflexao).days
            except Exception as e:
                log_mensagem(cell_id, f"[ERRO CÁLCULO DIAS INFLEXAO] {classe}: {str(e)}")

    return cell_id, new_feature

# === EXECUÇÃO EM LOTE ===
# Resume point: index of the first batch still to be processed. It is read from
# the checkpoint file written at the end of every completed batch and stays 0
# when no checkpoint exists.
ultimo_lote = 0
if ultimo_lote_path.exists():
    with open(ultimo_lote_path, 'r') as f:
        ultimo_lote = int(f.read().strip())

# grid_subsets is defined in an earlier cell; each entry is used here as a
# mapping with a 'features' list (and optionally a 'crs' member).
for lote_index in range(ultimo_lote, len(grid_subsets)):
    start_time = time.time()
    subset = grid_subsets[lote_index]
    feature_list = subset['features']

    # Process the cells of this batch with 5 worker threads; results are
    # collected in completion order, not in input order.
    updated_features = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_cell_sigmoid, f) for f in feature_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Lote {lote_index + 1} (Step 6)"):
            try:
                _, processed_feature = future.result()
                updated_features.append(processed_feature)
            except Exception as e:
                # NOTE(review): a cell whose worker raised is only logged; it is
                # absent from this batch's output and the batch is still
                # checkpointed as done further down.
                log_mensagem(lote_index + 1, f"[FALHA GERAL] {str(e)}")

    # Save the GeoJSON file of this batch, copying the CRS member of the first
    # subset when it has one.
    fc = {"type": "FeatureCollection", "features": updated_features}
    if 'crs' in grid_subsets[0]:
        fc['crs'] = grid_subsets[0]['crs']

    out_path = output_dir / f"step6_lote{lote_index + 1}.geojson"
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(fc, f)
    log_mensagem(lote_index + 1, f"SALVO {out_path.name}")

    # Incremental consolidation: rebuild the merged file from every batch file
    # present on disk.
    # NOTE(review): the paths are sorted as strings, so "lote10" comes before
    # "lote2"; this only affects the order of features in the merged file.
    arquivos = sorted(glob.glob(str(output_dir / "step6_lote*.geojson")))
    todas_features = []
    for arquivo in arquivos:
        with open(arquivo, 'r', encoding='utf-8') as f:
            fc_parcial = json.load(f)
            todas_features.extend(fc_parcial['features'])

    final_fc = {"type": "FeatureCollection", "features": todas_features}
    if 'crs' in grid_subsets[0]:
        final_fc['crs'] = grid_subsets[0]['crs']

    with open(output_dir / "step6_consolidado.geojson", 'w', encoding='utf-8') as f:
        json.dump(final_fc, f)
    log_mensagem(lote_index + 1, "CONSOLIDADO atualizado")

    # Checkpoint: store the number of batches completed, which is also the
    # index at which the next run resumes.
    with open(ultimo_lote_path, 'w') as f:
        f.write(str(lote_index + 1))

    # Report the wall-clock time of this batch as H:MM:SS.
    tempo_msg = f"Tempo lote {lote_index + 1}: {str(timedelta(seconds=int(time.time() - start_time)))}"
    print(tempo_msg)
    log_mensagem(lote_index + 1, tempo_msg)

print("Step 6 (Sigmoid + Dias desde inflexão + Contribuições finais) finalizado com sucesso.")
log_mensagem("step6", "Processamento finalizado")