https://jira.x5.ru/browse/CVMXC-2043

In [1]:
name = 'CVMXC-2043_pepsi'

In [2]:
import os
import re
from pyspark.sql import SparkSession
from typing import List, Dict, Callable
import socket

# Module-level handle to the active SparkSession; restart_spark() assigns and reuses it.
spark = None

# HDFS locations of the Python environment archive and of the Spark jars archive
# that restart_spark() passes to YARN (both are the 2.4.4 builds).
EXECUTOR_ENV = 'hdfs:///share/products/cvm5/lib/python/anaconda_2.4.4_ds.tar.gz'  # 2.4.4 
SPARK_ARCHIVE = 'hdfs:///share/lib/spark/sparkjars-2.4.4.zip'                     # 2.4.4
# Earlier artefacts, kept for reference only:
#EXECUTOR_ENV = 'hdfs:///share/lib/python/env/anaconda-2019.07.tar.gz'
#SPARK_ARCHIVE = 'hdfs:///share/lib/spark/sparkjars-2.3.1.zip'

# Locations of the Hadoop / Java / Spark client installations on this host.
os.environ["ARROW_LIBHDFS_DIR"] = "/usr/hdp/2.6.5.0-292/usr/lib"
os.environ['HADOOP_HOME'] = '/usr/hdp/current/hadoop-client/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64/'
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf/'
os.environ['SPARK_HOME'] = '/opt/conda/lib/python3.7/site-packages/pyspark'
# Python interpreter path; its first component is the file name of the EXECUTOR_ENV archive.
os.environ['PYSPARK_PYTHON'] = 'anaconda_2.4.4_ds.tar.gz/bin/python3'             # 2.4.4
#os.environ['PYSPARK_PYTHON'] = 'anaconda-2019.07.tar.gz/bin/python3'






