In [1]:
# Instalando JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Instalando Pyspark
!pip install -q pyspark

[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[K     |████████████████████████████████| 199 kB 67.5 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
# Setando varíavel e iniciando sessão

import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
transacoes = [{'transacao_id': 1, 'total_bruto': 3000, 'desconto_percentual': 6.99},
              {'transacao_id': 2, 'total_bruto': 57989, 'desconto_percentual': 1.45},
              {'transacao_id': 4, 'total_bruto': 1, 'desconto_percentual': None},
              {'transacao_id': 5, 'total_bruto': 34, 'desconto_percentual': 0.0}]

In [5]:
# Criando Dataframe das transações

df = spark.createDataFrame(transacoes)
df.show()

+-------------------+-----------+------------+
|desconto_percentual|total_bruto|transacao_id|
+-------------------+-----------+------------+
|               6.99|       3000|           1|
|               1.45|      57989|           2|
|               null|          1|           4|
|                0.0|         34|           5|
+-------------------+-----------+------------+



In [6]:
# Alterando valores nulos da coluna desconto_percentual para 0

df = df.na.fill(value=0,subset=["desconto_percentual"])

In [7]:
df.show()

+-------------------+-----------+------------+
|desconto_percentual|total_bruto|transacao_id|
+-------------------+-----------+------------+
|               6.99|       3000|           1|
|               1.45|      57989|           2|
|                0.0|          1|           4|
|                0.0|         34|           5|
+-------------------+-----------+------------+



In [8]:
# Verificando tipo de dados das colunas para efetuar os cálculos

df.printSchema()

root
 |-- desconto_percentual: double (nullable = false)
 |-- total_bruto: long (nullable = true)
 |-- transacao_id: long (nullable = true)



In [9]:
# Criando TempView para usar spark sql

df.createOrReplaceTempView("dataframe")

In [10]:
# Realizando cálculo da soma do total bruto menos o valor do desconto, arredondando para 2 casas decimais

df2 = spark.sql("SELECT round(sum(total_bruto * (1 - desconto_percentual / 100 )), 2) as total from dataframe")

In [11]:
df2.show()

+--------+
|   total|
+--------+
|59973.46|
+--------+



O Valor total líquido da empresa é 59973.46

In [45]:
# from pyspark.sql.types import StructType
# import json

In [12]:
from pyspark.sql import functions as F

In [13]:
# Ingerindo arquivo JSON
nf_df = spark.read.option("multiline","true").json('/content/data.json')
nf_df.show()

+--------------------+--------+-------------------+--------------------+-----+---------+
|          CreateDate|Discount|       EmissionDate|            ItemList|NFeID|NFeNumber|
+--------------------+--------+-------------------+--------------------+-----+---------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|[{Rice, 2, 35.55}...|    1|      501|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|[{Tomate, 10, 12....|    2|      502|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|[{Beer, 6, 9.0}, ...|    3|      503|
+--------------------+--------+-------------------+--------------------+-----+---------+



In [15]:
# Criamos um dataframe para normalizar os dados, usando o campo NFeID para fazer o join com o dataframe principal

df_items = nf_df.withColumn("Data", F.explode("ItemList")).select("Data.ProductName", "Data.Quantity", "Data.Value", "NFeID").show()

+------------+--------+-----+-----+
| ProductName|Quantity|Value|NFeID|
+------------+--------+-----+-----+
|        Rice|       2|35.55|    1|
|       Flour|       5|11.55|    1|
|        Bean|       7|27.15|    1|
|      Tomate|      10|12.25|    2|
|       Pasta|       5| 7.55|    2|
|        Beer|       6|  9.0|    3|
|French fries|       2|10.99|    3|
|   Ice cream|       1|27.15|    3|
+------------+--------+-----+-----+



In [19]:
# Criamos o dataframe principal denormalizado, para análise de forma mais fácil, evitando join's, com todas as informações da coluna expandida

df_principal = (nf_df.select('*', F.explode('ItemList').alias('items'))
         .select('CreateDate', 'Discount', 'EmissionDate', 'items.*', 'NFeID', 'NFeNumber'))

In [21]:
df_principal.show(truncate=False)

+----------------------+--------+-------------------+------------+--------+-----+-----+---------+
|CreateDate            |Discount|EmissionDate       |ProductName |Quantity|Value|NFeID|NFeNumber|
+----------------------+--------+-------------------+------------+--------+-----+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Rice        |2       |35.55|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Flour       |5       |11.55|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Bean        |7       |27.15|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Tomate      |10      |12.25|2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Pasta       |5       |7.55 |2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Beer        |6       |9.0  |3    |503      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|French fries|2       |10.99|3    |503      |
|2021-05-24T20:21:34

In [22]:
# Criando o dataframe normalizado, nos moldes de uma tabela fato, resguardando a chave NFeID para join com dataframe_items

df_fact = (df_principal.select('CreateDate', 'Discount', 'EmissionDate', 'NFeID', 'NFeNumber'))

In [23]:
df_fact.show(truncate=False)

+----------------------+--------+-------------------+-----+---------+
|CreateDate            |Discount|EmissionDate       |NFeID|NFeNumber|
+----------------------+--------+-------------------+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|3    |503      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|3    |503      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|3    |503      |
+----------------------+--------+-------------------+-----+---------+

