<a href="https://colab.research.google.com/github/LuizFelipe-FF/coords-tab-geoquimica/blob/main/padroniza%C3%A7%C3%A3o_coors.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Funcionou pra maioria

In [10]:
import pandas as pd
import numpy as np
import re
import geopandas as gpd
import pyproj
from shapely.geometry import Point

# 1. Carregamento do CSV
df = pd.read_csv("pmp_recalculated.csv", sep=';')

# 2. Nomes únicos
def make_unique_names(series):
    counts = {}
    unique = []
    for name in series:
        if pd.isna(name):
            unique.append(name)
            continue
        count = counts.get(name, 0)
        suffix = '' if count == 0 else f'_{chr(96 + count + 1)}'
        unique.append(f"{name}{suffix}")
        counts[name] = count + 1
    return unique

df['sample_name_unique'] = make_unique_names(df['sample_name'])

# 3. Expansão de coordenadas compostas
def expand_multiple_coordinates(df):
    rows = []
    for _, row in df.iterrows():
        coord_str = str(row['longitude'])
        if 'and' in coord_str and '/' in coord_str:
            parts = [pair.strip() for pair in coord_str.split('and')]
            for i, pair in enumerate(parts):
                try:
                    lon_raw, lat_raw = pair.split('/')
                    new_row = row.copy()
                    new_row['lon_raw'] = lon_raw.strip()
                    new_row['lat_raw'] = lat_raw.strip()
                    new_row['sample_name_unique'] += f"_{chr(97 + i)}"
                    rows.append(new_row)
                except:
                    continue
        else:
            new_row = row.copy()
            new_row['lat_raw'] = str(row['latitude'])
            new_row['lon_raw'] = str(row['longitude'])
            rows.append(new_row)
    return pd.DataFrame(rows)

df_expanded = expand_multiple_coordinates(df)

# 4. Limpeza
def normalizar(coord):
    if pd.isna(coord): return coord
    coord = str(coord).strip()
    coord = coord.replace(',', '.')
    coord = coord.replace('º', '°').replace("’", "'").replace("‘", "'")
    coord = coord.replace("″", '"').replace("”", '"').replace("“", '"')
    coord = coord.replace("´", "'").replace("?", "'").replace("Â", "")
    coord = coord.replace('N', '').replace('S', '').replace('E', '').replace('W', '')
    coord = coord.replace('\u2013', '-').replace('\u2014', '-').replace('\u2212', '-')
    coord = coord.replace(' ', '')
    if coord.count('.') > 1 and not re.search(r'\d\.\d+$', coord):
        coord = coord.replace('.', '')
    if re.fullmatch(r'-?\d{5,}', coord):
        coord = re.sub(r'^(-?\d{2})(\d+)$', r'\1.\2', coord)
    return coord

df_expanded['lat_raw'] = df_expanded['lat_raw'].apply(normalizar)
df_expanded['lon_raw'] = df_expanded['lon_raw'].apply(normalizar)

# 5. Funções auxiliares
def dms_to_decimal(d, m, s, hemi=''):
    val = abs(float(d)) + float(m)/60 + float(s)/3600
    return -val if hemi in ['S', 'W'] else val

def parse_coord(coord):
    try: return float(coord)
    except: pass
    dms = re.match(r'(-?\d+)[°]?\s*(\d+)?\'?\s*(\d+(?:\.\d+)?)?\"?\s*([NSEW])?', coord, re.IGNORECASE)
    if dms:
        g, m, s, h = dms.groups()
        return dms_to_decimal(g, m or 0, s or 0, h.upper() if h else '')
    dmm = re.match(r'(-?\d+)[°]?\s*(\d+(?:\.\d+)?)[\'\s]*([NSEW])?', coord, re.IGNORECASE)
    if dmm:
        g, m, h = dmm.groups()
        return dms_to_decimal(g, m, 0, h.upper() if h else '')
    decimal = re.match(r'(-?\d+(?:\.\d+)?)[°]?\s*([NSEW])', coord, re.IGNORECASE)
    if decimal:
        val, h = decimal.groups()
        val = float(val)
        return -abs(val) if h.upper() in ['S', 'W'] else abs(val)
    return np.nan

