<a href="https://colab.research.google.com/github/felipeeng23/pyspark_funcionarios/blob/main/Exercicio_Felipe_SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Importação da biblioteca pandas
import pandas as pd

In [2]:
# Instalação dos requisitos para o PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Instala o OpenJDK 8 (necessário para o Spark rodar no ambiente). A opção -qq reduz a saída do comando, e > /dev/null silencia os logs.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz #Baixa a versão 3.1.1 do Apache Spark com suporte ao Hadoop 3.2 do repositório oficial.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz #Descompacta o arquivo .tgz para acesso aos binários do Spark.
!pip install -q findspark #findspark é uma biblioteca que facilita a inicialização do Spark em ambientes como o Google Colab.

In [3]:
# Importa o módulo 'os' para interagir com variáveis de ambiente do sistema operacional
import os

# Define a variável de ambiente JAVA_HOME, indicando o caminho para o Java 8
# O Spark precisa do Java para ser executado, e aqui especificamos onde ele está instalado
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Define a variável de ambiente SPARK_HOME, indicando o caminho de instalação do Spark
# Isso é necessário para que o Python saiba onde encontrar os binários e bibliotecas do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Importa a biblioteca 'findspark', que ajuda a localizar e configurar o Spark no ambiente Python
import findspark

# Inicializa o findspark, permitindo que possamos importar e utilizar o PySpark no código
findspark.init()


In [4]:
# Importa a classe SparkSession do módulo pyspark.sql
# SparkSession é o ponto de entrada principal para trabalhar com DataFrames no PySpark
from pyspark.sql import SparkSession

# Inicializar a SparkSession com suporte ao Hive
spark = SparkSession.builder \
    .appName("Spark with Hive on Colab") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse") \
    .config("hive.metastore.warehouse.dir", "/content/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
# Define o nome da aplicação Spark (aparece nos logs)
# Configura o Spark para usar o catálogo do Hive
# Define o diretório do warehouse para o Hive
# Criar diretório para o warehouse
# Habilita o suporte ao Hive, permitindo consultas SQL compatíveis com Hive
# Cria a SparkSession ou reutiliza uma existente
# Cria o diretório para armazenar os metadados e tabelas gerenciadas pelo Hive
!mkdir -p /content/spark-warehouse

'''
 Por que usar o Hive com Spark?

Permite realizar consultas SQL complexas sobre grandes volumes de dados.
Facilita o gerenciamento de tabelas persistentes no Spark SQL.
Pode armazenar os resultados de consultas em um formato estruturado (como Parquet ou ORC).

'''

'\n Por que usar o Hive com Spark?\n\nPermite realizar consultas SQL complexas sobre grandes volumes de dados.\nFacilita o gerenciamento de tabelas persistentes no Spark SQL.\nPode armazenar os resultados de consultas em um formato estruturado (como Parquet ou ORC).\n\n'

In [5]:
# Verifica o SparkContext
print(spark)

# Exibe a Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x77fc2a7728d0>
3.1.1


In [6]:
'''
O diretório /content é temporário — tudo será apagado quando a sessão for encerrada.
Se precisar manter os dados entre sessões, você pode salvar em uma área de armazenamento persistente
O Spark salva as tabelas gerenciadas em arquivos no sistema de arquivos local, ou em um sistema distribuído (como HDFS, se configurado).
No seu código, o local definido para armazenar a tabela é:
.config("spark.sql.warehouse.dir", "/content/spark-warehouse")
Portanto, as tabelas e os dados são criados e salvos no diretório:
/content/spark-warehouse
'''

'\nO diretório /content é temporário — tudo será apagado quando a sessão for encerrada.\nSe precisar manter os dados entre sessões, você pode salvar em uma área de armazenamento persistente\nO Spark salva as tabelas gerenciadas em arquivos no sistema de arquivos local, ou em um sistema distribuído (como HDFS, se configurado).\nNo seu código, o local definido para armazenar a tabela é:\n.config("spark.sql.warehouse.dir", "/content/spark-warehouse")\nPortanto, as tabelas e os dados são criados e salvos no diretório:\n/content/spark-warehouse\n'

