1- Importar bibliotecas

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import split
from pyspark.sql.functions import substring
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import * #StructType, StructField, StringType
from pyspark.sql.types import IntegerType 
from pyspark.sql import functions as F
from datetime import datetime
import pandas as pd
from pandas import DataFrame
import os

2- Ler arquivo .csv

In [0]:
%python
file_path = "/FileStore/tables/dados_clientes.csv"

df_customer = spark.read.csv(file_path)
display(df_customer)

_c0
Número da Conta;Nome do Cliente;Endereço;Telefone;Email
1;Fernanda Lima;Rua da Consolação
2;Juliana Costa;Rua Augusta
3;Pedro Santos;Rua da Consolação
4;Fernanda Lima;Rua Augusta
5;Lucas Fernandes;Rua da Consolação
6;Carlos Pereira;Rua XV de Novembro
7;Juliana Costa;Avenida Paulista
8;Carlos Pereira;Avenida Paulista
9;Ana Souza;Avenida Atlântica


3- Estruturar o arquivo .csv definindo as colunas do df (nome, tipo, se pode ou não nulo)

In [0]:
file_schema = StructType([
    StructField(('Número da Conta'), IntegerType(), False)
    ,StructField(('Nome do Cliente'), StringType(), False)
    ,StructField(('Endereço'), StringType(), False)
    ,StructField(('Telefone'), StringType(), False)
    ,StructField(('Email'), StringType(), False)
])

df_customer = spark.read.format("csv")\
    .option("inferSchema", "true") \
    .option("schema", file_schema) \
    .option("header", "true") \
    .option("sep", ";") \
    .option("encoding", "utf-8") \
    .load(file_path)

df_customer.show(3)

+---------------+---------------+--------------------+---------------+--------------------+
|Número da Conta|Nome do Cliente|            Endereço|       Telefone|               Email|
+---------------+---------------+--------------------+---------------+--------------------+
|              1|  Fernanda Lima|Rua da Consolação...|(11) 98623-5795|cliente1@exemplo.com|
|              2|  Juliana Costa|   Rua Augusta, 1011|(11) 98959-1767|cliente2@exemplo.com|
|              3|   Pedro Santos|Rua da Consolação...|(11) 93947-2587|cliente3@exemplo.com|
+---------------+---------------+--------------------+---------------+--------------------+
only showing top 3 rows



In [0]:
# Visualizar como está a estrutura de cada coluna
df_customer.printSchema()

root
 |-- Número da Conta: integer (nullable = true)
 |-- Nome do Cliente: string (nullable = true)
 |-- Endereço: string (nullable = true)
 |-- Telefone: string (nullable = true)
 |-- Email: string (nullable = true)



In [0]:
# Quantidade de linhas do dataframe
number_rows = df_customer.count()

# Quantidade de colunas do dataframe
# dtypes = info
number_columns = len(df_customer.dtypes)

print(f'Number of rows: {number_rows}')
print(f'Row: {df_customer.head(1)}\n')
print(f'Number of columns: {number_columns}')
print(f'Columns: {df_customer.columns}')

Number of rows: 1001
Row: [Row(Número da Conta=1, Nome do Cliente='Fernanda Lima', Endereço='Rua da Consolação, 2223', Telefone='(11) 98623-5795', Email='cliente1@exemplo.com')]

Number of columns: 5
Columns: ['Número da Conta', 'Nome do Cliente', 'Endereço', 'Telefone', 'Email']


4- Normalizações

- Normalizar a coluna Endereço (col1= Logradouro e col2= Número do Logradouro)

In [0]:
# É necessário essa biblioteca: from pyspark.sql.functions import split

# Dividir a coluna 'Endereço' em 'Logradouro' (antes da virgula) e 'Número Logradouro' (depois da virgula)
df_customer = df_customer.withColumn("Logradouro", split(df_customer["Endereço"], ",")[0])
df_customer = df_customer.withColumn("Número Logradouro", split(df_customer["Endereço"], ",")[1])

# Dropar a coluna 'Endereço'
df_customer = df_customer.drop("Endereço")

df_customer.show(3)

