# ETL – Raw → Silver Layer

Projeto: Fake Real Job

Objetivo: Limpar, padronizar e estruturar os dados para análises confiáveis

## 1. Definição de Colunas

Algumas colunas da camada Raw não são levadas para a Silver Layer por não agregarem valor analítico ou por serem redundantes.

### Colunas removidas na Silver

| Coluna          | Motivo da Remoção                                    |
|-----------------|------------------------------------------------------|
| job_description | textos longos, não utilizados nas análises atuais    |
| requirements    | textos longos, não utilizados nas análises atuais    |
| benefits        | textos longos, não utilizados nas análises atuais    |
| industry        | não relevantes para o objetivo de detecção de fraude |
| company_profile | não relevantes para o objetivo de detecção de fraude |
| company_website | não relevantes para o objetivo de detecção de fraude |
| contact_email   | não relevantes para o objetivo de detecção de fraude |
| salary_range    | substituída por salary_min, salary_max e salary_avg  |
| location        | substituída por country, state e remote              |

## 2. Extract – Carregamento dos Dados Brutos
Nesta etapa os dados são carregados diretamente da camada Raw (CSV), preservando o conteúdo original.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
     trim, to_timestamp,lower, regexp_extract, split, size,count, min, max, avg, desc,sum, count
)

spark = SparkSession.builder \
    .appName("ETL_Raw_to_Silver") \
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.7.3"
    ) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

raw_path = "../Data Layer/raw/data_raw.csv"

df_raw = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(raw_path)

df_raw.printSchema()

:: loading settings :: url = jar:file:/Users/analuizarocha/PycharmProjects/sb2-fake-real-job/.venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/analuizarocha/.ivy2/cache
The jars for the packages stored in: /Users/analuizarocha/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18438379-9fb1-4b72-a762-826447635282;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 47ms :: artifacts dl 2ms
	:: modules in use:
	org.checkerframework#checker-qual;3.42.0 from central in [default]
	org.postgresql#postgresql;42.7.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------