def utm_to_latlon(e, n, zone=22, hemisphere='S'):
    try:
        crs_utm = pyproj.CRS(proj='utm', zone=zone, south=(hemisphere.upper() == 'S'))
        transformer = pyproj.Transformer.from_crs(crs_utm, 'EPSG:4326', always_xy=True)
        lon, lat = transformer.transform(e, n)
        return lat, lon
    except:
        return np.nan, np.nan

def limpar_utm(coord):
    if pd.isna(coord): return None
    coord = re.sub(r'[^\d]', '', str(coord))
    try: return int(coord)
    except: return None

def corrigir_decimal_mal_posicionado(coord):
    try:
        if isinstance(coord, str) and re.fullmatch(r'-\d{3}\.\d+', coord):
            numeros = re.sub(r'[^\d]', '', coord)
            if len(numeros) >= 6:
                return float(f"-{numeros[:2]}.{numeros[2:]}")
    except:
        pass
    return np.nan

# 6. Conversão principal
def process_coordinates(row):
    lat = row['lat_raw']
    lon = row['lon_raw']

    if isinstance(lon, str) and '/' in lon:
        try:
            e, n = map(lambda x: int(re.sub(r'\D', '', x)), lon.split('/'))
            return pd.Series(utm_to_latlon(e, n))
        except:
            return pd.Series([np.nan, np.nan])

    try:
        lat_dd = parse_coord(lat)
        lon_dd = parse_coord(lon)

        if pd.isna(lat_dd) or pd.isna(lon_dd):
            e_raw = limpar_utm(lon)
            n_raw = limpar_utm(lat)
            if e_raw and n_raw and 100000 < e_raw < 10000000:
                return pd.Series(utm_to_latlon(e_raw, n_raw))
        return pd.Series([lat_dd, lon_dd])
    except:
        return pd.Series([np.nan, np.nan])

df_expanded[['lat_dd', 'lon_dd']] = df_expanded.apply(process_coordinates, axis=1)

# 7. Recuperar ponto mal posicionado
def aplicar_recuperacao(row):
    lat, lon = row['lat_dd'], row['lon_dd']
    if pd.isna(lat):
        lat = corrigir_decimal_mal_posicionado(row['lat_raw'])
    if pd.isna(lon):
        lon = corrigir_decimal_mal_posicionado(row['lon_raw'])
    return pd.Series([lat, lon])

df_expanded[['lat_dd', 'lon_dd']] = df_expanded.apply(aplicar_recuperacao, axis=1)

# 8. Corrigir sinal
def corrigir_sinal(lat, lon):
    if pd.isna(lat) or pd.isna(lon): return np.nan, np.nan
    try:
        lat = float(lat)
        lon = float(lon)
        if lat > 0: lat *= -1
        if lon > 0: lon *= -1
        if not (-90 <= lat <= 0 and -180 <= lon <= 0):
            return np.nan, np.nan
        return lat, lon
    except:
        return np.nan, np.nan

df_expanded[['lat_dd', 'lon_dd']] = df_expanded.apply(
    lambda row: pd.Series(corrigir_sinal(row['lat_dd'], row['lon_dd'])),
    axis=1
)

# 9. Diagnóstico
def classificar_falha(row):
    if pd.isna(row['lat_raw']) or pd.isna(row['lon_raw']):
        return 'coord ausente'
    try:
        float(row['lat_raw'])
        float(row['lon_raw'])
    except:
        return 'coord mal formatada'
    if abs(float(row['lat_raw'])) > 90 or abs(float(row['lon_raw'])) > 180:
        return 'grau inválido'
    return 'parse falhou'

df_expanded['conversao'] = np.where(
    df_expanded[['lat_dd', 'lon_dd']].notna().all(axis=1),
    'Convertido', 'Falha'
)

df_expanded['motivo_falha'] = df_expanded.apply(
    lambda r: classificar_falha(r) if r['conversao'] == 'Falha' else '',
    axis=1
)

# 10. Exportar falhas
falhas = df_expanded[df_expanded['conversao'] == 'Falha']
falhas.to_csv("coordenadas_falhas.csv", index=False)

