In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.functions import when, col, sum, count, isnan
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import rtrim

In [2]:
# Criação da instância spark para leitura e criação de tabelas
spark = SparkSession.builder.master("local[*]").enableHiveSupport().getOrCreate()

In [3]:
# Lendo tabela CLIENTES do HDFS:

df_clientes = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/CLIENTES.csv')

In [4]:
# Lendo tabela DIVISAO do HDFS:

df_divisao = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/DIVISAO.csv')

In [5]:
# Lendo tabela ENDERECO do HDFS:

df_endereco = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/ENDERECO.csv')

In [6]:
# Lendo tabela REGIAO do HDFS:

df_regiao = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/REGIAO.csv')

In [7]:
# Lendo tabela VENDAS do HDFS:

df_vendas = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/VENDAS.csv')

In [8]:
## Analisando a tabela Cliente

df_clientes.printSchema()

root
 |-- Address Number: integer (nullable = true)
 |-- Business Family: string (nullable = true)
 |-- Business Unit: integer (nullable = true)
 |-- Customer: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Division: integer (nullable = true)
 |-- Line of Business: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Region Code: integer (nullable = true)
 |-- Regional Sales Mgr: string (nullable = true)
 |-- Search Type: string (nullable = true)



In [9]:
df_clientes.show()

+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|            Customer|CustomerKey|Customer Type|Division|Line of Business|       Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|      10000000|             R3|            1|    City Supermarket|   10000000|           G2|       2|                |816-455-8733|          4|               S16|          C|
|      10000453|             R3|            1|       A Supermarket|   10000453|           G1|       1|                |816-455-8733|          5|               S19|          C|
|      10000455|             R3|            1|Caribian Supermarket|   10000455|           G2|       2|                |8

In [10]:
# Verificando a existência de dados Null ou NaN
df_clientes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_clientes.columns]).show()

+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|Customer|CustomerKey|Customer Type|Division|Line of Business|Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+
|             0|              0|            0|       0|          0|            0|       0|               0|    0|          0|                 0|          0|
+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+



In [11]:
'''
Como a tabale não possui valores Null ou NaN, é necessário apenas alterar a coluna "Line of Business" que possuia a 
str "   " (três espaços) representando o campo não informado.
'''
# df_clientes.filter(col("Line of Business") == '   ').show()

df_clientes = df_clientes.withColumn('Line of Business', regexp_replace('Line of Business','   ', 'não informado'))
df_clientes.show()

+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|            Customer|CustomerKey|Customer Type|Division|Line of Business|       Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|      10000000|             R3|            1|    City Supermarket|   10000000|           G2|       2|   não informado|816-455-8733|          4|               S16|          C|
|      10000453|             R3|            1|       A Supermarket|   10000453|           G1|       1|   não informado|816-455-8733|          5|               S19|          C|
|      10000455|             R3|            1|Caribian Supermarket|   10000455|           G2|       2|   não informado|8

In [12]:
df_endereco.printSchema()

root
 |-- Address Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer Address 1: string (nullable = true)
 |-- Customer Address 2: string (nullable = true)
 |-- Customer Address 3: string (nullable = true)
 |-- Customer Address 4: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [13]:
df_endereco.show()

+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|Address Number|                City|Country|  Customer Address 1|  Customer Address 2|  Customer Address 3|  Customer Address 4|State|    Zip Code|
+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|      10000000|               Akron|     US|         PO Box 6258|                 ...|                 ...|                 ...|   OH|       44312|
|      10000453|                 ...|     UK|                 ...|                 ...|                 ...|                 ...| null|            |
|      10000455|    Huntington Beach|     US|   7392 Count Circle|                 ...|                 ...|                 ...|   CA|       92647|
|      10000456|            Edmonton|     CA|    8151 Wagner Road|                 ...|                 ..

In [27]:
# Verificando a existência de dados Null ou NaN
df_endereco.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_endereco.columns]).show()

+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|Address Number|City|Country|Customer Address 1|Customer Address 2|Customer Address 3|Customer Address 4|State|Zip Code|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|             0|   0|      0|                 0|                 0|                 0|                 0|    0|       0|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+



In [15]:
'''
Pela inspeção visual foi possível perceber que uma série de colunas possui dados faltentes, onde esses campos são preenchido 
por um conjunto de espaços (space). Dependendo da coluna a quantidade de espaços variam. No intuito de uniformizar os campos
que não possuiam dados apenas espaços, foi utilizado a função rtrim (remove espaços a direita).O resultado da aplicação desta
função nos campos onde só possuiam espaços é o retorno do campo vazio (''). Logo, aplicando a substituição dos valores nos 
campos vazios, resolvemos uma parte do problema.

O segundo problema encontrado na tabela foi 70 valores Null na coluna 'state'. Então foi utilizado o método isNull para
identificar os campos nulos e a função when para subsitui-los.
'''


