<a href="https://colab.research.google.com/github/DuarteVn/PySpark-no-Google-Colab/blob/main/Notebook_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Etapa 1: Configurar PySpark e integração com MySQL

In [1]:
from google.colab import drive
drive.mount('/content/drive')
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, when

Mounted at /content/drive


In [2]:
# Baixar o .jar correto do driver JDBC (MySQL Connector 8.0.33)
!wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

# Copiar para o diretório do Spark
!cp mysql-connector-j-8.0.33.jar /usr/local/lib/python3.*/dist-packages/pyspark/jars/


--2025-05-20 13:41:22--  https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2481560 (2.4M) [application/java-archive]
Saving to: ‘mysql-connector-j-8.0.33.jar’


2025-05-20 13:41:22 (29.7 MB/s) - ‘mysql-connector-j-8.0.33.jar’ saved [2481560/2481560]



In [4]:
spark = SparkSession.builder.appName("Notebook2").getOrCreate()

#Estabelecer conexão com Banco de Dados MySQL no Google Cloud Plataform
url = "jdbc:mysql://35.198.51.193:3306/senai?useSSL=false&serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL"
properties = {
    "user": "senai",
    "password": "senai123",
    "driver": "com.mysql.cj.jdbc.Driver"
}



## 1.1 Ler base de dados no Google Cloud Plataform

In [19]:
df_teste = spark.read.jdbc(
    url=url,
    table="(SELECT * FROM arquivo_nota LIMIT 10) AS nota",
    properties=properties
)

df_teste.show(5)

+--------+-----------+---------------+------------+---------------+----------+-----------+-----------+-----------+---------+-------------+----------------+----------------+------------+-------------------+----------+---------+---------+--------+
|id_venda|cod_cliente|    nom_cliente|cod_vendedor|   nom_vendedor|cod_cidade| nom_cidade|cod_produto|nom_produto|cod_marca|    nom_marca|cod_departamento|nom_departamento|cod_gerencia|       nom_gerencia| dtc_venda|qtd_venda|val_venda|num_nota|
+--------+-----------+---------------+------------+---------------+----------+-----------+-----------+-----------+---------+-------------+----------------+----------------+------------+-------------------+----------+---------+---------+--------+
|       1|          6|Reginaldo Rossi|           5|    Maria Braga|         1|   Salvador|          4|    Monitor|        4|          IBM|               1|     Informática|           4|   Casa e Decoração|2021-05-10|        1|  1300.00|    1033|
|       2|      

# Etapa 2: Importar base de dados

## 2.1 Ler base de dados original (notafiscal.csv)

In [5]:
df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/Excel/notafiscal.csv", header=True, inferSchema=True, sep=',')
df.printSchema()
df.show(5)

root
 |-- id_venda: integer (nullable = true)
 |-- cod_cliente: integer (nullable = true)
 |-- nom_cliente: string (nullable = true)
 |-- cod_vendedor: integer (nullable = true)
 |-- nom_vendedor: string (nullable = true)
 |-- cod_cidade: integer (nullable = true)
 |-- nom_cidade: string (nullable = true)
 |-- cod_produto: integer (nullable = true)
 |-- Nom_produto: string (nullable = true)
 |-- cod_marca: integer (nullable = true)
 |-- Nom_marca: string (nullable = true)
 |-- cod_departamento: integer (nullable = true)
 |-- nom_departamento: string (nullable = true)
 |-- cod_gerencia: integer (nullable = true)
 |-- nom_gerencia: string (nullable = true)
 |-- dtc_venda: date (nullable = true)
 |-- qtd_venda: integer (nullable = true)
 |-- val_venda: double (nullable = true)
 |-- num_nota: integer (nullable = true)

+--------+-----------+---------------+------------+---------------+----------+-----------+-----------+-----------+---------+-------------+----------------+----------------+-

## 2.2 Ler CSV de DEPARTAMENTO do Notebook 1

In [6]:
df_departamento = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/Excel/departamento.csv", header=True, inferSchema=True)
df_departamento.printSchema()
df_departamento.show(5)

root
 |-- ID_DEPARTAMENTO: integer (nullable = true)
 |-- NOME_DEPARTAMENTO: string (nullable = true)

