In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from os.path import join as path_join
from clan_tools.data_adapters.YTAdapter import YTAdapter
from clan_tools.data_adapters.YQLAdapter import YQLAdapter
import spyt
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window
from pyspark.sql.session import SparkSession
from clan_tools.utils.spark import SPARK_CONF_MEDIUM
import re
from os.path import join as path_join
import pickle
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score

# Raise pandas display limits so that long/wide frames are rendered without truncation.
pd.set_option('display.max_rows', 200)
pd.set_option('display.max_columns', 250)
# Cap on the number of threads NumExpr is allowed to use.
os.environ['NUMEXPR_MAX_THREADS'] = '32'

In [2]:
from clan_tools.secrets.Vault import Vault
# NOTE(review): presumably this loads the credentials needed by the adapters created below —
# confirm against clan_tools; keep it before YTAdapter() / spyt.connect().
Vault().get_secrets(secret_id='sec-01fm06fw1zsqp08cxtyd247tm5')
yt_adapter = YTAdapter()

# Open a SPYT (Spark over YT) session with the medium-size Spark configuration.
spark = spyt.connect(spark_conf_args=SPARK_CONF_MEDIUM)
spyt.info(spark)

2022-02-07 19:44:30,621 - INFO - spyt.client - SPYT Cluster version: 3.0.1-1.23.1+yandex
2022-02-07 19:44:30,623 - INFO - spyt.client - SPYT library version: 1.3.5


### Generate CSM leads

In [3]:
from clan_tools.data_adapters.crm.CRMHistoricalDataAdapter import CRMHistoricalDataAdapter
# Tunable thresholds of the lead-generation logic.
# Only TARGET_PAID_COND and MIN_PAID_LAST_30D are read by the cells visible in this notebook;
# NOTE(review): the remaining keys look like calibration / scoring settings — confirm where they are consumed.
PARAMS = dict(
    TARGET_PAID_COND=50000,    # upper bound on prev_30d_cons; also names the "<N>k_pred" column
    MIN_PAID_LAST_30D=5000,    # lower bound on prev_30d_cons
    MIN_CALIBR_PROBA=0.001,
    MAX_CALIBR_PROBA=0.851,
    PRED_PROBA_BRD=0.3,
    CONF_INTERVAL=0.95,
)

# YT locations used below.
results_path = "//home/cloud_analytics/ml/scoring/consumption_predictor_v2/data/prod_results"  # model predictions
features_path = "//home/cloud_analytics/ml/scoring/consumption_predictor_v2/data/actual_features"  # model features
leads_path = "//home/cloud_analytics/ml/scoring/consumption_predictor_v2/crm/upsell"
calib_path = "//home/cloud_analytics/ml/scoring/consumption_predictor_v2/model/prod/calibrators_history/csm"  # pickled calibrators
contact_info = "//home/cloud_analytics/import/crm/leads/contact_info"  # contact details per billing account

In [4]:
# Target threshold expressed in thousands; it names the prediction column (e.g. "50k_pred").
threshold_th = PARAMS['TARGET_PAID_COND'] // 1000
pred_colname = "{:.0f}k_pred".format(threshold_th)
leads_count = 100
# Report date is "yesterday" by the kernel's clock, as a YYYY-MM-DD string.
rep_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

# Pick the calibrator whose node name is the maximum of the directory listing.
calib_name = max(yt_adapter.yt.list(calib_path))
calib_file = yt_adapter.yt.read_file(path_join(calib_path, calib_name))
calibr_ser = calib_file.read()
# NOTE(security): pickle.loads can execute arbitrary code; only safe while calib_path is trusted.
calibr = pickle.loads(calibr_ser)

def make_proba(x):
    """Return the calibrator's prediction for a single raw value ``x``.

    The value is wrapped into a one-element batch for ``calibr.predict`` and the
    first element of the result is returned.
    """
    single_batch = [x]
    predictions = calibr.predict(single_batch)
    return predictions[0]



In [5]:
# Key shared by the prediction results, the feature table and the calibrated probabilities.
ba_date_key = ["billing_account_id", "billing_record_msk_date"]

# Columns carried forward to lead generation; the computed prediction column goes last.
info_columns = [
    "billing_account_id",
    "billing_record_msk_date",
    "billing_account_usage_status",
    "billing_account_person_type",
    "billing_account_currency",
    "billing_account_state",
    "billing_account_is_fraud",
    "billing_account_is_suspended_by_antifraud",
    "billing_account_is_isv",
    "billing_account_is_var",
    "billing_account_is_crm_account",
    "crm_segment",
    "days_from_created",
    "prev_30d_cons",
    "prev_15d_cons",
    "next_14d_cons_pred",
    pred_colname,
]

# Missing 30-day consumption is treated as zero for the range filters below.
prev_30d_cons_filled = F.coalesce("prev_30d_cons", lit(0))

