O banco de dados escolhido simula dados de publicações de vagas relacionadas ao mercado de Inteligência Artificial, indicando o nome (fictício) da empresa, habilidades e ferramentas exigidas para cada vaga, localização (fictícia) de cada empresa, bem como o ramo de negócio.
As linhas de código a seguir são fornecidas no próprio site da kaggle, para possiibilitar a leitura dos dados diretamente da plataforma, via API.

In [0]:
#https://www.kaggle.com/datasets/minahilfatima12328/ai-workforce-data-overview

%pip install kagglehub
import kagglehub

# Download latest version
path = kagglehub.dataset_download("minahilfatima12328/ai-workforce-data-overview")

print("Path to dataset files:", path)

Collecting kagglehub
  Downloading kagglehub-0.3.13-py3-none-any.whl.metadata (38 kB)
Collecting tqdm (from kagglehub)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Downloading kagglehub-0.3.13-py3-none-any.whl (68 kB)
Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm, kagglehub
Successfully installed kagglehub-0.3.13 tqdm-4.67.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Downloading from https://www.kaggle.com/api/v1/datasets/download/minahilfatima12328/ai-workforce-data-overview?dataset_version_number=1...


  0%|          | 0.00/91.8k [00:00<?, ?B/s]100%|██████████| 91.8k/91.8k [00:00<00:00, 3.03MB/s]

Extracting files...
Path to dataset files: /home/spark-ec14d290-b2b8-421c-bd61-a8/.cache/kagglehub/datasets/minahilfatima12328/ai-workforce-data-overview/versions/1





As linhas de código a seguir leem os dados extraídos e armazenados na memória e salvam os mesmos em um dataframe, utilizando as bibliotecas Spark. 

In [0]:
import pandas as pd
import io
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
import os
# Criar sessão Spark
spark = SparkSession.builder.getOrCreate()

# Ler o arquivo como string 
files = os.listdir(path)
file_path = os.path.join(path, files[0])
with open(file_path, "r", encoding="utf-8") as f:
    data = f.read()

# Usar pandas para parsing, considerando aspas para células com vírgulas internas
pdf = pd.read_csv(io.StringIO(data), sep=",", quotechar='"', engine="python", dtype=str, keep_default_na=False)

# Converter para Spark DataFrame
df = spark.createDataFrame(pdf)

Em seguida, são criadas os schemas bronze, silver e gold, dentro do Unit Catalog do Databricks.

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS bronze;

CREATE DATABASE IF NOT EXISTS silver;

CREATE DATABASE IF NOT EXISTS gold

Para criação das tabelas a serem armazendas no schema bronze, foi pensada uma configuração estrela, onde existe uma tabela FATO, que contém as informações das vagas ("job_id", "job_title", "experience_level","employment_type", "posted_date"), uma tabela dimensão skills (que contém a chave estrangeira job_ib e a coluna skills_required), uma tabela dimensão tools (que contém a chave estrangeira job_ib e a coluna tools_preferred), uma tabela dimensão location (na qual a informação original era "cidade,estado" e que foi separada em duas colunas, uma para cidade e outra para estado), uma tabela dimensão salary (na qual a informação original era "min_salary - max_salary" e que foi separada em duas colunas, uma min_salary e outra para max_salary) e uma tabela dimensão company, que contém mais informações sobre as empresas que publicaram as vagas ("job_id", "company_name", "industry", "company_size").

In [0]:
# Criar dimensão para skills_required
skills_dim = df.select("job_id", F.explode(F.split(F.col("skills_required"), ", ")).alias("skill"))

# Criar dimensão para tools_preferred
tools_dim = df.select("job_id", F.explode(F.split(F.col("tools_preferred"), ", ")).alias("tool"))

# Criar dimensão para location (cidade e estado)
location_dim = df.select("job_id", F.split(F.col("location"), ", ").getItem(0).alias("city"),
                         F.split(F.col("location"), ", ").getItem(1).alias("state"))

# Criar dimensão para salary_range_usd
salary_dim = df.select("job_id",
                       F.split(F.col("salary_range_usd"), "-").getItem(0).alias("min_salary"),
                       F.split(F.col("salary_range_usd"), "-").getItem(1).alias("max_salary"))

# Criar tabela para company
company_dim = df.select("job_id", "company_name", "industry", "company_size")

# Criar tabela fato
fact_jobs = df.select("job_id", "job_title", "experience_level",
                      "employment_type", "posted_date")

#display(spark.table("bronze.fact_jobs").limit(10))
#display(spark.table("bronze.skills_dim").limit(10))
#display(spark.table("bronze.tools_dim").limit(10))
#display(spark.table("bronze.location_dim").limit(10))
#display(spark.table("bronze.salary_dim").limit(10))
#display(spark.table("bronze.company_dim").limit(10))


