# __*Teste de Dados*__

### __*Configurações Iniciais do Projeto*__

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year,regexp_replace
from pyspark.sql.functions import desc
import psycopg2
import pandas as pd

path = 'dbfs:/FileStore'

db_properties = {
    "url": "jdbc:postgresql://psql-mock-database-cloud.postgres.database.azure.com:5432/ecom1692318010376vfwwambdpkyibnfm",
    "user": "kumgkhwuqrbpgarjylncaedn@psql-mock-database-cloud",
    "password": "zhbrfvkhhqhbeljntqpduvma",
    "driver": "org.postgresql.Driver"
}

connection_string = """dbname='ecom1692318010376vfwwambdpkyibnfm' 
                       user='kumgkhwuqrbpgarjylncaedn@psql-mock-database-cloud' 
                       password='zhbrfvkhhqhbeljntqpduvma' host='psql-mock-database-cloud.postgres.database.azure.com' 
                       port='5432'
                       """

connection = psycopg2.connect(connection_string)

spark = SparkSession.builder.appName("TestedeDados").getOrCreate()


### __*Funções Auxiliares*__

In [0]:

def GetTables(connection) -> list:
    """Função para listas todas as tabelas do DB"""

    query = """SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_type = 'BASE TABLE';
            """
    cursor = connection.cursor()
    cursor.execute(query)
    tables = cursor.fetchall()
    return tables

### __*Salvando os dados como parquet*__

In [0]:
for table in GetTables(connection):
  table_name = table[0]
  df = spark.read.jdbc(url=db_properties["url"], table=table_name, properties=db_properties)
  df.write.mode("overwrite").parquet(f'{path}/{table_name}')
  print(f'{table_name} salva')

  

customers salva
employees salva
offices salva
orderdetails salva
orders salva
payments salva
product_lines salva
products salva


### __*Merge entre a tabela JDBC e os arquivos parquet*__

In [0]:


def Inser_jbdc_in_parquet(table_name:str,jdbc_properties:dict,path_parquet_table:str)->None:
    """Funçaõ para inserir """

    jdbc_table = spark.read.jdbc(url=jdbc_properties['url'], table=table_name, properties=jdbc_properties)
    parquet_data = spark.read.parquet(path_parquet_table)

    new_parquet_itens = jdbc_table.subtract(parquet_data)
    new_parquet_itens.write.mode("append").parquet(path_parquet_table)


def Update_jbdc_in_parquet(table_name:str,jdbc_properties:dict,path_parquet_table:str,col_id:str)->None:
    """Função para atualizar os dados"""

    jdbc_table = spark.read.jdbc(url=jdbc_properties['url'], table=table_name, properties=jdbc_properties)

    parquet_data = spark.read.parquet(path_parquet_table)

    parquet_update = jdbc_table.join(parquet_data.select(col_id),col_id )
    parquet_update.write.mode("overwrite").parquet(path_parquet_table)


def Delete_jbdc_in_parquet(table_name:str,jdbc_properties:dict,path_parquet_table:str,col_id:str)-> None:
    """Função para deletar dados"""
    
    jdbc_table = spark.read.jdbc(url=jdbc_properties['url'], table=table_name, properties=jdbc_properties)

    parquet_data = spark.read.parquet(path_parquet_table)

    parquet_delete = parquet_data.join(jdbc_table.select(col_id),col_id, "inner")
    parquet_delete.write.mode("overwrite").parquet(path_parquet_table)

    

__*Teste da função de Insert*__

In [0]:
# inserindo um novo valor no banco para testar a função 

query_insert = """
INSERT INTO public.products
(product_code, product_name, product_line, product_scale, product_vendor, product_description, quantity_in_stock, buy_price, msrp)
VALUES('S00_teste', 'product_test', 'teste_line', '1:100', 'teste', 'teste', 0, 0, 0);
"""

cursor = connection.cursor()
cursor.execute(query_insert)
connection.commit()

In [0]:
# verificando o numero de linhas antes de inserir 
df_insert = spark.read.parquet(f'{path}/products').count()
print(df_insert)

# inserindo os dados com a função 
Inser_jbdc_in_parquet(table_name='products',jdbc_properties=db_properties,path_parquet_table=f'{path}/products')
print('Dados atualizados')

# verificando o numero de linhas depois de inserir
df_insert = spark.read.parquet(f'{path}/products')
print(df_insert.count())





