In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\Users\\Admin\\BigData\\Spark'

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('etl').getOrCreate()  # I initilize the spark session "spark"

### Creating order_items_dataset

In [18]:
part_df = spark.read.json(r'C:\Users\Admin\Documents\Spark\test\part.json\part.json')


df_new = part_df.select('order_id', 'items')
df = df_new.withColumn('col', from_json("items", ArrayType(StringType())))
df = df.withColumn('explode_col', explode("col"))
df = df.withColumn('col', from_json("explode_col", MapType(StringType(), StringType())))
df = df.withColumn("name", df.col.getItem("name")).withColumn("addition", df.col.getItem("addition")).withColumn("discount", df.col.getItem("discount")).withColumn("quantity", df.col.getItem("quantity")).withColumn("sequence", df.col.getItem("sequence")).withColumn("unitPrice", df.col.getItem("unitPrice")).withColumn("externalId", df.col.getItem("externalId")).withColumn("totalValue", df.col.getItem("totalValue")).withColumn("customerNote", df.col.getItem("customerNote")).withColumn("garnishItems", df.col.getItem("garnishItems")).withColumn("integrationId", df.col.getItem("integrationId")).withColumn("totalAddition", df.col.getItem("totalAddition")).withColumn("totalDiscount", df.col.getItem("totalDiscount"))
order_item_dataset = df.select('order_id', 'name', 'addition', 'discount', 'quantity', 'sequence', 'unitPrice', 'externalId', 'totalValue', 'customerNote', 'garnishItems', 'integrationId', 'totalAddition', 'totalDiscount')
order_item_dataset.show(1, vertical = True)

-RECORD 0-----------------------------
 order_id      | e67e26c2-a04d-4ec... 
 name          | 403 Mariano Procópio 
 addition      | {"value":"0","cur... 
 discount      | {"value":"0","cur... 
 quantity      | 1.0                  
 sequence      | 1                    
 unitPrice     | {"value":"0","cur... 
 externalId    | ee0f88c318af46129... 
 totalValue    | {"value":"0","cur... 
 customerNote  | null                 
 garnishItems  | [{"name":"AO PONT... 
 integrationId | null                 
 totalAddition | {"value":"0","cur... 
 totalDiscount | {"value":"0","cur... 
only showing top 1 row



### Creating order_status_dataset

In [8]:
schema = "`created_at` timestamp, `order_id` STRING, `status_id` STRING, `value` STRING"
status = spark.read.schema(schema).json(r'C:\Users\Admin\Documents\Spark\test\status.json\status.json')

col = ['PLACED', 'REGISTERED', 'CONCLUDED', 'CANCELLED']
out_df = status.groupby("order_id").pivot("value", col).agg(first("created_at"))
out_df.show(5)

+--------------------+-------------------+-------------------+-------------------+---------+
|            order_id|             PLACED|         REGISTERED|          CONCLUDED|CANCELLED|
+--------------------+-------------------+-------------------+-------------------+---------+
|0002fe02-d7dc-423...|2019-01-24 17:04:28|2019-01-24 17:04:27|2019-01-24 19:05:07|     null|
|000cef8c-83c7-49e...|2019-01-17 16:42:18|2019-01-17 16:42:17|2019-01-17 18:45:02|     null|
|0010995b-9212-455...|2019-01-01 16:11:22|2019-01-01 16:11:21|2019-01-01 18:15:14|     null|
|0012d95c-9c4b-424...|2019-01-03 12:12:24|2019-01-03 12:12:23|2019-01-03 14:15:06|     null|
|0013fc5c-4c10-440...|2019-01-06 08:16:18|2019-01-06 08:16:17|2019-01-06 10:20:27|     null|
+--------------------+-------------------+-------------------+-------------------+---------+
only showing top 5 rows



### Creating order_dataset

In [17]:
restaurant = spark.read.csv(r'C:\Users\Admin\Documents\Spark\test\restaurant.csv\restaurant.csv', header=True)
consumer = spark.read.csv(r'C:\Users\Admin\Documents\Spark\test\consumer.csv\consumer.csv', header=True)

# Creating a status dataset with only the last status value
w = Window.partitionBy(status['order_id']).orderBy(status['created_at'].desc())
status = status.withColumn('Rank', dense_rank().over(w))
status_new = status.filter(status.Rank == 1).drop(status.Rank)

order_dataset = part_df.join(restaurant, part_df["merchant_id"] == restaurant['id']).join(consumer, part_df['customer_id'] == consumer['customer_id']).join(status_new, part_df["order_id"] == status_new['order_id'])
order_dataset.show(1, vertical = True)

-RECORD 0--------------------------------------------
 cpf                          | 23513081815          
 customer_id                  | 42b790bb-1499-4d2... 
 customer_name                | JULIO                
 delivery_address_city        | RECIFE               
 delivery_address_country     | BR                   
 delivery_address_district    | AREIAS               
 delivery_address_external_id | 7612816              
 delivery_address_latitude    | -34.94               
 delivery_address_longitude   | -8.10                
 delivery_address_state       | PE                   
 delivery_address_zip_code    | 50780                
 items                        | [{"name": "125 - ... 
 merchant_id                  | 1365cc91-546a-4a9... 
 merchant_latitude            | -34.94               
 merchant_longitude           | -8.10                
 merchant_timezone            | America/Recife       
 order_created_at             | 2019-01-01T22:11:... 
 order_id                   