def restart_spark(task_name: str, num_executors: int, executor_memory='4G', executor_cores=2,
                  driver_memory='2G', queue='cvm5-rnd', additional_params: Dict[str, str] = None):
    """Return a live ``(SparkContext, SparkSession)`` pair, creating a YARN session if needed.

    If the module-level ``spark`` session exists and its context is not stopped,
    it is reused and no new session is built.

    Args:
        task_name: Spark application name.
        num_executors: Value for ``spark.dynamicAllocation.maxExecutors``.
        executor_memory: Value for ``spark.executor.memory``.
        executor_cores: Value for ``spark.executor.cores``.
        driver_memory: Used for both ``spark.driver.memory`` and
            ``spark.driver.maxResultSize``.
        queue: YARN queue name.
        additional_params: Extra ``key -> value`` Spark options, applied after
            the defaults so they can override them.

    Returns:
        Tuple ``(sc, spark)``.

    Raises:
        AssertionError: If fewer than three free user TCP ports are available.
    """
    global spark

    # Reuse the existing session while its JVM context is still running.
    if spark:
        sc = spark.sparkContext
        if sc and sc._jsc:
            if not sc._jsc.sc().isStopped():
                print('Using cached spark')
                return sc, spark

    # The UI, the block manager and the driver RPC endpoint each need their own port.
    need_ports_for_app = 3
    user_tcp_ports = _get_user_tcp_ports()
    free_ports = _get_free_ports(user_tcp_ports)
    assert len(free_ports) >= need_ports_for_app, \
        f"Not enough free ports ({len(free_ports)}), need {need_ports_for_app}, stop other apps"
    app_ports = free_ports[:need_ports_for_app]

    host_ip = os.getenv('HOST_IP')

    spark_session = (
        SparkSession
        .builder
        .appName(task_name)
        .master('yarn')
        .config('spark.driver.memory', driver_memory)
        .config('spark.driver.maxResultSize', driver_memory)
        .config('spark.executor.cores', executor_cores)
        .config('spark.executor.memory', executor_memory)
        .config('spark.executor.memoryOverhead', '1G')
        .config('spark.dynamicAllocation.enabled', 'true')
        .config('spark.dynamicAllocation.maxExecutors', num_executors)
        .config('spark.sql.broadcastTimeout', '36000')
        .config('spark.dynamicAllocation.cachedExecutorIdleTimeout', '1200s')
        .config('spark.ui.port', app_ports[0])
        .config('spark.blockManager.port', app_ports[1])
        .config('spark.driver.port', app_ports[2])
        .config('spark.driver.bindAddress', '0.0.0.0')
        .config('spark.driver.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
        .config('spark.driver.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.debug.maxToStringFields', '50')
        .config('spark.yarn.queue', queue)
        .config('spark.yarn.dist.archives', EXECUTOR_ENV)
        .config('spark.yarn.archive', SPARK_ARCHIVE)
        .config('spark.yarn.am.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.rpc.message.maxSize', '1024')
        .config('spark.sql.warehouse.dir', '/apps/hive/warehouse')
        .config('spark.sql.execution.pandas.respectSessionTimeZone', 'false')
        .config('spark.sql.orc.filterPushdown', 'true')
        .config('spark.sql.hive.convertMetastoreOrc', 'true')
        .config('spark.shuffle.service.enabled', 'true')
        .config('spark.hadoop.yarn.timeline-service.enabled', 'false')
        .config('spark.hadoop.yarn.client.failover-proxy-provider',
                'org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.kryoserializer.buffer.max', '128m')
        .config('spark.executor.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
    )

    # The builder stringifies option values, so an unset HOST_IP would be stored
    # as the literal host name 'None'. Only set the option when a value exists.
    if host_ip:
        spark_session = spark_session.config('spark.driver.host', host_ip)
    else:
        print('HOST_IP is not set; leaving spark.driver.host at the Spark default')

    # Caller-supplied options go last so they win over the defaults above.
    if additional_params:
        for key, value in additional_params.items():
            spark_session = spark_session.config(key, value)

    spark = (
        spark_session
        .enableHiveSupport()
        .getOrCreate()
    )
    sc = spark.sparkContext

    return sc, spark


def _get_user_tcp_ports() -> List[str]:
    """Return the values of the ``*TCP_PORT`` environment variables of the current user.

    The user name is derived from ``HOSTNAME``, which is expected to look like
    ``<prefix>-<name>`` or, when it contains ``-2e``, ``<prefix>-<name>-<surname>``.
    The upper-cased name (parts joined with ``_``) is then searched for in the
    environment variable names.

    Returns:
        Port values, as the strings stored in the environment.

    Raises:
        RuntimeError: If ``HOSTNAME`` is unset or empty.
        ValueError: If ``HOSTNAME`` does not split into the expected number of parts.
    """
    envuser = os.getenv('HOSTNAME')
    if not envuser:
        # Fail with a readable message instead of a TypeError/ValueError from the parsing below.
        raise RuntimeError('HOSTNAME environment variable is not set; cannot resolve user TCP ports')

    if '-2e' in envuser:
        _, user_name, user_surname = envuser.upper().split('-')
        user_full_name = '_'.join([user_name, user_surname])
    else:
        _, user_name = envuser.upper().split('-')
        user_full_name = user_name

    return [v for k, v in os.environ.items() if user_full_name in k and k.endswith('TCP_PORT')]


def _get_free_ports(ports: List[str]):
    """Return, in input order, the ports on which a local TCP connect does not succeed."""

    def _nobody_listens(port_no: int) -> bool:
        # A non-zero connect_ex() result means the connection attempt failed.
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            return probe.connect_ex(('0.0.0.0', port_no)) != 0
        finally:
            probe.close()

    return [candidate for candidate in ports if _nobody_listens(int(candidate))]


In [3]:
# Start (or reuse) the Spark session for this task, overriding the shuffle partition count.
sc, spark = restart_spark(
    task_name=name,
    num_executors=21,
    executor_memory='5G',
    executor_cores=3,
    driver_memory='7G',
    additional_params={"spark.sql.shuffle.partitions": "300"},
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/24 19:10:02 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [4]:
sc.setLogLevel('ERROR')

In [5]:
import pandas as pd
import sys
from pyspark.sql import functions as F
import datetime
from datetime import timedelta

sys.path.append('/home/jovyan/glow-byte-filters-pyspark')
from logic_filters import * 
from segmentation import *

In [6]:
# Hive tables with loyalty-programme data (cards, cardholders, accounts, CVM guests).
LOYALTY_CARDS = "hive_ssa_tc5.loyalty_card"
LOYALTY_CARDHOLDERS = "hive_ssa_tc5.loyalty_cardholder"
ACCOUNTS = "hive_ssa_tc5.account"
CVM5_GUESTS = "hive_cvm_acrm.cvm5_guests"

# Hive tables with stores, receipt headers, receipt items and the product catalogue.
DIM_STORE = "hive_ssa_main.dim_store"
CHECKS_HEADERS = "hive_ssa_main.fct_rtl_txn"
CHECKS_ITEMS = "hive_ssa_main.fct_rtl_txn_item"
PRODUCTS = "hive_ssa_tc5.cvm_product"

In [7]:
# Accounts that fall into one fixed client segment in the 2021-11-01 snapshot.
client_segmentation = (
    spark.table('tc5_analytics_sanbox.t_TX_transform_client_segmentation')
    .withColumnRenamed('curr_acc_no', 'account_no')
    .filter(F.col('dt') == '2021-11-01')
    .filter(F.col('segment_person') == '22-35, не мало НЦС, без вредных привычек')
    .select('account_no', 'segment_person')
)

### Проверка доступности PLU в магазинах

1. Гео1 - Москва
2. Гео2 - ЦФО

In [8]:
# federal_subject_dk codes used to select stores for the two geographies.
federal_subject1 = [77]  # geo1
federal_subject2 = [50, 77, 31, 32, 36, 40, 46, 48, 57, 62, 67, 68, 71]  # geo2 (includes the geo1 code)

In [9]:
plu_codes = [4150433,4150434]

In [10]:
# Target PLU codes as a one-column Spark DataFrame, broadcast in the join below.
pplu = spark.createDataFrame(pd.DataFrame({"plu_code": plu_codes}))

# Stores of the geo1 federal subjects whose record has valid_to_dttm = 5999-01-01
# (presumably the open-ended "still valid" marker — confirm).
shops = (
    spark.table(DIM_STORE)
    .filter(F.col('valid_to_dttm') == datetime.datetime(5999, 1, 1, 0, 0))
    .filter(F.col('federal_subject_dk').isin(federal_subject1))
    .selectExpr('store_id as plant')
    .distinct()
)

# (plu_code, plant) pairs from the assortment table, limited to the target PLUs
# and to rows where plu_negate_flg is not 1.
tc5_stores_assort = (
    spark.table('HIVE_SSA_MAIN.ASSORTMENT_X_PLU_X_STORE')
    .withColumnRenamed('plu_id', 'plu_code')
    .withColumnRenamed('store_id', 'plant')
    .filter(F.col("plu_negate_flg") != 1)
    .select('plu_code', 'plant')
    .join(F.broadcast(pplu), 'plu_code', 'inner')
)

# geo1 stores that carry at least one target PLU, collected to a Python list.
plants = (
    shops.join(tc5_stores_assort, on='plant', how='inner')
    .select('plant')
    .distinct()
    .toPandas()['plant']
    .tolist()
)

len(plants)

                                                                                

1745

In [11]:
# Rebuilt here, but not used in this cell: tc5_stores_assort from the previous
# cell already contains the join with the target PLUs.
pplu = spark.createDataFrame(pd.DataFrame({"plu_code": plu_codes}))

# Same store selection as for geo1, but over the geo2 list of federal subjects.
shops = (spark.table(DIM_STORE)
            .filter(F.col('valid_to_dttm')== datetime.datetime(5999, 1, 1, 0, 0))
            .filter(F.col('federal_subject_dk').isin(federal_subject2))
#             .filter(F.col('macroregion_dk').isin(macroregion_dk))
            .selectExpr('store_id as plant')
            .distinct()
            #.toPandas()['plant']
            #.tolist()
            )

# geo2 stores that carry at least one target PLU, collected to a Python list.
plants2 = (shops.join(tc5_stores_assort, on='plant', how='inner')
                .select('plant')
                .distinct()
                .toPandas()['plant']
                .tolist()
         )

len(plants2)

                                                                                

4082

### Выбираем гостей нужного юзкейса

In [12]:
# Parameters for selecting guests from CVM5_GUESTS.
usecase = ['cross', 'upgrade', 'ump']  # use cases to include
lifetime = 90                          # lower bound for the `lifetime` column
freq = 1                               # lower bound for the `frequency` column
dt = datetime.date.today()             # lower bound for `calculation_dt`; also used in output file names

In [13]:
# Guests of the chosen use cases calculated on or after `dt` that meet the
# lifetime and frequency thresholds.
customers_usecase = (
    spark.table(CVM5_GUESTS)
    .filter(
        (F.col('calculation_dt') >= dt)
        & F.col('usecase').isin(usecase)
        & (F.col('lifetime') >= lifetime)
        & (F.col('frequency') >= freq)
    )
    .select('account_no', 'customer_rk')
)

In [16]:
customers_usecase.count()

                                                                                

15769446

### Проверяем доступность отобранных гостей на определённую дату

In [169]:
check_date = '2021-12-14'

In [18]:
# Distinct customer_rk values returned by the project's SMS-channel filter for
# usecase[0] ('cross') on check_date.
# NOTE(review): presumably the filter keeps guests reachable by SMS on that date — confirm in logic_filters.
seg_sms1 = (sms_channel_filters_glowbyte(spark=spark,
                                         guests=customers_usecase, 
                                         usecase_name=usecase[0], 
                                         check_date=check_date, 
                                         debug_mode=False)
                                    .select('customer_rk')
                                    .distinct()
           )

[93m Время выполнения: 0:00:17


In [19]:
# Distinct customer_rk values returned by the project's SMS-channel filter for
# usecase[1] ('upgrade') on check_date.
seg_sms2 = (sms_channel_filters_glowbyte(spark=spark,
                                         guests=customers_usecase, 
                                         usecase_name=usecase[1], 
                                         check_date=check_date, 
                                         debug_mode=False)
                                    .select('customer_rk')
                                    .distinct()
           )

[93m Время выполнения: 0:00:34


In [20]:
# Distinct customer_rk values returned by the project's SMS-channel filter for
# usecase[2] ('ump') on check_date.
seg_sms3 = (sms_channel_filters_glowbyte(spark=spark,
                                         guests=customers_usecase, 
                                         usecase_name=usecase[2], 
                                         check_date=check_date, 
                                         debug_mode=False)
                                    .select('customer_rk')
                                    .distinct()
           )

[93m Время выполнения: 0:00:20


In [21]:
seg_sms = seg_sms1.union(seg_sms2).union(seg_sms3).distinct()

In [14]:
# Load the previously saved guest list from the 'temp01' parquet checkpoint.
# The write is disabled, so this cell relies on 'temp01' from an earlier run;
# re-enable the next line to refresh the checkpoint from the current seg_sms.
# seg_sms.write.parquet('temp01', mode='overwrite')
seg_sms = spark.read.parquet('temp01')

                                                                                

# seg1(geo1-geo2)

### Собираем чеки этих гостей и оставляем тех, кто покупал продукты с нужным `syntethic_category_id`

In [135]:
start_date = datetime.date(2021, 8, 14)
end_date = datetime.date(2021, 11, 14)

In [192]:
# Receipt headers inside [start_date, end_date] that carry a non-empty loyalty
# card number, have financial_unit_format_dk == 'D' and rtl_txn_cancel_flg == 0.
checks_headers = (
    spark.table(CHECKS_HEADERS)
    .where(F.col('rtl_txn_dt').between(start_date, end_date))
    .where((F.col('loyalty_card_no') != '') & (F.col('loyalty_card_no').isNotNull()))
    .where(F.col('financial_unit_format_dk') == 'D')
    .where(F.col('rtl_txn_cancel_flg') == 0)
    .select('rtl_txn_id', 'loyalty_card_no', 'store_id')
)


In [193]:
checks_headers_tc5 = checks_headers.filter(F.col('store_id').isin(plants)) # keep only receipts from the selected (geo1) stores

In [194]:
checks_headers_tc5_2 = checks_headers.filter(F.col('store_id').isin(plants2)) # keep only receipts from the selected (geo2) stores

In [139]:
# Card -> account mapping. The `account_rk` rename does not affect the result,
# because that column is not selected.
loyalty_cards = (spark
                    .table(LOYALTY_CARDS)
                    .withColumnRenamed("loyalty_card_id", "loyalty_card_no")
                    .withColumnRenamed("loyalty_account_id", "account_no")
                    .withColumnRenamed("loyalty_account_acrm_id", "account_rk")
                    .select('account_no', 'loyalty_card_no')
                )
# Account -> cardholder (customer_rk) mapping.
loyalty_cardholders = (spark
                        .table(LOYALTY_CARDHOLDERS)
                        .withColumnRenamed("loyalty_cardholder_acrm_id", "customer_rk")
                        .withColumnRenamed("loyalty_account_id", "account_no")
                        .select('account_no', 'customer_rk')
                      )
# Cards joined to cardholders through the account, restricted to customers present in seg_sms.
clients_info = loyalty_cards.join(loyalty_cardholders, on='account_no', how='inner')
clients_info = clients_info.join(seg_sms, on='customer_rk', how='inner')

In [140]:
checks_headers_tc5 = checks_headers_tc5.join(clients_info, on='loyalty_card_no') # keep only receipts of the selected guests

In [141]:
checks_headers_tc5_2 = checks_headers_tc5_2.join(clients_info, on='loyalty_card_no') # keep only receipts of the selected guests

In [142]:
# Receipt items inside [start_date, end_date]. Columns are renamed to short
# aliases first; only plu_code, rtl_txn_id and base_qty survive the final select,
# so the other renames matter only where the filter below uses them.
checks_items = (spark.table(CHECKS_ITEMS) 
                    .withColumnRenamed('plu_id', 'plu_code')
                    .withColumnRenamed('turnover_no_vat_amt', 'zsalnovat')
                    .withColumnRenamed('turnover_vat_rub_amt', 'zsale_vat')
                    .withColumnRenamed('prime_cost_no_vat_amt', 'zcst_n')
                    .withColumnRenamed('turnover_base_uom_amt', 'base_qty')
                    .withColumnRenamed('discount_amt', 'zdiscount')
                    .withColumnRenamed('fact_regular_promo_flg', 'zpromofl')
                    .filter(F.col('rtl_txn_dt').between(start_date, end_date))
                    .filter((F.col('zsalnovat') >= 0) & (F.col('base_qty') >= 0) & (F.col('zcst_n') > 0)) # keep only rows with non-negative turnover/quantity and positive cost
                        .select('plu_code' # item (PLU) id
                                , 'rtl_txn_id' # receipt id
                                , 'base_qty' # quantity
                               )
                   )


In [33]:
# Product selection parameters for this segment. Only the category list and the
# quantity threshold are active; the commented-out names are optional filters
# that match the disabled lines in the product query below.
# plu_hierarchy_lvl_4_dk = 'FR0606006'
syntethic_category_id = [51,26,52]
# plu_brand_code = ['2572','3403','O899','2268','1120','2325','0844','2251']
# plu_not_in = [4138521]
base_qty_per_period = 1  # minimum total base_qty per account over the period

In [34]:
# Distinct plu_id values of products in the selected categories, collected to a
# Python list on the driver. The commented-out filters are disabled alternatives.
plu_codes_cat = (spark
                 .table(PRODUCTS)
#                  .filter(F.col('plu_hierarchy_lvl_4_dk') == plu_hierarchy_lvl_4_dk)
#                  .filter(F.col('plu_brand_code').isin(plu_brand_code))
                 .filter(F.col('syntethic_category_id').isin(syntethic_category_id))
#                  .filter(~F.col('plu_id').isin(plu_not_in))
                 .select('plu_id')
                 .distinct()
                 .toPandas()['plu_id']
                 .tolist()
                )

                                                                                

In [35]:
# NOTE(review): this rebinds checks_items, so the filter accumulates across segments;
# rebuild checks_items from CHECKS_ITEMS before running this for another category.
checks_items = checks_items.filter(F.col('plu_code').isin(plu_codes_cat)) # keep only items with the required PLU codes

In [36]:
checks_tc5 = checks_items.join(checks_headers_tc5, 'rtl_txn_id', how='inner')

In [37]:
checks_tc5_2 = checks_items.join(checks_headers_tc5_2, 'rtl_txn_id', how='inner')

In [38]:
accs = ['account_no']
# Total purchased quantity per account over the period (despite its name, `pdf` is a Spark DataFrame).
pdf = checks_tc5.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
# Accounts that reach the minimum quantity.
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
# Join back (default join type) so only rows of qualifying accounts remain.
checks_tc5 = checks_tc5.join(pdf, on=accs)

In [39]:
# Same per-account quantity threshold as above, applied to the geo2 receipts.
pdf = checks_tc5_2.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5_2 = checks_tc5_2.join(pdf, on=accs)

In [40]:
# Unique customers from the geo1 receipts, tagged with the constant label 'geo1'.
seg1_1 = (
    checks_tc5
    .select('customer_rk', F.lit('geo1').alias('geo'))
    .distinct()
)

In [41]:
# Unique customers from the geo2 receipts, tagged with the constant label 'geo2'.
seg1_2 = (
    checks_tc5_2
    .select('customer_rk', F.lit('geo2').alias('geo'))
    .distinct()
)

In [42]:
seg = seg1_1.union(seg1_2)

In [43]:
# Save the segment to parquet and read it back, so the following actions
# (count, toPandas) work from the saved files. 'temp02' is a relative path and
# is overwritten on every run.
seg.write.parquet('temp02', mode='overwrite')
seg = spark.read.parquet('temp02')

                                                                                ]]]]]

In [44]:
seg.count()

21/11/24 16:28:04 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /192.168.234.57:60772 is closed
21/11/24 16:28:04 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /192.168.234.57:46782 is closed
21/11/24 16:28:05 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /192.168.234.57:35636 is closed
21/11/24 16:28:05 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /192.168.234.57:46076 is closed
21/11/24 16:28:05 ERROR client.TransportClient: Failed to send RPC RPC 5122850370810426028 to /192.168.234.57:46076: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
21/11/24 16:28:05 ERROR client.TransportClient: Failed to send RPC RPC 8922448374372518186 to /192.168.234.57:46076: java.nio.channels.ClosedChann

2833221

In [45]:
seg_pd = seg.toPandas()

                                                                                

In [46]:
len(seg_pd)

2833221

In [47]:
# Split the pandas frame by geography label.
seg_1pd = seg_pd[seg_pd['geo'] == 'geo1']
seg_2pd = seg_pd[seg_pd['geo'] == 'geo2']

In [48]:
seg_no = 'seg1_geo1'

In [49]:
# Export the whole geo1 segment (all use cases together).
# NOTE(review): unlike the per-use-case files below, this file name has no '_'
# between 'All_uc' and the date — confirm that this is intended.
seg_1pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
# Cast every column to str before splitting by use case.
seg_1pd = seg_1pd.astype('str')
# on_usecases comes from a star import (logic_filters / segmentation); presumably
# it splits the frame into one frame per use case — confirm.
cross, upgrade, ump = on_usecases(seg_1pd)
# One CSV per use case: <name>_<seg_no>_<usecase>_<date>.csv
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [50]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 385851, upgrade: 369880, ump: 196071


In [51]:
seg_no = 'seg1_geo2'

In [52]:
# Export the whole geo2 segment (all use cases together).
# NOTE(review): unlike the per-use-case files below, this file name has no '_'
# between 'All_uc' and the date — confirm that this is intended.
seg_2pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
# Cast every column to str before splitting by use case.
seg_2pd = seg_2pd.astype('str')
# on_usecases comes from a star import (logic_filters / segmentation); presumably
# it splits the frame into one frame per use case — confirm.
cross, upgrade, ump = on_usecases(seg_2pd)
# One CSV per use case: <name>_<seg_no>_<usecase>_<date>.csv
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [53]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 756245, upgrade: 740406, ump: 384768


# seg2(geo1-geo2)

### Собираем чеки этих гостей и оставляем тех, кто покупал продукты с нужным `syntethic_category_id`

In [23]:
# plu_hierarchy_lvl_4_dk = 'FR0606006'
syntethic_category_id = [81]
# plu_brand_code = ['2572','3403','O899','2268','1120','2325','0844','2251']
# plu_not_in = [4138521]
base_qty_per_period = 1

In [24]:
plu_codes_cat = (spark
                 .table(PRODUCTS)
#                  .filter(F.col('plu_hierarchy_lvl_4_dk') == plu_hierarchy_lvl_4_dk)
#                  .filter(F.col('plu_brand_code').isin(plu_brand_code))
                 .filter(F.col('syntethic_category_id').isin(syntethic_category_id))
#                  .filter(~F.col('plu_id').isin(plu_not_in))
                 .select('plu_id')
                 .distinct()
                 .toPandas()['plu_id']
                 .tolist()
                )

                                                                                

In [25]:
# NOTE(review): this rebinds checks_items, so the filter accumulates across segments;
# rebuild checks_items from CHECKS_ITEMS before running this for another category.
checks_items = checks_items.filter(F.col('plu_code').isin(plu_codes_cat)) # keep only items with the required PLU codes

In [26]:
checks_tc5 = checks_items.join(checks_headers_tc5, 'rtl_txn_id', how='inner')

In [27]:
checks_tc5_2 = checks_items.join(checks_headers_tc5_2, 'rtl_txn_id', how='inner')

In [28]:
accs = ['account_no']
pdf = checks_tc5.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5 = checks_tc5.join(pdf, on=accs)

In [29]:
pdf = checks_tc5_2.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5_2 = checks_tc5_2.join(pdf, on=accs)

In [30]:
seg1_1 = (checks_tc5.withColumn('geo', F.lit('geo1')).select('customer_rk', 'geo')
                  .distinct()
      )

In [31]:
seg1_2 = (checks_tc5_2.withColumn('geo', F.lit('geo2')).select('customer_rk', 'geo')
                  .distinct()
      )

In [32]:
seg = seg1_1.union(seg1_2)

In [33]:
seg.write.parquet('temp03', mode='overwrite')
seg = spark.read.parquet('temp03')

                                                                                ]]]]