110
Dados atualizados
111


__*Teste da Função de Update*__

In [0]:
# Atualizando um novo valor no banco para testar a função 

query_insert = """
UPDATE public.products
SET product_name='product_test_update', product_line='teste_line_update'
where product_code = 'S00_teste';
"""

cursor = connection.cursor()
cursor.execute(query_insert)
connection.commit()

In [0]:
# visualizando os dados antes da atualização
spark.read.parquet(path+"/products").toPandas().tail(2)



Unnamed: 0,product_code,product_name,product_line,product_scale,product_vendor,product_description,quantity_in_stock,buy_price,msrp
109,S72_3212,Pont Yacht,Ships,1:72,Unimax Art Galleries,Measures 38 inches Long x 33 3/4 inches High. ...,414,33.3,54.6
110,S00_teste,product_test,teste_line,1:100,teste,teste,0,0.0,0.0


In [0]:
# atualizando os dados
Update_jbdc_in_parquet(table_name='products',jdbc_properties=db_properties,path_parquet_table=f'{path}/products',col_id='product_code')

In [0]:
# visualizando os dados depois da atualização
spark.read.parquet(path+"/products").toPandas().tail(2)



Unnamed: 0,product_code,product_name,product_line,product_scale,product_vendor,product_description,quantity_in_stock,buy_price,msrp
109,S72_3212,Pont Yacht,Ships,1:72,Unimax Art Galleries,Measures 38 inches Long x 33 3/4 inches High. ...,414,33.3,54.6
110,S00_teste,product_test_update,teste_line_update,1:100,teste,teste,0,0.0,0.0


__*Teste da Função de Delete*__

In [0]:
# Deletando um novo valor no banco para testar a função 

query_insert = """
DELETE FROM public.products
where product_code = 'S00_teste';
"""

cursor = connection.cursor()
cursor.execute(query_insert)
connection.commit()

In [0]:
# visualizando antes da exclusão dos dados
spark.read.parquet(path+'/products').toPandas().tail(2)



Unnamed: 0,product_code,product_name,product_line,product_scale,product_vendor,product_description,quantity_in_stock,buy_price,msrp
109,S72_3212,Pont Yacht,Ships,1:72,Unimax Art Galleries,Measures 38 inches Long x 33 3/4 inches High. ...,414,33.3,54.6
110,S00_teste,product_test_update,teste_line_update,1:100,teste,teste,0,0.0,0.0


In [0]:
# deletando os dados
Delete_jbdc_in_parquet(table_name='products',jdbc_properties=db_properties,path_parquet_table=f'{path}/products',col_id='product_code')

In [0]:
# visualizando depois da exclusão dos dados
spark.read.parquet(path+'/products').toPandas().tail(2)



Unnamed: 0,product_code,product_name,product_line,product_scale,product_vendor,product_description,quantity_in_stock,buy_price,msrp
108,S700_4002,American Airlines: MD-11S,Planes,1:700,Second Gear Diecast,Polished finish. Exact replia with official lo...,8820,36.27,74.03
109,S72_3212,Pont Yacht,Ships,1:72,Unimax Art Galleries,Measures 38 inches Long x 33 3/4 inches High. ...,414,33.3,54.6


### __*Análise dos Itens solicitados*__

#### __*Qual país possui a maior quantidade de itens cancelados?*__

__*Carregando as tabelas 'customers' e 'orders' com informações dos clientes e dos pedidos.*__

In [0]:
df_customers = spark.read.parquet(f'{path}/customers')
df_customers.toPandas().head(5)



Unnamed: 0,customer_number,customer_name,contact_last_name,contact_first_name,phone,address_line1,address_line2,city,state,postal_code,country,sales_rep_employee_number,credit_limit
0,103,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370.0,21000.0
1,112,Signal Gift Store,King,Jean,7025551838,8489 Strong St.,,Las Vegas,New York,83030,USA,1166.0,71800.0
2,114,"Australian Collectors, Co.",Ferguson,Peter Sr.,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611.0,117300.0
3,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370.0,118200.0
4,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504.0,81700.0


In [0]:
df_orders = spark.read.parquet(f'{path}/orders')
df_orders.toPandas().head(5)

Unnamed: 0,order_number,order_date,required_date,shipped_date,status,comments,customer_number
0,10100,2003-01-06,2003-01-13,2003-01-10,Resolved,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
2,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
3,10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
4,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141


