In [328]:
import os
import sys
curruser = os.environ.get('USER')

_labdata = os.environ.get("LABDATA_PYSPARK")
sys.path.insert(0, _labdata)

if curruser in os.listdir("/opt/workspace/"):
    sys.path.insert(0, '/opt/workspace/{user}/notebooks/support_library/'.format(user=curruser))
    sys.path.insert(0, '/opt/workspace/{user}/libs/python3.5/site-packages/'.format(user=curruser))
    # sys.path.insert(0, '/opt/workspace/{user}/notebooks/labdata_v1.2/lib/'.format(user=curruser))
else:
    sys.path.insert(0, '/home/{}/notebooks/support_library/'.format(curruser))
    sys.path.insert(0, '/home/{}/python35-libs/lib/python3.5/site-packages/'.format(curruser))
    # sys.path.insert(0, '/home/{}/notebooks/labdata/lib/'.format(curruser))

#import tendo.singleton
import warnings
warnings.filterwarnings('ignore')

import joblib
import json
from joblib import Parallel, delayed

from time import sleep
from itertools import islice
from multiprocessing import Pool, Process, JoinableQueue
from multiprocessing.pool import ThreadPool
from functools import partial
import subprocess
from threading import Thread
import time
from datetime import datetime as dt

from transliterate import translit

from lib.spark_connector import SparkConnector
from lib.sparkdb_loader import *
from lib.connector import OracleDB
import pyspark
from pyspark import SparkContext, SparkConf, HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.dataframe import DataFrame

import re
import pandas as pd
import numpy as np
from tqdm._tqdm_notebook import tqdm_notebook
from pathlib import Path
import shutil
import loader as load
from collections import ChainMap

from lib.config import *
from lib.tools import *

# sing = tendo.singleton.SingleInstance()

# os.chdir('/opt/workspace/ektov1-av_ca-sbrf-ru/notebooks/Clickstream_Analytics/AutoUpdate/')
# os.chdir('/opt/workspace/{}/notebooks/clickstream/AutoUpdate/'.format(curruser))

def show(self, n=10):
    return self.limit(n).toPandas()

def typed_udf(return_type):
    '''Make a UDF decorator with the given return type'''

    def _typed_udf_wrapper(func):
        return f.udf(func,return_type)

    return _typed_udf_wrapper

pyspark.sql.dataframe.DataFrame.show = show

def print_and_log(message: str):
    print(message)
    logger.info(message)
    return None


In [3]:
sp = spark(schema=CONN_SCHEMA,
               dynamic_alloc=False,
               numofinstances=15,
               numofcores=8,
               executor_memory='40g',
               driver_memory='40g',
               kerberos_auth=False,
               process_label="TEST_PYSPARK_"
               )

hive = sp.sql
print(sp.sc.version)

2.4.0.cloudera2


# __init__ : begin


In [115]:
conn_schema = 'sbx_team_digitcamp'
table_name = 'MA_CMDM_MA_DEAL_NEW'

In [173]:
startdt = dt.now()

#/////////////////////////////////////////////////////////////////////////////////////
descr = hive.sql("describe extended {}.{}".format(conn_schema,table_name)).collect()
hasPartitioned = len([item.asDict() for item in descr if item.asDict()['col_name'] =='# Partition Information']) > 0
if hasPartitioned:
    try:
        parts = hive.sql("show partitions {}.{}".format(conn_schema,table_name)).collect()
        max_part = sorted(parts, reverse=True)[0]['partition']
        extract_date=re.compile("\d{4}\-\d{2}\-\d{2}")
        ext = extract_date.search(max_part)
        try:
            max_trunc_dt = ext.group(0)
        except:
            max_trunc_dt = None
    except (AnalysisException, IndexError):
        max_trunc_dt = None
else:
    max_trunc_dt = None
#/////////////////////////////////////////////////////////////////////////////////////


In [174]:
if max_trunc_dt is not None:
  sql = '''select * from {}.{} where creation_month in 
                 ('2021-06-01', '2021-05-01',' 2021-04-01') '''.format(conn_schema, table_name)
else:
  sql = '''select * from {}.{}'''.format(conn_schema,table_name)

deals = hive.sql(sql)
# max_resp_dt_str = dt.strftime(max_dt, format='%Y-%m-%d %H:%M:%S.%f')