+---------------+---------------+---------------+--------------------+-----------------+-----------------+
|Número da Conta|Nome do Cliente|       Telefone|               Email|       Logradouro|Número Logradouro|
+---------------+---------------+---------------+--------------------+-----------------+-----------------+
|              1|  Fernanda Lima|(11) 98623-5795|cliente1@exemplo.com|Rua da Consolação|             2223|
|              2|  Juliana Costa|(11) 98959-1767|cliente2@exemplo.com|      Rua Augusta|             1011|
|              3|   Pedro Santos|(11) 93947-2587|cliente3@exemplo.com|Rua da Consolação|             2223|
+---------------+---------------+---------------+--------------------+-----------------+-----------------+
only showing top 3 rows



- Normalizar a coluna Telefone (col1= DDD e col2= Telefone)

In [0]:
# É necessário essa biblioteca: from pyspark.sql.functions import substring

# Dividir a coluna 'Telefone' em 'DDD' (antes do parenteses) e 'Telefone' (depois do parenteses)
df_customer = df_customer.withColumn("DDD", split(df_customer["Telefone"], " ")[0])
df_customer = df_customer.withColumn("Número Telefone", split(df_customer["Telefone"], " ")[1])

df_customer.show(3)

+---------------+---------------+---------------+--------------------+-----------------+-----------------+----+---------------+
|Número da Conta|Nome do Cliente|       Telefone|               Email|       Logradouro|Número Logradouro| DDD|Número Telefone|
+---------------+---------------+---------------+--------------------+-----------------+-----------------+----+---------------+
|              1|  Fernanda Lima|(11) 98623-5795|cliente1@exemplo.com|Rua da Consolação|             2223|(11)|     98623-5795|
|              2|  Juliana Costa|(11) 98959-1767|cliente2@exemplo.com|      Rua Augusta|             1011|(11)|     98959-1767|
|              3|   Pedro Santos|(11) 93947-2587|cliente3@exemplo.com|Rua da Consolação|             2223|(11)|     93947-2587|
+---------------+---------------+---------------+--------------------+-----------------+-----------------+----+---------------+
only showing top 3 rows



In [0]:
# É necessário essas bibliotecas: from pyspark.sql import SparkSession e from pyspark.sql.functions import regexp_replace

# Fazer a limpeza dos caracteres especiais >>> Substituir () - e espaços por nada
df_customer = df_customer.withColumn('DDD', regexp_replace('DDD', r'\(|\)|\ ', ''))
df_customer = df_customer.withColumn('Número Telefone', regexp_replace('Número Telefone', r'\-|\ ', ''))

# Dropar a coluna 'Telefone'
df_customer = df_customer.drop("Telefone")

df_customer.show(3)

+---------------+---------------+--------------------+-----------------+-----------------+---+---------------+
|Número da Conta|Nome do Cliente|               Email|       Logradouro|Número Logradouro|DDD|Número Telefone|
+---------------+---------------+--------------------+-----------------+-----------------+---+---------------+
|              1|  Fernanda Lima|cliente1@exemplo.com|Rua da Consolação|             2223| 11|      986235795|
|              2|  Juliana Costa|cliente2@exemplo.com|      Rua Augusta|             1011| 11|      989591767|
|              3|   Pedro Santos|cliente3@exemplo.com|Rua da Consolação|             2223| 11|      939472587|
+---------------+---------------+--------------------+-----------------+-----------------+---+---------------+
only showing top 3 rows



5 - Alterar os tipos das colunas do dataframe

In [0]:
# É necessário essas bibliotecas: from pyspark.sql.types import IntegerType e from pyspark.sql import functions as F

# Setando tipos de dados das colunas
df_customer = df_customer.withColumn("Logradouro", F.col("Logradouro").cast(StringType())) \
    .withColumn("Número Logradouro", F.col("Número Logradouro").cast(IntegerType())) \
    .withColumn("DDD", F.col("DDD").cast(IntegerType())) \
    .withColumn("Número Telefone", F.col("Número Telefone").cast(IntegerType()))



In [0]:
# Visualizar como está a estrutura de cada coluna
df_customer.printSchema()

root
 |-- Número da Conta: integer (nullable = true)
 |-- Nome do Cliente: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Logradouro: string (nullable = true)
 |-- Número Logradouro: integer (nullable = true)
 |-- DDD: integer (nullable = true)
 |-- Número Telefone: integer (nullable = true)



In [0]:
# Quantidade de linhas do dataframe
number_rows = df_customer.count()

# Quantidade de colunas do dataframe
# dtypes = info
number_columns = len(df_customer.dtypes)

print(f'Number of rows: {number_rows}')
print(f'Row: {df_customer.head(1)}\n')
print(f'Number of columns: {number_columns}')
print(f'Columns: {df_customer.columns}')

