<a href="https://colab.research.google.com/github/StefaneBG/PySpark-Metodos/blob/main/MetodosPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports e Instalações

In [None]:
# Importação da biblioteca pandas
import pandas as pd

In [None]:
# Instalação dos requisitos para o PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# Torna o pyspark "importável"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Inicializar a SparkSession com suporte ao Hive
spark = SparkSession.builder \
    .appName("Spark with Hive on Colab") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse") \
    .config("hive.metastore.warehouse.dir", "/content/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Criar diretório para o warehouse
!mkdir -p /content/spark-warehouse

In [None]:
# Verifica o SparkContext
print(spark)

# Exibe a Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7b8fef92eb60>
3.1.1


#01 - Carregar as quatro tabelas do banco locadora no PySpark como DataFrames


In [None]:
cliente = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vQW3fP3j4qoiMGBXDAGzg_9IW2b3zgjdkVKLsURNVe9QezpHXimWfKle_55CQQtkeWL69OAASBDNdk8/pub?gid=2073489257&single=true&output=csv')
aluguel = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRncyLGO1iOo2H53JaryzVF1GPjUhWl9DsN7TZROCDxaP85iCwl5aW5ffBEzqtpAMRNYkd7eO5ehmgn/pub?gid=1581881382&single=true&output=csv')
carro = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vSznlX7UXeH_LeKcNteiDnWvdwZyydAQl0_x8NU9cx6G00Zh7SMrjoUuNpytVq7U-iQVzQNJ7jC7GpY/pub?gid=306989914&single=true&output=csv')
marca = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vSI-4_QEFwZ6eDGwUDlip0_PGBn7d_F7j59UwXmRvWbQFyy01ENPatjkbO1E8k5ZW5lqSY9ox112j7X/pub?gid=1160143272&single=true&output=csv')

In [None]:
cliente.to_csv('cliente.csv',index=False)
aluguel.to_csv('aluguel.csv',index=False)
carro.to_csv('carro.csv',index=False)
marca.to_csv('marca.csv',index=False)

In [None]:
# Cria a tabela cliente
spark.sql('''
CREATE TABLE IF NOT EXISTS cliente (
  codcliente INT,
  nome STRING,
  cidade STRING,
  sexo STRING,
  estado STRING,
  estadocivil STRING
)

USING CSV
OPTIONS (path '/content/cliente.csv', header 'true', inferSchema 'true')
''')
cliente = spark.sql('''
SELECT *
FROM cliente
''')

In [None]:
# Cria a tabela aluguel
spark.sql('''
CREATE TABLE IF NOT EXISTS aluguel (
  codaluguel INT,
  codcliente INT,
  codcarro INT,
  data_aluguel DATE
)

USING CSV
OPTIONS (path '/content/aluguel.csv', header 'true', inferSchema 'true')
''')
aluguel = spark.sql('''
SELECT *
FROM aluguel
''')

In [None]:
# Cria a tabela carro
spark.sql('''
CREATE TABLE IF NOT EXISTS carro (
  codcarro INT,
  codmarca INT,
  modelo STRING,
  valor DOUBLE
)

USING CSV
OPTIONS (path '/content/carro.csv', header 'true', inferSchema 'true')
''')
carro = spark.sql('''
SELECT *
FROM carro
''')

In [None]:
# Cria a tabela marca
spark.sql('''
CREATE TABLE IF NOT EXISTS marca (
  codmarca INT,
  marca STRING
)

USING CSV
OPTIONS (path '/content/marca.csv', header 'true', inferSchema 'true')
''')
marca = spark.sql('''
SELECT *
FROM marca
''')

#02 - Exibir as cinco primeiras linhas de cada DataFrame



In [None]:
cliente.show(5)

+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|
+----------+----------------+---------------+----+------+-----------+



In [None]:
aluguel.show(5)

+----------+----------+--------+------------+
|codaluguel|codcliente|codcarro|data_aluguel|
+----------+----------+--------+------------+
|         1|         3|       2|  2023-04-01|
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
+----------+----------+--------+------------+
only showing top 5 rows



In [None]:
carro.show(5)

+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       1|       1|    Ka|100.0|
|       2|       2|  Argo|150.0|
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       5|       5|  Kwid|120.0|
+--------+--------+------+-----+



In [None]:
marca.show(5)

