# Offline Fleet Processing Notebook

Notebook version of `fleet.py` focused on offline processing starting from existing parsed raw CSVs. Follow the sections sequentially.

In [None]:
# 1. Setup & Imports
import pandas as pd, numpy as np, re, json, logging, math
from pathlib import Path
from datetime import date, datetime
from difflib import SequenceMatcher
import unicodedata

# Widen pandas' display limits so wide frames stay readable in cell output.
for _option, _value in (
    ('display.max_rows', 100),
    ('display.max_columns', 100),
    ('display.width', 140),
):
    pd.set_option(_option, _value)

# One shared logger for the whole notebook; the level name is right-aligned to 7 characters.
_LOG_FORMAT = '%(asctime)s | %(levelname)7s | %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger('offline')

## 2. Path Configuration & Input File Selection
Define base directories and auto-detect latest input CSV files. Adjust manually if needed.

In [None]:
# Auto-detect helper
def latest(pattern: str, base: Path) -> Path | None:
    files = sorted(base.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
    return files[0] if files else None

# Directory layout, relative to the notebook's working directory.
BASE = Path('.')
DATA_DIR = BASE / 'data'
RAW_DIR = BASE / 'raw'
RAW_LOCALIZA, RAW_MOVIDA = RAW_DIR / 'localiza', RAW_DIR / 'movida'
FIPE_DIR, TUPLES_DIR, TABLES_DIR = (DATA_DIR / sub for sub in ('fipe', 'tuples', 'tables'))

# Create every working directory up front so later cells can write without checks.
for _dir in (DATA_DIR, RAW_LOCALIZA, RAW_MOVIDA, FIPE_DIR, TUPLES_DIR, TABLES_DIR):
    _dir.mkdir(parents=True, exist_ok=True)

# Newest parsed vendor CSVs (None when a vendor directory has no matching file).
localiza_csv = latest('localiza_seminovos_*.csv', RAW_LOCALIZA)
movida_csv = latest('movida_seminovos_*.csv', RAW_MOVIDA)

# fipe_models.csv: prefer data/, fall back to the working directory, else None.
fipe_models_csv = next(
    (p for p in (DATA_DIR / 'fipe_models.csv', Path('fipe_models.csv')) if p.exists()),
    None,
)
fipe_dump_csv = latest('fipe_dump_*.csv', FIPE_DIR)

for _message, _path in (
    ('Localiza CSV: %s', localiza_csv),
    ('Movida   CSV: %s', movida_csv),
    ('FIPE models CSV: %s', fipe_models_csv),
    ('FIPE dump CSV: %s', fipe_dump_csv),
):
    log.info(_message, _path)

## 3. Utility Functions (Dates, Directories, Price Parsing)

In [None]:
def ymd_compact(d: date | None = None) -> str:
    return (d or date.today()).strftime('%Y%m%d')

def today_iso() -> str:
    """Return today's local date as an ISO-8601 string (``YYYY-MM-DD``)."""
    current = date.today()
    return current.isoformat()

def ensure_dir(p: Path):
    """Create directory *p*, including missing parents; do nothing if it already exists."""
    options = {'parents': True, 'exist_ok': True}
    p.mkdir(**options)

def clean_price_to_int(x):
    """Convert a raw price value to a whole-number ``int``, or ``pd.NA`` if unparseable.

    Accepted inputs:
      * ``None`` / ``pd.NA`` / NaN / infinities -> ``pd.NA``
      * ints (including NumPy ints) -> returned as ``int``
      * floats (including NumPy floats) -> rounded to the nearest ``int``
      * anything else is stringified; every character except digits, ``.`` and ``,``
        is dropped (currency symbols, spaces, signs) and the rest is parsed:
          - both ``,`` and ``.`` present: ``.`` is a thousands separator and ``,``
            the decimal mark (``"85.990,00"`` -> 85990)
          - only ``,`` present: ``,`` is the decimal mark (``"85990,40"`` -> 85990)
          - only ``.`` present in groups of exactly three digits: ``.`` is a
            thousands separator (``"85.990"`` -> 85990, ``"1.234.567"`` -> 1234567)
          - otherwise ``.`` is the decimal mark (``"85990.4"`` -> 85990)
    """
    if x is None or x is pd.NA:
        return pd.NA
    if isinstance(x, (int, np.integer)):
        return int(x)
    if isinstance(x, (float, np.floating)):
        value = float(x)
        # NaN (of any float width) and +/-inf cannot be converted to int.
        return int(round(value)) if math.isfinite(value) else pd.NA
    s = re.sub(r'[^\d\.,]', '', str(x))
    if ',' in s and '.' in s:
        s = s.replace('.', '').replace(',', '.')
    elif ',' in s:
        s = s.replace(',', '.')
    elif re.fullmatch(r'[1-9]\d{0,2}(?:\.\d{3})+', s):
        # Dot-only value whose dots delimit 3-digit groups: Brazilian thousands
        # notation without cents. Without this branch "85.990" parses as 85.99.
        s = s.replace('.', '')
    try:
        return int(round(float(s)))
    except (ValueError, OverflowError):
        return pd.NA

## 4. Normalization Helpers (Accents, Text Cleaning, Tokens)

In [None]:
# Canonical normalization & helper functions (from fleet.py)
import re, unicodedata, pandas as pd
from difflib import SequenceMatcher
import numpy as np

def strip_accents(s: str) -> str:
    """Return ``str(s)`` with combining diacritical marks removed.

    The text is decomposed with Unicode NFD and every character in category
    ``Mn`` (non-spacing mark) is dropped, so ``"São"`` becomes ``"Sao"``.
    """
    decomposed = unicodedata.normalize("NFD", str(s))
    kept = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
    return "".join(kept)

def remove_duplicate_words(text: str) -> str:
    """Drop repeated whitespace-separated words, keeping each first occurrence.

    Words are compared case-insensitively (``str.casefold``); the spelling of
    the first occurrence is the one kept. The result is joined with single spaces.
    """
    first_spelling = {}
    for token in text.split():
        # setdefault keeps the earliest spelling; dicts preserve insertion order.
        first_spelling.setdefault(token.casefold(), token)
    return " ".join(first_spelling.values())

def norm_text(s: str) -> str:
    """Normalise a vehicle version / FIPE model description for matching.

    Returns ``""`` for ``None`` or a float NaN. Otherwise the text is lowercased,
    accent-stripped and passed through an ordered list of substitutions:
    punctuation clean-up, transmission tokens (``aut`` / ``mec``), expansion of
    trim abbreviations (e.g. ``tb`` -> ``turbo``, ``hig`` -> ``highline``),
    removal of door/valve counts (``5p``, ``12v``), then duplicate-word removal
    and whitespace collapsing.

    The order of the substitutions matters. In particular, the multi-word
    phrase rules that look for an abbreviation followed by a dot
    (``"onix hatch prem."``, ``"aircross f."`` ...) must run before the step
    that strips dots after letters; otherwise they can never match.
    """
    import pandas as pd, re
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return ""
    s0 = strip_accents(str(s).lower())
    s0 = s0.replace(",", ".")
    s0 = s0.replace("c/ar", "").replace("c/ ar","")
    s0 = re.sub(r'/', ' ', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:T\.)(?!\w)', 'turbo', s0)
    s0 = re.sub(r"[^a-z0-9\.\s]", " ", s0)
    s0 = re.sub(r"\bautomatic[oa]\b|\bat\b|\baut(?:\.|o)?\b", "aut", s0)
    s0 = re.sub(r"\bman(?:ual)?\b|\bmecanico\b", "mec", s0)
    s0 = re.sub(r"\bt\s?si\b", "tsi", s0)
    s0 = re.sub(r"\b(\d{2,4}(?:i|d))\s*a\b", r"\1 aut", s0)
    # Dotted phrase abbreviations. These need the literal dot, so they run
    # before the dot-stripping below. The replacement ends with a space so a
    # token glued to the dot ("prem.1.0") is not fused onto the expanded word;
    # surplus whitespace is collapsed at the end.
    s0 = re.sub(r'(?i)\bONIX\s+HATCH\s+PREM\.', 'onix hatch premier ', s0)
    s0 = re.sub(r'(?i)\bONIX\s+SEDAN\s+PREM\.', 'onix sedan premier ', s0)
    s0 = re.sub(r'(?i)\bONIX\s+SEDAN\s+Plus+\s+PREM\.', 'onix sedan plus premier ', s0)
    s0 = re.sub(r'(?i)\bONIX\s+SD\.\s+P\.\s+PR\.', 'onix sedan plus premier ', s0)
    s0 = re.sub(r'(?i)\bFastback\s+Limited+\s+Ed\.', 'fastback limited edition ', s0)
    s0 = re.sub(r'(?i)\bAIRCROSS\s+F\.', 'aircross feel ', s0)
    # Split "a.b" into "a. b", then drop any dot that follows a letter
    # (dots inside numbers such as "1.0" are kept).
    s0 = re.sub(r'(?<=[A-Za-z])\.(?=[A-Za-z])', '. ', s0)
    s0 = re.sub(r'(?<=[A-Za-z])\.', '', s0)
    # Stand-alone trim / feature abbreviations.
    s0 = re.sub(r'(?i)(?<!\w)(?:perf|perfor|performa|performance|p)(?!\w)', 'performance', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:long)(?!\w)', 'longitude', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:sportb|SPB|SB)(?!\w)', 'sportback', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:prest)(?!\w)', 'prestige', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:ultim)(?!\w)', 'ultimate', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:insc)(?!\w)', 'inscription', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:xdrive30e)(?!\w)', 'xdrive 30e', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:cp)(?!\w)', 'cs plus', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:7l)(?!\w)', '', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:hurric|hurr)(?!\w)', 'hurricane', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:overl)(?!\w)', 'overland', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:dies|die)(?!\w)', 'diesel', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:tb)(?!\w)', 'turbo', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:sed)(?!\w)', 'sedan', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:step)(?!\w)', 'stepway', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:hig)(?!\w)', 'highline', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:limit)(?!\w)', 'limited', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:plat)(?!\w)', 'platinum', s0)
    # Door / valve counts such as "5p" or "12v" carry no matching signal.
    s0 = re.sub(r'(?i)\b\d+[pv]\b', '', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:exclu)(?!\w)', 'exclusive', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:t270)(?!\w)', 'turbo 270', s0)
    s0 = re.sub(r'(?i)(?<!\w)(?:comfort|comfor)(?!\w)', 'comfortline', s0)
    # "xc60" -> "xc 60"
    s0 = re.sub(r'(?<=xc)(\d+)', r' \1', s0)
    # Drop a stand-alone "new" unless it introduces "range" or "beetle".
    s0 = re.sub(r'\bnew\b(?![\s-]*(?:range|beetle)\b)', '', s0, flags=re.IGNORECASE)
    s0 = remove_duplicate_words(s0)
    s0 = re.sub(r"\s+", " ", s0).strip()
    return s0

