In [8]:
import pandas as pd
import numpy as np
import json

In [23]:
import pyspark

In [113]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import date_sub
from pyspark.sql.functions import udf
from pyspark.sql.window import Window

In [3]:
sqlContext = SQLContext(sc)

In [None]:
!ls lentahack

## Data Loading

In [26]:
checks_ofr_df = spark.read.csv(path='./lentahack/checks_ofr_df.csv', header=True, inferSchema=True)
checks_ofr_df.cache().count(), checks_ofr_df.columns

(21512276,
 ['client_id',
  'date',
  'day',
  'shop_id',
  'check_id',
  'time',
  'sku',
  'promo_id',
  'check_pos',
  'num_sales',
  'supplier_price',
  'selling_price',
  'region_name',
  'index',
  'Promo_type',
  'Offer_ID',
  'start_date',
  'end_date',
  'train_test_group',
  'hierarchy_level1',
  'hierarchy_level2',
  'hierarchy_level3',
  'hierarchy_level4',
  'UpLift'])

In [92]:
import datetime as dt
@udf(DateType())
def to_time(time_str):
     return dt.datetime.strptime(time_str, '%Y-%m-%d')

In [35]:
checks_ofr_df = checks_ofr_df.withColumn('date', to_time(F.col('date')))
checks_ofr_df.show(5)

+--------------------+----------+--------+--------------------+--------------------+--------+--------------------+--------------------+---------+---------+--------------+-------------+--------------------+-----+----------+--------+----------+--------+----------------+--------------------+--------------------+--------------------+--------------------+------+
|           client_id|      date|     day|             shop_id|            check_id|    time|                 sku|            promo_id|check_pos|num_sales|supplier_price|selling_price|         region_name|index|Promo_type|Offer_ID|start_date|end_date|train_test_group|    hierarchy_level1|    hierarchy_level2|    hierarchy_level3|    hierarchy_level4|UpLift|
+--------------------+----------+--------+--------------------+--------------------+--------+--------------------+--------------------+---------+---------+--------------+-------------+--------------------+-----+----------+--------+----------+--------+----------------+------------

In [93]:
checks_ofr_df = checks_ofr_df.withColumn('start_date', to_time(F.col('start_date')))\
                             .withColumn('end_date', to_time(F.col('end_date')))

In [96]:
checks_ofr_df = checks_ofr_df.withColumn('promo_duration', F.datediff(F.col('end_date'),F.col('start_date')))
checks_ofr_df = checks_ofr_df.withColumn('pred_start_date', F.expr("date_sub(start_date, promo_duration)") )
checks_ofr_df = checks_ofr_df.withColumn('pred_end_date', F.expr("date_sub(end_date, promo_duration)") )

In [97]:
checks_ofr_df.dtypes

[('client_id', 'string'),
 ('date', 'date'),
 ('day', 'int'),
 ('shop_id', 'string'),
 ('check_id', 'string'),
 ('time', 'string'),
 ('sku', 'string'),
 ('promo_id', 'string'),
 ('check_pos', 'int'),
 ('num_sales', 'double'),
 ('supplier_price', 'double'),
 ('selling_price', 'double'),
 ('region_name', 'string'),
 ('index', 'double'),
 ('Promo_type', 'string'),
 ('Offer_ID', 'string'),
 ('start_date', 'date'),
 ('end_date', 'date'),
 ('train_test_group', 'string'),
 ('hierarchy_level1', 'string'),
 ('hierarchy_level2', 'string'),
 ('hierarchy_level3', 'string'),
 ('hierarchy_level4', 'string'),
 ('UpLift', 'double'),
 ('promo_duration', 'int'),
 ('pred_start_date', 'date'),
 ('pred_end_date', 'date')]

In [5]:
!ls data

off_to_skus_map.json  submission2.csv			  test_category_ct.csv
offers_df_agg.csv     submission3.csv			  test_other_ofrs.csv
offers_df_agg.json    submission4.csv			  train_category_ct.csv
submission1.csv       test_Offer_ID_map_category_ct.json  train_other_ofrs.csv