+--------+----------+
|codmarca|     marca|
+--------+----------+
|       1|      Ford|
|       2|      Fiat|
|       3| Chevrolet|
|       4|Volkswagen|
|       5|   Renault|
+--------+----------+



#03 - Contar o número de linhas e colunas de cada tabela


In [None]:
def count_rows_and_columns(df):
    num_linha = df.count()
    num_colunas = len(df.columns)
    print(f"Número de linhas: {num_linha}")
    print(f"Número de colunas: {num_colunas}")



In [None]:
count_rows_and_columns(cliente)


Número de linhas: 5
Número de colunas: 6


In [None]:
count_rows_and_columns(aluguel)


Número de linhas: 10
Número de colunas: 4


In [None]:
count_rows_and_columns(carro)


Número de linhas: 5
Número de colunas: 4


In [None]:
count_rows_and_columns(marca)

Número de linhas: 5
Número de colunas: 2


# 04 - Exibir o esquema (schema) de cada DataFrame


In [None]:
cliente.printSchema()

root
 |-- codcliente: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- estadocivil: string (nullable = true)



In [None]:
marca.printSchema()

root
 |-- codmarca: integer (nullable = true)
 |-- marca: string (nullable = true)



In [None]:
carro.printSchema()

root
 |-- codcarro: integer (nullable = true)
 |-- codmarca: integer (nullable = true)
 |-- modelo: string (nullable = true)
 |-- valor: double (nullable = true)



In [None]:
aluguel.printSchema()

root
 |-- codaluguel: integer (nullable = true)
 |-- codcliente: integer (nullable = true)
 |-- codcarro: integer (nullable = true)
 |-- data_aluguel: date (nullable = true)



# 05 - Renomear as colunas dos DataFrames para ter nomes mais amigáveis


In [None]:
cliente=cliente.withColumnRenamed('codcliente','ID_Cliente')\
                .withColumnRenamed('nome','Nome')\
                .withColumnRenamed('cidade','Cidade')\
                .withColumnRenamed('sexo','Sexo')\
                .withColumnRenamed('estado','Estado')\
                .withColumnRenamed('estadocivil','Estado_Civil')
cliente.columns

['ID_Cliente', 'Nome', 'Cidade', 'Sexo', 'Estado', 'Estado_Civil']

In [None]:
aluguel=aluguel.withColumnRenamed('codaluguel','ID_Aluguel')\
                .withColumnRenamed('codcliente','ID_Cliente')\
                .withColumnRenamed('codcarro','ID_Carro')\
                .withColumnRenamed('data_aluguel','Data_Aluguel')
aluguel.columns

['ID_Aluguel', 'ID_Cliente', 'ID_Carro', 'Data_Aluguel']

In [None]:
marca=marca.withColumnRenamed('codmarca','ID_Marca')\
            .withColumnRenamed('marca','Marca')
marca.columns


['ID_Marca', 'Marca']

In [None]:
carro=carro.withColumnRenamed('codcarro','ID_Carro')\
          .withColumnRenamed('codmarca','ID_Marca')\
          .withColumnRenamed('modelo','Modelo')\
          .withColumnRenamed('valor','Valor')
carro.columns

['ID_Carro', 'ID_Marca', 'Modelo', 'Valor']

#06 - Selecionar apenas os aluguéis realizados após uma data específica


In [None]:
data_especifica = "2023-01-01"
aluguel.withColumn("Data_Aluguel",aluguel.Data_Aluguel.cast('date')).filter(aluguel.Data_Aluguel > data_especifica).show()

+----------+----------+--------+------------+
|ID_Aluguel|ID_Cliente|ID_Carro|Data_Aluguel|
+----------+----------+--------+------------+
|         1|         3|       2|  2023-04-01|
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
|         6|         1|       4|  2023-04-13|
|         7|         1|       1|  2023-04-15|
|         8|         5|       2|  2023-04-19|
|         9|         5|       2|  2023-04-21|
|        10|         3|       1|  2023-04-25|
+----------+----------+--------+------------+



# 07 - Encontrar clientes que residem no estado de "RJ"


In [None]:
cliente.filter(cliente.Estado == 'RJ').show()