In [34]:
seg.count()

                                                                                

2300759

In [35]:
seg_pd = seg.toPandas()

                                                                                

In [36]:
len(seg_pd)

2300759

In [37]:
seg_1pd = seg_pd[seg_pd['geo'] == 'geo1']
seg_2pd = seg_pd[seg_pd['geo'] == 'geo2']

In [38]:
seg_no = 'seg2_geo1'

In [39]:
seg_1pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_1pd = seg_1pd.astype('str')
cross, upgrade, ump = on_usecases(seg_1pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [40]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 309477, upgrade: 291528, ump: 156477


In [41]:
seg_no = 'seg2_geo2'

In [42]:
seg_2pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_2pd = seg_2pd.astype('str')
cross, upgrade, ump = on_usecases(seg_2pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [43]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 624000, upgrade: 602961, ump: 316316


# seg3(geo1-geo2)

In [100]:
seg_no = 'seg3_geo1'

### Собираем чеки этих гостей и оставляем тех, кто покупал продукты с нужным `syntethic_category_id`

In [99]:
# plu_hierarchy_lvl_4_dk = 'FR0606006'
syntethic_category_id = [18]
# plu_brand_code = ['2572','3403','O899','2268','1120','2325','0844','2251']
# plu_not_in = [4138521]
base_qty_per_period = 1

In [101]:
plu_codes_cat = (spark
                 .table(PRODUCTS)
#                  .filter(F.col('plu_hierarchy_lvl_4_dk') == plu_hierarchy_lvl_4_dk)
#                  .filter(F.col('plu_brand_code').isin(plu_brand_code))
                 .filter(F.col('syntethic_category_id').isin(syntethic_category_id))
#                  .filter(~F.col('plu_id').isin(plu_not_in))
                 .select('plu_id')
                 .distinct()
                 .toPandas()['plu_id']
                 .tolist()
                )

                                                                                

In [102]:
# NOTE(review): this rebinds checks_items, so the filter accumulates across segments;
# rebuild checks_items from CHECKS_ITEMS before running this for another category.
checks_items = checks_items.filter(F.col('plu_code').isin(plu_codes_cat)) # keep only items with the required PLU codes

In [103]:
checks_tc5 = checks_items.join(checks_headers_tc5, 'rtl_txn_id', how='inner')

In [104]:
checks_tc5_2 = checks_items.join(checks_headers_tc5_2, 'rtl_txn_id', how='inner')

In [105]:
accs = ['account_no']
pdf = checks_tc5.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5 = checks_tc5.join(pdf, on=accs)

In [106]:
pdf = checks_tc5_2.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5_2 = checks_tc5_2.join(pdf, on=accs)

In [107]:
seg1_1 = (checks_tc5.withColumn('geo', F.lit('geo1')).select('customer_rk', 'geo')
                  .distinct()
      )

In [108]:
seg1_2 = (checks_tc5_2.withColumn('geo', F.lit('geo2')).select('customer_rk', 'geo')
                  .distinct()
      )

In [109]:
seg = seg1_1.union(seg1_2)

In [110]:
seg.write.parquet('temp04', mode='overwrite')
seg = spark.read.parquet('temp04')

                                                                                00]]]]]