***Join entre as duas tabelas.***

In [0]:
df_join = df_orders.join(df_customers,on='customer_number',how='inner')
df_join.toPandas()



Unnamed: 0,customer_number,order_number,order_date,required_date,shipped_date,status,comments,customer_name,contact_last_name,contact_first_name,phone,address_line1,address_line2,city,state,postal_code,country,sales_rep_employee_number,credit_limit
0,103,10345,2004-11-25,2004-12-01,2004-11-26,Shipped,,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370,21000.00
1,103,10298,2004-09-27,2004-10-05,2004-10-01,Shipped,,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370,21000.00
2,103,10123,2003-05-20,2003-05-29,2003-05-22,Shipped,,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370,21000.00
3,112,10346,2004-11-29,2004-12-05,2004-11-30,Shipped,,Signal Gift Store,King,Jean,7025551838,8489 Strong St.,,Las Vegas,New York,83030,USA,1166,71800.00
4,112,10278,2004-08-06,2004-08-16,2004-08-09,Shipped,,Signal Gift Store,King,Jean,7025551838,8489 Strong St.,,Las Vegas,New York,83030,USA,1166,71800.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
324,495,10207,2003-12-09,2003-12-17,2003-12-11,Shipped,Check on availability.,Diecast,Franco,Dmitry,6175552555,6251 Ingle Ln.,,Boston,MA,51003,USA,1188,85100.00
325,496,10399,2005-04-01,2005-04-12,2005-04-03,Shipped,,Kelly Hello,Jason,Tony,+64 9 5555500,Arenales 2020,,Auckland,,,New Zealand,1612,110000.00
326,496,10360,2004-12-16,2004-12-22,2004-12-18,Shipped,,Kelly Hello,Jason,Tony,+64 9 5555500,Arenales 2020,,Auckland,,,New Zealand,1612,110000.00
327,496,10179,2003-11-11,2003-11-17,2003-11-13,Cancelled,Customer cancelled due to urgent budgeting iss...,Kelly Hello,Jason,Tony,+64 9 5555500,Arenales 2020,,Auckland,,,New Zealand,1612,110000.00


In [0]:
df_join = df_join.dropDuplicates()
df_join.toPandas().head(5)



Unnamed: 0,customer_number,order_number,order_date,required_date,shipped_date,status,comments,customer_name,contact_last_name,contact_first_name,phone,address_line1,address_line2,city,state,postal_code,country,sales_rep_employee_number,credit_limit
0,141,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,Euro+ Shopping Channel,Freyre,Diego,(91) 555 94 44,"C/ Moralzarzal, 86",,Madrid,,28034.0,Spain,1370,227600.0
1,211,10200,2003-12-01,2003-12-09,2003-12-06,Shipped,,"King Kong Collectables, Co.",Gao,Mike,+852 2251 1555,Bank of China Tower,1 Garden Road,Central Hong Kong,,,Hong Kong,1621,58600.0
2,321,10159,2003-10-10,2003-10-19,2003-10-16,Shipped,,Corporate Gift Ideas Co.,Brown,Julie,6505551386,7734 Strong St.,,San Francisco,CA,94217.0,USA,1165,105000.0
3,357,10202,2003-12-02,2003-12-09,2003-12-06,Shipped,,GiftsForHim.com,MacKinlay,Wales,64-9-3763555,199 Great North Road,,Auckland,,,New Zealand,1612,77700.0
4,124,10390,2005-03-04,2005-03-11,2005-03-07,Shipped,They want to reevaluate their terms agreement ...,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,,San Rafael,CA,97562.0,USA,1165,210500.0


In [0]:
df_join = df_join.select('customer_number','order_number','order_date','status','country')
df_join.toPandas()

Unnamed: 0,customer_number,order_number,order_date,status,country
0,141,10104,2003-01-31,Shipped,Spain
1,211,10200,2003-12-01,Shipped,Hong Kong
2,321,10159,2003-10-10,Shipped,USA
3,357,10202,2003-12-02,Shipped,New Zealand
4,124,10390,2005-03-04,Shipped,USA
...,...,...,...,...,...
324,119,10425,2005-05-31,Shipped,France
325,239,10222,2004-02-19,Shipped,USA
326,350,10122,2003-05-08,Shipped,France
327,353,10343,2004-11-24,Shipped,France


***Agrupando os paises com status cancelado***

