In [1]:
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.window import Window
import dateutil.parser 


#### Read relevant tables

In [3]:
countryCode = 'za'

In [4]:
universe = spark.table('dev_derived_ouniverse.output_total_universe_' + countryCode)
relevantProducts  =  spark.table('data_user_hien.valueTier_opr_cuEanCode_'+ countryCode).where(f.col('cuEanCode').isNotNull())

productAssumptions  =  spark.table('data_user_hien.valueTier_productAssumptions_'+ countryCode)
productAssumptionsNoRelevantProducts  = productAssumptions.select(f.avg(f.col('avgTotalProductPrice')).alias('avgTotalProductPriceNoProducts'))

relevantProductsGrabOperators = relevantProducts.where(f.col('cuEanCode').isNotNull()).select((f.countDistinct(\
                    f.concat(\
                             f.concat(f.col('cuEanCode'),f.lit(' / ')),\
                             f.col('operatorid')))/f.countDistinct(f.col('operatorid'))).alias('avgGrab'))

menu      =  spark.table("dev_derived_omenu.output_business_product_mapping_long").where(f.lower(f.col('countryCode')) == countryCode)
productDetails = spark.table('data_sifu.sifu_product_details').where(f.col('countryCode') == countryCode)

In [5]:
universe.select('operatorOhubId').distinct().count()

#### Calculate operator sales value

In [7]:
sale  =  spark.table("dev_sources_ohub.cleaned_operator_sales").where(f.lower(f.col('countryCode')) == countryCode)\
              .where(f.col('transactionDate').isNotNull())
sale = sale.withColumn('year',f.substring(f.col('transactiondate'),1,4).cast('int')).where(f.col('year') >= 2019)
sale = sale.where(f.col('amount') > 0)


operatorSalesValue = sale.where(f.col('amount') > 0).where(f.col('quantityOfUnits') > 0) \
                         .where(f.col('cuEanCode').isNotNull()) \
                         .groupBy('operatorOhubId') \
                         .agg(f.sum(f.col('quantityOfUnits')*((f.col('cuListingPriceInCents')/f.lit(100))) \
                                                                .cast('double')).alias('operatorSalesValue') \
                                                                ,f.countDistinct(f.col('cuEanCode')).alias('grab'))

In [8]:
display(operatorSalesValue)

operatorOhubId,operatorSalesValue,grab
25d4b03c-7f69-3743-b66f-e0de734f3396,2212737.0,58
58b116af-0117-3dc8-b30a-01d530a542e6,7729.0,14
3ef1468e-cdad-3daa-b38d-dad6454ee0b5,195824.0,28
77d60f34-9df1-3344-8678-f11fdeba70ef,14545.0,38
d41b3811-6247-3964-9d04-0fd07ea552e0,1139.0,5
9dd0b6c0-b348-3b60-8d24-db5faa8f861d,81076.0,24
5cf91a69-f08c-3d8f-925f-4f1f96590cac,49125.0,9
d0c84589-6493-368c-a371-d4d01be2b812,6533.0,14
e95ccb7e-97f1-327f-a725-94d96805f325,5809.0,4
6220a1af-f5e6-3107-87dc-9d7d8900bcdf,77726.0,9


#### Calculate operator potential based on SSD & menu data

In [10]:
relevantProductsValue = relevantProducts.join(productAssumptions, on = ['cuEanCode'], how = 'left_outer')
display(
  relevantProductsValue.where(f.col('cuEanCode').isNotNull()) \
  .select('operatorid', 'cuEanCode', 'productName', 'avgQuantityOfUnits', 'avgListingPriceInCents', 'avgTotalProductPrice'))