#Criando DF PySpark SQL

In [7]:
spark.sql('''
CREATE TABLE IF NOT EXISTS funcionarios (
  matricula INT,
  nome STRING,
  cidade STRING,
  estado STRING,
  pais STRING,
  idade INT,
  departamento STRING,
  cargo STRING,
  salario DOUBLE,
  escolaridade STRING,
  nota INT
)
''')
spark.sql('''
INSERT INTO funcionarios VALUES
(1, 'Lucas', 'Atibaia', 'SP', 'Brasil', 35, 'Compras', 'Gerente', 25000, 'Superior', 8),
(2, 'Ana', 'São Paulo', 'SP', 'Brasil', 29, 'Vendas', 'Coordenador', 12000, 'Superior', 6),
(3, 'Luiza', 'Santos', 'SP', 'Brasil', 38, 'Finanças', 'Gerente', 28000, 'MBA', 9),
(4, 'Fernando', 'Atibaia', 'SP', 'Brasil', 36, 'Marketing', 'Diretor', 40000, 'Mestrado', 7),
(5, 'Sandra', 'Atibaia', 'SP', 'Brasil', 28, 'Produção', 'Analista', 23000, 'Superior', 5),
(6, 'Douglas', 'Bragança', 'SP', 'Brasil', 29, 'Finanças', 'Analista', 11000, 'Superior', 9),
(7, 'Eduardo', 'Extrema', 'MG', 'Brasil', 30, 'Marketing', 'Gerente', 12000, 'MBA', 4),
(8, 'Ester', 'Itapeva', 'MG', 'Brasil', 29, 'Compras', 'Analista', 10000, 'Superior', 2),
(9, 'Pedro', 'Extrema', 'MG', 'Brasil', 30, 'Marketing', 'Analista', 13000, 'Superior', 1),
(10, 'Maria', 'Extrema', 'MG', 'Brasil', 40, 'Produção', 'Analista', 12000, 'MBA', 7)
''')

DataFrame[]

In [8]:
funcionarios = spark.sql('''
SELECT * FROM funcionarios;
''')

In [9]:
funcionarios.show()

+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|matricula|    nome|   cidade|estado|  pais|idade|departamento|      cargo|salario|escolaridade|nota|
+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|        6| Douglas| Bragança|    SP|Brasil|   29|    Finanças|   Analista|11000.0|    Superior|   9|
|        7| Eduardo|  Extrema|    MG|Brasil|   30|   Marketing|    Gerente|12000.0|         MBA|   4|
|        8|   Ester|  Itapeva|    MG|Brasil|   29|     Compras|   Analista|10000.0|    Superior|   2|
|        9|   Pedro|  Extrema|    MG|Brasil|   30|   Marketing|   Analista|13000.0|    Superior|   1|
|       10|   Maria|  Extrema|    MG|Brasil|   40|    Produção|   Analista|12000.0|         MBA|   7|
|        1|   Lucas|  Atibaia|    SP|Brasil|   35|     Compras|    Gerente|25000.0|    Superior|   8|
|        2|     Ana|São Paulo|    SP|Brasil|   29|      Vendas|Coordenador|12000.0

In [10]:
df = funcionarios.toPandas()
display(df)