+----------+----------------+---------------+----+------+------------+
|ID_Cliente|            Nome|         Cidade|Sexo|Estado|Estado_Civil|
+----------+----------------+---------------+----+------+------------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|           C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|           C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|           S|
+----------+----------------+---------------+----+------+------------+



# 08 - Filtrar carros com valor de aluguel maior que 150


In [None]:
carro.filter(carro.Valor > 150).show()

+--------+--------+------+-----+
|ID_Carro|ID_Marca|Modelo|Valor|
+--------+--------+------+-----+
|       3|       3|  Onix|170.0|
+--------+--------+------+-----+



# 09 - Selecionar aluguéis onde o cliente é do sexo feminino


In [None]:
cliente.where(cliente.Sexo == 'F').show()

+----------+-------------+---------------+----+------+------------+
|ID_Cliente|         Nome|         Cidade|Sexo|Estado|Estado_Civil|
+----------+-------------+---------------+----+------+------------+
|         1|    Ana Silva|Duque de Caxias|   F|    RJ|           C|
|         2|Bruna Pereira|        Niterói|   F|    RJ|           C|
|         5|Lúcia Andrade|      São Paulo|   F|    SP|           C|
+----------+-------------+---------------+----+------+------------+



# 10 - Identificar clientes solteiros

In [None]:
cliente.where(cliente.Estado_Civil == 'S').show()

+----------+----------------+---------------+----+------+------------+
|ID_Cliente|            Nome|         Cidade|Sexo|Estado|Estado_Civil|
+----------+----------------+---------------+----+------+------------+
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|           S|
|         4|  Fernando Souza|       Campinas|   M|    SP|           S|
+----------+----------------+---------------+----+------+------------+



# 11 - Realizar um join entre "Aluguel" e "Cliente" para adicionar informações do cliente ao DataFrame de aluguéis

In [None]:
aluguel.join(cliente, aluguel.ID_Cliente == cliente.ID_Cliente,'inner').show()

+----------+----------+--------+------------+----------+----------------+---------------+----+------+------------+
|ID_Aluguel|ID_Cliente|ID_Carro|Data_Aluguel|ID_Cliente|            Nome|         Cidade|Sexo|Estado|Estado_Civil|
+----------+----------+--------+------------+----------+----------------+---------------+----+------+------------+
|         7|         1|       1|  2023-04-15|         1|       Ana Silva|Duque de Caxias|   F|    RJ|           C|
|         6|         1|       4|  2023-04-13|         1|       Ana Silva|Duque de Caxias|   F|    RJ|           C|
|         5|         1|       4|  2023-04-05|         1|       Ana Silva|Duque de Caxias|   F|    RJ|           C|
|         4|         2|       3|  2023-04-04|         2|   Bruna Pereira|        Niterói|   F|    RJ|           C|
|         3|         2|       1|  2023-04-03|         2|   Bruna Pereira|        Niterói|   F|    RJ|           C|
|         2|         2|       1|  2023-04-02|         2|   Bruna Pereira|       

# 12 - Juntar "Carro" e "Marca" para incluir o nome da marca no DataFrame de carros

In [None]:
'''
['ID_Marca', 'Marca']
['ID_Carro', 'ID_Marca', 'Modelo', 'Valor']
'''
carro.join(marca, carro.ID_Marca == marca.ID_Marca,).show()

+--------+--------+------+-----+--------+----------+
|ID_Carro|ID_Marca|Modelo|Valor|ID_Marca|     Marca|
+--------+--------+------+-----+--------+----------+
|       1|       1|    Ka|100.0|       1|      Ford|
|       2|       2|  Argo|150.0|       2|      Fiat|
|       3|       3|  Onix|170.0|       3| Chevrolet|
|       4|       4|  Polo|150.0|       4|Volkswagen|
|       5|       5|  Kwid|120.0|       5|   Renault|
+--------+--------+------+-----+--------+----------+



# 13 - Criar um DataFrame combinando "Aluguel", "Carro" e "Cliente"


In [None]:
aluguel_carro_cliente = aluguel \
    .join(carro, aluguel.ID_Carro == carro.ID_Carro, "inner") \
    .join(cliente, aluguel.ID_Cliente == cliente.ID_Cliente, "inner")\
    .select('Nome','Cidade','Sexo','Estado','Estado_Civil','Data_Aluguel','Modelo','Valor')

aluguel_carro_cliente.show()