In [0]:
df_join.filter(df_join.status == 'Cancelled').groupBy('country').count().orderBy(desc("count")).show()

+-----------+-----+
|    country|count|
+-----------+-----+
|New Zealand|    2|
|     Sweden|    1|
|      Spain|    1|
|        USA|    1|
|         UK|    1|
+-----------+-----+



***Portanto o país com o maior número de itens cancelados é a New Zealand***

In [0]:
df_join.filter(df_join.status == 'Cancelled').groupBy('country').count().orderBy(desc("count")).show(1)

+-----------+-----+
|    country|count|
+-----------+-----+
|New Zealand|    2|
+-----------+-----+
only showing top 1 row



#### __*Qual o faturamento da linha de produto mais vendido, considere como os itens Shipped, cujo o pedido foi realizado no ano de 2005?*__

In [0]:
df_orders = spark.read.parquet(f'{path}/orders')
df_orderdetails = spark.read.parquet(f'{path}/orderdetails')
df_products = spark.read.parquet(f'{path}/products')

***Primeiro vamos filtrar os pedidos do ano de 2005 e com status Shipped***

In [0]:
df_orders_filter = df_orders.filter((year(df_orders.order_date) == 2005) & (df_orders.status == "Shipped"))
df_orders_filter.toPandas().head(10)

Unnamed: 0,order_number,order_date,required_date,shipped_date,status,comments,customer_number
0,10362,2005-01-05,2005-01-16,2005-01-10,Shipped,,161
1,10363,2005-01-06,2005-01-12,2005-01-10,Shipped,,334
2,10364,2005-01-06,2005-01-17,2005-01-09,Shipped,,350
3,10365,2005-01-07,2005-01-18,2005-01-11,Shipped,,320
4,10366,2005-01-10,2005-01-19,2005-01-12,Shipped,,381
5,10368,2005-01-19,2005-01-27,2005-01-24,Shipped,Can we renegotiate this one?,124
6,10369,2005-01-20,2005-01-28,2005-01-24,Shipped,,379
7,10370,2005-01-20,2005-02-01,2005-01-25,Shipped,,276
8,10371,2005-01-23,2005-02-03,2005-01-25,Shipped,,124
9,10372,2005-01-26,2005-02-05,2005-01-28,Shipped,,398


***Realizando o join entre as 3 tabelas***

In [0]:
df_join = df_orderdetails.join(df_orders_filter,on='order_number',how='inner')
df_join.toPandas()



Unnamed: 0,order_number,product_code,quantity_ordered,price_each,order_line_number,order_date,required_date,shipped_date,status,comments,customer_number
0,10362,S10_4698,22,182.04,4,2005-01-05,2005-01-16,2005-01-10,Shipped,,161
1,10362,S12_2823,22,131.04,1,2005-01-05,2005-01-16,2005-01-10,Shipped,,161
2,10362,S18_2625,23,53.91,3,2005-01-05,2005-01-16,2005-01-10,Shipped,,161
3,10362,S24_1578,50,91.29,2,2005-01-05,2005-01-16,2005-01-10,Shipped,,161
4,10363,S12_1099,33,180.95,3,2005-01-06,2005-01-12,2005-01-10,Shipped,,334
...,...,...,...,...,...,...,...,...,...,...,...
417,10425,S24_2300,49,127.79,9,2005-05-31,2005-06-07,2021-05-03,Shipped,,119
418,10425,S24_2840,31,31.82,5,2005-05-31,2005-06-07,2021-05-03,Shipped,,119
419,10425,S32_1268,41,83.79,11,2005-05-31,2005-06-07,2021-05-03,Shipped,,119
420,10425,S32_2509,11,50.32,6,2005-05-31,2005-06-07,2021-05-03,Shipped,,119


In [0]:
df_join = df_join.join(df_products,on='product_code',how='inner')
df_join.toPandas()



