In [1]:
import pandas as pd
import numpy as np
import json
import hashlib
import re
from urllib.parse import unquote
from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.dialects.mysql import insert
from pydantic_settings import BaseSettings
from pydantic import Field, SecretStr
from dotenv import load_dotenv

# python-dotenv: load variables from a local .env file (if one is found) into
# the process environment before the Settings object below is created.
load_dotenv()

class Settings(BaseSettings):
    """Database connection settings, populated through pydantic-settings.

    Every field has a default except ``DB_PASSWORD``, which is declared with
    ``Field(...)`` and therefore must be supplied. The password is held as a
    ``SecretStr``; its raw value is only read via ``get_secret_value()``.
    """

    DB_HOST: str = Field(default="localhost")
    DB_PORT: int = Field(default=3306)
    DB_USER: str = Field(default="root")
    DB_PASSWORD: SecretStr = Field(...)
    DB_NAME: str = Field(default="indumine_db")

    @property
    def DATABASE_URL(self) -> str:
        """Return the SQLAlchemy URL for the MySQL database (PyMySQL driver).

        The user name and password are percent-encoded so that characters
        with a meaning inside a URL (``@``, ``:``, ``/``, ``#``, ``%`` ...)
        cannot corrupt the connection string. Store the *raw* password in the
        environment, not a pre-encoded one.

        Returns:
            str: ``mysql+pymysql://user:password@host:port/db?charset=utf8mb4``
        """
        # Function-scope import: the file only imports ``unquote`` at the top.
        from urllib.parse import quote

        # safe="" so that "/" is escaped too; spaces become %20 (never "+").
        user = quote(self.DB_USER, safe="")
        password = quote(self.DB_PASSWORD.get_secret_value(), safe="")
        return (f"mysql+pymysql://{user}:{password}"
                f"@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}?charset=utf8mb4")

# Build the settings object; DB_PASSWORD has no default, so it must be provided.
settings = Settings()
# Engine used by the cells below for every database operation.
engine = create_engine(settings.DATABASE_URL)
# Declarative base class that the ORM models in the next cell inherit from.
Base = declarative_base()


In [2]:
class Category(Base):
    """ORM model for the ``categories`` table."""
    __tablename__ = 'categories'
    # Auto-incrementing integer primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Category name; required and unique.
    name = Column(String(100), nullable=False, unique=True)
    # Slug for the category; required and unique.
    slug = Column(String(100), nullable=False, unique=True)

class Products(Base):
    """ORM model for the ``products`` table."""
    __tablename__ = 'products'
    # String primary key; no default, so the value is supplied on insert.
    id = Column(String(50), primary_key=True)
    # Product page URL; required.
    url = Column(Text, nullable=False)
    # Product name; required.
    name = Column(String(255), nullable=False)
    # Optional foreign key to categories.id.
    category_id = Column(Integer, ForeignKey('categories.id'), nullable=True)
    # Optional free-text description.
    description = Column(Text, nullable=True)
    # Optional JSON document holding the product's specs.
    specs = Column(JSON, nullable=True)
    # Optional text column; this notebook writes the literal string "[]" to it.
    images = Column(Text, nullable=True)
    # Required; stored as a string (this notebook writes an ISO-format timestamp).
    scraped_at = Column(String(50), nullable=False)

# Emit CREATE TABLE for the models registered on Base (categories, products).
Base.metadata.create_all(engine)


In [None]:
def get_robust_category(row):
    """Pick a category name for one product row.

    Resolution order:

    1. The first of ``'Product group'``, ``'Product family'`` and
       ``'Product type'`` that holds a non-null value other than ``''`` or
       ``'Not applicable'`` (after stripping); returned title-cased.
    2. Otherwise the first path segment found between ``/en/`` and ``/p/``
       in ``'Product URL'`` (percent-decoded, hyphens turned into spaces,
       title-cased). When that segment contains a blacklisted keyword or is
       longer than 30 characters, the second segment is used instead, or
       ``"Automação Industrial"`` when there is no second segment.
    3. ``"Componentes"`` when the URL has no ``/en/`` part, yields no
       segments, or cannot be parsed.

    Args:
        row: Object exposing ``.get(key[, default])`` — a ``pandas.Series``
            row or a plain ``dict``.

    Returns:
        str: The chosen category name.
    """
    for col in ('Product group', 'Product family', 'Product type'):
        val = row.get(col)
        if pd.notnull(val):
            cleaned = str(val).strip()
            if cleaned not in ('', 'Not applicable'):
                return cleaned.title()

    url = str(row.get('Product URL', ''))
    if "/en/" not in url:
        return "Componentes"

    try:
        path_content = url.split("/en/")[1].split("/p/")[0]
        segments = [unquote(s) for s in path_content.split('/') if s]
        if not segments:
            return "Componentes"

        category = segments[0].replace('-', ' ').title()

        # Substrings (compared case-insensitively) that mark the first segment
        # as something other than a usable category name.
        blacklist_keywords = ('unit', 'cfw', 'cwb', 'ssw', 'mt8', 'spw', 'lsp')
        lowered = category.lower()

        if len(category) > 30 or any(key in lowered for key in blacklist_keywords):
            if len(segments) > 1:
                return segments[1].replace('-', ' ').title()
            return "Automação Industrial"

        return category
    except Exception:
        # Best-effort fallback for unparsable URLs. Deliberately not a bare
        # ``except:`` so that KeyboardInterrupt / SystemExit still propagate
        # and a long row-wise ``apply`` can be interrupted.
        return "Componentes"