Number of rows: 1001
Row: [Row(Número da Conta=1, Nome do Cliente='Fernanda Lima', Email='cliente1@exemplo.com', Logradouro='Rua da Consolação', Número Logradouro=2223, DDD=11, Número Telefone=986235795)]

Number of columns: 7
Columns: ['Número da Conta', 'Nome do Cliente', 'Email', 'Logradouro', 'Número Logradouro', 'DDD', 'Número Telefone']


6- Filtros e agregações no dataframe

In [0]:
# Fitrar DDD, Número do telefone e a Cidade Região de cada um dos DDD 
filtrar_df_costumer = df_customer.select(col("DDD")
                    ,col("Número Telefone").alias("Telefone")
                    ,when(df_customer.DDD == "11","Região Metropolitana de São Paulo")
                    .when(df_customer.DDD == "12","Vale do Paraíba e Litoral Norte")
                    .when(df_customer.DDD == "13","Baixada Santista")
                    .when(df_customer.DDD == "14","Bauru e região")
                    .when(df_customer.DDD == "15","Sorocaba e região")
                    .when(df_customer.DDD == "16","Ribeirão Preto e região")
                    .when(df_customer.DDD == "17","São José do Rio Preto e região")
                    .when(df_customer.DDD == "18","Presidente Prudente e região")
                    .when(df_customer.DDD == "19","Campinas e região")
                    .when(df_customer.DDD.isNull() ,"Outro Estado")
                    .otherwise(df_customer.DDD).alias("Cidade Região DDD")).show()

+---+---------+--------------------+
|DDD| Telefone|   Cidade Região DDD|
+---+---------+--------------------+
| 11|986235795|Região Metropolit...|
| 11|989591767|Região Metropolit...|
| 11|939472587|Região Metropolit...|
| 11|993453815|Região Metropolit...|
| 11|964492610|Região Metropolit...|
| 11|949146650|Região Metropolit...|
| 11|935213943|Região Metropolit...|
| 11|919848352|Região Metropolit...|
| 11|952406030|Região Metropolit...|
| 11|915694953|Região Metropolit...|
| 11|998834579|Região Metropolit...|
| 11|916898490|Região Metropolit...|
| 11|929255254|Região Metropolit...|
| 11|937508674|Região Metropolit...|
| 11|964363441|Região Metropolit...|
| 11|970225724|Região Metropolit...|
| 11|999144856|Região Metropolit...|
| 11|967671371|Região Metropolit...|
| 11|990908621|Região Metropolit...|
| 11|940032265|Região Metropolit...|
+---+---------+--------------------+
only showing top 20 rows



In [0]:
# Filtrar os dados dos clientes que começam com a letra 'J'
filtrar_costumer_j = df_customer.filter(F.col('Nome do Cliente').like("J%"))
filtrar_costumer_j.show()

+---------------+---------------+--------------------+------------------+-----------------+---+---------------+
|Número da Conta|Nome do Cliente|               Email|        Logradouro|Número Logradouro|DDD|Número Telefone|
+---------------+---------------+--------------------+------------------+-----------------+---+---------------+
|              2|  Juliana Costa|cliente2@exemplo.com|       Rua Augusta|             1011| 11|      989591767|
|              7|  Juliana Costa|cliente7@exemplo.com|  Avenida Paulista|              456| 11|      935213943|
|             10|     João Silva|cliente10@exemplo...|    Avenida Brasil|             1213| 11|      915694953|
|             14|     João Silva|cliente14@exemplo...|  Avenida Ipiranga|             2021| 11|      937508674|
|             17|     João Silva|cliente17@exemplo...|    Rua das Flores|              123| 11|      999144856|
|             18|     João Silva|cliente18@exemplo...| Avenida Atlântica|             1617| 11|      967

In [0]:
# Agregação por DDD
# É necessário essa biblioteca: import pyspark.sql.functions importar avg 
agregacao_ddd_costumer = df_customer.groupBy(df_customer.DDD).agg(F.count(F.col('Nome do cliente')).alias('Contagem DDD'))
agregacao_ddd_costumer.show() 


+---+------------+
|DDD|Contagem DDD|
+---+------------+
| 11|        1001|
+---+------------+



7- Salvar em formato parquet

In [0]:
df_customer.write.parquet('/path/to/save/df_file_costumer.parquet')