+---------------+-----------------+
|ID_DEPARTAMENTO|NOME_DEPARTAMENTO|
+---------------+-----------------+
|              3|        PAPELARIA|
|              6|  ELETRODOMESTICO|
|              5|       ELETRÓNICO|
|              1|      INFORMÁTICA|
|              7|           MOVÉIS|
+---------------+-----------------+
only showing top 5 rows



# Etapa 3: Criar DataFrame de PRODUTO

In [7]:
df_produto = df.select(
    "cod_produto",
    "Nom_produto",
    "cod_departamento"
).dropDuplicates()

df_produto.show(5)

+-----------+--------------------+----------------+
|cod_produto|         Nom_produto|cod_departamento|
+-----------+--------------------+----------------+
|         13|              Estojo|               3|
|          7|         Celular 4Gb|               2|
|         17|Aparalho de Barbe...|               6|
|          9|        Celular  8GB|               2|
|          6|           Notebook |               1|
+-----------+--------------------+----------------+
only showing top 5 rows



# Etapa 4: JOIN com Departamento

## 4.1 Cria tabela departamento no Google Colab Plataform

In [9]:
df_departamento.write.jdbc(
    url=url,
    table="departamento",
    mode="overwrite",
    properties=properties
)


## 4.2 Cria tabela produto no GCP Google Colab Plataform



In [14]:
df_produto.write.jdbc(
    url=url,
    table="produto",
    mode="overwrite",
    properties=properties
)

## 4.3 Realiza o JOIN diretamente no banco com as duas tabelas criadas

In [15]:
df_join = spark.read.jdbc(
    url=url,
    table="""
        (
        SELECT
            DISTINCT p.cod_produto, p.Nom_produto, p.cod_departamento, d.NOME_DEPARTAMENTO
        FROM
            produto p
        INNER JOIN
            departamento d
        ON
            p.cod_departamento = d.ID_DEPARTAMENTO
        ) AS join_produto
    """,
    properties=properties
)

df_join.show()

+-----------+--------------------+----------------+-----------------+
|cod_produto|         Nom_produto|cod_departamento|NOME_DEPARTAMENTO|
+-----------+--------------------+----------------+-----------------+
|         13|              Estojo|               3|        PAPELARIA|
|          7|         Celular 4Gb|               2|        TELEFONIA|
|         17|Aparalho de Barbe...|               6|  ELETRODOMESTICO|
|          9|        Celular  8GB|               2|        TELEFONIA|
|          6|           Notebook |               1|      INFORMÁTICA|
|          8|    Telefone Sem Fio|               2|        TELEFONIA|
|          5|          Computador|               1|      INFORMÁTICA|
|         16|             Perfume|               4|        COSMÉTICO|
|          4|             Monitor|               1|      INFORMÁTICA|
|         19|               Fogão|               6|  ELETRODOMESTICO|
|         14|       Classificador|               3|        PAPELARIA|
|         15|       

# Etapa 5: Corrigir nome do produto

## 5.1 Corrige no Spark o nome

In [16]:
df_produto = df_produto.withColumn(
    "Nom_produto",
    when(df_produto["Nom_produto"] == "Aparalho de Barbear Elétrico", "Barbeador Elétrico").otherwise(df_produto["Nom_produto"])
)

df_produto.show(5)

+-----------+------------------+----------------+
|cod_produto|       Nom_produto|cod_departamento|
+-----------+------------------+----------------+
|         13|            Estojo|               3|
|          7|       Celular 4Gb|               2|
|         17|Barbeador Elétrico|               6|
|          9|      Celular  8GB|               2|
|          6|         Notebook |               1|
+-----------+------------------+----------------+
only showing top 5 rows



## 5.2 Regrava a tabela de produto com o nome do produto novo

In [17]:
df_produto.write.jdbc(
    url=url,
    table="produto",
    mode="overwrite",
    properties=properties
)

## 5.3 Verifica o nome do produto atualizado no bando

In [21]:
df_filtrado  = spark.read.jdbc(
    url=url,
    table="""
        (SELECT * FROM produto
         WHERE Nom_produto = 'Barbeador Elétrico') AS prod
    """,
    properties=properties
)

df_filtrado .show(5)

+-----------+------------------+----------------+
|cod_produto|       Nom_produto|cod_departamento|
+-----------+------------------+----------------+
|         17|Barbeador Elétrico|               6|
+-----------+------------------+----------------+

