In [1]:
# Notebook parameters, declared as text widgets.
# Each entry is (widget name, default value, label shown next to the widget).
_widget_specs = [
  ("orders", "dbfs:/test-data/orders.txt", "Source orders path"),
  ("orderDetails", "dbfs:/test-data/orderDetails.txt", "Source order details path"),
  ("output", "test_output", "Output order details spark table name"),
]
for _name, _default, _label in _widget_specs:
  dbutils.widgets.text(_name, defaultValue=_default, label=_label)

In [2]:
# Read the orders JSON from the path held by the "orders" widget and cache
# the DataFrame, since it is both displayed here and reused by later cells.
orders_path = dbutils.widgets.get("orders")
orders = spark.read.json(orders_path).cache()
display(orders)

CreatedDate,CustomerID,OrderDate,OrderID,ShipDate,TotalCost,UpdatedDate
2017-04-17T00:00:00,ad803f0b-688f-4d6b-a8ba-6c1c96ccb911,2017-04-17T00:00:00,197ef1bf-2d4d-4813-83ca-000037f8ffb6,2017-04-21T00:00:00,34.98,2017-04-21T00:00:00
2017-08-01T00:00:00,f60eee3c-89c8-4dc3-a150-87bd4e170ae1,2017-08-01T00:00:00,1a8ccf3f-44b2-4f91-8a93-0000447a035c,2017-08-03T00:00:00,49.97,2017-08-03T00:00:00
2017-04-29T00:00:00,643a1e1c-070a-4791-83c3-a42b624cc2c8,2017-04-29T00:00:00,3c326201-a86e-4083-a124-0000b3850edf,2017-05-01T00:00:00,34.98,2017-05-01T00:00:00


In [3]:
# Read the order-details JSON from the path held by the "orderDetails" widget.
order_details_path = dbutils.widgets.get("orderDetails")
order_details_raw = spark.read.json(order_details_path)

# Prefix the audit columns with "Detail" so they do not collide with the
# CreatedDate / UpdatedDate columns of `orders` once the two frames are joined.
orderDetails = (order_details_raw
  .withColumnRenamed("CreatedDate", "DetailCreatedDate")
  .withColumnRenamed("UpdatedDate", "DetailUpdatedDate")
  .cache()
)
display(orderDetails)

DetailCreatedDate,LineNumber,MovieID,OrderDetailID,OrderID,Quantity,UnitCost,DetailUpdatedDate
2017-07-27T00:00:00,2,90f708a0-2408-44b5-bba8-2918dfa6930a,a004dd6d-8c75-49e9-ab9f-000061f96947,197ef1bf-2d4d-4813-83ca-000037f8ffb6,1,14.99,2017-07-27T00:00:00
2017-04-26T00:00:00,2,9952631d-dad1-43e7-af07-4458c24e3c65,8ca2fb1a-07b8-4b3b-afad-000147c5b0b9,197ef1bf-2d4d-4813-83ca-000037f8ffb6,1,14.99,2017-04-26T00:00:00
2017-09-27T00:00:00,2,3b39eb73-dc73-4435-9b2c-379c224b3068,4344f610-e0c3-4018-ae2a-00019069d551,1a8ccf3f-44b2-4f91-8a93-0000447a035c,1,14.99,2017-09-27T00:00:00
2017-11-12T00:00:00,3,05fc1fe7-20a9-4786-81db-c4fb8c26d6d2,4b3258a5-3e9a-4799-ae77-0001983d56b8,4d47265a-f006-4b59-8593-fd6046900b4d,1,14.99,2017-11-12T00:00:00


In [4]:
# Perform a FULL OUTER JOIN of orders and orderDetails on the order key, so that
# orders without any detail rows and detail rows without a matching order are
# both kept (with nulls on the missing side).
# The key is spelled "OrderID" to match the column name in both inputs; the
# previous "OrderId" spelling only resolved because Spark's column resolution is
# case-insensitive by default and would fail with spark.sql.caseSensitive=true.
orders_with_detail = orders.join(orderDetails, ['OrderID'], how='full')

In [5]:
# Count lines (i.e. number of orderDetails) by order
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# One window partition per order. Column names are spelled exactly as in the
# schema ("OrderID", "OrderDetailID") so the cell does not depend on Spark's
# default case-insensitive column resolution.
overOrder = Window.partitionBy("OrderID")

# F.count over a column skips nulls, so an order that has no detail rows after
# the full outer join gets LinesInOrder = 0 rather than 1.
orders_full = orders_with_detail.withColumn("LinesInOrder", F.count("OrderDetailID").over(overOrder)).cache()
display(orders_full)

OrderID,CreatedDate,CustomerID,OrderDate,ShipDate,TotalCost,UpdatedDate,DetailCreatedDate,LineNumber,MovieID,OrderDetailID,Quantity,UnitCost,DetailUpdatedDate,LinesInOrder
197ef1bf-2d4d-4813-83ca-000037f8ffb6,2017-04-17T00:00:00,ad803f0b-688f-4d6b-a8ba-6c1c96ccb911,2017-04-17T00:00:00,2017-04-21T00:00:00,34.98,2017-04-21T00:00:00,2017-07-27T00:00:00,2.0,90f708a0-2408-44b5-bba8-2918dfa6930a,a004dd6d-8c75-49e9-ab9f-000061f96947,1.0,14.99,2017-07-27T00:00:00,2
197ef1bf-2d4d-4813-83ca-000037f8ffb6,2017-04-17T00:00:00,ad803f0b-688f-4d6b-a8ba-6c1c96ccb911,2017-04-17T00:00:00,2017-04-21T00:00:00,34.98,2017-04-21T00:00:00,2017-04-26T00:00:00,2.0,9952631d-dad1-43e7-af07-4458c24e3c65,8ca2fb1a-07b8-4b3b-afad-000147c5b0b9,1.0,14.99,2017-04-26T00:00:00,2
1a8ccf3f-44b2-4f91-8a93-0000447a035c,2017-08-01T00:00:00,f60eee3c-89c8-4dc3-a150-87bd4e170ae1,2017-08-01T00:00:00,2017-08-03T00:00:00,49.97,2017-08-03T00:00:00,2017-09-27T00:00:00,2.0,3b39eb73-dc73-4435-9b2c-379c224b3068,4344f610-e0c3-4018-ae2a-00019069d551,1.0,14.99,2017-09-27T00:00:00,1
4d47265a-f006-4b59-8593-fd6046900b4d,,,,,,,2017-11-12T00:00:00,3.0,05fc1fe7-20a9-4786-81db-c4fb8c26d6d2,4b3258a5-3e9a-4799-ae77-0001983d56b8,1.0,14.99,2017-11-12T00:00:00,1
3c326201-a86e-4083-a124-0000b3850edf,2017-04-29T00:00:00,643a1e1c-070a-4791-83c3-a42b624cc2c8,2017-04-29T00:00:00,2017-05-01T00:00:00,34.98,2017-05-01T00:00:00,,,,,,,,0


In [6]:
# Save the joined result as a table whose name comes from the "output" widget;
# "overwrite" mode replaces whatever the table held before.
output_table = dbutils.widgets.get("output")
orders_full.write.mode("overwrite").saveAsTable(output_table)