+----------------+---------------+----+------+------------+------------+------+-----+
|            Nome|         Cidade|Sexo|Estado|Estado_Civil|Data_Aluguel|Modelo|Valor|
+----------------+---------------+----+------+------------+------------+------+-----+
|Túlio Nascimento|Duque de Caxias|   M|    RJ|           S|  2023-04-01|  Argo|150.0|
|   Bruna Pereira|        Niterói|   F|    RJ|           C|  2023-04-02|    Ka|100.0|
|   Bruna Pereira|        Niterói|   F|    RJ|           C|  2023-04-03|    Ka|100.0|
|   Bruna Pereira|        Niterói|   F|    RJ|           C|  2023-04-04|  Onix|170.0|
|       Ana Silva|Duque de Caxias|   F|    RJ|           C|  2023-04-05|  Polo|150.0|
|       Ana Silva|Duque de Caxias|   F|    RJ|           C|  2023-04-13|  Polo|150.0|
|       Ana Silva|Duque de Caxias|   F|    RJ|           C|  2023-04-15|    Ka|100.0|
|   Lúcia Andrade|      São Paulo|   F|    SP|           C|  2023-04-19|  Argo|150.0|
|   Lúcia Andrade|      São Paulo|   F|    SP|        

# 14 - Realizar um join entre "Cliente" e "Carro" com uma condição específica


In [None]:

cliente_aluguel_join = cliente.join(aluguel, cliente.ID_Cliente == aluguel.ID_Cliente, 'left')
cliente_carro_join = cliente_aluguel_join.join(carro, cliente_aluguel_join.ID_Carro == carro.ID_Carro, 'left')
cliente_carro_filtered = cliente_carro_join.filter((cliente_carro_join.Sexo == 'M') & (cliente_carro_join.Valor > 100))\
                                            .select('Nome','Cidade','Estado','Estado_Civil','Modelo','Valor')\
                                            .show()



+----------------+---------------+------+------------+------+-----+
|            Nome|         Cidade|Estado|Estado_Civil|Modelo|Valor|
+----------------+---------------+------+------------+------+-----+
|Túlio Nascimento|Duque de Caxias|    RJ|           S|  Argo|150.0|
+----------------+---------------+------+------------+------+-----+



# 15 - Encontrar o valor médio dos carros alugados


In [None]:
carro.groupBy('Modelo').agg(F.avg('Valor').alias('valor_medio')).show()

+------+-----------+
|Modelo|valor_medio|
+------+-----------+
|    Ka|      100.0|
|  Polo|      150.0|
|  Onix|      170.0|
|  Kwid|      120.0|
|  Argo|      150.0|
+------+-----------+



#16 - Calcular o número total de clientes por estado


In [None]:
cliente.groupBy('Estado').agg(F.count('ID_Cliente').alias('total_clientes')).show()

+------+--------------+
|Estado|total_clientes|
+------+--------------+
|    SP|             2|
|    RJ|             3|
+------+--------------+



#17 - Identificar a marca mais popular com base nos aluguéis


In [None]:
marca_carro=marca.join(carro, marca.ID_Marca == carro.ID_Marca, 'inner').join(aluguel, carro.ID_Carro == aluguel.ID_Carro, 'inner')
marca_carro.groupBy('Marca').agg(F.count('ID_Aluguel').alias('total_alugueis')).orderBy(F.desc('total_alugueis')).show()

+----------+--------------+
|     Marca|total_alugueis|
+----------+--------------+
|      Ford|             4|
|      Fiat|             3|
|Volkswagen|             2|
| Chevrolet|             1|
+----------+--------------+



#18 - Determinar o maior e menor valor de aluguel entre os carros


In [None]:

aluguel_com_valor = aluguel.join(carro, aluguel.ID_Carro == carro.ID_Carro, "inner")
aluguel_com_valor.agg(F.max('Valor').alias('maior_valor'), F.min('Valor').alias('menor_valor')).show()

+-----------+-----------+
|maior_valor|menor_valor|
+-----------+-----------+
|      170.0|      100.0|
+-----------+-----------+



#19 - Classificar os carros pelo valor do aluguel em ordem decrescente


In [None]:
aluguel_carro_cliente \
    .select('Modelo', 'Valor') \
    .orderBy(F.desc('Valor')) \
    .show()