operatorid,cuEanCode,productName,avgQuantityOfUnits,avgListingPriceInCents,avgTotalProductPrice
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087360814,knorr professional hearty beef soup,5.589861751152074,15900.0,888.7880184331797
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001328004606,fine foods vinegar sachets,6.690501600853788,12500.0,836.3127001067236
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087001076,hellmann's tangy mayonnaise 20kg,3.531146087342053,73200.0,2584.798935934383
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001171020259,knorr professional aromat original,13.720322580645162,11100.0,1522.955806451613
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087307086,robertsons barbecue spice,12.920765790944662,11600.0,1498.8088317495808
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087307604,robertsons chicken spice,13.179578246392897,11600.0,1528.831076581576
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087360876,knorr professional minestrone soup,6.229014598540146,15900.0,990.4133211678832
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087362412,knorr professional original chicken breading 5kg,3.901008249312557,19400.0,756.7956003666361
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087302333,knorr professional sweet chilli sauce,11.904392764857882,9400.0,1119.0129198966408
/ / 1ca36d9d-e408-30f7-b902-467ff779ec08,6001087001144,hellmann's fine whip salad cream 5kg,3.1814671814671813,22600.0,719.011583011583


In [11]:
relevantProductsValueOperators = relevantProductsValue.groupBy(f.col('operatorid')) \
                                                      .agg(f.sum(f.col('avgTotalProductPrice')).alias('totalProductPrice'))

In [12]:
relevantProductsValueOperators.where(f.col('totalProductPrice') != 0).count()

In [13]:
relevantProductsValueOperators = universe.join(relevantProductsValueOperators, on = ['operatorid'], how = 'left_outer')
relevantProductsValueOperators = relevantProductsValueOperators.crossJoin(productAssumptionsNoRelevantProducts)
relevantProductsValueOperators = relevantProductsValueOperators.withColumn('totalProductPrice', f.when(\
                                                                                                       f.col('totalProductPrice').isNotNull(),\
                                                                                                       f.col('totalProductPrice'))\
                                                                                             .otherwise(f.col('avgTotalProductPriceNoProducts')))


In [14]:
relevantProductsValueOperators.where(f.col('operatorOhubId').isNotNull()).count()

In [15]:
print(
      'relevant products assumptions table row count: ', relevantProductsValueOperators.count(), '\n',
      'relevant products assumptions table id count: ', relevantProductsValueOperators.select('operatorid').distinct().count(), '\n',
      'relevant products assumptions table id count: ', relevantProductsValueOperators.where(f.col('totalProductPrice').isNotNull()).select('operatorid').distinct().count()  
     )

#### adding propensity to buy

In [17]:
propensityScore = spark.table("data_user_hien.propensity_model_za_dum_density_urbanisation_competition_discrete_v2") \
                                                            .withColumnRenamed('score','propensityScore') \
                                                            .withColumn('propensityScore',f.col('propensityScore').cast('double'))

noPropensityScore  = propensityScore.select(f.avg(f.col('propensityScore')).alias('avgPropensityScore'))


In [18]:
propensityScore.count()

In [19]:
operatorValue      = relevantProductsValueOperators.join(propensityScore,on= ['operatorId'],how = 'left_outer')
operatorValue = operatorValue.crossJoin(noPropensityScore)
# operatorValue = operatorValue.withColumn('propensityScore',\
#                                          f.when(f.col('propensityScore').isNotNull(), f.col('propensityScore')).\
#                                          otherwise(f.col('avgPropensityScore')))

operatorValue      = operatorValue.withColumn('finalScore',f.col("totalProductPrice")*f.col("propensityScore"))
operatorValue        = operatorValue.withColumn('finalScore',f.when(\
                                                                    (f.col('propensityScore').isNotNull()), \
                                                                     f.col("totalProductPrice")*f.col("propensityScore")) \
                                                 .otherwise(f.lit(0)))
                                                  

operatorValue = operatorValue.\
                            join(operatorSalesValue.select('operatorSalesValue','operatorOhubId'), on = ['operatorOhubId'], how = 'left_outer') \
                            .distinct()

operatorValue = operatorValue.withColumn('operatorSalesValue',
                                         f.when(f.col('operatorSalesValue').isNull(), 0).otherwise(f.col('operatorSalesValue')))

operatorValue = operatorValue.withColumn('finalScoreCorrected', \
                                         f.when(f.col('operatorSalesValue') > f.col('finalScore'), \
                                                f.col('operatorSalesValue')).otherwise(f.col('finalScore')))

In [20]:
operatorValue.where(f.col('operatorOhubId').isNotNull()).distinct().count()

