In [13]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

# Start (or attach to) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("BNPL Project")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

# Columns kept from the curated dataset: transaction fields first,
# then the regional population / earnings aggregates.
cols = [
    'merchant_name',
    'user_id',
    'dollar_value',
    'order_datetime',
    'rate',
    'category',
    'subcategory',
    'estimated_region_population_2021_sum',
    'persons_earners_2018-19_sum',
    'mean_earnings_2018-19_avg',
    'sum_earnings_2018-19_sum',
    'median_earnings_2018-19_avg',
    'med_age_earners_2018-19_avg',
]

# Read the curated parquet data and narrow it to the columns above.
sdf = spark.read.parquet("../data/curated/process_data.parquet/").select(cols)

# Preview the first rows (rendered eagerly thanks to eagerEval).
sdf.limit(10)

merchant_name,user_id,dollar_value,order_datetime,rate,category,subcategory,estimated_region_population_2021_sum,persons_earners_2018-19_sum,mean_earnings_2018-19_avg,sum_earnings_2018-19_sum,median_earnings_2018-19_avg,med_age_earners_2018-19_avg
Dolor Dapibus Gra...,8913,51.28,2021-07-24,0.0312,retail_and_wholes...,household_goods_r...,31499,28552.0,70738.0,2019717290.0,28339.0,31.0
Lorem Foundation,15797,5.84,2021-11-26,0.0655,retail_and_wholes...,household_goods_r...,3266,2023.0,60415.0,122219664.0,49925.0,48.0
Euismod Urna Inst...,4694,0.36,2022-05-07,0.0505,retail_and_wholes...,department_stores,22545,14289.0,84223.0,1203458028.0,54884.0,47.0
Mauris Inc.,3260,5.72,2022-04-18,0.0294,info_media_and_te...,,43353,25261.0,82029.0,1628989984.0,54081.0,44.333333333333336
Lobortis Ultrices...,15618,69.08,2022-08-13,0.0631,retail_and_wholes...,department_stores,37268,22626.0,57968.333333333336,1310262977.0,53277.333333333336,44.0
Imperdiet Non LLC,22134,129.51,2021-10-11,0.0684,info_media_and_te...,,7482,4086.0,43661.0,177649122.0,39343.5,46.0
Eget Venenatis A ...,8914,313.78,2021-04-25,0.0687,retail_and_wholes...,department_stores,97517,50162.0,63845.8,3177562100.0,55556.4,39.0
Non Vestibulum In...,1604,21.55,2021-05-13,0.058,retail_and_wholes...,department_stores,4296,2344.0,51198.0,120007795.0,43117.0,48.0
Lobortis Ultrices...,2786,60.47,2022-08-14,0.0631,retail_and_wholes...,department_stores,8448,4341.0,54531.0,236717978.0,46601.0,45.0
Lobortis Ultrices...,21018,56.28,2021-12-27,0.0631,retail_and_wholes...,department_stores,14916,7568.0,54270.0,423766641.0,50905.66666666666,40.66666666666666


### 1. **Identify** merchants with the greatest gross income, the lowest income volatility, and the greatest evidence of performance

In [14]:
# Per-merchant summary over the whole dataset:
#   income_total     - summed dollar_value scaled by the merchant's mean rate
#   income_deviation - sample std-dev of dollar_value scaled by the same rate
#   count_merchant   - number of rows (transactions) for the merchant
# Sorted by transaction count, then income (both descending), then deviation (ascending).
fullSdf = (
    sdf.groupBy('merchant_name')
    .agg(
        F.sum('dollar_value').alias('income_total'),
        F.stddev('dollar_value').alias('income_deviation'),
        F.mean('rate').alias('rate'),
        F.count('merchant_name').alias('count_merchant'),
    )
    .withColumn('income_total', F.col('income_total') * F.col('rate'))
    .withColumn('income_deviation', F.col('income_deviation') * F.col('rate'))
    .orderBy(
        F.col('count_merchant').desc(),
        F.col('income_total').desc(),
        F.col('income_deviation').asc(),
    )
)