def generic_norm_text(s: str) -> str:
    """Light normalisation: lowercase, strip accents, keep only ``[a-z0-9]`` words.

    Returns ``""`` for ``None`` or a float NaN. Every other character becomes a
    space, ``xcNN`` is split into ``xc NN``, and runs of whitespace are
    collapsed to a single space.
    """
    import re
    is_missing = s is None or (isinstance(s, float) and pd.isna(s))
    if is_missing:
        return ""
    text = strip_accents(str(s).lower())
    substitutions = (
        (r"[^a-z0-9\s]", " "),
        (r'(?<=xc)(\d+)', r' \1'),
        (r"\s+", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

def norm_brand(s: str) -> str:
    """Normalise a brand name and fold known aliases into one canonical spelling.

    The input first goes through ``generic_norm_text``, which replaces every
    non-alphanumeric character with a space. Alias keys are therefore written
    in that already-normalised form: a key containing ``-`` or ``/`` could
    never equal the lookup string. Brands with no alias are returned in their
    normalised form.
    """
    s0 = generic_norm_text(s)
    aliases = {
        # "vw - volkswagen" and "vw volkswagen" both normalise to this key.
        "vw volkswagen": "volkswagen",
        "volks": "volkswagen",
        "volkswagem": "volkswagen",
        # "gm - chevrolet" and "gm chevrolet" both normalise to this key.
        "gm": "chevrolet",
        "gm chevrolet": "chevrolet",
        "chevy": "chevrolet",
        # "mercedes-benz" already normalises to "mercedes benz"; no alias needed.
        "mb": "mercedes benz",
        "caoa chery": "chery",
        # Normalised form of "caoa chery/chery".
        "caoa chery chery": "chery",
        "great wall": "gwm",
    }
    return aliases.get(s0, s0)

def tokset(s: str) -> set:
    """Return the set of distinct words in ``generic_norm_text(s)``."""
    normalized = generic_norm_text(s)
    return {token for token in normalized.split()}

def extract_engine(s: str):
    """Return the first stand-alone ``D.D`` token in ``str(s)`` (e.g. ``"1.0"``), else ``None``.

    The token must sit on word boundaries, so ``"12.5"`` does not yield ``"2.5"``.
    """
    found = re.findall(r"\b(\d\.\d)\b", str(s))
    return found[0] if found else None

# Type normalization

def normalize_type(raw_type, fuel_sigla):
    """Map a vendor body-type label to a lowercase canonical category.

    Args:
        raw_type: vendor-provided body type; ``None`` or a float NaN yields ``""``.
        fuel_sigla: fuel code; ``'E'`` or ``'H'`` (case-insensitive, surrounding
            whitespace ignored) short-circuits to ``'ev'`` regardless of *raw_type*.

    Returns:
        The lowercase canonical category (``'sedan'``, ``'hatch'``, ``'suv'``,
        ``'pickup/vans'``, ``'ev'``, ``'premium'``, ``'other'``). Labels with no
        mapping are returned accent-stripped and lowercased.
    """
    if fuel_sigla:
        fs = str(fuel_sigla).upper().strip()
        if fs in {'E','H'}:
            return 'ev'
    if raw_type is None or (isinstance(raw_type, float) and pd.isna(raw_type)):
        return ""
    s0 = strip_accents(str(raw_type)).upper().strip()
    # Keys must be accent-free: the lookup string has already been through
    # strip_accents, so an accented key (e.g. "UTILITÁRIO") could never match.
    mapping = {
        'SEDAN':'SEDAN','SEDA':'SEDAN',
        'HATCH':'HATCH','HATCHBACK':'HATCH',
        'SUV':'SUV',
        'PICAPE':'PICKUP/VANS','PICAPE CABINE DUPLA':'PICKUP/VANS','PICK-UP':'PICKUP/VANS','PICKUP':'PICKUP/VANS','CAMINHONETE':'PICKUP/VANS','CABINE SIMPLES':'PICKUP/VANS',
        'UTILITARIO':'PICKUP/VANS','FURGAO':'PICKUP/VANS','MINIVAN':'PICKUP/VANS','VAN':'PICKUP/VANS','CARGA':'PICKUP/VANS',
        'ELETRICO':'EV','HIBRIDO':'EV',
        'COUPE':'PREMIUM','SPORTBACK':'PREMIUM','GRAN COUPE':'PREMIUM','FASTBACK':'PREMIUM',
        'PARTICULAR':'OTHER','OUTROS':'OTHER','OUTRO':'OTHER'
    }
    return mapping.get(s0, s0).lower()

# Matching scorer (canonical subset)

def score_best_fipe_for_key(brand_norm: str, model_norm: str, version_norm: str, model_year: int, fipe_df: pd.DataFrame, threshold: float):
    """Pick the best-scoring FIPE row for one (brand, model, version, year) key.

    Args:
        brand_norm: normalised brand; compared for equality with ``fipe_df["_brand_norm"]``.
        model_norm: normalised model; every whitespace-separated token must occur
            somewhere in the candidate's ``_model_norm``.
        version_norm: normalised version text that is scored against each candidate.
        model_year: model year; compared as ``int`` with ``fipe_df["AnoModelo"]``.
        fipe_df: FIPE table. The code reads the columns ``_brand_norm``,
            ``AnoModelo``, ``_model_norm``, ``_toks``, ``_engine``, ``Modelo``
            and ``CodigoFipe``.
        threshold: minimum score for a candidate to be reported as matched.

    Returns:
        ``(fipe_model, fipe_code, fipe_year, score, status)``. On a match the
        status is ``"matched"`` and the score is rounded to 4 decimals. Otherwise
        the first three items are ``None``, the status is ``"unmatched"`` and the
        score is the best score seen, floored at 0.0.
    """
    # Nothing to score without a version, a model and a year.
    if not version_norm or not model_norm or pd.isna(model_year):
        return (None, None, None, 0.0, "unmatched")
    m_tokens = [re.escape(t) for t in model_norm.split() if t]
    if not m_tokens:
        return (None, None, None, 0.0, "unmatched")
    # One lookahead per model token: all tokens must appear, in any order.
    token_pattern = "(?=.*" + ")(?=.*".join(m_tokens) + ")"
    cand_mask = (
        (fipe_df["_brand_norm"] == brand_norm) &
        (fipe_df["AnoModelo"] == int(model_year)) &
        fipe_df["_model_norm"].str.contains(token_pattern, regex=True, na=False)
    )
    cand = fipe_df[cand_mask]
    if cand.empty:
        return (None, None, None, 0.0, "unmatched")
    v_toks = tokset(version_norm)
    v_engine = extract_engine(version_norm)
    # Running best: score, FIPE model name, FIPE code, year.
    s_best=-1.0; m_best=c_best=None; y_best=None
    for _, fr in cand.iterrows():
        fipe_model_norm = fr["_model_norm"]
        # presumably a set of tokens (it is combined with & | - below) -- confirm against the FIPE preparation step
        c_toks = fr["_toks"]
        q_toks = v_toks
        inter = len(q_toks & c_toks)
        # coverage: share of query tokens found in the candidate;
        # precision: share of candidate tokens found in the query.
        coverage = inter / len(q_toks) if q_toks else 0.0
        precision = inter / len(c_toks) if c_toks else 0.0
        # Skip candidates that share almost no tokens with the query.
        if coverage < 0.05: continue
        f1 = (2*precision*coverage/(precision+coverage)) if (precision+coverage) else 0.0
        jacc = (len(q_toks & c_toks)/len(q_toks | c_toks)) if (q_toks or c_toks) else 0.0
        base = SequenceMatcher(None, version_norm, fipe_model_norm).ratio()
        # Weighted blend of token F1, Jaccard overlap and character-level similarity.
        score = 0.55*f1 + 0.20*jacc + 0.25*base
        c_engine = fr["_engine"]
        if v_engine:
            # Bonus for an equal engine token, penalty for a different one, neutral when the candidate has none.
            # NOTE(review): a NaN in _engine (e.g. after a CSV round-trip) is "not None" and would take the penalty -- confirm _engine only holds str/None.
            score += 0.05 if c_engine == v_engine else (-0.10 if c_engine is not None else 0.0)
        if "gp" in v_toks and "gp" in c_toks: score += 0.03
        # Penalise candidate tokens absent from the query: 0.02 each, capped at 0.12.
        score -= min(0.12, 0.02 * len(c_toks - q_toks))
        if score > s_best:
            s_best = score; m_best = fr["Modelo"]; c_best = fr["CodigoFipe"]; y_best = int(fr["AnoModelo"]) if pd.notna(fr["AnoModelo"]) else None
    if s_best >= threshold and m_best and c_best and y_best is not None:
        return (m_best, c_best, y_best, float(round(s_best,4)), "matched")
    return (None, None, None, float(max(s_best,0.0)), "unmatched")

print('Canonical normalization & matching helpers loaded.')

## 5. Loading Raw Localiza CSV

In [None]:
# The Localiza CSV is mandatory: stop here when it is missing.
if localiza_csv is None:
    raise SystemExit('Localiza CSV not found. Place parsed CSV in raw/localiza/.')
loc_df = pd.read_csv(localiza_csv, sep=';')
_n_rows, _n_cols = loc_df.shape
log.info('Localiza rows: %d columns: %d', _n_rows, _n_cols)

# Case-insensitive check for the columns the rest of the notebook relies on.
required_loc_cols = {'brand','model','version','model_year','price'}
_present_lower = {name.lower() for name in loc_df.columns}
missing = required_loc_cols.difference(_present_lower)
if missing:
    log.warning('Localiza missing columns (may be fine if naming differs): %s', missing)
loc_df.head()

## 6. Loading Raw Movida CSV

In [None]:
# Movida is optional: without a CSV, continue with an empty frame that has the expected columns.
if movida_csv is not None:
    mov_df = pd.read_csv(movida_csv, sep=';')
    log.info('Movida rows: %d columns: %d', len(mov_df), mov_df.shape[1])
else:
    log.warning('Movida CSV not found; continuing with Localiza only.')
    mov_df = pd.DataFrame(columns=['brand','model','version','model_year','price'])
mov_df.head()

## 7. Data Harmonization (Column Mapping & Typing)

In [None]:
def _standardize(df: pd.DataFrame, vendor: str) -> pd.DataFrame:
    """Bring a vendor frame to the canonical column names and dtypes.

    The frame is modified in place and also returned.

    * Headers that differ from a canonical name only by case (``Brand``,
      ``MODEL_YEAR`` ...) are renamed to the lowercase canonical name.
    * Missing ``brand`` / ``model`` / ``version`` columns are created as ``''``;
      missing ``price`` / ``model_year`` columns are created as NA.
    * ``snapshot_date`` defaults to today's ISO date when absent.
    * Text columns are stripped and lowercased; missing values become ``''``
      rather than the literal string ``"nan"``.
    * ``price`` is parsed with ``clean_price_to_int`` and ``model_year`` with
      ``pd.to_numeric``; both end up as nullable ``Int64``.

    Args:
        df: raw vendor frame.
        vendor: vendor label; currently unused, kept for call-site compatibility.
    """
    text_cols = ['brand', 'model', 'version']
    canonical = text_cols + ['price', 'model_year']

    # Map case variants onto the canonical lowercase names. An exact-name
    # column always wins, so an existing canonical column is never overwritten.
    renames = {}
    for name in canonical:
        if name in df.columns:
            continue
        variant = next((c for c in df.columns if str(c).lower() == name), None)
        if variant is not None:
            renames[variant] = name
    if renames:
        df.rename(columns=renames, inplace=True)

    for name in text_cols:
        if name not in df.columns:
            df[name] = ''
    for name in ('price', 'model_year'):
        if name not in df.columns:
            df[name] = pd.NA
    if 'snapshot_date' not in df.columns:
        df['snapshot_date'] = today_iso()

    for name in text_cols:
        # Blank out missing values before stringifying, so NaN does not turn
        # into the text "nan" and leak into the normalised match keys.
        as_object = df[name].astype(object)
        df[name] = as_object.where(as_object.notna(), '').astype(str).str.strip().str.lower()
    df['price'] = df['price'].apply(clean_price_to_int).astype('Int64')
    df['model_year'] = pd.to_numeric(df['model_year'], errors='coerce').astype('Int64')
    return df

# Harmonise both vendor frames to the same column names and dtypes.
loc_df, mov_df = (
    _standardize(frame, vendor)
    for frame, vendor in ((loc_df, 'localiza'), (mov_df, 'movida'))
)
loc_df.head()

## 8. Augment Localiza & Movida (Derived Version Fields)

In [None]:
# (target column, source column, normaliser) for each derived matching key.
_derived_fields = (
    ('_brand_norm', 'brand', norm_brand),
    ('_model_norm', 'model', generic_norm_text),
    ('_version_norm', 'version', norm_text),
)
for df in (loc_df, mov_df):
    if df.empty:
        continue
    for _target, _source, _normalizer in _derived_fields:
        df[_target] = df[_source].map(_normalizer)
    # The engine token is read from the normalised version text.
    df['_engine'] = df['_version_norm'].map(extract_engine)
loc_df[['brand','model','version','_version_norm']].head()

## 9. Load FIPE Models (fipe_models.csv)

In [None]:
if fipe_models_csv is None:
    raise SystemExit('fipe_models.csv not found in data/. Download or copy it before proceeding.')
# Flexible separator autodetect; fall back to ';' when sniffing fails.
_fipe_read_opts = {'encoding': 'utf-8-sig'}
try:
    fipe_models = pd.read_csv(fipe_models_csv, sep=None, engine='python', **_fipe_read_opts)
except Exception:
    fipe_models = pd.read_csv(fipe_models_csv, sep=';', **_fipe_read_opts)
log.info('FIPE models rows: %d', len(fipe_models))
fipe_models.head()

## 10. Prepare FIPE Models (Normalization & Tokenization)

In [None]:
# Clean header names (stray BOM, surrounding whitespace) and index them case-insensitively.
fipe_models.columns = [header.replace('\ufeff','').strip() for header in fipe_models.columns]
colmap = {header.lower(): header for header in fipe_models.columns}

# Required columns
req_map = {key: colmap.get(key) for key in ['marca','modelo','codigofipe','anomodelo']}
missing = [key for key, found in req_map.items() if found is None]
if missing:
    raise SystemExit(f'Missing required FIPE model columns: {missing}')

# Rename whatever casing the file used to the canonical FIPE column names.
_canonical_names = (('marca', 'Marca'), ('modelo', 'Modelo'), ('codigofipe', 'CodigoFipe'), ('anomodelo', 'AnoModelo'))
fm = fipe_models.rename(columns={req_map[key]: name for key, name in _canonical_names})
for _text_col in ('Marca', 'Modelo'):
    fm[_text_col] = fm[_text_col].astype(str).str.lower().str.strip()
fm['CodigoFipe'] = fm['CodigoFipe'].astype(str).str.strip()
fm['AnoModelo'] = pd.to_numeric(fm['AnoModelo'], errors='coerce').astype('Int64')

# Keep only rows with a code, a model name and a parseable year.
_usable = fm['CodigoFipe'].ne('') & fm['Modelo'].ne('') & fm['AnoModelo'].notna()
fm = fm[_usable].copy()

# Matching keys: normalised brand/model text, the model's token set and its engine token.
fm['_brand_norm'] = fm['Marca'].map(norm_brand)
fm['_model_norm'] = fm['Modelo'].map(norm_text)
fm['_toks'] = fm['_model_norm'].map(lambda text: set(text.split()))
fm['_engine'] = fm['_model_norm'].map(extract_engine)
fm = fm.drop_duplicates(subset=['CodigoFipe','Modelo','AnoModelo']).reset_index(drop=True)
fm.head()

## 11. Load / Initialize Version Match Cache

In [None]:
VERSION_MATCH_TABLE = DATA_DIR / 'localiza_version_match.csv'
match_cols = ['brand_norm','model_norm','version_norm','model_year','fipe_brand','fipe_model','fipe_code','score','match_source','first_seen','last_seen']
if not VERSION_MATCH_TABLE.exists():
    # First run: start from an empty cache with the expected schema.
    cache_df = pd.DataFrame(columns=match_cols)
else:
    cache_df = pd.read_csv(VERSION_MATCH_TABLE, sep=';')
    # Add any column the stored file lacks, then keep exactly the expected columns in order.
    _absent = [name for name in match_cols if name not in cache_df.columns]
    for _name in _absent:
        cache_df[_name] = pd.NA
    cache_df = cache_df[match_cols]
cache_df.head()

## 12. Match Localiza Versions to FIPE Codes

In [None]:
THRESHOLD = 0.62  # adjust as needed

# Identify new keys
loc_df['model_year'] = pd.to_numeric(loc_df['model_year'], errors='coerce').astype('Int64')
key_cols = ['_brand_norm','_model_norm','_version_norm','model_year']
existing_keys = set(tuple(r) for r in cache_df[['brand_norm','model_norm','version_norm','model_year']].itertuples(index=False, name=None))
new_keys_df = (loc_df[key_cols].drop_duplicates()
               .rename(columns={'_brand_norm':'brand_norm','_model_norm':'model_norm','_version_norm':'version_norm'}))
new_keys_df['model_year'] = pd.to_numeric(new_keys_df['model_year'], errors='coerce').astype('Int64')
new_keys_df = new_keys_df[~new_keys_df.apply(tuple, axis=1).isin(existing_keys)]
log.info('New version keys to match: %d', len(new_keys_df))

rows = []
today_iso = today_iso()
for _, r in new_keys_df.iterrows():
    brand_n = r['brand_norm']; model_n = r['model_norm']; version_n = r['version_norm']; year = r['model_year']
    if pd.isna(year) or not model_n:
        rows.append({**r, 'fipe_brand': None, 'fipe_model': None, 'fipe_code': None, 'score': 0.0, 'match_source':'contains','first_seen':today_iso,'last_seen':today_iso})
        continue
    cand = fm[(fm['_brand_norm']==brand_n) & (fm['AnoModelo']==int(year))]
    if cand.empty:
        rows.append({**r, 'fipe_brand': None, 'fipe_model': None, 'fipe_code': None, 'score': 0.0, 'match_source':'contains','first_seen':today_iso,'last_seen':today_iso})
        continue
    model_tok_set = set(model_n.split())
    cand = cand[cand['_model_norm'].apply(lambda m: model_n in m or model_tok_set.issubset(set(m.split())))]
    if cand.empty:
        rows.append({**r, 'fipe_brand': None, 'fipe_model': None, 'fipe_code': None, 'score': 0.0, 'match_source':'contains','first_seen':today_iso,'last_seen':today_iso})
        continue
    v_toks = set(version_n.split()) if version_n else set()
    best_score=-1.0; best=None
    for _, fr in cand.iterrows():
        f_toks = fr['_toks']
        inter = len(v_toks & f_toks)
        coverage = inter/len(v_toks) if v_toks else 0.0
        precision = inter/len(f_toks) if f_toks else 0.0
        f1 = (2*precision*coverage/(precision+coverage)) if (precision+coverage) else 0.0
        jacc = (len(v_toks & f_toks)/len(v_toks | f_toks)) if (v_toks or f_toks) else 0.0
        seq = SequenceMatcher(None, version_n, fr['_model_norm']).ratio() if version_n else 0.0
        score = 0.5*seq + 0.3*f1 + 0.2*jacc
        if score > best_score:
            best_score = score; best = fr
    if best is not None:
        rows.append({**r, 'fipe_brand': best['Marca'], 'fipe_model': best['Modelo'], 'fipe_code': best['CodigoFipe'], 'score': round(float(best_score),4), 'match_source':'contains','first_seen':today_iso,'last_seen':today_iso})

new_matches = pd.DataFrame(rows, columns=match_cols)
new_matches.head()

## 13. Persist Updated Match Cache & Matched Localiza Dataset

In [None]:
if not new_matches.empty:
    cache_df = pd.concat([cache_df, new_matches], ignore_index=True)
# Update last_seen for keys present today
present_keys = set(tuple(r) for r in loc_df[key_cols].drop_duplicates().rename(columns={'_brand_norm':'brand_norm','_model_norm':'model_norm','_version_norm':'version_norm'}).itertuples(index=False, name=None))
mask = cache_df[['brand_norm','model_norm','version_norm','model_year']].apply(tuple, axis=1).isin(present_keys)
cache_df.loc[mask,'last_seen'] = today_iso
cache_df = cache_df.sort_values(['brand_norm','model_norm','version_norm','model_year','last_seen']).drop_duplicates(subset=['brand_norm','model_norm','version_norm','model_year'], keep='last')
ensure_dir(VERSION_MATCH_TABLE.parent)
cache_df.to_csv(VERSION_MATCH_TABLE, index=False, sep=';')
log.info('Saved version match cache: %d rows', len(cache_df))

# Merge back into Localiza dataset
loc_matched = loc_df.merge(cache_df.rename(columns={'brand_norm':'_brand_norm','model_norm':'_model_norm','version_norm':'_version_norm'}), on=['_brand_norm','_model_norm','_version_norm','model_year'], how='left')
loc_matched.rename(columns={'score':'match_score'}, inplace=True)
loc_matched['match_score'] = pd.to_numeric(loc_matched['match_score'], errors='coerce').fillna(0.0)
loc_matched['match_accepted'] = (loc_matched['match_score'] >= THRESHOLD).astype(int)
match_out = DATA_DIR / f"localiza_with_fipe_match_{ymd_compact()}.csv"
loc_matched.to_csv(match_out, index=False, sep=';')
log.info('Saved matched Localiza CSV: %s', match_out)
loc_matched[['brand','model','version','fipe_code','match_score','match_accepted']].head()

## 14. Extract (fipe_code, model_year) Tuples from Localiza & Movida

In [None]:
def collect_tuples(loc: pd.DataFrame, mov: pd.DataFrame):
    """Collect the distinct ``(fipe_code, model_year)`` pairs present in the vendor frames.

    A frame that is empty or has no ``fipe_code`` column contributes nothing.
    Rows with a missing code or year are skipped, as are rows whose code is
    blank after stripping or whose year cannot be converted to ``int``.

    Returns:
        A set of ``(code: str, year: int)`` tuples.
    """
    out = set()
    for frame in (loc, mov):
        if frame.empty or 'fipe_code' not in frame.columns:
            continue
        pairs = frame[['fipe_code','model_year']].dropna()
        for raw_code, raw_year in zip(pairs['fipe_code'], pairs['model_year']):
            code = str(raw_code).strip()
            try:
                yr = int(raw_year)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric or non-finite year: the pair cannot be keyed, skip it.
                continue
            if code:
                out.add((code, yr))
    return out

# Distinct (fipe_code, model_year) pairs seen across both vendors.
tuples_set = collect_tuples(loc=loc_matched, mov=mov_df)
log.info('Unique tuples collected: %d', len(tuples_set))
# Preview: the first ten pairs in sorted order.
sorted(tuples_set)[:10]

## 15. Generate Tuples Audit DataFrame & Export

In [None]:
_pair_cols = ['fipe_code','model_year']

def _pair_counts(pairs, count_name):
    """Count rows per (fipe_code, model_year); an empty input gives an empty, correctly-shaped frame."""
    if pairs.empty:
        return pd.DataFrame(columns=['fipe_code','model_year', count_name])
    return pairs.groupby(_pair_cols).size().rename(count_name).reset_index()

# Localiza pairs: drop incomplete rows, then trim the code.
ldf = loc_matched[_pair_cols].dropna().copy()
ldf['fipe_code'] = ldf['fipe_code'].astype(str).str.strip()

# Movida pairs: only when the frame carries a fipe_code column at all.
if 'fipe_code' in mov_df.columns:
    mdf = mov_df[_pair_cols].dropna().copy()
else:
    mdf = pd.DataFrame(columns=['fipe_code','model_year'])
if not mdf.empty:
    mdf['fipe_code'] = mdf['fipe_code'].astype(str).str.strip()

lcnt = _pair_counts(ldf, 'localiza_count')
cnt2 = _pair_counts(mdf, 'movida_count')

# Outer merge so pairs seen at only one vendor are kept, with a zero count for the other.
audit = pd.merge(lcnt, cnt2, on=_pair_cols, how='outer')
_count_cols = ['localiza_count','movida_count']
for _count_col in _count_cols:
    if _count_col not in audit.columns:
        audit[_count_col] = 0
audit[_count_cols] = audit[_count_cols].fillna(0).astype(int)
audit['total_count'] = audit['localiza_count'] + audit['movida_count']
audit = audit.sort_values(['total_count','fipe_code','model_year'], ascending=[False, True, True])

TUPLES_DIR.mkdir(parents=True, exist_ok=True)
audit_out = TUPLES_DIR / f'fipe_tuples_{ymd_compact()}.csv'
audit.to_csv(audit_out, index=False, sep=';')
log.info('Tuples audit saved: %s (rows=%d)', audit_out, len(audit))
audit.head()

## 16. Load Existing FIPE Dump (Offline)

In [None]:
if fipe_dump_csv is not None:
    fipe_dump = pd.read_csv(fipe_dump_csv)
    # Basic numeric parsing for ValorNum if present
    if 'ValorNum' in fipe_dump.columns:
        fipe_dump['ValorNum'] = pd.to_numeric(fipe_dump['ValorNum'], errors='coerce')
else:
    # Without a dump the pricing steps below work on an empty frame.
    log.warning('No FIPE dump CSV found (data/fipe/fipe_dump_*.csv). Skip pricing merge steps if absent.')
    fipe_dump = pd.DataFrame()
log.info('FIPE dump rows: %d', len(fipe_dump))
fipe_dump.head()

## 17. Build Localiza Vendor Table (Merge FIPE Pricing)

In [None]:
def _merge_vendor(df: pd.DataFrame, fipe: pd.DataFrame) -> pd.DataFrame:
    if fipe.empty: return df.assign(fipe_price=pd.NA, premium_vs_fipe_price=pd.NA)
    f = fipe.rename(columns={'CodigoFipe':'fipe_code','AnoModelo':'model_year','ValorNum':'fipe_price','Modelo':'fipe_version'}) if 'CodigoFipe' in fipe.columns else fipe.copy()
    keep_cols = [c for c in ['fipe_code','model_year','fipe_price','fipe_version'] if c in f.columns]
    f = f[keep_cols].dropna(subset=['fipe_code','model_year']) if {'fipe_code','model_year'}.issubset(f.columns) else f
    merged = df.merge(f, on=['fipe_code','model_year'], how='left') if {'fipe_code','model_year'}.issubset(df.columns) else df.copy()
    if 'price' in merged.columns and 'fipe_price' in merged.columns:
        merged['premium_vs_fipe_price'] = np.where(merged['fipe_price'].gt(0), (merged['price']-merged['fipe_price'])/merged['fipe_price'], pd.NA)
    return merged

# Localiza listings enriched with the FIPE reference price and premium.
loc_vendor = _merge_vendor(df=loc_matched, fipe=fipe_dump)
loc_vendor.head()

## 18. Build Movida Vendor Table (Merge FIPE Pricing)

In [None]:
# Movida listings enriched with the FIPE reference price and premium.
mov_vendor = _merge_vendor(df=mov_df, fipe=fipe_dump)
mov_vendor.head()

## 19. Build Consolidated FIPE Time Series Table (Filter to Vendor Presence)

In [None]:
def parse_mes_label(label: str):
    """Parse a FIPE reference-month label into ``(year, month)``.

    Recognised forms (case-insensitive; ``/`` and ``-`` count as spaces):
      * Portuguese month name plus year: ``"janeiro de 2024"``, ``"Março/2023"``
      * numeric month plus year: ``"10/2022"``, ``"3 2021"``

    Returns ``(1900, 1)`` as a sentinel when the label is missing, is not a
    string (e.g. a NaN read from CSV) or matches neither form.
    """
    pt = {'janeiro':1,'fevereiro':2,'março':3,'marco':3,'abril':4,'maio':5,'junho':6,'julho':7,'agosto':8,'setembro':9,'outubro':10,'novembro':11,'dezembro':12}
    if not isinstance(label, str):
        # None, NaN and other non-text values have no .lower(); use the sentinel.
        return 1900,1
    s = label.lower().replace('\xa0',' ').strip()
    s = re.sub(r'\s+',' ', s)
    s_norm = re.sub(r'[/-]',' ', s)
    m = re.search(r'(janeiro|fevereiro|março|marco|abril|maio|junho|julho|agosto|setembro|outubro|novembro|dezembro)\s+(?:de\s+)?(\d{4})', s_norm)
    if m:
        return int(m.group(2)), pt[m.group(1)]
    # (?<!\d) stops the month from matching the tail of a longer number,
    # e.g. the "1" of "31 2024".
    m2 = re.search(r'(?<!\d)(1[0-2]|0?[1-9])\s+(\d{4})', s_norm)
    if m2:
        return int(m2.group(2)), int(m2.group(1))
    return 1900,1

if fipe_dump.empty:
    fipe_series = pd.DataFrame()
else:
    fdf = fipe_dump.copy()
    if 'MesReferencia' in fdf.columns:
        # Shift back one month
        def _shift(y,m):
            return (y-1,12) if m==1 else (y, m-1)
        _parsed = [parse_mes_label(lbl) for lbl in fdf['MesReferencia']]
        _shifted = [_shift(int(yy), int(mm)) for yy, mm in _parsed]
        fdf['reference_year'] = [yy for yy, _ in _shifted]
        fdf['reference_month'] = [mm for _, mm in _shifted]
        fdf = fdf.rename(columns={'CodigoFipe':'fipe_code','AnoModelo':'model_year','ValorNum':'fipe_price','Marca':'brand','Modelo':'fipe_version'})

        # Keep only (code, year) pairs that actually occur in a vendor table.
        vendor_presence = set(tuples_set)
        _pair_keys = fdf[['fipe_code','model_year']].apply(tuple, axis=1)
        fdf = fdf[_pair_keys.isin(vendor_presence)].copy()

        # One row per pair and reference month, in chronological order.
        _series_key = ['fipe_code','model_year','reference_year','reference_month']
        fdf = fdf.sort_values(_series_key)
        fdf = fdf.drop_duplicates(subset=_series_key, keep='last')

        # Month-over-month change within each (code, year) series.
        fdf['m_m_price_change'] = fdf.groupby(['fipe_code','model_year'])['fipe_price'].pct_change()
        fipe_series = fdf[['reference_year','reference_month','brand','fipe_version','fipe_code','model_year','fipe_price','m_m_price_change']]
    else:
        log.warning('Missing MesReferencia in FIPE dump; cannot build time series')
        fipe_series = pd.DataFrame()
fipe_series.head()

## 20. Compute Price Premium & Month-over-Month Changes
No new computation is needed here: `premium_vs_fipe_price` was added to the vendor tables in sections 17–18, and `m_m_price_change` was added to the FIPE series in section 19.

## 21. Finalize & Export All Output Tables

In [None]:
# All three outputs share today's date stamp.
STAMP = ymd_compact()
ensure_dir(TABLES_DIR)
loc_vendor_out = TABLES_DIR / f'localiza_table_{STAMP}.csv'
mov_vendor_out = TABLES_DIR / f'movida_table_{STAMP}.csv'
fipe_series_out = TABLES_DIR / f'fipe_table_{STAMP}.csv'

# Vendor tables are always written; the FIPE series only when it has rows.
for _table, _target in ((loc_vendor, loc_vendor_out), (mov_vendor, mov_vendor_out)):
    _table.to_csv(_target, index=False, sep=';')
if not fipe_series.empty:
    fipe_series.to_csv(fipe_series_out, index=False, sep=';')
log.info('Exported vendor tables and FIPE series.')

loc_vendor.head(), mov_vendor.head(), fipe_series.head()