In [111]:
seg.count()

                                                                                

2411872

In [112]:
seg_pd = seg.toPandas()

                                                                                

In [113]:
len(seg_pd)

2411872

In [114]:
seg_1pd = seg_pd[seg_pd['geo'] == 'geo1']
seg_2pd = seg_pd[seg_pd['geo'] == 'geo2']

In [115]:
len(seg_1pd)

801696

In [116]:
len(seg_2pd)

1610176

In [117]:
seg_no = 'seg3_geo1'

In [118]:
seg_1pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_1pd = seg_1pd.astype('str')
cross, upgrade, ump = on_usecases(seg_1pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [119]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 327077, upgrade: 309633, ump: 164986


In [120]:
seg_no = 'seg3_geo2'

In [121]:
seg_2pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_2pd = seg_2pd.astype('str')
cross, upgrade, ump = on_usecases(seg_2pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [122]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 650581, upgrade: 630459, ump: 329136


# seg4(geo1-geo2)

### Собираем чеки этих гостей и оставляем тех, кто покупал продукты с нужным `syntethic_category_id`

In [143]:
# plu_hierarchy_lvl_4_dk = 'FR0606006'
syntethic_category_id = [5]
# plu_brand_code = ['2572','3403','O899','2268','1120','2325','0844','2251']
# plu_not_in = [4138521]
base_qty_per_period = 1

In [144]:
plu_codes_cat = (spark
                 .table(PRODUCTS)
#                  .filter(F.col('plu_hierarchy_lvl_4_dk') == plu_hierarchy_lvl_4_dk)
#                  .filter(F.col('plu_brand_code').isin(plu_brand_code))
                 .filter(F.col('syntethic_category_id').isin(syntethic_category_id))
#                  .filter(~F.col('plu_id').isin(plu_not_in))
                 .select('plu_id')
                 .distinct()
                 .toPandas()['plu_id']
                 .tolist()
                )

                                                                                

In [145]:
# NOTE(review): this rebinds checks_items, so the filter accumulates across segments;
# rebuild checks_items from CHECKS_ITEMS before running this for another category.
checks_items = checks_items.filter(F.col('plu_code').isin(plu_codes_cat)) # keep only items with the required PLU codes

In [146]:
checks_tc5 = checks_items.join(checks_headers_tc5, 'rtl_txn_id', how='inner')

In [147]:
checks_tc5_2 = checks_items.join(checks_headers_tc5_2, 'rtl_txn_id', how='inner')

In [148]:
accs = ['account_no']
pdf = checks_tc5.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5 = checks_tc5.join(pdf, on=accs)

In [149]:
pdf = checks_tc5_2.groupby(accs).agg(F.sum('base_qty').alias('base_qty_per_period'))
pdf = pdf.filter(F.col('base_qty_per_period') >= base_qty_per_period)
checks_tc5_2 = checks_tc5_2.join(pdf, on=accs)

In [150]:
seg1_1 = (checks_tc5.withColumn('geo', F.lit('geo1')).select('customer_rk', 'geo')
                  .distinct()
      )

In [151]:
seg1_2 = (checks_tc5_2.withColumn('geo', F.lit('geo2')).select('customer_rk', 'geo')
                  .distinct()
      )

In [152]:
seg = seg1_1.union(seg1_2)

In [153]:
seg.write.parquet('temp05', mode='overwrite')
seg = spark.read.parquet('temp05')

                                                                                1400]]]