Unnamed: 0,matricula,nome,cidade,estado,pais,idade,departamento,cargo,salario,escolaridade,nota
0,6,Douglas,Bragança,SP,Brasil,29,Finanças,Analista,11000.0,Superior,9
1,7,Eduardo,Extrema,MG,Brasil,30,Marketing,Gerente,12000.0,MBA,4
2,8,Ester,Itapeva,MG,Brasil,29,Compras,Analista,10000.0,Superior,2
3,9,Pedro,Extrema,MG,Brasil,30,Marketing,Analista,13000.0,Superior,1
4,10,Maria,Extrema,MG,Brasil,40,Produção,Analista,12000.0,MBA,7
5,1,Lucas,Atibaia,SP,Brasil,35,Compras,Gerente,25000.0,Superior,8
6,2,Ana,São Paulo,SP,Brasil,29,Vendas,Coordenador,12000.0,Superior,6
7,3,Luiza,Santos,SP,Brasil,38,Finanças,Gerente,28000.0,MBA,9
8,4,Fernando,Atibaia,SP,Brasil,36,Marketing,Diretor,40000.0,Mestrado,7
9,5,Sandra,Atibaia,SP,Brasil,28,Produção,Analista,23000.0,Superior,5


In [11]:
df.to_csv('dados_funcionarios.csv',index=False)

In [12]:
# Convertendo um ojeto pandas em um df pyspark
#dfpy = spark.createDataFrame(df.to_dict(orient='records'))

In [13]:
spark.sql('''
CREATE TABLE IF NOT EXISTS funcionarios2 (
  matricula INT,
  nome STRING,
  cidade STRING,
  estado STRING,
  pais STRING,
  idade INT,
  departamento STRING,
  cargo STRING,
  salario DOUBLE,
  escolaridade STRING,
  nota INT
)
USING CSV
OPTIONS (path '/content/dados_funcionarios.csv', header 'true', inferSchema 'true' )
''')
spark.sql('''
SELECT *
FROM funcionarios2
''').show()

+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|matricula|    nome|   cidade|estado|  pais|idade|departamento|      cargo|salario|escolaridade|nota|
+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|        6| Douglas| Bragança|    SP|Brasil|   29|    Finanças|   Analista|11000.0|    Superior|   9|
|        7| Eduardo|  Extrema|    MG|Brasil|   30|   Marketing|    Gerente|12000.0|         MBA|   4|
|        8|   Ester|  Itapeva|    MG|Brasil|   29|     Compras|   Analista|10000.0|    Superior|   2|
|        9|   Pedro|  Extrema|    MG|Brasil|   30|   Marketing|   Analista|13000.0|    Superior|   1|
|       10|   Maria|  Extrema|    MG|Brasil|   40|    Produção|   Analista|12000.0|         MBA|   7|
|        1|   Lucas|  Atibaia|    SP|Brasil|   35|     Compras|    Gerente|25000.0|    Superior|   8|
|        2|     Ana|São Paulo|    SP|Brasil|   29|      Vendas|Coordenador|12000.0

In [17]:
df[['cargo','salario']].groupby('cargo').mean().reset_index()

Unnamed: 0,cargo,salario
0,Analista,13800.0
1,Coordenador,12000.0
2,Diretor,40000.0
3,Gerente,21666.666667


In [14]:
'''
criando uma visao do salario medio agrupado por cargo
'''

spark.sql('''
SELECT cargo, ROUND(AVG(salario),1) AS salario_medio
FROM funcionarios
GROUP BY cargo;
''').show()

+-----------+-------------+
|      cargo|salario_medio|
+-----------+-------------+
|    Gerente|      21666.7|
|Coordenador|      12000.0|
|    Diretor|      40000.0|
|   Analista|      13800.0|
+-----------+-------------+



In [18]:
df[['nome', 'salario']][df['salario'] > 20000]

Unnamed: 0,nome,salario
5,Lucas,25000.0
7,Luiza,28000.0
8,Fernando,40000.0
9,Sandra,23000.0


In [19]:
df[['nome', 'salario']].query('salario > 20000')

Unnamed: 0,nome,salario
5,Lucas,25000.0
7,Luiza,28000.0
8,Fernando,40000.0
9,Sandra,23000.0


