<a href="https://colab.research.google.com/github/VitorPaes/L3_Test/blob/main/NFes_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Teste de transformação de dataframe e modelagem

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 57.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=1abbe8663f4ee58bb070a56f30886d7ab494ebb76db8567f7ee1f09fbfba8ad1
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [4]:
# import base do pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, explode, round as spark_round

sc = SparkContext()
sqlContext = SQLContext(sc)



In [6]:
input_data = sqlContext.read.json('/data.json', multiLine=True)

## Flattening nested data



In [8]:
input_data.show(truncate=False)

+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|CreateDate            |Discount|EmissionDate       |ItemList                                                         |NFeID|NFeNumber|
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Rice, 2, 35.55}, {Flour, 5, 11.55}, {Bean, 7, 27.15}]          |1    |501      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Tomate, 10, 12.25}, {Pasta, 5, 7.55}]                          |2    |502      |
|2021-05-24T20:21:34.79|0.0     |2021-05-24T00:00:00|[{Beer, 6, 9.0}, {French fries, 2, 10.99}, {Ice cream, 1, 27.15}]|3    |503      |
+----------------------+--------+-------------------+-----------------------------------------------------------------+-----+---------+



In [9]:
input_data.printSchema()

root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- ItemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ProductName: string (nullable = true)
 |    |    |-- Quantity: long (nullable = true)
 |    |    |-- Value: double (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)



### Função _explode_ e como ela funciona

Analisando o nosso DataFrame inicial, temos na coluna `ItemList` um array de structs, ou seja, uma lista de objetos. Como podemos lidar com isso? 

Uma das formas de achatar os dados é utilizar a função _explode_ do spark, separando a lista em linhas diferentes para cada objeto.

In [11]:
exploded_data = input_data.select(*input_data.columns, explode("ItemList").alias("Item")).drop("ItemList")

In [12]:
exploded_data.show()

+--------------------+--------+-------------------+-----+---------+--------------------+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber|                Item|
+--------------------+--------+-------------------+-----+---------+--------------------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|    {Rice, 2, 35.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|   {Flour, 5, 11.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|    {Bean, 7, 27.15}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502| {Tomate, 10, 12.25}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|    {Pasta, 5, 7.55}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|      {Beer, 6, 9.0}|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|{French fries, 2,...|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|{Ice cream, 1, 27...|
+--------------------

In [13]:
exploded_data.printSchema()

root
 |-- CreateDate: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- EmissionDate: string (nullable = true)
 |-- NFeID: long (nullable = true)
 |-- NFeNumber: long (nullable = true)
 |-- Item: struct (nullable = true)
 |    |-- ProductName: string (nullable = true)
 |    |-- Quantity: long (nullable = true)
 |    |-- Value: double (nullable = true)



### Expandindo a columa Item em colunas de itens separados



In [14]:
expanded_columns_data = exploded_data                               \
                .withColumn("ProductName", col("Item.ProductName")) \
                .withColumn("Quantity",    col("Item.Quantity"))    \
                .withColumn("Value",       col("Item.Value"))       \
                .drop("Item")

In [15]:
expanded_columns_data.show()

+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber| ProductName|Quantity|Value|
+--------------------+--------+-------------------+-----+---------+------------+--------+-----+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Rice|       2|35.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|       Flour|       5|11.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        Bean|       7|27.15|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|      Tomate|      10|12.25|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|       Pasta|       5| 7.55|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|        Beer|       6|  9.0|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|French fries|       2|10.99|
|2021-05-24T20:21:...|     0.0|2021-05-2

## Separando o DataFrame em dois 

In [16]:
notas = input_data.drop("ItemList")

produtos = expanded_columns_data.select("NFeID", "ProductName", "Quantity", "Value")

In [17]:
notas.show()

+--------------------+--------+-------------------+-----+---------+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber|
+--------------------+--------+-------------------+-----+---------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    3|      503|
+--------------------+--------+-------------------+-----+---------+



In [18]:
produtos.show()

+-----+------------+--------+-----+
|NFeID| ProductName|Quantity|Value|
+-----+------------+--------+-----+
|    1|        Rice|       2|35.55|
|    1|       Flour|       5|11.55|
|    1|        Bean|       7|27.15|
|    2|      Tomate|      10|12.25|
|    2|       Pasta|       5| 7.55|
|    3|        Beer|       6|  9.0|
|    3|French fries|       2|10.99|
|    3|   Ice cream|       1|27.15|
+-----+------------+--------+-----+