In [15]:
# Count of distinct order_datetime values present in the data;
# used later as the divisor for the daily-income figures.
days = (
    sdf.select('order_datetime')
    .distinct()
    .count()
)

### 2. **Ranking** Merchants by Categories

In [16]:
def _sum_non_null(values):
    """Add up the non-null entries of ``values``; return None if there are none.

    This mirrors how Spark's ``sum`` aggregate treats nulls (they are skipped,
    and an all-null or empty input yields null).
    """
    present = [value for value in values if value is not None]
    return sum(present) if present else None


def topNmerchants(sdf, categories, N):
    """Pick the top ``N`` merchants of each category and summarise them.

    For every category, rows of ``sdf`` with that ``category`` are grouped by
    ``merchant_name`` and ranked by transaction count (descending), then
    rate-scaled total income (descending), then rate-scaled income standard
    deviation (ascending). The first ``N`` merchants are kept.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Must expose the columns ``category``, ``merchant_name``,
        ``dollar_value`` and ``rate``.
    categories : iterable of str
        Category values to rank merchants within.
    N : int
        Maximum number of merchants kept per category.

    Returns
    -------
    tuple
        ``(merchants, income_total, income_risk, counts)`` where

        * ``merchants`` is a pandas DataFrame with one column per category
          listing the selected merchant names in rank order. A category with
          fewer than ``N`` merchants is padded with NaN instead of raising.
        * ``income_total`` maps category -> sum of the selected merchants'
          ``income_total``.
        * ``income_risk`` maps category -> sum of the selected merchants'
          ``income_deviation``.
        * ``counts`` maps category -> sum of the selected merchants'
          transaction counts.

        A dict value is ``None`` when a category has no non-null values to sum.
    """
    merchants = {}
    income_total = {}
    income_risk = {}
    counts = {}

    for category in categories:
        ranked = (
            sdf.where(F.col('category') == category)
            .groupBy('merchant_name')
            .agg(
                F.sum('dollar_value').alias('income_total'),
                F.stddev('dollar_value').alias('income_deviation'),
                F.mean('rate').alias('rate'),
                F.count('merchant_name').alias('count_merchant'),
            )
            .withColumn('income_total', F.col('income_total') * F.col('rate'))
            .withColumn('income_deviation', F.col('income_deviation') * F.col('rate'))
            .orderBy(
                F.col('count_merchant').desc(),
                F.col('income_total').desc(),
                F.col('income_deviation').asc(),
            )
        )

        # Materialise the top-N rows exactly once. Every figure below is then
        # derived from the same snapshot: one Spark job per category instead
        # of four, and no chance of the N selected merchants differing between
        # separate actions when ranking ties exist.
        top_rows = ranked.limit(N).collect()

        merchants[category] = [row['merchant_name'] for row in top_rows]

        # assuming that all income is Normally distributed
        # NOTE(review): standard deviations are summed directly here; for
        # independent variables it is the variances that add - confirm that
        # this (more conservative) aggregate is what is intended.
        income_total[category] = _sum_non_null(row['income_total'] for row in top_rows)
        income_risk[category] = _sum_non_null(row['income_deviation'] for row in top_rows)
        counts[category] = _sum_non_null(row['count_merchant'] for row in top_rows)

    # Wrapping each list in a Series lets pandas align on position, so
    # categories with different numbers of merchants yield NaN-padded columns
    # rather than "All arrays must be of the same length".
    merchants_frame = pd.DataFrame(
        {category: pd.Series(names, dtype=object) for category, names in merchants.items()}
    )
    return merchants_frame, income_total, income_risk, counts

In [17]:
# Category labels to rank merchants within (values of the `category` column).
categories = [
    'retail_and_wholesale_trade',
    'rental_hiring_and_real_estate',
    'arts_and_recreation',
    'info_media_and_telecommunications',
    'others',
]