Unnamed: 0,product_code,order_number,quantity_ordered,price_each,order_line_number,order_date,required_date,shipped_date,status,comments,customer_number,product_name,product_line,product_scale,product_vendor,product_description,quantity_in_stock,buy_price,msrp
0,S10_4698,10362,22,182.04,4,2005-01-05,2005-01-16,2005-01-10,Shipped,,161,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos...",5582,91.02,193.66
1,S12_2823,10362,22,131.04,1,2005-01-05,2005-01-16,2005-01-10,Shipped,,161,2002 Suzuki XREO,Motorcycles,1:12,Unimax Art Galleries,"Official logos and insignias, saddle bags loca...",9997,66.27,150.62
2,S18_2625,10362,23,53.91,3,2005-01-05,2005-01-16,2005-01-10,Shipped,,161,1936 Harley Davidson El Knucklehead,Motorcycles,1:18,Welly Diecast Productions,Intricately detailed with chrome accents and t...,4357,24.23,60.57
3,S24_1578,10362,50,91.29,2,2005-01-05,2005-01-16,2005-01-10,Shipped,,161,1997 BMW R 1100 S,Motorcycles,1:24,Autoart Studio Design,Detailed scale replica with working suspension...,7003,60.86,112.70
4,S12_1099,10363,33,180.95,3,2005-01-06,2005-01-12,2005-01-10,Shipped,,334,1968 Fods Mustang,Classic Cars,1:12,Autoart Studio Design,"Hood, doors and trunk all open to reveal highl...",68,95.34,194.57
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
417,S24_2300,10425,49,127.79,9,2005-05-31,2005-06-07,2021-05-03,Shipped,,119,1962 Volkswagen Microbus,Trucks and Buses,1:24,Autoart Studio Design,This 1:18 scale die cast replica of the 1962 M...,2327,61.34,127.79
418,S24_2840,10425,31,31.82,5,2005-05-31,2005-06-07,2021-05-03,Shipped,,119,1958 Chevy Corvette Limited Edition,Classic Cars,1:24,Carousel DieCast Legends,The operating parts of this 1958 Chevy Corvett...,2542,15.91,35.36
419,S32_1268,10425,41,83.79,11,2005-05-31,2005-06-07,2021-05-03,Shipped,,119,1980�s GM Manhattan Express,Trucks and Buses,1:32,Motor City Art Classics,This 1980�s era new look Manhattan express is ...,5099,53.93,96.31
420,S32_2509,10425,11,50.32,6,2005-05-31,2005-06-07,2021-05-03,Shipped,,119,1954 Greyhound Scenicruiser,Trucks and Buses,1:32,Classic Metal Creations,"Model features bi-level seating, 50 windows, s...",2874,25.98,54.11


In [0]:
df_join.columns

Out[266]: ['product_code',
 'order_number',
 'quantity_ordered',
 'price_each',
 'order_line_number',
 'order_date',
 'required_date',
 'shipped_date',
 'status',
 'comments',
 'customer_number',
 'product_name',
 'product_line',
 'product_scale',
 'product_vendor',
 'product_description',
 'quantity_in_stock',
 'buy_price',
 'msrp']

***Filtro e cálculo para definir o faturamento de cada linha de produto.***

In [0]:
df_join = df_join.select('product_code','quantity_ordered','price_each','product_name','product_line')
df_join.toPandas()



Unnamed: 0,product_code,quantity_ordered,price_each,product_name,product_line
0,S10_4698,22,182.04,2003 Harley-Davidson Eagle Drag Bike,Motorcycles
1,S12_2823,22,131.04,2002 Suzuki XREO,Motorcycles
2,S18_2625,23,53.91,1936 Harley Davidson El Knucklehead,Motorcycles
3,S24_1578,50,91.29,1997 BMW R 1100 S,Motorcycles
4,S12_1099,33,180.95,1968 Fods Mustang,Classic Cars
...,...,...,...,...,...
417,S24_2300,49,127.79,1962 Volkswagen Microbus,Trucks and Buses
418,S24_2840,31,31.82,1958 Chevy Corvette Limited Edition,Classic Cars
419,S32_1268,41,83.79,1980�s GM Manhattan Express,Trucks and Buses
420,S32_2509,11,50.32,1954 Greyhound Scenicruiser,Trucks and Buses


***Calculando o faturamento por pedido feito.***

In [0]:
df_join=df_join.withColumn("order_billing", col("quantity_ordered") * col("price_each"))
df_join.toPandas()



Unnamed: 0,product_code,quantity_ordered,price_each,product_name,product_line,order_billing
0,S10_4698,22,182.04,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,4004.88
1,S12_2823,22,131.04,2002 Suzuki XREO,Motorcycles,2882.88
2,S18_2625,23,53.91,1936 Harley Davidson El Knucklehead,Motorcycles,1239.93
3,S24_1578,50,91.29,1997 BMW R 1100 S,Motorcycles,4564.50
4,S12_1099,33,180.95,1968 Fods Mustang,Classic Cars,5971.35
...,...,...,...,...,...,...
417,S24_2300,49,127.79,1962 Volkswagen Microbus,Trucks and Buses,6261.71
418,S24_2840,31,31.82,1958 Chevy Corvette Limited Edition,Classic Cars,986.42
419,S32_1268,41,83.79,1980�s GM Manhattan Express,Trucks and Buses,3435.39
420,S32_2509,11,50.32,1954 Greyhound Scenicruiser,Trucks and Buses,553.52


