In [1]:
import re
import pandas as pd
import numpy as np

import os
import sys
from pyspark import SparkContext, SparkConf, HiveContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import utils


import subprocess
import joblib
import warnings

config = [
    ('spark.dynamicAllocation.enabled', 'true'),
    ('spark.dynamicAllocation.minExecutors', '4'),
    ('spark.dynamicAllocation.initialExecutors', '4'),
    ('spark.dynamicAllocation.maxExecutors', '8'),
    ('spark.dynamicAllocation.executorIdleTimeout', '120s'),
    ('spark.executor.instances', '4'),
    ('spark.executor.cores', '4'),
    ('spark.executor.memory', '16G'),
    ('spark.executor.extraJavaOptions', '-XX:+UseG1GC'), # Оптимизатор GC
    ('spark.driver.cores', '2'),
    ('spark.driver.memory', '8G'),
    ('spark.driver.maxResultSize', '0'),
    ('spark.driver.extraJavaOptions', '-XX:+UseG1GC'),
    ('spark.memory.fraction', '0.9'),
    ('spark.network.timeout', '14400s'),
    ('spark.shuffle.service.enabled', 'true'),
    ('spark.sql.broadcastTimeout', '-1'),
    ('spark.yarn.driver.memory.overhead', '2G'),
    ('spark.yarn.executor.memory.overhead', '2G'),
    ('spark.sql.shuffle.partitions', '2000'),
    ('spark.yarn.queue', 'g_lab_antifraud_users_team_models'),
    ('spark.sql.parquet.int96RebaseModeInRead', 'LEGACY'),
    ('spark.sql.parquet.int96RebaseModeInWrite', 'LEGACY'),
    ('spark.sql.parquet.datetimeRebaseModeInRead', 'LEGACY'),
    ('spark.sql.parquet.datetimeRebaseModeInWrite', 'LEGACY'),

#     ('spark.sql.files.maxPartitionBytes', '256Mb'),
#    ('spark.jars', 'upkmodellib-v2-assembly-1.0.jar'),
#    ('spark.submit.pyFiles', 'scala_wrapper.zip')
]

spark_conf = SparkConf().setAll(config)

spark = SparkSession.builder.appName("dyukarev_ul_onlinetarget").config(conf=spark_conf).getOrCreate()
sc= spark.sparkContext

sc.setLogLevel('ERROR')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/09 21:28:43 WARN Client: Exception encountered while connecting to the server 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:376)
	at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:623)
	at org.apache.hadoop.ipc.Client$Connection.access$2300(Client.java:414)
	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:832)
	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:828)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
	at org.apache.hadoop.ipc.Client$

In [58]:
pd.options.display.max_rows=150
pd.options.display.max_columns=300

In [5]:
#!hdfs dfs -put sup_tr_val.parquet

### Это общая выборка train+val

In [6]:
suptrval = spark.read.parquet('sup_tr_val.parquet')

                                                                                