As linhas de código a seguir completam o fluxo ETL para o schema gold, salvando as tabelas, após a separação entre fato e dimensões e as transformações realizadas nas colunas da tabela original.

In [0]:
# Salvar as tabelas no schema bronze
skills_dim.write.format("delta").mode("overwrite").saveAsTable("bronze.skills_dim")
tools_dim.write.format("delta").mode("overwrite").saveAsTable("bronze.tools_dim")
location_dim.write.format("delta").mode("overwrite").saveAsTable("bronze.location_dim")
salary_dim.write.format("delta").mode("overwrite").saveAsTable("bronze.salary_dim")
company_dim.write.format("delta").mode("overwrite").saveAsTable("bronze.company_dim")
fact_jobs.write.format("delta").mode("overwrite").saveAsTable("bronze.fact_jobs")

As linhas de código a seguir fazem as transformações necessárias para transformar os dados do schema gold para o schema silver, como remoção de linhas e espaços em branco, padronização de escrita (minúsculas/maiúsculas) e configuração do tipo de dado correto para cada coluna, já que todas as colunas vieram como formato "string" na tabela original.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DecimalType, DateType, TimestampType

#leitura das tabelas
fact_bronze = spark.table("bronze.fact_jobs")
skills_bronze = spark.table("bronze.skills_dim")
tools_bronze = spark.table("bronze.tools_dim")
location_bronze = spark.table("bronze.location_dim")
company_bronze = spark.table("bronze.company_dim")
salary_bronze = spark.table("bronze.salary_dim")

#Remoção de espaços em branco nas células, transformação de colunas para os tipos corretos
fact_clean = (
    fact_bronze
    .withColumn("job_title", F.trim(F.col("job_title")))
    .withColumn("posted_date_raw", F.col("posted_date"))
    # Parse de data (ajuste formato se necessário)
    .withColumn("posted_date", F.to_date(F.col("posted_date"), "yyyy-MM-dd"))
    .withColumn("ingest_ts", F.current_timestamp())
)
#Remoção de espaços em branco nas células
company_clean = (
    company_bronze
        .withColumn("company_name", F.trim(F.col("company_name")))
    .withColumn("industry", F.trim(F.col("industry")))
)
#Transforma salários em inteiro
salary_clean = (
    salary_bronze
    .withColumn("min_salary_raw", F.col("min_salary"))
    .withColumn("max_salary_raw", F.col("max_salary"))
    .withColumn("min_salary", F.regexp_replace(F.col("min_salary_raw"), r"[^\d]", "").cast(IntegerType()))
    .withColumn("max_salary", F.regexp_replace(F.col("max_salary_raw"), r"[^\d]", "").cast(IntegerType()))
    # marca registros com problemas
    .withColumn("salary_valid", (F.col("min_salary").isNotNull()) & (F.col("max_salary").isNotNull()) & (F.col("min_salary") <= F.col("max_salary")))
)

#Normaliza dados de localização
location_clean = (
    location_bronze
    .withColumn("city", F.initcap(F.trim(F.col("city"))))
    .withColumn("state", F.upper(F.trim(F.col("state"))))
)

#Remoção de duplicatas (job_id/skill e job_id/tool) e linhas com dados em branco.
skills_clean = (
    skills_bronze
    .withColumn("skill", F.trim(F.col("skill")))
    .filter(F.col("skill") != "")
    .dropDuplicates(["job_id", "skill"])
    .withColumn("skill_id", F.sha2(F.concat_ws("||", F.col("job_id"), F.col("skill")), 256))
)
tools_clean = (
    tools_bronze
    .withColumn("tool", F.trim(F.col("tool")))
    .filter(F.col("tool") != "")
    .dropDuplicates(["job_id", "tool"])
    .withColumn("tool_id", F.sha2(F.concat_ws("||", F.col("job_id"), F.col("tool")), 256))
)

Após as transformações, os dados são salvos no schema silver.

In [0]:
#Salva dados tratados na tabela silver:
fact_clean.write.format("delta").mode("overwrite").saveAsTable("silver.fact_jobs")
company_clean.write.format("delta").mode("overwrite").saveAsTable("silver.companies")
skills_clean.write.format("delta").mode("overwrite").saveAsTable("silver.skills")
tools_clean.write.format("delta").mode("overwrite").saveAsTable("silver.tools")
location_clean.write.format("delta").mode("overwrite").saveAsTable("silver.locations")
salary_clean.write.format("delta").mode("overwrite").saveAsTable("silver.salaries")

