Write a query that:

1. Returns the top 5 users and their emails by total gross orders (only successful orders) in the last 1 year by active vendor type with ordering by the oldest user on the platform
2. Vendor ID and vendor type that did the most in amt in non-cancelled orders in the last 3 years
3. Vendor ID and vendor type with the most amt in any in cancelled orders

Python: implement query 1 in Python using only the standard library. No spark, pandas, etc. Imagine you have a CSV for each table with the same headers.

In [2]:
%python
# import the required modules
from datetime import datetime,timedelta
from collections import Counter,defaultdict
from itertools import groupby
import operator
import csv

In [3]:
%python
order = csv.DictReader(open('/dbfs/mnt/datalake/practice/FCT_ORDERS.csv'),delimiter=';')
user = csv.DictReader(open('/dbfs/mnt/datalake/practice/DIM_USERS.csv'),delimiter=';')
vendor = csv.DictReader(open('/dbfs/mnt/datalake/practice/DIM_VENDORS.csv'),delimiter=';')

In [4]:
%python
def merge(order, user, vendor):
    user = list(user)
    vendor = list(vendor)
    matchedlist = []
    ddict = defaultdict(int)
    for orderline in order:
        for userline in user:
                if (orderline['USER_ID'] == userline['ID']
                    and orderline['STATUS'] == '0' 
                    and datetime.strptime(orderline['PLACED_AT'], '%d-%m-%Y %H:%M:%S') > (datetime.now() - timedelta(days=365))
                    and any(orderline['VENDOR_ID'] == v['ID'] and v['IS_ACTIVE'] == 'true' for v in vendor)):
                    ddict[orderline['USER_ID']] += 1
                    matchedlist.append(userline)
    return ddict,matchedlist

In [5]:
%python
ddict,ls = merge(order,user,vendor)

In [6]:
%python
for i in dict(Counter(ddict).most_common(5)):
    for k in ls:
        if int(i) == int(k['ID']):
            print(k['ID'],k['NAME'],k['EMAIL'],k['ADDED_AT'],sep=';')
            break

In [7]:
%python
orderdf = spark.read.format('csv').options(sep=';',header='true',inferSchema='true').load('/mnt/datalake/practice/FCT_ORDERS.csv')
userdf = spark.read.format('csv').options(sep=';',header='true',inferSchema='true').load('/mnt/datalake/practice/DIM_USERS.csv')
vendordf = spark.read.format('csv').options(sep=';',header='true',inferSchema='true').load('/mnt/datalake/practice/DIM_VENDORS.csv')

In [8]:
orderdf.createOrReplaceTempView("fct_order")

In [9]:
userdf.createOrReplaceTempView("dim_user")
vendordf.createOrReplaceTempView("dim_vendor")

Returns the top 5 users and their emails by total gross orders (only successful orders) in the last 1 year by active vendor type with ordering by the oldest user on the platform

In [11]:
%sql
SELECT du.ID, du.NAME,du.ADDED_AT, COUNT(*) cnt FROM fct_order fo 
INNER JOIN dim_user du ON fo.USER_ID = du.ID
WHERE fo.STATUS = 0 
      AND to_date(fo.PLACED_AT, 'dd-MM-yyyy HH:mm:SS') > DATE_SUB(CURRENT_TIMESTAMP, 365)
      AND EXISTS (SELECT ID FROM dim_vendor dv WHERE dv.IS_ACTIVE = true AND dv.ID = fo.VENDOR_ID)
GROUP BY du.ID, du.NAME,du.ADDED_AT
ORDER BY cnt DESC, to_date(du.ADDED_AT, 'dd-MM-yyyy') ASC
LIMIT 5;

ID,NAME,ADDED_AT,cnt
2,Herbert,25-02-2020,6
12,Millard,26-02-2020,6
64,James,15-01-2020,5
94,Warren,29-01-2020,5
57,John,05-01-2020,4


In [12]:
%sql
SELECT dv.ID, dv.TYPE, SUM(fo.AMT) AMT FROM fct_order fo
INNER JOIN dim_vendor dv ON dv.ID = fo.VENDOR_ID
WHERE fo.STATUS != -1 
      AND to_date(fo.PLACED_AT, 'dd-MM-yyyy HH:mm:SS') >= DATE_SUB(CURRENT_TIMESTAMP, 365*3)
GROUP BY dv.ID,dv.TYPE
ORDER BY AMT DESC
LIMIT 1;

ID,TYPE,AMT
83,-1,29.19653939612048


In [13]:
%sql
--Vendor ID and vendor type with the most in AMT in cancelled orders
SELECT dv.ID, dv.TYPE, SUM(fo.AMT) AMT FROM fct_order fo
INNER JOIN dim_vendor dv ON dv.ID = fo.VENDOR_ID
WHERE fo.STATUS = -1
GROUP BY dv.ID,dv.TYPE
ORDER BY AMT DESC
LIMIT 1

ID,TYPE,AMT
77,0,14.97174871015765