In [None]:
# Load the scraped catalogue export.
raw_data = pd.read_csv(
    '../data/weg_products_final copy.csv',
    sep=',',
    encoding='utf-8',
    low_memory=False,
)

# Resolve a category for every CSV row before rows are collapsed per product.
raw_data['extracted_category'] = raw_data.apply(get_robust_category, axis=1)

# One entry per product URL: a {Feature: Value} dict built from that URL's
# de-duplicated rows, plus the category taken from the group's first row.
specs_series = (
    raw_data
    .drop_duplicates(subset=['Product URL', 'Feature', 'Value'])
    .groupby('Product URL')
    .apply(
        lambda group: {
            'specs': {feature: value for feature, value in zip(group['Feature'], group['Value'])},
            'category': group['extracted_category'].iloc[0],
        },
        include_groups=False,
    )
)

# Expand the per-URL dicts into columns ('specs', 'category') and attach the URL.
products_df = pd.DataFrame(specs_series.tolist())
products_df['url'] = specs_series.index

def generate_id(row):
    """Return the primary-key value for one product row.

    Uses the ``'Product Code'`` entry of ``row['specs']`` when it holds a
    usable value. A code that is missing, null (``None``/``NaN``) or blank is
    treated as absent, and the identifier falls back to the first 20 hex
    characters of the MD5 digest of ``row['url']`` — deterministic for a
    given URL, so re-running the notebook produces the same id.

    Args:
        row: Mapping with a ``'specs'`` dict and a ``'url'`` string.

    Returns:
        str: The product identifier.
    """
    code = row['specs'].get('Product Code')
    if pd.notnull(code) and str(code).strip():
        return str(code)
    # Hash only when needed; MD5 serves as a fingerprint here, not for security.
    return hashlib.md5(row['url'].encode()).hexdigest()[:20]

# Primary key for every product row.
products_df['id'] = products_df.apply(generate_id, axis=1)
# `name` is NOT NULL in the products table, so a 'Product Name' spec that is
# present but null/blank must fall back to the placeholder as well; otherwise
# the null reaches the insert and the whole batch is rejected.
products_df['name'] = products_df['specs'].apply(
    lambda s: s['Product Name']
    if pd.notnull(s.get('Product Name')) and str(s['Product Name']).strip()
    else 'Produto sem Nome'
)
# Description defaults to an empty string when the spec is absent.
products_df['description'] = products_df['specs'].apply(lambda s: s.get('Description', ''))
# No images are collected in this notebook; store an empty JSON list as text.
products_df['images'] = "[]"
# Single timestamp (local time, ISO format) shared by every row of this run.
products_df['scraped_at'] = pd.Timestamp.now().isoformat()


In [None]:
def sync_categories(engine, categories_series):
    """Ensure every category name exists in ``categories`` and return an id map.

    Each distinct non-null name is upserted with
    ``INSERT ... ON DUPLICATE KEY UPDATE``; its slug is the lower-cased name
    with spaces replaced by hyphens. After the commit the whole table is read
    back.

    Args:
        engine: SQLAlchemy engine connected to the MySQL database.
        categories_series: ``pandas.Series`` of category names.

    Returns:
        dict: Mapping of category name to ``categories.id`` for every row in
        the table.
    """
    # Null names cannot be stored (``name`` is NOT NULL), so skip them.
    unique_cats = categories_series.dropna().unique()
    Session = sessionmaker(bind=engine)

    # The context manager closes the session even if an insert fails.
    with Session() as session:
        for cat_name in unique_cats:
            slug = str(cat_name).lower().replace(' ', '-')
            stmt = insert(Category).values(name=cat_name, slug=slug)
            stmt = stmt.on_duplicate_key_update(name=cat_name)
            session.execute(stmt)
        session.commit()

    query = text("SELECT id, name FROM categories")
    # Close the connection after the read instead of leaking it.
    with engine.connect() as conn:
        categories = pd.read_sql(query, conn)
    return categories.set_index('name')['id'].to_dict()