In [6]:
offers_df_agg = spark.read.csv(path='./data/offers_df_agg.json', header=True, inferSchema=True)

In [37]:
offers_df_agg.columns[:10]

['Offer_ID',
 'Promo_type',
 'start_date',
 'end_date',
 'train_test_group',
 'num_skus',
 'num_hl1',
 'num_hl2',
 'num_hl3',
 'num_hl4']

## Time Conversion

In [38]:
offers_df_agg = offers_df_agg.withColumn('start_date', to_time(F.col('start_date')))
offers_df_agg = offers_df_agg.withColumn('end_date', to_time(F.col('end_date')))

In [43]:
offers_df_agg = offers_df_agg.withColumn('duration', F.datediff(F.col('end_date'),F.col('start_date')))

In [44]:
offers_df_agg.select('duration').dtypes

[('duration', 'int')]

In [52]:
offers_df_agg = offers_df_agg.withColumn('pred_start_date', F.expr("date_sub(start_date, duration)") )

In [54]:
offers_df_agg = offers_df_agg.withColumn('pred_end_date', F.expr("date_sub(end_date, duration)") )

In [59]:
ls

[0m[01;34mAzureML[0m/                       dana2_eda.ipynb     [01;34mpytorch[0m/
Data-Prep-Spark-Alisa.ipynb    dana_eda.ipynb      submission0.csv
[01;34mMMLSpark[0m/                      [01;34mdata[0m/               test_Offer_ID_map.json
Nazar.ipynb                    derby.log           test_as_is.csv
[01;34mSparkML[0m/                       [01;34mexplanation[0m/        test_category_ct.csv
alisa-data-prep.ipynb          [01;34mh2o[0m/                train_as_is-Copy1.csv
automated_ml_errors-child.log  [01;34mjulia[0m/              train_as_is.csv
automated_ml_errors.log        [01;34mlentahack[0m/          Плавильня-Alisa.ipynb
automl.log                     [34;42mlogs[0m/               Плавильня-Copy1.ipynb
[01;34mcache[0m/                         model_1             Плавильня-Copy2.ipynb
[01;34mcatboost[0m/                      offers_date.csv     Плавильня.ipynb
[01;34mcatboost_info[0m/                 [01;34moffers_df_agg.pqt[0m/


In [56]:
offers_df_agg.write.parquet('offers_df_agg.pqt')

In [60]:
offers_df_agg.select('duration', 'start_date', 'pred_start_date', 'pred_end_date')

DataFrame[duration: int, start_date: date, pred_start_date: date, pred_end_date: date]

In [None]:
checks_ofr_df = checks_ofr_df.withColumn('date', to_time(F.col('date')))

In [9]:
off_to_skus_map = json.load(open('./data/off_to_skus_map.json' , 'r'))

## Skipping... 

In [None]:
offers_df = pd.read_csv('./lentahack/20210521_offers.csv')
offers_df['start_date'] = pd.to_datetime(offers_df['start_date'], format="%Y%m%d")
offers_df['end_date'] = pd.to_datetime(offers_df['end_date'], format="%Y%m%d")
offers_df.shape, offers_df.columns

In [None]:
hierarchy_df = pd.read_csv('./lentahack/20210518_hierarchy.csv')
hierarchy_df.shape, hierarchy_df.columns

In [None]:
uplift_df = pd.read_csv('./lentahack/20210518_uplift.csv')
uplift_df.shape, uplift_df.columns

In [None]:
offers_df = offers_df.merge(hierarchy_df, on='sku', how='left')
offers_df.shape, offers_df.columns

In [None]:
offers_df_agg = offers_df.groupby(['Offer_ID', 'Promo_type', 'start_date', 'end_date', 'train_test_group']).agg({'sku': lambda x: list(x),
                                    'hierarchy_level1': lambda x: list(x), 
                                   'hierarchy_level2': lambda x: list(x), 
                                   'hierarchy_level3': lambda x: list(x), 
                                   'hierarchy_level4': lambda x: list(x),                                                                                              
                                    })

In [None]:
offers_df_agg.reset_index(inplace=True)

In [None]:
offers_df_agg['num_skus'] = offers_df_agg['sku'].apply(lambda x: len(set(x)))
offers_df_agg['num_hl1'] = offers_df_agg['hierarchy_level1'].apply(lambda x: len(set(x)))
offers_df_agg['num_hl2'] = offers_df_agg['hierarchy_level2'].apply(lambda x: len(set(x)))
offers_df_agg['num_hl3'] = offers_df_agg['hierarchy_level3'].apply(lambda x: len(set(x)))
offers_df_agg['num_hl4'] = offers_df_agg['hierarchy_level4'].apply(lambda x: len(set(x)))

In [None]:
from collections import Counter

def category_map(level): 
    global hierarchy_df
    if level not in [1, 2, 3, 4]: 
        raise ValueError 
    
    cat_list = hierarchy_df[f'hierarchy_level{level}'].unique()
    cat_map = {k:v for v, k in enumerate(cat_list)} 
    headers = [f'hl_{level}_{i}' for i, v in enumerate(cat_list)]
    return headers, cat_map 

def add_category_at_level(df, level): 
    COL_NAME = 'hl_{}_{}'
    
    cat_headers, cat_map = category_map(level)    
    df[cat_headers] = np.zeros((df.shape[0], len(cat_headers)))
    
    for i, row in df.iterrows():
        cats = row[f'hierarchy_level{level}']        
        cat_ct = Counter(cats)
        for cat in cat_ct: 
            cat_idx = cat_map[cat] 
            df[COL_NAME.format(level, cat_idx)].loc[i] = cat_ct[cat]
    return cat_map        

In [None]:
add_category_at_level(offers_df_agg, 1)
add_category_at_level(offers_df_agg, 2)
add_category_at_level(offers_df_agg, 3)
add_category_at_level(offers_df_agg, 4)
offers_df_agg.shape

In [None]:
offers_df_sku_in_ofrs = offers_df.groupby(['sku', 'start_date','end_date']).agg({'Offer_ID':lambda x: list(x)})

In [None]:
offers_df_sku_in_ofrs.reset_index(inplace=True)

In [None]:
def add_if_in_other_promos(df):
    global offers_df_sku_in_ofrs
    
    df['num_other_promos'] = np.zeros(df.shape[0])
    df['num_skus_in_other_promos'] = np.zeros(df.shape[0])
    
    for i, row in df.iterrows(): 
        skus = row['sku']
        start_date = row['start_date'] 
        end_date = row['end_date'] 
        
        offers_set = set()
        sku_ct = 0 
        for sku in skus: 
             offers = offers_df_sku_in_ofrs[(offers_df_sku_in_ofrs['sku'] == sku) 
                                 & (offers_df_sku_in_ofrs['start_date'] == start_date)
                                 & (offers_df_sku_in_ofrs['end_date'] == end_date)]['Offer_ID'].iloc[0]
                
             offers_set.update(offers)    
             if len(offers) > 1:
                sku_ct += 1 

        df['num_other_promos'].iloc[i] = len(offers_set) - 1
        df['num_skus_in_other_promos'].iloc[i] = sku_ct        
                                                    

In [None]:
offers_df_agg.shape, offers_df_agg.columns[:20]

In [None]:
add_if_in_other_promos(offers_df_agg)

In [None]:
offers_df_agg['duration'] = offers_df_agg['end_date'] - offers_df_agg['start_date'] + pd.Timedelta(days=1)
offers_df_agg['mo'] = offers_df_agg['start_date'].dt.month
offers_df_agg['start_day_of_week'] = offers_df_agg['start_date'].dt.dayofweek
offers_df_agg['end_day_of_week'] = offers_df_agg['end_date'].dt.dayofweek

In [None]:
off_to_skus_df = offers_df_agg[['Offer_ID', 'sku']].copy()
off_to_skus_df = off_to_skus_df.set_index('Offer_ID')

In [None]:
off_to_skus_map = off_to_skus_df.to_dict(orient='index')
list(off_to_skus_map.items())[:5]

In [None]:
import json 

json.dump(off_to_skus_map, open('./data/off_to_skus_map.json', 'w'))

In [None]:
COLUMNS_TO_EXCLUDE = ['sku', 'hierarchy_level1', 'hierarchy_level2', 'hierarchy_level3', 'hierarchy_level4']

In [None]:
offers_df_agg_pdf = offers_df_agg.drop(COLUMNS_TO_EXCLUDE, axis=1)
offers_df_agg.shape, offers_df_agg_pdf.shape

In [None]:
!ls data

In [None]:
offers_df_agg_pdf.to_csv('./data/offers_df_agg.csv', index=False)

In [None]:
offers_df_agg_df = sqlContext.createDataFrame(offers_df_agg_pdf)
offers_df_agg_df.cache().count(), offers_df_agg_df.columns[:3]

In [14]:
offers_df_agg.dtypes[:10]

[('Offer_ID', 'string'),
 ('Promo_type', 'string'),
 ('start_date', 'string'),
 ('end_date', 'string'),
 ('train_test_group', 'string'),
 ('num_skus', 'int'),
 ('num_hl1', 'int'),
 ('num_hl2', 'int'),
 ('num_hl3', 'int'),
 ('num_hl4', 'int')]

In [62]:
checks_ofr_df.show(3, truncate=False, vertical=True)

-RECORD 0--------------------------------------------
 client_id        | d9da50b77962ad6401d0527db5b65b04 
 date             | 2019-12-30                       
 day              | 20191230                         
 shop_id          | 25bbdcd06c32d477f7fa1c3e4a91b032 
 check_id         | c0b3f9c2215f924e27a6039617999c60 
 time             | 07:28:39                         
 sku              | b17c870027bb4a22e3aedb971bc00def 
 promo_id         | b3deb0286313f0b888c0eac49580cc23 
 check_pos        | 1                                
 num_sales        | 5.0                              
 supplier_price   | 133.4                            
 selling_price    | 249.45                           
 region_name      | 152f1b77a32508570e2745daf9ce7aec 
 index            | null                             
 Promo_type       | null                             
 Offer_ID         | null                             
 start_date       | null                             
 end_date         | null    

In [None]:
2019-12-30

In [77]:
sales_for_sku_df = checks_ofr_df.filter((F.col('sku') == 'b17c870027bb4a22e3aedb971bc00def') 
                                        & (F.col('date') <= datetime.strptime('2020-01-01', '%Y-%m-%d'))
                                        & (F.col('date') >= datetime.strptime('2019-12-30', '%Y-%m-%d'))
                                       ).select('num_sales', 'supplier_price', 'selling_price')



In [91]:
delta = dt.datetime.strptime('2020-01-01', '%Y-%m-%d') - dt.datetime.strptime('2019-12-30', '%Y-%m-%d') + dt.timedelta(days=1)
delta

datetime.timedelta(days=3)

In [78]:
sales_for_sku_df.show(5)

+---------+--------------+-------------+
|num_sales|supplier_price|selling_price|
+---------+--------------+-------------+
|      5.0|         133.4|       249.45|
|      2.0|         53.36|        99.98|
|      1.0|         26.68|        49.99|
|      1.0|         26.68|        49.99|
|      1.0|         26.68|        49.99|
+---------+--------------+-------------+
only showing top 5 rows



In [71]:
stats = sales_for_sku_df.select(F.mean('selling_price').alias('mean_sales'),
                                F.max('selling_price').alias('max_sales'),
                                F.min('selling_price').alias('min_sales')
                               )

In [79]:
stats2= sales_for_sku_df.select(*[F.mean(c).alias(c) for c in sales_for_sku_df.columns],
          *[F.max(c).alias(c) for c in sales_for_sku_df.columns],
         *[F.min(c).alias(c) for c in sales_for_sku_df.columns])

In [80]:
stats2.collect()

[Row(num_sales=1.3333333333333333, supplier_price=35.57333333333334, selling_price=66.62, num_sales=5.0, supplier_price=133.4, selling_price=249.45, num_sales=1.0, supplier_price=26.68, selling_price=49.99)]

In [75]:
stats = stats.collect()
mean = stats[0]['mean_sales']

In [76]:
mean

66.62

### Agg Checks by Sku, Days

In [99]:
print(checks_ofr_df.columns)

['client_id', 'date', 'day', 'shop_id', 'check_id', 'time', 'sku', 'promo_id', 'check_pos', 'num_sales', 'supplier_price', 'selling_price', 'region_name', 'index', 'Promo_type', 'Offer_ID', 'start_date', 'end_date', 'train_test_group', 'hierarchy_level1', 'hierarchy_level2', 'hierarchy_level3', 'hierarchy_level4', 'UpLift', 'promo_duration', 'pred_start_date', 'pred_end_date']


In [100]:
sales_cols = ['num_sales', 'supplier_price', 'selling_price']
sku_by_day_stats_df = checks_ofr_df.groupBy('sku', 'date').agg(*[F.mean(c).alias('mean_' + c) for c in sales_cols],
                                              *[F.max(c).alias('max_' + c) for c in sales_cols],
                                             *[F.min(c).alias('min_' + c) for c in sales_cols])

In [102]:
sku_by_day_stats_df.columns, sku_by_day_stats_df.cache().count()

(['sku',
  'date',
  'mean_num_sales',
  'mean_supplier_price',
  'mean_selling_price',
  'max_num_sales',
  'max_supplier_price',
  'max_selling_price',
  'min_num_sales',
  'min_supplier_price',
  'min_selling_price'],
 5602946)

In [106]:
durations = offers_df_agg.select('duration').distinct().collect()

In [114]:
durations[0]['duration']

28

In [117]:
days = lambda i: i * 86400

In [123]:
w28 = (Window.partitionBy(F.col("sku")).orderBy(F.col("date").cast('long')).rangeBetween(-days(28), 0))

In [124]:
windows = []

for duration in durations:
    d = duration['duration']
    w = (Window.partitionBy(F.col("sku")).orderBy(F.col("date").cast('long')).rangeBetween(-days(d), 0))
    windows.append(w)

In [125]:
len(windows)

17

In [120]:

sku_by_day_stats_df = sku_by_day_stats_df.withColumn('sales_avg_28', F.avg("mean_selling_price").over(w))

In [126]:
print(sku_by_day_stats_df.columns)

['sku', 'date', 'mean_num_sales', 'mean_supplier_price', 'mean_selling_price', 'max_num_sales', 'max_supplier_price', 'max_selling_price', 'min_num_sales', 'min_supplier_price', 'min_selling_price', 'sales_avg_28']


In [129]:
for i, w in enumerate(windows):
    d = durations[i]['duration']
    sku_by_day_stats_df = sku_by_day_stats_df.withColumn(f'sales_avg_{d}', F.avg("mean_selling_price").over(w))\
                                             .withColumn(f'sales_max_{d}', F.avg("max_selling_price").over(w))\
                                             .withColumn(f'sales_min_{d}', F.avg("min_selling_price").over(w))\
                                             .withColumn(f'num_avg_{d}', F.avg("mean_num_sales").over(w))\
                                             .withColumn(f'num_max_{d}', F.avg("max_num_sales").over(w))\
                                             .withColumn(f'num_min_{d}', F.avg("min_num_sales").over(w))\
                                             .withColumn(f'suppl_avg_{d}', F.avg("mean_supplier_price").over(w))\
                                            .withColumn(f'suppl_max_{d}', F.avg("max_supplier_price").over(w))\
                                            .withColumn(f'suppl_max_{d}', F.avg("min_supplier_price").over(w))             

In [122]:
sku_by_day_stats_df.show(2, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------
 sku                 | 007e3a48d0881eb6131c0e30c7bb36eb 
 date                | 2019-10-06                       
 mean_num_sales      | 1.8615                           
 mean_supplier_price | 166.41250000000002               
 mean_selling_price  | 286.505                          
 max_num_sales       | 2.272                            
 max_supplier_price  | 218.06                           
 max_selling_price   | 361.32                           
 min_num_sales       | 1.044                            
 min_supplier_price  | 100.79                           
 min_selling_price   | 179.2                            
 sales_avg_28        | 243.3021007526432                
-RECORD 1-----------------------------------------------
 sku                 | 007e3a48d0881eb6131c0e30c7bb36eb 
 date                | 2020-05-20                       
 mean_num_sales      | 2.048                            
 mean_supplier_price | 189.05  

## Pred-Period

In [82]:
class PredPeriodFeats: 
    
    def __init__(self, off_to_skus_map):
        self.off_to_skus_map = off_to_skus_map

        
    def pred_period_stats(self, offer_id, start_date_str, end_date_str):
        skus = self.off_to_skus_map.get(offer_id, [])                                    
        
        delta = dt.datetime.strptime(end_date_str, '%Y-%m-%d') - dt.datetime.strptime(start_date_str, '%Y-%m-%d') + dt.timedelta(days=1)
        pred_start_date
        pred_end_date
        
        
        

        
        
        mean_sales = 0
        
        for sku in skus: 
            sales_for_sku_df = self.checks_ofr_df.filter((F.col('sku') == sku) & (F.col('date') <= pred_end_date)
                                                         & (F.col('date') >= pred_start_date)
                                                        ).select('num_sales', 'supplier_price', 'selling_price')
            
            stats = sales_for_sku_df.select(*[F.mean(c).alias('mean_' + c) for c in sales_for_sku_df.columns],
                                              *[F.max(c).alias('max_' + c) for c in sales_for_sku_df.columns],
                                             *[F.min(c).alias('min_' + c) for c in sales_for_sku_df.columns]).collect()
            
            mean_nums += stats[0]['mean_num_sales']
            max_nums += stats[0]['max_num_sales']
            min_nums += stats[0]['min_num_sales'] 

            mean_suppl += stats[0]['mean_supplier_price']
            max_suppl += stats[0]['max_supplier_price']
            min_suppl += stats[0]['min_supplier_price']

            mean_sales += stats[0]['mean_selling_price']
            max_sales += stats[0]['max_selling_price']
            min_sales += stats[0]['min_selling_price']
            
        return (mean_nums/len(skus), max_nums/len(skus), min_suppl/len(skus),
                mean_suppl/len(skus), max_suppl/len(skus), min_suppl/len(skus), 
                mean_sales/len(skus), max_sales/len(skus), min_sales/len(skus))
 

In [83]:
pred_period_stats_schema = StructType([
    StructField("avg_mean_num_pred" ,FloatType(), False),
    StructField("avg_max_num_pred" ,FloatType(), False),
    StructField("avg_min_num_pred" ,FloatType(), False),
    StructField("avg_mean_suppl_pred" ,FloatType(), False),
    StructField("avg_max_suppl_pred" ,FloatType(), False),
    StructField("avg_min_suppl_pred" ,FloatType(), False),
    StructField("avg_mean_sales_pred" ,FloatType(), False),
    StructField("avg_max_sales_pred" ,FloatType(), False),
    StructField("avg_min_sales_pred" ,FloatType(), False)
])

In [84]:
pred_period_stats_udf = udf(PredPeriodFeats(off_to_skus_map, checks_ofr_df).pred_period_stats , pred_period_stats_schema)

In [85]:
len(offers_df_agg.columns), offers_df_agg.cache().count()

(2616, 889)

In [86]:
offers_df_agg = offers_df_agg.withColumn('pred_stats', 
                                         pred_period_stats_udf(F.col('Offer_ID'), 
                                                               F.col('pred_start_date'), F.col('pred_end_date')))

Traceback (most recent call last):
  File "/dsvm/tools/spark/current/python/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/dsvm/tools/spark/current/python/pyspark/cloudpickle/cloudpickle_fast.py", line 72, in dumps
    cp.dump(obj)
  File "/dsvm/tools/spark/current/python/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object


PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object