In [20]:
'''
Vamor fazer uma consulta que retorne apenas os nomes e salários,
cujo salário seja maior que 20.000
'''
spark.sql('''
SELECT nome,salario
FROM funcionarios
WHERE salario>20000
''').show()

+--------+-------+
|    nome|salario|
+--------+-------+
|   Lucas|25000.0|
|   Luiza|28000.0|
|Fernando|40000.0|
|  Sandra|23000.0|
+--------+-------+



In [21]:
df[['nome', 'salario']].groupby('nome').mean().reset_index().query('salario > 20000')

Unnamed: 0,nome,salario
4,Fernando,40000.0
5,Lucas,25000.0
6,Luiza,28000.0
9,Sandra,23000.0


In [25]:
'''
Vamor fazer uma consulta que retorne apenas os nomes e salários,
cuja media de salário seja maior que 20.000'''

spark.sql('''
SELECT
    nome,
    AVG(salario) AS salario_medio
FROM
    funcionarios2
GROUP BY
    nome
HAVING
    AVG(salario) > 20000
''').show()


+--------+-------------+
|    nome|salario_medio|
+--------+-------------+
|   Lucas|      25000.0|
|   Luiza|      28000.0|
|  Sandra|      23000.0|
|Fernando|      40000.0|
+--------+-------------+



## Banco locadora

### Chamando locadora

In [26]:
aluguel = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=0&single=true&output=csv')
cliente = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=1573858125&single=true&output=csv')
carro = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=2131324504&single=true&output=csv')
marca = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=1957306968&single=true&output=csv')



### Transformando em csv

In [27]:
cliente.to_csv('cliente.csv',index=False)
aluguel.to_csv('aluguel.csv',index=False)
carro.to_csv('carro.csv',index=False)
marca.to_csv('marca.csv',index=False)

### Fazendo Schema SparkSQL

In [33]:
cliente.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   codcliente   5 non-null      int64 
 1   nome         5 non-null      object
 2   cidade       5 non-null      object
 3   sexo         5 non-null      object
 4   estado       5 non-null      object
 5   estadocivil  5 non-null      object
dtypes: int64(1), object(5)
memory usage: 372.0+ bytes


In [28]:
spark.sql('''
CREATE TABLE IF NOT EXISTS cliente (
  codcliente INT,
  nome STRING,
  cidade STRING,
  sexo STRING,
  estado STRING,
  estadocivil STRING
)

USING CSV
OPTIONS (path '/content/cliente.csv', header 'true', inferSchema 'true')

''')


DataFrame[]

In [29]:
spark.sql('''
SELECT * FROM cliente
''').show()

+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|
+----------+----------------+---------------+----+------+-----------+



In [30]:
spark.sql('''
CREATE TABLE IF NOT EXISTS aluguel (
  codaluguel INT,
  codcliente INT,
  codcarro INT,
  data_aluguel DATE
)

USING CSV
OPTIONS (path '/content/aluguel.csv', header 'true', inferSchema 'true')

''')

DataFrame[]

In [31]:
spark.sql('''
SELECT * FROM aluguel
''').show()

+----------+----------+--------+------------+
|codaluguel|codcliente|codcarro|data_aluguel|
+----------+----------+--------+------------+
|         1|         3|       2|  2023-04-01|
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
|         6|         1|       4|  2023-04-13|
|         7|         1|       1|  2023-04-15|
|         8|         5|       2|  2023-04-19|
|         9|         5|       2|  2023-04-21|
|        10|         3|       1|  2023-04-25|
+----------+----------+--------+------------+



In [32]:
carro.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   codcarro  5 non-null      int64  
 1   codmarca  5 non-null      int64  
 2   modelo    5 non-null      object 
 3   valor     5 non-null      float64
dtypes: float64(1), int64(2), object(1)
memory usage: 292.0+ bytes