Para a transformação de dados do schema silver para gold, a única transformação realizada foi a quebra dos meses e anos, a fim de avaliar se existem períodos ou meses específicos que há aumento ou redução do número de vagas publicadas.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DecimalType, DateType, TimestampType

fact_silver = spark.table("silver.fact_jobs")

fact_time = (
    fact_silver
    .withColumn("posted_date_parsed",
        F.coalesce(
            F.to_date(F.col("posted_date"), "yyyy-MM-dd"),
            F.to_date(F.col("posted_date"), "dd/MM/yyyy"),
            F.to_date(F.col("posted_date"), "MM/dd/yyyy"),
            F.to_date(F.col("posted_date"), "yyyy/MM/dd"),
            F.to_date(F.col("posted_date"), "dd-MM-yyyy"),
            F.to_date(F.col("posted_date"), "MMM dd, yyyy")  # ex: Jan 02, 2024
        )
    )
    .withColumn("posted_year", F.year(F.col("posted_date_parsed")))
    .withColumn("posted_month", F.month(F.col("posted_date_parsed")))
    .withColumn("posted_month_name", F.date_format(F.col("posted_date_parsed"), "MMMM"))
    .withColumn("posted_yyyy_mm", F.date_format(F.col("posted_date_parsed"), "yyyy-MM"))
)

Armazenamento dos dados na tabela Gold:

In [0]:
fact_time.write.format("delta").mode("overwrite").saveAsTable("gold.fact_jobs_time")

#transferência das tabelas skills, tools, salaries companies e location sem transformações:
skills_silver = spark.table("silver.skills")
skills_silver.write.format("delta").mode("overwrite").saveAsTable("gold.skills")
tools_silver = spark.table("silver.tools")
tools_silver.write.format("delta").mode("overwrite").saveAsTable("gold.tools")
salary_silver = spark.table("silver.salaries")
salary_silver.write.format("delta").mode("overwrite").saveAsTable("gold.salaries")
location_silver = spark.table("silver.locations")
location_silver.write.format("delta").mode("overwrite").saveAsTable("gold.locations")
company_silver = spark.table("silver.companies")
company_silver.write.format("delta").mode("overwrite").saveAsTable("gold.companies")

A query a seguir avalia quais vagas foram as mais requisitadas, em todo o dataset. Isso foi feito através da contagem de vagas (job_title), agrupando pelo nome da vaga e ordenando em ordem decrescente. A vaga com maior ocorrência é a de analista de dados.

In [0]:
%sql
SELECT `job_title`, COUNT(*) AS posting_count
FROM `workspace`.`gold`.`fact_jobs_time`
GROUP BY `job_title`
ORDER BY posting_count DESC

job_title,posting_count
Data Analyst,271
NLP Engineer,265
AI Product Manager,258
Quant Researcher,251
ML Engineer,250
Data Scientist,238
AI Researcher,237
Computer Vision Engineer,230


A query a seguir busca as top 10 indústrias que postaram vagas de emprego, em todo o dataset. A indústria automotiva foi a com maior número de vagas postadas (300) e representou 15% de todas as vagas postadas.

In [0]:
%sql
SELECT
  c.industry,
  COUNT(f.job_id) AS jobs_posted,
  ROUND(100.0 * COUNT(f.job_id) / SUM(COUNT(f.job_id)) OVER (), 2) AS pct_of_total
FROM
  gold.fact_jobs_time f
JOIN
  workspace.gold.companies c
  ON f.job_id = c.job_id
GROUP BY
  c.industry
ORDER BY
  jobs_posted DESC
LIMIT 10;

industry,jobs_posted,pct_of_total
Automotive,300,15.0
Education,294,14.7
Retail,293,14.65
E-commerce,291,14.55
Finance,279,13.95
Tech,274,13.7
Healthcare,269,13.45


A query a seguir contabiliza o número de vagas postadas por mês, ano a ano. Foi gerada uma visualização com um gráfico de barras (Vagasxtempo), mas o mesmo não traz nenhuma informação que chame atenção, como meses ou anos específicos com maior ou menor demanda. Como o conjunto de dados parece ser fictício, não foi possível tirar nenhum insight, porém, em um conjunto de dados real, provavelmente iríamos perceber um aumento no número de vagas publicadas com o passar do tempo, devido à grande explosão de IA generativa, especialmente nos últimos 2 anos.

In [0]:
%sql
SELECT 
    `posted_yyyy_mm`, 
    COUNT(`job_id`) AS `job_postings_count`
FROM 
    `workspace`.`gold`.`fact_jobs_time`
GROUP BY 
    `posted_yyyy_mm`
ORDER BY 
    `posted_yyyy_mm`;

