Simple script to find top performing shelves based on facet control data from pcs.qarth_facet_control_metric

Check job status here: http://bfd-main-spark.walmartlabs.com:18080

Here are the departments and their number of shelves:
<ol>
<li>
   4 Cell Phones
</li><li>
   4 Gifts & Registry
</li><li>
   6 Photo Center
</li><li>
  28 Household Essentials
</li><li>
  30 Seasonal
</li><li>
  32 Jewelry
</li><li>
  35 Beauty
</li><li>
  40 Video Games
</li><li>
  55 Pets
</li><li>
 106 Baby
</li><li>
 108 Health
</li><li>
 114 Music
</li><li>
 121 Toys
</li><li>
 143 Food
</li><li>
 146 Party & Occasions
</li><li>
 203 Patio & Garden
</li><li>
 219 Office
</li><li>
 254 Clothing
</li><li>
 272 Movies & TV
</li><li>
 273 Home Improvement
</li><li>
 279 Electronics
</li><li>
 315 Sports & Outdoors
</li><li>
 375 Auto & Tires
</li><li>
 508 Home
</li><li>
 634 Books
</li>
</ol>

In [1]:
import sys, os, time
import json

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

username = os.environ.get('USER')
control_metric_table = 'qarth_facet_control_metric'

print username, 'accessing hive table', control_metric_table
print time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

pcs accessing hive table qarth_facet_control_metric
2016-11-03 17:02:31


In [2]:
pcs_control_metric_table = 'pcs.' + control_metric_table
# tableDef is a list[Row] 
tableDef = sqlContext.sql("DESCRIBE " + pcs_control_metric_table).collect()
for x in range(0,10):
    print tableDef[x]

Row(col_name=u'shelf_id', data_type=u'string', comment=None)
Row(col_name=u'facet', data_type=u'string', comment=None)
Row(col_name=u'items', data_type=u'int', comment=None)
Row(col_name=u'sales', data_type=u'int', comment=None)
Row(col_name=u'sales_avg', data_type=u'decimal(10,2)', comment=None)
Row(col_name=u'sales_percent', data_type=u'decimal(10,2)', comment=None)
Row(col_name=u'modulator_selected', data_type=u'string', comment=None)
Row(col_name=u'click_weight', data_type=u'decimal(10,2)', comment=None)
Row(col_name=u'purchase_weight', data_type=u'decimal(10,2)', comment=None)
Row(col_name=u'shelf_name', data_type=u'string', comment=None)


In [3]:
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

since_date_id='20161025'
since_date = datetime.strptime(since_date_id, '%Y%M%d')
str_to_date_diff = lambda x: (datetime.strptime(x, '%Y%M%d') - since_date).days
str_to_date_diff_udf =  udf (str_to_date_diff, IntegerType())

print 'starting data frame creation for control metric ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

metric_df = sqlContext.sql("SELECT shelf_id, date_id, items, sales, sales_percent, click_weight, purchase_weight, shelf_name FROM " + pcs_control_metric_table \
                           + " WHERE date_id>=" + since_date_id + " AND facet='__AVG__' AND items > 100" \
                          + " DISTRIBUTE BY shelf_id, shelf_name")

metric_df = metric_df.withColumn('date_diff', str_to_date_diff_udf(F.col('date_id')))

metric_df.printSchema()
# metric_df.describe().show()

print 'finishing data frame creation for control metric ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())


starting data frame creation for control metric ... 2016-11-03 17:02:56
root
 |-- shelf_id: string (nullable = true)
 |-- date_id: string (nullable = true)
 |-- items: integer (nullable = true)
 |-- sales: integer (nullable = true)
 |-- sales_percent: decimal(10,2) (nullable = true)
 |-- click_weight: decimal(10,2) (nullable = true)
 |-- purchase_weight: decimal(10,2) (nullable = true)
 |-- shelf_name: string (nullable = true)
 |-- date_diff: integer (nullable = true)

finishing data frame creation for control metric ... 2016-11-03 17:02:58


In [4]:
print 'starting data frame creation for correlation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

corr_df = metric_df.groupBy('shelf_id', 'shelf_name') \
    .agg(
        F.round(F.avg(metric_df.items), 0).alias('avg_items'),
        F.round(F.avg(metric_df.sales_percent), 5).alias('avg_sales_percent'),
        F.round(F.avg(metric_df.click_weight), 3).alias('avg_click_weight'),
        F.round(F.avg(metric_df.purchase_weight), 3).alias('avg_purchase_weight'),
        F.round(F.corr(metric_df.sales, metric_df.click_weight), 5).alias('click_corr'), 
        F.round(F.corr(metric_df.sales, metric_df.purchase_weight), 5).alias('purchase_corr'), 
        F.round(F.corr(metric_df.date_diff, metric_df.sales) * F.stddev(metric_df.sales), 7).alias('sales_slope'), 
        F.round(F.corr(metric_df.date_diff, metric_df.click_weight) * F.stddev(metric_df.click_weight), 7).alias('click_slope'), 
        F.round(F.corr(metric_df.date_diff, metric_df.purchase_weight) * F.stddev(metric_df.purchase_weight), 7).alias('purchase_slope'))