***Somando os faturamentos por linha de produto.***

In [0]:
df_join.select('product_line','order_billing').groupBy('product_line').sum().orderBy(desc("sum(order_billing)")).show()


+----------------+------------------+
|    product_line|sum(order_billing)|
+----------------+------------------+
|    Classic Cars|         603666.99|
|    Vintage Cars|         222510.70|
|     Motorcycles|         212684.55|
|Trucks and Buses|         182231.45|
|          Planes|         109701.56|
|           Ships|          62989.19|
|          Trains|          22311.26|
+----------------+------------------+



***Portando a linha de produto com o maior faturamento e o Classic Cars  com um total de 603666.99 dolares no ano 2005.***

In [0]:
df_join.select('product_line','order_billing').groupBy('product_line').sum().orderBy(desc("sum(order_billing)")).show(1)

+------------+------------------+
|product_line|sum(order_billing)|
+------------+------------------+
|Classic Cars|         603666.99|
+------------+------------------+
only showing top 1 row



#### __*Nome, sobrenome e e-mail dos vendedores do Japão, o local-part do e-mail deve estar mascarado.*__

***Carregamento das tabelas e join entre os dados***

In [0]:
df_employees = spark.read.parquet(f'{path}/employees')
df_offices = spark.read.parquet(f'{path}/offices')

In [0]:
df_join = df_employees.join(df_offices,on = 'office_code',how='inner')
df_join.toPandas().head(10)

Unnamed: 0,office_code,employee_number,last_name,first_name,extension,email,reports_to,job_Title,city,phone,address_line1,address_line2,state,country,postal_code,territory
0,1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,,President,San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
1,1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1002.0,VP Sales,San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
2,1,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1002.0,VP Marketing,San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
3,6,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,1056.0,Sales Manager (APAC),Sydney1,+61 2 9264 2451,5-11 Wentworth Avenue,Floor #2,,Australia,NSW 2010,APAC
4,4,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,1056.0,Sale Manager (EMEA),Paris,+33 14 723 4404,43 Rue Jouffroy D'abbans,,,France,75017,EMEA
5,1,1143,Bow,Anthony,x5428,abow@classicmodelcars.com,1056.0,Sales Manager (NA),San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
6,1,1165,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1143.0,Sales Rep,San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
7,1,1166,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1143.0,Sales Rep,San Francisco11,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
8,2,1188,Firrelli,Julie,x2173,jfirrelli@classicmodelcars.com,1143.0,Sales Rep,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,
9,2,1216,Patterson,Steve,x4334,spatterson@classicmodelcars.com,1143.0,Sales Rep,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,


***Filtrando as colunas desejadas e mascarar o local-part***

In [0]:
df_join = df_join.select('first_name','last_name','email','country')
df_join = df_join.withColumn("email", regexp_replace(col("email"), "^[^@]+", "****"))
df_join.toPandas()

Unnamed: 0,first_name,last_name,email,country
0,Diane,Murphy,****@classicmodelcars.com,USA
1,Mary,Patterson,****@classicmodelcars.com,USA
2,Jeff,Firrelli,****@classicmodelcars.com,USA
3,William,Patterson,****@classicmodelcars.com,Australia
4,Gerard,Bondur,****@classicmodelcars.com,France
5,Anthony,Bow,****@classicmodelcars.com,USA
6,Leslie,Jennings,****@classicmodelcars.com,USA
7,Leslie,Thompson,****@classicmodelcars.com,USA
8,Julie,Firrelli,****@classicmodelcars.com,USA
9,Steve,Patterson,****@classicmodelcars.com,USA


***Pegando somento os vendedores do Japão***

In [0]:
df_join.filter(df_join.country == 'Japan').toPandas()

Unnamed: 0,first_name,last_name,email,country
0,Mami,Nishi,****@classicmodelcars.com,Japan
1,Yoshimi,Kato,****@classicmodelcars.com,Japan