In [154]:
seg.count()

                                                                                

962409

In [155]:
seg_pd = seg.toPandas()

                                                                                

In [156]:
len(seg_pd)

962409

In [157]:
seg_1pd = seg_pd[seg_pd['geo'] == 'geo1']
seg_2pd = seg_pd[seg_pd['geo'] == 'geo2']

In [158]:
len(seg_1pd)

316389

In [159]:
len(seg_2pd)

646020

In [160]:
seg_no = 'seg4_geo1'

In [161]:
seg_1pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_1pd = seg_1pd.astype('str')
cross, upgrade, ump = on_usecases(seg_1pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [162]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 129135, upgrade: 122248, ump: 65006


In [163]:
seg_no = 'seg4_geo2'

In [164]:
seg_2pd.to_csv(name + '_' + seg_no + '_All_uc' + str(dt) + '.csv', index=False)
seg_2pd = seg_2pd.astype('str')
cross, upgrade, ump = on_usecases(seg_2pd)
cross.to_csv(name + '_' + seg_no + '_' + usecase[0] + '_' + str(dt) + '.csv', index=False)
upgrade.to_csv(name + '_' + seg_no + '_' + usecase[1] + '_' + str(dt) + '.csv', index=False)
ump.to_csv(name + '_' + seg_no + '_' + usecase[2] + '_' + str(dt) + '.csv', index=False)

In [165]:
print('cross: {}, upgrade: {}, ump: {}'.format(len(cross), len(upgrade), len(ump)))

cross: 260498, upgrade: 253177, ump: 132345


# seg5(geo1-geo2)

In [166]:
# plu_hierarchy_lvl_4_dk = 'FR0606006'
# syntethic_category_id = [5]
# plu_brand_code = ['2572','3403','O899','2268','1120','2325','0844','2251']
# plu_not_in = [4138521]
base_qty_per_period = 1

### Собираем чеки этих гостей и оставляем тех, кто покупал продукты с synthetic_catalog_id

In [167]:
# Card -> account mapping. The `account_rk` rename does not affect the result,
# because that column is not selected.
loyalty_cards = (spark
                    .table(LOYALTY_CARDS)
                    .withColumnRenamed("loyalty_card_id", "loyalty_card_no")
                    .withColumnRenamed("loyalty_account_id", "account_no")
                    .withColumnRenamed("loyalty_account_acrm_id", "account_rk")
                    .select('account_no', 'loyalty_card_no')
                )
# Account -> cardholder (customer_rk) mapping.
loyalty_cardholders = (spark
                        .table(LOYALTY_CARDHOLDERS)
                        .withColumnRenamed("loyalty_cardholder_acrm_id", "customer_rk")
                        .withColumnRenamed("loyalty_account_id", "account_no")
                        .select('account_no', 'customer_rk')
                      )
# Cards joined to cardholders through the account, restricted to accounts of the
# chosen client segment (client_segmentation).
clients_info = loyalty_cards.join(loyalty_cardholders, on='account_no', how='inner')
clients_info = clients_info.join(client_segmentation, on='account_no', how='inner')

In [170]:
# Distinct customer_rk values returned by the project's SMS-channel filter for
# usecase[0] ('cross') on check_date, this time over the segment-restricted clients_info.
seg_sms1 = (sms_channel_filters_glowbyte(spark=spark,
                                         guests=clients_info, 
                                         usecase_name=usecase[0], 
                                         check_date=check_date, 
                                         debug_mode=False)
                                    .select('customer_rk')
                                    .distinct()
           )

[93m Время выполнения: 0:00:14


In [171]:
# Distinct customer_rk values returned by the project's SMS-channel filter for
# usecase[1] ('upgrade') on check_date, over the segment-restricted clients_info.
seg_sms2 = (sms_channel_filters_glowbyte(spark=spark,
                                         guests=clients_info, 
                                         usecase_name=usecase[1], 
                                         check_date=check_date, 
                                         debug_mode=False)
                                    .select('customer_rk')
                                    .distinct()
           )

[93m Время выполнения: 0:00:14


In [172]:
# Run the SMS channel filter helper for the third use case (usecase[2]).
# NOTE(review): presumably returns the guests that pass the SMS-channel checks
# on check_date -- confirm against the helper's definition.
seg_sms3 = sms_channel_filters_glowbyte(
    spark=spark,
    guests=clients_info,
    usecase_name=usecase[2],
    check_date=check_date,
    debug_mode=False,
)
# Reduce the result to one row per customer_rk.
seg_sms3 = seg_sms3.select('customer_rk').distinct()

[93m Время выполнения: 0:00:12


In [173]:
# Stack the three per-use-case customer lists, then drop the customers that
# appear in more than one of them.
seg_sms = seg_sms1.union(seg_sms2)
seg_sms = seg_sms.union(seg_sms3)
seg_sms = seg_sms.distinct()

In [174]:
# Persist the customer list to parquet and rebind seg_sms to the stored copy,
# so later actions read the files instead of recomputing the filters above.
seg_sms.write.mode('overwrite').parquet('temp06')
seg_sms = spark.read.parquet('temp06')

                                                                                 1790]]]

In [184]:
# Number of rows (distinct customer_rk values) in the stored SMS customer list.
seg_sms.count()

                                                                                

282189

In [195]:
# Card -> account lookup: rename the source columns to the short names used in
# this notebook, then keep only the two key columns.
loyalty_cards = spark.table(LOYALTY_CARDS)
loyalty_cards = (
    loyalty_cards
    .withColumnRenamed("loyalty_card_id", "loyalty_card_no")
    .withColumnRenamed("loyalty_account_id", "account_no")
    .withColumnRenamed("loyalty_account_acrm_id", "account_rk")
)
loyalty_cards = loyalty_cards.select('account_no', 'loyalty_card_no')

# Account -> cardholder lookup, reduced to (account_no, customer_rk).
loyalty_cardholders = spark.table(LOYALTY_CARDHOLDERS)
loyalty_cardholders = (
    loyalty_cardholders
    .withColumnRenamed("loyalty_cardholder_acrm_id", "customer_rk")
    .withColumnRenamed("loyalty_account_id", "account_no")
)
loyalty_cardholders = loyalty_cardholders.select('account_no', 'customer_rk')

# Link cards to cardholders through the account, then keep only the customers
# listed in seg_sms (inner join on customer_rk).
clients_info = (
    loyalty_cards
    .join(loyalty_cardholders, on='account_no', how='inner')
    .join(seg_sms, on='customer_rk', how='inner')
)

In [200]:
# Keep only the receipts of the selected guests: the inner join drops every
# header whose loyalty_card_no is absent from clients_info.
# NOTE(review): the cell rebinds its own input, so running it a second time
# joins clients_info onto the already-joined frame again.
checks_headers_tc5 = checks_headers_tc5.join(
    clients_info,
    on='loyalty_card_no',
    how='inner',
)

In [202]:
# Keep only the receipts of the selected guests: the inner join drops every
# header whose loyalty_card_no is absent from clients_info.
# NOTE(review): the cell rebinds its own input, so running it a second time
# joins clients_info onto the already-joined frame again.
checks_headers_tc5_2 = checks_headers_tc5_2.join(
    clients_info,
    on='loyalty_card_no',
    how='inner',
)

In [203]:
# Inner-join the item rows to the filtered geo1 headers on rtl_txn_id, so only
# items belonging to the kept headers remain.
checks_tc5 = checks_items.join(
    checks_headers_tc5,
    on='rtl_txn_id',
    how='inner',
)

In [204]:
# Inner-join the item rows to the filtered geo2 headers on rtl_txn_id, so only
# items belonging to the kept headers remain.
checks_tc5_2 = checks_items.join(
    checks_headers_tc5_2,
    on='rtl_txn_id',
    how='inner',
)

In [206]:
# Distinct customers found in checks_tc5, each tagged with the constant
# label 'geo1' in a column named 'geo'.
seg1_1 = checks_tc5.select(
    'customer_rk',
    F.lit('geo1').alias('geo'),
).distinct()

In [207]:
# Distinct customers found in checks_tc5_2, each tagged with the constant
# label 'geo2' in a column named 'geo'.
seg1_2 = checks_tc5_2.select(
    'customer_rk',
    F.lit('geo2').alias('geo'),
).distinct()

In [208]:
# Stack the geo1 and geo2 customer lists into one frame. union() matches
# columns by position and does not deduplicate; a customer present in both
# inputs yields two rows, one per geo label.
seg = seg1_1.union(seg1_2)

In [209]:
# Persist the combined segment to parquet and rebind seg to the stored copy,
# so later actions read the files instead of recomputing the joins above.
seg.write.mode('overwrite').parquet('temp07')
seg = spark.read.parquet('temp07')

                                                                                1400]]0]