In [150]:
deals.select('appl_stage_name').distinct().show(20)

Unnamed: 0,appl_stage_name
0,12.Закрыта/Отказ
1,07.Подготовка документации
2,05. Подготовка документов по сделке
3,04.Тендерная процедура
4,04.2.Ожидание оформ. обеспеч.
5,Договор подписан
6,02.Предварительные переговоры
7,03.Подписание док-ии Клиентом
8,Закрыта/Заключена
9,


## Cancelled Deals

In [270]:
deals_d = \
deals.filter((f.col('appl_stage_name').like("%Закрыта/Отказ%"))&
             (~f.isnull(f.col('close_reason_type_name')))&
             (~f.isnull(f.col('product_cd_mmb')))
            ) 
    
deals_d = deals_d.select('inn', 'product_cd_mmb', f.lit(1).alias('flg_deal_deny'))

deals_deny = deals_d.groupBy('product_cd_mmb').agg(f.count('flg_deal_deny').alias('cnt_deals_deny')).orderBy(f.col('cnt_deals_deny').desc())

In [271]:
deals_deny.cache()

DataFrame[product_cd_mmb: string, cnt_deals_deny: bigint]

In [272]:
deals_deny.show()

Unnamed: 0,product_cd_mmb,cnt_deals_deny
0,CREDCORPCARD,20285
1,SMARTCRED,4917
2,SMARTCARD,2055
3,EINVOICING,957
4,CORPCARD,912
5,SALARYPROJ,840
6,2216GARANTY,697
7,RKO,547
8,MOMENTBIZCARD,428
9,TELEMED,341


## Succeeded Deals

In [273]:
deals_s = \
deals.filter(f.col('appl_stage_name').like("%Закрыта/Заключена%")) \
     .filter(f.isnull(f.col('close_reason_type_name'))&(~f.isnull('product_cd_mmb'))
            ) \
     .select('inn', 
             'product_cd_mmb', 
             f.col('SUM_RUB').cast(IntegerType())
            ).orderBy(f.col('SUM_RUB').desc())

deals_succ = deals_s.groupBy('product_cd_mmb').agg(f.count('SUM_RUB').alias('cnt_deals_succ'),
                                                   f.sum('SUM_RUB').alias('total_sum_rub')
                                                  ).orderBy(f.col('cnt_deals_succ').desc())

In [274]:
deals_succ.cache()

DataFrame[product_cd_mmb: string, cnt_deals_succ: bigint, total_sum_rub: bigint]

In [275]:
deals_succ.show()

Unnamed: 0,product_cd_mmb,cnt_deals_succ,total_sum_rub
0,RKO,42582,288452
1,MINBALANCE,42506,6565099043
2,ENCASHSELF,36517,1946
3,DEPOSIT,30560,2678459350
4,SMSINFORM,15339,73789
5,MOMENTBIZCARD,14019,44
6,ACQUIRING,10571,87573912
7,CREDCORPCARD,9764,8427297
8,CORPCARD,7576,10184
9,SPPU,6167,35991


## MA_PRODUCT_DICT

In [329]:
prod_dct = hive.sql("select * from {}.{} where CRM_PRODUCT_ID is not Null".format(conn_schema, 'ma_dict_v_product_dict'))
prod_dct = prod_dct.filter("(PRODUCT_CD_MMB is not Null) and (PRODUCT_SHORT_NM is not Null)")

prod_dct.columns

['ID',
 'CRM_PRODUCT_ID',
 'PRODUCT_CRM_NM',
 'CRM_ACTIVE_FLG',
 'PRODUCT_CD_KSB',
 'PRODUCT_CD_RGS',
 'PRODUCT_CD_MMB',
 'PRODUCT_CD_ASUP',
 'PRODUCT_SHORT_NM',
 'PRODUCT_FULL_NM',
 'PARTNER_NM',
 'CHANNEL_SALE',
 'CHANNEL_KKSB_ISKRA',
 'CHANNEL_KKSB_EFS',
 'CHANNEL_RGS_ISKRA',
 'CHANNEL_ECO_EXPERT',
 'CHANNEL_MMB_MS',
 'CHANNEL_MMB_CKR',
 'CHANNEL_MMB_MKK',
 'CHANNEL_DZO',
 'AGG_FORMULA_KSB',
 'AGG_FORMULA_RGS',
 'CNT_DAY_DEAL',
 'PRODUCT_SUBGROUP',
 'PRODUCT_GROUP',
 'PRODUCT_TYPE',
 'PRODUCT_PRIORITY']