In [21]:
# avgfinalScoreCorrected  = operatorValue.select(f.avg(f.col('finalScoreCorrected')).alias('avgFinalScoreCorrected'))
avgfinalScore  = operatorValue.select(f.avg(f.col('finalScore')).alias('avgFinalScore'))
operatorValue  = operatorValue.crossJoin(avgfinalScore)
operatorValue  = operatorValue.withColumn("finalScoreCorrected",\
                                          f.when(f.col('finalScoreCorrected') !=0,f.col('finalScoreCorrected')).\
                                          otherwise(f.col('avgfinalScore')))

In [22]:
w1 = Window.orderBy(f.col('finalScoreCorrected').asc())
operatorValue = operatorValue.withColumn("valueTier", f.ntile(10).over(w1))


In [23]:

hiveTable = 'dev_derived_eotm.output_valuetier_' + countryCode
deltaTable = '/mnt/datamodel/dev/derived/ovalue/output/valueTier_' + countryCode 

operatorValue.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .save(deltaTable)

sqlQuery1 = "drop table if exists " + hiveTable
sqlQuery2 = "create table " + hiveTable + " using delta location '" + deltaTable + "'"
spark.sql(sqlQuery1)
spark.sql(sqlQuery2)


#### Validation of propensity score, product to menu fit & value tier

##### propensity score

In [26]:
display(operatorValue)