# Predictions for the report date joined with features and restricted to active,
# non-antifraud, non-ISV, non-VAR accounts of the Mass/Medium segments whose
# 30-day consumption lies strictly between MIN_PAID_LAST_30D and TARGET_PAID_COND.
spdf_info = (
    spark.read.yt(results_path)
    .filter(col("billing_record_msk_date") == rep_date)
    .join(spark.read.yt(features_path), on=ba_date_key, how="inner")
    .filter(prev_30d_cons_filled < PARAMS['TARGET_PAID_COND'])
    .filter(prev_30d_cons_filled > PARAMS['MIN_PAID_LAST_30D'])
    .filter(F.coalesce("billing_account_state", lit("active")) == "active")
    .filter(~F.coalesce("billing_account_is_suspended_by_antifraud", lit(False)))
    .filter(~F.coalesce("billing_account_is_isv", lit(False)))
    .filter(~F.coalesce("billing_account_is_var", lit(False)))
    .filter(col("crm_segment").isin(["Mass", "Medium"]))
    # prediction = last 15 days + the report day's record + the model's forecast for the next 14 days
    .withColumn(
        pred_colname,
        col("prev_15d_cons") + col("billing_record_total_rub") + col("next_14d_cons_pred"),
    )
    .select(*info_columns)
    .cache()
)

# Billing accounts that already have a non-trial lead in either CRM source.
# NOTE(review): filter_crm is not referenced by any later cell visible here — confirm before removing.
historical_lead_sources = (
    CRMHistoricalDataAdapter(yt_adapter, spark)
    .historical_preds()
    .select("billing_account_id", col("lead_source_crm").alias("lead_source"))
)
leads_cube_sources = (
    spark.read.yt("//home/cloud_analytics/kulaga/leads_cube")
    .select("billing_account_id", "lead_source")
)
filter_crm = (
    historical_lead_sources
    .union(leads_cube_sources)
    .filter(~col("billing_account_id").isNull())
    .filter(col("billing_account_id") != '{{BILLING_ACCOUNT_ID}}')
    .filter(col("lead_source") != "trial")
)

# The calibrator works on pandas/NumPy data, so predictions make a round trip through the driver.
dff = spdf_info.select(*ba_date_key, pred_colname).toPandas()
dff['proba'] = calibr.predict(dff[pred_colname])
spdf_pred = spark.createDataFrame(dff[ba_date_key + ["proba"]])

spdf_main = spdf_info.join(spdf_pred, on=ba_date_key, how="left").cache()

print(spdf_main.count())
spdf_main.limit(3).toPandas()

2103


Unnamed: 0,billing_account_id,billing_record_msk_date,billing_account_usage_status,billing_account_person_type,billing_account_currency,billing_account_state,billing_account_is_fraud,billing_account_is_suspended_by_antifraud,billing_account_is_isv,billing_account_is_var,billing_account_is_crm_account,crm_segment,days_from_created,prev_30d_cons,prev_15d_cons,next_14d_cons_pred,50k_pred,proba
0,dn219pa215mkq9mv0hr6,2022-02-06,paid,individual,RUB,active,False,False,False,False,False,Mass,365,5698.3884,11.7348,-0.083626,11.651174,0.001
1,dn22us53siqqv2dkoq54,2022-02-06,paid,company,RUB,active,True,False,False,False,False,Mass,26,14049.191719,11039.338844,9965.975547,21562.642989,0.851
2,dn2309b3sbeeaq1f1m80,2022-02-06,paid,individual,RUB,active,False,False,False,False,True,Mass,365,13381.253921,6693.519214,6502.149828,13578.553748,0.022727


In [6]:
# Contact details per billing account (cached; joined to the leads in the next cell).
spdf_contacts = spark.read.yt(contact_info).cache()

# Leads known from the CRM history; date_entered is divided by 1e6 before the
# timestamp conversion (NOTE(review): presumably stored in microseconds — confirm).
leads_source_1 = (
    CRMHistoricalDataAdapter(yt_adapter, spark)
    .historical_preds()
    .select(
        col("billing_account_id"),
        col("lead_source_crm").alias("lead_source"),
        F.to_date(F.to_timestamp(col("date_entered") / 1000000)).alias("lead_date"),
    )
    .distinct()
)

# Leads known from the leads cube, brought to the same three-column layout.
leads_source_2 = (
    spark.read.yt("//home/cloud_analytics/kulaga/leads_cube")
    .select(
        col("billing_account_id"),
        col("lead_source"),
        F.to_date("date_entered").alias("lead_date"),
    )
    .distinct()
)

# All known leads with a real billing account id (nulls and the unfilled template value dropped).
crm_leads = (
    leads_source_1
    .union(leads_source_2)
    .distinct()
    .filter(col("billing_account_id").isNotNull())
    .filter(col("billing_account_id") != '{{BILLING_ACCOUNT_ID}}')
)