## MA_MMB_OFFER_NONTOP

In [336]:
nontop = hive.sql("select * from {}.{} where (INN is not Null) and (CREATE_DTTM >= timestamp('2021-04-01'))".format(conn_schema, 'MA_MMB_OFFER_NONTOP'))

In [337]:
def getProdCDFromId(product_dict_nm):
    def get_product(product_id, product_dict_nm):
        if product_id is None:
            return None

        for _id in product_dict_nm:
            if product_id == _id:
                return product_dict_nm[_id]

        return list([])

    return f.udf(lambda x: get_product(x,product_dict_nm), ArrayType(StringType()))

w2=Window.partitionBy('HASHINN', 'CRM_PRODUCT_ID').orderBy("COEF_PRODUCT_CD")

product_dict_sp = prod_dct.select('ID', 
                                  'CRM_PRODUCT_ID', 
                                  'PRODUCT_CD_MMB',
                                  'PRODUCT_SHORT_NM',
                                  'PRODUCT_GROUP', 
                                  'PRODUCT_SUBGROUP')

product_dict_pd = product_dict_sp.collect()
product_dict_nm = {row.ID: (row.CRM_PRODUCT_ID, 
                            row.PRODUCT_CD_MMB, 
                            row.PRODUCT_SHORT_NM,
                            row.PRODUCT_GROUP,
                            row.PRODUCT_SUBGROUP
                           ) for row in product_dict_pd}

nontop = nontop.withColumn('PROD_LST', getProdCDFromId(product_dict_nm)('PRODUCT_ID'))

nontop = nontop.select(['INN', f.col('COEF_PRODUCT_CD').alias("POTENTIAL")] +
                       [f.col('PROD_LST')[0].alias('CRM_PRODUCT_ID'), 
                        f.col('PROD_LST')[1].alias('PRODUCT_CD_MMB'),
                        f.col('PROD_LST')[2].alias('PRODUCT_SHORT_NM'),
                        f.col('PROD_LST')[3].alias('PRODUCT_GROUP'),
                        f.col('PROD_LST')[4].alias('PRODUCT_SUBGROUP')
                       ]).filter("PRODUCT_CD_MMB is not Null")

In [None]:
nontop.show()

In [340]:
mean_nontop = nontop.groupBy('CRM_PRODUCT_ID').agg(f.mean('POTENTIAL').cast(IntegerType()).alias('MEAN_POTENTIAL'),
                                                   f.first('PRODUCT_CD_MMB').alias('PRODUCT_CD_MMB'),
                                                   f.first('PRODUCT_SHORT_NM').alias('PRODUCT_SHORT_NM'),
                                                   f.first('PRODUCT_GROUP').alias('PRODUCT_GROUP'),
                                                   f.first('PRODUCT_SUBGROUP').alias('PRODUCT_SUBGROUP')
                                                  ).orderBy(
                                                            f.col("MEAN_POTENTIAL").desc()
                                                           )

mean_nontop.cache()

DataFrame[CRM_PRODUCT_ID: string, MEAN_POTENTIAL: int, PRODUCT_CD_MMB: string, PRODUCT_SHORT_NM: string, PRODUCT_GROUP: string, PRODUCT_SUBGROUP: string]

In [341]:
mean_nontop.count()

61

In [None]:
mean_nontop.show()

## OFFER_PRIORITY 

In [343]:
priority = hive.sql("select * from {}.{} where inn is not Null and load_dttm >= timestamp('2021-04-01')".format(conn_schema, 'OFFER_PRIORITY'))

In [344]:
priority_part = priority.select('INN', 
                                'CRM_PRODUCT_ID', 
                                f.col('offer_priority').alias("POTENTIAL") 
                                )

In [345]:
priority_part = priority_part.join(product_dict_sp, on='CRM_PRODUCT_ID', how='left_outer').select('INN', 
                                                                                                  'CRM_PRODUCT_ID', 
                                                                                                  'PRODUCT_CD_MMB',
                                                                                                  'PRODUCT_SHORT_NM',
                                                                                                  'PRODUCT_GROUP',
                                                                                                  'PRODUCT_SUBGROUP',
                                                                                                  'POTENTIAL').filter("PRODUCT_CD_MMB is not Null")