In [34]:
spark.sql('''
CREATE TABLE IF NOT EXISTS carro(
    codcarro INT,
    codmarca INT,
    modelo STRING,
    valor DOUBLE
)
USING CSV
OPTIONS (path '/content/carro.csv', header 'true', inferSchema 'true')
''')

DataFrame[]

In [35]:
spark.sql('''
SELECT * FROM carro
''').show()

+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       1|       1|    Ka|100.0|
|       2|       2|  Argo|150.0|
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       5|       5|  Kwid|120.0|
+--------+--------+------+-----+



In [36]:
marca.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   codmarca  5 non-null      int64 
 1   marca     5 non-null      object
dtypes: int64(1), object(1)
memory usage: 212.0+ bytes


In [37]:
spark.sql('''CREATE TABLE IF NOT EXISTS marca (
  codmarca INT,
  marca STRING
)
USING CSV
OPTIONS (path '/content/marca.csv', header 'true', inferSchema 'true')
''')

DataFrame[]

In [38]:
spark.sql('''
SELECT * FROM marca
''').show()

+--------+----------+
|codmarca|     marca|
+--------+----------+
|       1|      Ford|
|       2|      Fiat|
|       3| Chevrolet|
|       4|Volkswagen|
|       5|   Renault|
+--------+----------+



In [39]:
'''
2 - Liste os estados diferentes presentes na tabela cliente.
'''
spark.sql('''
SELECT DISTINCT estado FROM cliente
''').show()

+------+
|estado|
+------+
|    SP|
|    RJ|
+------+



In [40]:
#com pandas

estado_distinct = cliente['estado'].unique()
print(estado_distinct)

['RJ' 'SP']


In [41]:
'''
3 - Encontre todos os clientes que são do sexo masculino e solteiros.
'''
spark.sql('''
SELECT nome,sexo,estadocivil
FROM cliente
WHERE sexo='M' AND estadocivil='S'
''').show()

+----------------+----+-----------+
|            nome|sexo|estadocivil|
+----------------+----+-----------+
|Túlio Nascimento|   M|          S|
|  Fernando Souza|   M|          S|
+----------------+----+-----------+



In [42]:
#com pandas
clientes_M_S=cliente[(cliente['sexo']=='M') & (cliente['estadocivil']=='S')]
display(clientes_M_S[['nome','sexo','estadocivil']])

Unnamed: 0,nome,sexo,estadocivil
2,Túlio Nascimento,M,S
3,Fernando Souza,M,S


In [43]:
'''
4-Liste todos os clientes cujo nome começa com a letra 'A'
'''
spark.sql('''
SELECT nome,estado FROM cliente
WHERE nome LIKE 'A%'
''').show()

+---------+------+
|     nome|estado|
+---------+------+
|Ana Silva|    RJ|
+---------+------+



In [44]:
'''
4-Liste todos os clientes cujo sobrenome começa com a letra 'A'
'''
spark.sql('''
SELECT nome,estado FROM cliente
WHERE nome LIKE '% A%'
''').show()

+-------------+------+
|         nome|estado|
+-------------+------+
|Lúcia Andrade|    SP|
+-------------+------+



In [45]:
cliente_com_a=cliente[cliente['nome'].str.startswith('A')]
print(cliente_com_a)

   codcliente       nome           cidade sexo estado estadocivil
0           1  Ana Silva  Duque de Caxias    F     RJ           C


In [46]:
'''
5 - Liste todos os carros que são da marca 'Ford' ou 'Fiat'.
'''
spark.sql('''
SELECT carro.modelo,marca.marca
FROM carro
JOIN marca ON carro.codmarca = marca.codmarca
WHERE marca.marca = 'Fiat' OR marca.marca='Ford'
''').show()

+------+-----+
|modelo|marca|
+------+-----+
|    Ka| Ford|
|  Argo| Fiat|
+------+-----+



Explicação: Objetivo: Combinar as tabelas carro e marca em uma única tabela chamada carros_marca.