for c in df_endereco.columns:
    df_endereco = df_endereco.withColumn(c,rtrim(df_endereco[c]))
    df_endereco = df_endereco.withColumn(c, when(df_endereco[c] == '', 'não informado')\
                                         .when(df_endereco[c].isNull(), 'não informado')\
                                         .otherwise(df_endereco[c]))

df_endereco.show()

+--------------+----------------+-------+--------------------+--------------------+------------------+------------------+-------------+-------------+
|Address Number|            City|Country|  Customer Address 1|  Customer Address 2|Customer Address 3|Customer Address 4|        State|     Zip Code|
+--------------+----------------+-------+--------------------+--------------------+------------------+------------------+-------------+-------------+
|      10000000|           Akron|     US|         PO Box 6258|       não informado|     não informado|     não informado|           OH|        44312|
|      10000453|   não informado|     UK|       não informado|       não informado|     não informado|     não informado|não informado|não informado|
|      10000455|Huntington Beach|     US|   7392 Count Circle|       não informado|     não informado|     não informado|           CA|        92647|
|      10000456|        Edmonton|     CA|    8151 Wagner Road|       não informado|     não informad

In [29]:
# Verificando a existência de dados Null ou NaN
df_endereco.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_endereco.columns]).show()

+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|Address Number|City|Country|Customer Address 1|Customer Address 2|Customer Address 3|Customer Address 4|State|Zip Code|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|             0|   0|      0|                 0|                 0|                 0|                 0|    0|       0|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+



In [16]:
df_vendas.show()

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|   DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|                Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|          28/04/2019|   10000481|28/04/2018| 

In [17]:
df_vendas.printSchema()

root
 |-- Actual Delivery Date: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: string (nullable = true)
 |-- Discount Amount: string (nullable = true)
 |-- Invoice Date: string (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: string (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: string (nullable = true)
 |-- Sales Amount: string (nullable = true)
 |-- Sales Amount Based on List Price: string (nullable = true)
 |-- Sales Cost Amount: string (nullable = true)
 |-- Sales Margin Amount: string (nullable = true)
 |-- Sales Price: string (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [18]:
# Verificando a existência de dados Null ou NaN
df_vendas.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_vendas.columns]).show()

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                 253|        253|    253|            255|         253|           253|      8542|      

In [19]:
'''
É possivel verificar que algumas linhas possuem todos os campos nulos, logo, iremos deletar todas estas linhas. Para selecionar
as linhas onde todos os campos são nulos, basta verificar as 3 primeiras colunas.
'''
df_vendas.filter(((col("Actual Delivery Date").isNull()) & (col("CustomerKey").isNull()) & (col("DateKey").isNull()))).show(5)

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+----+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep| U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+----+
|                null|       null|   null|           null|        null|          null|      null|   

In [20]:
# Removendo do dataframe todas as linhas que possuem todos os campos nulos
df_vendas = df_vendas.filter(((col("Actual Delivery Date").isNotNull())\
                              & (col("CustomerKey").isNotNull())\
                              & (col("DateKey").isNotNull())))

In [21]:
# Verificando novamente a existência de dados Null ou NaN para visualizar se as linhas nulas foram removidas
df_vendas.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_vendas.columns]).show()

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                   0|          0|      0|              2|           0|             0|      8289|      

In [22]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date

In [None]:
from pyspark.sql.functions import *
df=spark.createDataFrame([["02/03/2013"],["05/06/2023"]],["input"])
df_vendas.select(col("Promised Delivery Date"),to_date(col("Promised Delivery Date"),"dd/MM/yyyy").alias("Promised Delivery Date")) \
  .show()

In [23]:
# Antes de continuar com limpeza dos dados vamos corrigir as colunas que estão com os tipos errados de dados
cols_float = ['Discount Amount', 'List Price', 'Sales Amount', 'Sales Amount Based on List Price', 'Sales Margin Amount',\
              'Sales Price']

for col_name in cols_float:
    df_vendas = df_vendas.withColumn(col_name, col(col_name).cast('float'))
    
cols_dates = ['Actual Delivery Date', 'DateKey', 'Invoice Date', 'Promised Delivery Date']

for col_name_data in cols_dates:
    df_vendas = df_vendas.withColumn(col_name_data, to_date(col(col_name_data),"dd/MM/yyyy"))
    

In [24]:
df_vendas.printSchema()