+------+-----+
|Modelo|Valor|
+------+-----+
|  Onix|170.0|
|  Argo|150.0|
|  Argo|150.0|
|  Polo|150.0|
|  Polo|150.0|
|  Argo|150.0|
|    Ka|100.0|
|    Ka|100.0|
|    Ka|100.0|
|    Ka|100.0|
+------+-----+



#20 - Calcular a diferença em dias entre o aluguel mais recente e o mais antigo


In [None]:
aluguel_carro_cliente.withColumn("Data_Aluguel",aluguel_carro_cliente.Data_Aluguel.cast('date')) \
                     .agg(F.max('Data_Aluguel').alias('data_mais_recente'),
                          F.min('Data_Aluguel').alias('data_mais_antiga')) \
                          .show()


+-----------------+----------------+
|data_mais_recente|data_mais_antiga|
+-----------------+----------------+
|       2023-04-25|      2023-04-01|
+-----------------+----------------+



#21 - Criar uma coluna no DataFrame "Carro" para categorizar os valores de

---




+------+-------------+
|Modelo|total_aluguel|
+------+-------------+
|  Argo|        450.0|
|    Ka|        400.0|
|  Polo|        300.0|
|  Onix|        170.0|
+------+-------------+



#22 - Criar uma coluna no DataFrame "Cliente" para indicar se a cidade é a capital do estado


In [None]:
capitalsp= 'São Paulo'
capitalrj= 'Rio de Janeiro'

cliente.withColumn('Capital', F.when((cliente.Cidade == capitalsp) | (cliente.Cidade == capitalrj), 'Sim').otherwise('Não')).show()

+----------+----------------+---------------+----+------+------------+-------+
|ID_Cliente|            Nome|         Cidade|Sexo|Estado|Estado_Civil|Capital|
+----------+----------------+---------------+----+------+------------+-------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|           C|    Não|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|           C|    Não|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|           S|    Não|
|         4|  Fernando Souza|       Campinas|   M|    SP|           S|    Não|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|           C|    Sim|
+----------+----------------+---------------+----+------+------------+-------+



# 23 - Adicionar uma coluna em "Aluguel" com o valor total do aluguel, considerando uma taxa fixa de 10%


In [None]:
# Adicionar a nova coluna com o valor ajustado
aluguel_carro_cliente = aluguel_carro_cliente.withColumn(
    'Valor_Total',
    F.round(aluguel_carro_cliente.Valor * 1.1, 2)
)
aluguel_carro_cliente.select('Nome','Valor_Total').show()

+----------------+-----------+
|            Nome|Valor_Total|
+----------------+-----------+
|Túlio Nascimento|      165.0|
|   Bruna Pereira|      110.0|
|   Bruna Pereira|      110.0|
|   Bruna Pereira|      187.0|
|       Ana Silva|      165.0|
|       Ana Silva|      165.0|
|       Ana Silva|      110.0|
|   Lúcia Andrade|      165.0|
|   Lúcia Andrade|      165.0|
|Túlio Nascimento|      110.0|
+----------------+-----------+



#24 - Agrupar os aluguéis por cliente e contar o número de carros alugados

In [None]:
cliente_aluguel_join.groupBy('Nome').agg(F.count('ID_Carro').alias('total_carros_alugados')).show()

+----------------+---------------------+
|            Nome|total_carros_alugados|
+----------------+---------------------+
|       Ana Silva|                    3|
|Túlio Nascimento|                    2|
|   Lúcia Andrade|                    2|
|   Bruna Pereira|                    3|
|  Fernando Souza|                    0|
+----------------+---------------------+



#25 - Criar um script PySpark para agendar a execução automática das transformações

In [None]:
# prompt: Criar um script PySpark para agendar a execução automática das transformações

# Define a função que contém as suas transformações
def executar_transformacoes():
    # ... (seu código de transformação existente aqui) ...
    pass  # Substitua 'pass' pelo seu código

# Usa o módulo apscheduler para agendar a execução
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# Executa as transformações a cada dia às 00:00 (meio-noite)
scheduler.add_job(executar_transformacoes, 'cron', hour=0, minute=0)

# Inicia o scheduler
scheduler.start()

# Mantém o script em execução para que o agendador funcione
import time
while True:
    time.sleep(60)  # Verifica a cada minuto