posted_yyyy_mm,job_postings_count
2023-09,30
2023-10,86
2023-11,66
2023-12,89
2024-01,97
2024-02,76
2024-03,89
2024-04,79
2024-05,70
2024-06,64


Databricks visualization. Run in Databricks to view.

Também foi criada uma query para verificar se algum estado específico apresentava uma maior demanda por vagas de IA, mas não foi possível identificar um estado em especial com maior volume de vagas publicadas.

In [0]:
%sql
SELECT `state`, COUNT(`job_id`) as `job_postings_count`
FROM `workspace`.`gold`.`locations`
GROUP BY `state`
ORDER BY `job_postings_count` DESC;

state,job_postings_count
PG,19
BB,18
BT,18
FJ,18
HR,18
IQ,17
JO,17
GQ,16
JM,16
UZ,16


A mesma query foi feita para verificar se alguma cidade se destaca no número de vagas publicadas, mas também não foi observada nenhuma cidade com demanda significativamente maior do que as demais.

In [0]:
%sql
SELECT `city`, COUNT(`job_id`) as `job_postings_count`
FROM `workspace`.`gold`.`locations`
GROUP BY `city`
ORDER BY `job_postings_count` DESC;

city,job_postings_count
East Michael,5
Davidmouth,4
South Michael,4
Jenniferfurt,3
West Andrew,3
Port Robert,3
Matthewshire,3
Lewisville,3
South Christopher,3
Jessicamouth,3


Também foi feita uma pesquisa de quais vagas apresentavam o maior salário médio ((salário mínimo + salário máximo)/2). As vagas de quantum researcher (da empresa Bailey-Harris) e machine learning engineer (da empresa Martin PLC) foram os que mais se destacaram.

In [0]:
%sql
SELECT 
  s.job_id,
  fjt.job_title,
  c.company_name,
  (AVG(s.min_salary) + AVG(s.max_salary)) / 2 AS average_salary
FROM 
  workspace.gold.salaries s
LEFT JOIN
  workspace.gold.fact_jobs_time fjt
  ON s.job_id = fjt.job_id
LEFT JOIN
  workspace.gold.companies c
  ON s.job_id = c.job_id
WHERE 
  s.salary_valid = TRUE
GROUP BY 
  s.job_id,
  fjt.job_title,
  c.company_name
ORDER BY 
  average_salary DESC;

job_id,job_title,company_name,average_salary
204,Quant Researcher,Bailey-Harris,197776.5
1928,ML Engineer,Martin PLC,194743.5
1346,Quant Researcher,"Herring, Smith and Fletcher",194145.5
1035,Quant Researcher,Peterson Ltd,194075.0
1956,Quant Researcher,Jones-Lopez,193918.0
678,Data Scientist,Williams-Leonard,193761.0
1684,Computer Vision Engineer,"James, Wagner and Johnson",193166.0
291,AI Researcher,"Johnson, Humphrey and Walsh",192467.0
1363,Computer Vision Engineer,Powell-Hall,191996.5
473,ML Engineer,"Rodriguez, West and Erickson",191811.0


Também foram verificadas quais skills foram mais requisitadas, isto é, apareceram mais vezes nas vagas publicadas. A habilidade em trabalhar com tensor flow foi a que mais apareceu.

In [0]:
%sql
SELECT `skill`, COUNT(*) as `skill_count`
FROM `workspace`.`gold`.`skills`
GROUP BY `skill`
ORDER BY `skill_count` DESC
LIMIT 10

skill,skill_count
TensorFlow,452
Excel,432
Pandas,427
FastAPI,419
NumPy,416
Reinforcement Learning,414
Azure,413
Hugging Face,408
SQL,408
Keras,406


O mesmo foi feito para as ferramentas. A ferramenta que mais foi requisitada nas vagas publicadas foi MLFlow.

In [0]:
%sql
SELECT 
  `tool`, 
  COUNT(`tool`) AS `tool_count`
FROM 
  `workspace`.`gold`.`tools`
GROUP BY 
  `tool`
ORDER BY 
  `tool_count` DESC
LIMIT 
  10;

tool,tool_count
MLflow,513
LangChain,511
FastAPI,505
KDB+,499
BigQuery,494
TensorFlow,487
PyTorch,475
Scikit-learn,474


Autoavaliação: Acredito que, o dataset escolhido, bem como as transformações e análises realizadas cumpriram o papel de aprendizagem proposto, uma vez que foi possível passar pelas etapas de obtenção dos dados, definição do esquema, criação das camadas bronze, silver e gold, além da aplicação de transformações nos dados. Por fim, foram exercitadas as pesquisas SQL, a fim de analisar os dados da camada gold.