## Teste de transformação de dataframe e modelagem

In [1]:
# import base do pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, explode, round as spark_round

sc = SparkContext()
sqlContext = SQLContext(sc)

In [2]:
input_data = sqlContext.read.json('input/data.json', multiLine=True)

## Achatando dados aninhados em pyspark

Para cada etapa, a fim de visualização e entendimento do processo, usarei as funções _show_, que mostra os dados em forma de tabela e _printSchema_, que mostra a estrutura do schema do dataframe.

In [3]:
input_data.show(truncate=False)

+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|CreateDate            |Discount|EmissionDate       |ItemList                                                         |NFeID|NFeNumber|
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Rice, 2, 35.55}, {Flour, 5, 11.55}, {Bean, 7, 27.15}]          |1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Tomate, 10, 12.25}, {Pasta, 5, 7.55}]                          |2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Beer, 6, 9.0}, {French fries, 2, 10.99}, {Ice cream, 1, 27.15}]|3    |503      |
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+



In [4]:
input_data.printSchema()

root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- ItemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ProductName: string (nullable = true)
 |    |    |-- Quantity: long (nullable = true)
 |    |    |-- Value: double (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)



### Função _explode_ e como ela funciona

Analisando o nosso DataFrame inicial, temos na coluna `ItemList` um array de structs, ou seja, uma lista de objetos. Como podemos lidar com isso? 

Uma das formas de achatar os dados é utilizar a função _explode_ do spark, separando a lista em linhas diferentes para cada objeto.

In [5]:
exploded_data = input_data.select(*input_data.columns, explode("ItemList").alias("Item")).drop("ItemList")

In [6]:
exploded_data.show()

+--------------------+--------+-------------------+-----+---------+--------------------+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber|                Item|
+--------------------+--------+-------------------+-----+---------+--------------------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|    {Rice, 2, 35.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|   {Flour, 5, 11.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|    {Bean, 7, 27.15}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502| {Tomate, 10, 12.25}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|    {Pasta, 5, 7.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|      {Beer, 6, 9.0}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|{French fries, 2,...|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|{Ice cream, 1, 27...|
+--------------------

In [7]:
exploded_data.printSchema()

root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)
 |-- Item: struct (nullable = true)
 |    |-- ProductName: string (nullable = true)
 |    |-- Quantity: long (nullable = true)
 |    |-- Value: double (nullable = true)



### Expandindo a columa Item em colunas de itens separados

Após utilizar o _explode_, transformamos a estrutura da nossa lista de itens em apenas um item por linha, mas ainda temos uma hierarquia, pois o Item ainda é um objeto com 3 campos próprios.

Como temos apenas uma estrutura com 3 colunas, esta parte é simples de resolver, precisamos apenas criar colunas novas com os conteúdos de cada campo do Item, e excluir a coluna Item no final,

Ao fazer a separação, chegamos a um DataFrame _flat_, sem nenhuma estrutura aninhada, e resulta no que seria a saída de um _join_ entre as duas tabelas relacionais da próxima etapa, tendo como _key_ o `NFeID`.

In [8]:
expanded_columns_data = exploded_data                               \
                .withColumn("ProductName", col("Item.ProductName")) \
                .withColumn("Quantity",    col("Item.Quantity"))    \
                .withColumn("Value",       col("Item.Value"))       \
                .drop("Item")

### Outra possibilidade:

Se estivéssemos lidando com uma estrutura aninhada muito maior, existem formas menos verbosas de chegar ao resultado esperado. Criando uma função flatten_df, que identifica as colunas de tipo _struct_ e seleciona as subcolunas na estrutura raiz. 

In [9]:
def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [col(nc+'.'+c).alias(c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

flattened_data = flatten_df(exploded_data)

In [10]:
flattened_data.show()

+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber| ProductName|Quantity|Value|
+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Rice|       2|35.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|       Flour|       5|11.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Bean|       7|27.15|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|      Tomate|      10|12.25|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|       Pasta|       5| 7.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|        Beer|       6|  9.0|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|French fries|       2|10.99|
|2021-05-24T20:21:...|     0.0|2021-05-2

In [11]:
expanded_columns_data.show()

+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber| ProductName|Quantity|Value|
+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Rice|       2|35.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|       Flour|       5|11.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Bean|       7|27.15|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|      Tomate|      10|12.25|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|       Pasta|       5| 7.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|        Beer|       6|  9.0|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|French fries|       2|10.99|
|2021-05-24T20:21:...|     0.0|2021-05-2

In [12]:
expanded_columns_data.printSchema()

root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Value: double (nullable = true)



## Separando o DataFrame em dois seguindo o modelo relacional

In [13]:
notas = input_data.drop("ItemList")

produtos = expanded_columns_data.select("NFeID", "ProductName", "Quantity", "Value")

In [14]:
notas.show()

+--------------------+--------+-------------------+-----+---------+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber|
+--------------------+--------+-------------------+-----+---------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|
+--------------------+--------+-------------------+-----+---------+



In [15]:
produtos.show()

+-----+------------+--------+-----+
|NFeID| ProductName|Quantity|Value|
+-----+------------+--------+-----+
|    1|        Rice|       2|35.55|
|    1|       Flour|       5|11.55|
|    1|        Bean|       7|27.15|
|    2|      Tomate|      10|12.25|
|    2|       Pasta|       5| 7.55|
|    3|        Beer|       6|  9.0|
|    3|French fries|       2|10.99|
|    3|   Ice cream|       1|27.15|
+-----+------------+--------+-----+