Chave de junção: A junção está sendo feita usando a coluna 'codmarca' como chave comum entre as tabelas.

Tipo de junção:

how='inner' significa que será um INNER JOIN

Somente serão incluídos registros onde o codmarca existe em AMBAS as tabelas

Registros sem correspondência em qualquer uma das tabelas serão excluídos do resultado

Resultado:

A nova tabela carros_marca conterá todas as colunas de ambas as tabelas originais

As colunas terão os sufixos _x e _y caso existam colunas com nomes iguais (não mostrado no código)

Esta operação é comum quando você precisa combinar informações de diferentes tabelas relacionadas por uma chave comum, como neste caso de carros e suas respectivas marcas.

Tipos de Joins no Pandas No Pandas, a função merge() oferece vários tipos de joins, semelhantes aos joins em SQL. Aqui estão os principais tipos:

Inner Join (how='inner') Retorna apenas as linhas que têm correspondência em AMBAS as tabelas

Exemplo:

pd.merge(df1, df2, on='chave', how='inner')

Left Join (how='left')
Retorna TODAS as linhas da tabela à esquerda (primeira tabela)

E apenas as linhas correspondentes da tabela à direita

Valores não correspondentes serão preenchidos com NaN

Exemplo:

pd.merge(df1, df2, on='chave', how='left')

Right Join (how='right')
Retorna TODAS as linhas da tabela à direita (segunda tabela)

E apenas as linhas correspondentes da tabela à esquerda

Valores não correspondentes serão preenchidos com NaN

Exemplo:

pd.merge(df1, df2, on='chave', how='right')

Outer Join (Full Join) (how='outer')
Retorna TODAS as linhas de AMBAS as tabelas

Combina onde houver correspondência

Preenche com NaN onde não houver correspondência

Exemplo:

pd.merge(df1, df2, on='chave', how='outer')

Cross Join
Retorna o produto cartesiano das duas tabelas

Combina cada linha da primeira tabela com todas as linhas da segunda

Exemplo:

pd.merge(df1, df2, how='cross')

Parâmetros importantes: on: Nome da coluna chave para o join

left_on/right_on: Nomes diferentes de colunas chave em cada dataframe

suffixes: Sufixos para colunas com nomes duplicados (padrão: ('_x', '_y'))

In [47]:
#join
carros_marca = carro.merge(marca, on='codmarca', how= 'inner')
# Filtrar os carros das marcas 'Ford' ou 'Fiat'
carros_fiat_ford= carros_marca[carros_marca['marca'].isin(['Ford','Fiat'])]
display(carros_fiat_ford)

Unnamed: 0,codcarro,codmarca,modelo,valor,marca
0,1,1,Ka,100.0,Ford
1,2,2,Argo,150.0,Fiat


In [48]:
'''
Encontre os clientes que alugaram o carro com modelo 'Onix'
'''
spark.sql('''
SELECT cliente.nome,carro.modelo
FROM aluguel
JOIN cliente ON aluguel.codcliente=cliente.codcliente
JOIN carro ON aluguel.codcarro = carro.codcarro
WHERE modelo='Onix'
''').show()

+-------------+------+
|         nome|modelo|
+-------------+------+
|Bruna Pereira|  Onix|
+-------------+------+



In [49]:
aluguel_cliente_carro=aluguel.merge(cliente, on='codcliente').merge(carro, on='codcarro')
#filtrar o modelo Onix
cliente_onix= aluguel_cliente_carro[aluguel_cliente_carro['modelo']=='Onix']
display(cliente_onix[['nome','data_aluguel','modelo']])

Unnamed: 0,nome,data_aluguel,modelo
3,Bruna Pereira,2023-04-04,Onix


In [50]:
#Liste os modelos de carros e o número de vezes que cada um foi alugado
spark.sql('''
SELECT carro.modelo, COUNT(codaluguel) AS qtde_alugueis
FROM aluguel
JOIN carro ON aluguel.codcarro = carro.codcarro
GROUP BY modelo
ORDER BY COUNT(codaluguel) DESC
''').show()

