In [0]:
# Importando libs
from pyspark.sql.functions import months_between, lit, to_date
from pyspark.sql import functions as F

# Load the source tables and apply the per-table preparation in one
# expression each.

# Orders: both date-like columns are converted with to_date.
order = (
    spark.table('workspace.tabelas_ifood.order')
    .withColumn('order_created_at', to_date('order_created_at'))
    .withColumn('order_scheduled_date', to_date('order_scheduled_date'))
)

# Order lines: order_created_at gets the same to_date conversion, so it
# can be used together with order_id as a join key against `order`.
order_details = (
    spark.table('workspace.tabelas_ifood.order_details')
    .withColumn('order_created_at', to_date('order_created_at'))
)

# Restaurants: `id` becomes `merchant_id` (the key used when joining to
# orders) and `created_at` is renamed so it does not share a name with
# the consumer table's `created_at`.
restaurant = (
    spark.table('workspace.tabelas_ifood.restaurant')
    .withColumnRenamed('id', 'merchant_id')
    .withColumnRenamed('created_at', 'restaurant_created_at')
)

# Consumers and the A/B test reference are used as loaded.
consumer = spark.table('workspace.tabelas_ifood.consumer')
ab_ref = spark.table('workspace.tabelas_ifood.ab_test_ref')

In [0]:
# Months between each consumer's creation date and the fixed reference
# date 2018-12-01.
consumer = consumer.withColumn(
    'user_months',
    F.months_between(F.lit('2018-12-01'), F.to_date('created_at')),
)

# Null check: one output column per consumer column, holding the number
# of rows where that column is NULL.
null_counts = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in consumer.columns
]
display(consumer.select(null_counts))

customer_id,language,created_at,active,customer_name,customer_phone_area,customer_phone_number,is_target,user_months
0,0,0,0,0,0,0,0,0


In [0]:
# Check whether any currency column of order_details holds more than one
# distinct value. Each column is displayed separately.
cols = [
    'unit_currency',
    'total_currency',
    'addition_currency',
    'discount_currency',
    'total_addition_currency',
    'total_discount_currency',
    'garnish_addition_currency',
    'garnish_discount_currency',
    'garnish_unit_currency',
    'garnish_total_currency',
]
for currency_col in cols:
    distinct_values = order_details.select(currency_col).distinct()
    display(distinct_values)

unit_currency
BRL


total_currency
BRL


addition_currency
BRL


discount_currency
BRL


total_addition_currency
BRL


total_discount_currency
BRL


garnish_addition_currency
""
BRL


garnish_discount_currency
""
BRL


garnish_unit_currency
""
BRL


garnish_total_currency
""
BRL


In [0]:
# Per-order totals of item and garnish quantities.
# NOTE(review): if order_details holds one row per item/garnish pair,
# item_quantity would be repeated on every garnish row of the same item
# and item_count would be inflated — confirm the row layout.
order_item_count = order_details.groupBy('order_id', 'order_created_at').agg(
    F.sum('item_quantity').alias('item_count'),
    F.sum('garnish_quantity').alias('garnish_count'),
)

# Project the order-level columns, drop rows that became identical after
# the projection, then attach the per-order counts. Because the select
# discards any previously attached count columns, re-running this cell
# does not duplicate them.
order = order.select(
    'customer_id', 'order_id', 'merchant_id', 'order_created_at',
    'delivery_address_city', 'delivery_address_state', 'origin_platform',
    'order_scheduled', 'order_scheduled_date', 'order_total_amount',
).dropDuplicates()
order = order.join(order_item_count, on=['order_id', 'order_created_at'], how='left')

# Join the tables that make up the view.
# The consumer/ab_ref join is bound to a new name instead of rebinding
# `consumer`: rebinding made the cell non-idempotent, because a second
# run joined ab_ref onto a frame that already carried its columns.
consumer_ab = consumer.join(ab_ref, on='customer_id', how='left')
user_orders = consumer_ab.join(order, on='customer_id', how='left')
user_orders = user_orders.join(restaurant, on='merchant_id', how='left')

# Export the view, replacing any existing table.
user_orders.write.mode('overwrite').saveAsTable('workspace.tabelas_ifood.user_orders')

In [0]:
# Reload the source tables from scratch, dropping fully identical rows,
# so the following cells do not depend on frames modified earlier.
# The to_date conversions are applied after de-duplication.
order = (
    spark.table('workspace.tabelas_ifood.order')
    .dropDuplicates()
    .withColumn('order_created_at', to_date('order_created_at'))
    .withColumn('order_scheduled_date', to_date('order_scheduled_date'))
)
order_details = (
    spark.table('workspace.tabelas_ifood.order_details')
    .dropDuplicates()
    .withColumn('order_created_at', to_date('order_created_at'))
)
consumer = spark.table('workspace.tabelas_ifood.consumer').dropDuplicates()
ab_ref = spark.table('workspace.tabelas_ifood.ab_test_ref').dropDuplicates()

In [0]:
# Columns identifying one order line, and the numeric columns used to
# price it.
line_keys = ['order_id', 'order_created_at', 'item_sequence', 'item_externalId', 'garnish_externalId']
amount_cols = ['item_quantity', 'unit_price', 'garnish_quantity', 'garnish_unit_price']

# Null replacements: '' for the key columns, 0 for the numeric columns.
null_fill = {**{key: '' for key in line_keys}, **{col: 0 for col in amount_cols}}

# Line-level costs: quantity * unit price for the item and for the
# garnish, each divided by 100, then summed into total_item_cost.
# NOTE(review): the division by 100 suggests prices are stored in cents —
# confirm against the source schema.
order_calc_price = (
    order_details
    .select(*line_keys, *amount_cols)
    .dropDuplicates()
    .fillna(null_fill)
    .withColumn('total_item_calc_price', (F.col('item_quantity') * F.col('unit_price')) / 100)
    .withColumn('total_garnish_calc_price', (F.col('garnish_quantity') * F.col('garnish_unit_price')) / 100)
    .withColumn('total_item_cost', F.col('total_item_calc_price') + F.col('total_garnish_calc_price'))
)

# NOTE(review): the aggregation is grouped by the same line-level keys
# selected above, so despite its name `total_order_amount_calc` is a sum
# per item/garnish line, not per order_id — confirm this is intended.
order_calc_price_total = (
    order_calc_price
    .groupBy(*line_keys)
    .agg(F.sum('total_item_cost').alias('total_order_amount_calc'))
)

# Rescale the unit prices (after the costs above were computed from the
# raw values) and attach the aggregated amount to every line.
order_calc_price = (
    order_calc_price
    .withColumn('unit_price', F.col('unit_price').cast('double') / 100)
    .withColumn('garnish_unit_price', F.col('garnish_unit_price').cast('double') / 100)
    .join(order_calc_price_total, on=line_keys, how='left')
)

In [0]:
# Keep only the order-level columns needed next to the line-level costs.
# dropDuplicates() is applied AFTER the projection: the earlier
# de-duplication compared full rows, so rows that differed only in the
# columns discarded here become identical now, and each extra copy would
# multiply that order's detail lines in the join below.
order = order.select(
    'customer_id', 'order_id', 'merchant_id', 'order_created_at', 'order_total_amount'
).dropDuplicates()

# Left join so that orders without any detail line are still kept.
order_complete = order.join(order_calc_price, on=['order_id', 'order_created_at'], how='left')

In [0]:
# Persist order_complete as a table, replacing any existing version.
(
    order_complete.write
    .mode('overwrite')
    .saveAsTable('workspace.tabelas_ifood.order_complete')
)