In [210]:
# Number of (customer_rk, geo) rows in the stored segment.
seg.count()

                                                                                

26801

In [211]:
# Collect the whole segment to the driver as a pandas DataFrame for CSV export.
seg_pd = seg.toPandas()

                                                                                

In [212]:
# Row count of the collected pandas frame.
seg_pd.shape[0]

26801

In [213]:
# Split the collected segment by geo label using boolean row masks.
seg_1pd = seg_pd.loc[seg_pd['geo'].eq('geo1')]
seg_2pd = seg_pd.loc[seg_pd['geo'].eq('geo2')]

In [214]:
# Row count of the geo1 part.
seg_1pd.shape[0]

9339

In [215]:
# Row count of the geo2 part.
seg_2pd.shape[0]

17462

In [216]:
# Segment label embedded in the CSV file names written by the next cell.
seg_no = 'seg5_geo1'

In [217]:
# Export the full geo1 list first (before the string cast below).
# NOTE(review): unlike the per-use-case files, this name has no '_' between
# 'All_uc' and the date -- confirm that is intended.
seg_1pd.to_csv(f'{name}_{seg_no}_All_uc{dt!s}.csv', index=False)

# Cast every column to str, then split the list into the three use cases.
seg_1pd = seg_1pd.astype('str')
cross, upgrade, ump = on_usecases(seg_1pd)