8- Visualizar o dataframe em uma view (SQL)

In [0]:
df_customer.createOrReplaceTempView("dados_clientes")

In [0]:
%sql
-- Visualizar a view
SELECT * FROM dados_clientes

Número da Conta,Nome do Cliente,Email,Logradouro,Número Logradouro,DDD,Número Telefone
1,Fernanda Lima,cliente1@exemplo.com,Rua da Consolação,2223,11,986235795
2,Juliana Costa,cliente2@exemplo.com,Rua Augusta,1011,11,989591767
3,Pedro Santos,cliente3@exemplo.com,Rua da Consolação,2223,11,939472587
4,Fernanda Lima,cliente4@exemplo.com,Rua Augusta,1011,11,993453815
5,Lucas Fernandes,cliente5@exemplo.com,Rua da Consolação,2223,11,964492610
6,Carlos Pereira,cliente6@exemplo.com,Rua XV de Novembro,1415,11,949146650
7,Juliana Costa,cliente7@exemplo.com,Avenida Paulista,456,11,935213943
8,Carlos Pereira,cliente8@exemplo.com,Avenida Paulista,456,11,919848352
9,Ana Souza,cliente9@exemplo.com,Avenida Atlântica,1617,11,952406030
10,João Silva,cliente10@exemplo.com,Avenida Brasil,1213,11,915694953


9- Fazendo agregações simples utilizando a view: dados_clientes

In [0]:
%sql
-- DDD, Número do telefone e a Cidade Região de cada um dos DDD 
SELECT DDD
,`Número Telefone` as Telefone
,CASE WHEN DDD = 11 THEN 'Região Metropolitana de São Paulo'
      WHEN DDD = 12 THEN 'Vale do Paraíba e Litoral Norte'
      WHEN DDD = 13 THEN 'Baixada Santista'
      WHEN DDD = 14 THEN 'Bauru e região'
      WHEN DDD = 15 THEN 'Sorocaba e região'
      WHEN DDD = 16 THEN 'Ribeirão Preto e região'
      WHEN DDD = 17 THEN 'São José do Rio Preto e região'
      WHEN DDD = 18 THEN 'Presidente Prudente e região'
      WHEN DDD = 19 THEN 'Campinas e região'
    ELSE 'Outro Estado'
  END AS `Cidade Região DDD`
FROM dados_clientes


DDD,Telefone,Cidade Região DDD
11,986235795,Região Metropolitana de São Paulo
11,989591767,Região Metropolitana de São Paulo
11,939472587,Região Metropolitana de São Paulo
11,993453815,Região Metropolitana de São Paulo
11,964492610,Região Metropolitana de São Paulo
11,949146650,Região Metropolitana de São Paulo
11,935213943,Região Metropolitana de São Paulo
11,919848352,Região Metropolitana de São Paulo
11,952406030,Região Metropolitana de São Paulo
11,915694953,Região Metropolitana de São Paulo


In [0]:
%sql
-- Retornar os dados dos clientes que começam com a letra 'J'
SELECT *
FROM dados_clientes
WHERE `Nome do Cliente` LIKE 'J%' 

Número da Conta,Nome do Cliente,Email,Logradouro,Número Logradouro,DDD,Número Telefone
2,Juliana Costa,cliente2@exemplo.com,Rua Augusta,1011,11,989591767
7,Juliana Costa,cliente7@exemplo.com,Avenida Paulista,456,11,935213943
10,João Silva,cliente10@exemplo.com,Avenida Brasil,1213,11,915694953
14,João Silva,cliente14@exemplo.com,Avenida Ipiranga,2021,11,937508674
17,João Silva,cliente17@exemplo.com,Rua das Flores,123,11,999144856
18,João Silva,cliente18@exemplo.com,Avenida Atlântica,1617,11,967671371
19,Juliana Costa,cliente19@exemplo.com,Rua XV de Novembro,1415,11,990908621
25,Juliana Costa,cliente25@exemplo.com,Avenida Atlântica,1617,11,960019926
31,Juliana Costa,cliente31@exemplo.com,Avenida Paulista,456,11,932425096
36,João Silva,cliente36@exemplo.com,Rua das Flores,123,11,975703918


In [0]:
%sql
-- Agregação por DDD
SELECT DDD
,COUNT(`Nome do cliente`)
FROM dados_clientes
GROUP BY DDD 


DDD,count(Nome do cliente)
11,1001