corr_df.printSchema()
# corr_list.explain()

print 'finishing data frame creation for  correlation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())


starting data frame creation for correlation ... 2016-11-03 17:02:58
root
 |-- shelf_id: string (nullable = true)
 |-- shelf_name: string (nullable = true)
 |-- avg_items: double (nullable = true)
 |-- avg_sales_percent: decimal(14,5) (nullable = true)
 |-- avg_click_weight: decimal(14,3) (nullable = true)
 |-- avg_purchase_weight: decimal(14,3) (nullable = true)
 |-- click_corr: double (nullable = true)
 |-- purchase_corr: double (nullable = true)
 |-- sales_slope: double (nullable = true)
 |-- click_slope: double (nullable = true)
 |-- purchase_slope: double (nullable = true)

finishing data frame creation for  correlation ... 2016-11-03 17:02:58


In [5]:
print 'starting top shelves computation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

corr_list = corr_df.filter('click_corr > 0.5').filter('click_slope > 0').cache()

top_shelf_list = corr_list.orderBy(['click_slope', 'click_corr'], ascending=[0, 0]).take(100)

print 'top shelves in terms of the trending facet click weights and correlation between facet clicks and sales:'

top_shelf_list

starting top shelves computation ... 2016-11-03 17:02:58
top shelves in terms of the trending facet click weights and correlation between facet clicks and sales:


[Row(shelf_id=u'0:5438:133195:164064', shelf_name=u"Clothing/Women's Plus/Women's Plus Swimwear", avg_items=556.0, avg_sales_percent=Decimal('6.37444'), avg_click_weight=Decimal('10.231'), avg_purchase_weight=Decimal('9.683'), click_corr=0.94591, purchase_corr=0.92908, sales_slope=7.6186254, click_slope=1.01356, purchase_slope=0.8744581),
 Row(shelf_id=u'0:5438:133195:1228776:1218834', shelf_name=u"Clothing/Women's Plus/Women's Plus Bottoms/Women's Plus Skirts", avg_items=904.0, avg_sales_percent=Decimal('8.13667'), avg_click_weight=Decimal('11.537'), avg_purchase_weight=Decimal('11.018'), click_corr=0.72035, purchase_corr=0.74868, sales_slope=14.5766491, click_slope=0.842003, purchase_slope=0.6408187),
 Row(shelf_id=u'0:5428:4091:8774540:5944564', shelf_name=u'Patio & Garden/Gardening & Lawn Care/Hydroponics/Hydroponic Systems', avg_items=940.0, avg_sales_percent=Decimal('0.37857'), avg_click_weight=Decimal('1.150'), avg_purchase_weight=Decimal('0.733'), click_corr=0.56612, purchase_c

In [6]:
print 'finishing top shelves computation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

top_shelf_id_list = map(lambda x: x.shelf_id, top_shelf_list)

finishing top shelves computation ... 2016-11-03 17:09:24


In [7]:
#corr_list.orderBy(['click_slope'], ascending=[0]).take(10)
#corr_list.orderBy(['click_slope'], ascending=[1]).take(10)

In [8]:
#corr_list.orderBy(['click_corr'], ascending=[0]).take(10)
#corr_list.orderBy(['click_corr'], ascending=[1]).take(10)

In [10]:
pcs_update_metric_table = 'pcs.qarth_facet_update_metric'

# the facet update time window should be further in the past
since_date_id='20161020'
change_hql = "SELECT shelf_id, facet, sum(items) as items_cnt FROM " + pcs_update_metric_table \
                          + " WHERE date_id>=" + since_date_id \
                           + " AND shelf_id IN {0} GROUP BY shelf_id, facet ORDER BY shelf_id, items_cnt" \
                           .format('("' + '","'.join(map(lambda x: str(x), top_shelf_id_list)) + '")')

print change_hql

print 'starting facet update computation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
shell_hive_out=!hive -S -e '$change_hql'
print 'finishing facet updates computation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

shell_hive_out[:10]

SELECT shelf_id, facet, sum(items) as items_cnt FROM pcs.qarth_facet_update_metric WHERE date_id>=20161020 AND shelf_id IN ("0:5438:133195:164064","0:5438:133195:1228776:1218834","0:5428:4091:8774540:5944564","0:5438:133195:1218833","0:3891:540912:1228412","0:5438:133195:1228775:1218840","0:5438:133195:1228776:1218838","0:5438:133201:1228778:1225056","0:5438:133284:1227863:1199552","0:5438:133195:1228776:1218835","0:3944:1060825:1229815:1229820","0:5438:133201:1228777:1225063","0:5438:133195:1218839","0:4044:103150:102547:539386:927959","0:5438:133201:1228777:1225061","0:5427:978579:1323123:654419","0:5438:133202:1218844","0:5438:133195:1228776:1218837","0:5438:133198:1228782:1224684","0:5438:133195:593002","0:3891:540912:1228376","0:4044:539103:9474113:5287544","0:5438:133201:1228778:1225058","0:4044:539103:133061","0:5438:1045804:1045806:1228547","0:5438:133198:538913","0:5438:133195:163854","0:4044:539103:1024979","0:5438:133198:538914","0:4125:546956:1107532:1080108:1080304","0:543

['',
 'Attached to the *bfd-main* Hadoop cluster',
 '0:2637:667479:8516310:4742028\t"number_of_pieces":"I"\t1',
 '0:2637:667479:8516310:4742028\t"number_of_pieces_raw_data":"I"\t1',
 '0:2637:667479:8516310:4742028\t"occasion":"I"\t1',
 '0:2637:667479:8516310:4742028\t"type":"I"\t1',
 '0:2637:667479:8516310:4742028\t"type":"D"\t1',
 '0:2637:667479:8516310:4742028\t"Primary Shelf":"M"\t1',
 '0:2637:667479:8516310:4742028\t"assembled_product_length":"I"\t1']

In [11]:
print 'top facet updates in terms of accumulated number of items:'

for shelf_id in top_shelf_id_list:
    print filter(lambda x: x.shelf_id == shelf_id, top_shelf_list)
    print '\t', '\n\t'.join(filter(lambda line: line.startswith(shelf_id + '\t'), shell_hive_out[3:])[-20:])

top facet updates in terms of accumulated number of items:
[Row(shelf_id=u'0:5438:133195:164064', shelf_name=u"Clothing/Women's Plus/Women's Plus Swimwear", avg_items=556.0, avg_sales_percent=Decimal('6.37444'), avg_click_weight=Decimal('10.231'), avg_purchase_weight=Decimal('9.683'), click_corr=0.94591, purchase_corr=0.92908, sales_slope=7.6186254, click_slope=1.01356, purchase_slope=0.8744581)]
	0:5438:133195:164064	"actual_color":"I"	30
	0:5438:133195:164064	"size":"I"	30
	0:5438:133195:164064	"style":"M"	32
	0:5438:133195:164064	"type":"I"	39
	0:5438:133195:164064	"category":"I"	42
	0:5438:133195:164064	"autographed":"I"	42
	0:5438:133195:164064	"apparel_category":"I"	45
	0:5438:133195:164064	"clothing_size":"D"	48
	0:5438:133195:164064	"fabric_material":"I"	48
	0:5438:133195:164064	"size_raw_unit":"M"	57
	0:5438:133195:164064	"package_quantity":"D"	62
	0:5438:133195:164064	"clothing_size":"M"	77
	0:5438:133195:164064	"Primary Shelf":"M"	79
	0:5438:133195:164064	"lifestage":"I"	80


In [None]:
# Marker cell separating the hive-CLI based report above from the
# sqlContext.sql based variant in the following code cell.
# NOTE(review): printing does not halt a "Run All"; presumably a reminder to
# stop executing by hand at this point -- confirm.
print 'STOP HERE'

Summary of major contributions from facet updates:
<ul>
<li>Clothing: lifestage, gender, fabric_material, clothing_size_group, Primary Shelf
<li>Electronics: television_type, screen_size, operating_system
<li>Home: number_of_pieces, hypoallergenic
<li>Jewelry: occasion, brand, Primary Shelf, ring_type
<li>Sports & Outdoors: lifestage, age_group, sport_type, brand
</ul>

In [None]:
change_df = sqlContext.sql(change_hql)

change_df.printSchema()

# change_df = change_df[change_df.shelf_id.isin(top_shelf_id_list)]
# change_list = change_df.groupBy('shelf_id', 'facet').agg(F.sum('items').alias('items_cnt')).orderBy(['shelf_id', 'items_cnt'], ascending=[1, 0]).collect()

change_list = change_df.collect()

print 'top facet updates in terms of accumulated number of items:'

for shelf_id in top_shelf_id_list:
    print filter(lambda x: x.shelf_id == shelf_id, top_shelf_list)
    for facet_change in filter(lambda x: x.shelf_id == shelf_id, change_list)[:20]:
        print '\t', map(lambda x: x.facet + '\t' + x.items_cnt, facet_change)
        
print 'finishing facet updates computation ...', time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())