# Upsert the categories found in the data and get back a {name: id} lookup.
cat_lookup = sync_categories(engine, products_df['category'])
# Translate each product's category name into its foreign-key id.
products_df['category_id'] = products_df['category'].map(cat_lookup)

# Cast every column to object dtype and replace null markers (NaN/NaT) with
# None — presumably so missing values reach the database as SQL NULL.
products_df = products_df.astype(object).where(pd.notnull(products_df), None)


In [None]:
def _json_safe(value):
    """Return ``value`` with non-finite floats replaced by ``None``, recursively.

    ``json.dumps`` renders NaN/Infinity as bare tokens that are not valid
    JSON, and MySQL rejects such documents.
    """
    import math

    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


def mysql_upsert(table_class, engine, df, chunk_size=500):
    """Upsert the rows of ``df`` into ``table_class``'s table, batch by batch.

    Each batch is sent as one ``INSERT ... ON DUPLICATE KEY UPDATE`` in its
    own transaction. A failing batch is rolled back and reported, and the
    remaining batches are still attempted.

    Differences from a naive implementation that matter:

    * DataFrame columns that are not table columns (e.g. a helper
      ``category`` column) are dropped before the insert.
    * ``dict``/``list`` values bound to a ``JSON`` column are passed through
      as Python objects, because the column type serialises them itself;
      pre-serialising with ``json.dumps`` would store a JSON *string* rather
      than a JSON object. For non-JSON columns they are serialised here.
    * Only columns actually present in ``df`` are overwritten on a duplicate
      key, so table columns absent from ``df`` keep their stored values.

    Args:
        table_class: Declarative model class (must expose ``__table__``).
        engine: SQLAlchemy engine connected to the MySQL database.
        df: ``pandas.DataFrame`` whose columns are named after table columns.
        chunk_size: Maximum number of rows per batch.

    Returns:
        None
    """
    table = table_class.__table__
    table_columns = {col.name for col in table.columns}
    json_columns = {col.name for col in table.columns if isinstance(col.type, JSON)}
    insert_columns = [name for name in df.columns if name in table_columns]
    update_columns = [
        col.name for col in table.columns
        if not col.primary_key and col.name in insert_columns
    ]
    # The session factory does not depend on the batch; build it once.
    Session = sessionmaker(bind=engine)

    total = len(df)
    print(f"Iniciando Upsert de {total} produtos...")

    for start in range(0, total, chunk_size):
        batch_number = start // chunk_size + 1
        chunk = df.iloc[start:start + chunk_size][insert_columns]

        records = []
        for raw in chunk.to_dict(orient='records'):
            record = {}
            for key, value in raw.items():
                if isinstance(value, (dict, list)):
                    value = _json_safe(value)
                    if key not in json_columns:
                        # Plain text column: store the JSON document as text.
                        value = json.dumps(value)
                record[key] = value
            records.append(record)

        with Session() as session:
            try:
                stmt = insert(table_class).values(records)
                stmt = stmt.on_duplicate_key_update(
                    {name: stmt.inserted[name] for name in update_columns}
                )
                session.execute(stmt)
                session.commit()
                print(f"Lote {batch_number} enviado...")
            except Exception as e:
                session.rollback()
                # Report the same 1-based batch number as the success message.
                print(f"Erro no lote {batch_number}: {e}")

# Upsert every product. The helper 'category' column is passed along but is
# dropped from each record by mysql_upsert before the insert.
mysql_upsert(Products, engine, products_df[['id', 'url', 'name', 'category_id', 'description', 'specs', 'images', 'scraped_at', 'category']])

Iniciando Upsert de 4051 produtos...
Lote 1 enviado...
Lote 2 enviado...
Lote 3 enviado...
Lote 4 enviado...
Lote 5 enviado...
Lote 6 enviado...
Lote 7 enviado...
Lote 8 enviado...
Lote 9 enviado...