# 11. Exportar convertidos
df_ok = df_expanded[df_expanded['conversao'] == 'Convertido']
gdf = gpd.GeoDataFrame(
    df_ok,
    geometry=gpd.points_from_xy(df_ok['lon_dd'], df_ok['lat_dd']),
    crs="EPSG:4326"
)
gdf.to_file("amostras_convertidas.shp", driver="ESRI Shapefile", encoding="utf-8")
gdf.to_csv("amostras_convertidas.csv", sep=';', index=False)

# 12. Painel final de status
print("\n✅ Painel de Conversão Final")
print(f"Total original: {df.shape[0]}")
print(f"Após expansão de coordenadas: {df_expanded.shape[0]}")
print(f"Coordenadas convertidas com sucesso: {df_ok.shape[0]}")
print(f"Coordenadas com falha: {falhas.shape[0]}")
print("\n📊 Classificação das falhas:")
print(falhas['motivo_falha'].value_counts())


✅ Painel de Conversão Final
Total original: 3742
Após expansão de coordenadas: 3769
Coordenadas convertidas com sucesso: 2300
Coordenadas com falha: 1469

📊 Classificação das falhas:
motivo_falha
parse falhou           1401
grau inválido            35
coord mal formatada      33
Name: count, dtype: int64


diferentão mas nao rodou tao bem

In [9]:
import pandas as pd
import numpy as np
import re
import geopandas as gpd
import pyproj
from shapely.geometry import Point

# ===============================================
# 1. Funções de parsing refinadas
# ===============================================

def normalize_coord_string(coord_str):
    if pd.isna(coord_str): return None
    coord_str = str(coord_str).strip()
    coord_str = coord_str.replace(',', '.')
    coord_str = coord_str.replace("º", "°").replace("’", "'").replace("‘", "'")
    coord_str = coord_str.replace("″", '"').replace("”", '"').replace("“", '"')
    coord_str = coord_str.replace("´", "'").replace("?", "'").replace("Â", "")
    coord_str = coord_str.replace('N', '').replace('S', '').replace('E', '').replace('W', '')
    coord_str = coord_str.replace('\u2013', '-').replace('\u2014', '-').replace('\u2212', '-')
    coord_str = coord_str.replace(' ', '')

    if coord_str.count('.') > 1 and not re.search(r'\d+\.\d+$', coord_str):
        parts = coord_str.split('.')
        coord_str = ''.join(parts[:-1]) + '.' + parts[-1]

    if re.fullmatch(r'-?\d+\.\d+\.\d+', coord_str):
        parts = coord_str.split('.')
        coord_str = ''.join(parts[:-1]) + '.' + parts[-1]

    return coord_str

def dms_to_decimal(degrees, minutes, seconds):
    try:
        deg = float(degrees)
        min = float(minutes)
        sec = float(seconds)
        val = abs(deg) + (min / 60) + (sec / 3600)
        return -val if deg > 0 else deg
    except:
        return np.nan

def dmm_to_decimal(degrees, minutes_decimal):
    try:
        deg = float(degrees)
        min_dec = float(minutes_decimal)
        val = abs(deg) + (min_dec / 60)
        return -val if deg > 0 else deg
    except:
        return np.nan

def parse_and_convert_coord(coord_str, is_longitude=False):
    norm = normalize_coord_string(coord_str)
    if norm is None: return np.nan

    dms_match = re.match(r'(-?\d+)[°]?\s*(\d+)?\'?\s*(\d+(?:\.\d+)?)?"?', norm)
    if dms_match:
        g, m, s = dms_match.groups()
        return dms_to_decimal(g, m or 0, s or 0)

    dmm_match = re.match(r'(-?\d+)[°]?\s*(\d+\.\d+)', norm)
    if dmm_match:
        g, m_dec = dmm_match.groups()
        return dmm_to_decimal(g, m_dec)

    try:
        val = float(norm)
        if abs(val) > 10 and abs(val) < 1000 and re.match(r'^-?\d{3}\.\d+$', norm) and not is_longitude:
            val = val / 10
        return -abs(val)
    except:
        return np.nan

# ===============================================
# 2. Etapas do pipeline com pandas
# ===============================================

df = pd.read_csv("pmp_recalculated.csv", sep=';')

def make_unique_names(series):
    counts = {}
    result = []
    for name in series:
        if pd.isna(name):
            result.append(name)
            continue
        c = counts.get(name, 0)
        suffix = '' if c == 0 else f'_{chr(96 + c + 1)}'
        result.append(f"{name}{suffix}")
        counts[name] = c + 1
    return result