In [346]:
mean_priority = priority_part.groupBy('CRM_PRODUCT_ID').agg(f.mean('POTENTIAL').cast(IntegerType()).alias('MEAN_POTENTIAL'),
                                                            f.first('PRODUCT_CD_MMB').alias('PRODUCT_CD_MMB'),
                                                            f.first('PRODUCT_SHORT_NM').alias('PRODUCT_SHORT_NM'),
                                                            f.first('PRODUCT_GROUP').alias('PRODUCT_GROUP'),
                                                            f.first('PRODUCT_SUBGROUP').alias('PRODUCT_SUBGROUP')
                                                           ).orderBy(f.col("MEAN_POTENTIAL").desc())

mean_priority.cache()

DataFrame[CRM_PRODUCT_ID: string, MEAN_POTENTIAL: int, PRODUCT_CD_MMB: string, PRODUCT_SHORT_NM: string, PRODUCT_GROUP: string, PRODUCT_SUBGROUP: string]

In [347]:
mean_priority.cache()

DataFrame[CRM_PRODUCT_ID: string, MEAN_POTENTIAL: int, PRODUCT_CD_MMB: string, PRODUCT_SHORT_NM: string, PRODUCT_GROUP: string, PRODUCT_SUBGROUP: string]

In [348]:
mean_priority.count()

49

## Union Offers

In [349]:
union_offers = mean_priority.unionAll(mean_nontop).select('CRM_PRODUCT_ID', 
                                                          'PRODUCT_CD_MMB', 
                                                          'PRODUCT_SHORT_NM',
                                                          'PRODUCT_GROUP',
                                                          'PRODUCT_SUBGROUP',
                                                          'MEAN_POTENTIAL')
union_offers = union_offers.drop_duplicates()

In [350]:
union_offers = union_offers.join(deals_succ, on=['PRODUCT_CD_MMB'], how='left_outer').select(
                                                                                             'CRM_PRODUCT_ID',
                                                                                             'PRODUCT_CD_MMB',
                                                                                             'PRODUCT_SHORT_NM',
                                                                                             'PRODUCT_GROUP',
                                                                                             'PRODUCT_SUBGROUP',    
                                                                                             'MEAN_POTENTIAL',
                                                                                             'cnt_deals_succ',
                                                                                             'total_sum_rub'          
                                                                                             )


union_offers = union_offers.join(deals_deny, on=['PRODUCT_CD_MMB'], how='left_outer').select( 
                                                                                             'CRM_PRODUCT_ID',
                                                                                             'PRODUCT_CD_MMB',
                                                                                             'PRODUCT_SHORT_NM',
                                                                                             'PRODUCT_GROUP',
                                                                                             'PRODUCT_SUBGROUP',    
                                                                                             'MEAN_POTENTIAL',
                                                                                             'cnt_deals_deny',
                                                                                             'cnt_deals_succ',
                                                                                             'total_sum_rub'
                                                                                             )

union_offers = union_offers.drop_duplicates()

In [351]:
fin = union_offers.groupBy('CRM_PRODUCT_ID').agg( f.first('PRODUCT_CD_MMB').alias('PRODUCT_CD_MMB'),
                                                  f.first('PRODUCT_SHORT_NM').alias('PRODUCT_SHORT_NM'),
                                                  f.first('PRODUCT_GROUP').alias('PRODUCT_GROUP'),
                                                  f.first('PRODUCT_SUBGROUP').alias('PRODUCT_SUBGROUP'),
                                                  f.first('MEAN_POTENTIAL').alias('MEAN_POTENTIAL'),
                                                  f.sum('cnt_deals_succ').alias('cnt_deals_succ'.upper()),
                                                  f.sum('cnt_deals_deny').alias('cnt_deals_deny'.upper()),
                                                  f.sum('total_sum_rub').alias('total_sum_rub'.upper()),
                                                  ).orderBy(f.col("MEAN_POTENTIAL").desc(),
                                                            f.col("cnt_deals_succ").desc()
                                                           )

In [352]:
df = fin.filter(~f.isnull('cnt_deals_succ')).toPandas()

In [None]:
df.head()

In [354]:
df.to_excel("/opt/workspace/ektov1-av_ca-sbrf-ru/notebooks/site_idprod_deals_rank.xlsx", index = False, encoding='utf8')