+------+-------------+
|modelo|qtde_alugueis|
+------+-------------+
|    Ka|            4|
|  Argo|            3|
|  Polo|            2|
|  Onix|            1|
+------+-------------+



In [51]:
#merge aluguel com cliente
aluguel_cliente=aluguel.merge(cliente,on='codcliente')

#contar
qtde_alugueis= aluguel_cliente.groupby('nome')['codaluguel'].count().reset_index()
#qtde_alugueis = qtde_alugueis.rename(columns= {'codaluguel':'qtde_alugueis'}, inplace=True)

#ordem descrescente
qtde_alugueis.columns = ['nome','qtde_alugueis']
qtde_alugueis=qtde_alugueis.sort_values(by='qtde_alugueis',ascending=False)
print(qtde_alugueis)

               nome  qtde_alugueis
0         Ana Silva              3
1     Bruna Pereira              3
2     Lúcia Andrade              2
3  Túlio Nascimento              2


In [59]:
'''
Encontre os clientes que alugaram carros em um determinado mês e ano,
listando os modelos de carros e as datas de aluguel.
'''
spark.sql('''
SELECT cliente.nome,carro.modelo,aluguel.data_aluguel
FROM aluguel
JOIN cliente ON aluguel.codcliente=cliente.codcliente
JOIN carro ON aluguel.codcarro=carro.codcarro
WHERE
MONTH(aluguel.data_aluguel) = 4 AND YEAR(aluguel.data_aluguel)=2023
''').show()


+----------------+------+------------+
|            nome|modelo|data_aluguel|
+----------------+------+------------+
|Túlio Nascimento|  Argo|  2023-04-01|
|   Bruna Pereira|    Ka|  2023-04-02|
|   Bruna Pereira|    Ka|  2023-04-03|
|   Bruna Pereira|  Onix|  2023-04-04|
|       Ana Silva|  Polo|  2023-04-05|
|       Ana Silva|  Polo|  2023-04-13|
|       Ana Silva|    Ka|  2023-04-15|
|   Lúcia Andrade|  Argo|  2023-04-19|
|   Lúcia Andrade|  Argo|  2023-04-21|
|Túlio Nascimento|    Ka|  2023-04-25|
+----------------+------+------------+



In [62]:
#transformando coluna em datime

aluguel_cliente_carro['data_aluguel'] = pd.to_datetime(aluguel_cliente_carro['data_aluguel'])

#merge - já existia o merge com essa referência
#aluguel_cliente_carro

#filtrar o mês de abril de 2023
alugueis_4_2023=aluguel_cliente_carro[
    (aluguel_cliente_carro['data_aluguel'].dt.month == 4) &
    (aluguel_cliente_carro['data_aluguel'].dt.year == 2023)
    ]
display(alugueis_4_2023[['nome','modelo','data_aluguel']])

Unnamed: 0,nome,modelo,data_aluguel
0,Túlio Nascimento,Argo,2023-04-01
1,Bruna Pereira,Ka,2023-04-02
2,Bruna Pereira,Ka,2023-04-03
3,Bruna Pereira,Onix,2023-04-04
4,Ana Silva,Polo,2023-04-05
5,Ana Silva,Polo,2023-04-13
6,Ana Silva,Ka,2023-04-15
7,Lúcia Andrade,Argo,2023-04-19
8,Lúcia Andrade,Argo,2023-04-21
9,Túlio Nascimento,Ka,2023-04-25


In [63]:
print(aluguel_cliente_carro.dtypes)

codaluguel               int64
codcliente               int64
codcarro                 int64
data_aluguel    datetime64[ns]
nome                    object
cidade                  object
sexo                    object
estado                  object
estadocivil             object
codmarca                 int64
modelo                  object
valor                  float64
dtype: object