# Accounts that already have an upsell lead, flagged with has_upsell_lead = 1.
crm_ba_filter = (
    crm_leads
    .where(col("lead_source") == 'upsell')
    .select("billing_account_id", lit(1).alias("has_upsell_lead"))
    .distinct()
)
# Date of the most recent lead (of any source) per account.
crm_date_filter = (
    crm_leads
    .groupBy("billing_account_id")
    .agg(F.max("lead_date").alias("lead_date"))
)

In [7]:
# Minimum calibrated probability for an account to become a lead.
# Taken from the config cell instead of repeating the literal 0.3 here.
proba_border = PARAMS['PRED_PROBA_BRD']

# Typed NULL used for the CRM columns that are intentionally left empty.
null_string = lit(None).astype('string')

# NOTE(review): `leads_count` is defined earlier but no limit is applied here — confirm whether
# the number of generated leads was meant to be capped.
# NOTE(review): Timestamp here is the plain current epoch, while the later lead-building cells
# add 3 hours before taking the epoch — confirm which convention the CRM import expects.
spdf_res_leads = (
    spdf_main
    .join(crm_ba_filter, on=['billing_account_id'], how="left")
    .join(crm_date_filter, on=['billing_account_id'], how="left")
    .join(spdf_contacts, on=['billing_account_id'], how="left")
    # keep accounts without an upsell lead ...
    .filter(col("has_upsell_lead").isNull())
    # ... whose latest lead (if any) is more than 30 days older than the report date ...
    .filter(F.coalesce(F.datediff("billing_record_msk_date", "lead_date"), lit(1000)) > 30)
    # ... and whose calibrated probability exceeds the configured border
    .filter(col("proba") > proba_border)
    # Sort only the surviving rows; the earlier sort-then-cache of the whole join was never reused.
    .sort(col("proba").desc())
    .select(
        lit(int(datetime.now().timestamp())).alias('Timestamp'),
        null_string.alias('CRM_Lead_ID'),
        F.concat(lit('["'), "billing_account_id", lit('"]')).alias("Billing_account_id"),
        null_string.alias('Status'),
        F.concat(
            lit('Confidence that "'),
            col('billing_account_id'),
            lit('" is target is '),
            F.round(col('proba')*100),
            lit('%.')
        ).alias('Description'),
        lit('admin').alias('Assigned_to'),
        col("first_name").alias('First_name'),
        col("last_name").alias('Last_name'),
        col("phone").alias('Phone_1'),
        null_string.alias('Phone_2'),
        col("email").alias('Email'),
        lit('upsell').alias('Lead_Source'),
        # threshold_th keeps the text in sync with TARGET_PAID_COND ("50k" for 50000)
        lit(f'Potential candidate for {threshold_th:.0f}k over 28 days period').alias('Lead_Source_Description'),
        null_string.alias('Callback_date'),
        null_string.alias('Last_communication_date'),
        null_string.alias('Promocode'),
        null_string.alias('Promocode_sum'),
        null_string.alias('Notes'),
        null_string.alias('Dimensions'),
        null_string.alias('Tags'),
        lit('').alias('Timezone'),
        col("display_name").alias('Account_name')
    )
)

spdf_res_leads.toPandas()

