# Notebook de desenvolvimento de tarefas relacionadas ao teste para Engenharia de Dados da Lambda 3

## Segunfa tarefa: Utilizando o Pyspark para cálculo de total líquido da empresa de cartões de crédito

In [1]:
import pandas as pd

In [3]:
transacoes = [
{'transacao_id' : 1, 'total_bruto' : 3000, 'desconto_percentual' : 6.99},
{'transacao_id' : 2, 'total_bruto' : 57989, 'desconto_percentual' : 1.45},
{'transacao_id' : 4, 'total_bruto' : 1, 'desconto_percentual' : None},
{'transacao_id' : 5, 'total_bruto' : 34, 'desconto_percentual' : 0.0},

]


In [4]:
df_transacoes = pd.DataFrame(transacoes)

In [5]:
df_transacoes

Unnamed: 0,transacao_id,total_bruto,desconto_percentual
0,1,3000,6.99
1,2,57989,1.45
2,4,1,
3,5,34,0.0


### Validando a lógica com Pandas

In [7]:
df_transacoes.desconto_percentual = df_transacoes.desconto_percentual.fillna(0.0)

sum(df_transacoes['total_bruto'] * ((100 - df_transacoes['desconto_percentual']) / 100))

59973.4595

### Implementando a lógica em Pyspark

In [8]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql import functions as F

In [9]:
# Create a Spark session
spark = SparkSession.builder.appName("ReadDictionary").getOrCreate()

23/02/09 21:56:29 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.15.129 instead (on interface wlp0s20f3)
23/02/09 21:56:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/09 21:56:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Convert the dictionary to a Pandas dataframe
df = pd.DataFrame(transacoes)
df

Unnamed: 0,transacao_id,total_bruto,desconto_percentual
0,1,3000,6.99
1,2,57989,1.45
2,4,1,
3,5,34,0.0


In [11]:
# Convert the Pandas dataframe to a Spark dataframe
spark_df = spark.createDataFrame(df)
spark_df = spark_df.na.fill(0, subset=["desconto_percentual"])

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [12]:
spark_df.show()

+------------+-----------+-------------------+
|transacao_id|total_bruto|desconto_percentual|
+------------+-----------+-------------------+
|           1|       3000|               6.99|
|           2|      57989|               1.45|
|           4|          1|                0.0|
|           5|         34|                0.0|
+------------+-----------+-------------------+



In [13]:
spark_df = spark_df.withColumn("total_liquido", expr("total_bruto * ((100 - desconto_percentual) / 100)"))

In [14]:
spark_df.show()

+------------+-----------+-------------------+------------------+
|transacao_id|total_bruto|desconto_percentual|     total_liquido|
+------------+-----------+-------------------+------------------+
|           1|       3000|               6.99|            2790.3|
|           2|      57989|               1.45|57148.159499999994|
|           4|          1|                0.0|               1.0|
|           5|         34|                0.0|              34.0|
+------------+-----------+-------------------+------------------+



In [15]:
sum_net_total = spark_df.agg(F.sum(spark_df.total_liquido)).collect()[0][0]
print(sum_net_total)

59973.4595


## Terceira Tarefa: Do JSON para um modelo relacional de tabelas

In [17]:
df_task3 = pd.read_json("datasets/dataset_tarefa3.json")

In [18]:
df_task3

Unnamed: 0,CreateDate,EmissionDate,Discount,NFeNumber,NFeID,ItemList
0,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,501,1,"[{'ProductName': 'Rice', 'Value': 35.55, 'Quan..."
1,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,502,2,"[{'ProductName': 'Tomate', 'Value': 12.25, 'Qu..."
2,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,503,3,"[{'ProductName': 'Beer', 'Value': 9.0, 'Quanti..."


In [20]:
df_exploded_task3 = df_task3.explode('ItemList')

In [21]:
df_exploded_task3

Unnamed: 0,CreateDate,EmissionDate,Discount,NFeNumber,NFeID,ItemList
0,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,501,1,"{'ProductName': 'Rice', 'Value': 35.55, 'Quant..."
0,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,501,1,"{'ProductName': 'Flour', 'Value': 11.55, 'Quan..."
0,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,501,1,"{'ProductName': 'Bean', 'Value': 27.15, 'Quant..."
1,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,502,2,"{'ProductName': 'Tomate', 'Value': 12.25, 'Qua..."
1,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,502,2,"{'ProductName': 'Pasta', 'Value': 7.55, 'Quant..."
2,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,503,3,"{'ProductName': 'Beer', 'Value': 9.0, 'Quantit..."
2,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,503,3,"{'ProductName': 'French fries', 'Value': 10.99..."
2,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,503,3,"{'ProductName': 'Ice cream', 'Value': 27.15, '..."


In [22]:
df_normalized = pd.json_normalize(df_exploded_task3['ItemList'])

In [24]:
df_normalized['NFeID'] = df_exploded_task3['NFeID'].reset_index().drop(columns='index')
df_normalized

Unnamed: 0,ProductName,Value,Quantity,NFeID
0,Rice,35.55,2,1
1,Flour,11.55,5,1
2,Bean,27.15,7,1
3,Tomate,12.25,10,2
4,Pasta,7.55,5,2
5,Beer,9.0,6,3
6,French fries,10.99,2,3
7,Ice cream,27.15,1,3


In [25]:
df_task3_normalized = df_task3.drop(columns='ItemList')

In [26]:
df_task3_normalized

Unnamed: 0,CreateDate,EmissionDate,Discount,NFeNumber,NFeID
0,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,501,1
1,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,502,2
2,2021-05-24T20:21:34.79,2021-05-24T00:00:00,0,503,3