root
 |-- job_id: integer (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- required_experience_years: integer (nullable = true)
 |-- education_level: string (nullable = true)
 |-- department: string (nullable = true)
 |-- posting_date: date (nullable = true)
 |-- application_deadline: date (nullable = true)
 |-- contact_email: string (nullable = true)
 |-- company_website: string (nullable = true)
 |-- has_logo: integer (nullable = true)
 |-- num_open_positions: integer (nullable = true)
 |-- job_function: string (nullable = true)
 |-- telecommuting: integer (nullable = true)
 |-- frau

## 4. Transform - Limpeza e Transformação

### 4.1 Padronização de valores vazios

Strings vazias ("") são convertidas para NULL, garantindo consistência nos filtros e validações.

In [2]:
from pyspark.sql.functions import col, when

def empty_string_to_null(column):
    return when(column == "", None).otherwise(column)

In [3]:
cleaned_columns = []

for column_name in df_raw.columns:
    cleaned_columns.append(
        empty_string_to_null(col(column_name)).alias(column_name)
    )

df_raw = df_raw.select(cleaned_columns)

print("As seguintes colunas tiveram strings vazias convertidas para NULL:")
print(df_raw.columns)

As seguintes colunas tiveram strings vazias convertidas para NULL:
['job_id', 'job_title', 'job_description', 'requirements', 'benefits', 'company_name', 'company_profile', 'industry', 'employment_type', 'location', 'salary_range', 'required_experience_years', 'education_level', 'department', 'posting_date', 'application_deadline', 'contact_email', 'company_website', 'has_logo', 'num_open_positions', 'job_function', 'telecommuting', 'fraud_reason', 'text_length', 'is_fake']


### 4.2 Seleção e tipagem das colunas

In [4]:
df_silver = df_raw.select(
    col("job_id").cast("int").alias("job_id"),

    col("job_title").alias("job_title"),
    col("company_name").alias("company_name"),
    col("location").alias("location"),
    col("employment_type").alias("employment_type"),

    col("required_experience_years").cast("int").alias("required_experience_years"),

    col("telecommuting").cast("boolean").alias("telecommuting"),
    col("has_logo").cast("boolean").alias("has_logo"),
    col("is_fake").cast("boolean").alias("is_fake"),

    col("fraud_reason").alias("fraud_reason"),

    to_timestamp(col("posting_date")).alias("posting_timestamp"),
    to_timestamp(col("application_deadline")).alias("application_deadline_timestamp"),

    col("salary_range").alias("salary_range")
)

tipos_encontrados = {}

for col_name, dtype in df_silver.dtypes:
    if dtype not in tipos_encontrados:
        tipos_encontrados[dtype] = []
    tipos_encontrados[dtype].append(col_name)

print("\n" + "="*40)
print(f"TABELA SILVER CRIADA COM SUCESSO")
print("="*40)
print("Resumo da estrutura detectada:")

for tipo, colunas in tipos_encontrados.items():
    print(f"- {tipo.capitalize()}: {', '.join(colunas)}")

print("-" * 40)
print(f"Total de colunas: {len(df_silver.columns)}")
print("="*40 + "\n")



TABELA SILVER CRIADA COM SUCESSO
Resumo da estrutura detectada:
- Int: job_id, required_experience_years
- String: job_title, company_name, location, employment_type, fraud_reason, salary_range
- Boolean: telecommuting, has_logo, is_fake
- Timestamp: posting_timestamp, application_deadline_timestamp
----------------------------------------
Total de colunas: 13



### 4.3 Tratamento de salários

In [5]:
df_silver = df_silver.withColumn(
    "salary_clean",
    lower(col("salary_range"))
)

df_silver = df_silver.withColumn(
    "salary_min",
    regexp_extract(col("salary_clean"), r'(\d+)', 1).cast("int")
)

df_silver = df_silver.withColumn(
    "salary_max",
    regexp_extract(col("salary_clean"), r'\d+\D+(\d+)', 1).cast("int")
)

df_silver = df_silver \
    .withColumn(
        "salary_min",
        when(
            col("salary_clean").contains("k") & (col("salary_min") < 1000),
            col("salary_min") * 1000
        ).otherwise(col("salary_min"))
    ) \
    .withColumn(
        "salary_max",
        when(
            col("salary_clean").contains("k") & (col("salary_max") < 1000),
            col("salary_max") * 1000
        ).otherwise(col("salary_max"))
    )

df_silver = df_silver.withColumn(
    "salary_max",
    when(col("salary_max").isNull(), col("salary_min"))
    .otherwise(col("salary_max"))
)

df_silver = df_silver.withColumn(
    "salary_avg",
    ((col("salary_min") + col("salary_max")) / 2).cast("int")
)

df_silver = df_silver.drop("salary_range", "salary_clean")

metrics = df_silver.select(
    count("*").alias("total"),
    count("salary_avg").alias("preenchidos"),
    min("salary_avg").alias("minimo"),
    max("salary_avg").alias("maximo"),
    avg("salary_avg").cast("int").alias("media")
).collect()[0]

nulos = metrics['total'] - metrics['preenchidos']
pct_sucesso = (metrics['preenchidos'] / metrics['total'] * 100) if metrics['total'] > 0 else 0

print(f"""
RELATÓRIO DE EXTRAÇÃO DE SALÁRIOS
=================================
Total de vagas analisadas: {metrics['total']}

Status da Conversão:
[✓] Com Salário: {metrics['preenchidos']} ({pct_sucesso:.1f}%)
[x] Sem Salário: {nulos} (Vazios ou padrão não reconhecido)

Insights dos Valores Extraídos:
- Menor salário anual: {metrics['minimo']}
- Maior salário anual: {metrics['maximo']}
- Média geral:         {metrics['media']}
=================================
""")


RELATÓRIO DE EXTRAÇÃO DE SALÁRIOS
Total de vagas analisadas: 3000

Status da Conversão:
[✓] Com Salário: 2197 (73.2%)
[x] Sem Salário: 803 (Vazios ou padrão não reconhecido)

Insights dos Valores Extraídos:
- Menor salário anual: 50000
- Maior salário anual: 90000
- Média geral:         69644



### 4.4 Localização: país, estado e remoto

In [6]:
df_silver = df_silver.withColumn(
    "remote",
    when(
        lower(col("location")).contains("remote") |
        lower(col("location")).contains("anywhere") |
        lower(col("location")).contains("worldwide"),
        True
    ).otherwise(False)
)

df_silver = df_silver \
    .withColumn("location_parts", split(col("location"), ",")) \
    .withColumn(
        "country",
        when(col("remote") == False, trim(col("location_parts")[0]))
        .otherwise(None)
    ) \
    .withColumn(
        "state",
        when(
            (col("remote") == False) & (size(col("location_parts")) > 1),
            trim(col("location_parts")[1])
        ).otherwise(None)
    )

df_silver = df_silver.drop("location", "location_parts")

loc_stats = df_silver.select(
    count("*").alias("total"),
    sum(col("remote").cast("int")).alias("qtd_remote"),
    count("country").alias("qtd_paises"),
    count("state").alias("qtd_estados")
).collect()[0]

total = loc_stats['total']
remoto = loc_stats['qtd_remote']
remoto = remoto if remoto else 0
presencial = total - remoto
pct_remoto = (remoto / total * 100) if total > 0 else 0

print(f"""
ANÁLISE DE LOCALIZAÇÃO E GEOGRAFIA
==================================
Transformação:
- Detecção de Trabalho Remoto
- Extração de País e Estado (via quebra por vírgula)

Modalidade de Trabalho:
----------------------------------
Remoto:      {remoto} ({pct_remoto:.1f}%)
Presencial:  {presencial} ({100 - pct_remoto:.1f}%)

""")

print("Top 5 Países detectados (Vagas Presenciais):")
df_silver.filter("country is not null") \
    .groupBy("country") \
    .count() \
    .orderBy(desc("count")) \
    .show(5, truncate=False)


ANÁLISE DE LOCALIZAÇÃO E GEOGRAFIA
Transformação:
- Detecção de Trabalho Remoto
- Extração de País e Estado (via quebra por vírgula)

Modalidade de Trabalho:
----------------------------------
Remoto:      594 (19.8%)
Presencial:  2406 (80.2%)


Top 5 Países detectados (Vagas Presenciais):
+--------+-----+
|country |count|
+--------+-----+
|London  |616  |
|New York|610  |
|Toronto |604  |
|Berlin  |576  |
+--------+-----+



### 4.7 Tratamento de nulos e análise dos motivos de fraude

In [7]:
df_silver = df_silver.withColumn(
    "fraud_reason",
    when(
        (col("is_fake") == True) & (col("fraud_reason").isNull()),
        "Fraud flagged without explicit reason"
    ).otherwise(col("fraud_reason"))
)

df_fakes = df_silver.filter(col("is_fake") == True)

total_fakes = df_fakes.count()
patched_count = df_fakes.filter(col("fraud_reason") == "Fraud flagged without explicit reason").count()

print("Top Motivos de Fraude (Apenas Vagas Falsas):")
df_fakes.groupBy("fraud_reason") \
    .count() \
    .orderBy(desc("count")) \
    .show(3, truncate=False)

Top Motivos de Fraude (Apenas Vagas Falsas):
+------------------+-----+
|fraud_reason      |count|
+------------------+-----+
|No salary info    |512  |
|Unverified company|499  |
|Suspicious email  |461  |
+------------------+-----+



### 4.6 Remoção de registros inválidos e duplicados

In [8]:
df_silver = df_silver.filter(
    (col("is_fake") == True) | (
        col("job_id").isNotNull() &
        col("job_title").isNotNull() &
        col("company_name").isNotNull() &
        col("posting_timestamp").isNotNull()
    )
)
df_silver = df_silver.dropDuplicates(["job_id"])

df_silver.show(n=20, truncate=False)

+------+---------------------------+------------+---------------+-------------------------+-------------+--------+-------+------------------+-------------------+------------------------------+----------+----------+----------+------+--------+-------+
|job_id|job_title                  |company_name|employment_type|required_experience_years|telecommuting|has_logo|is_fake|fraud_reason      |posting_timestamp  |application_deadline_timestamp|salary_min|salary_max|salary_avg|remote|country |state  |
+------+---------------------------+------------+---------------+-------------------------+-------------+--------+-------+------------------+-------------------+------------------------------+----------+----------+----------+------+--------+-------+
|1     |Software Engineer          |Company_543 |Contract       |8                        |false        |false   |false  |null              |2023-11-24 00:00:00|2024-09-16 00:00:00           |60000     |80000     |70000     |false |Toronto |Canada |


### 5. Load – Salvamento dos Dados
#### 5.1 Escrita no banco PostgreSQL (Silver Layer)

In [9]:
jdbc_url = "jdbc:postgresql://localhost:5432/fake-real-job"

db_properties = {
    "user": "postgres",
    "password": "senha123",
    "driver": "org.postgresql.Driver"
}

df_silver.write \
    .mode("overwrite") \
    .jdbc(
        url=jdbc_url,
        table="silver_jobs",
        properties=db_properties
    )