# One CSV per use case, named <name>_<seg_no>_<use case>_<dt>.csv.
cross.to_csv(f'{name}_{seg_no}_{usecase[0]}_{dt!s}.csv', index=False)
upgrade.to_csv(f'{name}_{seg_no}_{usecase[1]}_{dt!s}.csv', index=False)
ump.to_csv(f'{name}_{seg_no}_{usecase[2]}_{dt!s}.csv', index=False)

In [218]:
# Report how many rows ended up in each use-case split.
print(f'cross: {len(cross)}, upgrade: {len(upgrade)}, ump: {len(ump)}')

cross: 2777, upgrade: 4796, ump: 1766


In [219]:
# Segment label embedded in the CSV file names written by the next cell.
seg_no = 'seg5_geo2'

In [220]:
# Export the full geo2 list first (before the string cast below).
# NOTE(review): unlike the per-use-case files, this name has no '_' between
# 'All_uc' and the date -- confirm that is intended.
seg_2pd.to_csv(f'{name}_{seg_no}_All_uc{dt!s}.csv', index=False)

# Cast every column to str, then split the list into the three use cases.
seg_2pd = seg_2pd.astype('str')
cross, upgrade, ump = on_usecases(seg_2pd)

# One CSV per use case, named <name>_<seg_no>_<use case>_<dt>.csv.
cross.to_csv(f'{name}_{seg_no}_{usecase[0]}_{dt!s}.csv', index=False)
upgrade.to_csv(f'{name}_{seg_no}_{usecase[1]}_{dt!s}.csv', index=False)
ump.to_csv(f'{name}_{seg_no}_{usecase[2]}_{dt!s}.csv', index=False)

In [221]:
# Report how many rows ended up in each use-case split.
print(f'cross: {len(cross)}, upgrade: {len(upgrade)}, ump: {len(ump)}')

cross: 5491, upgrade: 8637, ump: 3334