df['sample_name_unique'] = make_unique_names(df['sample_name'])

def expand_multiple_coords(df):
    rows = []
    for _, row in df.iterrows():
        coord_str = str(row['longitude'])
        if 'and' in coord_str and '/' in coord_str:
            parts = [pair.strip() for pair in coord_str.split('and')]
            for i, pair in enumerate(parts):
                try:
                    lon_raw, lat_raw = pair.split('/')
                    new_row = row.copy()
                    new_row['lon_raw'] = lon_raw.strip()
                    new_row['lat_raw'] = lat_raw.strip()
                    new_row['sample_name_unique'] += f"_{chr(97 + i)}"
                    rows.append(new_row)
                except:
                    continue
        else:
            new_row = row.copy()
            new_row['lat_raw'] = str(row['latitude'])
            new_row['lon_raw'] = str(row['longitude'])
            rows.append(new_row)
    return pd.DataFrame(rows)

df_expanded = expand_multiple_coords(df)

# ===============================================
# 3. Conversão usando o parser refinado
# ===============================================

def parse_row(row):
    lat = parse_and_convert_coord(row['lat_raw'], is_longitude=False)
    lon = parse_and_convert_coord(row['lon_raw'], is_longitude=True)
    return pd.Series([lat, lon])

df_expanded[['lat_dd', 'lon_dd']] = df_expanded.apply(parse_row, axis=1)

# 4. Correção de sinal e faixa
def corrigir_sinal(lat, lon):
    if pd.isna(lat) or pd.isna(lon): return np.nan, np.nan
    if lat > 0: lat *= -1
    if lon > 0: lon *= -1
    if not (-90 <= lat <= 0 and -180 <= lon <= 0):
        return np.nan, np.nan
    return lat, lon

df_expanded[['lat_dd', 'lon_dd']] = df_expanded.apply(
    lambda r: pd.Series(corrigir_sinal(r['lat_dd'], r['lon_dd'])),
    axis=1
)

# 5. Diagnóstico
def diagnostico(row):
    if pd.isna(row['lat_raw']) or pd.isna(row['lon_raw']):
        return 'coord ausente'
    try:
        float(row['lat_raw'])
        float(row['lon_raw'])
    except:
        return 'coord mal formatada'
    if abs(float(row['lat_raw'])) > 90 or abs(float(row['lon_raw'])) > 180:
        return 'grau inválido'
    return 'parse falhou'

df_expanded['conversao'] = np.where(
    df_expanded[['lat_dd', 'lon_dd']].notna().all(axis=1), 'Convertido', 'Falha'
)
df_expanded['motivo_falha'] = df_expanded.apply(
    lambda r: diagnostico(r) if r['conversao'] == 'Falha' else '',
    axis=1
)

# 6. Exportações
df_expanded[df_expanded['conversao'] == 'Falha'].to_csv("coordenadas_falhas.csv", index=False)

df_ok = df_expanded[df_expanded['conversao'] == 'Convertido'].copy()

gdf = gpd.GeoDataFrame(
    df_ok,
    geometry=gpd.points_from_xy(df_ok['lon_dd'], df_ok['lat_dd']),
    crs="EPSG:4326"
)

gdf.to_file("amostras_convertidas.shp", driver="ESRI Shapefile", encoding="utf-8")
gdf.to_csv("amostras_convertidas.csv", sep=';', index=False)

# 7. Painel
print("\n✅ Conversão concluída")
print(f"Original: {df.shape[0]}")
print(f"Após expansão: {df_expanded.shape[0]}")
print(f"Convertidas: {(df_expanded['conversao'] == 'Convertido').sum()}")
print(f"Falhas: {(df_expanded['conversao'] == 'Falha').sum()}")
print("\n📊 Motivos das falhas:")
print(df_expanded['motivo_falha'].value_counts())


✅ Conversão concluída
Original: 3742
Após expansão: 3769
Convertidas: 1943
Falhas: 1826

📊 Motivos das falhas:
motivo_falha
                       1943
parse falhou           1401
grau inválido           361
coord mal formatada      64
Name: count, dtype: int64
