In [0]:
import re
import unicodedata

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StringType

# Target schema and table for the silver layer (the schema may be created beforehand in Unity Catalog)
database_name = "visa_jobs_silver"
silver_table_name = "company_skilled_migrants"

# Location of the bronze parquet files this notebook reads from
bronze_path = "/mnt/netherlands-tracker/bronze/company_skilled_migrants"

# Load the bronze layer
df_bronze = spark.read.format("parquet").load(bronze_path)

# Company-name cleaning function
def clean_company_name(name):
    """Normalize a raw organisation name into a lowercase matching key.

    Steps, in order:
      1. Fold accented characters to their ASCII base letter ("é" -> "e").
      2. Drop every character except ASCII letters/digits, '&', whitespace and '.'.
      3. Normalize the legal forms "B.V."/"B.V"/"N.V."/"N.V" (any case) to "BV"/"NV".
      4. Collapse runs of whitespace and trim the ends.
      5. Remove a trailing " BV"/" NV" (any case).
      6. Lowercase.

    Args:
        name: Raw organisation name, or None.

    Returns:
        The cleaned name, or None when the input is empty/None or nothing
        remains after cleaning.
    """
    if not name:
        return None
    # Decompose accented letters into base letter + combining mark and drop the
    # marks, so "Café" becomes "Cafe" instead of losing the letter entirely to
    # the ASCII-only filter below.
    name = unicodedata.normalize("NFKD", name)
    name = "".join(ch for ch in name if not unicodedata.combining(ch))
    name = re.sub(r"[^a-zA-Z0-9&\s\.]", "", name)
    # Case-insensitive legal-form normalization. The word boundaries keep it from
    # firing inside other words (e.g. "Web.Venture").
    name = re.sub(r"\b([BN])\.(V)\b\.?", r"\1\2", name, flags=re.IGNORECASE)
    # Trim AFTER the character filter: removing a trailing symbol (e.g. "®") can
    # leave a trailing space that would otherwise block the suffix match below.
    name = re.sub(r"\s+", " ", name).strip()
    name = re.sub(r"\s+(BV|NV)$", "", name, flags=re.IGNORECASE)
    name = name.lower()
    # Keep "no usable name" as None, consistent with the empty-input guard above.
    return name or None

# Wrap the pure-Python cleaner as a Spark UDF with a (nullable) string result.
clean_company_name_udf = F.udf(clean_company_name, StringType())

# Apply the name cleaning and stamp the silver ingestion time.
# "KvK number" is not kept in silver. Drop it directly: DataFrame.drop accepts a
# column name containing a space, so renaming it first only to drop it is unnecessary.
df_clean = (
    df_bronze
    .drop("KvK number")
    .withColumn("company_clean", clean_company_name_udf(F.col("Organisation")))
    .withColumn("silver_ingestion_ts", current_timestamp())
)

# Deduplicate on the cleaned name.
# - Rows whose name cleans to null cannot be matched on the key. Exclude them
#   rather than letting the dedup collapse them into one arbitrary null-keyed row.
# - dropDuplicates keeps an arbitrary row per key, so the silver table could differ
#   between runs. Rank explicitly instead: newest load_date first (assumes load_date
#   sorts chronologically -- TODO confirm its type), Organisation as a tie-breaker.
dedup_window = Window.partitionBy("company_clean").orderBy(
    F.col("load_date").desc(), F.col("Organisation")
)
df_clean = (
    df_clean
    .where(F.col("company_clean").isNotNull())
    .withColumn("_dedup_rank", F.row_number().over(dedup_window))
    .where(F.col("_dedup_rank") == 1)
    .drop("_dedup_rank")
)

# Show a sample of the result for a visual check
df_clean.select("Organisation", "company_clean", "load_date").show(10, truncate=False)

# Save as a Delta table. This is a full refresh, so replace the table schema
# (overwriteSchema) instead of merging it: mergeSchema would keep columns that
# this pipeline no longer produces.
(
    df_clean.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{database_name}.{silver_table_name}")
)

In [0]:
# Confirmation: re-read the saved silver table and report how many rows it holds
row_count = (
    spark
    .table(f"{database_name}.{silver_table_name}")
    .count()
)
print(f"Tabela {database_name}.{silver_table_name} criada com sucesso - QTDE de linhas: {row_count}")

In [0]:
%sql
-- Preview up to 100 cleaned company names from the silver table
SELECT company_clean
FROM visa_jobs_silver.company_skilled_migrants
LIMIT 100