In [7]:
suptrval.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- receipt_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- server_date: string (nullable = true)
 |-- local_date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- local_dt: string (nullable = true)
 |-- server_dt: string (nullable = true)
 |-- name_firstword: string (nullable = true)
 |-- name_secword: string (nullable = true)
 |-- name_fstsecword: string (nullable = true)
 |-- price: float (nullable = true)
 |-- quantity: float (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [9]:
suptrval.createOrReplaceTempView('suptrval')

### Схлопнуть одинаковые айтемы в чеке в 1 строчку

In [25]:
suptr2 = spark.sql('''select device_id, receipt_id, item_id, to_date(substring(local_date,1,10), 'yyyy-MM-dd') as local_dt,
        to_timestamp(local_date, 'yyyy-MM-dd HH:mm:SS') as local_date, name, name_firstword, name_fstsecword,        
        avg(price) as price, sum(quantity) as quantity
           from suptrval
           group by device_id, receipt_id, item_id,server_date, local_date, name, name_firstword, name_fstsecword''')

In [26]:
suptr2.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- receipt_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- local_dt: date (nullable = true)
 |-- local_date: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- name_firstword: string (nullable = true)
 |-- name_fstsecword: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: double (nullable = true)



In [30]:
suptr2.createOrReplaceTempView('suptr2')

Сохраняем ее, чтобы потом на каждый айтем и дату сделать на нее джойн фичей   
Проблема, если этот айтем в этот день не продавался, а мы его выбрали в кандидаты  
Для решения надо было все айтемы сделать по всем дням, но это наверно много, можно было взять только 5000 самых популярных

In [42]:
%%time
suptr2.repartition(3).write.mode('overwrite').parquet("suptr2.parquet")



CPU times: user 3.59 ms, sys: 2.26 ms, total: 5.84 ms
Wall time: 5.32 s


                                                                                

In [43]:
!hdfs dfs -get suptr2.parquet

**Статистика в разрезе item_id, device_id, local_dt**

In [15]:
suptr_grp2 = spark.sql('''
                    
    select
    item_id, device_id, local_dt, count_grp, quantity_sum_grp, price_mean,
    
    sum(count_grp) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 7 preceding) as count_grp_7dago,
    sum(quantity_sum_grp) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_7dago,
    avg(price_mean) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 7 preceding) as price_mean_7dago,
    
    sum(count_grp) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 1 preceding) as count_grp_1week,
    sum(quantity_sum_grp) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_1week,
    avg(price_mean) over(partition by item_id, device_id order by local_dt rows between 7 preceding and 1 preceding) as price_mean_1week,
    
    sum(count_grp) over(partition by item_id, device_id order by local_dt rows between 14 preceding and 1 preceding) as count_grp_2week,
    sum(quantity_sum_grp) over(partition by item_id, device_id order by local_dt rows between 14 preceding and 1 preceding) as quantity_sum_2week,
    avg(price_mean) over(partition by item_id, device_id order by local_dt rows between 14 preceding and 1 preceding) as price_mean_2week,
    
    sum(count_grp) over(partition by item_id, device_id order by local_dt rows between 31 preceding and 1 preceding) as count_grp_31days,
    sum(quantity_sum_grp) over(partition by item_id, device_id order by local_dt rows between 31 preceding and 1 preceding) as quantity_sum_31days,
    avg(price_mean) over(partition by item_id, device_id order by local_dt rows between 31 preceding and 1 preceding) as price_mean_31days
    from (
        select item_id, device_id, local_dt, count(1) as count_grp,
        sum(quantity) as quantity_sum_grp, avg(price) as price_mean
        from suptr2
        group by item_id, device_id, local_dt
        ) a
    ''')

In [16]:
%%time
suptr_grp2.repartition(3).write.mode('overwrite').parquet("suptr_grp2.parquet")

[Stage 21:>                                                         (0 + 3) / 3]

CPU times: user 5.27 ms, sys: 3.43 ms, total: 8.7 ms
Wall time: 8.67 s


                                                                                

In [17]:
!hdfs dfs -get suptr_grp2.parquet

По кассе (девайсу) суммы, чтобы потом на них делить

In [20]:
suptr_grp3 = spark.sql('''
                    
    select
    device_id, local_dt, count_grp, quantity_sum_grp, price_mean,
    
    sum(count_grp) over(partition by device_id order by local_dt rows between 7 preceding and 7 preceding) as count_grp_7dago,
    sum(quantity_sum_grp) over(partition by device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_7dago,
    avg(price_mean) over(partition by device_id order by local_dt rows between 7 preceding and 7 preceding) as price_mean_7dago,
    
    sum(count_grp) over(partition by device_id order by local_dt rows between 7 preceding and 1 preceding) as count_grp_1week,
    sum(quantity_sum_grp) over(partition by device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_1week,
    avg(price_mean) over(partition by device_id order by local_dt rows between 7 preceding and 1 preceding) as price_mean_1week,
    
    sum(count_grp) over(partition by device_id order by local_dt rows between 14 preceding and 1 preceding) as count_grp_2week,
    sum(quantity_sum_grp) over(partition by device_id order by local_dt rows between 14 preceding and 1 preceding) as quantity_sum_2week,
    avg(price_mean) over(partition by device_id order by local_dt rows between 14 preceding and 1 preceding) as price_mean_2week,
    
    sum(count_grp) over(partition by device_id order by local_dt rows between 31 preceding and 1 preceding) as count_grp_31days,
    sum(quantity_sum_grp) over(partition by device_id order by local_dt rows between 31 preceding and 1 preceding) as quantity_sum_31days,
    avg(price_mean) over(partition by device_id order by local_dt rows between 31 preceding and 1 preceding) as price_mean_31days
    from (
        select device_id, local_dt, count(1) as count_grp,
        sum(quantity) as quantity_sum_grp, avg(price) as price_mean
        from suptr2
        group by device_id, local_dt
        ) a
    ''')

In [21]:
%%time
suptr_grp3.repartition(3).write.mode('overwrite').parquet("suptr_grp3.parquet")



CPU times: user 3.86 ms, sys: 964 µs, total: 4.82 ms
Wall time: 4.4 s


                                                                                

In [22]:
!hdfs dfs -get suptr_grp3.parquet

В разрезе категорий товаров по первому слову

In [32]:
suptr_grp4 = spark.sql('''
                    
    select
    name_firstword, device_id, local_dt, count_grp, quantity_sum_grp, price_mean,
    
    sum(count_grp) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 7 preceding) as count_grp_7dago,
    sum(quantity_sum_grp) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_7dago,
    avg(price_mean) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 7 preceding) as price_mean_7dago,
    
    sum(count_grp) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 1 preceding) as count_grp_1week,
    sum(quantity_sum_grp) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_1week,
    avg(price_mean) over(partition by name_firstword,device_id order by local_dt rows between 7 preceding and 1 preceding) as price_mean_1week,
    
    sum(count_grp) over(partition by name_firstword,device_id order by local_dt rows between 14 preceding and 1 preceding) as count_grp_2week,
    sum(quantity_sum_grp) over(partition by name_firstword,device_id order by local_dt rows between 14 preceding and 1 preceding) as quantity_sum_2week,
    avg(price_mean) over(partition by name_firstword,device_id order by local_dt rows between 14 preceding and 1 preceding) as price_mean_2week,
    
    sum(count_grp) over(partition by name_firstword,device_id order by local_dt rows between 31 preceding and 1 preceding) as count_grp_31days,
    sum(quantity_sum_grp) over(partition by name_firstword,device_id order by local_dt rows between 31 preceding and 1 preceding) as quantity_sum_31days,
    avg(price_mean) over(partition by name_firstword,device_id order by local_dt rows between 31 preceding and 1 preceding) as price_mean_31days
    from (
        select name_firstword, device_id, local_dt, count(1) as count_grp,
        sum(quantity) as quantity_sum_grp, avg(price) as price_mean
        from suptr2
        group by name_firstword, device_id, local_dt
        ) a
    ''')

In [33]:
%%time
suptr_grp4.repartition(3).write.mode('overwrite').parquet("suptr_grp4.parquet")



CPU times: user 5.33 ms, sys: 1.21 ms, total: 6.54 ms
Wall time: 7.55 s


                                                                                

In [34]:
!hdfs dfs -get suptr_grp4.parquet

В разрезе категорий товаров по первым двум словам

In [35]:
suptr_grp5 = spark.sql('''
                    
    select
    name_fstsecword, device_id, local_dt, count_grp, quantity_sum_grp, price_mean,
    
    sum(count_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 7 preceding) as count_grp_7dago,
    sum(quantity_sum_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_7dago,
    avg(price_mean) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 7 preceding) as price_mean_7dago,
    
    sum(count_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 1 preceding) as count_grp_1week,
    sum(quantity_sum_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 1 preceding) as quantity_sum_1week,
    avg(price_mean) over(partition by name_fstsecword,device_id order by local_dt rows between 7 preceding and 1 preceding) as price_mean_1week,
    
    sum(count_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 14 preceding and 1 preceding) as count_grp_2week,
    sum(quantity_sum_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 14 preceding and 1 preceding) as quantity_sum_2week,
    avg(price_mean) over(partition by name_fstsecword,device_id order by local_dt rows between 14 preceding and 1 preceding) as price_mean_2week,
    
    sum(count_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 31 preceding and 1 preceding) as count_grp_31days,
    sum(quantity_sum_grp) over(partition by name_fstsecword,device_id order by local_dt rows between 31 preceding and 1 preceding) as quantity_sum_31days,
    avg(price_mean) over(partition by name_fstsecword,device_id order by local_dt rows between 31 preceding and 1 preceding) as price_mean_31days
    from (
        select name_fstsecword, device_id, local_dt, count(1) as count_grp,
        sum(quantity) as quantity_sum_grp, avg(price) as price_mean
        from suptr2
        group by name_fstsecword, device_id, local_dt
        ) a
    ''')

In [37]:
%%time
suptr_grp5.repartition(3).write.mode('overwrite').parquet("suptr_grp5.parquet")

[Stage 57:>                                                       (0 + 16) / 16]

CPU times: user 3.91 ms, sys: 1.24 ms, total: 5.15 ms
Wall time: 6 s


                                                                                

In [38]:
!hdfs dfs -get suptr_grp5.parquet

### Теперь все соединяем в пандасе  
У нас есть полная таблица с айтемами и всеми полями (но без повторений айтема в чеке), на нее все джойним

In [44]:
suptr2 = pd.read_parquet('suptr2.parquet')

In [77]:
suptr2.to_parquet('suptr2_.parquet')

In [49]:
suptr2.head()

Unnamed: 0,device_id,receipt_id,item_id,local_dt,local_date,name,name_firstword,name_fstsecword,price,quantity
0,352398080458112,11228401386,100328,2022-04-19,2022-04-19 14:33:00.140,Батон нарезной 300гр нарезка /Куньинский х/з/,Батон,Батон нарезной,37.0,1.0
1,352398080037759,11897877309,103625,2022-06-27,2022-06-27 06:32:00.500,К-са Купеческая в/к в/с н/о /Славянский МК/,К-са,К-са Купеческая,392.0,0.15
2,352398080043187,9681140598,114896,2021-10-27,2021-10-27 10:57:00.460,Сыр пл.Хохланд 55% 140гр сливочный и бекон (ас...,Сыр,Сыр пл.Хохланд,97.0,1.0
3,352398080124383,12668664112,108066,2022-09-10,2022-09-10 12:31:00.030,Морож.Фишка 61гр эскимо сгущенка в белой глазу...,Морож.Фишка,Морож.Фишка 61гр,51.0,1.0
4,352398080124383,9789339428,110911,2021-11-09,2021-11-09 10:25:00.310,Пивной напиток Бочкарев Немецкое пастер 4.5% 1...,Пивной,Пивной напиток,101.0,6.0


### Данные на каждый день по 'item_id', 'device_id', 'local_dt'

In [67]:
suptr_grp2 = pd.read_parquet('suptr_grp2.parquet')

In [68]:
suptr_grp2.head()

Unnamed: 0,item_id,device_id,local_dt,count_grp,quantity_sum_grp,price_mean,count_grp_7dago,quantity_sum_7dago,price_mean_7dago,count_grp_1week,quantity_sum_1week,price_mean_1week,count_grp_2week,quantity_sum_2week,price_mean_2week,count_grp_31days,quantity_sum_31days,price_mean_31days
0,111085,352398080458112,2021-10-03,1,1.0,138.0,,,,,,,,,,,,
1,114045,352398080462627,2023-05-06,1,1.0,105.0,1.0,11.0,104.0,11.0,11.0,104.392857,20.0,20.0,104.196429,37.0,37.0,104.525
2,117124,352398080391651,2023-04-27,4,2.726,135.0,3.0,13.366,135.0,20.0,13.366,135.0,31.0,21.104,135.0,51.0,33.876,133.120402
3,100439,352398080037759,2023-06-15,1,0.51,242.0,1.0,8.044,238.0,10.0,8.044,205.186666,11.0,9.074,209.288333,11.0,9.074,209.288333
4,105726,352398082091853,2021-09-25,1,2.0,20.0,,17.0,,5.0,17.0,20.0,5.0,17.0,20.0,5.0,17.0,20.0


In [69]:
join1 = suptr2.merge(suptr_grp2, on=['item_id', 'device_id', 'local_dt'], how='left')

In [70]:
join1.head()

Unnamed: 0,device_id,receipt_id,item_id,local_dt,local_date,name,name_firstword,name_fstsecword,price,quantity,count_grp,quantity_sum_grp,price_mean,count_grp_7dago,quantity_sum_7dago,price_mean_7dago,count_grp_1week,quantity_sum_1week,price_mean_1week,count_grp_2week,quantity_sum_2week,price_mean_2week,count_grp_31days,quantity_sum_31days,price_mean_31days
0,352398080458112,11228401386,100328,2022-04-19,2022-04-19 14:33:00.140,Батон нарезной 300гр нарезка /Куньинский х/з/,Батон,Батон нарезной,37.0,1.0,9,11.0,38.666667,7.0,81.0,36.0,65.0,81.0,36.547619,116.0,150.0,36.597619,235.0,290.0,35.841321
1,352398080037759,11897877309,103625,2022-06-27,2022-06-27 06:32:00.500,К-са Купеческая в/к в/с н/о /Славянский МК/,К-са,К-са Купеческая,392.0,0.15,3,1.55,355.333333,2.0,6.32,392.0,10.0,6.32,358.021429,20.0,11.234,347.431905,39.0,21.164,323.509882
2,352398080043187,9681140598,114896,2021-10-27,2021-10-27 10:57:00.460,Сыр пл.Хохланд 55% 140гр сливочный и бекон (ас...,Сыр,Сыр пл.Хохланд,97.0,1.0,1,1.0,97.0,,,,,,,,,,,,
3,352398080124383,12668664112,108066,2022-09-10,2022-09-10 12:31:00.030,Морож.Фишка 61гр эскимо сгущенка в белой глазу...,Морож.Фишка,Морож.Фишка 61гр,51.0,1.0,1,1.0,51.0,2.0,25.0,50.0,13.0,25.0,50.571429,23.0,40.0,50.285714,31.0,50.0,48.095238
4,352398080124383,9789339428,110911,2021-11-09,2021-11-09 10:25:00.310,Пивной напиток Бочкарев Немецкое пастер 4.5% 1...,Пивной,Пивной напиток,101.0,6.0,1,6.0,101.0,1.0,43.0,101.0,10.0,43.0,101.0,19.0,81.0,101.0,36.0,160.0,101.0


In [72]:
# это различная динамика по айтему

join1['item_trend_q7d'] = join1['quantity']/join1['quantity_sum_7dago']
join1['item_trend_c7d'] = join1['count_grp']/join1['count_grp_7dago']
join1['item_trend_qg7d'] = join1['quantity_sum_grp']/join1['quantity_sum_7dago']
join1['item_trend_c1w2w'] = join1['count_grp_1week']/join1['count_grp_2week']
join1['item_trend_c2w31d'] = join1['count_grp_2week']/join1['count_grp_31days']
join1['item_trend_p7d'] = join1['price']/join1['price_mean_7dago']

join1['item_trend_p1w'] = join1['price']/join1['price_mean_1week']
join1['item_trend_p2w'] = join1['price']/join1['price_mean_2week']
join1['item_trend_p31d'] = join1['price']/join1['price_mean_31days']
join1['item_trend_p1w31d'] = join1['price_mean_1week']/join1['price_mean_31days']

Данные в разрезе 'device_id', 'local_dt' - чтобы посчитать долю айтема от общих продаж по девайсу

In [73]:
suptr_grp3 = pd.read_parquet('suptr_grp3.parquet')

In [74]:
suptr_grp3.head()

Unnamed: 0,device_id,local_dt,count_grp,quantity_sum_grp,price_mean,count_grp_7dago,quantity_sum_7dago,price_mean_7dago,count_grp_1week,quantity_sum_1week,price_mean_1week,count_grp_2week,quantity_sum_2week,price_mean_2week,count_grp_31days,quantity_sum_31days,price_mean_31days
0,352398080032768,2023-08-09,46,85.82,105.636957,20.0,508.785,125.294501,325.0,508.785,118.752788,642.0,997.004,135.334057,1328.0,1903.787,138.925643
1,352398080464094,2021-11-06,30,87.34,126.163333,63.0,780.78,93.948095,519.0,780.78,111.305921,1037.0,1550.087,113.073408,2120.0,3358.727,116.098125
2,352398080124383,2022-12-29,116,149.0,134.967241,113.0,968.0,91.062832,803.0,968.0,109.991475,1615.0,2027.0,102.129648,3827.0,5041.0,102.401174
3,352398080037759,2022-01-01,161,201.64,96.742236,265.0,1814.194001,112.852755,1471.0,1814.194001,123.384355,2761.0,3374.238001,117.890707,6046.0,7356.009002,115.289908
4,352398080043187,2022-03-11,278,471.155999,124.611979,46.0,1101.672,164.230435,789.0,1101.672,120.900981,1894.0,2522.247999,117.092254,4386.0,5897.058002,120.660527


In [75]:
join2 = join1.merge(suptr_grp3, on=['device_id', 'local_dt'], how='left')

In [76]:
join2

Unnamed: 0,device_id,receipt_id,item_id,local_dt,local_date,name,name_firstword,name_fstsecword,price,quantity,count_grp_x,quantity_sum_grp_x,price_mean_x,count_grp_7dago_x,quantity_sum_7dago_x,price_mean_7dago_x,count_grp_1week_x,quantity_sum_1week_x,price_mean_1week_x,count_grp_2week_x,quantity_sum_2week_x,price_mean_2week_x,count_grp_31days_x,quantity_sum_31days_x,price_mean_31days_x,item_trend_q7d,item_trend_c7d,item_trend_qg7d,item_trend_c1w2w,item_trend_c2w31d,item_trend_p7d,item_trend_p1w,item_trend_p2w,item_trend_p31d,item_trend_p1w31d,count_grp_y,quantity_sum_grp_y,price_mean_y,count_grp_7dago_y,quantity_sum_7dago_y,price_mean_7dago_y,count_grp_1week_y,quantity_sum_1week_y,price_mean_1week_y,count_grp_2week_y,quantity_sum_2week_y,price_mean_2week_y,count_grp_31days_y,quantity_sum_31days_y,price_mean_31days_y
0,352398080458112,11228401386,100328,2022-04-19,2022-04-19 14:33:00.140,Батон нарезной 300гр нарезка /Куньинский х/з/,Батон,Батон нарезной,37.0,1.00,9,11.00,38.666667,7.0,81.00,36.0,65.0,81.00,36.547619,116.0,150.000,36.597619,235.0,290.000,35.841321,0.012346,1.285714,0.135802,0.560345,0.493617,1.027778,1.012378,1.010995,1.032328,1.019706,232,369.417,144.342888,394.0,2938.073001,112.121294,2007.0,2938.073001,141.151519,3764.0,5510.811001,136.412267,8521.0,11920.768001,131.307198
1,352398080037759,11897877309,103625,2022-06-27,2022-06-27 06:32:00.500,К-са Купеческая в/к в/с н/о /Славянский МК/,К-са,К-са Купеческая,392.0,0.15,3,1.55,355.333333,2.0,6.32,392.0,10.0,6.32,358.021429,20.0,11.234,347.431905,39.0,21.164,323.509882,0.023734,1.500000,0.245253,0.500000,0.512821,1.000000,1.094907,1.128279,1.211710,1.106678,231,285.402,109.046277,289.0,2431.111001,116.135294,1933.0,2431.111001,110.050683,3646.0,4555.033000,112.348489,8199.0,10037.371000,116.316475
2,352398080043187,9681140598,114896,2021-10-27,2021-10-27 10:57:00.460,Сыр пл.Хохланд 55% 140гр сливочный и бекон (ас...,Сыр,Сыр пл.Хохланд,97.0,1.00,1,1.00,97.000000,,,,,,,,,,,,,,,,,,,,,,,278,382.216,102.396979,305.0,1780.552000,97.616295,1245.0,1780.552000,115.489595,2343.0,3396.128999,114.858862,3097.0,4495.109999,117.837217
3,352398080124383,12668664112,108066,2022-09-10,2022-09-10 12:31:00.030,Морож.Фишка 61гр эскимо сгущенка в белой глазу...,Морож.Фишка,Морож.Фишка 61гр,51.0,1.00,1,1.00,51.000000,2.0,25.00,50.0,13.0,25.00,50.571429,23.0,40.000,50.285714,31.0,50.000,48.095238,0.040000,0.500000,0.040000,0.565217,0.741935,1.020000,1.008475,1.014205,1.060396,1.051485,186,250.000,103.821505,177.0,1251.000000,96.090395,934.0,1251.000000,97.054575,2188.0,3061.000000,93.249965,5118.0,6998.000000,90.266795
4,352398080124383,9789339428,110911,2021-11-09,2021-11-09 10:25:00.310,Пивной напиток Бочкарев Немецкое пастер 4.5% 1...,Пивной,Пивной напиток,101.0,6.00,1,6.00,101.000000,1.0,43.00,101.0,10.0,43.00,101.000000,19.0,81.000,101.000000,36.0,160.000,101.000000,0.139535,1.000000,0.139535,0.526316,0.527778,1.000000,1.000000,1.000000,1.000000,1.000000,139,163.000,74.533813,239.0,1661.000000,92.446025,1299.0,1661.000000,86.470139,2623.0,3328.000000,87.473251,5848.0,7488.000000,85.414983
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
826955,352398080391651,15185922934,116094,2023-05-31,2023-05-31 14:29:00.320,Чай Richard Royal Lemon 25 пак. черн.аромат.,Чай,Чай Richard,99.0,1.00,1,1.00,99.000000,,,,,,,,,,,,,,,,,,,,,,,192,233.830,139.593385,202.0,1542.490001,145.583614,1189.0,1542.490001,132.558800,2147.0,2934.961000,124.937981,4821.0,6677.157001,122.954833
826956,352398080458112,12261808028,102612,2022-08-01,2022-08-01 14:07:00.220,Зажигалка А-02 прозр.,Зажигалка,Зажигалка А-02,20.0,1.00,1,1.00,20.000000,2.0,12.00,15.0,11.0,12.00,16.428571,17.0,20.000,15.769231,17.0,20.000,15.769231,0.083333,0.500000,0.083333,0.647059,1.000000,1.333333,1.217391,1.268293,1.268293,1.041812,108,138.816,123.208889,337.0,3790.840000,134.913561,2684.0,3790.840000,119.563796,4703.0,6373.713002,126.547633,9385.0,12825.484002,132.419188
826957,352398080032768,15630896908,106959,2023-07-10,2023-07-10 12:30:00.270,Мак.изд.Вкусно Улиточки 400гр м/уп. /Чернов/,Мак.изд.Вкусно,Мак.изд.Вкусно Улиточки,30.0,1.00,1,1.00,30.000000,,,,,,,,,,,,,,,,,,,,,,,37,38.550,147.281081,,371.933000,,282.0,371.933000,143.303900,282.0,371.933000,143.303900,282.0,371.933000,143.303900
826958,352398080043187,9461345170,103127,2021-10-04,2021-10-04 14:21:00.000,Йогурт Савушкин продукт 2% 120гр двухслойный к...,Йогурт,Йогурт Савушкин,24.0,4.00,1,4.00,24.000000,,,,,,,,,,,,,,,,,,,,,,,193,237.457,107.222435,180.0,1676.569000,94.843389,1175.0,1676.569000,113.071338,2255.0,3303.356999,114.199743,2374.0,3488.108000,118.159411


In [None]:
# доля данного товара в общем потоке на кассе

join2['item_share_c'] = join2['count_grp_x']/join2['count_grp_y']
join2['item_share_q'] = join2['quantity_sum_grp_x']/join1['quantity_sum_grp_y']

join2['item_share_c7d'] = join2['count_grp_7dago_x']/join1['count_grp_7dago_y']
join2['item_share_q7d'] = join2['quantity_sum_7dago_x']/join1['quantity_sum_7dago_y']

join2['item_share_c1w'] = join2['count_grp_1week_x']/join1['count_grp_1week_y']
join2['item_share_q1w'] = join2['quantity_sum_1week_x']/join1['quantity_sum_1week_y']

join2['item_share_c2w'] = join2['count_grp_2week_x']/join1['count_grp_2week_y']
join2['item_share_q2w'] = join2['quantity_sum_2week_x']/join1['quantity_sum_2week_y']

join2['item_share_c31d'] = join2['count_grp_31days_x']/join1['count_grp_31days_y']
join2['item_share_q31d'] = join2['quantity_sum_31days_x']/join1['quantity_sum_31days_y']

# общие продажи на кассе - растут/падают

join2['item_trend_dev_c12w'] = join2['count_grp_1week_y']/join2['count_grp_2week_y']
join2['item_trend_dev_q12w'] = join2['quantity_sum_1week_y']/join1['quantity_sum_2week_y']

join2['item_trend_dev_c2w31d'] = join2['count_grp_2week_y']/join2['count_grp_31days_y']
join2['item_trend_dev_q2w31d'] = join2['quantity_sum_2week_y']/join1['quantity_sum_31days_y']

Удаляем столбцы с _y, так как это общая цифра по кассе, она не нужна  
### цикл НЕ тестировал

In [None]:
# удаляем все _y так как они нужны были для деления
for col in ctgr_join.columns:
    if col[-2:]=='_y':
        ctgr_join.drop(columns=col, inplace=True)

### Категории товаров для расчета их доли в продажах

In [None]:
suptr_grp4 = pd.read_parquet("suptr_grp4.parquet")

In [None]:
ctgr_join = suptr_grp4.merge(suptr_grp3, on=['device_id', 'local_dt'], how='left')

In [None]:
# динамика по категории
ctgr_join['ctgr_trend_c1w'] = ctgr_join['count_grp_x']/ctgr_join['count_grp_1week_x']
ctgr_join['ctgr_trend_c2w'] = ctgr_join['count_grp_1week_x']/ctgr_join['count_grp_2week_x']
ctgr_join['ctgr_trend_c31d'] = ctgr_join['count_grp_1week_x']/ctgr_join['count_grp_31days_x']

ctgr_join['ctgr_trend_q1w'] = ctgr_join['quantity_sum_grp_x']/ctgr_join['quantity_sum_1week_x']
ctgr_join['ctgr_trend_q2w'] = ctgr_join['quantity_sum_1week_x']/ctgr_join['quantity_sum_2week_x']
ctgr_join['ctgr_trend_q31d'] = ctgr_join['quantity_sum_1week_x']/ctgr_join['quantity_sum_31days_x']
 
# доля от продаж
ctgr_join['ctgr_share_c'] = ctgr_join['count_grp_x']/ctgr_join['count_grp_y']
ctgr_join['ctgr_share_q'] = ctgr_join['quantity_sum_grp_x']/ctgr_join['quantity_sum_grp_y']

ctgr_join['ctgr_share_c7d'] = ctgr_join['count_grp_7dago_x']/ctgr_join['count_grp_7dago_y']
ctgr_join['ctgr_share_q7d'] = ctgr_join['quantity_sum_7dago_x']/ctgr_join['quantity_sum_7dago_y']

ctgr_join['ctgr_share_c1w'] = ctgr_join['count_grp_1week_x']/ctgr_join['count_grp_1week_y']
ctgr_join['ctgr_share_q1w'] = ctgr_join['quantity_sum_1week_x']/ctgr_join['quantity_sum_1week_y']

ctgr_join['ctgr_share_c2w'] = ctgr_join['count_grp_2week_x']/ctgr_join['count_grp_2week_y']
ctgr_join['ctgr_share_q2w'] = ctgr_join['quantity_sum_2week_x']/ctgr_join['quantity_sum_2week_y']

ctgr_join['ctgr_share_c31d'] = ctgr_join['count_grp_31days_x']/ctgr_join['count_grp_31days_y']
ctgr_join['ctgr_share_q31d'] = ctgr_join['quantity_sum_31days_x']/ctgr_join['quantity_sum_31days_y']

 удаляем все _y так как они нужны были для деления  
### Эти циклы не проверял

In [None]:
# удаляем все _y так как они нужны были для деления
for col in ctgr_join.columns:
    if col[-2:]=='_y':
        ctgr_join.drop(columns=col, inplace=True)

In [None]:
# переименования столбцов _x чтобы потом их добавить в фичи
for col in ctgr_join.columns:
    if col[-2:]=='_x':
        ctgr_join.rename(columns={col:col[:-2]+'_ctgr_1'})

### То же самое для категории из 2-х слов (не тестировал)

In [None]:
suptr_grp5 = pd.read_parquet("suptr_grp5.parquet")

In [None]:
ctgr_join2 = suptr_grp5.merge(suptr_grp3, on=['device_id', 'local_dt'], how='left')

In [None]:
# динамика по категории
ctgr_join2['ctgr2_trend_c1w'] = ctgr_join2['count_grp_x']/ctgr_join2['count_grp_1week_x']
ctgr_join2['ctgr2_trend_c2w'] = ctgr_join2['count_grp_1week_x']/ctgr_join2['count_grp_2week_x']
ctgr_join2['ctgr2_trend_c31d'] = ctgr_join2['count_grp_1week_x']/ctgr_join2['count_grp_31days_x']

ctgr_join2['ctgr2_trend_q1w'] = ctgr_join2['quantity_sum_grp_x']/ctgr_join2['quantity_sum_1week_x']
ctgr_join2['ctgr2_trend_q2w'] = ctgr_join2['quantity_sum_1week_x']/ctgr_join2['quantity_sum_2week_x']
ctgr_join2['ctgr2_trend_q31d'] = ctgr_join2['quantity_sum_1week_x']/ctgr_join2['quantity_sum_31days_x']
 
# доля от продаж2
ctgr_join2['ctgr2_share_c'] = ctgr_join2['count_grp_x']/ctgr_join2['count_grp_y']
ctgr_join2['ctgr2_share_q'] = ctgr_join2['quantity_sum_grp_x']/ctgr_join2['quantity_sum_grp_y']

ctgr_join2['ctgr2_share_c7d'] = ctgr_join2['count_grp_7dago_x']/ctgr_join2['count_grp_7dago_y']
ctgr_join2['ctgr2_share_q7d'] = ctgr_join2['quantity_sum_7dago_x']/ctgr_join2['quantity_sum_7dago_y']

ctgr_join2['ctgr2_share_c1w'] = ctgr_join2['count_grp_1week_x']/ctgr_join2['count_grp_1week_y']
ctgr_join2['ctgr2_share_q1w'] = ctgr_join2['quantity_sum_1week_x']/ctgr_join2['quantity_sum_1week_y']

ctgr_join2['ctgr2_share_c2w'] = ctgr_join2['count_grp_2week_x']/ctgr_join2['count_grp_2week_y']
ctgr_join2['ctgr2_share_q2w'] = ctgr_join2['quantity_sum_2week_x']/ctgr_join2['quantity_sum_2week_y']

ctgr_join2['ctgr2_share_c31d'] = ctgr_join2['count_grp_31days_x']/ctgr_join2['count_grp_31days_y']
ctgr_join2['ctgr2_share_q31d'] = ctgr_join2['quantity_sum_31days_x']/ctgr_join2['quantity_sum_31days_y']

In [None]:
# удаляем все _y так как они нужны были для деления
for col in ctgr_join2.columns:
    if col[-2:]=='_y':
        ctgr_join2.drop(columns=col, inplace=True)

In [None]:
# переименования столбцов _x чтобы потом их добавить
for col in ctgr_join2.columns:
    if col[-2:]=='_x':
        ctgr_join2.rename(columns={col: col[:-2]+'_ctgr_2'})

Соединяем с фичами по категории (это не тестировал)

In [None]:
join3 = join2.merge(ctgr_join, on = ['name_firstword', 'device_id', 'local_dt'], how='left') \
    .merge(ctgr_join2, on = ['name_fstsecword', 'device_id', 'local_dt'], how='left')

## В итоге получили по каждому айтему фичи на каждый день local_dt