Unnamed: 0,Timestamp,CRM_Lead_ID,Billing_account_id,Status,Description,Assigned_to,First_name,Last_name,Phone_1,Phone_2,Email,Lead_Source,Lead_Source_Description,Callback_date,Last_communication_date,Promocode,Promocode_sum,Notes,Dimensions,Tags,Timezone,Account_name
0,1644252551,,"[""dn2jvtiq2co0osi7160e""]",,"Confidence that ""dn2jvtiq2co0osi7160e"" is targ...",admin,,УК КВАДРАТ,8352380488,,squaremc@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,"ООО ""УК ""Квадрат"""
1,1644252551,,"[""dn2ip3eamftuhtrpo6ie""]",,"Confidence that ""dn2ip3eamftuhtrpo6ie"" is targ...",admin,Максим,Демах,79518650746,,enkisoftvrn@gmail.com,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Максим Сергеевич Демах
2,1644252551,,"[""dn2ne1hr313nifao7ni9""]",,"Confidence that ""dn2ne1hr313nifao7ni9"" is targ...",admin,,ООО “ПВК Технологический Консалтинг”,79857603115,,pwccloud@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Общество с Ограниченной ответственностью “Прай...
3,1644252551,,"[""dn287flat7n6f0puvfml""]",,"Confidence that ""dn287flat7n6f0puvfml"" is targ...",admin,Тихон,Чумаков,79957874202,,kondrashov.it@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Тихон Александрович Чумаков
4,1644252551,,"[""dn2p7pm9pdo5lopoqp0s""]",,"Confidence that ""dn2p7pm9pdo5lopoqp0s"" is targ...",admin,Агамир,Алиев,79217415897,,bot@smileson.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Агамир Тофикофич Алиев
5,1644252551,,"[""dn2fhisotf1hpcen8t2n""]",,"Confidence that ""dn2fhisotf1hpcen8t2n"" is targ...",admin,,ООО «Гулливер энд Ко Интернейшнл»,79193012636,,cloud.gulliver@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Общество с ограниченной ответственностью «Гулл...
6,1644252551,,"[""dn208mklvsmmlgactn1u""]",,"Confidence that ""dn208mklvsmmlgactn1u"" is targ...",admin,,KNOMARY PRODUCTION CJSC,375293572366,,info@knomary.com,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,KNOMARY PRODUCTION CJSC
7,1644252551,,"[""dn21csmjctdva54ur8th""]",,"Confidence that ""dn21csmjctdva54ur8th"" is targ...",admin,Иван,Целых,79778680891,,sashagreysuhareva@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,Иван Дмитриевич Целых
8,1644252551,,"[""dn27beosenkq8rqaecsg""]",,"Confidence that ""dn27beosenkq8rqaecsg"" is targ...",admin,,hsr,84994000236,,hsr24.techteam@yandex.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,ООО «Хом шоппинг Раша»
9,1644252551,,"[""dn2o94kk9ijaighavbl2""]",,"Confidence that ""dn2o94kk9ijaighavbl2"" is targ...",admin,,"ООО ""ДИВЬЮ""",79037400787,,dvue@dvue.ru,upsell,Potential candidate for 50k over 28 days period,,,,,,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ДИВЬЮ"""


### Restore recycled

In [8]:
def max_by(x, y):
    """Build the Spark SQL expression ``max_by(x, y)`` from two column-name strings."""
    sql_text = 'max_by({}, {})'.format(x, y)
    return F.expr(sql_text)


class Generate_CSM_Leads:
    """Selects billing accounts for CSM upsell leads from consumption data and CRM history.

    Two lead sources are produced:

    * ``restore_recycled`` - old 30k/50k leads whose last status is "recycled" and whose
      account has since exceeded the consumption threshold;
    * ``top_consumers`` - accounts over the threshold that have no non-trial CRM lead.
    """
    # YT table with billing records (billing_record_msk_date, billing_record_real_consumption_rub).
    dm_yc_consumption_path = '//home/cloud-dwh/data/prod/cdm/dm_yc_consumption'
    # YT table with the current CRM tags of billing accounts (segment, owner, state, ...).
    dm_crm_tags = '//home/cloud-dwh/data/prod/cdm/dm_ba_crm_tags'

    def __init__(self, spark: SparkSession, yt_adapter: YTAdapter, days_period: int = 90, consumption_threshold: int = 150000) -> None:
        """Store the settings and derive the observation window.

        Args:
            spark: active Spark session able to read YT tables.
            yt_adapter: adapter passed to ``CRMHistoricalDataAdapter``.
            days_period: length of the consumption window in days, ending yesterday.
            consumption_threshold: total consumption over the window (exclusive lower bound)
                required for an account to be selected.
        """
        self.spark = spark
        self.consumption_threshold = consumption_threshold
        self.historical_data_adapter = CRMHistoricalDataAdapter(yt_adapter, spark)
        self.days_period = days_period
        self.today = datetime.now().date()
        self.date_from = self.today - timedelta(days_period)

    def get_ba_over_threshold(self):
        """Return accounts whose consumption over the window exceeds the threshold.

        The window is ``[date_from, today)``. Only paid, active Mass/Medium accounts with
        no account owner, not suspended by antifraud and not VAR are kept. Prints the
        number of selected accounts.

        Returns:
            Cached Spark DataFrame with columns ``billing_account_id`` and ``paid_cons``.
        """
        crm_tags_filter = (
            self.spark.read.yt(self.dm_crm_tags)
            .select(
                'billing_account_id',
                'account_owner_current',
                'usage_status_current',
                'segment_current',
                'state_current',
                'is_suspended_by_antifraud_current',
                'is_var_current'
            )
            .distinct()
        )
        ba_over_threshold = (
            self.spark.read.yt(self.dm_yc_consumption_path)
            .filter(F.to_date('billing_record_msk_date') >= self.date_from)
            .filter(F.to_date('billing_record_msk_date') < self.today)
            .groupby('billing_account_id')
            .agg(F.sum('billing_record_real_consumption_rub').alias('paid_cons'))
            .filter(col('paid_cons')>self.consumption_threshold)
            # left join + the filters below drops accounts that have no CRM tags row
            .join(crm_tags_filter, on='billing_account_id', how='left')
            .filter(col('usage_status_current')=='paid')
            .filter(col('segment_current').isin(['Mass', 'Medium']))
            .filter(col('account_owner_current')=='No Account Owner')
            .filter(col('state_current')=='active')
            .filter(~col('is_suspended_by_antifraud_current'))
            .filter(~col('is_var_current'))
            .select('billing_account_id', 'paid_cons')
            .cache()
        )

        rows_num = ba_over_threshold.count()
        days_num = self.days_period
        thrsh = self.consumption_threshold
        print(f'Loaded BAs with consumption exceeded {thrsh} RUB in {days_num} days period: {rows_num} row(s)')

        return ba_over_threshold

    def restore_recycled(self):
        """Return recycled 30k/50k leads that are worth re-opening.

        A lead qualifies when the account's latest 30k/50k lead status is "recycled",
        that status was set before the start of the consumption window, the account is
        over the consumption threshold, and the account has no converted non-trial lead.
        For each account only rows with the latest ``date_entered`` are kept.

        Returns:
            Spark DataFrame with the CRM history columns of the selected leads plus
            ``last_status`` and a generated ``description``.
        """
        ba_over_threshold = self.get_ba_over_threshold()
        crm_historical_data = self.historical_data_adapter.historical_preds().cache()
        # Last modification and the status at that moment for every account's 30k/50k leads.
        recycled_ba = (
            crm_historical_data
            .filter(col('lead_source').like('%30k%') | col('lead_source').like('%50k%'))
            .groupby('billing_account_id')
            .agg(
                F.max('date_modified').alias("date_modified"),
                max_by('status', 'date_modified').alias("last_status")
            )
            .filter(F.lower(col("last_status")) == 'recycled')
            # date_modified is divided by 1e6 before conversion
            # NOTE(review): presumably microseconds since epoch — confirm
            .filter(F.from_unixtime(col('date_modified')/1000000) < self.date_from)
            .cache()
        )
        # Accounts that already have a converted non-trial lead are never restored.
        qualified_ba = (
            crm_historical_data
            .filter(col('lead_source_crm')!='trial')
            .filter(col('status')=='Converted')
            .filter(~col('billing_account_id').isNull())
            .select('billing_account_id')
        )
        # Full CRM rows matching the last recycled modification of each account.
        recycled = (
            crm_historical_data
            .join(recycled_ba, on=['billing_account_id', 'date_modified'], how='inner')
        )
        recycled_to_restore = (
            recycled
            .join(ba_over_threshold, how='leftsemi', on='billing_account_id')
            .withColumn('description',
                        # fixed: the two concatenated pieces used to produce "morethan"
                        lit('Restored from recycled, since consumed more '
                            f'than {self.consumption_threshold} over {self.days_period} days'))
        )
        max_date = (
            recycled_to_restore
            .groupby('billing_account_id')
            .agg(F.max(col('date_entered')).alias('date_entered'))
        )
        filtered_restore = (
            recycled_to_restore
            .join(max_date, on=['billing_account_id', 'date_entered'], how='leftsemi')
            .join(qualified_ba, on='billing_account_id', how='leftanti')
        )
        return filtered_restore

    def top_consumers(self):
        """Return over-threshold accounts that have no non-trial lead in the CRM history.

        Returns:
            Spark DataFrame with ``billing_account_id``, ``paid_cons`` and a generated
            ``description`` stating the consumption over the window.
        """
        ba_over_threshold = self.get_ba_over_threshold()
        crm_wo_trial = (
            self.historical_data_adapter
            .historical_preds()
            .filter(col('lead_source_crm')!='trial')
            .cache()
        )
        top_consumers = (
            ba_over_threshold
            .join(crm_wo_trial, on='billing_account_id', how='leftanti')
            .withColumn(
                'description',
                F.concat(
                    lit('Account "'),
                    col('billing_account_id'),
                    lit('" has consumption '),
                    F.round(col('paid_cons'), 2),
                    # fixed: leading space so the amount and the unit are not glued together
                    lit(f' rub over last {self.days_period} days')
                )
            )
        )
        return top_consumers

    # Backward-compatible alias: the method was originally published under this misspelled name.
    top_cunsomers = top_consumers

# Recycled leads to restore: accounts that consumed more than 150000 over the last 90 days.
spdf_res = Generate_CSM_Leads(
    spark=spark,
    yt_adapter=yt_adapter,
    days_period=90,
    consumption_threshold=150000,
).restore_recycled()

df_fr = spdf_res.toPandas()
print(df_fr.shape)
df_fr

Loaded BAs with consumption exceeded 150000 RUB in 90 days period: 69 row(s)
(13, 18)


Unnamed: 0,billing_account_id,date_entered,date_modified,lead_id,first_name,last_name,client_name,title,phone,description,timezone,status,lead_source,lead_priority,user_name,email,lead_source_crm,last_status
0,dn2k17tluvqbeip2nclb,1631109662000000,1631623433000000,4dc57b2a-10ad-11ec-ab73-9216a807bccf,Екатерина,Решетник,Веденеев Михаил Андреевич,,79254099451,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,m-pisarenko,reshetnik-ecaterina@yandex.ru,upsell,Recycled
1,dn2k17tluvqbeip2nclb,1631109662000000,1631623433000000,4dc57b2a-10ad-11ec-ab73-9216a807bccf,Екатерина,Решетник,Веденеев Михаил Андреевич,,79254099451,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,m-pisarenko,reshetnik.ecaterina@yandex.ru,upsell,Recycled
2,dn2eht2pbri1plgtldl3,1619355601000000,1631694819000000,387125c8-a5c6-11eb-8545-5840e080c8fc,Юрий,Киберспорт,КИБЕРСПОРТ,,79689755854,"Restored from recycled, since consumed moretha...",Asia/Yekaterinburg,Recycled,Potential candidate for 30k over 28 days period,Medium,dmayudin,cto@sport-pay.ru,upsell,Recycled
3,dn2qbr3hb10t6l3sis6i,1623070801000000,1628160583000000,69422346-c790-11eb-adc6-a93a7ead6470,Иван,Лебедь,Фонд «Образование и наука»,,79057471804,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,ipivnev,iwan-lebed@yandex.ru,upsell,Recycled
4,dn2qk8bmmo4gm6ssudv0,1631537461000000,1631537723000000,7823f256-1491-11ec-af10-1110359b52e8,Support,MineBet,Богер Константин Викторович,,79139227503,"Restored from recycled, since consumed moretha...",Asia/Novosibirsk,Recycled,Potential candidate for 30k over 28 days period,Medium,m-pisarenko,support@minebet.com,upsell,Recycled
5,dn28u5fiq1pbl47tol5r,1632747721000000,1633001215000000,2034264e-1f93-11ec-aeb6-c57929b23bc0,Марат,Шайхетдинов,Шайхетдинов Марат Гадилович,,79372920862,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 50k over 28 days period,Medium,m-pisarenko,schaihetdinov-marat@yandex.ru,upsell,Recycled
6,dn28u5fiq1pbl47tol5r,1632747721000000,1633001215000000,2034264e-1f93-11ec-aeb6-c57929b23bc0,Марат,Шайхетдинов,Шайхетдинов Марат Гадилович,,79372920862,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 50k over 28 days period,Medium,m-pisarenko,schaihetdinov.marat@yandex.ru,upsell,Recycled
7,dn2n9j7q5h8hv1unjvh1,1623242401000000,1632986204000000,d151d892-c91f-11eb-8ef6-5cf32889de6c,Админ,Хайсмит,ООО ХайСмит,,79165707929,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,askryuchkov,a-hismith@yandex.ru,upsell,Recycled
8,dn2ai7epnn4r2thoj1k6,1613041201000000,1620814745000000,55ab71a4-6c58-11eb-a3e9-a6c7e7fbf937,Артём,Круковец,ООО КСК,,79629316108,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,askryuchkov,krukovets@bfs.su,upsell,Recycled
9,dn20k53ptrc5nvh4hqlr,1631537461000000,1631693789000000,72defb38-1491-11ec-860f-97c413777ae0,Ксения,Бухгалтер,Tips,,79251738482,"Restored from recycled, since consumed moretha...",Europe/Moscow,Recycled,Potential candidate for 30k over 28 days period,Medium,ipivnev,tips-uit@yandex.ru,upsell,Recycled


In [9]:
def get_current_upsell_staff_logins(spark, department_ids=(16682, 13473)):
    """Return the logins of non-dismissed staff members of the given departments.

    Reads the staff ``persons`` table, left-joins the PII ``persons`` table on
    ``staff_user_id`` and keeps rows whose ``department_id`` is in ``department_ids``
    and whose ``official_is_dismissed`` flag is false.

    Args:
        spark: Spark session able to read YT tables (``spark.read.yt``).
        department_ids: iterable of department ids to keep; defaults to the two ids
            that were previously hard-coded (NOTE(review): presumably the upsell
            departments, judging by the function name — confirm).

    Returns:
        list: values of the ``staff_user_login`` column, collected to the driver.
    """
    staff_info_path = '//home/cloud-dwh/data/prod/ods/staff/persons'
    staff_pii_info_path = '//home/cloud-dwh/data/prod/ods/staff/PII/persons'
    current_staff = (
        spark.read.yt(staff_info_path)
        .join(
            spark.read.yt(staff_pii_info_path),
            on='staff_user_id', how='left'
        )
        .filter(col('department_id').isin(list(department_ids)))
        .filter(~col('official_is_dismissed'))
        .select('staff_user_login')
    )
    return current_staff.toPandas()['staff_user_login'].tolist()

contact_info_path = '//home/cloud_analytics/import/crm/leads/contact_info'
spdf_contacts = spark.read.yt(contact_info_path)
# adapting to Moscow timezone (on system is UTC+0)
leads_timestamp = int((datetime.now() + timedelta(hours=3)).timestamp())
actual_logins = get_current_upsell_staff_logins(spark)

# Typed NULL for the CRM columns that are intentionally left empty.
null_string = lit(None).astype('string')

# Both spdf_res and spdf_contacts carry first_name / last_name / phone / email,
# so those columns are always referenced through their source DataFrame.
restored_lead_columns = [
    lit(leads_timestamp).alias('Timestamp'),
    null_string.alias('CRM_Lead_ID'),
    F.concat(lit('["'), "billing_account_id", lit('"]')).alias("Billing_account_id"),
    null_string.alias('Status'),
    col('description').alias('Description'),
    # keep the previous assignee only if the login is still in the current staff list
    F.when(col('user_name').isin(actual_logins), col('user_name')).otherwise(lit('admin')).alias('Assigned_to'),
    # fall back to the contact table's names only when the CRM lead had neither name
    F.when(col('null_prev_names'), spdf_contacts.first_name).otherwise(spdf_res.first_name).alias('First_name'),
    F.when(col('null_prev_names'), spdf_contacts.last_name).otherwise(spdf_res.last_name).alias('Last_name'),
    spdf_contacts.phone.alias('Phone_1'),
    # the CRM phone becomes Phone_2 unless it equals the contact-table phone
    F.when(spdf_contacts.phone==col('crm_phone'), lit(None).astype('string')).otherwise(col('crm_phone')).alias('Phone_2'),
    spdf_res.email.alias('Email'),
    lit('upsell').alias('Lead_Source'),
    col('lead_source').alias('Lead_Source_Description'),
    null_string.alias('Callback_date'),
    null_string.alias('Last_communication_date'),
    null_string.alias('Promocode'),
    null_string.alias('Promocode_sum'),
    null_string.alias('Notes'),
    null_string.alias('Dimensions'),
    null_string.alias('Tags'),
    lit('').alias('Timezone'),
    col("display_name").alias('Account_name'),
]

spdf_res_leads = (
    spdf_res
    .join(spdf_contacts, on=['billing_account_id'], how="left")
    .withColumn('null_prev_names', spdf_res.first_name.isNull() & spdf_res.last_name.isNull())
    # strip "+", "-", whitespace and parentheses from the CRM phone
    .withColumn('crm_phone', F.regexp_replace(spdf_res.phone, '[+\\-\\s\\(\\)]', ''))
    .select(*restored_lead_columns)
)

spdf_res_leads.toPandas()

Unnamed: 0,Timestamp,CRM_Lead_ID,Billing_account_id,Status,Description,Assigned_to,First_name,Last_name,Phone_1,Phone_2,Email,Lead_Source,Lead_Source_Description,Callback_date,Last_communication_date,Promocode,Promocode_sum,Notes,Dimensions,Tags,Timezone,Account_name
0,1644263631,,"[""dn2n9j7q5h8hv1unjvh1""]",,"Restored from recycled, since consumed moretha...",admin,Админ,Хайсмит,79165707929,,a-hismith@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Общество с Ограниченной Ответственностью ХАЙСМИТ
1,1644263631,,"[""dn2qbr3hb10t6l3sis6i""]",,"Restored from recycled, since consumed moretha...",admin,Иван,Лебедь,79057471804,,iwan-lebed@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Фонд поддержки инновационных программ «Образов...
2,1644263631,,"[""dn2ivhgierstat7570t6""]",,"Restored from recycled, since consumed moretha...",gadirov,Александр,Зобов,79139406578,,printfestcloud@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ФАБР..."
3,1644263631,,"[""dn2eht2pbri1plgtldl3""]",,"Restored from recycled, since consumed moretha...",dmayudin,Юрий,Киберспорт,79689755854,,cto@sport-pay.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ «КИБЕ...
4,1644263631,,"[""dn2kbppkbeacsck5n6dv""]",,"Restored from recycled, since consumed moretha...",admin,,ИП Белов М. А.,79150349022,,it@likebg.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Индивидуальный предприниматель Белов Максим Ал...
5,1644263631,,"[""dn20k53ptrc5nvh4hqlr""]",,"Restored from recycled, since consumed moretha...",admin,Ксения,Бухгалтер,79251738482,,tips-uit@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Umbrella IT
6,1644263631,,"[""dn2k17tluvqbeip2nclb""]",,"Restored from recycled, since consumed moretha...",m-pisarenko,Екатерина,Решетник,79254099451,,reshetnik-ecaterina@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Михаил Андреевич Веденеев
7,1644263631,,"[""dn2k17tluvqbeip2nclb""]",,"Restored from recycled, since consumed moretha...",m-pisarenko,Екатерина,Решетник,79254099451,,reshetnik.ecaterina@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Михаил Андреевич Веденеев
8,1644263631,,"[""dn2qk8bmmo4gm6ssudv0""]",,"Restored from recycled, since consumed moretha...",m-pisarenko,Support,MineBet,79139227503,,support@minebet.com,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Константин Викторович Богер
9,1644263631,,"[""dn28h691n944hfqfe8f7""]",,"Restored from recycled, since consumed moretha...",m-pisarenko,Сергей,Новиков,79164154710,,solsidus@yandex.ru,upsell,Potential candidate for 30k over 28 days period,,,,,,,,,Сергей Сергеевич Новиков


### Top consumers

In [10]:
# Accounts that consumed more than 40000 over the last 30 days and have no non-trial CRM lead.
spdf_res = Generate_CSM_Leads(
    spark=spark,
    yt_adapter=yt_adapter,
    days_period=30,
    consumption_threshold=40000,
).top_cunsomers()

df_fr = spdf_res.toPandas()
print(df_fr.shape)
df_fr

Loaded BAs with consumption exceeded 40000 RUB in 30 days period: 129 row(s)
(7, 3)


Unnamed: 0,billing_account_id,paid_cons,description
0,dn27beosenkq8rqaecsg,44360.259435,"Account ""dn27beosenkq8rqaecsg"" has consumption..."
1,dn2eu2t9aan486dih9qf,46412.544205,"Account ""dn2eu2t9aan486dih9qf"" has consumption..."
2,dn2edncqd067knrv4j4s,95763.81696,"Account ""dn2edncqd067knrv4j4s"" has consumption..."
3,dn24urb66ogmnf0bmdd1,45256.555887,"Account ""dn24urb66ogmnf0bmdd1"" has consumption..."
4,dn27kgiqkgoar0peks2l,43040.425642,"Account ""dn27kgiqkgoar0peks2l"" has consumption..."
5,dn29pbfcvt99lj5f9708,84030.95434,"Account ""dn29pbfcvt99lj5f9708"" has consumption..."
6,dn244ju40phs755rvvs9,44068.451944,"Account ""dn244ju40phs755rvvs9"" has consumption..."


In [11]:
exclude_csm_leads = '//home/cloud_analytics/ml/scoring/consumption_predictor_v2/crm/upsell/csm_history_test'
contact_info_path = '//home/cloud_analytics/import/crm/leads/contact_info'
spdf_contacts = spark.read.yt(contact_info_path)
# adapting to Moscow timezone (on system is UTC+0)
leads_timestamp = int((datetime.now() + timedelta(hours=3)).timestamp())

# Accounts already present in the leads history table; its Billing_account_id values
# look like ["<id>"], so brackets and quotes are stripped to get the bare id.
leads_to_exclude = (
    spark.read.yt(exclude_csm_leads)
    .select(F.regexp_replace('Billing_account_id', r'[\[\]"]', '').alias('billing_account_id'))
    .distinct()
)

# Typed NULL for the CRM columns that are intentionally left empty.
null_string = lit(None).astype('string')

top_consumer_lead_columns = [
    lit(leads_timestamp).alias('Timestamp'),
    null_string.alias('CRM_Lead_ID'),
    F.concat(lit('["'), "billing_account_id", lit('"]')).alias("Billing_account_id"),
    null_string.alias('Status'),
    col('description').alias('Description'),
    lit('admin').alias('Assigned_to'),
    col('first_name').alias('First_name'),
    col('last_name').alias('Last_name'),
    col('phone').alias('Phone_1'),
    null_string.alias('Phone_2'),
    col('email').alias('Email'),
    lit('upsell').alias('Lead_Source'),
    lit('Consumed more than 40k over last 30 days').alias('Lead_Source_Description'),
    null_string.alias('Callback_date'),
    null_string.alias('Last_communication_date'),
    null_string.alias('Promocode'),
    null_string.alias('Promocode_sum'),
    null_string.alias('Notes'),
    null_string.alias('Dimensions'),
    null_string.alias('Tags'),
    lit('').alias('Timezone'),
    col("display_name").alias('Account_name'),
]

# Attach contacts, drop accounts that already received a lead, then shape rows for the CRM import.
spdf_res_leads = (
    spdf_res
    .join(spdf_contacts, on='billing_account_id', how="left")
    .join(leads_to_exclude, on='billing_account_id', how="leftanti")
    .select(*top_consumer_lead_columns)
)

spdf_res_leads.toPandas()

Unnamed: 0,Timestamp,CRM_Lead_ID,Billing_account_id,Status,Description,Assigned_to,First_name,Last_name,Phone_1,Phone_2,Email,Lead_Source,Lead_Source_Description,Callback_date,Last_communication_date,Promocode,Promocode_sum,Notes,Dimensions,Tags,Timezone,Account_name
0,1644263834,,"[""dn2edncqd067knrv4j4s""]",,"Account ""dn2edncqd067knrv4j4s"" has consumption...",admin,,NOP RTK,79056658899,,noprtk@yandex.ru,upsell,Consumed more than 40k over last 30 days,,,,,,,,,NOP RTK
1,1644263834,,"[""dn29pbfcvt99lj5f9708""]",,"Account ""dn29pbfcvt99lj5f9708"" has consumption...",admin,,ООО «12 историй»,79775026184,,a.mokronosova@12storeez.com,upsell,Consumed more than 40k over last 30 days,,,,,,,,,Общество с ограниченной ответственностью «12 и...
2,1644263834,,"[""dn244ju40phs755rvvs9""]",,"Account ""dn244ju40phs755rvvs9"" has consumption...",admin,,ТК,89090020880,,aleksandr.sabirov@gt-m.ru,upsell,Consumed more than 40k over last 30 days,,,,,,,,,"ООО ""ГТ ИТ"""