operatorOhubID,operatorId,osmId,placeIdGoogle,name,address,postalCode,city,latitude,longitude,businessType,website,phone,nameGoogle,addressGoogle,postalCodeGoogle,cityGoogle16,latitudeGoogle,longitudeGoogle,businessTypeGoogle,websiteGoogle,type,uid,user_sid,nameOSM,addressOSM,postalCodeOSM,cityOSM,latitudeOSM,longitudeOSM,businessTypeOSM,websiteOSM,cuisineTypeOSM,phoneOSM,nameOHUB,addressOhub,zipcodeOHUB,cityOHUB,operatorConcatID,channelOHUB,source,globalChannel,cuisineType,chain,globalListChannels,addressCombined,CityGoogle46,ohubId,totalProductPrice,avgTotalProductPriceNoProducts,label,propensityScore,avgPropensityScore,finalScore,operatorSalesValue,finalScoreCorrected,avgFinalScore,valueTier
8a8285e2-aad8-4e2b-879b-e6e7d1cc8f25,/ / 8a8285e2-aad8-4e2b-879b-e6e7d1cc8f25,,,Michelle,,,,,,Unassigned,,,,,,,,,,,,,,,,,,,,,,,,Michelle,,,,,Unassigned,OHUB,Other,Unknown,Unknown,[Other],Michelle,,,1690.020985199781,1690.020985199781,,,0.6046551034096741,0.0,38.0,38.0,2941.745339416348,1
d2b1da13-dc1c-4bef-89b2-5850854d87cb,/ / d2b1da13-dc1c-4bef-89b2-5850854d87cb,,,Pulani Catering,Alhof & Ryk tulbach Kitchen,6505,Mossel Bay,-34.1772563,22.122066,other-institutions,,,,"Alhof Drive , Mossel Bay",6506.0,,-34.1772563,22.122066,['route'],,,,,,,,,,,,,,,Pulani Catering,Alhof & Ryk tulbach Kitchen,6505,Mossel Bay,ZA~EMAKINA~ff643c51-fff2-4801-b081-a5452f1b383e,other-institutions,OHUB,Other,Unknown,Unknown,[Other],"Pulani Catering,Alhof & Ryk tulbach Kitchen,Mossel Bay",South Cape DC,,58.92573872472784,1690.020985199781,dep_dum_buyer,0.5292635167141772,0.6046551034096741,31.187243702430223,42.0,42.0,2941.745339416348,1
f16ad552-11da-367b-84e7-34aeb60170fe,/ / f16ad552-11da-367b-84e7-34aeb60170fe,,,Lee Mich Solutions,,9323,,,,other_institutions,,,,,,,,,,,,,,,,,,,,,,,,Lee Mich Solutions,,9323,,ZA~EMAKINA~eb9ace3a-46b8-476a-8566-814c6df9ee76,other_institutions,OHUB,Other,Unknown,Unknown,[Other],Lee Mich Solutions,,,574.6811767511465,1690.020985199781,,,0.6046551034096741,0.0,58.0,58.0,2941.745339416348,1
b415b5db-a455-30e9-b793-80595a89b509,/ / b415b5db-a455-30e9-b793-80595a89b509,,,ROBBERG SAMPLE ACOOUNT,"Theron Street, Industria, Plettenberg Bay, 6600",6600,,,,Wholesalers/Resellers & Distributors,,,,,,,,,,,,,,,,,,,,,,,,ROBBERG SAMPLE ACOOUNT,"Theron Street, Industria, Plettenberg Bay, 6600",6600,,ZA~SSD_OTHER~UFS_SSD-ID_10441,Wholesalers/Resellers & Distributors,OHUB,Motel,Unknown,Unknown,"[Other, Motel]","ROBBERG SAMPLE ACOOUNT,Theron Street, Industria, Plettenberg Bay, 6600",,,6644.159819384625,1690.020985199781,,,0.6046551034096741,0.0,60.0,60.0,2941.745339416348,1
71b85807-67de-3194-bb80-f707383d6dbd,/ / 71b85807-67de-3194-bb80-f707383d6dbd,,,ROBBY’S PLACE,5634 ZONE 5 PIMVILLE SOWETO,1809,,-26.27716,27.907366,LETs,,,,"Mokoka Street 5634, Johannesburg",1809.0,,-26.27716,27.907366,"['bar', 'establishment', 'food', 'point_of_interest', 'restaurant']",,,,,,,,,,,,,,,ROBBY’S PLACE,5634 ZONE 5 PIMVILLE SOWETO,1809,,ZA~ARMSTRONG~OPDB00281591SA,LETs,OHUB,Motel,Unknown,Unknown,"[Other, Motel]","ROBBY’S PLACE,5634 ZONE 5 PIMVILLE SOWETO",City of Johannesburg Metropolitan Municipality,71b85807-67de-3194-bb80-f707383d6dbd,81.77777777777777,1690.020985199781,dep_dum_buyer,0.7352864380065701,0.6046551034096741,60.13009093031506,0.0,60.13009093031506,2941.745339416348,1
2a35e6aa-8ecc-3cab-abf2-19f5105721a3,/ / 2a35e6aa-8ecc-3cab-abf2-19f5105721a3,,,Daryls Test,test test,4068,Test,,,hotels,,,,,,,,,,,,,,,,,,,,,,,,Daryls Test,test test,4068,Test,ZA~EMAKINA~97cbb3db-9144-4d04-a03b-23583745d1c4,hotels,OHUB,Hotel,Unknown,Unknown,"[Other, Hotel]","Daryls Test,test test,Test",,,1531.1725961778354,1690.020985199781,,,0.6046551034096741,0.0,66.0,66.0,2941.745339416348,1
68966274-0af5-3b06-a250-c36f213d0966,/ / 68966274-0af5-3b06-a250-c36f213d0966,,,FOUR 20 CAFE,420 PHADIMA SECTION,1432,,,,Restaurant,,,,,,,,,,,,,,,,,,,,,,,,FOUR 20 CAFE,420 PHADIMA SECTION,1432,,ZA~ARMSTRONG~OPDB00255008SA,Restaurant,OHUB,Restaurant,Unknown,Unknown,"[Other, Restaurant, Cafe]","FOUR 20 CAFE,420 PHADIMA SECTION",,,5423.347919959983,1690.020985199781,,,0.6046551034096741,0.0,76.0,76.0,2941.745339416348,1
eb6dadad-7510-3540-b67b-fbba1f1f146d,/ / eb6dadad-7510-3540-b67b-fbba1f1f146d,,,Wild coast snails,,5251,,,,other_buyer,,,,,,,,,,,,,,,,,,,,,,,,Wild coast snails,,5251,,ZA~EMAKINA~cc9821ee-3014-4c21-84bc-3e3575869b66,other_buyer,OHUB,Other,Unknown,Unknown,[Other],Wild coast snails,,,1690.020985199781,1690.020985199781,,,0.6046551034096741,0.0,76.0,76.0,2941.745339416348,1
8a494f82-49b5-3bf2-9306-58a50bf607e3,/ / 8a494f82-49b5-3bf2-9306-58a50bf607e3,,,HUIS LETTIE THERON FEEDEM,DE GOEDE STREET,7200,,-34.42153,19.23114,Elderly homes,,,,"De Goede Street , Hermanus",7200.0,,-34.42153,19.23114,"['establishment', 'health', 'point_of_interest']",,,,,,,,,,,,,,,HUIS LETTIE THERON FEEDEM,DE GOEDE STREET,7200,,ZA~ARMSTRONG~OPDB00368402SA,Elderly homes,OHUB,Elderly home,Unknown,Unknown,"[Other, Elderly home]","HUIS LETTIE THERON FEEDEM,DE GOEDE STREET",Overberg District Municipality,8a494f82-49b5-3bf2-9306-58a50bf607e3,171.3844559585492,1690.020985199781,dep_dum_buyer,0.4683252497796555,0.6046551034096741,80.26366814513794,52.0,80.26366814513794,2941.745339416348,1
3374dfaf-035e-32a2-b9f9-a5f8731240ca,/ / 3374dfaf-035e-32a2-b9f9-a5f8731240ca,,,EATFRESH FREYS,,3600,,,,catering,,,,,,,,,,,,,,,,,,,,,,,,EATFRESH FREYS,,3600,,ZA~EMAKINA~a4a89101-b7b5-4047-a994-8918e1ab300f,catering,OHUB,Other,Unknown,Unknown,[Other],EATFRESH FREYS,,,789.7188940092166,1690.020985199781,,,0.6046551034096741,0.0,81.0,81.0,2941.745339416348,1


