# Classe FlowItems

Classe que implementa o fluxo de Items por Order herdando da classe FlowBase.

In [2]:
from pyspark.sql.functions import col, concat, regexp_replace, max, lit, from_json, explode
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType
from datetime import datetime
import json
import re
from pyspark.sql.functions import udf

class FlowItems(FlowBase):
    """Flow that expands the ``items`` column of the Order dataset.

    Each order row carries its items in the ``items`` column; this flow
    parses that column with ``FlowBase.parse_json``, explodes it into one
    row per item, flattens the item fields into ``item_*`` columns and
    joins them back onto the order columns.
    """

    # (path inside the exploded ``col`` struct, output column name), in
    # output order.
    # NOTE(review): 'item_adition_value' is misspelled, but it is kept
    # as-is because downstream consumers may already depend on that name.
    _ITEM_COLUMNS = (
        ('col.name', 'item_name'),
        ('col.addition.value', 'item_adition_value'),
        ('col.addition.currency', 'item_addition_currency'),
        ('col.discount.value', 'item_discount_value'),
        ('col.discount.currency', 'item_discount_currency'),
        ('col.quantity', 'item_quantity'),
        ('col.sequence', 'item_sequence'),
        ('col.unitPrice.value', 'item_unitPrice_value'),
        ('col.unitPrice.currency', 'item_unitPrice_currency'),
        ('col.externalId', 'item_externalId'),
        ('col.totalValue.value', 'item_totalValue_value'),
        ('col.totalValue.currency', 'item_totalValue_currency'),
        ('col.customerNote', 'item_customerNote'),
        ('col.garnishItems', 'item_garnishItems'),
        ('col.integrationId', 'item_integrationId'),
        ('col.totalAddition.value', 'item_totalAddition_value'),
        ('col.totalAddition.currency', 'item_totalAddition_currency'),
        ('col.totalDiscount.value', 'item_totalDiscount_value'),
        ('col.totalDiscount.currency', 'item_totalDiscount_currency'),
    )

    @staticmethod
    def _money_type():
        """Return the ``{value, currency}`` struct shared by all amount fields."""
        return StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True),
        ])

    @staticmethod
    def _items_schema():
        """Return the schema of the parsed ``items`` column (array of item structs)."""
        money = FlowItems._money_type
        return ArrayType(StructType([
            StructField("name", StringType(), True),
            StructField("addition", money()),
            StructField("discount", money()),
            StructField("quantity", StringType(), True),
            StructField("sequence", StringType(), True),
            StructField("unitPrice", money()),
            StructField("externalId", StringType(), True),
            StructField("totalValue", money()),
            StructField("customerNote", StringType(), True),
            StructField("garnishItems", StringType(), True),
            StructField("integrationId", StringType(), True),
            StructField("totalAddition", money()),
            StructField("totalDiscount", money()),
        ]))

    def run(self):
        """Build the order-items DataFrame.

        Returns:
            The 'Order' dataset joined (inner, on ``order_id``) with one row
            per item, with ``cpf`` replaced by the result of
            ``FlowBase.encrypt_value``. The join is done by column name, so
            the result has a single ``order_id`` column.

        Side effects:
            Calls ``dbutils.notebook.exit`` with an error message when
            ``FlowBase.checkDuplicates`` or ``FlowBase.checkColumnsNull``
            reports a problem on the joined frame.
        """
        orderDF = FlowBase.loadUsedDatasets('Order')

        # Parse the raw ``items`` value into an array of typed structs.
        udf_parse_json = udf(FlowBase.parse_json, self._items_schema())
        parsed = orderDF.select(
            orderDF.order_id,
            udf_parse_json(orderDF.items).alias("items_2"),
        )

        # One row per item; ``explode`` names its output column ``col``.
        exploded = parsed.select(parsed.order_id, explode(parsed.items_2))

        items = exploded.select(
            col('order_id'),
            *[col(path).alias(name) for path, name in self._ITEM_COLUMNS]
        )

        # Join by column name: ``items`` is derived from ``orderDF``, so an
        # ``orderDF.order_id == items.order_id`` condition compares a column
        # with itself and leaves two ``order_id`` columns in the result.
        df_final = orderDF.join(items, on='order_id')

        if FlowBase.checkDuplicates(df_final):
            dbutils.notebook.exit('ERROR: Existem linhas duplicadas')

        # The key check must run on the joined frame: ``items`` alone has no
        # ``cpf`` column.
        if FlowBase.checkColumnsNull(df_final, ['cpf', 'order_id']):
            dbutils.notebook.exit('ERROR: Existem chaves com valores nulos')

        spark_udf = udf(FlowBase.encrypt_value, StringType())
        df_final = df_final.withColumn('cpf', spark_udf(col('cpf')))

        return df_final