root
 |-- Actual Delivery Date: date (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: date (nullable = true)
 |-- Discount Amount: float (nullable = true)
 |-- Invoice Date: date (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: float (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: date (nullable = true)
 |-- Sales Amount: float (nullable = true)
 |-- Sales Amount Based on List Price: float (nullable = true)
 |-- Sales Cost Amount: string (nullable = true)
 |-- Sales Margin Amount: float (nullable = true)
 |-- Sales Price: float (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [25]:
# Realizando a limpeza dos dados que são nulos na tabela

for c in df_vendas.columns:
    if (str(df_vendas.schema[c].dataType) == 'StringType'):
        df_vendas = df_vendas.withColumn(c, when(df_vendas[c].isNull(), 'não informado').otherwise(df_vendas[c]))
    if (str(df_vendas.schema[c].dataType) == 'FloatType'):
        df_vendas = df_vendas.withColumn(c, when(df_vendas[c].isNull(), 0.0).otherwise(df_vendas[c]))
        


In [28]:
# Verificando novamente a existência de dados Null ou NaN para visualizar se as linhas nulas foram removidas
df_vendas.select([count(when(col(c).isNull(), c)).alias(c) for c in df_vendas.columns]).show()

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                   0|          0|      0|              0|           0|             0|         0|      

In [72]:
'''
5 - efetuar as responder as perguntas:

- quantos pedidos foram realizados
- quantos clientes tem em nossa base
- quantos clientes temos por Região
- quantidade de vendas em 2018
'''

# Criando as views:
df_clientes.createOrReplaceTempView('vw_clientes')
df_vendas.createOrReplaceTempView('vw_vendas')
df_regiao.createOrReplaceTempView('vw_regiao')

In [71]:
# REPOSTA PARA - QUANTOS PEDIDOS FORAM REALIZADOS
spark.sql('''
            SELECT 
            COUNT(*) AS Num_registros
            FROM vw_vendas
        ''').show()

+-------------+
|Num_registros|
+-------------+
|        65282|
+-------------+



In [None]:
# REPOSTA PARA - QUANTOS CLIENTES TEM EM NOSSA BASE
'''
Quantidade de clientes na base de dados é definida pela quantidade total de linhas, e não pela quantidade distinta de 
"Customerkey". Ao verificar a quantidade distinta da coluna "Customerkey" é possivel verificar que a quantidade de ocorrencia é
uma unidade a menos que a quantidade de linhas da tabela. Investigando mais a fundo os dois registro com a mesma "Customerkey" 
é possivel visulizar os campos "Customer Type" e "Regional Sales Mgr" possuem valores diferentes. Então eu considerei que o 
número total de clientes seria o número de linhas da tabela.

'''
spark.sql('''
            SELECT 
            COUNT(*) AS Num_registros,
            COUNT(DISTINCT CustomerKey) AS Num_CustomerKey
            FROM vw_clientes
        ''').show()

In [61]:
spark.sql('''
            SELECT
            CustomerKey,
            COUNT(Customer) AS Num_clientes
            FROM vw_clientes
            GROUP BY CustomerKey
            HAVING COUNT(CustomerKey) > 1
        ''').show()

+-----------+------------+
|CustomerKey|Num_clientes|
+-----------+------------+
|   10021911|           2|
+-----------+------------+



In [62]:
spark.sql('''
            SELECT *  
            FROM vw_clientes
            where CustomerKey = '10021911' 
        ''').show()

+--------------+---------------+-------------+---------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit| Customer|CustomerKey|Customer Type|Division|Line of Business|       Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+---------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|      10021911|             R3|            1|PING Shop|   10021911|           G1|       2|              M1|816-455-8733|          2|               S19|          C|
|      10021911|             R3|            1|Ping Shop|   10021911|           G2|       2|              M1|816-455-8733|          2|               S16|          C|
+--------------+---------------+-------------+---------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+



In [76]:
# REPOSTA PARA - QUANTOS CLIENTES TEMOS POR REGIÃO:
spark.sql('''
            SELECT 
            vw_regiao.`Region Name`,
            COUNT(*)
            FROM vw_clientes 
            INNER JOIN vw_regiao
            ON vw_clientes.`Region Code` = vw_regiao.`Region Code`
            GROUP BY vw_regiao.`Region Name`
''').show()

+-------------+--------+
|  Region Name|count(1)|
+-------------+--------+
|International|     299|
|     Southern|     100|
|      Central|     117|
|       Canada|      37|
|      Western|      89|
|    Northeast|      42|
+-------------+--------+



In [78]:
# REPOSTA PARA - QUANTIDADE DE VENDAS EM 2018

spark.sql('''
        SELECT 
        COUNT(*) AS Num_vendas_2018
        FROM vw_vendas
        WHERE YEAR(DateKey) = 2018
''').show()

+---------------+
|Num_vendas_2018|
+---------------+
|          30560|
+---------------+