In [27]:
operatorValue.where(f.col('operatorOhubId').isNotNull()).count()

In [28]:
validatePropensityScore = operatorValue.\
select('operatorid', 'propensityScore','operatorOhubID', 'operatorSalesValue', f.when(f.col('operatorOhubID').isNotNull() ,1).otherwise(0).alias('buyer')).distinct()

w1 = Window.orderBy(f.col('propensityScore').asc())
                                                               
validatePropensityScore = validatePropensityScore.where(f.col('propensityScore').isNotNull()).withColumn("decilePropensityScore", f.ntile(10).over(w1))

display(validatePropensityScore.groupBy(f.col('decilePropensityScore')).\
agg(
    f.countDistinct(f.col('operatorID')), \
    f.min(f.col('propensityScore')), \
    f.max(f.col('propensityScore')), \
    f.avg(f.col('operatorSalesValue')),\
    f.avg(f.col('buyer'))))\

decilePropensityScore,count(operatorID),min(propensityScore),max(propensityScore),avg(operatorSalesValue),avg(buyer)
1,5390,0.2836057762621502,0.3982497096627385,7224.0400742115025,0.425417439703154
2,5390,0.3982497096627385,0.451664825769066,7127.599628942486,0.39721706864564
3,5390,0.451664825769066,0.5058577319848541,9160.769758812616,0.4597402597402597
4,5390,0.5058577319848541,0.5397056813213178,9020.872727272726,0.5196660482374769
5,5390,0.5397056813213178,0.591118585031571,12705.40556586271,0.5278293135435993
6,5390,0.591118585031571,0.6268868853499784,8345.516141001855,0.5476808905380334
7,5390,0.6268868853499784,0.7020476183110764,10539.810204081632,0.6042671614100186
8,5390,0.7020476183110764,0.7724794653598196,12526.43654916512,0.7473098330241187
9,5390,0.7724794653598196,0.8388962017793165,15374.609461966606,0.8834879406307977
10,5390,0.8388962017793165,0.9593987054769516,13666.174025974024,0.8966604823747681


##### product to menu fit

