How Bootcamp Engenharia de Dados - Desafio 1
---

In [None]:
# Para obter os datasets via kaggle-cli é preciso ter o token da conta do Kaggle configurado. 
# Para isso, basta obter o token e salvar o arquivo no seu Google Drive.
# No meu caso, eu salvei em /content/drive/MyDrive/Colab-Notebooks/how
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Definir uma ENV_VAR que aponte para diretório onde se encontra o token do Kaggle
import os
os.environ["KAGGLE_CONFIG_DIR"] = "/content/drive/MyDrive/Colab-Notebooks/how"

# Instalar o kaggle-cli
!pip install --upgrade kaggle

In [3]:
# instalar outras dependências
!apt-get update -qq
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark
!wget -q https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
!mv postgresql-42.6.0.jar /content/spark-3.4.0-bin-hadoop3/jars

In [12]:
# Ambiente spark
import findspark
findspark.init()

In [13]:
# Iniciar a sessão spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Bootcamp Engenharia de Dados - How") \
    .getOrCreate()

In [None]:
# Mostra a sessão spark criada
spark

In [64]:
# Outras variáveis necessárias
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [65]:
# O banco de dados PostgreSQL usado no exercício pode ser qualquer serviço SAS gratuito (RDS, por exemplo)
# Aqui eu usei o serviço elephantsql.com (que até usa a infra da AWS)
# Basta criar uma conta e uma instância e obter a URL, login e senha 
# Seria um procedimento parecido para o RDS com o adicional de ter de criar as regras de Security Group para permitir acesso externo
# Criar um arquivo json com a seguinte estrutura
# {
#  "DB_URL": "valor_url",
#  "DB_USER": "valor_user",
#  "DB_PWD": "valor_pwd"
# }
env = spark.read.json('/content/drive/MyDrive/Colab-Notebooks/how/.env', multiLine=True)
os.environ["DB_URL"] = env.select('DB_URL').first()[0]
os.environ["DB_USER"] = env.select('DB_USER').first()[0]
os.environ["DB_PWD"] = env.select('DB_PWD').first()[0]

# Processando dados

In [None]:
# Criar um diretório para os datasets
!mkdir datasets

# baixar datasets do desafio e mover para o diretório datasets
!kaggle datasets download -d chickooo/top-tech-startups-hiring-2023
!mv top-tech-startups-hiring-2023.zip datasets

!kaggle datasets download -d loganlauton/nba-players-and-team-data
!mv nba-players-and-team-data.zip datasets

In [67]:
# Funções utilitárias
from pyspark.sql.functions import current_timestamp, col, to_json

Top Tech Startups

In [68]:
# Descompactar arquivos
! unzip -q -d '/content/datasets' '/content/datasets/top-tech-startups-hiring-2023.zip'
! rm -rf '/content/datasets/images' 

In [None]:
# Criando o dataframe
path = '/content/datasets/json_data.json'
df = spark.read.json(path, multiLine=True)
df.printSchema()
df.count()

In [None]:
# Transformando a struct jobs em um json
df = df.withColumn("jobs",to_json(col("jobs")))

# Criando a coluna created_at
df = df.withColumn("created_at", current_timestamp())

# Salvando os dados no banco
df.write.jdbc(
    url=os.getenv("DB_URL"), 
    table="top_tech_startups", 
    properties={
        "user": os.getenv("DB_USER"), 
        "password": os.getenv("DB_PWD")})

## **NBA**

In [71]:
# Descompactar arquivos
! unzip -q -d '/content/datasets' '/content/datasets/nba-players-and-team-data.zip'

Payroll

In [None]:
# Criando o dataframe
path = '/content/datasets/NBA Payroll(1990-2023).csv'
df = spark.read.option("header",True).csv(path)
df.printSchema()
df.count()

In [73]:
# Criando uma nova coluna com o valor de _c0
df = df.withColumn("id",col("_c0"))

# Removendo _c0
df = df.drop('_c0')

# Criando a coluna created_at
df = df.withColumn("created_at", current_timestamp())

# Salvando os dados no banco
df.write.jdbc(
    url=os.getenv("DB_URL"), 
    table="nba_payroll", 
    properties={
        "user": os.getenv("DB_USER"), 
        "password": os.getenv("DB_PWD")})

Salaries

In [None]:
# Criando o dataframe
path = '/content/datasets/NBA Salaries(1990-2023).csv'
df = spark.read.option("header",True).csv(path)
df.printSchema()
df.count()

In [75]:
# Criando uma nova coluna com o valor de _c0
df = df.withColumn("id",col("_c0"))

# Removendo _c0
df = df.drop('_c0')

# Criando a coluna created_at
df = df.withColumn("created_at", current_timestamp())

# Salvando os dados no banco
df.write.jdbc(
    url=os.getenv("DB_URL"), 
    table="nba_salaries", 
    properties={
        "user": os.getenv("DB_USER"), 
        "password": os.getenv("DB_PWD")})

Player Stats

In [None]:
# Criando o dataframe
path = '/content/datasets/NBA Player Stats(1950 - 2022).csv'
df = spark.read.option("header",True).csv(path)
df.printSchema()
df.count()

In [78]:
# Criando uma nova coluna com o valor de _c0
df = df.withColumn("id",col("_c0"))

# Removendo as colubas _c0 e Unnamed
df = df.drop('_c0')
df = df.drop('Unnamed')

# Criando a coluna created_at
df = df.withColumn("created_at", current_timestamp())

# Salvando os dados no banco
df.write.jdbc(
    url=os.getenv("DB_URL"), 
    table="nba_player_stats", 
    properties={
        "user": os.getenv("DB_USER"), 
        "password": os.getenv("DB_PWD")})

Player Box Score Stats

In [None]:
# Criando o dataframe
path = '/content/datasets/NBA Player Box Score Stats(1950 - 2022).csv'
df = spark.read.option("header",True).csv(path)
df.printSchema()
df.count()

In [80]:
# Criando uma nova coluna com o valor de _c0
df50k = df.limit(50000)
df50k = df50k.withColumn("id",col("_c0"))

# Removendo as colubas _c0
df50k = df50k.drop('_c0')

# Criando a coluna created_at
df50k = df50k.withColumn("created_at", current_timestamp())

# Salvando os dados no banco
df50k.write.jdbc(
    url=os.getenv("DB_URL"), 
    table="nba_player_box_score_stats", 
    properties={
        "user": os.getenv("DB_USER"), 
        "password": os.getenv("DB_PWD")})