### Importa módulos necessários

In [49]:
from pyspark.sql import SparkSession, Row, Window
import pyspark.sql.functions as F

### Inicia sessão Spark

In [50]:
spark = (
    SparkSession.builder.master("local[*]")
    .appName("features-creation")
    .config("spark.ui.port", "4040")
    .config("spark.ui.showConsoleProgress", "True")
    .getOrCreate()
)

### Carrega tabela da camada bronze

In [51]:
bronze_path = "/home/jovyan/data/bronze/salaries.parquet"
df = spark.read.parquet(bronze_path)

In [52]:
df.show(truncate=False)

+----------------+---------------+-------------------+------+---------------+-------------+------------------+------------+----------------+------------+---------+
|experience_level|employment_type|job_title          |salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|work_year|
+----------------+---------------+-------------------+------+---------------+-------------+------------------+------------+----------------+------------+---------+
|MI              |FT             |Data Manager       |117400|USD            |117400       |US                |0           |US              |M           |2024     |
|MI              |FT             |Data Manager       |62620 |USD            |62620        |US                |0           |US              |M           |2024     |
|SE              |FT             |Data Manager       |131200|USD            |131200       |US                |100         |US              |M           |2024     |
|SE             

### Prepara a base de treino e teste com features para modelos de Machine Learning

In [53]:
df_id = df.withColumn("id", F.monotonically_increasing_id()).select("id", *df.columns)

In [54]:
df_job_focus = df_id.withColumn(
    "job_focus",
    F.when(F.contains(F.col("job_title"), F.lit("Engineer")), "Engineer")
    .when(F.contains(F.col("job_title"), F.lit("Scien")), "Scientist")
    .when(F.contains(F.col("job_title"), F.lit("Research")), "Scientist")
    .when(F.contains(F.col("job_title"), F.lit("Analy")), "Analyst")
    .when(F.contains(F.col("job_title"), F.lit("Architect")), "Architect")
    .when(F.contains(F.col("job_title"), F.lit("Model")), "Architect")
    .when(
        F.contains(F.col("job_title"), F.lit("Business Intelligence")),
        "Business Intelligence",
    )
    .when(F.contains(F.col("job_title"), F.lit("BI")), "Business Intelligence")
    .when(F.contains(F.col("job_title"), F.lit("Visual")), "Business Intelligence")
    .when(F.contains(F.col("job_title"), F.lit("Manager")), "Manager")
    .when(F.contains(F.col("job_title"), F.lit("Devel")), "Developer")
    .otherwise("Other"),
).withColumn(
    "data_roles",
    F.when(F.contains(F.col("job_title"), F.lit("Data")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Analytics")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("AI")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Machine Learning")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("ML")), F.lit(1))
    .when(
        F.contains(F.col("job_title"), F.lit("Business Intelligence")),
        F.lit(1),
    )
    .when(F.contains(F.col("job_title"), F.lit("BI")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Deep Learning")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("ETL")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("NLP")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Artificial Intelligence")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Chatbot")), F.lit(1))
    .when(F.contains(F.col("job_title"), F.lit("Decision Scientist")), F.lit(1))
    .otherwise(F.lit(0)),
)

In [55]:
inflation = spark.createDataFrame(
    [
        Row(year=2020, rate=0.0123),
        Row(year=2021, rate=0.0470),
        Row(year=2022, rate=0.0800),
        Row(year=2023, rate=0.0412),
        Row(year=2024, rate=0.0),
    ]
)

In [56]:
cumulative_inflation = (
    inflation.select("year")
    .alias("a")
    .join(inflation.alias("b"), F.col("b.year") >= F.col("a.year"), how="left")
    .select("a.year", "rate")
    .groupBy("year")
    .agg(F.sum(F.col("rate")).alias("cumulative_rate"))
)

In [57]:
df_update = (
    df_job_focus.join(
        cumulative_inflation, df.work_year == cumulative_inflation.year, how="left"
    )
    .withColumn(
        "updated_salary",
        F.round(
            F.col("salary_in_usd") + F.col("salary_in_usd") * F.col("cumulative_rate"),
            2,
        ),
    )
    .sort("work_year")
)

In [58]:
z_score = (
    df_update.withColumn(
        "mean", F.mean("updated_salary").over(Window.partitionBy(F.lit(1)))
    )
    .withColumn(
        "stddev", F.stddev_pop("updated_salary").over(Window.partitionBy(F.lit(1)))
    )
    .withColumn(
        "z_score", F.abs((F.col("updated_salary") - F.col("mean")) / F.col("stddev"))
    )
    .withColumn("limiar", F.when(F.col("z_score") > 3, 1).otherwise(0))
    .filter(F.col("limiar") == 0)
)

In [59]:
df_job_type = z_score.withColumn(
    "job_type",
    F.when(F.col("remote_ratio") == 0, "Presencial")
    .when(F.col("remote_ratio") == 50, "Híbrido")
    .when(F.col("remote_ratio") == 100, "Remoto"),
)

In [60]:
df_usa = df_job_type.withColumn(
    "usa_residence", F.when(F.col("employee_residence") == "US", 1).otherwise(0)
).withColumn("usa_company", F.when(F.col("company_location") == "US", 1).otherwise(0))

In [61]:
df_silver = df_usa.select(
    "id",
    "experience_level",
    "employment_type",
    "company_size",
    "job_focus",
    "data_roles",
    "job_type",
    "usa_residence",
    "usa_company",
    "updated_salary",
    "work_year",
)

In [62]:
df_silver.show(truncate=False)

+-----------+----------------+---------------+------------+---------+----------+----------+-------------+-----------+--------------+---------+
|id         |experience_level|employment_type|company_size|job_focus|data_roles|job_type  |usa_residence|usa_company|updated_salary|work_year|
+-----------+----------------+---------------+------------+---------+----------+----------+-------------+-----------+--------------+---------+
|17179869184|EN              |FT             |S           |Engineer |1         |Presencial|0            |0          |118050.0      |2020     |
|17179869185|EN              |CT             |L           |Analyst  |1         |Híbrido   |0            |0          |52830.92      |2020     |
|17179869186|SE              |FT             |M           |Scientist|1         |Híbrido   |1            |1          |193602.0      |2020     |
|17179869187|EN              |FT             |L           |Analyst  |1         |Presencial|0            |0          |56544.77      |2020     |

In [63]:
df_silver.count()

64323

In [64]:
df_silver.printSchema()

root
 |-- id: long (nullable = false)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- company_size: string (nullable = true)
 |-- job_focus: string (nullable = false)
 |-- data_roles: integer (nullable = false)
 |-- job_type: string (nullable = true)
 |-- usa_residence: integer (nullable = false)
 |-- usa_company: integer (nullable = false)
 |-- updated_salary: double (nullable = true)
 |-- work_year: integer (nullable = true)



### Persistindo na camada silver

In [65]:
silver_path = "/home/jovyan/data/silver/features.parquet"
df_silver.write.partitionBy("work_year").mode("overwrite").parquet(silver_path)

In [66]:
spark.stop()