In [30]:
# operatorProductToMenuFit = relevantProducts.where(f.col('rationale') == 'Menu Data').groupBy(f.col('operatorid')).agg(f.countDistinct(f.col('cuEanCode')).alias('productToMenuFit'))
# validateProductToMenuFit = universe.\
# join(operatorProductToMenuFit, on = ['operatorID'], how = 'left_outer').\
# join(operatorSalesValue, on = ['operatorConcatId'], how = 'left_outer').\
# join(propensityScore, on = ['operatorID'], how = 'left_outer').\
# select('operatorid', 'productToMenuFit', 'operatorSalesValue','grab','propensityScore',f.when(f.col('operatorSalesValue').isNotNull(),1).otherwise(0).alias('buyer')).distinct()

# validateProductToMenuFit = validateProductToMenuFit.withColumn('relevantProducts',f.when(f.col('grab').isNotNull(),f.col('grab')).otherwise(f.col('productToMenuFit')))
# validateProductToMenuFit = validateProductToMenuFit.crossJoin(relevantProductsGrabOperators).\
# withColumn('relevantProducts',f.when(f.col('relevantProducts').isNotNull(),f.col('relevantProducts')).otherwise(f.col('avgGrab')))

# validateProductToMenuFit = validateProductToMenuFit.withColumn('grabMeasurement',f.col('relevantProducts')*f.col('propensityScore'))

# w = Window.orderBy(f.col('grabMeasurement'))
                                                               
# validateProductToMenuFit = validateProductToMenuFit.where(f.col('grabMeasurement').isNotNull()).withColumn("decileProductToMenuFit", f.ntile(5).over(w))

# display(validateProductToMenuFit.groupBy(f.col('decileProductToMenuFit')).\
# agg(f.countDistinct(f.col('operatorID')), \
#     f.min(f.col('grabMeasurement')), \
#     f.max(f.col('grabMeasurement')), \
#     f.avg(f.col('operatorSalesValue')),\
#     f.avg(f.col('buyer'))))

##### value Tier

In [32]:
validateValuetier   = operatorValue.\
select('operatorid','operatorOhubId', 'finalScoreCorrected', 'operatorSalesValue', f.when(f.col('operatorSalesValue') != 0 ,1) \
         .otherwise(0).alias('buyer')).distinct()

w1 = Window.orderBy(f.col('finalScoreCorrected').asc())
                                                               
validateValuetier = validateValuetier.where(f.col('finalScoreCorrected').isNotNull()).withColumn("valuetier", f.ntile(10).over(w1))

display(validateValuetier.groupBy(f.col('valuetier')).\
agg(
    f.countDistinct(f.col('operatorID')), \
    f.countDistinct(f.col('operatorOhubId')), \
    f.sum(f.col('buyer')), \
    f.min(f.col('finalScoreCorrected')), \
    f.max(f.col('finalScoreCorrected')), \
    f.avg(f.col('operatorSalesValue')),\
    f.avg(f.col('buyer'))))



valuetier,count(operatorID),count(operatorOhubId),sum(buyer),min(finalScoreCorrected),max(finalScoreCorrected),avg(operatorSalesValue),avg(buyer)
1,6530,2302,135,42.0,724.5398383670334,5.464012251148545,0.0206738131699846
6,6530,5097,31,1186.4752075552449,1350.1725308469934,3.1301684532924963,0.0047473200612557
3,6530,3013,23,833.7848584235263,928.5153821262812,2.0110260336906585,0.0035222052067381
5,6530,3319,21,1041.7763753665351,1186.4752075552449,2.039356814701378,0.0032159264931087
9,6529,6525,1874,2941.745339416348,11684.380832461042,1161.2400061265123,0.2870271098177362
4,6530,2703,19,928.5153821262812,1041.7763753665351,1.7309341500765696,0.0029096477794793
8,6530,6519,0,2941.745339416348,2941.745339416348,0.0,0.0
7,6530,5572,328,1350.1725308469934,2941.745339416348,53.4281776416539,0.050229709035222
10,6529,6529,6343,11696.800537480223,14050232.0,90704.89293919435,0.9715117169551232
2,6530,2194,30,724.5398383670334,833.7848584235263,1.6895865237366003,0.004594180704441