In [18]:
# Top 10 merchants per category, plus per-category income, deviation and
# transaction-count sums over those merchants.
merchants, total, risk, counts = topNmerchants(
    sdf=sdf,
    categories=categories,
    N=10,
)

# Show the merchant ranking table (one column per category).
merchants

                                                                                

Unnamed: 0,retail_and_wholesale_trade,rental_hiring_and_real_estate,arts_and_recreation,info_media_and_telecommunications,others
0,Erat Vitae LLP,Quis Massa Mauris Corporation,Ac Urna Consulting,Mauris Non Institute,Nec Tellus Ltd
1,Leo In Consulting,Quam A Felis Incorporated,Magna Sed Industries,Euismod In Corp.,Tempus Eu Ligula Limited
2,Pede Nonummy Corp.,Vel Lectus Cum LLC,Mi Consulting,Feugiat Sed Nec Institute,Sed Nec Inc.
3,Non Vestibulum Industries,Mi Eleifend Company,Lorem LLP,Amet Consulting,Natoque Consulting
4,Suspendisse Dui Corporation,Iaculis Enim Corp.,Volutpat Ornare Facilisis Associates,Arcu Sed Eu Incorporated,Gravida Praesent Corp.
5,Lacus Consulting,Leo Morbi Limited,Ligula Tortor Incorporated,Eleifend PC,Mauris Sagittis Corp.
6,Est Nunc Consulting,Laoreet Inc.,Nullam Scelerisque Ltd,Posuere Cubilia Curae Corporation,Nisl Elementum Ltd
7,Lorem Ipsum Sodales Industries,Massa Limited,Massa LLP,At Sem Corp.,Molestie Arcu Corporation
8,Ipsum Dolor Sit Corporation,Fermentum Institute,Non Cursus LLP,Et Nunc Consulting,Feugiat Lorem Incorporated
9,Vehicula Pellentesque Corporation,Morbi Non Corp.,Mollis Duis Sit Foundation,Suspendisse Incorporated,Risus Odio Auctor Foundation


In [19]:
# Persist the ranking table next to the notebook; fields are space-separated
# and the DataFrame index is not written.
merchants.to_csv(
    path_or_buf='./merchants.csv',
    sep=' ',
    index=False,
)

In [20]:
# Per-category sum of `income_total` over the selected top merchants.
total

{'retail_and_wholesale_trade': 3613688.9746479657,
 'rental_hiring_and_real_estate': 351859.06383600016,
 'arts_and_recreation': 878824.603607,
 'info_media_and_telecommunications': 2282460.32927698,
 'others': 705908.5849809962}

In [21]:
# Per-category sum of `income_deviation` over the selected top merchants.
risk

{'retail_and_wholesale_trade': 11.66295848134123,
 'rental_hiring_and_real_estate': 322.02846931549146,
 'arts_and_recreation': 650.0662699383139,
 'info_media_and_telecommunications': 38.42206648478936,
 'others': 58.94039668372187}

In [22]:
# Per-category sum of transaction counts over the selected top merchants.
counts

{'retail_and_wholesale_trade': 2324636,
 'rental_hiring_and_real_estate': 18007,
 'arts_and_recreation': 12123,
 'info_media_and_telecommunications': 449654,
 'others': 92071}

In [23]:
# Spread the category sums of income and of deviation evenly across the
# distinct order dates to get per-day figures.
mu, sigma = (
    sum(per_category.values()) / days
    for per_category in (total, risk)
)

# Half-width of the reported band: three times the per-day deviation.
three_sigma = 3 * sigma

print('Average Daily Income:')
print(f'({mu - three_sigma} <- 3 std - | {mu} | - 3 std -> {mu + three_sigma})')

Average Daily Income:
(12919.964019581239 <- 3 std - | 12925.316099585712 | - 3 std -> 12930.668179590186)
