# Expansão dos dados aninhados e desnormalização dos dados 

Nosso primeiro objetivo será criar uma tabela desnormalizada (todas as informações em uma única tabela)

In [1]:
from pyspark.sql.functions import explode
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Criando dataframes a partir de dados aninhados').getOrCreate()


22/01/24 02:03:43 WARN Utils: Your hostname, MacBook-Pro-do-Renato-Turtienski.local resolves to a loopback address: 127.0.0.1; using 192.168.0.202 instead (on interface en0)
22/01/24 02:03:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/24 02:03:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Lemos o json para dentro de dataframe ainda não normalizado
df = spark.read.option('multiline', True).json('data.json')

# dando uma olhada no esquema dos dados aninhados
df.printSchema()



root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- ItemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ProductName: string (nullable = true)
 |    |    |-- Quantity: long (nullable = true)
 |    |    |-- Value: double (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)



In [3]:
df.show(df.count(), False)

''' podemos perceber que, o número de itens são oitos, sendo:
    Rice, Flour, Bean, Tomate, Pasta, Beer, French Fries e Ice Cream
    O que denota que teremos uma tabela normalizada com oito linhas
'''

+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|CreateDate            |Discount|EmissionDate       |ItemList                                                         |NFeID|NFeNumber|
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Rice, 2, 35.55}, {Flour, 5, 11.55}, {Bean, 7, 27.15}]          |1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Tomate, 10, 12.25}, {Pasta, 5, 7.55}]                          |2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Beer, 6, 9.0}, {French fries, 2, 10.99}, {Ice cream, 1, 27.15}]|3    |503      |
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+



' podemos perceber que, o número de itens são oitos, sendo:\n    Rice, Flour, Bean, Tomate, Pasta, Beer, French Fries e Ice Cream\n    O que denota que teremos uma tabela normalizada com oito linhas\n'

In [4]:
# Vamos desnormalizar o dataframe, Com a coluna com dados aninhados (ItemList) sendo "explodida"


df_desnormalizado = df.withColumn('ItemList', explode('ItemList'))\
                 .select('CreateDate', 'Discount', 'EmissionDate', 'ItemList.*', 'NFeID', 'NFeNumber')

In [7]:
# Por fim temos os dados desnormalizados 
df_desnormalizado.show(df_desnormalizado.count(), False)

+----------------------+--------+-------------------+------------+--------+-----+-----+---------+
|CreateDate            |Discount|EmissionDate       |ProductName |Quantity|Value|NFeID|NFeNumber|
+----------------------+--------+-------------------+------------+--------+-----+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Rice        |2       |35.55|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Flour       |5       |11.55|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Bean        |7       |27.15|1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Tomate      |10      |12.25|2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Pasta       |5       |7.55 |2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|Beer        |6       |9.0  |3    |503      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|French fries|2       |10.99|3    |503      |
|2021-05-24T20:21:34

# Criação dos dataframes 
Criemos agora os dois dataframes que devem ser releacionar por uma chave
Para tal, usaremos NFeID como chave

Serão:
* Tabela 1: NFe:
    * Variáveis: NFeID, NFeNumber, EmissionDate, CreateDate

* Tabela 2: Itens:
    * Variáveis: NFeID, ProductName, Quantity, Value
    
.