In [0]:
import json
import ast
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, TimestampType
from pyspark.sql.window import Window

# Reading iFood S3

In [0]:
aws_bucket_name = "ifood-data-architect-test-source"
mount_name = "ifood-de-test"

df_rest = spark.read.csv("dbfs:/mnt/%s/restaurant.csv.gz" % mount_name, header=True)
df_consumer = spark.read.csv("dbfs:/mnt/%s/consumer.csv.gz" % mount_name, header=True)

In [0]:
df_order = spark.read.json("dbfs:/mnt/%s/order.json.gz" % mount_name)

# Own S3 credentials

In [0]:
a_key = dbutils.secrets.get('s3_db','a_key')
s_a_key = dbutils.secrets.get('s3_db','sec_a_key')
encoded_secret_key = s_a_key.replace("/", "%2F")
aws_bucket_name_s3 = "ifood-de-test-arb"

In [0]:
def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574)/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    F.concat(F.lit('{"data": '), i, F.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, F.from_json(F.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, F.col(i).data)
    return res

# Curating

## Orders

In [0]:
df_orders_curated = df_order.select('order_id',
                                            'cpf',
                                            'customer_id',
                                            'delivery_address_city',
                                            'delivery_address_country',
                                            'delivery_address_district',
                                            'delivery_address_external_id',
                                            'delivery_address_latitude',
                                            'delivery_address_longitude',
                                            'delivery_address_state',
                                            'delivery_address_zip_code',
                                            'merchant_id',
                                            'merchant_latitude',
                                            'merchant_longitude',
                                            'merchant_timezone',
                                            F.col("order_created_at").cast(TimestampType()).alias('order_created_at_local_time'),
                                            F.to_utc_timestamp(F.col("order_created_at"), F.col("merchant_timezone")).alias('order_created_at_utc0'),
                                            'order_scheduled',
                                            'order_scheduled_date',
                                            'order_total_amount',
                                            'origin_platform')\
                                            .withColumn('date_partition', F.to_date('order_created_at_utc0', "yyyy-MM-dd").cast(StringType()))\
                                            .repartition("date_partition")

In [0]:
df_orders_curated.write.save("s3a://%s:%s@%s/orders.json" % (a_key, encoded_secret_key, aws_bucket_name_s3), format="json")

## Items

In [0]:
df_order_partition = df_order.withColumn('order_created_at_utc0', F.to_utc_timestamp(F.col("order_created_at"), F.col("merchant_timezone")).alias('order_created_at_utc0'))\
                             .withColumn('date_partition', F.to_date('order_created_at_utc0', "yyyy-MM-dd").cast(StringType()))

In [0]:
df_order_json_parsed = parseJSONCols(df_order_partition.select('order_id', 'date_partition', 'items'), 'items')

In [0]:
df_items = df_order_json_parsed.select('order_id', F.explode(F.col('items')).alias('item'), 'date_partition')\
                                .withColumn('name', F.col('item').getItem('name'))\
                                .withColumn('quantity', F.col('item').getItem('quantity'))\
                                .withColumn('customer_note', F.col('item').getItem('customerNote'))\
                                .withColumn('external_id', F.col('item').getItem('externalId'))\
                                .withColumn('sequence', F.col('item').getItem('sequence'))\
                                .withColumn('integration_id', F.col('item').getItem('integrationId'))\
                                .withColumn('total_discount_currency', F.col('item').getItem('totalDiscount').getItem('currency'))\
                                .withColumn('total_discount_value', F.col('item').getItem('totalDiscount').getItem('value'))\
                                .withColumn('addition_currency', F.col('item').getItem('totalDiscount').getItem('currency'))\
                                .withColumn('addition_value', F.col('item').getItem('totalDiscount').getItem('value'))\
                                .withColumn('unit_price_currency', F.col('item').getItem('unitPrice').getItem('currency'))\
                                .withColumn('unit_price_value', F.col('item').getItem('unitPrice').getItem('value'))\
                                .withColumn('discount_currency', F.col('item').getItem('discount').getItem('currency'))\
                                .withColumn('discount_value', F.col('item').getItem('discount').getItem('value'))\
                                .withColumn('total_value_currency', F.col('item').getItem('totalValue').getItem('currency'))\
                                .withColumn('total_value', F.col('item').getItem('totalValue').getItem('value'))\
                                .drop('item')\
                                .repartition("date_partition")

In [0]:
df_items.write.save("s3a://%s:%s@%s/items.json" % (a_key, encoded_secret_key, aws_bucket_name_s3), format="json")

## Garnish

In [0]:
df_garnish = df_order_json_parsed.select('order_id', F.explode(F.col('items')).alias('item'), 'date_partition')\
                            .withColumn('garnish_items', F.col('item').getItem('garnishItems'))\
                            .withColumn('item_external_id', F.col('item').getItem('externalId'))\
                            .drop('item')\
                            .select('order_id', 'date_partition', 'item_external_id', F.explode(F.col('garnish_items')).alias('garnish_item'))\
                            .withColumn('garnish_external_id', F.col('garnish_item').getItem('externalId'))\
                            .withColumn('name', F.col('garnish_item').getItem('name'))\
                            .withColumn('garnish_integration_id', F.col('garnish_item').getItem('integrationId'))\
                            .withColumn('quantity', F.col('garnish_item').getItem('quantity'))\
                            .withColumn('sequence', F.col('garnish_item').getItem('sequence'))\
                            .withColumn('category_name', F.col('garnish_item').getItem('categoryName'))\
                            .withColumn('unit_price_currency', F.col('garnish_item').getItem('unitPrice').getItem('currency'))\
                            .withColumn('unit_price_value', F.col('garnish_item').getItem('unitPrice').getItem('value'))\
                            .withColumn('addition_currency', F.col('garnish_item').getItem('addition').getItem('currency'))\
                            .withColumn('addition_value', F.col('garnish_item').getItem('addition').getItem('value'))\
                            .withColumn('discount_currency', F.col('garnish_item').getItem('discount').getItem('currency'))\
                            .withColumn('discount_value', F.col('garnish_item').getItem('discount').getItem('value'))\
                            .withColumn('total_value_currency', F.col('garnish_item').getItem('totalValue').getItem('currency'))\
                            .withColumn('total_value', F.col('garnish_item').getItem('totalValue').getItem('value'))\
                            .drop('garnish_item')\
                            .repartition("date_partition")

In [0]:
df_garnish.write.save("s3a://%s:%s@%s/garnish.json" % (a_key, encoded_secret_key, aws_bucket_name_s3), format="json")

In [0]:
df_order_cur = spark.read.json("s3a://%s:%s@%s/orders.json" % (a_key, encoded_secret_key, aws_bucket_name_s3))\
                         .withColumn('order_created_at_local_timestamp', F.col("order_created_at_local_time").cast(TimestampType()))\
                         .withColumn('date_partition_local', F.to_date('order_created_at_local_timestamp', "yyyy-MM-dd").cast(StringType()))

## Daily Orders by city and state

In [0]:
df_orders_city_state = df_order_cur.groupBy('date_partition_local', 'delivery_address_state', 'delivery_address_city')\
                                   .agg(F.count('order_id').alias('number_of_orders'))

In [0]:
display(df_orders_city_state)

date_partition_local,delivery_address_state,delivery_address_city,number_of_orders
2019-01-28,RN,NATAL,715
2018-12-14,RJ,NOVA IGUACU,80
2019-01-25,PE,PETROLINA,9
2018-12-12,RS,GRAVATAI,3
2019-01-28,PI,TERESINA,13
2019-01-29,SP,VARZEA PAULISTA,2
2018-12-12,ES,VILA VELHA,286
2018-12-12,RJ,NOVA IGUACU,99
2019-01-28,RJ,QUEIMADOS,6
2018-12-14,PR,CASCAVEL,7


In [0]:
df_orders_city_state.write.save("s3a://%s:%s@%s/top_orders_city_state.json" % (a_key, encoded_secret_key, aws_bucket_name_s3), format="json")

## Top 10 Restaurants per customer

In [0]:
window = Window.partitionBy('customer_id').orderBy(F.col('number_of_orders').desc())

df_top_rest_cust = df_order_cur.groupBy('customer_id', 'merchant_id')\
                               .agg(F.count('order_id').alias('number_of_orders'))\
                               .withColumn('rank', F.row_number().over(window).alias('rank'))\
                               .filter(F.col('rank') <= 10).repartition(1)

In [0]:
display(df_rest)

id,created_at,enabled,price_range,average_ticket,takeout_time,delivery_time,minimum_order_value,merchant_zip_code,merchant_city,merchant_state,merchant_country
02c94103-61f3-4906-a4a9-55611db9f28c,2017-01-23T12:52:30.910Z,False,3,60.0,0,50.0,30.0,14025,RIBEIRAO PRETO,SP,BR
15e7f5fd-090d-47b9-9f14-b6f7fce3c95d,2017-01-20T13:14:48.286Z,True,3,60.0,0,0.0,30.0,50180,SAO PAULO,SP,BR
33ca5d3d-b99f-404d-84d9-8df8f38a2261,2017-01-23T12:46:33.457Z,True,5,100.0,0,45.0,10.0,23090,RIO DE JANEIRO,RJ,BR
4927035f-a343-4a65-a9be-945818e2efff,2017-01-20T13:15:04.806Z,True,3,80.0,0,0.0,18.9,40255,SALVADOR,BA,BR
52feaad8-4961-4afc-8d60-3f29ffd0a7a7,2017-01-20T13:14:27.701Z,True,3,60.0,0,0.0,25.0,64600,BARUERI,SP,BR
539cb925-52ad-4624-a6d4-b24f002f4067,2017-01-23T12:36:59.158Z,False,1,30.0,0,60.0,10.0,18030,SOROCABA,SP,BR
5b6a43d0-8e25-4ade-8a38-95f1c2e67189,2017-01-23T12:40:20.431Z,False,1,30.0,0,70.0,11.0,13208,JUNDIAI,SP,BR
7a6348e5-748c-4d36-92b5-1860a1760f5c,2017-01-20T13:14:31.699Z,True,4,80.0,0,30.0,24.0,34020,SAO PAULO,SP,BR
8355199d-8a01-4814-b928-0a9eec0c3327,2017-01-20T13:13:05.136Z,False,5,81.0,30,15.0,10.0,45380,SAO PAULO,SP,BR
85f2e1f2-101b-4b76-b72a-4eca1cfb8b5b,2017-01-20T13:15:57.202Z,False,1,30.0,0,30.0,0.0,60541,FORTALEZA,CE,BR


In [0]:
display(df_top_rest_cust)

customer_id,merchant_id,number_of_orders,rank
00008a49-bc7f-42d1-935a-8a036d1f6847,c2deb290-3fa0-4a6d-8d7b-111ea10644fa,2,1
0000a012-e914-44eb-9373-5479c62a7a73,12196e33-08ec-471a-beb9-1e0f7cd054f3,2,1
0000a012-e914-44eb-9373-5479c62a7a73,2403c20b-7cee-40ca-98a6-bf1067e61165,1,2
0000ac44-e818-4546-be8b-394f67e2f245,a898e186-16d7-4146-b291-daa523b2b742,2,1
0000ac44-e818-4546-be8b-394f67e2f245,a69b2860-741c-48eb-ad89-20a9dd75f1dc,2,2
0000ac44-e818-4546-be8b-394f67e2f245,5ce429f0-c1d8-4623-8403-c7e6335f2ce1,1,3
0000ac44-e818-4546-be8b-394f67e2f245,8fe366c1-7154-40e0-b7fb-f9c649aada05,1,4
0000ac44-e818-4546-be8b-394f67e2f245,fe04e16c-7f0c-404c-90d5-98c0a63c2842,1,5
00010f17-5f9c-4c33-8f69-0793402c7c16,d2e05889-e5b2-4cf6-a96b-ed64f2057716,4,1
00010f17-5f9c-4c33-8f69-0793402c7c16,c2e46de3-2db0-4708-b420-98c4fe5027c1,1,2


In [0]:
df_top_rest_cust.rdd.getNumPartitions()

In [0]:
df_top_rest_cust.write.save("s3a://%s:%s@%s/top_rest_by_customer_no_partition.json" % (a_key, encoded_secret_key, aws_bucket_name_s3), format="json")

In [0]:
df_order_cur.filter(F.col('customer_id').isNull()).count()

In [0]:
df_order_cur.count()

In [0]:
display(df_top_rest_cust.select('rank').distinct())

rank
1
6
3
5
9
4
8
7
10
2
