In [1]:
# ASSUMPTIONS
# KEEP HIGHEST BUNDLE VALUE FOR CUSTOMERS THAT PURCHASED MORE THAN ONE BUNDLE
# CONSIDER ONLY CUSTOMERS THAT HAVE BEEN IN OUR BASE FOR MORE THAN 3 MONTHS (TENURE > 3) & WERE ACTIVE IN THE LATEST MONTH

### Table of Contents
* [Import libraries](#import_libraries)
* [Functions](#functions)
* [DF1](#df1)
    1. [Higher_Bundle_Migrators](#Higher_Bundle_Migrators)
    2. [gdpr](#gdpr)
    3. [status](#status)
    4. [usage](#usage)
    5. [Add/Convert_Features](#Add/Convert_Features)
    6. [demographics](#demographics)
    7. [post_code](#post_code)
    8. [students](#students)
    9. [buckets](#buckets)
    10. [balance](#balance)
    11. [drop_calls](#drop_calls)
    12. [tickets](#tickets)
    13. [channel](#channel)
    14. [NumberOfBundles](#NumberOfBundles)
* [DF2](#df2)
    1. [Higher_Bundle_Migrators2](#Higher_Bundle_Migrators2)
    2. [gdpr2](#gdpr2)
    3. [status2](#status2)
    4. [usage2](#usage2)
    5. [Add/Convert_Features2](#Add/Convert_Features2)
    6. [demographics2](#demographics2)
    7. [post_code2](#post_code2)
    8. [students2](#students2)
    9. [buckets2](#buckets2)
    10. [balance2](#balance2)
    11. [drop_calls2](#drop_calls2)
    12. [tickets2](#tickets2)
    13. [channel2](#channel2)
    14. [NumberOfBundles2](#NumberOfBundles2)
* [Modelling](#modelling)
    1. [Analysis](#Analysis)
    2. [Preprocessing](#Preprocessing)
    3. [Training](#Training)
    4. [Evaluation](#Evaluation)
    5. [Grid_Search](#Grid_Search)
    6. [Tuning](#Tuning)
* [Scoring_dataset](#Scoring_dataset)
    1. [gdpr3](#gdpr3)
    2. [status3](#status3)
    3. [usage3](#usage3)
    4. [Add/Convert_Features3](#Add/Convert_Features3)
    5. [demographics3](#demographics3)
    6. [post_code3](#post_code3)
    7. [students3](#students3)
    8. [buckets3](#buckets3)
    9. [balance3](#balance3)
    10. [drop_calls3](#drop_calls3)
    11. [tickets3](#tickets3)
    12. [channel3](#channel3)
    13. [NumberOfBundles3](#NumberOfBundles3)
* [Score](#score)
* [Save_model](#save_model)

In [2]:
# Set parameters

# --- input buckets ---
customerprofilecar_rawprepared_bucket = 'vfgr-dh-customerprofilecar-rawprepared'
permsandprefs_rawprepared_bucket = 'vfgr-dh-permsandprefs-rawprepared'
mediatedcdrs_bucket = 'vfgr-dh-mediatedcdrs-rawprepared'
cellinventory_bucket = 'vfgr-dh-cellinventory-rawprepared'
model_outputs_bucket = 'vfgr-dh-ca-modeloutputs'
dhdwh_bucket = 'vfgr-dh-dwh-rawprepared'
medallia_bucket = 'vfgr-dh-medallia-rawprepared'

# --- project buckets: exactly one of the two pairs below should be active ---
# Live project
#files_bucket = "vf-gr-ca-live-proda"
#output_bucket_new = "vfgr-dh-ca-modeloutputs"

# NonLive project
# NOTE(review): the two non-live names differ in their last letter ("devde" vs "devds") - confirm this is intended.
files_bucket = 'vf-gr-ca-nonlive-devde'
output_bucket_new = 'vf-gr-ca-nonlive-devds/modeloutputs'

# Dataset version segment used when building the consent (gdpr) path
version = '2.0'

In [3]:
# GCS path templates. The remaining `{}` slots are filled later through str.format
# with year, month and (for `line`) day.
gdpr = 'gs://%s/ope_cpm_consent/%s/parquet/year={}/month={}/' % (permsandprefs_rawprepared_bucket, version)
status = 'gs://%s/car_pp_master_status/2.0/parquet/year={}/month={}/' % customerprofilecar_rawprepared_bucket
usage = 'gs://%s/car_pp_master_usage/2.0/parquet/year={}/month={}/' % customerprofilecar_rawprepared_bucket

demographics = 'gs://%s/car_pega_customer/1.0/parquet/year={}/month={}/' % customerprofilecar_rawprepared_bucket
line = 'gs://%s/car_line/1.0/parquet/year={}/month={}/day={}/' % customerprofilecar_rawprepared_bucket

buckets = 'gs://%s/prepay_buckets/result/parquet/1.0/year={}/month={}/' % model_outputs_bucket

status_service = 'gs://%s/master_status_services/1.0/parquet/year={}/month={}/' % dhdwh_bucket
ticket_service = 'gs://%s/mobile_sr_tt/1.0/parquet/year={}/month={}/' % dhdwh_bucket

cdrs = 'gs://%s/eds_network_cdr/2.0/parquet/year={}/month={}/' % mediatedcdrs_bucket
events = 'gs://%s/alu_prepay_cdr/2.0/parquet/year={}/month={}/' % mediatedcdrs_bucket

### Import_libraries <a class="anchor" id="import_libraries"></a>

In [4]:
%matplotlib inline
import numpy as np
import pandas as pd
import os
import re
import datetime
from dateutil.relativedelta import relativedelta
import calendar
from functools import reduce
import datetime
from pyspark.sql.functions import expr
from past.builtins import xrange
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, split, col, count, sum, coalesce, lit, avg
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql import functions as F
from pyspark.sql.functions import format_number, format_string, dayofmonth, hour, month, year, date_format, countDistinct
from pyspark.sql.functions import rank, col, row_number, mean, stddev
from pyspark.sql.functions import isnan, when, count, col, round
from pyspark.sql.functions import desc
from pyspark.sql.functions import substring
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import udf
from pyspark.sql.functions import mean, min, max
import subprocess
import math
from time import time
import warnings
warnings.filterwarnings('ignore')


pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100)
pd.set_option('display.expand_frame_repr', True)

from pyspark.sql.session import SparkSession
from pyspark.conf import SparkConf

import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext 
sc = SparkContext.getOrCreate()
sql = SQLContext(sc)

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
import lightgbm as lgb
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.metrics import classification_report
import shap

In [5]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all" 

In [6]:
# Get spark
# Fixed ports plus YARN memory overheads for the session.
# NOTE(review): the import cell already calls SparkContext.getOrCreate(); confirm
# these settings still take effect on a context that exists before this cell runs.
config = (
    SparkConf()
    .set('spark.driver.port', '59600')
    .set('spark.blockManager.port', '59601')
    .set('spark.broadcast.port', '59602')
    .set('spark.replClassServer', '59603')
    .set('spark.ui.port', '59604')
    .set('spark.executor.port', '59605')
    .set('spark.fileserver.port', '59606')
    .set('spark.yarn.executor.memoryOverhead', 4096)
    .set('spark.yarn.driver.memoryOverhead', 6144)
    #.set('spark.sql.session.timeZone', 'EET')
)

spark = SparkSession.builder.config(conf=config).getOrCreate()
spark

In [7]:
# Progress marker: printed once the Spark session cell above has finished.
print('Go')

Go


In [8]:
# NOTE(review): the import cell binds `time` with `from time import time`; this
# statement rebinds the name to the module, so later code must call time.time().
import time
# Wall-clock start of the run (seconds since the epoch).
start_time = time.time()
print(start_time)

1674230119.1559803


In [9]:
import datetime
# The day comes from the system clock; month and year are pinned by hand and
# define the reference month for every look-back computed from them.
currentDay = datetime.date.today().day
currentMonth = 12
currentYear = 2022

In [10]:
# Echo the reference date components for the run log.
print(currentDay, currentMonth, currentYear)

20 12 2022


In [11]:
# First day of the pinned reference month, at midnight.
project_date = datetime.datetime(currentYear, currentMonth, 1)
print(project_date)

2022-12-01 00:00:00


In [12]:
from dateutil.relativedelta import relativedelta

# Build the look-back calendar: index 0 is the month before `project_date`,
# index 1 is two months before, ... index 7 is eight months before.
# The loop locals are deliberately NOT called `month` / `year`: those names are
# imported from pyspark.sql.functions and would be overwritten by plain ints.
year_val = []
month_val = []
for months_back in range(1, 9):
    lookback_date = (project_date + relativedelta(months=-months_back)).date()
    month_val.append(lookback_date.month)
    year_val.append(lookback_date.year)
    print(month_val, year_val)

[11] [2022]
[11, 10] [2022, 2022]
[11, 10, 9] [2022, 2022, 2022]
[11, 10, 9, 8] [2022, 2022, 2022, 2022]
[11, 10, 9, 8, 7] [2022, 2022, 2022, 2022, 2022]
[11, 10, 9, 8, 7, 6] [2022, 2022, 2022, 2022, 2022, 2022]
[11, 10, 9, 8, 7, 6, 5] [2022, 2022, 2022, 2022, 2022, 2022, 2022]
[11, 10, 9, 8, 7, 6, 5, 4] [2022, 2022, 2022, 2022, 2022, 2022, 2022, 2022]


### Functions <a class="anchor" id="functions"></a>

In [13]:
# Read data from a specific month,year

def read_in_data(in_file, year = None, month = None):
    """Load the parquet data behind a year/month path template.

    Parameters
    ----------
    in_file : str
        Path template; ``year`` and ``month`` are substituted positionally
        through ``str.format``.
    year, month
        Values for the first and second placeholder.

    Returns
    -------
    The object returned by ``spark.read.parquet`` for the formatted path.
    """
    resolved_path = in_file.format(year, month)
    return spark.read.parquet(resolved_path)

In [14]:
# Read data from a specific day,month,year

def read_in_data2(in_file, year = None, month = None, day = None):
    """Load the parquet data behind a year/month/day path template.

    ``year``, ``month`` and ``day`` are substituted positionally through
    ``str.format``. A template with only two placeholders is accepted as well,
    because ``str.format`` ignores surplus positional arguments.

    Returns
    -------
    The object returned by ``spark.read.parquet`` for the formatted path.
    """
    resolved_path = in_file.format(year, month, day)
    return spark.read.parquet(resolved_path)

In [15]:
def downsampling(dataset, n_majority, random_state=10):
    """Down-sample the majority class (HIGHER_BUNDLE == 0) and shuffle the result.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Must contain a ``HIGHER_BUNDLE`` column holding 0 / 1.
    n_majority : int
        Number of majority rows to keep. pandas raises ``ValueError`` when this
        exceeds the number of available majority rows (sampling is without
        replacement).
    random_state : int, optional
        Seed for both the majority draw and the final shuffle, so repeated calls
        return the same frame. Defaults to 10, the seed the majority draw has
        always used.

    Returns
    -------
    pandas.DataFrame
        ``n_majority`` majority rows plus every minority row, in shuffled order.
    """
    dataset_major = dataset[dataset['HIGHER_BUNDLE'] == 0]
    dataset_minor = dataset[dataset['HIGHER_BUNDLE'] == 1]

    major = dataset_major.sample(n=n_majority, random_state=random_state)  # downsampling
    df_sampled = pd.concat([major, dataset_minor])

    # Seed the shuffle too; otherwise the row order changes on every run.
    return df_sampled.sample(frac=1, random_state=random_state)

In [16]:
def up_down_sampling(X, y, up_proportion, down_proportion):
    """Over-sample with SMOTE, then under-sample with RandomUnderSampler.

    ``up_proportion`` is handed to ``SMOTE`` and ``down_proportion`` to
    ``RandomUnderSampler``, each as the first positional argument. Both
    resamplers run in that order inside one ``Pipeline``.

    NOTE(review): SMOTE, RandomUnderSampler and Pipeline are not imported in the
    import cell; presumably they come from imbalanced-learn and are imported
    elsewhere - confirm before calling. Also verify that the installed version
    still accepts the proportion positionally.

    Returns
    -------
    tuple
        ``(X_resampled, y_resampled)`` as returned by ``pipeline.fit_resample``.
    """
    resampling_steps = [
        ('o', SMOTE(up_proportion)),                  # upsampling  (, k_neighbors=neighbors)
        ('u', RandomUnderSampler(down_proportion)),   # downsampling (alt: TomekLinks(ratio=down_proportion))
    ]
    return Pipeline(steps=resampling_steps).fit_resample(X, y)

In [17]:
def stratified_sampling(dataset, n_majority, random_state=None):
    """Draw ``n_majority`` majority rows per tariff plan, add all minority rows, shuffle.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Must contain ``HIGHER_BUNDLE`` (0 / 1) and ``TARIFF_PLAN`` columns.
    n_majority : int
        Rows drawn *with replacement* from each ``TARIFF_PLAN`` group of the
        majority class (HIGHER_BUNDLE == 0).
    random_state : int, optional
        Seed for the per-group draws and the final shuffle. The default ``None``
        keeps the unseeded behaviour.

    Returns
    -------
    pandas.DataFrame
        Sampled majority rows plus every minority row, in shuffled order.
    """
    dataset_major = dataset[dataset['HIGHER_BUNDLE'] == 0]
    dataset_minor = dataset[dataset['HIGHER_BUNDLE'] == 1]

    # Iterate the groups explicitly rather than groupby.apply: each group then keeps
    # its TARIFF_PLAN column on every pandas version (apply operating on the grouping
    # column is deprecated in recent releases).
    sampled_groups = [
        plan_rows.sample(n_majority, replace=True, random_state=random_state)
        for _, plan_rows in dataset_major.groupby('TARIFF_PLAN')
    ]
    # pd.concat refuses an empty list, so fall back to the (empty) majority frame.
    major = pd.concat(sampled_groups) if sampled_groups else dataset_major

    df_sampled = pd.concat([major, dataset_minor])
    return df_sampled.sample(frac=1, random_state=random_state)

In [18]:
def split_x_y(df):
    """Separate the feature matrix from the target.

    Returns
    -------
    tuple
        ``(X, y)`` where ``X`` is ``df`` without the ``MSISDN`` and
        ``HIGHER_BUNDLE`` columns and ``y`` is the ``HIGHER_BUNDLE`` values.
    """
    non_feature_columns = ["MSISDN", "HIGHER_BUNDLE"]
    target = df["HIGHER_BUNDLE"].values
    features = df.drop(non_feature_columns, axis=1)

    return features, target

In [19]:
def encoding_data(X, y):
    """Encode the object-typed feature columns and the target.

    * A column with more than two distinct values is replaced by dummy columns
      (``drop_first=True``), appended at the end of the frame.
    * Any other object column is label-encoded in place after casting to ``str``.
    * ``y`` is label-encoded.

    NOTE(review): the dummy columns carry no prefix, so two categorical columns
    that share a category label would yield identically named columns - confirm
    this cannot happen with the real data.

    Returns
    -------
    tuple
        ``(X, y)`` after encoding.
    """
    le = LabelEncoder()
    categorical_columns = X.select_dtypes(include=['object']).columns.tolist()

    # Encoding all the categorical columns
    for column_name in categorical_columns:
        if X[column_name].nunique() > 2:
            dummies = pd.get_dummies(X[column_name], drop_first = True)
            X = pd.concat([X.drop(column_name, axis = 1), dummies], axis = 1)
        else:
            as_text = X[column_name].astype(str)
            le.fit(as_text)
            X[column_name] = le.transform(as_text)

    y = le.fit_transform(y)

    return X, y

In [20]:
def estimate_proportion(y):
    """Return the ratio of class-0 to class-1 labels in ``y`` as a float.

    ``y`` must support element-wise comparison (``y == 0``) with a ``.sum()`` on
    the result, as a numpy array does. Raises ``ZeroDivisionError`` when ``y``
    holds no 1s.
    """
    negatives = float((y == 0).sum())
    positives = float((y == 1).sum())

    return float(negatives / positives)

In [21]:
def split_train_test(X,y):
    """Split features and target into a 75 % train / 25 % test partition.

    The split is seeded (``random_state=1``), so it is reproducible.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.
    """
    # Imported here because the import cell only pulls GridSearchCV from
    # sklearn.model_selection; without this the call raises NameError on a fresh kernel.
    from sklearn.model_selection import train_test_split

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.25, random_state = 1)

    return X_train, X_test, y_train, y_test

In [22]:
def evaluation(X_test, y_test, classifier):
    """Score a fitted classifier on the hold-out set.

    Returns
    -------
    tuple
        ``(cm, accuracy, evaluation_report)``: the confusion matrix, the accuracy
        score, and the text classification report labelled
        'No higher bundle' / 'Higher Bundle'.
    """
    predictions = classifier.predict(X_test)
    class_labels = ['No higher bundle', 'Higher Bundle']

    return (
        confusion_matrix(y_test, predictions),
        accuracy_score(y_test, predictions),
        classification_report(y_test, predictions, target_names=class_labels),
    )

In [23]:
def feature_importance(X_test, classifier):
    """Return the SHAP values of ``X_test`` computed by a ``shap.TreeExplainer``
    built on ``classifier``."""
    return shap.TreeExplainer(classifier).shap_values(X_test)

### DF1 <a class="anchor" id="df1"></a>

### Higher_Bundle_Migrators

In [24]:
#print(month_val[2], month_val[1])

In [25]:
# Month-level partition of the service-status table (no trailing slash, no day slot).
status_service = 'gs://%s/master_status_services/1.0/parquet/year={}/month={}' % dhdwh_bucket

# y_prev: three months before the reference month (index 2);
# y_next: the month right after it (index 1).
bundle_columns = ("MSISDN", "SERVICE_CODE")
y_prev = read_in_data2(status_service, year_val[2], month_val[2]).select(*bundle_columns)
y_next = read_in_data2(status_service, year_val[1], month_val[1]).select(*bundle_columns)

In [26]:
# Expose both monthly frames to Spark SQL.
for bundle_frame, view_name in ((y_prev, "y_prev_view"), (y_next, "y_next_view")):
    bundle_frame.createOrReplaceTempView(view_name)

In [27]:
# Keep only rows whose SERVICE_CODE is one of the bundles under study.
# The codes are listed once and rendered into a single IN (...) predicate, which
# selects exactly the rows the former 19-term OR chain did.
bundle_service_codes = [
    # INTERNATIONAL
    "BDLEthnicDataH", "BDLDataTazaINT",
    # TAZA
    "BDLIntegLPak", "BDLIntegPak",
    "BDLIntegLInd", "BDLIntegInd",
    "BDLIntegLBang", "BDLIntegBang",
    "BDLAlbania", "BDLVFAlbInt",
    # VFPP
    "BDLXNetData", "BDLPreCombo",
    # CU
    "BDLTalkText600", "BDLComboMax", "BDLCUComboXL",
    "BDLPasoComboH", "BDLPasoComboXL", "BDLPasoComboML",
    "BDLPasoComboTL",
]
y_prev = spark.sql(
    "SELECT MSISDN, SERVICE_CODE FROM y_prev_view WHERE SERVICE_CODE IN ({})".format(
        ", ".join("'{}'".format(code) for code in bundle_service_codes)
    )
)

In [28]:
# Keep only rows whose SERVICE_CODE is one of the bundles under study.
# The codes are listed once and rendered into a single IN (...) predicate, which
# selects exactly the rows the former 19-term OR chain did.
bundle_service_codes = [
    # INTERNATIONAL
    "BDLEthnicDataH", "BDLDataTazaINT",
    # TAZA
    "BDLIntegLPak", "BDLIntegPak",
    "BDLIntegLInd", "BDLIntegInd",
    "BDLIntegLBang", "BDLIntegBang",
    "BDLAlbania", "BDLVFAlbInt",
    # VFPP
    "BDLXNetData", "BDLPreCombo",
    # CU
    "BDLTalkText600", "BDLComboMax", "BDLCUComboXL",
    "BDLPasoComboH", "BDLPasoComboXL", "BDLPasoComboML",
    "BDLPasoComboTL",
]
y_next = spark.sql(
    "SELECT MSISDN, SERVICE_CODE FROM y_next_view WHERE SERVICE_CODE IN ({})".format(
        ", ".join("'{}'".format(code) for code in bundle_service_codes)
    )
)

In [29]:
                # CU
y_prev = y_prev.withColumn("PRICE", when(col("SERVICE_CODE") == "BDLTalkText600", 10.9)
                .when(col("SERVICE_CODE") == "BDLComboMax", 13.5)
                .when(col("SERVICE_CODE") == "BDLCUComboXL", 15)
                .when(col("SERVICE_CODE") == "BDLPasoComboH", 8.5)
                .when(col("SERVICE_CODE") == "BDLPasoComboXL", 10)
                .when(col("SERVICE_CODE") == "BDLPasoComboML", 12)
                .when(col("SERVICE_CODE") == "BDLPasoComboTL", 17.5)
                # TAZA
                .when(col("SERVICE_CODE") == "BDLIntegLPak", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegLInd", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegLBang", 5.3)
                .when(col("SERVICE_CODE") == "BDLAlbania", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegPak", 8.5)
                .when(col("SERVICE_CODE") == "BDLIntegInd", 8.5)
                .when(col("SERVICE_CODE") == "BDLIntegBang", 8.5)
                .when(col("SERVICE_CODE") == "BDLVFAlbInt", 8.5)
                # INTERNATIONAL
                .when(col("SERVICE_CODE") == "BDLEthnicDataH", 8.9)
                .when(col("SERVICE_CODE") == "BDLDataTazaINT", 10.5)
                # VFPP
                .when(col("SERVICE_CODE") == "BDLXNetData", 10.9)
                .when(col("SERVICE_CODE") == "BDLPreCombo", 13.5)
                )

In [30]:
                # CU
y_next = y_next.withColumn("PRICE", when(col("SERVICE_CODE") == "BDLTalkText600", 10.9)
                .when(col("SERVICE_CODE") == "BDLComboMax", 13.5)
                .when(col("SERVICE_CODE") == "BDLCUComboXL", 15)
                .when(col("SERVICE_CODE") == "BDLPasoComboH", 8.5)
                .when(col("SERVICE_CODE") == "BDLPasoComboXL", 10)
                .when(col("SERVICE_CODE") == "BDLPasoComboML", 12)
                .when(col("SERVICE_CODE") == "BDLPasoComboTL", 17.5)
                # TAZA
                .when(col("SERVICE_CODE") == "BDLIntegLPak", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegLInd", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegLBang", 5.3)
                .when(col("SERVICE_CODE") == "BDLAlbania", 5.3)
                .when(col("SERVICE_CODE") == "BDLIntegPak", 8.5)
                .when(col("SERVICE_CODE") == "BDLIntegInd", 8.5)
                .when(col("SERVICE_CODE") == "BDLIntegBang", 8.5)
                .when(col("SERVICE_CODE") == "BDLVFAlbInt", 8.5)
                # INTERNATIONAL
                .when(col("SERVICE_CODE") == "BDLEthnicDataH", 8.9)
                .when(col("SERVICE_CODE") == "BDLDataTazaINT", 10.5)
                # VFPP
                .when(col("SERVICE_CODE") == "BDLXNetData", 10.9)
                .when(col("SERVICE_CODE") == "BDLPreCombo", 13.5)
                )

In [31]:
### drop duplicates and keep the highest value bundle
def _keep_highest_priced_bundle(bundles):
    """Return one row per MSISDN: the row ranked first when ordering by PRICE descending."""
    by_price_desc = Window.partitionBy("MSISDN").orderBy(desc("PRICE"))
    ranked = bundles.select(
        "MSISDN", "SERVICE_CODE", "PRICE",
        F.row_number().over(by_price_desc).alias("row_num"),
    )
    return ranked.filter(col("row_num") == 1).drop("row_num")


y_prev = _keep_highest_priced_bundle(y_prev)
y_next = _keep_highest_priced_bundle(y_next)

In [32]:
# Rename columns for next month so they do not clash with the previous month's in the join
y_next = (
    y_next
    .withColumnRenamed("PRICE", "PRICE_NEW")
    .withColumnRenamed("SERVICE_CODE", "SERVICE_CODE_NEW")
)

In [33]:
# Re-register the views so they point at the priced, de-duplicated frames.
for bundle_frame, view_name in ((y_prev, "y_prev_view"), (y_next, "y_next_view")):
    bundle_frame.createOrReplaceTempView(view_name)

In [34]:
# Pair every previous-month bundle holder with the bundle price found for the same
# MSISDN in the next month. LEFT JOIN keeps customers with no next-month row;
# their PRICE_NEW is NULL at this point.
migrations = spark.sql("""SELECT A.MSISDN, A.PRICE, B.PRICE_NEW
                                FROM y_prev_view A
                                LEFT JOIN y_next_view B
                                ON A.MSISDN= B.MSISDN""")

In [35]:
# A missing next-month price is treated as 0.
migrations = migrations.fillna(0, subset=["PRICE_NEW"])

In [36]:
# ADD NEW COLUMN FLAG FOR THOSE WHO PURCHASED HIGHER VALUE BUNDLE THE NEXT MONTH
bought_pricier_bundle = col("PRICE") < col("PRICE_NEW")
migrations = migrations.withColumn("HIGHER_BUNDLE", when(bought_pricier_bundle, 1).otherwise(0))

In [37]:
# Target table for DF1: one flag per MSISDN.
target_columns = ["MSISDN", "HIGHER_BUNDLE"]
higher_bundle_segmentation_1 = migrations.select(*target_columns)

In [38]:
#higher_bundle_segmentation_1.groupby("HIGHER_BUNDLE").count().show()

# Train Base

In [39]:
#print(year_val[2], month_val[2])

### gdpr

In [40]:
# Month-level consent path (no day slot yet).
gdpr = 'gs://%s/ope_cpm_consent/%s/parquet/year={}/month={}/' % (permsandprefs_rawprepared_bucket, version)

# find max day
find_day = read_in_data2(gdpr, year_val[2], month_val[2]).select('MSISDN_CLI', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first().MAX
#print(day_max)

In [41]:
# Day-level consent path; read the partition for the latest day found above.
gdpr = 'gs://%s/ope_cpm_consent/%s/parquet/year={}/month={}/day={}/' % (permsandprefs_rawprepared_bucket, version)

df_gdpr = read_in_data2(gdpr, year_val[2], month_val[2], day_max)

### status

In [42]:
# find max day
# Month-level status path (no day slot yet).
status = 'gs://%s/car_pp_master_status/2.0/parquet/year={}/month={}/' % customerprofilecar_rawprepared_bucket

#print(year_val[2], month_val[2])
find_day = read_in_data2(status, year_val[2], month_val[2]).select('MSISDN', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first().MAX
#print(day_max)

In [43]:
# Day-level status path; read the latest day's partition and keep only the needed columns.
status = 'gs://%s/car_pp_master_status/2.0/parquet/year={}/month={}/day={}/' % customerprofilecar_rawprepared_bucket

status_columns = ["MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS"]
df_status = read_in_data2(status, year_val[2], month_val[2], day_max).select(*status_columns)

In [44]:
#df_status = df_status.select("MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS")

In [45]:
# Registered BEFORE the MSISDN de-duplication below, so queries against
# status_view see the un-deduplicated frame until the view is registered again.
df_status.createOrReplaceTempView("status_view")

In [46]:
# Keep a single row per MSISDN. No ordering is specified, so which of several
# duplicate rows survives is not controlled here.
df_status = df_status.dropDuplicates(["MSISDN"])

In [47]:
# Sanity check: number of distinct MSISDNs in the status extract (runs a Spark job).
df_status.count()

2663296

In [48]:
# Exploratory check: rows whose STATUS is 'A' or 'B', with the MSISDN reduced to
# the 10 characters starting at position 3. status_view is still the frame
# registered before dropDuplicates at this point.
test = spark.sql("""SELECT *
                         FROM (
                             SELECT substring(MSISDN, 3 , 10) AS MSISDN, TARIFF_PLAN, CONNECTION_DAY, SMARTPHONE_FLAG, INSERTED
                             FROM status_view 
                             WHERE STATUS IN ('A','B'))
                            """ )

In [49]:
# Row count of the STATUS IN ('A','B') subset (runs a Spark job).
test.count()

2487966

In [50]:
# Expose the consent extract to SQL and re-point status_view at the
# de-duplicated status frame.
df_gdpr.createOrReplaceTempView("gdpr_view")
df_status.createOrReplaceTempView("status_view")

In [51]:
# inner join status with gdpr
# Left side : status rows with STATUS 'A' or 'B', MSISDN cut to 10 characters from position 3.
# Right side: one row per MSISDN_CLI among consent rows that are Active, current
#             (CURRENT_IND = '1') and whose advanced_permission is not 'NO'.
# Only MSISDNs present on both sides survive the INNER JOIN.
consent_prepay_status = spark.sql("""SELECT A.*
                         FROM (
                             SELECT substring(MSISDN, 3 , 10) AS MSISDN, TARIFF_PLAN, CONNECTION_DAY, SMARTPHONE_FLAG, INSERTED
                             FROM status_view A
                             WHERE STATUS IN ('A','B')
                             ) AS A
                         INNER JOIN 
                             (SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN, MAX(CONSENT_VALID_FROM_DATE) AS CONSENT_VALID_FROM_DATE
                              from gdpr_view
                              WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
                              GROUP BY MSISDN_CLI
                             ) AS B
                         ON A.MSISDN = B.MSISDN
                         ORDER BY A.MSISDN """)

In [52]:
# Size of the consented status base (runs a Spark job).
consent_prepay_status.count()

2481657

### usage

In [53]:
# print(month_val[2], year_val[2])
# print(month_val[3], year_val[3])
# print(month_val[4], year_val[4])
# print(month_val[5], year_val[5])
# print(month_val[6], year_val[6])
# print(month_val[7], year_val[7])

In [54]:
# find max day available in dataset for each month
# year_val[2:] / month_val[2:] are the six months starting three months before the
# reference month. For each one, the latest available `day` value is appended to
# the `day_max` list in the same order.
k = 0
day_max = []
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/'

for i,j in zip(year_val[2:],month_val[2:]):
    k=k+1
    # At notebook top level locals() is the module namespace, so these two lines
    # also create the variables find_day1..find_dayN and day_max1..day_maxN.
    locals()["find_day"+str(k)] = read_in_data2(usage, year = i, month = j).select('MSISDN', 'day')
    locals()["day_max"+str(k)] = locals()["find_day"+str(k)].select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX
    day_max.append(locals()["day_max"+str(k)])

In [55]:
# Read datasets from the previous 6-month period
# Produces the variables usage_m1 (most recent of the six months) ... usage_m6
# (oldest), each read from the latest available day of its month.
k = 0 
day = 1
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/day={}/'

for i,j,m in zip(year_val[2:], month_val[2:], day_max):
    k=k+1
    # locals() is the module namespace here, so this defines usage_m<k> dynamically;
    # the cells that follow refer to usage_m1..usage_m6 by name.
    locals()["usage_m"+str(k)] = read_in_data2(usage, year = i, month = j, day = m)

In [56]:
def _retag_month(frame, month_tag):
    """Replace the 'M1' marker in every column name of ``frame`` with ``month_tag``."""
    for column_name in frame.schema.names:
        frame = frame.withColumnRenamed(column_name, column_name.replace('M1', month_tag))
    return frame


# Every monthly extract arrives with M1-tagged columns; re-tag each one with its
# own position in the six-month window.
usage_m1 = _retag_month(usage_m1, 'M1')
usage_m2 = _retag_month(usage_m2, 'M2')
usage_m3 = _retag_month(usage_m3, 'M3')
usage_m4 = _retag_month(usage_m4, 'M4')
usage_m5 = _retag_month(usage_m5, 'M5')
usage_m6 = _retag_month(usage_m6, 'M6')

In [57]:
# Give the join key of months 2..6 a month-specific name so it does not collide
# with usage_m1.MSISDN in the multi-way join.
usage_m2, usage_m3, usage_m4, usage_m5, usage_m6 = (
    monthly_usage.withColumnRenamed("MSISDN", "M{}_MSISDN".format(month_idx))
    for month_idx, monthly_usage in enumerate(
        (usage_m2, usage_m3, usage_m4, usage_m5, usage_m6), start=2
    )
)

In [58]:
# Register usage_view_m1 .. usage_view_m6 for the SQL join.
monthly_usage_frames = (usage_m1, usage_m2, usage_m3, usage_m4, usage_m5, usage_m6)
for month_idx, monthly_usage in enumerate(monthly_usage_frames, start=1):
    monthly_usage.createOrReplaceTempView("usage_view_m{}".format(month_idx))

In [59]:
# Build the six-month usage base.
# Only lines active in the most recent month (M1: at least one outgoing call or one
# GPRS session) are kept; months 2..6 are LEFT JOINed, so their metrics are NULL
# for lines with no record in that month.
# The column list and joins are generated from one metric list so the six monthly
# blocks cannot drift apart.
usage_metrics = [
    "TTL_OUT_CALLS", "TTL_OUT_MINUTES", "TTL_OUT_REVENUE",
    "V_CALLS_TO_FIXED_LINES", "V_MINUTES_TO_FIXED_LINES", "V_REVENUE_TO_FIXED_LINES",
    "V_CALLS_TO_COMPETITION", "V_MINUTES_TO_COMPETITION", "V_REVENUE_TO_COMPETITION",
    "V_CALLS_TO_INTERNATIONAL", "V_MINUTES_TO_INTERNATIONAL", "V_REVENUE_TO_INTERNATIONAL",
    "GPRS_SESSION", "GPRS_VOLUME", "GPRS_REVENUE",
    "RECHARGES_NUMBER", "RECHARGES_VALUE",
    "OUT_DAYS", "INC_DAYS", "BUNDLE_REVENUE",
]
# SQL alias of usage_view_m1 .. usage_view_m6, in that order.
usage_view_aliases = ["A", "B", "C", "D", "E", "F"]

usage_select_list = ",\n       ".join(
    "{alias}.M{idx}_{metric}".format(alias=view_alias, idx=month_idx, metric=metric)
    for month_idx, view_alias in enumerate(usage_view_aliases, start=1)
    for metric in usage_metrics
)
usage_join_clauses = "\n".join(
    "LEFT JOIN usage_view_m{idx} {alias} ON A.MSISDN = {alias}.M{idx}_MSISDN".format(
        idx=month_idx, alias=view_alias
    )
    for month_idx, view_alias in enumerate(usage_view_aliases[1:], start=2)
)

usage_prepay = spark.sql("""
SELECT substring(A.MSISDN, 3, 10) AS MSISDN,
       {select_list}
FROM usage_view_m1 A
{join_clauses}
WHERE (A.M1_TTL_OUT_CALLS > 0) OR (A.M1_GPRS_SESSION > 0)
""".format(select_list=usage_select_list, join_clauses=usage_join_clauses))

In [60]:
# Keep a single row per (shortened) MSISDN; no ordering is given, so which
# duplicate is retained is not controlled here.
usage_prepay = usage_prepay.dropDuplicates(["MSISDN"])

In [61]:
# Expose the usage base and the consent extract to Spark SQL.
for sql_frame, view_name in ((usage_prepay, "usage_prepay_view"), (df_gdpr, "gdpr_view")):
    sql_frame.createOrReplaceTempView(view_name)

In [62]:
# join usage with consent
# Keeps only usage rows whose MSISDN appears among consent rows that are Active,
# current (CURRENT_IND = '1') and whose advanced_permission is not 'NO'.
consent_prepay_usage = spark.sql("""SELECT A.*
                         FROM usage_prepay_view A
                         INNER JOIN 
                             (SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN, MAX(CONSENT_VALID_FROM_DATE) AS CONSENT_VALID_FROM_DATE
                              from gdpr_view
                              WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
                              GROUP BY MSISDN_CLI
                             ) AS B
                         ON A.MSISDN = B.MSISDN
                         ORDER BY A.MSISDN """) 

# join status with usage

In [63]:
# Expose both consented bases to Spark SQL for the final join.
consented_views = (
    (consent_prepay_usage, "consent_prepay_usage_view"),
    (consent_prepay_status, "consent_prepay_status_view"),
)
for consented_frame, view_name in consented_views:
    consented_frame.createOrReplaceTempView(view_name)

In [64]:
# Training base: every consented usage row that also has a consented status row,
# extended with the tariff plan, connection day, smartphone flag and INSERTED value.
base_usage_status = spark.sql("""SELECT A.*, B.TARIFF_PLAN, B.CONNECTION_DAY, B.SMARTPHONE_FLAG, B.INSERTED
                             FROM consent_prepay_usage_view A
                             INNER JOIN consent_prepay_status_view B
                                  ON A.MSISDN = B.MSISDN """)

In [65]:
# Fill NaNs
# Numeric columns get 0 and string columns get "N/A". With no `subset`, na.fill
# applies the value to every column of a compatible type and leaves the rest alone.
# This replaces per-column checks against pandas dtype names ('int64', 'object'),
# which Spark never reports (it uses e.g. 'bigint', 'string'), so bigint / float /
# decimal columns were silently skipped. It also collapses one na.fill per column
# into two calls in total.
base_usage_status = base_usage_status.na.fill(0).na.fill("N/A")

### Add/Convert_Features

In [66]:
# 1. Convert from second -> minutes
# 2. Convert from KByte -> MByte
# A column is rescaled once for every keyword its name contains.
unit_divisors = (('MINUTES', 60), ('VOLUME', 1024))
for column_name in base_usage_status.columns:
    for keyword, divisor in unit_divisors:
        if keyword in column_name:
            base_usage_status = base_usage_status.withColumn(column_name, col(column_name) / divisor)

# Group TARIFFS

In [67]:
# (ETHNIC -> INT, TAZA) & (DOMESTIC -> CU, VFPP)

In [68]:
# CU
# 'Cuba40' must be rewritten before 'Cuba'; otherwise it would become 'CU40'.
tariff_expr = col('TARIFF_PLAN')
for pattern, tariff_group in (('Cuba40', 'CU'), ('Cuba', 'CU')):
    tariff_expr = regexp_replace(tariff_expr, pattern, tariff_group)
base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', tariff_expr)

In [69]:
# VFPP
# regexp_replace rewrites every occurrence of the pattern inside the value,
# not only values that match it exactly.
tariff_expr = col('TARIFF_PLAN')
for pattern in ('VALCBASE', 'HAM', 'Advanced', 'OCFP'):
    tariff_expr = regexp_replace(tariff_expr, pattern, 'VFPP')
base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', tariff_expr)

In [70]:
# ETHNIC (INTERNATIONAL + TAZA)
tariff_expr = col('TARIFF_PLAN')
for pattern, tariff_group in (('INTPACK', 'INTERNATIONAL'), ('Taza', 'TAZA')):
    tariff_expr = regexp_replace(tariff_expr, pattern, tariff_group)
base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', tariff_expr)

# Tenure

In [71]:
# Calculate tenure in months: months between CONNECTION_DAY and INSERTED,
# rounded to the nearest whole month and stored as an integer.
tenure_months = F.round(F.months_between(col("INSERTED"), col("CONNECTION_DAY")))
base_usage_status = base_usage_status.withColumn("TENURE_IN_MONTHS", tenure_months.cast(IntegerType()))

In [72]:
# Keep only customers whose tenure exceeds 3 months
base_usage_status = base_usage_status.filter(col("TENURE_IN_MONTHS") > 3)

# Average talk time per call

In [73]:
# Per month M1..M6: total outgoing minutes divided by the number of outgoing calls.
for month in range(1,7):
    month_prefix = "M" + str(month)
    minutes_per_call = col(month_prefix + "_TTL_OUT_MINUTES") / col(month_prefix + "_TTL_OUT_CALLS")
    base_usage_status = base_usage_status.withColumn(month_prefix + "_MINUTES_PER_CALL", minutes_per_call)

In [74]:
# Fill the nulls that occurred in the numeric columns with 0.
# The dtype list is read once and all columns are filled in a single call, instead of
# rebuilding dict(dtypes) for every column and chaining one na.fill per column.
# NOTE(review): Spark reports 64-bit integers as 'bigint' and 32-bit floats as 'float';
# 'int64' never matches, so such columns are not filled here — confirm that is intended.
numeric_columns = [name for name, dtype in base_usage_status.dtypes
                   if dtype in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=numeric_columns)

# ARPU

In [75]:
# Monthly ARPU = outgoing revenue + GPRS revenue + bundle revenue, for M1..M6.
for month in range(1, 7):
    month_prefix = "M" + str(month)
    monthly_arpu = (col(month_prefix + "_TTL_OUT_REVENUE") + col(month_prefix + "_GPRS_REVENUE")
                    + col(month_prefix + "_BUNDLE_REVENUE"))
    base_usage_status = base_usage_status.withColumn(month_prefix + "_ARPU", monthly_arpu)

# ROC

In [76]:
# Rate of change of the latest month: (M1 - mean(M2..M6)) / mean(M2..M6), one M1_ROC<suffix>
# column per monthly metric.
numeric_dtypes = ('int64', 'double', 'int')
dtype_by_column = dict(base_usage_status.dtypes)  # read once; the loop only adds new M1_ROC* columns
handled_suffixes = set()
for column in base_usage_status.schema.names:
    suffix = column[2:]  # column name without its two-character month prefix
    if (dtype_by_column[column] not in numeric_dtypes or column in ('MSISDN', 'TENURE_IN_MONTHS')
            or column[2:6] == "_ROC" or suffix in handled_suffixes):
        continue
    # The M1..M6 columns of one metric share the suffix and would each recompute the very
    # same M1_ROC column; computing it once keeps the result and shrinks the plan sixfold.
    handled_suffixes.add(suffix)
    prev_month_avg = (col("M2"+suffix) + col("M3"+suffix) + col("M4"+suffix) + col("M5"+suffix) + col("M6"+suffix)) / 5
    base_usage_status = base_usage_status.withColumn("M1_ROC"+suffix, (col("M1"+suffix) - prev_month_avg) / prev_month_avg)

In [77]:
# Fill the nulls that occurred in the numeric columns with 0.
# The dtype list is read once and all columns are filled in a single call, instead of
# rebuilding dict(dtypes) for every column and chaining one na.fill per column.
# NOTE(review): Spark reports 64-bit integers as 'bigint' and 32-bit floats as 'float';
# 'int64' never matches, so such columns are not filled here — confirm that is intended.
numeric_columns = [name for name, dtype in base_usage_status.dtypes
                   if dtype in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=numeric_columns)

# Average for all and half period

In [78]:
# Create 3 month and 6 month averages for all usage columns:
#   M13_AVG<suffix> = mean(M1..M3), M46_AVG<suffix> = mean(M4..M6), M16_AVG<suffix> = mean(M1..M6)
numeric_dtypes = ('int64', 'double', 'int')
dtype_by_column = dict(base_usage_status.dtypes)  # read once; the loop only adds new *_AVG* columns
handled_suffixes = set()
for column in base_usage_status.schema.names:
    suffix = column[2:]  # column name without its two-character month prefix
    if (dtype_by_column[column] not in numeric_dtypes or column in ('MSISDN', 'TENURE_IN_MONTHS')
            or column[2:6] == "_ROC" or suffix in handled_suffixes):
        continue
    # The M1..M6 columns of one metric share the suffix and would each rebuild the same
    # three averages; each metric is therefore processed only once.
    handled_suffixes.add(suffix)
    # first semi-semester
    base_usage_status = base_usage_status.withColumn("M13_AVG"+ suffix, (col("M1"+suffix)+col("M2"+suffix)+col("M3"+suffix))/3)
    # second semi-semester
    base_usage_status = base_usage_status.withColumn("M46_AVG"+ suffix, (col("M4"+suffix)+col("M5"+suffix)+col("M6"+suffix))/3)
    # average over all six months
    base_usage_status = base_usage_status.withColumn("M16_AVG"+ suffix, (col("M1"+suffix)+col("M2"+suffix)+col("M3"+suffix)+col("M4"+suffix)+col("M5"+suffix)+col("M6"+suffix)) /6)

# Ratio (recharge value / bundle value)

In [79]:
# Per month M1..M6: recharge value divided by bundle revenue.
for month in range(1, 7):
    month_prefix = "M" + str(month)
    expenditure_ratio = col(month_prefix + "_RECHARGES_VALUE") / col(month_prefix + "_BUNDLE_REVENUE")
    base_usage_status = base_usage_status.withColumn(month_prefix + "_EXPENDITURE_RATIO", expenditure_ratio)

In [80]:
# Fill the nulls that occurred in the numeric columns with 0.
# The dtype list is read once and all columns are filled in a single call, instead of
# rebuilding dict(dtypes) for every column and chaining one na.fill per column.
# NOTE(review): Spark reports 64-bit integers as 'bigint' and 32-bit floats as 'float';
# 'int64' never matches, so such columns are not filled here — confirm that is intended.
numeric_columns = [name for name, dtype in base_usage_status.dtypes
                   if dtype in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=numeric_columns)

### demographics

In [81]:
# CAR LINE DATASET
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/'

# Find the largest `day` value present for the reference month.
# Plain assignments replace the former writes through locals(): the mapping returned by
# locals() is not meant to be modified and only happened to work at notebook top level.
find_day = read_in_data2(line, year = year_val[2], month = month_val[2]).select('MSISDN', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX

In [82]:
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/day={}/'

# Read the day_max snapshot and keep only the line -> customer account mapping.
car_line_columns = ["MSISDN", "RETAIL_CUST_ACCT_DWH_ID"]
df_line = read_in_data2(line, year = year_val[2], month = month_val[2], day= day_max)
df_line = df_line.select(*car_line_columns)

In [83]:
# Remove exact duplicate (MSISDN, account) pairs.
df_line = df_line.dropDuplicates(["MSISDN","RETAIL_CUST_ACCT_DWH_ID"])

In [84]:
# Keep a single row per MSISDN. dropDuplicates does not define which row survives, so an
# MSISDN linked to several accounts keeps an arbitrary one of them.
# NOTE(review): this second pass alone already yields one row per MSISDN.
df_line = df_line.dropDuplicates(["MSISDN"])

In [85]:
# Register both frames as temp views so the join can be written in SQL.
base_usage_status.createOrReplaceTempView("base_usage_view")
df_line.createOrReplaceTempView("line_view")

In [86]:
# Join CAR_LINE with BASE_USAGE
# INNER JOIN: usage rows whose MSISDN has no car_line record are dropped here.
usage_status_df = spark.sql ("""SELECT A.*, B.RETAIL_CUST_ACCT_DWH_ID
                          FROM base_usage_view A
                          INNER JOIN line_view B
                             ON A.MSISDN = B.MSISDN
                       """)

In [87]:
# DEMOGRAPHICS DATASET
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/'

# Find the largest `day` value present for the reference month.
# Plain assignments replace the former writes through locals() (that mapping is not meant
# to be modified); the dead `day_max=1` pre-assignment was dropped because it was always
# overwritten before being read.
find_day = read_in_data2(demographics, year = year_val[2], month = month_val[2]).select('day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX

In [88]:
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/day={}/'

df_demographics = read_in_data2(demographics, year = year_val[2], month = month_val[2], day= day_max)

In [89]:
# select specific columns
df_demographics = df_demographics.select("CUST_DWH_ID", "POST_CODE", "GENDER", "AGE", "VF_COMBO_FLG", "ACTIVE_TOTAL_LINES")

In [90]:
df_demographics = df_demographics.dropDuplicates(["CUST_DWH_ID"])

In [91]:
df_demographics = df_demographics.withColumn('POST_CODE', regexp_replace('POST_CODE', 'XXXXX', 'DUMMY'))

In [92]:
# Fill NaNs
# Numeric columns are filled with the column mean; string columns with a placeholder
# ("O" for GENDER, "DUMMY" for POST_CODE, "N/A" otherwise). For GENDER and POST_CODE empty
# strings are replaced by the same placeholder.
string_placeholders = {"GENDER": "O", "POST_CODE": "DUMMY"}
dtype_by_column = dict(df_demographics.dtypes)  # the fills below never change a column's type
for column in df_demographics.columns:
    if column == "CUST_DWH_ID":
        # CUST_DWH_ID is the key of the later demographics join; imputing it (mean or "N/A")
        # would fabricate an identifier that could match, or duplicate, a real customer.
        continue
    column_type = dtype_by_column[column]
    if column_type in ('int64', 'double', 'int'):
        # fill with mean (named column_mean so that a star-imported pyspark `mean` is not shadowed)
        column_mean = df_demographics.agg({column: "avg"}).collect()[0][0]
        # avg() is None when the column holds no values at all; na.fill rejects None.
        if column_mean is not None:
            df_demographics = df_demographics.na.fill(column_mean, subset=[column])
    elif column_type in ('object', 'string'):
        placeholder = string_placeholders.get(column, "N/A")
        df_demographics = df_demographics.na.fill(value=placeholder, subset=[column])
        if column in string_placeholders:
            df_demographics = df_demographics.withColumn(column, when(col(column)== "" ,placeholder).otherwise(col(column)))

In [93]:
# STRANGE VALUES FOR AGES
df_demographics = df_demographics.withColumn("AGE", coalesce(col("AGE"), lit(0.0)))

In [94]:
df_demographics = df_demographics.withColumn("AGE", when(col("AGE") > 80, 80).otherwise(col("AGE")))
df_demographics = df_demographics.withColumn("AGE", when(col("AGE") < 17, 18).otherwise(col("AGE")))

# JOIN USAGE STATUS WITH DEMOGRAPHICS

In [95]:
# Register both frames as temp views so the join can be written in SQL.
df_demographics.createOrReplaceTempView("demographics_view")
usage_status_df.createOrReplaceTempView("usage_status_view")

In [96]:
# JOIN
# LEFT JOIN keeps every usage row; rows whose account id has no demographics record get
# NULL in the five demographic columns.
usage_status_demo = spark.sql ("""SELECT A.*, B.POST_CODE, B.GENDER, B.AGE, B.VF_COMBO_FLG, B.ACTIVE_TOTAL_LINES
                          FROM usage_status_view A
                          LEFT JOIN demographics_view B
                             ON A.RETAIL_CUST_ACCT_DWH_ID = B.CUST_DWH_ID
                       """)

In [97]:
# The account id served as the join key above; remove it from the feature frame.
usage_status_demo = usage_status_demo.drop(col("RETAIL_CUST_ACCT_DWH_ID"))

In [98]:
# Fill NaNs
# Same imputation as for the demographics frame, applied to the joined frame: numeric
# columns get the column mean, GENDER -> "O", POST_CODE -> "DUMMY", other strings -> "N/A".
string_placeholders = {"GENDER": "O", "POST_CODE": "DUMMY"}
dtype_by_column = dict(usage_status_demo.dtypes)  # the fills below never change a column's type
for column in df_demographics.columns:
    if column == 'CUST_DWH_ID':
        continue  # join key of the demographics frame, not a column of the joined frame
    column_type = dtype_by_column[column]
    if column_type in ('int64', 'double', 'int'):
        # fill with mean
        column_mean = usage_status_demo.agg({column: "avg"}).collect()[0][0]
        usage_status_demo = usage_status_demo.na.fill(column_mean, subset=[column])
    elif column_type in ('object', 'string'):
        placeholder = string_placeholders.get(column, "N/A")
        usage_status_demo = usage_status_demo.na.fill(value=placeholder, subset=[column])
        if column in string_placeholders:
            usage_status_demo = usage_status_demo.withColumn(column, when(col(column)== "" ,placeholder).otherwise(col(column)))

### post_code

In [99]:
# Copy the sociodemographics workbook from the files bucket into the working directory.
sociodemographics_uri = 'gs://' + files_bucket + '/notebooks/jupyter/higher_bundles/Sociodemographics.xlsx'
# The command is passed as an argument list (no shell=True), so the bucket name is never
# re-parsed by a shell. check_call raises CalledProcessError on a non-zero exit status, so a
# failed copy can no longer be followed by reading a stale or missing local file.
subprocess.check_call(['/bin/sh', '/usr/bin/gsutil', '-q', 'cp', sociodemographics_uri, 'Sociodemographics.xlsx'])
Population_pools = pd.read_excel('Sociodemographics.xlsx')
population_pools_df = sql.createDataFrame(Population_pools)

0

In [100]:
# Columns of the workbook that are not used as features (names and the 60+ age bands).
unused_population_columns = ("Postcode_key", "Name", "Periferiaki_enotita", "Population_aged_60+",
                             "Male_Population_aged_60+", "Female_Population_aged_60+")
population_pools_df = population_pools_df.drop(*unused_population_columns)

In [101]:
# (workbook name, new name) pairs: '-', ':', '(', ')' and '.' are removed or replaced by '_'
# in the column names referenced by the SQL join on post code.
column_renames = (
    ('Population_aged_0-14', 'Population_aged_0_14'),
    ('Male_Population_aged_0-14', 'Male_Population_aged_0_14'),
    ('Female_Population_aged_0-14', 'Female_Population_aged_0_14'),

    ('Population_aged_15-29', 'Population_aged_15_29'),
    ('Male_Population_aged_15-29', 'Male_Population_aged_15_29'),
    ('Female_Population_aged_15-29', 'Female_Population_aged_15_29'),

    ('Population_aged_30-44', 'Population_aged_30_44'),
    ('Male_Population_aged_30-44', 'Male_Population_aged_30_44'),
    ('Female_Population_aged_30-44', 'Female_Population_aged_30_44'),

    ('Population_aged_45-59', 'Population_aged_45_59'),
    ('Male_Population_aged_45-59', 'Male_Population_aged_45_59'),
    ('Female_Population_aged_45-59', 'Female_Population_aged_45_59'),

    ('Purchasing_Power:_million_Euro', 'Purchasing_Power_million_Euro'),
    ('Purchasing_Power:_per_mill_of_country', 'Purchasing_Power_per_mill_of_country'),
    ('Purchasing_Power:_Euro_per_capita', 'Purchasing_Power_Euro_per_capita'),
    ('Purchasing_Power:_index_(country_eq.100)', 'Purchasing_Power_index_country_eq_100'),
)
for workbook_name, new_name in column_renames:
    population_pools_df = population_pools_df.withColumnRenamed(workbook_name, new_name)

In [102]:
# Register both frames as temp views so the join can be written in SQL.
population_pools_df.createOrReplaceTempView("population_pools_view")
usage_status_demo.createOrReplaceTempView("usage_status_demo_view")

In [103]:
# Exclude ages from 60+
# LEFT JOIN on POST_CODE: customers whose post code is not in the workbook (e.g. the
# 'DUMMY' placeholder, presumably) keep NULL in every population column.
usage_status_demo_pc = spark.sql ("""SELECT A.*, B.Population, B.Households, B.Average_Household_Size, B.Male_Population,
                          B.Female_Population, B.Population_aged_0_14, B.Male_Population_aged_0_14, B.Female_Population_aged_0_14,
                          B.Population_aged_15_29, B.Male_Population_aged_15_29, B.Female_Population_aged_15_29, 
                          B.Population_aged_30_44, B.Male_Population_aged_30_44, B.Female_Population_aged_30_44,
                          B.Population_aged_45_59, B.Male_Population_aged_45_59, B.Female_Population_aged_45_59,
                          B.Purchasing_Power_million_Euro, B.Purchasing_Power_per_mill_of_country,
                          B.Purchasing_Power_Euro_per_capita, B.Purchasing_Power_index_country_eq_100

                          FROM usage_status_demo_view A                          
                          LEFT JOIN population_pools_view B                          
                          ON A.POST_CODE = B.POST_CODE
                       """)

In [104]:
# Fill NaNs
# Zero-fill, one column at a time, every column of the joined frame that carries the name
# of a population_pools_df column (the NULLs left by the LEFT JOIN on post code).
for column in population_pools_df.columns:
    usage_status_demo_pc = usage_status_demo_pc.na.fill(value=0, subset=[column])

### students

In [105]:
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/'

# Find the largest `day` value present for the reference month.
# Plain assignments replace the former writes through locals() (that mapping is not meant
# to be modified); the dead `day_max=1` pre-assignment and the commented-out debug print
# were dropped.
find_day = read_in_data2(status_service, year = year_val[2], month = month_val[2]).select('day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX

In [106]:
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/day={}'

# Read the day_max snapshot of the service-status table.
df_status_service = read_in_data2(status_service, year = year_val[2], month = month_val[2], day=day_max)

In [107]:
df_status_service.createOrReplaceTempView("df_status_service_view")

In [108]:
# Keep only the rows of service code 'BDLCUPaso', which this notebook treats as the
# student marker (see STUDENTS_FLAG below).
df_students = spark.sql("""SELECT * 
                           FROM df_status_service_view A
                           WHERE SERVICE_CODE == 'BDLCUPaso' 
                           """)

In [109]:
# One row per MSISDN
df_students = df_students.dropDuplicates(["MSISDN"])

In [110]:
# ADD A NEW COLUMN WITH A FLAG TO INDICATE THAT THIS USER IS A STUDENT
df_students = df_students.withColumn("STUDENTS_FLAG", lit("Y"))

# LEFT JOIN USAGE_STATUS_DEMO WITH STUDENTS INFO

In [111]:
# Register both frames as temp views so the join can be written in SQL.
usage_status_demo_pc.createOrReplaceTempView("usage_status_demo_pc_view")
df_students.createOrReplaceTempView("students_view")

In [112]:
# LEFT JOIN: STUDENTS_FLAG is 'Y' for matched MSISDNs and NULL for everyone else.
usage_status_demo_pc_stud = spark.sql("""SELECT A.*, B.STUDENTS_FLAG
                                FROM usage_status_demo_pc_view A
                                LEFT JOIN students_view B
                                ON A.MSISDN=B.MSISDN""")

In [113]:
# Set the flag to "N" (no) for users who are not students
usage_status_demo_pc_stud = usage_status_demo_pc_stud.na.fill(value="N", subset=["STUDENTS_FLAG"])

### buckets

In [114]:
# Display the two months whose bucket data is read in the next cell.
month_val[2:4]

[9, 8]

In [115]:
# Read datasets from the previous 2-month period
buckets = 'gs://'+ model_outputs_bucket + '/prepay_buckets/result/parquet/1.0/year={}/month={}/'

# The two monthly frames are bound to explicit names instead of being written into locals()
# under generated names (that mapping is not meant to be modified). Unpacking also fails
# right here if fewer than two months are configured. The unused `k` counter and `day`
# variable were dropped.
buckets_m1, buckets_m2 = [read_in_data2(buckets, year = i, month = j)
                          for i, j in zip(year_val[2:4], month_val[2:4])]

In [116]:
# Stack the two monthly bucket frames (union matches columns by position).
df_buckets = buckets_m1.union(buckets_m2)

In [117]:
# NOTE(review): the window in the next cell defines its own ordering, so this global sort
# does not appear to be relied upon — candidate for removal.
df_buckets = df_buckets.sort(col("BUNDLE").asc(), col("ACTIVATION_DATE").desc())

In [118]:
### drop duplicates and keep first occurrence for each bundle
# row_num = 1 marks the most recent activation per (MSISDN, BUNDLE); the row_num column
# itself stays in df_buckets after the filter.
df_buckets = df_buckets.select("MSISDN", "ACTIVATION_DATE", "BUNDLE", "BUNDLE_REVENUE", "BALANCE", "VOICE_BUCKET", "DATA_BUCKET", "SMS_BUCKET",
                                   F.row_number().over(Window.partitionBy("MSISDN", "BUNDLE").orderBy(desc("ACTIVATION_DATE"))).alias("row_num"))

df_buckets = df_buckets.filter(col("row_num") == 1)

In [119]:
# Calculate summary for all buckets: per MSISDN, the voice, data and SMS buckets summed
# over the bundles kept above.
bucket_totals = (
    F.sum("VOICE_BUCKET").alias("VOICE_BUCKET_SUMMARY"),
    F.sum("DATA_BUCKET").alias("DATA_BUCKET_SUMMARY"),
    F.sum("SMS_BUCKET").alias("SMS_BUCKET_SUMMARY"),
)
df_buckets_summary = df_buckets.groupBy("MSISDN").agg(*bucket_totals)

# balance

In [120]:
### drop duplicates and keep last registration
# Rank each MSISDN's rows by activation date, newest first, and keep the balance of rank 1.
newest_first = Window.partitionBy("MSISDN").orderBy(desc("ACTIVATION_DATE"))
df_balance = df_buckets.select("MSISDN", "BALANCE", F.row_number().over(newest_first).alias("row_num"))

is_latest = col("row_num") == 1
df_balance_summary = df_balance.filter(is_latest).drop("row_num")

# Join buckets with balance

In [121]:
# Register the bucket totals and the latest balance as temp views.
df_buckets_summary.createOrReplaceTempView("buckets_sum_view")
df_balance_summary.createOrReplaceTempView("balance_sum_view")

In [122]:
# One row per MSISDN: bucket totals plus the latest balance.
df_buckets_balance = spark.sql("""SELECT A.*, B.BALANCE
                             FROM buckets_sum_view A
                             INNER JOIN balance_sum_view B
                                  ON A.MSISDN = B.MSISDN """)

# Join bucket-balance with main dataset

In [123]:
usage_status_demo_pc_stud.createOrReplaceTempView("usage_status_demo_pc_stud_view")
df_buckets_balance.createOrReplaceTempView("buckets_balance_view")

In [124]:
# LEFT JOIN: customers without bucket data keep NULL in the four added columns.
usage_status_demo_pc_stud_buckets = spark.sql("""SELECT A.*, B.VOICE_BUCKET_SUMMARY, B.DATA_BUCKET_SUMMARY, B.SMS_BUCKET_SUMMARY, B.BALANCE
                                FROM usage_status_demo_pc_stud_view A
                                LEFT JOIN buckets_balance_view B
                                ON A.MSISDN=B.MSISDN""")

In [125]:
# Fill the NULLs left by the bucket/balance LEFT JOIN: 0 for numeric columns, "N/A" for
# string-like ones. Only the columns that came from df_buckets_balance are visited.
# Fix: the third dtype test used to index with `column`, a variable left over from an
# earlier cell, so the 'int' check inspected an unrelated column instead of `c`.
numeric_dtypes = ('int64', 'double', 'int')
text_dtypes = ('object', 'string', 'timestamp')
dtype_by_column = dict(usage_status_demo_pc_stud_buckets.dtypes)  # fills never change a column's type
for c in df_buckets_balance.columns:
    if dtype_by_column[c] in numeric_dtypes:
        usage_status_demo_pc_stud_buckets = usage_status_demo_pc_stud_buckets.na.fill(value=0, subset=[c])
    elif dtype_by_column[c] in text_dtypes:
        usage_status_demo_pc_stud_buckets = usage_status_demo_pc_stud_buckets.na.fill(value="N/A", subset=[c])

### drop_calls

In [126]:
drop_calls = 'gs://'+ mediatedcdrs_bucket +'/eds_network_cdr/2.0/parquet/year={}/month={}/'

df_drop_calls = read_in_data2(drop_calls, year = year_val[2], month = month_val[2])

In [127]:
df_drop_calls = df_drop_calls.select("SAMPLED","A_NUMBER","FIRST_LAC","LAST_LAC","CELL","LAST_CELL","REC_TYPE","TARIFF",
                               "DURATION", "TERM_CAUSE", "day")

In [128]:
df_drop_calls.createOrReplaceTempView("cdrs_view")

In [129]:
# DROPPED_CALL_COUNT per MSISDN (the CDR's A_NUMBER). A record is counted when:
#   * its LAC / CELL fields match one of the four filled/empty combinations in the WHERE clause,
#   * REC_TYPE is '20' or '30' and TARIFF is not '142',
#   * TERM_CAUSE is 8 characters long and its first 4 characters are in the listed code set,
#   * A_NUMBER is non-empty and starts with '69'.
# ROW_NUMBER() ... LEVEL = 1 keeps a single record per (A_NUMBER, SAMPLED) before counting.
# NOTE(review): the final ORDER BY only sorts a result that is joined afterwards.
drop_calls_query = spark.sql("""SELECT K.MSISDN,
                                count(*) as DROPPED_CALL_COUNT
                                FROM (
                                    SELECT L.MSISDN, L.SAMPLED, L.YEAR
                                    FROM (
                                       SELECT
                                       A.SAMPLED, A.A_NUMBER AS MSISDN, A.day, YEAR(A.SAMPLED) AS YEAR, SUBSTR(A.TERM_CAUSE,1,4) AS EOS,
                                       CASE WHEN A.FIRST_LAC LIKE '%IE%' THEN A.LAST_LAC  --this condition holds only for calls
                                           WHEN A.FIRST_LAC = '' THEN A.LAST_LAC          --this condition holds only for calls
                                           ELSE A.FIRST_LAC END FIRST_LAC,                --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_LAC = '' THEN A.FIRST_LAC         --this condition holds for both calls and SMS
                                           ELSE A.LAST_LAC END LAST_LAC,                  --this condition holds for both calls and SMS
                                       CASE WHEN A.CELL LIKE '%F%' THEN A.LAST_CELL       --this condition holds only for calls
                                           WHEN A.CELL = '' THEN A.LAST_CELL              --this condition holds only for calls
                                           ELSE A.CELL END CELL,                          --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_CELL = '' THEN A.CELL             --this condition holds for both calls and SMS
                                           ELSE A.LAST_CELL END LAST_CELL,                --this condition holds for both calls and SMS
                                       ROW_NUMBER() OVER (PARTITION BY A.A_NUMBER, A.SAMPLED ORDER BY A.A_NUMBER, A.SAMPLED) as LEVEL
                                       FROM cdrs_view A
                                       WHERE ((A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL != '') OR  -- this condition holds only for calls
                                       (A.FIRST_LAC = '' AND A.LAST_LAC != '' AND A.CELL = '' AND A.LAST_CELL != '') OR    -- this condition holds only for calls
                                       (A.FIRST_LAC != '' AND A.LAST_LAC = '' AND A.CELL != '' AND A.LAST_CELL = '') OR    -- this condition holds for both calls and SMS
                                       (A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL = ''))     -- this condition holds for both calls and SMS
                                       AND A.REC_TYPE IN ('20','30') AND A.TARIFF != '142'
                                       AND LENGTH(A.TERM_CAUSE) = 8
                                       AND SUBSTR(A.TERM_CAUSE,1,4) IN ('068F','08BF','09A6','09C3','09C5','09C8','09F8','0A0E','0A0F','0AE9','0C15','0CD2',
                                                                        '0CD3','0F7B','0F7C','018F','065D','065E','0700','0701','0702','09A7','09BF','09C0',
                                                                        '09C2','09C4','09C6','09C7','09C9','09F6','09F7','0A0A','0A0B','0A0C','0A0D','0C14',
                                                                        '0C16','0F7D','1C8F','1C90','1C91','1C92','1C9A','1C9B')
                                       AND A.A_NUMBER != '' AND A.A_NUMBER LIKE '69%' 
                                       --ORDER BY A.A_NUMBER, A.SAMPLED
                                       ) AS L
                                    WHERE L.LEVEL = 1
                                    --ORDER BY L.MSISDN, L.SAMPLED ASC
                                ) K
                                GROUP BY K.MSISDN
                                ORDER BY K.MSISDN
                             """)  

# Join drop calls with main dataset

In [130]:
# Register both frames as temp views so the join can be written in SQL.
usage_status_demo_pc_stud_buckets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_view")
drop_calls_query.createOrReplaceTempView("drop_calls_view")

In [131]:
# LEFT JOIN: customers without a dropped-call row get a NULL count.
usage_status_demo_pc_stud_buckets_dropcalls = spark.sql("""SELECT A.*, B.DROPPED_CALL_COUNT
                                FROM usage_status_demo_pc_stud_buckets_view A
                                LEFT JOIN drop_calls_view B
                                ON A.MSISDN=B.MSISDN""")

In [132]:
# No matching row means no dropped calls: turn the NULL count into 0.
usage_status_demo_pc_stud_buckets_dropcalls = usage_status_demo_pc_stud_buckets_dropcalls.na.fill(value=0, subset=["DROPPED_CALL_COUNT"])

### tickets

In [133]:
ticket_service = 'gs://'+ dhdwh_bucket +'/mobile_sr_tt/1.0/parquet/year={}/month={}/'

# Service requests / trouble tickets of the reference month, reduced to (MSISDN, request id).
# The select already excludes 'service_file_id', so no separate drop of it is needed.
df_tickets_requests = read_in_data2(ticket_service, year = year_val[2], month = month_val[2])
df_tickets_requests = df_tickets_requests.select("X_MSISDN","SR_ID")

In [134]:
#df_tickets_requests = df_tickets_requests.select("X_MSISDN","SR_ID")

In [135]:
# drop duplicates
# so that each (MSISDN, request id) pair is counted once below
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN", "SR_ID"])

In [136]:
# ADD A COLUMN AS TICKETS COUNTER FOR EACH MSISDN
# The windowed count repeats the per-MSISDN total on every row of that MSISDN.
df_tickets_requests = df_tickets_requests.select("X_MSISDN", F.count("X_MSISDN").over(Window.partitionBy("X_MSISDN")).alias("TICKETS_COUNT"))

In [137]:
# Collapse the repeated totals to one row per MSISDN.
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN"])

# Join tickets with main dataset

In [138]:
# Register both frames as temp views so the join can be written in SQL.
usage_status_demo_pc_stud_buckets_dropcalls.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_view")
df_tickets_requests.createOrReplaceTempView("tickets_requests_view")

In [139]:
# LEFT JOIN: customers without tickets get a NULL count.
usage_status_demo_pc_stud_buckets_dropcalls_tickets = spark.sql("""SELECT A.*, B.TICKETS_COUNT
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_view A
                                LEFT JOIN tickets_requests_view B
                                ON A.MSISDN=B.X_MSISDN""")

In [140]:
# No matching row means no tickets: turn the NULL count into 0.
usage_status_demo_pc_stud_buckets_dropcalls_tickets = usage_status_demo_pc_stud_buckets_dropcalls_tickets.na.fill(value=0, subset=["TICKETS_COUNT"])

### channel

In [141]:
events = 'gs://'+ mediatedcdrs_bucket + '/alu_prepay_cdr/2.0/parquet/year={}/month={}/'

# Prepay event CDRs of the reference month
df_events = read_in_data2(events, year = year_val[2], month = month_val[2])

In [142]:
df_events.createOrReplaceTempView("df_events_view")

In [143]:
# One row per event with EVENT_LABEL 139 and EVENT_RESULT 169 — the same filter the
# NumberOfBundles section uses to count bundles — keeping the account and the system
# (channel) that requested it.
df_bundle_purchase = spark.sql("""SELECT ACCOUNT_ID, REQUESTING_SYSTEM                             
                             FROM df_events_view 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             """)

In [144]:
# Map the digital requesting systems (values of REQUESTING_SYSTEM, not column names)
# to the single channel label 'DIGITAL'.
for digital_system in ('VOP', 'CUapp', 'MCare', 'CUsite'):
    df_bundle_purchase = df_bundle_purchase.withColumn('REQUESTING_SYSTEM', regexp_replace('REQUESTING_SYSTEM', digital_system, 'DIGITAL'))

In [145]:
# Map the remaining listed requesting systems (values of REQUESTING_SYSTEM, not column
# names) to the catch-all channel label 'OTHER'.
other_systems = ('TAZAAPP', 'EKIOSK', 'PostpaidToPrepaid', 'XPCVM', 'LMG', 'PEGA', 'SMSVAS', 'CRM')
for other_system in other_systems:
    df_bundle_purchase = df_bundle_purchase.withColumn('REQUESTING_SYSTEM', regexp_replace('REQUESTING_SYSTEM', other_system, 'OTHER'))

In [146]:
# Channel categories:
# ONE CATEGORY FOR THE SHOP - (VFSHOP)
# ANOTHER CATEGORY FOR THE PHONE - (IVR)
# DIGITAL (VOP, MCARE, CUapp, CUsite)
# OTHER

In [147]:
# Purchases per account and channel - 4 categories, one count column each.
# The pivot values are listed explicitly: all four columns then exist even when a channel
# has no purchase in the month (the later select would otherwise fail on the missing
# column), and Spark does not need an extra pass to discover the distinct values.
# Purchases from any other requesting system are not counted in any column, exactly as
# before, since only these four columns are kept downstream.
df_bundle_purchase = df_bundle_purchase.groupBy('ACCOUNT_ID').pivot('REQUESTING_SYSTEM', ["DIGITAL", "VFShop", "IVR", "OTHER"]).count()

In [148]:
for c in df_bundle_purchase.columns:
        df_bundle_purchase = df_bundle_purchase.na.fill(0, subset=[c])

In [149]:
df_bundle_purchase = df_bundle_purchase.select("ACCOUNT_ID", "DIGITAL", "VFShop", "IVR", "OTHER")

# Join channel with main dataset

In [150]:
# Register both frames as temp views so the join can be written in SQL.
usage_status_demo_pc_stud_buckets_dropcalls_tickets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_view")
df_bundle_purchase.createOrReplaceTempView("bundle_purchase_view")

In [151]:
# The MSISDN is matched against characters 3..12 of ACCOUNT_ID (SUBSTR is 1-based).
# NOTE(review): presumably ACCOUNT_ID is the MSISDN behind a two-character prefix — confirm.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = spark.sql("""SELECT A.*, B.DIGITAL, B.VFShop, B.IVR, B.OTHER
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_view A
                                LEFT JOIN bundle_purchase_view B
                                ON A.MSISDN= SUBSTR(B.ACCOUNT_ID,3,10)""")

In [152]:
# Customers without any purchase event get 0 in all four channel counters.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.na.fill(value=0, subset=["DIGITAL", "VFShop", "IVR", "OTHER"])

### NumberOfBundles

In [153]:
# Read datasets from the previous 6-month period
events = 'gs://'+ mediatedcdrs_bucket + '/alu_prepay_cdr/2.0/parquet/year={}/month={}/'

monthly_event_frames = [
    read_in_data2(events, year = i, month = j).select("ACCOUNT_ID", "PTP_COSP_AMA_CODE", "EVENT_LABEL", "EVENT_RESULT")
    for i, j in zip(year_val[2:], month_val[2:])
]
# The following cells use exactly events_m1 .. events_m6, so the frames are bound to
# explicit names instead of being written into locals() under generated names (that
# mapping is not meant to be modified). Unpacking fails right here if fewer than six
# months are configured. The unused `k` counter and `day` variable were dropped.
events_m1, events_m2, events_m3, events_m4, events_m5, events_m6 = monthly_event_frames[:6]

In [154]:
# Register the six monthly event frames as events_view_m1 .. events_view_m6.
monthly_events_in_order = (events_m1, events_m2, events_m3, events_m4, events_m5, events_m6)
for month_idx, events_df in enumerate(monthly_events_in_order, start=1):
    events_df.createOrReplaceTempView("events_view_m" + str(month_idx))

In [155]:
# The six cells below are identical apart from the month view they read: per ACCOUNT_ID,
# count the non-NULL PTP_COSP_AMA_CODE values among events with EVENT_LABEL 139 and
# EVENT_RESULT 169.
df_num_of_bundles_m1 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m1 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [156]:
df_num_of_bundles_m2 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m2 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [157]:
df_num_of_bundles_m3 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m3 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [158]:
df_num_of_bundles_m4 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m4 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [159]:
df_num_of_bundles_m5 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m5 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [160]:
df_num_of_bundles_m6 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m6 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [161]:
# Register the six monthly bundle counts as num_of_bundles_view_m1 .. num_of_bundles_view_m6.
monthly_bundle_counts = (df_num_of_bundles_m1, df_num_of_bundles_m2, df_num_of_bundles_m3,
                         df_num_of_bundles_m4, df_num_of_bundles_m5, df_num_of_bundles_m6)
for month_idx, bundle_counts_df in enumerate(monthly_bundle_counts, start=1):
    bundle_counts_df.createOrReplaceTempView("num_of_bundles_view_m" + str(month_idx))

In [162]:
# One row per ACCOUNT_ID with its bundle count for each of the six months.
# Fix: the previous query started from month 1 and LEFT JOINed months 2..6, so an account
# with purchases in M2..M6 but none in M1 was missing from the result and later received 0
# for every month. The six monthly counts are now stacked and spread into columns per
# account, so every account that appears in any month is kept. Months without a row stay
# NULL here, as before, and are zero-filled by the next cell.
# Each monthly view holds at most one row per ACCOUNT_ID, so SUM simply picks that value.
# NOTE(review): the NumberOfBundles sections for DF2 and the scoring dataset presumably
# build the same feature — keep them in sync with this query.
num_of_bundles_semester = spark.sql("""SELECT substring(T.ACCOUNT_ID, 3 , 10) AS ACCOUNT_ID,
                     T.M1_BUNDLES_NUM, T.M2_BUNDLES_NUM, T.M3_BUNDLES_NUM,
                     T.M4_BUNDLES_NUM, T.M5_BUNDLES_NUM, T.M6_BUNDLES_NUM

                     FROM (
                          SELECT U.ACCOUNT_ID,
                                 SUM(CASE WHEN U.MONTH_IDX = 1 THEN U.BUNDLES_NUM END) AS M1_BUNDLES_NUM,
                                 SUM(CASE WHEN U.MONTH_IDX = 2 THEN U.BUNDLES_NUM END) AS M2_BUNDLES_NUM,
                                 SUM(CASE WHEN U.MONTH_IDX = 3 THEN U.BUNDLES_NUM END) AS M3_BUNDLES_NUM,
                                 SUM(CASE WHEN U.MONTH_IDX = 4 THEN U.BUNDLES_NUM END) AS M4_BUNDLES_NUM,
                                 SUM(CASE WHEN U.MONTH_IDX = 5 THEN U.BUNDLES_NUM END) AS M5_BUNDLES_NUM,
                                 SUM(CASE WHEN U.MONTH_IDX = 6 THEN U.BUNDLES_NUM END) AS M6_BUNDLES_NUM
                          FROM (
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 1 AS MONTH_IDX FROM num_of_bundles_view_m1
                               UNION ALL
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 2 AS MONTH_IDX FROM num_of_bundles_view_m2
                               UNION ALL
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 3 AS MONTH_IDX FROM num_of_bundles_view_m3
                               UNION ALL
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 4 AS MONTH_IDX FROM num_of_bundles_view_m4
                               UNION ALL
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 5 AS MONTH_IDX FROM num_of_bundles_view_m5
                               UNION ALL
                               SELECT ACCOUNT_ID, BUNDLES_NUM, 6 AS MONTH_IDX FROM num_of_bundles_view_m6
                          ) U
                          GROUP BY U.ACCOUNT_ID
                     ) T
                          """)

In [163]:
num_of_bundles_semester = num_of_bundles_semester.na.fill(value=0)

In [164]:
# Create 3 month and 6 month averages for all usage columns
# first semi-semester
num_of_bundles_semester = num_of_bundles_semester.withColumn("M13_AVG_BUNDLES_NUM", (col("M1_BUNDLES_NUM")+col("M2_BUNDLES_NUM")+col("M3_BUNDLES_NUM"))/3)
# second semi-semester
num_of_bundles_semester = num_of_bundles_semester.withColumn("M46_AVG_BUNDLES_NUM", (col("M4_BUNDLES_NUM")+col("M5_BUNDLES_NUM")+col("M6_BUNDLES_NUM"))/3)
# calculate the average for all six months     
num_of_bundles_semester = num_of_bundles_semester.withColumn("M16_AVG_BUNDLES_NUM", (col("M1_BUNDLES_NUM")+col("M2_BUNDLES_NUM")+col("M3_BUNDLES_NUM")+col("M4_BUNDLES_NUM")+col("M5_BUNDLES_NUM")+col("M6_BUNDLES_NUM")) /6)

# Join number of bundles with main dataset

In [165]:
# Register both frames as temp views so the join can be written in SQL.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view")
num_of_bundles_semester.createOrReplaceTempView("num_of_bundles_semester_view")

In [166]:
# B.ACCOUNT_ID is already the 10-character substring, so it is compared to MSISDN directly.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = spark.sql("""SELECT A.*, B.M1_BUNDLES_NUM, B.M2_BUNDLES_NUM, B.M3_BUNDLES_NUM, 
                                B.M4_BUNDLES_NUM, B.M5_BUNDLES_NUM, B.M6_BUNDLES_NUM, B.M13_AVG_BUNDLES_NUM, B.M46_AVG_BUNDLES_NUM, 
                                B.M16_AVG_BUNDLES_NUM
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view A
                                LEFT JOIN num_of_bundles_semester_view B
                                ON A.MSISDN= B.ACCOUNT_ID""")

In [167]:
# No subset is given, so this zero-fills NULLs in every numeric column of the frame,
# not only in the bundle counters added above.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.na.fill(value=0)

# JOIN ALL DATASET WITH Y

In [168]:
# Register the full feature frame and the target frame as temp views.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.createOrReplaceTempView("final_dataset_view")
higher_bundle_segmentation_1.createOrReplaceTempView("higher_bundle_segmentation_1_view")

In [169]:
# Attach the target: HIGHER_BUNDLE comes from higher_bundle_segmentation_1 and is NULL for
# customers that have no row there.
final_df_train = spark.sql("""SELECT A.*, B.HIGHER_BUNDLE
                                FROM final_dataset_view A
                                LEFT JOIN higher_bundle_segmentation_1_view B
                                ON A.MSISDN= B.MSISDN""")

In [170]:
# Customers without a target row become the negative class (0).
final_df_train = final_df_train.na.fill(value=0, subset=["HIGHER_BUNDLE"])

In [None]:
# Class balance of the training target
final_df_train.groupby("HIGHER_BUNDLE").count().show()

+-------------+-------+
|HIGHER_BUNDLE|  count|
+-------------+-------+
|            1|  33988|
|            0|1286401|
+-------------+-------+



# ------------------------------------------------------------------------------------------------------------#

### DF2 <a class="anchor" id="df2"></a>

### Higher_Bundle_Migrators2

In [None]:
#print(month_val[1], month_val[0])

In [None]:
# Month-level path template of the service-status data; the {} placeholders are
# presumably filled by read_in_data2 from its year/month arguments.
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/'

# Only the customer key and the bundle code are needed to build the target.
# Index 1 of year_val/month_val feeds the "previous" snapshot, index 0 the "next" one.
_bundle_key_cols = ("MSISDN", "SERVICE_CODE")
y_prev = read_in_data2(status_service, year=year_val[1], month=month_val[1]).select(*_bundle_key_cols)
y_next = read_in_data2(status_service, year=year_val[0], month=month_val[0]).select(*_bundle_key_cols)

In [None]:
# Expose both monthly snapshots to Spark SQL for the bundle filter below.
y_prev.createOrReplaceTempView("y_prev_view")
y_next.createOrReplaceTempView("y_next_view")

In [None]:
# Keep only rows whose SERVICE_CODE is one of the 19 bundle codes that receive a
# PRICE further down. The same list is applied to y_next in the next cell and
# must stay in sync with both PRICE mappings.
y_prev = spark.sql("""SELECT MSISDN, SERVICE_CODE FROM y_prev_view
                WHERE (SERVICE_CODE="BDLEthnicDataH") OR (SERVICE_CODE="BDLDataTazaINT") 
                
                OR (SERVICE_CODE="BDLIntegLPak") OR (SERVICE_CODE="BDLIntegPak") 
                OR (SERVICE_CODE="BDLIntegLInd") OR (SERVICE_CODE="BDLIntegInd")
                OR (SERVICE_CODE="BDLIntegLBang") OR (SERVICE_CODE="BDLIntegBang")
                OR (SERVICE_CODE="BDLAlbania") OR (SERVICE_CODE="BDLVFAlbInt")
                
                OR (SERVICE_CODE="BDLXNetData") OR (SERVICE_CODE="BDLPreCombo")
                
                OR (SERVICE_CODE="BDLTalkText600") OR (SERVICE_CODE="BDLComboMax") OR (SERVICE_CODE="BDLCUComboXL")                
                OR (SERVICE_CODE="BDLPasoComboH") OR (SERVICE_CODE="BDLPasoComboXL") OR (SERVICE_CODE="BDLPasoComboML")
                OR (SERVICE_CODE="BDLPasoComboTL")
                """)

In [None]:
# Same 19-code bundle filter as applied to y_prev, here on the "next" snapshot.
y_next = spark.sql("""SELECT MSISDN, SERVICE_CODE FROM y_next_view
                WHERE (SERVICE_CODE="BDLEthnicDataH") OR (SERVICE_CODE="BDLDataTazaINT") 
                
                OR (SERVICE_CODE="BDLIntegLPak") OR (SERVICE_CODE="BDLIntegPak") 
                OR (SERVICE_CODE="BDLIntegLInd") OR (SERVICE_CODE="BDLIntegInd")
                OR (SERVICE_CODE="BDLIntegLBang") OR (SERVICE_CODE="BDLIntegBang")
                OR (SERVICE_CODE="BDLAlbania") OR (SERVICE_CODE="BDLVFAlbInt")
                
                OR (SERVICE_CODE="BDLXNetData") OR (SERVICE_CODE="BDLPreCombo")
                
                OR (SERVICE_CODE="BDLTalkText600") OR (SERVICE_CODE="BDLComboMax") OR (SERVICE_CODE="BDLCUComboXL")                
                OR (SERVICE_CODE="BDLPasoComboH") OR (SERVICE_CODE="BDLPasoComboXL") OR (SERVICE_CODE="BDLPasoComboML")
                OR (SERVICE_CODE="BDLPasoComboTL")
                """)

In [None]:
                # CU
# Price attached to each bundle code, grouped by tariff family. The pairs are
# folded, in this order, into a single when(...).when(...) expression; codes
# not listed here end up with a null PRICE. Integer and float literals are kept
# exactly as written.
_prev_bundle_prices = [
    # CU
    ("BDLTalkText600", 10.9),
    ("BDLComboMax", 13.5),
    ("BDLCUComboXL", 15),
    ("BDLPasoComboH", 8.5),
    ("BDLPasoComboXL", 10),
    ("BDLPasoComboML", 12),
    ("BDLPasoComboTL", 17.5),
    # TAZA
    ("BDLIntegLPak", 5.3),
    ("BDLIntegLInd", 5.3),
    ("BDLIntegLBang", 5.3),
    ("BDLAlbania", 5.3),
    ("BDLIntegPak", 8.5),
    ("BDLIntegInd", 8.5),
    ("BDLIntegBang", 8.5),
    ("BDLVFAlbInt", 8.5),
    # INTERNATIONAL
    ("BDLEthnicDataH", 8.9),
    ("BDLDataTazaINT", 10.5),
    # VFPP
    ("BDLXNetData", 10.9),
    ("BDLPreCombo", 13.5),
]

_prev_price_expr = None
for _code, _price in _prev_bundle_prices:
    _is_code = col("SERVICE_CODE") == _code
    if _prev_price_expr is None:
        _prev_price_expr = when(_is_code, _price)
    else:
        _prev_price_expr = _prev_price_expr.when(_is_code, _price)

y_prev = y_prev.withColumn("PRICE", _prev_price_expr)

In [None]:
                # CU
# Price attached to each bundle code for the "next" snapshot (same table as used
# for y_prev). The pairs are folded, in this order, into one
# when(...).when(...) expression; unlisted codes get a null PRICE. Integer and
# float literals are kept exactly as written.
_next_bundle_prices = [
    # CU
    ("BDLTalkText600", 10.9),
    ("BDLComboMax", 13.5),
    ("BDLCUComboXL", 15),
    ("BDLPasoComboH", 8.5),
    ("BDLPasoComboXL", 10),
    ("BDLPasoComboML", 12),
    ("BDLPasoComboTL", 17.5),
    # TAZA
    ("BDLIntegLPak", 5.3),
    ("BDLIntegLInd", 5.3),
    ("BDLIntegLBang", 5.3),
    ("BDLAlbania", 5.3),
    ("BDLIntegPak", 8.5),
    ("BDLIntegInd", 8.5),
    ("BDLIntegBang", 8.5),
    ("BDLVFAlbInt", 8.5),
    # INTERNATIONAL
    ("BDLEthnicDataH", 8.9),
    ("BDLDataTazaINT", 10.5),
    # VFPP
    ("BDLXNetData", 10.9),
    ("BDLPreCombo", 13.5),
]

_next_price_expr = None
for _code, _price in _next_bundle_prices:
    _is_code = col("SERVICE_CODE") == _code
    if _next_price_expr is None:
        _next_price_expr = when(_is_code, _price)
    else:
        _next_price_expr = _next_price_expr.when(_is_code, _price)

y_next = y_next.withColumn("PRICE", _next_price_expr)

In [None]:
### One row per customer: the bundle with the highest PRICE.
# row_number() over a per-MSISDN window ordered by PRICE descending ranks the
# bundles; rank 1 is kept. Bundles tied on PRICE are ranked in no defined order.
_by_price_desc = Window.partitionBy("MSISDN").orderBy(desc("PRICE"))

y_prev = (
    y_prev
    .select("MSISDN", "SERVICE_CODE", "PRICE", F.row_number().over(_by_price_desc).alias("row_num"))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

y_next = (
    y_next
    .select("MSISDN", "SERVICE_CODE", "PRICE", F.row_number().over(_by_price_desc).alias("row_num"))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

In [None]:
# Give the next-month columns a _NEW suffix so they remain distinguishable from
# the previous-month columns once the two snapshots are joined.
y_next = (
    y_next
    .withColumnRenamed("PRICE", "PRICE_NEW")
    .withColumnRenamed("SERVICE_CODE", "SERVICE_CODE_NEW")
)

In [None]:
# Re-register the views: they now point at the filtered, priced, de-duplicated
# frames and replace the raw snapshots registered under the same names earlier.
y_prev.createOrReplaceTempView("y_prev_view")
y_next.createOrReplaceTempView("y_next_view")

In [None]:
# Pair each previous-month bundle price with the same customer's next-month
# price. LEFT JOIN: customers with no qualifying bundle in y_next_view keep
# their row with a null PRICE_NEW.
migrations = spark.sql("""SELECT A.MSISDN, A.PRICE, B.PRICE_NEW
                                FROM y_prev_view A
                                LEFT JOIN y_next_view B
                                ON A.MSISDN= B.MSISDN""")

In [None]:
# A missing next-month bundle is encoded as price 0, so it can never compare as
# higher than the previous price in the flag computed next.
migrations = migrations.na.fill(value= 0, subset=["PRICE_NEW"])

In [None]:
# Target flag: 1 when the next-month bundle price is strictly higher than the
# previous-month price, otherwise 0.
_moved_up = col("PRICE") < col("PRICE_NEW")
migrations = migrations.withColumn("HIGHER_BUNDLE", when(_moved_up, 1).otherwise(0))

In [None]:
#migrations.groupBy("HIGHER_BUNDLE").count().show()

In [None]:
# Label table for DF2: one HIGHER_BUNDLE flag per MSISDN.
higher_bundle_segmentation_2 = migrations.select("MSISDN", "HIGHER_BUNDLE")

# Test Base

In [None]:
month_val[1]

10

### gdpr2

In [None]:
day_max = 1
gdpr = 'gs://'+ permsandprefs_rawprepared_bucket +'/ope_cpm_consent/'+ version +'/parquet/year={}/month={}/'

# Largest 'day' value present in the month's consent data.
# Plain assignments are used instead of writing through locals(); at notebook
# top level both bind the same module-level names (find_day, day_max).
find_day = read_in_data2(gdpr, year=year_val[1], month=month_val[1]).select('MSISDN_CLI', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Same location with a day-level placeholder: load only the latest day found above.
gdpr = 'gs://'+ permsandprefs_rawprepared_bucket +'/ope_cpm_consent/'+ version +'/parquet/year={}/month={}/day={}/'

df_gdpr = read_in_data2(gdpr, year = year_val[1], month = month_val[1], day=day_max)

### status2

In [None]:
# Largest 'day' value present in the month's master-status data.
day_max = 1
status = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_status/2.0/parquet/year={}/month={}/'

# Plain assignments are used instead of writing through locals(); at notebook
# top level both bind the same module-level names (find_day, day_max).
find_day = read_in_data2(status, year=year_val[1], month=month_val[1]).select('MSISDN', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Load the latest day's status snapshot. STATUS is selected because the consent
# join below filters on it; the other columns are carried into the feature set.
status = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_status/2.0/parquet/year={}/month={}/day={}/'

df_status = read_in_data2(status, year = year_val[1], month = month_val[1], day= day_max).select("MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS")

In [None]:
#df_status = df_status.select("MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS")

In [None]:
#df_status.createOrReplaceTempView("status_view")

In [None]:
# One status row per MSISDN; which duplicate survives is not specified.
df_status = df_status.dropDuplicates(["MSISDN"])

In [None]:
# Views consumed by the consent/status join in the next cell.
df_gdpr.createOrReplaceTempView("gdpr_view")
df_status.createOrReplaceTempView("status_view")

In [None]:
# inner join status with gdpr
# A: status rows with STATUS in ('A','B'), keyed by substring(MSISDN, 3, 10).
# B: consent rows that are Active, current (CURRENT_IND = '1') and whose
#    advanced_permission is not 'NO', keyed by substring(MSISDN_CLI, 3, 10).
# Only A's columns are returned, so B acts purely as a consent filter.
# NOTE(review): B groups by the full MSISDN_CLI but is joined on the 10-character
# substring; if two MSISDN_CLI values share that substring the join duplicates
# rows of A. Confirm MSISDN_CLI always carries the same 2-character prefix.
# NOTE(review): the ORDER BY adds a global sort that appears unnecessary, since
# the result is only joined again later.
consent_prepay_status = spark.sql("""SELECT A.*
                         FROM (
                             SELECT substring(MSISDN, 3 , 10) AS MSISDN, TARIFF_PLAN, CONNECTION_DAY, SMARTPHONE_FLAG, INSERTED
                             FROM status_view A
                             WHERE STATUS IN ('A','B')
                             ) AS A
                         INNER JOIN 
                             (SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN, MAX(CONSENT_VALID_FROM_DATE) AS CONSENT_VALID_FROM_DATE
                              from gdpr_view
                              WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
                              GROUP BY MSISDN_CLI
                             ) AS B
                         ON A.MSISDN = B.MSISDN
                         ORDER BY A.MSISDN """)

In [None]:
#print((consent_prepay_status.count(), len(consent_prepay_status.columns)))

### usage2

In [None]:
# print(month_val[1], year_val[1])
# print(month_val[2], year_val[2])
# print(month_val[3], year_val[3])
# print(month_val[4], year_val[4])
# print(month_val[5], year_val[5])
# print(month_val[6], year_val[6])

In [None]:
month_val[1:-1]

[10, 9, 8, 7, 6, 5]

In [None]:
# For every month in the usage window, find the largest 'day' value present.
# day_max collects the results in window order. The per-month frames and values
# are also bound to module-level names find_day<k> / day_max<k>; globals() is
# the same namespace that locals() refers to at notebook top level.
k = 0
day_max = []
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/'

for k, (_yr, _mo) in enumerate(zip(year_val[1:-1], month_val[1:-1]), start=1):
    _month_days = read_in_data2(usage, year=_yr, month=_mo).select('MSISDN', 'day')
    _latest_day = _month_days.select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX
    globals()["find_day" + str(k)] = _month_days
    globals()["day_max" + str(k)] = _latest_day
    day_max.append(_latest_day)

In [None]:
# Load one usage snapshot per month of the window, each at the latest day found
# in the previous cell. The frames are bound to module-level names usage_m1,
# usage_m2, ... which the following cells reference directly.
k = 0
day = 1
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/day={}/'

_monthly_partitions = zip(year_val[1:-1], month_val[1:-1], day_max)
for k, (_yr, _mo, _dd) in enumerate(_monthly_partitions, start=1):
    globals()["usage_m" + str(k)] = read_in_data2(usage, year=_yr, month=_mo, day=_dd)

In [None]:
def _retag_month_columns(frame, month_tag):
    """Rename every column of `frame`, replacing each 'M1' in its name with `month_tag`.

    Columns whose name does not contain 'M1' are renamed to themselves.
    """
    for _old_name in frame.schema.names:
        frame = frame.withColumnRenamed(_old_name, _old_name.replace('M1', month_tag))
    return frame


# Every monthly snapshot arrives with M1-tagged column names; re-tag each frame
# with its own position in the window (the usage_m1 call leaves names unchanged).
usage_m1 = _retag_month_columns(usage_m1, 'M1')
usage_m2 = _retag_month_columns(usage_m2, 'M2')
usage_m3 = _retag_month_columns(usage_m3, 'M3')
usage_m4 = _retag_month_columns(usage_m4, 'M4')
usage_m5 = _retag_month_columns(usage_m5, 'M5')
usage_m6 = _retag_month_columns(usage_m6, 'M6')

In [None]:
# usage_m1 keeps the plain MSISDN as the anchor of the join below; months 2-6
# get a month-prefixed key so each join condition names its own column.
usage_m2 = usage_m2.withColumnRenamed("MSISDN", "M2_MSISDN")
usage_m3 = usage_m3.withColumnRenamed("MSISDN", "M3_MSISDN")
usage_m4 = usage_m4.withColumnRenamed("MSISDN", "M4_MSISDN")
usage_m5 = usage_m5.withColumnRenamed("MSISDN", "M5_MSISDN")
usage_m6 = usage_m6.withColumnRenamed("MSISDN", "M6_MSISDN")

In [None]:
# One temp view per month for the six-way SQL join in the next cell.
usage_m1.createOrReplaceTempView("usage_view_m1")
usage_m2.createOrReplaceTempView("usage_view_m2")
usage_m3.createOrReplaceTempView("usage_view_m3")
usage_m4.createOrReplaceTempView("usage_view_m4")
usage_m5.createOrReplaceTempView("usage_view_m5")
usage_m6.createOrReplaceTempView("usage_view_m6")

In [None]:
# Build the wide usage table: month 1 (alias A) is the base and months 2-6
# (aliases B-F) are LEFT JOINed on MSISDN, so a customer missing from an older
# month keeps the row with nulls for that month's columns.
# The same 20 metrics are selected for every month.
# Activity filter: only month 1 is tested - a customer is kept when it has at
# least one outgoing call or one GPRS session in M1.
# The output MSISDN is substring(A.MSISDN, 3, 10).
usage_prepay = spark.sql("""SELECT substring(A.MSISDN, 3 , 10) AS MSISDN,
                     A.M1_TTL_OUT_CALLS, A.M1_TTL_OUT_MINUTES,
                     A.M1_TTL_OUT_REVENUE, A.M1_V_CALLS_TO_FIXED_LINES, 
                     A.M1_V_MINUTES_TO_FIXED_LINES, A.M1_V_REVENUE_TO_FIXED_LINES, 
                     A.M1_V_CALLS_TO_COMPETITION, A.M1_V_MINUTES_TO_COMPETITION, A.M1_V_REVENUE_TO_COMPETITION,
                     A.M1_V_CALLS_TO_INTERNATIONAL, A.M1_V_MINUTES_TO_INTERNATIONAL, 
                     A.M1_V_REVENUE_TO_INTERNATIONAL, A.M1_GPRS_SESSION,
                     A.M1_GPRS_VOLUME, A.M1_GPRS_REVENUE, A.M1_RECHARGES_NUMBER, 
                     A.M1_RECHARGES_VALUE, A.M1_OUT_DAYS, A.M1_INC_DAYS, A.M1_BUNDLE_REVENUE,
                     
                     B.M2_TTL_OUT_CALLS, B.M2_TTL_OUT_MINUTES, 
                     B.M2_TTL_OUT_REVENUE, B.M2_V_CALLS_TO_FIXED_LINES,
                     B.M2_V_MINUTES_TO_FIXED_LINES, B.M2_V_REVENUE_TO_FIXED_LINES, 
                     B.M2_V_CALLS_TO_COMPETITION, B.M2_V_MINUTES_TO_COMPETITION, 
                     B.M2_V_REVENUE_TO_COMPETITION, B.M2_V_CALLS_TO_INTERNATIONAL,
                     B.M2_V_MINUTES_TO_INTERNATIONAL, B.M2_V_REVENUE_TO_INTERNATIONAL, 
                     B.M2_GPRS_SESSION, B.M2_GPRS_VOLUME, 
                     B.M2_GPRS_REVENUE, B.M2_RECHARGES_NUMBER, 
                     B.M2_RECHARGES_VALUE, B.M2_OUT_DAYS,
                     B.M2_INC_DAYS, B.M2_BUNDLE_REVENUE,
                     
                     C.M3_TTL_OUT_CALLS, C.M3_TTL_OUT_MINUTES, 
                     C.M3_TTL_OUT_REVENUE, C.M3_V_CALLS_TO_FIXED_LINES,
                     C.M3_V_MINUTES_TO_FIXED_LINES, C.M3_V_REVENUE_TO_FIXED_LINES, 
                     C.M3_V_CALLS_TO_COMPETITION, C.M3_V_MINUTES_TO_COMPETITION, 
                     C.M3_V_REVENUE_TO_COMPETITION, C.M3_V_CALLS_TO_INTERNATIONAL,
                     C.M3_V_MINUTES_TO_INTERNATIONAL, C.M3_V_REVENUE_TO_INTERNATIONAL, 
                     C.M3_GPRS_SESSION, C.M3_GPRS_VOLUME, 
                     C.M3_GPRS_REVENUE, C.M3_RECHARGES_NUMBER, 
                     C.M3_RECHARGES_VALUE, C.M3_OUT_DAYS,
                     C.M3_INC_DAYS, C.M3_BUNDLE_REVENUE,
                     
                     D.M4_TTL_OUT_CALLS, D.M4_TTL_OUT_MINUTES, 
                     D.M4_TTL_OUT_REVENUE, D.M4_V_CALLS_TO_FIXED_LINES,
                     D.M4_V_MINUTES_TO_FIXED_LINES, D.M4_V_REVENUE_TO_FIXED_LINES, 
                     D.M4_V_CALLS_TO_COMPETITION, D.M4_V_MINUTES_TO_COMPETITION, 
                     D.M4_V_REVENUE_TO_COMPETITION, D.M4_V_CALLS_TO_INTERNATIONAL,
                     D.M4_V_MINUTES_TO_INTERNATIONAL, D.M4_V_REVENUE_TO_INTERNATIONAL, 
                     D.M4_GPRS_SESSION, D.M4_GPRS_VOLUME, 
                     D.M4_GPRS_REVENUE, D.M4_RECHARGES_NUMBER, 
                     D.M4_RECHARGES_VALUE, D.M4_OUT_DAYS, 
                     D.M4_INC_DAYS, D.M4_BUNDLE_REVENUE,
                     
                     E.M5_TTL_OUT_CALLS, E.M5_TTL_OUT_MINUTES, 
                     E.M5_TTL_OUT_REVENUE, E.M5_V_CALLS_TO_FIXED_LINES,
                     E.M5_V_MINUTES_TO_FIXED_LINES, E.M5_V_REVENUE_TO_FIXED_LINES, 
                     E.M5_V_CALLS_TO_COMPETITION, E.M5_V_MINUTES_TO_COMPETITION, 
                     E.M5_V_REVENUE_TO_COMPETITION, E.M5_V_CALLS_TO_INTERNATIONAL,
                     E.M5_V_MINUTES_TO_INTERNATIONAL, E.M5_V_REVENUE_TO_INTERNATIONAL, 
                     E.M5_GPRS_SESSION, E.M5_GPRS_VOLUME, 
                     E.M5_GPRS_REVENUE, E.M5_RECHARGES_NUMBER, 
                     E.M5_RECHARGES_VALUE, E.M5_OUT_DAYS,
                     E.M5_INC_DAYS, E.M5_BUNDLE_REVENUE,
                  
                     F.M6_TTL_OUT_CALLS, F.M6_TTL_OUT_MINUTES, 
                     F.M6_TTL_OUT_REVENUE, F.M6_V_CALLS_TO_FIXED_LINES,
                     F.M6_V_MINUTES_TO_FIXED_LINES, F.M6_V_REVENUE_TO_FIXED_LINES, 
                     F.M6_V_CALLS_TO_COMPETITION, F.M6_V_MINUTES_TO_COMPETITION, 
                     F.M6_V_REVENUE_TO_COMPETITION, F.M6_V_CALLS_TO_INTERNATIONAL,
                     F.M6_V_MINUTES_TO_INTERNATIONAL, F.M6_V_REVENUE_TO_INTERNATIONAL, 
                     F.M6_GPRS_SESSION, F.M6_GPRS_VOLUME, 
                     F.M6_GPRS_REVENUE, F.M6_RECHARGES_NUMBER, 
                     F.M6_RECHARGES_VALUE, F.M6_OUT_DAYS,
                     F.M6_INC_DAYS, F.M6_BUNDLE_REVENUE
                     
                     from usage_view_m1 A
                     left join usage_view_m2 B
                          on A.MSISDN = B.M2_MSISDN 
                     left join usage_view_m3 C
                          on A.MSISDN = C.M3_MSISDN
                     left join usage_view_m4 D
                          on A.MSISDN = D.M4_MSISDN
                     left join usage_view_m5 E
                          on A.MSISDN = E.M5_MSISDN
                     left join usage_view_m6 F
                          on A.MSISDN = F.M6_MSISDN
                          
                     WHERE ((A.M1_TTL_OUT_CALLS > 0) or (A.M1_GPRS_SESSION > 0))""")

In [None]:
# One usage row per (shortened) MSISDN; which duplicate survives is not specified.
usage_prepay = usage_prepay.dropDuplicates(["MSISDN"])

In [None]:
# Views for the usage/consent join. gdpr_view was already registered from
# df_gdpr for the status join; registering it again keeps this cell self-contained.
usage_prepay.createOrReplaceTempView("usage_prepay_view")
df_gdpr.createOrReplaceTempView("gdpr_view")

In [None]:
# join usage with consent
# Keeps only usage rows whose MSISDN has an Active, current consent record with
# advanced_permission other than 'NO'. Only A's columns are returned, so the
# consent sub-query acts purely as a filter.
# NOTE(review): the sub-query groups by the full MSISDN_CLI but is joined on its
# 10-character substring; two MSISDN_CLI values sharing that substring would
# duplicate usage rows - confirm this cannot happen.
consent_prepay_usage = spark.sql("""SELECT A.*
                         FROM usage_prepay_view A
                         INNER JOIN 
                             (SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN, MAX(CONSENT_VALID_FROM_DATE) AS CONSENT_VALID_FROM_DATE
                              from gdpr_view
                              WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
                              GROUP BY MSISDN_CLI
                             ) AS B
                         ON A.MSISDN = B.MSISDN
                         ORDER BY A.MSISDN """) 

In [None]:
#print((consent_prepay_usage.count(), len(consent_prepay_usage.columns)))

# join status with usage

In [None]:
# Views for combining the consented usage rows with the consented status rows.
consent_prepay_usage.createOrReplaceTempView("consent_prepay_usage_view")
consent_prepay_status.createOrReplaceTempView("consent_prepay_status_view")

In [None]:
# INNER JOIN: a customer must be present in both the usage side and the status
# side to enter the modelling base. Status contributes the tariff, connection
# date, smartphone flag and INSERTED columns.
base_usage_status = spark.sql("""SELECT A.*, B.TARIFF_PLAN, B.CONNECTION_DAY, B.SMARTPHONE_FLAG, B.INSERTED
                             FROM consent_prepay_usage_view A
                             INNER JOIN consent_prepay_status_view B
                                  ON A.MSISDN = B.MSISDN """)

In [None]:
# Fill NaNs
# Columns are split by their dtype string once, then each group is filled with
# a single na.fill call: 0 for the numeric group, "N/A" for the text group.
# NOTE(review): 'int64' and 'object' look like pandas dtype names; Spark reports
# e.g. 'bigint' for long columns, which would not be matched here - confirm
# whether long columns are meant to be filled too.
_dtype_by_column = dict(base_usage_status.dtypes)
_zero_fill_cols = [name for name in base_usage_status.columns
                   if _dtype_by_column[name] in ('int64', 'double', 'int')]
_na_fill_cols = [name for name in base_usage_status.columns
                 if _dtype_by_column[name] in ('object', 'string', 'timestamp')]

if _zero_fill_cols:
    base_usage_status = base_usage_status.na.fill(value=0, subset=_zero_fill_cols)
if _na_fill_cols:
    base_usage_status = base_usage_status.na.fill(value="N/A", subset=_na_fill_cols)

In [None]:
#print((base_usage_status.count(), len(base_usage_status.columns)))

### Add/Convert_Features2

In [None]:
# 5. Convert from second -> minutes   (columns whose name contains 'MINUTES': / 60)
# 6. Convert from KByte -> MByte      (columns whose name contains 'VOLUME': / 1024)
# Not idempotent: running this cell twice divides the same columns twice.
_unit_divisors = (('MINUTES', 60), ('VOLUME', 1024))
for _name in base_usage_status.columns:
    for _marker, _divisor in _unit_divisors:
        if _marker in _name:
            base_usage_status = base_usage_status.withColumn(_name, col(_name) / _divisor)

# Group tariffs

In [None]:
# CU
# Order matters: 'Cuba40' has to be rewritten before the shorter 'Cuba' pattern.
for _pattern in ('Cuba40', 'Cuba'):
    base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', regexp_replace('TARIFF_PLAN', _pattern, 'CU'))

In [None]:
# VFPP
# Each listed pattern is replaced by 'VFPP' wherever it occurs in TARIFF_PLAN.
for _pattern in ('VALCBASE', 'HAM', 'Advanced', 'OCFP'):
    base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', regexp_replace('TARIFF_PLAN', _pattern, 'VFPP'))

In [None]:
# ETHNIC (INTERNATIONAL + TAZA)
# (pattern, replacement) pairs applied in order; the final pair maps 'TAZA' to
# itself and therefore leaves the values unchanged.
_ethnic_rewrites = (
    ('INTPACK', 'INTERNATIONAL'),
    ('Taza', 'TAZA'),
    ('TAZA', 'TAZA'),
)
for _pattern, _replacement in _ethnic_rewrites:
    base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', regexp_replace('TARIFF_PLAN', _pattern, _replacement))

# Tenure

In [None]:
# Tenure = months between CONNECTION_DAY and INSERTED, rounded and stored as an integer.
_tenure_months = round(F.months_between(col("INSERTED"), col("CONNECTION_DAY"))).cast(IntegerType())
base_usage_status = base_usage_status.withColumn("TENURE_IN_MONTHS", _tenure_months)

In [None]:
# Keep only customers with a tenure of more than 3 months; rows whose tenure is
# null fail the comparison and are dropped as well.
base_usage_status = base_usage_status.filter(col("TENURE_IN_MONTHS") > 3)

# Average talk per time

In [None]:
# Average duration of an outgoing call, per month: total outgoing minutes
# divided by the number of outgoing calls.
for month in range(1, 7):
    _prefix = "M" + str(month)
    _minutes_per_call = col(_prefix + "_TTL_OUT_MINUTES") / col(_prefix + "_TTL_OUT_CALLS")
    base_usage_status = base_usage_status.withColumn(_prefix + "_MINUTES_PER_CALL", _minutes_per_call)

In [None]:
# Zero-fill the nulls introduced by the ratio columns above.
# The numeric columns are collected once and filled in a single na.fill call.
_dtype_by_column = dict(base_usage_status.dtypes)
_numeric_cols = [name for name in base_usage_status.columns
                 if _dtype_by_column[name] in ('int64', 'double', 'int')]
if _numeric_cols:
    base_usage_status = base_usage_status.na.fill(value=0, subset=_numeric_cols)

# ARPU

In [None]:
# Monthly ARPU = outgoing revenue + GPRS revenue + bundle revenue.
for month in range(1, 7):
    _prefix = "M" + str(month)
    _arpu = (col(_prefix + "_TTL_OUT_REVENUE")
             + col(_prefix + "_GPRS_REVENUE")
             + col(_prefix + "_BUNDLE_REVENUE"))
    base_usage_status = base_usage_status.withColumn(_prefix + "_ARPU", _arpu)

# ROC

In [None]:
# Rate of change (ROC) of month 1 against the mean of months 2-6:
#     M1_ROC<metric> = (M1<metric> - avg(M2..M6<metric>)) / avg(M2..M6<metric>)
#
# A metric is identified by the column name without its two-character month
# prefix. Each metric appears under six month prefixes, so the suffixes are
# de-duplicated first (keeping first-seen order) and every ROC column is built
# exactly once instead of once per month prefix. The dtype lookup is taken once
# up front; the loop only adds M1_ROC* columns and never changes the type of a
# column it inspects.
_dtype_by_column = dict(base_usage_status.dtypes)
_roc_suffixes = []
_seen_suffixes = set()
for column in base_usage_status.schema.names:
    _is_metric = (_dtype_by_column[column] in ('int64', 'double', 'int')
                  and column != 'MSISDN'
                  and column != 'TENURE_IN_MONTHS'
                  and column[2:6] != "_ROC")  # skip ROC columns left over from an earlier run
    if _is_metric and column[2:] not in _seen_suffixes:
        _seen_suffixes.add(column[2:])
        _roc_suffixes.append(column[2:])

for _suffix in _roc_suffixes:
    prev_month_avg = (col("M2" + _suffix) + col("M3" + _suffix) + col("M4" + _suffix)
                      + col("M5" + _suffix) + col("M6" + _suffix)) / 5
    # A zero average makes the ratio undefined; the following cell zero-fills the nulls.
    base_usage_status = base_usage_status.withColumn(
        "M1_ROC" + _suffix, (col("M1" + _suffix) - prev_month_avg) / prev_month_avg)

In [None]:
# Zero-fill the nulls produced by the ROC ratios.
# The numeric columns are gathered once and filled in a single na.fill call.
_column_kinds = dict(base_usage_status.dtypes)
_roc_fill_cols = [name for name in base_usage_status.columns
                  if _column_kinds[name] in ('int64', 'double', 'int')]
if _roc_fill_cols:
    base_usage_status = base_usage_status.na.fill(value=0, subset=_roc_fill_cols)

# Average for half and all semester

In [None]:
# Create 3 month and 6 month averages for all usage columns:
#     M13_AVG<metric> = mean of months 1-3
#     M46_AVG<metric> = mean of months 4-6
#     M16_AVG<metric> = mean of months 1-6
#
# A metric is the column name without its two-character month prefix. Each
# metric appears under six month prefixes, so the suffixes are de-duplicated
# first (keeping first-seen order) and every average is built exactly once.
# The dtype lookup is taken once up front; the loop only adds *_AVG* columns.
_dtype_by_column = dict(base_usage_status.dtypes)
_avg_suffixes = []
_seen_suffixes = set()
for column in base_usage_status.schema.names:
    _is_metric = (_dtype_by_column[column] in ('int64', 'double', 'int')
                  and column != 'MSISDN'
                  and column != 'TENURE_IN_MONTHS'
                  and column[2:6] != "_ROC")  # ROC columns are not averaged
    if _is_metric and column[2:] not in _seen_suffixes:
        _seen_suffixes.add(column[2:])
        _avg_suffixes.append(column[2:])

for _suffix in _avg_suffixes:
    _m1, _m2, _m3, _m4, _m5, _m6 = [col("M" + str(_k) + _suffix) for _k in range(1, 7)]
    # first semi-semester
    base_usage_status = base_usage_status.withColumn("M13_AVG" + _suffix, (_m1 + _m2 + _m3) / 3)
    # second semi-semester
    base_usage_status = base_usage_status.withColumn("M46_AVG" + _suffix, (_m4 + _m5 + _m6) / 3)
    # all six months
    base_usage_status = base_usage_status.withColumn(
        "M16_AVG" + _suffix, (_m1 + _m2 + _m3 + _m4 + _m5 + _m6) / 6)

# Ratio recharge/bundle value

In [None]:
# Monthly expenditure ratio = recharge value divided by bundle revenue.
for month in range(1, 7):
    _prefix = "M" + str(month)
    _ratio = col(_prefix + "_RECHARGES_VALUE") / col(_prefix + "_BUNDLE_REVENUE")
    base_usage_status = base_usage_status.withColumn(_prefix + "_EXPENDITURE_RATIO", _ratio)

In [None]:
# Zero-fill the nulls produced by the expenditure ratios.
# The numeric columns are gathered once and filled in a single na.fill call.
_kinds = dict(base_usage_status.dtypes)
_ratio_fill_cols = [name for name in base_usage_status.columns
                    if _kinds[name] in ('int64', 'double', 'int')]
if _ratio_fill_cols:
    base_usage_status = base_usage_status.na.fill(value=0, subset=_ratio_fill_cols)

### demographics2

In [None]:
# CAR LINE DATASET
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/'

# Largest 'day' value present in the month's line data.
# Plain assignments are used instead of writing through locals(); at notebook
# top level both bind the same module-level names (find_day, day_max).
find_day = read_in_data2(line, year=year_val[1], month=month_val[1]).select('MSISDN', 'day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Latest day's line snapshot, reduced to the MSISDN -> customer-account mapping
# that links usage rows to the demographics table.
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/day={}/'

df_line = (
    read_in_data2(line, year=year_val[1], month=month_val[1], day=day_max)
    .select("MSISDN", "RETAIL_CUST_ACCT_DWH_ID")
)

In [None]:
# Remove exact (MSISDN, account) duplicates.
# NOTE(review): the following cell de-duplicates on MSISDN alone, which already
# implies this step.
df_line = df_line.dropDuplicates(["MSISDN","RETAIL_CUST_ACCT_DWH_ID"])

In [None]:
# One account per MSISDN; when an MSISDN maps to several accounts, which one is
# kept is not specified.
df_line = df_line.dropDuplicates(["MSISDN"])

In [None]:
# Views for attaching the customer-account id to the usage/status base.
base_usage_status.createOrReplaceTempView("base_usage_view")
df_line.createOrReplaceTempView("line_view")

In [None]:
# Join CAR_LINE with BASE_USAGE
# INNER JOIN: customers without a row in line_view are dropped from the base.
usage_status_df = spark.sql ("""SELECT A.*, B.RETAIL_CUST_ACCT_DWH_ID
                          FROM base_usage_view A
                          INNER JOIN line_view B
                             ON A.MSISDN = B.MSISDN
                       """)

In [None]:
# DEMOGRAPHICS DATASET
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/'

day_max = 1
# Largest 'day' value present in the month's demographics data.
# Plain assignments are used instead of writing through locals(); at notebook
# top level both bind the same module-level names (find_day, day_max).
find_day = read_in_data2(demographics, year=year_val[1], month=month_val[1]).select('day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Same location with a day-level placeholder: load only the latest day found above.
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/day={}/'

df_demographics = read_in_data2(demographics, year = year_val[1], month = month_val[1], day= day_max)

In [None]:
# Keep the join key (CUST_DWH_ID) plus the five demographic attributes that are
# carried into the feature set.
df_demographics = df_demographics.select("CUST_DWH_ID", "POST_CODE", "GENDER", "AGE", "VF_COMBO_FLG", "ACTIVE_TOTAL_LINES")

In [None]:
# One demographics row per customer id, so the join further down cannot multiply rows.
df_demographics = df_demographics.dropDuplicates(["CUST_DWH_ID"])

In [None]:
# Map the 'XXXXX' placeholder post code to 'DUMMY', the same value the fill
# step below uses for missing or empty post codes.
df_demographics = df_demographics.withColumn('POST_CODE', regexp_replace('POST_CODE', 'XXXXX', 'DUMMY'))

In [None]:
# Fill NaNs in the demographic attributes.
#   numeric columns -> column mean
#   GENDER          -> "O" for null or empty string
#   POST_CODE       -> "DUMMY" for null or empty string
#   other strings   -> "N/A"
# CUST_DWH_ID is the join key and is deliberately left untouched: an imputed
# key (a mean id or "N/A") could only produce spurious matches.
# NOTE(review): the DF1 and scoring sections presumably carry the same cell;
# mirror this change there if the copies are meant to stay in sync.
_demo_dtypes = dict(df_demographics.dtypes)
for column in df_demographics.columns:
    if column == "CUST_DWH_ID":
        continue
    _kind = _demo_dtypes[column]
    if _kind in ('int64', 'double', 'int'):
        # fill with mean; avg() is None when the column has no non-null value,
        # and na.fill rejects None, so such a column is left as it is
        mean = df_demographics.agg({column: "avg"}).collect()[0][0]
        if mean is not None:
            df_demographics = df_demographics.na.fill(mean, subset=[column])
    elif _kind in ('object', 'string'):
        if (column == "GENDER"):
            df_demographics = df_demographics.na.fill(value="O", subset=[column])
            df_demographics = df_demographics.withColumn(column, when(col(column)== "" ,"O").otherwise(col(column)))
        if (column == "POST_CODE"):
            df_demographics = df_demographics.na.fill(value="DUMMY", subset=[column])
            df_demographics = df_demographics.withColumn(column, when(col(column)== "" ,"DUMMY").otherwise(col(column)))
        df_demographics = df_demographics.na.fill(value="N/A", subset=[column])

In [None]:
# STRANGE VALUES FOR AGES
# Any AGE that is still null becomes 0.0, which the range check in the next
# cell then raises to the lower bound.
df_demographics = df_demographics.withColumn("AGE", coalesce(col("AGE"), lit(0.0)))

In [None]:
# Clamp AGE to the range [18, 80]: values above 80 become 80, values below 18
# become 18. The lower test used to be `< 17` while still writing 18, which let
# an age of exactly 17 through unchanged.
# NOTE(review): the DF1 and scoring sections presumably carry the same clamp;
# mirror this change there so all datasets are prepared identically.
df_demographics = df_demographics.withColumn(
    "AGE",
    when(col("AGE") > 80, 80)
    .when(col("AGE") < 18, 18)
    .otherwise(col("AGE")))

# Join usage-status with demographics

In [None]:
# Views for the usage/status + demographics join.
df_demographics.createOrReplaceTempView("demographics_view")
usage_status_df.createOrReplaceTempView("usage_status_view")

In [None]:
# JOIN
# LEFT JOIN on the customer-account id: customers without a demographics row
# are kept, with nulls in the five demographic columns (filled further down).
usage_status_demo = spark.sql ("""SELECT A.*, B.POST_CODE, B.GENDER, B.AGE, B.VF_COMBO_FLG, B.ACTIVE_TOTAL_LINES
                          FROM usage_status_view A
                          LEFT JOIN demographics_view B
                             ON A.RETAIL_CUST_ACCT_DWH_ID = B.CUST_DWH_ID
                       """)

In [None]:
# The account id was only needed as the join key.
usage_status_demo = usage_status_demo.drop(col("RETAIL_CUST_ACCT_DWH_ID"))

In [None]:
# Fill the demographic columns of customers that had no match in the LEFT JOIN.
# Same rules as for the demographics table itself: numeric -> column mean,
# GENDER -> "O", POST_CODE -> "DUMMY", any other string -> "N/A".
# CUST_DWH_ID is skipped because it is not part of the joined frame.
_joined_dtypes = dict(usage_status_demo.dtypes)
_blank_defaults = {"GENDER": "O", "POST_CODE": "DUMMY"}

for column in df_demographics.columns:
    if column == 'CUST_DWH_ID':
        continue
    _kind = _joined_dtypes[column]
    if _kind in ('int64', 'double', 'int'):
        # fill with mean
        mean = usage_status_demo.agg({column: "avg"}).collect()[0][0]
        usage_status_demo = usage_status_demo.na.fill(mean, subset=[column])
    elif _kind in ('object', 'string'):
        if column in _blank_defaults:
            _default = _blank_defaults[column]
            usage_status_demo = usage_status_demo.na.fill(value=_default, subset=[column])
            usage_status_demo = usage_status_demo.withColumn(
                column, when(col(column) == "", _default).otherwise(col(column)))
        usage_status_demo = usage_status_demo.na.fill(value="N/A", subset=[column])

### post_code2

In [None]:
# Copy the sociodemographics workbook from the files bucket into the working
# directory, then load it into a Spark DataFrame via pandas.
_sociodemo_source = 'gs://' + files_bucket + '/notebooks/jupyter/higher_bundles/Sociodemographics.xlsx'

# The command is passed as an argument list (no shell parsing of the bucket
# name) and run with check_call, which raises CalledProcessError on a non-zero
# exit status. A failed copy therefore stops the notebook here instead of
# letting read_excel pick up a missing or stale local file.
subprocess.check_call(['/bin/sh', '/usr/bin/gsutil', '-q', 'cp', _sociodemo_source, 'Sociodemographics.xlsx'])

Population_pools = pd.read_excel('Sociodemographics.xlsx')
population_pools_df = sql.createDataFrame(Population_pools)

0

In [None]:
# Drop the descriptive columns and the three 60+ age-band columns; none of them
# is selected by the post-code join below.
population_pools_df = population_pools_df.drop("Postcode_key", "Name", "Periferiaki_enotita", "Population_aged_60+",
                                              "Male_Population_aged_60+", "Female_Population_aged_60+")

In [None]:
# Replace characters that are awkward in Spark SQL identifiers ('-', ':', '(',
# ')', '.') so the columns can be referenced without quoting in the join below.
_sql_safe_names = [
    ('Population_aged_0-14', 'Population_aged_0_14'),
    ('Male_Population_aged_0-14', 'Male_Population_aged_0_14'),
    ('Female_Population_aged_0-14', 'Female_Population_aged_0_14'),

    ('Population_aged_15-29', 'Population_aged_15_29'),
    ('Male_Population_aged_15-29', 'Male_Population_aged_15_29'),
    ('Female_Population_aged_15-29', 'Female_Population_aged_15_29'),

    ('Population_aged_30-44', 'Population_aged_30_44'),
    ('Male_Population_aged_30-44', 'Male_Population_aged_30_44'),
    ('Female_Population_aged_30-44', 'Female_Population_aged_30_44'),

    ('Population_aged_45-59', 'Population_aged_45_59'),
    ('Male_Population_aged_45-59', 'Male_Population_aged_45_59'),
    ('Female_Population_aged_45-59', 'Female_Population_aged_45_59'),

    ('Purchasing_Power:_million_Euro', 'Purchasing_Power_million_Euro'),
    ('Purchasing_Power:_per_mill_of_country', 'Purchasing_Power_per_mill_of_country'),
    ('Purchasing_Power:_Euro_per_capita', 'Purchasing_Power_Euro_per_capita'),
    ('Purchasing_Power:_index_(country_eq.100)', 'Purchasing_Power_index_country_eq_100'),
]
for _raw_name, _safe_name in _sql_safe_names:
    population_pools_df = population_pools_df.withColumnRenamed(_raw_name, _safe_name)

In [None]:
# Views for enriching customers with post-code level sociodemographics.
population_pools_df.createOrReplaceTempView("population_pools_view")
usage_status_demo.createOrReplaceTempView("usage_status_demo_view")

In [None]:
# Attach post-code level population and purchasing-power figures.
# The 60+ age bands are excluded (they were dropped from the lookup table above).
# LEFT JOIN on POST_CODE: customers whose post code has no entry in the lookup
# (including the 'DUMMY' / 'N/A' fill values, presumably) get nulls here.
usage_status_demo_pc = spark.sql ("""SELECT A.*, B.Population, B.Households, B.Average_Household_Size, B.Male_Population,
                          B.Female_Population, B.Population_aged_0_14, B.Male_Population_aged_0_14, B.Female_Population_aged_0_14,
                          B.Population_aged_15_29, B.Male_Population_aged_15_29, B.Female_Population_aged_15_29, 
                          B.Population_aged_30_44, B.Male_Population_aged_30_44, B.Female_Population_aged_30_44,
                          B.Population_aged_45_59, B.Male_Population_aged_45_59, B.Female_Population_aged_45_59,
                          B.Purchasing_Power_million_Euro, B.Purchasing_Power_per_mill_of_country,
                          B.Purchasing_Power_Euro_per_capita, B.Purchasing_Power_index_country_eq_100

                          FROM usage_status_demo_view A                          
                          LEFT JOIN population_pools_view B                          
                          ON A.POST_CODE = B.POST_CODE
                       """)

In [None]:
# Zero-fill the sociodemographic columns for customers whose post code found no
# match. All lookup-table column names go into one na.fill call; a numeric fill
# value leaves non-numeric columns in the subset untouched.
usage_status_demo_pc = usage_status_demo_pc.na.fill(value=0, subset=list(population_pools_df.columns))

### students2

In [None]:
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/'

day_max = 1
# Largest 'day' value present in the month's service-status data.
# Plain assignments are used instead of writing through locals(); at notebook
# top level both bind the same module-level names (find_day, day_max).
find_day = read_in_data2(status_service, year=year_val[1], month=month_val[1]).select('day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Re-point the path template at a single daily partition and load the snapshot
# for the latest day found in the previous cell (day_max).
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/day={}'

df_status_service = read_in_data2(status_service, year = year_val[1], month = month_val[1], day=day_max)

In [None]:
# Expose the service-status snapshot to Spark SQL.
df_status_service.createOrReplaceTempView("df_status_service_view")

In [None]:
# Keep only the rows for service code 'BDLCUPaso'; the following cells flag these
# MSISDNs as students (STUDENTS_FLAG = 'Y').
df_students = spark.sql("""SELECT * 
                           FROM df_status_service_view A
                           WHERE SERVICE_CODE == 'BDLCUPaso' 
                           """)

In [None]:
# One row per MSISDN, so the later LEFT JOIN cannot multiply customer rows.
df_students = df_students.dropDuplicates(["MSISDN"])

In [None]:
# Add a constant flag column marking every row of this frame as a student.
df_students = df_students.withColumn("STUDENTS_FLAG", lit("Y"))

# LEFT JOIN USAGE_STATUS_DEMO WITH STUDENTS INFO

In [None]:
# Register the main dataset and the student list as temp views for the SQL join.
usage_status_demo_pc.createOrReplaceTempView("usage_status_demo_pc_view")
df_students.createOrReplaceTempView("students_view")

In [None]:
# Add STUDENTS_FLAG to every customer; non-students get NULL here (filled in the next cell).
usage_status_demo_pc_stud = spark.sql("""SELECT A.*, B.STUDENTS_FLAG
                                FROM usage_status_demo_pc_view A
                                LEFT JOIN students_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
# Customers with no match in the student list get STUDENTS_FLAG = "N".
usage_status_demo_pc_stud = usage_status_demo_pc_stud.na.fill(value="N", subset=["STUDENTS_FLAG"])

### buckets2

In [None]:
# Sanity check: the two months whose bucket snapshots are read in the next cell.
month_val[1:3]

[10, 9]

In [None]:
# Read the bucket snapshots of the previous 2-month period.
day = 1
buckets = 'gs://'+ model_outputs_bucket + '/prepay_buckets/result/parquet/1.0/year={}/month={}/'

# Explicit unpacking replaces the former writes into locals(): it fails loudly if the
# slices do not yield exactly two (year, month) pairs, instead of silently leaving a
# stale buckets_m1 / buckets_m2 from an earlier section in scope.
buckets_m1, buckets_m2 = [
    read_in_data2(buckets, year = bucket_year, month = bucket_month)
    for bucket_year, bucket_month in zip(year_val[1:3], month_val[1:3])
]

In [None]:
# Stack the two monthly snapshots. union() matches columns by position, so both
# frames are assumed to share the same column order — TODO confirm.
df_buckets = buckets_m1.union(buckets_m2)

In [None]:
# NOTE(review): this global sort does not look load-bearing — the next cell selects rows
# through an explicitly ordered window — confirm and consider removing it.
df_buckets = df_buckets.sort(col("BUNDLE").asc(), col("ACTIVATION_DATE").desc())

In [None]:
### Keep only the most recent activation of each bundle per MSISDN
latest_per_bundle = Window.partitionBy("MSISDN", "BUNDLE").orderBy(desc("ACTIVATION_DATE"))
bucket_columns = ["MSISDN", "ACTIVATION_DATE", "BUNDLE", "BUNDLE_REVENUE", "BALANCE", "VOICE_BUCKET", "DATA_BUCKET", "SMS_BUCKET"]

# row_num == 1 is the latest ACTIVATION_DATE inside each (MSISDN, BUNDLE) group;
# the helper column is kept on the result, as before.
df_buckets = (
    df_buckets
    .select(*bucket_columns, F.row_number().over(latest_per_bundle).alias("row_num"))
    .filter(col("row_num") == 1)
)

In [None]:
# Total voice / data / SMS bucket allowance per MSISDN across the retained bundles.
# F.sum is used explicitly: a bare `sum` only works when a star import has shadowed
# Python's builtin sum, which breaks silently if the imports change.
df_buckets_summary = df_buckets.groupBy("MSISDN").agg(
    F.sum("VOICE_BUCKET").alias("VOICE_BUCKET_SUMMARY"),
    F.sum("DATA_BUCKET").alias("DATA_BUCKET_SUMMARY"),
    F.sum("SMS_BUCKET").alias("SMS_BUCKET_SUMMARY"),
)

### balance2

In [None]:
### One balance per MSISDN: the one recorded with the latest activation
latest_activation = Window.partitionBy("MSISDN").orderBy(desc("ACTIVATION_DATE"))
df_balance = df_buckets.select("MSISDN", "BALANCE", F.row_number().over(latest_activation).alias("row_num"))

# Keep the top-ranked row and discard the helper column.
df_balance_summary = df_balance.where(col("row_num") == 1).drop("row_num")

# Join bucket with balance

In [None]:
# Register bucket totals and latest balance as temp views for the SQL join.
df_buckets_summary.createOrReplaceTempView("buckets_sum_view")
df_balance_summary.createOrReplaceTempView("balance_sum_view")

In [None]:
# Combine bucket totals with the latest balance. Both inputs derive from df_buckets,
# so the INNER JOIN on MSISDN is expected to keep every MSISDN.
df_buckets_balance = spark.sql("""SELECT A.*, B.BALANCE
                             FROM buckets_sum_view A
                             INNER JOIN balance_sum_view B
                                  ON A.MSISDN = B.MSISDN """)

# Join bucket balance with main dataset 

In [None]:
# Register the main dataset and the bucket/balance features as temp views.
usage_status_demo_pc_stud.createOrReplaceTempView("usage_status_demo_pc_stud_view")
df_buckets_balance.createOrReplaceTempView("buckets_balance_view")

In [None]:
# Add bucket totals and balance to every customer; customers without bucket data get NULLs
# (handled in the next cell).
usage_status_demo_pc_stud_buckets = spark.sql("""SELECT A.*, B.VOICE_BUCKET_SUMMARY, B.DATA_BUCKET_SUMMARY, B.SMS_BUCKET_SUMMARY, B.BALANCE
                                FROM usage_status_demo_pc_stud_view A
                                LEFT JOIN buckets_balance_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
# Replace the NULLs introduced by the LEFT JOIN on the bucket/balance columns:
# numeric columns -> 0, string columns -> "N/A".
# Spark reports SQL type names ('bigint', 'double', 'string', ...), never the pandas
# names 'int64' / 'object' the previous check looked for, so bigint/float/decimal
# columns were skipped. A string fill value never applies to timestamp columns, so
# that branch was a no-op and is dropped.
spark_dtypes = dict(usage_status_demo_pc_stud_buckets.dtypes)
numeric_type_names = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}

numeric_bucket_cols = [
    name for name in df_buckets_balance.columns
    if spark_dtypes[name] in numeric_type_names or spark_dtypes[name].startswith('decimal')
]
string_bucket_cols = [name for name in df_buckets_balance.columns if spark_dtypes[name] == 'string']

if numeric_bucket_cols:
    usage_status_demo_pc_stud_buckets = usage_status_demo_pc_stud_buckets.na.fill(value=0, subset=numeric_bucket_cols)
if string_bucket_cols:
    usage_status_demo_pc_stud_buckets = usage_status_demo_pc_stud_buckets.na.fill(value="N/A", subset=string_bucket_cols)

### drop_calls2

In [None]:
# Load the network CDRs of the reference month (all daily partitions).
drop_calls = 'gs://'+ mediatedcdrs_bucket +'/eds_network_cdr/2.0/parquet/year={}/month={}/'

df_drop_calls = read_in_data2(drop_calls, year = year_val[1], month = month_val[1])

In [None]:
# Keep only the columns referenced by the dropped-call query below.
df_drop_calls = df_drop_calls.select("SAMPLED","A_NUMBER","FIRST_LAC","LAST_LAC","CELL","LAST_CELL","REC_TYPE","TARIFF",
                               "DURATION", "TERM_CAUSE", "day")

In [None]:
# Expose the CDRs to Spark SQL.
df_drop_calls.createOrReplaceTempView("cdrs_view")

In [None]:
# Count dropped calls per MSISDN for the reference month.
# Innermost query: keeps CDRs with REC_TYPE 20/30, TARIFF != 142, an 8-character TERM_CAUSE
# whose first four characters are in the listed set, a non-empty A_NUMBER starting with '69',
# and one of the four allowed LAC/CELL emptiness patterns. ROW_NUMBER then marks one record
# per (A_NUMBER, SAMPLED) so duplicates are dropped by LEVEL = 1.
# Outer query: counts the remaining records per MSISDN.
# NOTE(review): the TERM_CAUSE prefixes are presumably the network's dropped-call cause
# codes — confirm against the CDR specification.
# NOTE(review): the normalised FIRST_LAC/LAST_LAC/CELL/LAST_CELL columns are computed but
# not used by the outer queries, and the final ORDER BY only affects row order.
drop_calls_query = spark.sql("""SELECT K.MSISDN,
                                count(*) as DROPPED_CALL_COUNT
                                FROM (
                                    SELECT L.MSISDN, L.SAMPLED, L.YEAR
                                    FROM (
                                       SELECT
                                       A.SAMPLED, A.A_NUMBER AS MSISDN, A.day, YEAR(A.SAMPLED) AS YEAR, SUBSTR(A.TERM_CAUSE,1,4) AS EOS,
                                       CASE WHEN A.FIRST_LAC LIKE '%IE%' THEN A.LAST_LAC  --this condition holds only for calls
                                           WHEN A.FIRST_LAC = '' THEN A.LAST_LAC          --this condition holds only for calls
                                           ELSE A.FIRST_LAC END FIRST_LAC,                --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_LAC = '' THEN A.FIRST_LAC         --this condition holds for both calls and SMS
                                           ELSE A.LAST_LAC END LAST_LAC,                  --this condition holds for both calls and SMS
                                       CASE WHEN A.CELL LIKE '%F%' THEN A.LAST_CELL       --this condition holds only for calls
                                           WHEN A.CELL = '' THEN A.LAST_CELL              --this condition holds only for calls
                                           ELSE A.CELL END CELL,                          --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_CELL = '' THEN A.CELL             --this condition holds for both calls and SMS
                                           ELSE A.LAST_CELL END LAST_CELL,                --this condition holds for both calls and SMS
                                       ROW_NUMBER() OVER (PARTITION BY A.A_NUMBER, A.SAMPLED ORDER BY A.A_NUMBER, A.SAMPLED) as LEVEL
                                       FROM cdrs_view A
                                       WHERE ((A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL != '') OR  -- this condition holds only for calls
                                       (A.FIRST_LAC = '' AND A.LAST_LAC != '' AND A.CELL = '' AND A.LAST_CELL != '') OR    -- this condition holds only for calls
                                       (A.FIRST_LAC != '' AND A.LAST_LAC = '' AND A.CELL != '' AND A.LAST_CELL = '') OR    -- this condition holds for both calls and SMS
                                       (A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL = ''))     -- this condition holds for both calls and SMS
                                       AND A.REC_TYPE IN ('20','30') AND A.TARIFF != '142'
                                       AND LENGTH(A.TERM_CAUSE) = 8
                                       AND SUBSTR(A.TERM_CAUSE,1,4) IN ('068F','08BF','09A6','09C3','09C5','09C8','09F8','0A0E','0A0F','0AE9','0C15','0CD2',
                                                                        '0CD3','0F7B','0F7C','018F','065D','065E','0700','0701','0702','09A7','09BF','09C0',
                                                                        '09C2','09C4','09C6','09C7','09C9','09F6','09F7','0A0A','0A0B','0A0C','0A0D','0C14',
                                                                        '0C16','0F7D','1C8F','1C90','1C91','1C92','1C9A','1C9B')
                                       AND A.A_NUMBER != '' AND A.A_NUMBER LIKE '69%' 
                                       --ORDER BY A.A_NUMBER, A.SAMPLED
                                       ) AS L
                                    WHERE L.LEVEL = 1
                                    --ORDER BY L.MSISDN, L.SAMPLED ASC
                                ) K
                                GROUP BY K.MSISDN
                                ORDER BY K.MSISDN
                             """)  

# Join drop_calls with main dataset

In [None]:
# Register the main dataset and the dropped-call counts as temp views.
usage_status_demo_pc_stud_buckets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_view")
drop_calls_query.createOrReplaceTempView("drop_calls_view")

In [None]:
# Add DROPPED_CALL_COUNT to every customer; customers with no dropped calls get NULL here.
usage_status_demo_pc_stud_buckets_dropcalls = spark.sql("""SELECT A.*, B.DROPPED_CALL_COUNT
                                FROM usage_status_demo_pc_stud_buckets_view A
                                LEFT JOIN drop_calls_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
# No match in the dropped-call table means zero dropped calls.
usage_status_demo_pc_stud_buckets_dropcalls = usage_status_demo_pc_stud_buckets_dropcalls.na.fill(value=0, subset=["DROPPED_CALL_COUNT"])

### tickets2

In [None]:
# Load the service-request / trouble-ticket records of the reference month and keep
# only the customer number and the request id.
ticket_service = 'gs://'+ dhdwh_bucket +'/mobile_sr_tt/1.0/parquet/year={}/month={}/'

# The former trailing .drop('service_file_id') was dead code: that column is already
# gone after the two-column select.
df_tickets_requests = read_in_data2(ticket_service, year = year_val[1], month = month_val[1]).select("X_MSISDN","SR_ID")

In [None]:
#df_tickets_requests = df_tickets_requests.select("X_MSISDN","SR_ID")

In [None]:
# Count each service request once per MSISDN.
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN", "SR_ID"])

In [None]:
# Number of tickets per MSISDN.
# A group-by count yields one row per X_MSISDN directly, instead of stamping a windowed
# count on every ticket row and de-duplicating afterwards; the resulting
# (X_MSISDN, TICKETS_COUNT) pairs are the same.
df_tickets_requests = df_tickets_requests.groupBy("X_MSISDN").agg(F.count("X_MSISDN").alias("TICKETS_COUNT"))

In [None]:
# Guarantee one row per MSISDN before joining onto the main dataset.
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN"])

# Join tickets with main dataset

In [None]:
# Register the main dataset and the ticket counts as temp views.
usage_status_demo_pc_stud_buckets_dropcalls.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_view")
df_tickets_requests.createOrReplaceTempView("tickets_requests_view")

In [None]:
# Add TICKETS_COUNT to every customer; customers without tickets get NULL here.
usage_status_demo_pc_stud_buckets_dropcalls_tickets = spark.sql("""SELECT A.*, B.TICKETS_COUNT
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_view A
                                LEFT JOIN tickets_requests_view B
                                ON A.MSISDN=B.X_MSISDN""")

In [None]:
# No match in the ticket table means zero tickets.
usage_status_demo_pc_stud_buckets_dropcalls_tickets = usage_status_demo_pc_stud_buckets_dropcalls_tickets.na.fill(value=0, subset=["TICKETS_COUNT"])

### channel2

In [None]:
# Load the prepay event CDRs of the reference month.
events = 'gs://'+ mediatedcdrs_bucket + '/alu_prepay_cdr/2.0/parquet/year={}/month={}/'

df_events = read_in_data2(events, year = year_val[1], month = month_val[1])

In [None]:
# Expose the events to Spark SQL.
df_events.createOrReplaceTempView("df_events_view")

In [None]:
# Keep the account and requesting system of events with EVENT_LABEL 139 and EVENT_RESULT 169.
# NOTE(review): presumably these codes identify successful bundle purchases — confirm
# against the event code list.
df_bundle_purchase = spark.sql("""SELECT ACCOUNT_ID, REQUESTING_SYSTEM                             
                             FROM df_events_view 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             """)

In [None]:
# Map the digital requesting systems onto a single 'DIGITAL' channel value.
# regexp_replace rewrites every occurrence of the pattern inside the value, so the
# patterns are applied one after the other in the original order.
for digital_system in ('VOP', 'CUapp', 'MCare', 'CUsite'):
    df_bundle_purchase = df_bundle_purchase.withColumn(
        'REQUESTING_SYSTEM', regexp_replace('REQUESTING_SYSTEM', digital_system, 'DIGITAL')
    )

In [None]:
# Map the remaining minor requesting systems onto a single 'OTHER' channel value,
# applying the replacements in the original order.
other_systems = ('TAZAAPP', 'EKIOSK', 'PostpaidToPrepaid', 'XPCVM', 'LMG', 'PEGA', 'SMSVAS', 'CRM')
for other_system in other_systems:
    df_bundle_purchase = df_bundle_purchase.withColumn(
        'REQUESTING_SYSTEM', regexp_replace('REQUESTING_SYSTEM', other_system, 'OTHER')
    )

In [None]:
# Channel categories used for the purchase-channel features:
# - one category for the shop (VFSHOP)
# - another category for phone (IVR)
# - DIGITAL (VOP, MCARE, CUapp, CUsite)
# - OTHER

In [None]:
# Count purchases per channel for each account: one column per channel category (4 categories).
# The expected pivot values are passed explicitly so that all four columns always exist,
# even when a channel has no purchase in the month (the later select on these columns
# would otherwise fail), and so Spark does not need an extra job to discover the values.
# Accounts with no purchase in a channel get NULL in that column.
df_bundle_purchase = (
    df_bundle_purchase
    .groupBy('ACCOUNT_ID')
    .pivot('REQUESTING_SYSTEM', ["DIGITAL", "VFShop", "IVR", "OTHER"])
    .count()
)

In [None]:
# A missing pivot cell means no purchase through that channel: set it to 0.
# The numeric fill value leaves non-numeric columns such as ACCOUNT_ID untouched.
df_bundle_purchase = df_bundle_purchase.na.fill(0, subset=df_bundle_purchase.columns)

In [None]:
# Keep the account key and the four channel counters; any other pivot column is dropped.
df_bundle_purchase = df_bundle_purchase.select("ACCOUNT_ID", "DIGITAL", "VFShop", "IVR", "OTHER")

# Join channel with main dataset

In [None]:
# Register the main dataset and the channel counters as temp views.
usage_status_demo_pc_stud_buckets_dropcalls_tickets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_view")
df_bundle_purchase.createOrReplaceTempView("bundle_purchase_view")

In [None]:
# Add the channel counters to every customer. ACCOUNT_ID is matched on the 10 characters
# starting at position 3 (the leading two characters are skipped).
# NOTE(review): if two ACCOUNT_IDs share the same 10-character part, this join duplicates
# the customer row — confirm ACCOUNT_ID is unique after truncation.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = spark.sql("""SELECT A.*, B.DIGITAL, B.VFShop, B.IVR, B.OTHER
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_view A
                                LEFT JOIN bundle_purchase_view B
                                ON A.MSISDN= SUBSTR(B.ACCOUNT_ID,3,10)""")

In [None]:
# Customers without any purchase event get 0 in every channel counter.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.na.fill(value=0, subset=["DIGITAL", "VFShop", "IVR", "OTHER"])

### NumberOfBundles2

In [None]:
# Read the event CDRs of the previous 6-month period, one frame per month,
# keeping only the columns needed to count bundle purchases.
day = 1
events = 'gs://'+ mediatedcdrs_bucket + '/alu_prepay_cdr/2.0/parquet/year={}/month={}/'

# Explicit unpacking replaces the former writes into locals(): it fails loudly if the
# slices do not yield exactly six (year, month) pairs, instead of silently reusing a
# stale events_m* frame left over from an earlier section.
events_m1, events_m2, events_m3, events_m4, events_m5, events_m6 = [
    read_in_data2(events, year = event_year, month = event_month).select("ACCOUNT_ID", "PTP_COSP_AMA_CODE", "EVENT_LABEL", "EVENT_RESULT")
    for event_year, event_month in zip(year_val[1:-1], month_val[1:-1])
]

In [None]:
# Register each monthly event frame as its own temp view (m1 = first month read, m6 = last).
events_m1.createOrReplaceTempView("events_view_m1")
events_m2.createOrReplaceTempView("events_view_m2")
events_m3.createOrReplaceTempView("events_view_m3")
events_m4.createOrReplaceTempView("events_view_m4")
events_m5.createOrReplaceTempView("events_view_m5")
events_m6.createOrReplaceTempView("events_view_m6")

In [None]:
# Month 1: number of events with EVENT_LABEL 139 / EVENT_RESULT 169 per account.
# COUNT(PTP_COSP_AMA_CODE) ignores rows where that column is NULL.
df_num_of_bundles_m1 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m1 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Month 2: same per-account count as for month 1.
df_num_of_bundles_m2 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m2 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Month 3: same per-account count as for month 1.
df_num_of_bundles_m3 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m3 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Month 4: same per-account count as for month 1.
df_num_of_bundles_m4 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m4 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Month 5: same per-account count as for month 1.
df_num_of_bundles_m5 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m5 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Month 6: same per-account count as for month 1.
df_num_of_bundles_m6 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m6 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
# Register the six monthly counts as temp views for the wide join below.
df_num_of_bundles_m1.createOrReplaceTempView("num_of_bundles_view_m1")
df_num_of_bundles_m2.createOrReplaceTempView("num_of_bundles_view_m2")
df_num_of_bundles_m3.createOrReplaceTempView("num_of_bundles_view_m3")
df_num_of_bundles_m4.createOrReplaceTempView("num_of_bundles_view_m4")
df_num_of_bundles_m5.createOrReplaceTempView("num_of_bundles_view_m5")
df_num_of_bundles_m6.createOrReplaceTempView("num_of_bundles_view_m6")

In [None]:
# Put the six monthly counts side by side, one row per account of month 1.
# The output ACCOUNT_ID is the 10 characters starting at position 3 of the raw id.
# NOTE(review): the LEFT JOINs start from month 1, so accounts with purchases in months
# 2-6 but none in month 1 are absent from this table (and later read as all zeros) —
# confirm this is intended.
num_of_bundles_semester = spark.sql("""SELECT substring(A.ACCOUNT_ID, 3 , 10) AS ACCOUNT_ID, A.BUNDLES_NUM AS M1_BUNDLES_NUM, 
                     B.BUNDLES_NUM AS M2_BUNDLES_NUM, C.BUNDLES_NUM AS M3_BUNDLES_NUM, 
                     D.BUNDLES_NUM AS M4_BUNDLES_NUM,
                     E.BUNDLES_NUM AS M5_BUNDLES_NUM, 
                     F.BUNDLES_NUM AS M6_BUNDLES_NUM
                     
                     FROM num_of_bundles_view_m1 A                 
                    
                     left join num_of_bundles_view_m2 B
                          on A.ACCOUNT_ID = B.ACCOUNT_ID 
                     left join num_of_bundles_view_m3 C 
                          on A.ACCOUNT_ID = C.ACCOUNT_ID
                     left join num_of_bundles_view_m4 D 
                          on A.ACCOUNT_ID = D.ACCOUNT_ID
                     left join num_of_bundles_view_m5 E 
                          on A.ACCOUNT_ID = E.ACCOUNT_ID
                     left join num_of_bundles_view_m6 F
                          on A.ACCOUNT_ID = F.ACCOUNT_ID
                          """)

In [None]:
# Months without a matching row mean no purchases: set every NULL numeric value to 0.
num_of_bundles_semester = num_of_bundles_semester.na.fill(value=0)

In [None]:
# 3-month and 6-month averages of the monthly bundle counts.
bundles_m1_to_m3 = col("M1_BUNDLES_NUM") + col("M2_BUNDLES_NUM") + col("M3_BUNDLES_NUM")
bundles_m4_to_m6 = col("M4_BUNDLES_NUM") + col("M5_BUNDLES_NUM") + col("M6_BUNDLES_NUM")
# Same left-to-right summation order as writing out all six terms.
bundles_m1_to_m6 = bundles_m1_to_m3 + col("M4_BUNDLES_NUM") + col("M5_BUNDLES_NUM") + col("M6_BUNDLES_NUM")

num_of_bundles_semester = (
    num_of_bundles_semester
    # first half of the semester (months 1-3)
    .withColumn("M13_AVG_BUNDLES_NUM", bundles_m1_to_m3 / 3)
    # second half of the semester (months 4-6)
    .withColumn("M46_AVG_BUNDLES_NUM", bundles_m4_to_m6 / 3)
    # whole semester (months 1-6)
    .withColumn("M16_AVG_BUNDLES_NUM", bundles_m1_to_m6 / 6)
)

# Join number of bundles with main dataset

In [None]:
# Register the main dataset and the bundle-count features as temp views.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view")
num_of_bundles_semester.createOrReplaceTempView("num_of_bundles_semester_view")

In [None]:
# Add the monthly bundle counts and their averages to every customer.
# B.ACCOUNT_ID is already the truncated 10-character id, so it is compared to MSISDN directly.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = spark.sql("""SELECT A.*, B.M1_BUNDLES_NUM, B.M2_BUNDLES_NUM, B.M3_BUNDLES_NUM, 
                                B.M4_BUNDLES_NUM, B.M5_BUNDLES_NUM, B.M6_BUNDLES_NUM, B.M13_AVG_BUNDLES_NUM, B.M46_AVG_BUNDLES_NUM, 
                                B.M16_AVG_BUNDLES_NUM
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view A
                                LEFT JOIN num_of_bundles_semester_view B
                                ON A.MSISDN= B.ACCOUNT_ID""")

In [None]:
# Blanket fill: every remaining NULL in ANY numeric column of the dataset becomes 0
# (not only the bundle counts just joined); string columns are unaffected.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.na.fill(value=0)

In [None]:
#print((usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.count(), len(usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.columns)))

# JOIN ALL DATASET WITH Y

In [None]:
# Register the full feature set and the target table as temp views.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.createOrReplaceTempView("final_dataset_view")
higher_bundle_segmentation_2.createOrReplaceTempView("higher_bundle_segmentation_2_view")

In [None]:
# Attach the target column HIGHER_BUNDLE to the feature set; this frame is used as the test set below.
final_df_test = spark.sql("""SELECT A.*, B.HIGHER_BUNDLE
                                FROM final_dataset_view A
                                LEFT JOIN higher_bundle_segmentation_2_view B
                                ON A.MSISDN= B.MSISDN""")

In [None]:
# Customers absent from the target table are labelled 0.
final_df_test = final_df_test.na.fill(value=0, subset=["HIGHER_BUNDLE"])

In [None]:
#final_df_test.groupBy("HIGHER_BUNDLE").count().show()

# ------------------------------------------------------------------------------------------------------#

### Modelling <a class="anchor" id="modelling"></a>

In [None]:
# Boruta features
# Columns kept for modelling: the customer key (MSISDN), the selected predictors and,
# as last entry, the target (HIGHER_BUNDLE). The same list is applied to train and test.
boruta_features = ['MSISDN', 'M1_TTL_OUT_CALLS', 'M1_V_CALLS_TO_COMPETITION', 'M1_V_MINUTES_TO_COMPETITION', 
          'M1_GPRS_SESSION', 'M1_GPRS_VOLUME', 'M1_RECHARGES_VALUE', 'M1_OUT_DAYS', 'M1_INC_DAYS', 
          'M1_BUNDLE_REVENUE', 'M2_RECHARGES_VALUE', 'M2_OUT_DAYS', 'M2_INC_DAYS', 'M2_BUNDLE_REVENUE', 
          'M3_V_CALLS_TO_COMPETITION', 'M3_GPRS_VOLUME', 'M3_BUNDLE_REVENUE', 'M4_GPRS_VOLUME', 'M4_INC_DAYS', 
          'M4_BUNDLE_REVENUE', 'M5_V_MINUTES_TO_COMPETITION', 'M5_BUNDLE_REVENUE', 'M6_V_MINUTES_TO_COMPETITION',
          'M6_BUNDLE_REVENUE', 'M1_ARPU', 'M2_ARPU', 'M4_ARPU', 'M5_ARPU', 'M1_ROC_TTL_OUT_CALLS',
          'M13_AVG_TTL_OUT_CALLS', 'M13_AVG_TTL_OUT_MINUTES', 'M13_AVG_V_CALLS_TO_COMPETITION',
          'M13_AVG_V_MINUTES_TO_COMPETITION', 'M13_AVG_GPRS_VOLUME', 'M16_AVG_GPRS_VOLUME', 
          'M13_AVG_RECHARGES_VALUE', 'M46_AVG_RECHARGES_VALUE', 'M16_AVG_RECHARGES_VALUE', 'M13_AVG_OUT_DAYS',
          'M16_AVG_OUT_DAYS', 'M13_AVG_INC_DAYS', 'M16_AVG_INC_DAYS', 'M13_AVG_BUNDLE_REVENUE',
          'M46_AVG_BUNDLE_REVENUE', 'M16_AVG_BUNDLE_REVENUE', 'M1_EXPENDITURE_RATIO', 'M2_EXPENDITURE_RATIO', 
          'STUDENTS_FLAG', 'VOICE_BUCKET_SUMMARY', 'DATA_BUCKET_SUMMARY', 'SMS_BUCKET_SUMMARY', 'BALANCE', 
          'DIGITAL', 'TARIFF_PLAN' ,'M1_TTL_OUT_MINUTES', 'M2_RECHARGES_NUMBER', 'M3_ARPU', 'M1_ROC_V_CALLS_TO_COMPETITION',
          'M16_AVG_TTL_OUT_CALLS', 'M13_AVG_RECHARGES_NUMBER', 'M46_AVG_INC_DAYS', 'M13_AVG_ARPU', 'M46_AVG_ARPU', 'HIGHER_BUNDLE']

In [None]:
#final_df_train.select(boruta_features).limit(5).toPandas()

In [None]:
# Convert to Pandas
# toPandas() collects the selected columns of the whole Spark frame onto the driver,
# so both frames must fit in driver memory.
df_train = final_df_train.select(boruta_features).toPandas()
df_test = final_df_test.select(boruta_features).toPandas()

### Preprocessing

In [None]:
# Train set
# Downsampling
# NOTE(review): n_majority presumably caps the number of majority-class rows kept by the
# downsampling helper — confirm against its definition; the value is a tuning knob.
df_train_sampled = downsampling(df_train, n_majority= 100000)  # change this number
# Define X and y
X_train, y_train = split_x_y(df_train_sampled)
# Encoding
X_train, y_train = encoding_data(X_train, y_train)
# Save feature names (overwritten by the test-set cell that follows)
feature_names= np.array(X_train.columns)

In [None]:
# Test set (no downsampling: evaluated on the full population)
# Define X and y
X_test, y_test = split_x_y(df_test)
# Encoding
# NOTE(review): the encoding is computed independently of the train set; confirm that it
# yields the same columns in the same order as X_train.
X_test, y_test = encoding_data(X_test, y_test)
# Save feature names (replaces the value derived from X_train)
feature_names= np.array(X_test.columns)

In [None]:
# Class proportion of the (downsampled) training target, as computed by estimate_proportion.
previous_proportion = estimate_proportion(y_train)
#print(previous_proportion)

In [None]:
# Strip every character other than letters, digits and underscore from the column names.
# NOTE(review): only the training frame is renamed; X_test keeps its original column names
# when it is passed to the fitted model later — confirm the model matches columns by
# position rather than by name.
X_train_new = X_train.rename(columns = lambda x:re.sub('[^A-Za-z0-9_]+', '', x))

### Grid_Search

In [None]:
#param_test = {'boosting_type': ['gbdt', 'dart', 'goss'], 'n_estimators':[100, 500, 1000], 'learning_rate': [0.01, 0.1, 0.2],
#             'scale_pos_weight': [2, 3, 4]}

In [None]:
#import sklearn
#sklearn.metrics.SCORERS.keys()

# Training

In [None]:
# Gradient-boosted tree classifier; scale_pos_weight=3 up-weights the positive class and
# random_state fixes the seed.
# NOTE(review): `silent` was deprecated in later LightGBM releases — confirm the pinned
# version before upgrading.
lgbm = lgb.LGBMClassifier(boosting_type = 'gbdt',
                                    importance_type = 'gain',
                                    scale_pos_weight= 3,
                                    n_estimators = 1000,
                                    learning_rate=0.01, metric='auc', silent=True, n_jobs=-1, random_state=10)

In [None]:
# Fit on the downsampled training set with the sanitised column names.
lgbm.fit(X_train_new, y_train)

LGBMClassifier(importance_type='gain', learning_rate=0.01, metric='auc',
               n_estimators=1000, random_state=10, scale_pos_weight=3,
               silent=True)

In [None]:
# Evaluate on the full test set: confusion matrix, accuracy and per-class report.
cm, acc, class_report = evaluation(X_test, y_test, lgbm)
print(cm)
print(acc)
print(class_report)

[[896623 378945]
 [  7366  20581]]
0.7036390068392001
                  precision    recall  f1-score   support

No higher bundle       0.99      0.70      0.82   1275568
   Higher Bundle       0.05      0.74      0.10     27947

        accuracy                           0.70   1303515
       macro avg       0.52      0.72      0.46   1303515
    weighted avg       0.97      0.70      0.81   1303515



In [None]:
#lgbm = lgb.LGBMClassifier()
#grid_lgbm = GridSearchCV(lgbm, param_grid=param_test, cv=3, scoring='recall_macro', n_jobs=-1, verbose=3)

In [None]:
#grid_lgbm.fit(X_train_new, y_train)

In [None]:
#print(" Results from Grid Search ")
#print("\n The best estimator across ALL searched params:\n",grid_lgbm.best_estimator_)
#print("\n The best score across ALL searched params:\n",grid_lgbm.best_score_)
#print("\n The best parameters across ALL searched params:\n",grid_lgbm.best_params_)

In [None]:
#lgbm = grid_lgbm.best_estimator_

# Evaluation

In [None]:
#cm, acc, class_report = evaluation(X_test, y_test, lgbm)
#print(cm)
#print(acc)
#print(class_report)

In [None]:
# Estimate how many customers the model predicts will buy the higher bundle,
# i.e. the number of test rows whose class-1 probability is at least 0.5.
pred_y_propab = lgbm.predict_proba(X_test)
pred_y_propab[:,1]
condition = pred_y_propab[:,1] >= 0.5
len(pred_y_propab[:,1][condition])

array([0.61972353, 0.06594057, 0.02281303, ..., 0.29399388, 0.06620404,
       0.42597067])

399526

In [None]:
# SHAP values of the fitted model on the test set, computed by the feature_importance helper.
shap_values = feature_importance(X_test, lgbm)

In [None]:
# Bar plot: overall feature importance from the SHAP values.
shap.summary_plot(shap_values, X_test, feature_names, plot_type="bar")
    
# Dot plot for the second element of shap_values.
# NOTE(review): presumably shap_values is a per-class list and index 1 is the
# "higher bundle" class — confirm against the feature_importance helper.
shap.summary_plot(shap_values[1], X_test, feature_names, plot_type="dot")

### Decile factors

In [None]:
from collections import Counter
y_pred_prob = lgbm.predict_proba(X_test)
y_pred = lgbm.predict(X_test)

# One row per test customer: actual label, predicted label and class-1 probability,
# ranked from the highest to the lowest probability.
# Built in a single chain: the former pd.Series(y_test) assignment was immediately
# overwritten (dead code) and the in-place rename is avoided.
y_test_quart = (
    pd.DataFrame(y_test)
    .rename(columns = {0:'Actual_Target'})
    .assign(Pred_Target = y_pred, Prob_1 = y_pred_prob[:,1])
    .sort_values(by = "Prob_1" , ascending = False)
)
#y_test_quart.head()
#y_test_quart.tail()

In [None]:
# Assign each ranked customer to one of 20 consecutive, (almost) equal-sized buckets,
# i.e. 5% of the base per bucket. Bucket 1 is the first slice of the frame in its
# current row order.
N_BUCKETS = 20
quartiles = np.array_split(y_test_quart['Prob_1'].values, N_BUCKETS)
# Repeat each bucket number once per row of its slice; this replaces the nested lists
# that were re-flattened on every loop iteration.
quart = np.repeat(np.arange(1, N_BUCKETS + 1), [len(chunk) for chunk in quartiles]).tolist()
y_test_quart['Percentile'] = quart
# y_test_quart.head() 

In [None]:
# Share of all actual positives (Actual_Target == 1) captured by the top-ranked buckets.
# NOTE(review): each `Percentile` bucket is 1/20 of the ranked base, so the printed
# "decile" labels actually refer to 5% buckets — labels kept unchanged so outputs stay
# comparable with earlier runs.
# NOTE(review): the first figure has no probability filter, while the cumulative ones
# also require Prob_1 > 0.5 — kept as is; confirm this asymmetry is intended.
total_positives = Counter(y_test)[1]

# Counting with a boolean sum yields 0 (instead of a KeyError from value_counts()[1])
# when a selection contains no positives.
top_bucket = y_test_quart[y_test_quart.Percentile == 1]
print ("1 decile: ", (top_bucket['Actual_Target'] == 1).sum()/total_positives)

for label, bucket_ids in [("1,2 decile: ", [1,2]),
                          ("1,2,3 decile: ", [1,2,3]),
                          ("1,2,3,4 decile: ", [1,2,3,4]),
                          ("1,2,3,4,5 decile: ", [1,2,3,4,5])]:
    selected = y_test_quart[y_test_quart.Percentile.isin(bucket_ids) & (y_test_quart.Prob_1 > 0.5)]
    print (label, (selected['Actual_Target'] == 1).sum()/total_positives)

1 decile:  0.21658854259849
1,2 decile:  0.3716677997638387
1,2,3 decile:  0.4878162235660357
1,2,3,4 decile:  0.5859305113250081
1,2,3,4,5 decile:  0.6649729845779512


In [None]:
#y_test_quart.head()

In [None]:
#y_test_quart[y_test_quart['Prob_1'] >= 0.5].shape

In [None]:
#y_test_quart[y_test_quart['Prob_1'] >= 0.6].shape

# -----------------------------------------------------------------------------------------------------------#

### Scoring_dataset <a class="anchor" id="Scoring_dataset"></a>

### gdpr3

In [None]:
#month_val[0]

In [None]:
# Monthly partition root of the consent (GDPR) data for the scoring month.
gdpr = 'gs://'+ permsandprefs_rawprepared_bucket +'/ope_cpm_consent/'+ version +'/parquet/year={}/month={}/'

# Find the most recent `day` partition available for that month.
# Plain assignments replace the former writes through locals(), which only worked
# because notebook cells run at module scope.
find_day = read_in_data2(gdpr, year = year_val[0], month = month_val[0]).select('MSISDN_CLI', 'day')
# The aggregate always yields exactly one row; MAX is None when the month has no data.
day_max = find_day.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"]
#print(day_max)

In [None]:
# Load the consent snapshot of the latest day found in the previous cell (day_max).
gdpr = 'gs://'+ permsandprefs_rawprepared_bucket +'/ope_cpm_consent/'+ version +'/parquet/year={}/month={}/day={}/'

df_gdpr = read_in_data2(gdpr, year = year_val[0], month = month_val[0], day=day_max)

### status3

In [None]:
# Monthly partition root of the prepay master-status data for the scoring month.
status = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_status/2.0/parquet/year={}/month={}/'

#print(year_val[2], month_val[2])
# Find the most recent `day` partition available for that month.
# Plain assignments replace the former writes through locals(), which only worked
# because notebook cells run at module scope.
find_day = read_in_data2(status, year = year_val[0], month = month_val[0]).select('MSISDN', 'day')
# The aggregate always yields exactly one row; MAX is None when the month has no data.
day_max = find_day.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"]
#print(day_max)

In [None]:
# Load the status snapshot of the latest day found in the previous cell (day_max),
# keeping only the columns used downstream.
status = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_status/2.0/parquet/year={}/month={}/day={}/'

df_status = read_in_data2(status, year = year_val[0], month = month_val[0], day= day_max).select("MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS")

In [None]:
#df_status = df_status.select("MSISDN", "TARIFF_PLAN", "CONNECTION_DAY", "SMARTPHONE_FLAG", "INSERTED", "STATUS")

In [None]:
# NOTE(review): status_view is registered again after dropDuplicates two cells below,
# so this earlier registration looks redundant - confirm before removing.
df_status.createOrReplaceTempView("status_view")

In [None]:
# Keep one row per MSISDN (dropDuplicates does not specify which duplicate survives).
df_status = df_status.dropDuplicates(["MSISDN"])

In [None]:
# Expose consent and (de-duplicated) status data to Spark SQL for the join in the next cell.
df_gdpr.createOrReplaceTempView("gdpr_view")
df_status.createOrReplaceTempView("status_view")

In [None]:
# Keep status rows with STATUS 'A' or 'B' whose MSISDN has an active, current
# consent record (advanced_permission other than 'NO').
# LEFT SEMI JOIN expresses this "has consent" filter directly:
#   * a status row can no longer be duplicated when several MSISDN_CLI values
#     map to the same 10-character MSISDN substring,
#   * the MAX(CONSENT_VALID_FROM_DATE) aggregate, whose value was never
#     selected, is gone,
#   * the ORDER BY is dropped: the result is only consumed by further joins,
#     which do not preserve row order, so the global sort was wasted work.
consent_prepay_status = spark.sql("""
    SELECT A.*
    FROM (
        SELECT substring(MSISDN, 3 , 10) AS MSISDN, TARIFF_PLAN, CONNECTION_DAY, SMARTPHONE_FLAG, INSERTED
        FROM status_view
        WHERE STATUS IN ('A','B')
    ) AS A
    LEFT SEMI JOIN (
        SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN
        FROM gdpr_view
        WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
    ) AS B
    ON A.MSISDN = B.MSISDN
""")

### usage3

In [None]:
# print(month_val[0], year_val[0])
# print(month_val[1], year_val[1])
# print(month_val[2], year_val[2])
# print(month_val[3], year_val[3])
# print(month_val[4], year_val[4])
# print(month_val[5], year_val[5])

In [None]:
month_val[:-2]

[11, 10, 9, 8, 7, 6]

In [None]:
# find max day available in dataset for each month
# day_max[k] is the latest day partition of the k-th month in
# zip(year_val[:-2], month_val[:-2]) and is consumed by the read loop in the
# next cell.
# The read raises AnalysisException ("Path does not exist") when a month
# partition is missing, as recorded in this cell's last saved output for
# year=2022/month=11.
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/'

day_max = []
for usage_year, usage_month in zip(year_val[:-2], month_val[:-2]):
    # Ordinary local names instead of locals()["find_day<k>"] / ["day_max<k>"].
    month_days = read_in_data2(usage, year=usage_year, month=usage_month).select('MSISDN', 'day')
    day_max.append(month_days.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"])

AnalysisException: 'Path does not exist: gs://vfgr-dh-customerprofilecar-rawprepared/car_pp_master_usage/2.0/parquet/year=2022/month=11;'

In [None]:
# Read datasets from the previous 6-month period
# One snapshot per month, taken at that month's latest available day (day_max).
usage = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_pp_master_usage/2.0/parquet/year={}/month={}/day={}/'

# Explicit unpacking replaces the locals()["usage_m<k>"] writes and fails loudly
# (ValueError) if fewer or more than six months are available, instead of
# leaving some usage_m<k> names undefined.
(usage_m1, usage_m2, usage_m3,
 usage_m4, usage_m5, usage_m6) = [
    read_in_data2(usage, year=usage_year, month=usage_month, day=usage_day)
    for usage_year, usage_month, usage_day in zip(year_val[:-2], month_val[:-2], day_max)
]

In [None]:
# Re-tag the columns of snapshot k: every 'M1' in a column name becomes 'M<k>',
# so the six frames can be joined side by side without name clashes.
# usage_m1 needs no change (its former loop replaced 'M1' with 'M1'), and a
# single toDF() per frame replaces one withColumnRenamed call per column.
usage_m2 = usage_m2.toDF(*[name.replace('M1', 'M2') for name in usage_m2.columns])
usage_m3 = usage_m3.toDF(*[name.replace('M1', 'M3') for name in usage_m3.columns])
usage_m4 = usage_m4.toDF(*[name.replace('M1', 'M4') for name in usage_m4.columns])
usage_m5 = usage_m5.toDF(*[name.replace('M1', 'M5') for name in usage_m5.columns])
usage_m6 = usage_m6.toDF(*[name.replace('M1', 'M6') for name in usage_m6.columns])

In [None]:
# Give each older snapshot its own key column (M<k>_MSISDN); the six-way join below matches A.MSISDN against these.
usage_m2 = usage_m2.withColumnRenamed("MSISDN", "M2_MSISDN")
usage_m3 = usage_m3.withColumnRenamed("MSISDN", "M3_MSISDN")
usage_m4 = usage_m4.withColumnRenamed("MSISDN", "M4_MSISDN")
usage_m5 = usage_m5.withColumnRenamed("MSISDN", "M5_MSISDN")
usage_m6 = usage_m6.withColumnRenamed("MSISDN", "M6_MSISDN")

In [None]:
# Register one temp view per monthly snapshot for the Spark SQL join in the next cell.
usage_m1.createOrReplaceTempView("usage_view_m1")
usage_m2.createOrReplaceTempView("usage_view_m2")
usage_m3.createOrReplaceTempView("usage_view_m3")
usage_m4.createOrReplaceTempView("usage_view_m4")
usage_m5.createOrReplaceTempView("usage_view_m5")
usage_m6.createOrReplaceTempView("usage_view_m6")

In [None]:
# Take the base that was active in the latest month (M1: at least one outgoing
# call or GPRS session) and attach the same usage metrics for M2..M6.
# Older months are LEFT JOINed, so their metrics are NULL when a subscriber has
# no row in that month's snapshot.

# The 20 usage metrics selected for every month, in output column order.
usage_metrics = [
    "TTL_OUT_CALLS", "TTL_OUT_MINUTES", "TTL_OUT_REVENUE",
    "V_CALLS_TO_FIXED_LINES", "V_MINUTES_TO_FIXED_LINES", "V_REVENUE_TO_FIXED_LINES",
    "V_CALLS_TO_COMPETITION", "V_MINUTES_TO_COMPETITION", "V_REVENUE_TO_COMPETITION",
    "V_CALLS_TO_INTERNATIONAL", "V_MINUTES_TO_INTERNATIONAL", "V_REVENUE_TO_INTERNATIONAL",
    "GPRS_SESSION", "GPRS_VOLUME", "GPRS_REVENUE",
    "RECHARGES_NUMBER", "RECHARGES_VALUE",
    "OUT_DAYS", "INC_DAYS", "BUNDLE_REVENUE",
]
# (month number, SQL alias) of the six monthly views: M1 -> A, ..., M6 -> F.
usage_months = list(zip(range(1, 7), "ABCDEF"))

# "A.M1_TTL_OUT_CALLS, ..., F.M6_BUNDLE_REVENUE": month-major, metric-minor,
# the same order as the former hand-written select list.
usage_select_list = ",\n".join(
    "{alias}.M{month}_{metric}".format(alias=alias, month=month, metric=metric)
    for month, alias in usage_months
    for metric in usage_metrics
)
# M2..M6 are matched on their renamed key column M<k>_MSISDN.
usage_join_clauses = "\n".join(
    "left join usage_view_m{month} {alias} on A.MSISDN = {alias}.M{month}_MSISDN".format(
        month=month, alias=alias)
    for month, alias in usage_months[1:]
)

usage_prepay = spark.sql("""SELECT substring(A.MSISDN, 3 , 10) AS MSISDN,
{select_list}
from usage_view_m1 A
{join_clauses}
WHERE ((A.M1_TTL_OUT_CALLS > 0) or (A.M1_GPRS_SESSION > 0))""".format(
    select_list=usage_select_list, join_clauses=usage_join_clauses))

In [None]:
# Keep one row per (10-character) MSISDN; dropDuplicates does not specify which duplicate survives.
usage_prepay = usage_prepay.dropDuplicates(["MSISDN"])

In [None]:
# Register the usage base for SQL; gdpr_view is (re-)registered on the same df_gdpr used in the status section.
usage_prepay.createOrReplaceTempView("usage_prepay_view")
df_gdpr.createOrReplaceTempView("gdpr_view")

In [None]:
# join usage with consent
# Keep usage rows whose MSISDN has an active, current consent record
# (advanced_permission other than 'NO').
# LEFT SEMI JOIN expresses this "has consent" filter directly:
#   * a usage row can no longer be duplicated when several MSISDN_CLI values
#     map to the same 10-character MSISDN substring,
#   * the MAX(CONSENT_VALID_FROM_DATE) aggregate, whose value was never
#     selected, is gone,
#   * the ORDER BY is dropped: the result is only consumed by further joins,
#     which do not preserve row order, so the global sort was wasted work.
consent_prepay_usage = spark.sql("""
    SELECT A.*
    FROM usage_prepay_view A
    LEFT SEMI JOIN (
        SELECT substring(MSISDN_CLI, 3 , 10) AS MSISDN
        FROM gdpr_view
        WHERE asset_status = 'Active' and CURRENT_IND = '1' and advanced_permission != 'NO'
    ) AS B
    ON A.MSISDN = B.MSISDN
""")

# join status with usage

In [None]:
# Register both consent-filtered frames for the usage/status join in the next cell.
consent_prepay_usage.createOrReplaceTempView("consent_prepay_usage_view")
consent_prepay_status.createOrReplaceTempView("consent_prepay_status_view")

In [None]:
# Add the status attributes to the usage rows; the INNER JOIN keeps only MSISDNs present in both inputs.
base_usage_status = spark.sql("""SELECT A.*, B.TARIFF_PLAN, B.CONNECTION_DAY, B.SMARTPHONE_FLAG, B.INSERTED
                             FROM consent_prepay_usage_view A
                             INNER JOIN consent_prepay_status_view B
                                  ON A.MSISDN = B.MSISDN """)

In [None]:
# Fill NaNs: numeric columns get 0, text columns get "N/A".
# The dtypes are looked up once and each group is filled with a single na.fill
# call, instead of rebuilding the dtype dict three times and adding one fill
# step to the query plan for every column.
# NOTE(review): Spark reports 64-bit integers as 'bigint' and never uses the
# pandas-style names 'int64'/'object', and na.fill with a string value leaves
# timestamp columns untouched. The type lists are kept exactly as they were so
# the result is unchanged - confirm whether 'bigint' columns should be filled too.
column_types = dict(base_usage_status.dtypes)
zero_fill_columns = [name for name in base_usage_status.columns
                     if column_types[name] in ('int64', 'double', 'int')]
text_fill_columns = [name for name in base_usage_status.columns
                     if column_types[name] in ('object', 'string', 'timestamp')]

base_usage_status = base_usage_status.na.fill(value=0, subset=zero_fill_columns)
base_usage_status = base_usage_status.na.fill(value="N/A", subset=text_fill_columns)

### Add/Convert_Features3

In [None]:
# Unit conversion, driven by the column name:
#   *MINUTES* columns are divided by 60   (seconds -> minutes)
#   *VOLUME*  columns are divided by 1024 (KByte   -> MByte)
unit_divisors = (('MINUTES', 60), ('VOLUME', 1024))

for column_name in base_usage_status.columns:
    for name_token, divisor in unit_divisors:
        if name_token in column_name:
            base_usage_status = base_usage_status.withColumn(column_name, col(column_name) / divisor)

# Group Tariffs

In [None]:
# CU
# Order matters: 'Cuba40' must be rewritten before 'Cuba', otherwise it would end up as 'CU40'.
for legacy_tariff in ('Cuba40', 'Cuba'):
    base_usage_status = base_usage_status.withColumn(
        'TARIFF_PLAN', regexp_replace('TARIFF_PLAN', legacy_tariff, 'CU'))

In [None]:
# VFPP
# Each listed tariff token is rewritten to 'VFPP', one replacement after the other.
for legacy_tariff in ('VALCBASE', 'HAM', 'Advanced', 'OCFP'):
    base_usage_status = base_usage_status.withColumn(
        'TARIFF_PLAN', regexp_replace('TARIFF_PLAN', legacy_tariff, 'VFPP'))

In [None]:
# ETHNIC (INTERNATIONAL + TAZA)
base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', regexp_replace('TARIFF_PLAN', 'INTPACK', 'INTERNATIONAL'))
base_usage_status = base_usage_status.withColumn('TARIFF_PLAN', regexp_replace('TARIFF_PLAN', 'Taza', 'TAZA'))
# The former third step replaced 'TAZA' with 'TAZA', which changes nothing, so it was removed.

# Tenure

In [None]:
# Calculate tenure in months: months between CONNECTION_DAY and INSERTED,
# rounded and stored as an integer.
tenure_months = round(F.months_between(col("INSERTED"), col("CONNECTION_DAY")))
base_usage_status = base_usage_status.withColumn("TENURE_IN_MONTHS", tenure_months.cast(IntegerType()))

In [None]:
# SELECT ONLY CUSTOMERS THAT ARE MORE THAN 3 MONTHS IN OUR DATABASE
# (rows whose tenure is NULL do not satisfy the comparison and are dropped as well)
base_usage_status = base_usage_status.filter(col("TENURE_IN_MONTHS") > 3)

# Average talk time per call

In [None]:
# M<k>_MINUTES_PER_CALL = outgoing minutes / outgoing calls, for each of the six months.
for month_index in range(1, 7):
    month_tag = "M" + str(month_index)
    minutes_per_call = col(month_tag + "_TTL_OUT_MINUTES") / col(month_tag + "_TTL_OUT_CALLS")
    base_usage_status = base_usage_status.withColumn(month_tag + "_MINUTES_PER_CALL", minutes_per_call)

In [None]:
# Fill occurred NaNs: the divisions above leave NULLs, which become 0 here.
# One dtype lookup and a single na.fill call replace the per-column loop, which
# rebuilt the dtype dict for every test and added one fill step per column.
# NOTE(review): Spark never reports 'int64' (64-bit integers are 'bigint'); the
# type list is kept as it was so the result is unchanged.
column_types = dict(base_usage_status.dtypes)
zero_fill_columns = [name for name in base_usage_status.columns
                     if column_types[name] in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=zero_fill_columns)

# ARPU

In [None]:
# M<k>_ARPU = outgoing revenue + GPRS revenue + bundle revenue, for each of the six months.
for month_index in range(1, 7):
    month_tag = "M" + str(month_index)
    out_revenue = col(month_tag + "_TTL_OUT_REVENUE")
    gprs_revenue = col(month_tag + "_GPRS_REVENUE")
    bundle_revenue = col(month_tag + "_BUNDLE_REVENUE")
    base_usage_status = base_usage_status.withColumn(month_tag + "_ARPU",
                                                     out_revenue + gprs_revenue + bundle_revenue)

# ROC

In [None]:
# Rate of change of the latest month against the mean of the five previous ones:
#   M1_ROC<suffix> = (M1<suffix> - avg(M2..M6<suffix>)) / avg(M2..M6<suffix>)
# A zero average makes the division yield NULL; the fill cell that follows turns
# NULLs in 'double' columns into 0.
column_types = dict(base_usage_status.dtypes)

# The same suffix occurs once per month column (M1_x ... M6_x), so the former
# loop rebuilt every M1_ROC column six times. Collect each suffix once, in
# first-seen order, so the resulting columns and their order are unchanged.
roc_suffixes = []
for column in base_usage_status.schema.names:
    is_numeric = column_types[column] in ('int64', 'double', 'int')
    if (is_numeric and column != 'MSISDN' and column != 'TENURE_IN_MONTHS'
            and column[2:6] != "_ROC" and column[2:] not in roc_suffixes):
        roc_suffixes.append(column[2:])

for suffix in roc_suffixes:
    prev_month_avg = (col("M2"+suffix) + col("M3"+suffix) + col("M4"+suffix) + col("M5"+suffix) + col("M6"+suffix)) / 5
    base_usage_status = base_usage_status.withColumn("M1_ROC"+suffix, (col("M1"+suffix) - prev_month_avg) / prev_month_avg)

In [None]:
# Fill occurred NaNs: NULLs produced by the rate-of-change divisions become 0.
# One dtype lookup and a single na.fill call replace the per-column loop, which
# rebuilt the dtype dict for every test and added one fill step per column.
# NOTE(review): Spark never reports 'int64' (64-bit integers are 'bigint'); the
# type list is kept as it was so the result is unchanged.
column_types = dict(base_usage_status.dtypes)
zero_fill_columns = [name for name in base_usage_status.columns
                     if column_types[name] in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=zero_fill_columns)

# Average for all and half period

In [None]:
# Create 3 month and 6 month averages for all usage columns
#   M13_AVG<suffix>: mean of M1..M3, M46_AVG<suffix>: mean of M4..M6,
#   M16_AVG<suffix>: mean of all six months.
column_types = dict(base_usage_status.dtypes)

# The same suffix occurs once per month column (M1_x ... M6_x), so the former
# loop rebuilt every average six times. Collect each suffix once, in first-seen
# order, so the resulting columns and their order are unchanged. The existing
# M1_ROC* columns are skipped by the "_ROC" test, as before.
average_suffixes = []
for column in base_usage_status.schema.names:
    is_numeric = column_types[column] in ('int64', 'double', 'int')
    if (is_numeric and column != 'MSISDN' and column != 'TENURE_IN_MONTHS'
            and column[2:6] != "_ROC" and column[2:] not in average_suffixes):
        average_suffixes.append(column[2:])

for suffix in average_suffixes:
    # first semi-semester
    base_usage_status = base_usage_status.withColumn("M13_AVG"+ suffix, (col("M1"+suffix)+col("M2"+suffix)+col("M3"+suffix))/3)
    # second semi-semester
    base_usage_status = base_usage_status.withColumn("M46_AVG"+ suffix, (col("M4"+suffix)+col("M5"+suffix)+col("M6"+suffix))/3)
    # calculate the average for all six months
    base_usage_status = base_usage_status.withColumn("M16_AVG"+ suffix, (col("M1"+suffix)+col("M2"+suffix)+col("M3"+suffix)+col("M4"+suffix)+col("M5"+suffix)+col("M6"+suffix)) /6)

# Ratio recharge / bundle value

In [None]:
# M<k>_EXPENDITURE_RATIO = recharge value / bundle revenue, for each of the six months.
for month_index in range(1, 7):
    month_tag = "M" + str(month_index)
    expenditure_ratio = col(month_tag + "_RECHARGES_VALUE") / col(month_tag + "_BUNDLE_REVENUE")
    base_usage_status = base_usage_status.withColumn(month_tag + "_EXPENDITURE_RATIO", expenditure_ratio)

In [None]:
# Fill occurred NaNs: NULLs produced by the expenditure-ratio divisions become 0.
# One dtype lookup and a single na.fill call replace the per-column loop, which
# rebuilt the dtype dict for every test and added one fill step per column.
# NOTE(review): Spark never reports 'int64' (64-bit integers are 'bigint'); the
# type list is kept as it was so the result is unchanged.
column_types = dict(base_usage_status.dtypes)
zero_fill_columns = [name for name in base_usage_status.columns
                     if column_types[name] in ('int64', 'double', 'int')]
base_usage_status = base_usage_status.na.fill(value=0, subset=zero_fill_columns)

### demographics3

In [None]:
# CAR LINE DATASET
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/'

# find max day
# Plain assignments replace the former writes through locals(): the Python docs
# warn against modifying that mapping, and these are ordinary notebook globals.
find_day = read_in_data2(line, year=year_val[0], month=month_val[0]).select('MSISDN', 'day')
# A global aggregate returns exactly one row, so no limit() is needed.
day_max = find_day.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Latest daily CAR_LINE snapshot, reduced to the MSISDN -> customer account id mapping.
line = 'gs://' + customerprofilecar_rawprepared_bucket + '/car_line/1.0/parquet/year={}/month={}/day={}/'

df_line = (
    read_in_data2(line, year=year_val[0], month=month_val[0], day=day_max)
    .select("MSISDN", "RETAIL_CUST_ACCT_DWH_ID")
)

In [None]:
# NOTE(review): the next cell de-duplicates on MSISDN alone, which already leaves at most
# one row per MSISDN - this pair-level de-duplication looks redundant.
df_line = df_line.dropDuplicates(["MSISDN","RETAIL_CUST_ACCT_DWH_ID"])

In [None]:
# Keep a single account id per MSISDN (dropDuplicates does not specify which duplicate survives).
df_line = df_line.dropDuplicates(["MSISDN"])

In [None]:
# Register the feature base and the CAR_LINE mapping for the SQL join in the next cell.
base_usage_status.createOrReplaceTempView("base_usage_view")
df_line.createOrReplaceTempView("line_view")

In [None]:
# Join CAR_LINE with BASE_USAGE to attach the customer account id (RETAIL_CUST_ACCT_DWH_ID).
# INNER JOIN: base rows whose MSISDN has no CAR_LINE row are dropped.
usage_status_df = spark.sql ("""SELECT A.*, B.RETAIL_CUST_ACCT_DWH_ID
                          FROM base_usage_view A
                          INNER JOIN line_view B
                             ON A.MSISDN = B.MSISDN
                       """)

In [None]:
# DEMOGRAPHICS DATASET
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/'

day_max = 1  # placeholder, overwritten below once the month-level scan succeeds
# find max day
# Plain assignments replace the former writes through locals(): the Python docs
# warn against modifying that mapping, and these are ordinary notebook globals.
find_day = read_in_data2(demographics, year=year_val[0], month=month_val[0]).select('day')
# A global aggregate returns exactly one row, so no limit() is needed.
day_max = find_day.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Read the latest daily demographics snapshot (day_max found in the previous cell).
demographics = 'gs://'+ customerprofilecar_rawprepared_bucket +'/car_pega_customer/1.0/parquet/year={}/month={}/day={}/'

df_demographics = read_in_data2(demographics, year = year_val[0], month = month_val[0], day= day_max)

In [None]:
# select specific columns: the join key (CUST_DWH_ID) plus the attributes joined onto the base below
df_demographics = df_demographics.select("CUST_DWH_ID", "POST_CODE", "GENDER", "AGE", "VF_COMBO_FLG", "ACTIVE_TOTAL_LINES")

In [None]:
# One row per customer id, so the later LEFT JOIN on CUST_DWH_ID cannot multiply base rows.
df_demographics = df_demographics.dropDuplicates(["CUST_DWH_ID"])

In [None]:
# Rewrite the 'XXXXX' placeholder to 'DUMMY', the same value the fill step below uses for missing post codes.
df_demographics = df_demographics.withColumn('POST_CODE', regexp_replace('POST_CODE', 'XXXXX', 'DUMMY'))

In [None]:
# Fill NaNs
# Numeric columns get their column mean. GENDER and POST_CODE get a dedicated
# placeholder for both NULL and empty-string values. Every other string column
# gets "N/A".
# NOTE(review): Spark never reports 'int64'/'object' (64-bit integers are
# 'bigint'); the type lists are kept as they were so the result is unchanged.
blank_defaults = {"GENDER": "O", "POST_CODE": "DUMMY"}
# Looked up once: none of the fills below changes a column's type.
demographic_types = dict(df_demographics.dtypes)

for column in df_demographics.columns:
    column_type = demographic_types[column]
    if column_type in ('int64', 'double', 'int'):
        # fill with mean
        mean = df_demographics.agg({column: "avg"}).collect()[0][0]
        # avg() is NULL when the column holds no values at all; na.fill(None)
        # would raise, so such a column is left untouched.
        if mean is not None:
            df_demographics = df_demographics.na.fill(mean, subset=[column])
    elif column_type in ('object', 'string'):
        if column in blank_defaults:
            placeholder = blank_defaults[column]
            df_demographics = df_demographics.na.fill(value=placeholder, subset=[column])
            df_demographics = df_demographics.withColumn(column, when(col(column)== "" ,placeholder).otherwise(col(column)))
        df_demographics = df_demographics.na.fill(value="N/A", subset=[column])

In [None]:
# STRANGE VALUES FOR AGES
# Any AGE that is still NULL becomes 0.0; the clamp in the next cell then lifts it to 18.
df_demographics = df_demographics.withColumn("AGE", coalesce(col("AGE"), lit(0.0)))

In [None]:
# Clamp implausible ages: anything above 80 becomes 80, anything below 17 becomes 18.
# The two conditions cannot both hold, so a single CASE expression is equivalent
# to applying them one after the other.
# NOTE(review): an AGE of exactly 17 is left as 17 while younger values jump to 18 -
# confirm whether the lower bound was meant to be "< 18".
clamped_age = (
    when(col("AGE") > 80, 80)
    .when(col("AGE") < 17, 18)
    .otherwise(col("AGE"))
)
df_demographics = df_demographics.withColumn("AGE", clamped_age)

# Join usage-status with demographics

In [None]:
# Register the cleaned demographics and the account-enriched base for the SQL join in the next cell.
df_demographics.createOrReplaceTempView("demographics_view")
usage_status_df.createOrReplaceTempView("usage_status_view")

In [None]:
# JOIN
# LEFT JOIN keeps every base row; the demographic columns are NULL when the account id
# has no demographics row (those NULLs are filled two cells below).
usage_status_demo = spark.sql ("""SELECT A.*, B.POST_CODE, B.GENDER, B.AGE, B.VF_COMBO_FLG, B.ACTIVE_TOTAL_LINES
                          FROM usage_status_view A
                          LEFT JOIN demographics_view B
                             ON A.RETAIL_CUST_ACCT_DWH_ID = B.CUST_DWH_ID
                       """)

In [None]:
# The account id was only needed as the join key to demographics; remove it from the feature set.
usage_status_demo = usage_status_demo.drop(col("RETAIL_CUST_ACCT_DWH_ID"))

In [None]:
# Fill NaNs
# Same rules as for df_demographics, applied to the demographic columns of the
# joined frame (rows without a demographics match carry NULLs here): numeric
# columns get their column mean, GENDER / POST_CODE get a dedicated placeholder
# for NULL and empty strings, every other string column gets "N/A".
# NOTE(review): Spark never reports 'int64'/'object' (64-bit integers are
# 'bigint'); the type lists are kept as they were so the result is unchanged.
blank_defaults = {"GENDER": "O", "POST_CODE": "DUMMY"}
# Looked up once: none of the fills below changes a column's type.
joined_types = dict(usage_status_demo.dtypes)

for column in df_demographics.columns:
    if column == 'CUST_DWH_ID':
        # join key of the demographics frame, not selected into the joined frame
        continue
    column_type = joined_types[column]
    if column_type in ('int64', 'double', 'int'):
        # fill with mean
        mean = usage_status_demo.agg({column: "avg"}).collect()[0][0]
        # avg() is NULL when the column holds no values at all; na.fill(None)
        # would raise, so such a column is left untouched.
        if mean is not None:
            usage_status_demo = usage_status_demo.na.fill(mean, subset=[column])
    elif column_type in ('object', 'string'):
        if column in blank_defaults:
            placeholder = blank_defaults[column]
            usage_status_demo = usage_status_demo.na.fill(value=placeholder, subset=[column])
            usage_status_demo = usage_status_demo.withColumn(column, when(col(column)== "" ,placeholder).otherwise(col(column)))
        usage_status_demo = usage_status_demo.na.fill(value="N/A", subset=[column])

### post_code3

In [None]:
# Copy the sociodemographics workbook from the files bucket and load it into Spark.
# check_call raises CalledProcessError when gsutil fails; the former
# subprocess.call ignored the exit status, so a failed copy could go unnoticed
# and a stale or missing local file would be read instead.
# An argument list (no shell=True) runs the same command without handing the
# bucket name to a shell for parsing.
subprocess.check_call([
    '/bin/sh', '/usr/bin/gsutil', '-q', 'cp',
    'gs://' + files_bucket + '/notebooks/jupyter/higher_bundles/Sociodemographics.xlsx',
    'Sociodemographics.xlsx',
])
Population_pools = pd.read_excel('Sociodemographics.xlsx')
population_pools_df = sql.createDataFrame(Population_pools)

In [None]:
# Columns of the workbook that are not carried into the model base
# (identifiers/names and all 60+ age-band figures).
unused_pool_columns = [
    "Postcode_key", "Name", "Periferiaki_enotita",
    "Population_aged_60+", "Male_Population_aged_60+", "Female_Population_aged_60+",
]
population_pools_df = population_pools_df.drop(*unused_pool_columns)

In [None]:
# Rename the workbook headers that are awkward to reference in SQL:
# the hyphen in the age bands becomes an underscore ...
for age_band in ('0-14', '15-29', '30-44', '45-59'):
    for gender_prefix in ('', 'Male_', 'Female_'):
        excel_name = gender_prefix + 'Population_aged_' + age_band
        population_pools_df = population_pools_df.withColumnRenamed(excel_name, excel_name.replace('-', '_'))

# ... and the punctuation in the purchasing-power headers is removed.
purchasing_power_renames = [
    ('Purchasing_Power:_million_Euro', 'Purchasing_Power_million_Euro'),
    ('Purchasing_Power:_per_mill_of_country', 'Purchasing_Power_per_mill_of_country'),
    ('Purchasing_Power:_Euro_per_capita', 'Purchasing_Power_Euro_per_capita'),
    ('Purchasing_Power:_index_(country_eq.100)', 'Purchasing_Power_index_country_eq_100'),
]
for excel_name, sql_name in purchasing_power_renames:
    population_pools_df = population_pools_df.withColumnRenamed(excel_name, sql_name)

In [None]:
# Register the post-code statistics and the demographic-enriched base for the SQL join in the next cell.
population_pools_df.createOrReplaceTempView("population_pools_view")
usage_status_demo.createOrReplaceTempView("usage_status_demo_view")

In [None]:
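# Attach the post-code level statistics via LEFT JOIN on POST_CODE; the 60+ age-band columns are excluded (they were dropped from population_pools_df above)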
usage_status_demo_pc = spark.sql ("""SELECT A.*, B.Population, B.Households, B.Average_Household_Size, B.Male_Population,
                          B.Female_Population, B.Population_aged_0_14, B.Male_Population_aged_0_14, B.Female_Population_aged_0_14,
                          B.Population_aged_15_29, B.Male_Population_aged_15_29, B.Female_Population_aged_15_29, 
                          B.Population_aged_30_44, B.Male_Population_aged_30_44, B.Female_Population_aged_30_44,
                          B.Population_aged_45_59, B.Male_Population_aged_45_59, B.Female_Population_aged_45_59,
                          B.Purchasing_Power_million_Euro, B.Purchasing_Power_per_mill_of_country,
                          B.Purchasing_Power_Euro_per_capita, B.Purchasing_Power_index_country_eq_100

                          FROM usage_status_demo_view A                          
                          LEFT JOIN population_pools_view B                          
                          ON A.POST_CODE = B.POST_CODE
                       """)

In [None]:
# Fill NaNs
# Rows whose POST_CODE has no match in the population table get 0 in every
# numeric population column. A single na.fill over the whole column list
# replaces one fill step per column; non-numeric columns named in the subset
# (e.g. a string POST_CODE) are ignored by a numeric fill either way.
usage_status_demo_pc = usage_status_demo_pc.na.fill(value=0, subset=population_pools_df.columns)

### students3

In [None]:
# Locate the latest daily status-services snapshot available for the scoring month.
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/'

day_max = 1  # placeholder, overwritten below once the month-level scan succeeds
# find max day
# Plain assignments replace the former writes through locals(): the Python docs
# warn against modifying that mapping, and these are ordinary notebook globals.
find_day = read_in_data2(status_service, year=year_val[0], month=month_val[0]).select('day')
# A global aggregate returns exactly one row, so no limit() is needed.
day_max = find_day.agg(F.max(F.col("day")).alias("MAX")).first()["MAX"]

In [None]:
# Read the latest daily status-services snapshot (day_max found in the previous cell).
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/day={}'

df_status_service = read_in_data2(status_service, year = year_val[0], month = month_val[0], day=day_max)

In [None]:
# Register the services snapshot for the SQL filter in the next cell.
df_status_service.createOrReplaceTempView("df_status_service_view")

In [None]:
# Rows carrying service code 'BDLCUPaso'; the following cells flag these MSISDNs as students.
df_students = spark.sql("""SELECT * 
                           FROM df_status_service_view A
                           WHERE SERVICE_CODE == 'BDLCUPaso' 
                           """)

In [None]:
# One row per MSISDN, so the LEFT JOIN onto the main dataset cannot multiply rows.
df_students = df_students.dropDuplicates(["MSISDN"])

In [None]:
# ADD A NEW COLUMN WITH A FLAG TO INDICATE THAT THIS USER IS A STUDENT
df_students = df_students.withColumn("STUDENTS_FLAG", lit("Y"))

# Left join student info with main dataset

In [None]:
# Register the main dataset and the student list for the SQL join in the next cell.
usage_status_demo_pc.createOrReplaceTempView("usage_status_demo_pc_view")
df_students.createOrReplaceTempView("students_view")

In [None]:
# LEFT JOIN keeps every base row; STUDENTS_FLAG is 'Y' for matched MSISDNs and NULL otherwise (filled with 'N' in the next cell).
usage_status_demo_pc_stud = spark.sql("""SELECT A.*, B.STUDENTS_FLAG
                                FROM usage_status_demo_pc_view A
                                LEFT JOIN students_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
# Set the indicator to "N" (no) for users who are not students, i.e. had no match in the student list
usage_status_demo_pc_stud = usage_status_demo_pc_stud.na.fill(value="N", subset=["STUDENTS_FLAG"])

### buckets3

In [None]:
#month_val[:2]

In [None]:
# Read the bucket datasets for the first two periods in year_val / month_val
# (the previous 2-month period).
buckets = 'gs://'+ model_outputs_bucket + '/prepay_buckets/result/parquet/1.0/year={}/month={}/'

# Bind the two frames to explicit names instead of creating variables through locals(),
# which Python does not support. The unpacking fails right here, with a clear error,
# if fewer than two periods are configured.
buckets_m1, buckets_m2 = [
    read_in_data2(buckets, year = period_year, month = period_month)
    for period_year, period_month in zip(year_val[:2], month_val[:2])
]

In [None]:
df_buckets = buckets_m1.union(buckets_m2)

In [None]:
# Order rows by BUNDLE ascending, then ACTIVATION_DATE descending.
# NOTE(review): the next cell ranks rows with its own window ordering, so this sort does
# not appear to influence which rows are kept — confirm before removing it.
df_buckets = df_buckets.sort(col("BUNDLE").asc(), col("ACTIVATION_DATE").desc())

In [None]:
# For every (MSISDN, BUNDLE) pair keep only the row with the most recent ACTIVATION_DATE.
latest_first_per_bundle = Window.partitionBy("MSISDN", "BUNDLE").orderBy(desc("ACTIVATION_DATE"))

df_buckets = (
    df_buckets
    .select("MSISDN", "ACTIVATION_DATE", "BUNDLE", "BUNDLE_REVENUE", "BALANCE",
            "VOICE_BUCKET", "DATA_BUCKET", "SMS_BUCKET",
            F.row_number().over(latest_first_per_bundle).alias("row_num"))
    .filter(col("row_num") == 1)
)

In [None]:
# Sum the voice, data and SMS bucket columns per MSISDN.
# NOTE(review): `sum` must resolve to the Spark SQL function here, not the Python
# builtin — confirm against the import cell.
df_buckets_summary = df_buckets.groupBy("MSISDN").agg(
    sum("VOICE_BUCKET").alias("VOICE_BUCKET_SUMMARY"),
    sum("DATA_BUCKET").alias("DATA_BUCKET_SUMMARY"),
    sum("SMS_BUCKET").alias("SMS_BUCKET_SUMMARY"),
)

### balance3

In [None]:
# Take BALANCE from the row with the most recent ACTIVATION_DATE of each MSISDN.
latest_first_per_msisdn = Window.partitionBy("MSISDN").orderBy(desc("ACTIVATION_DATE"))

df_balance = df_buckets.select(
    "MSISDN",
    "BALANCE",
    F.row_number().over(latest_first_per_msisdn).alias("row_num"),
)

df_balance_summary = df_balance.where(col("row_num") == 1).drop("row_num")

# Join buckets with balance

In [None]:
df_buckets_summary.createOrReplaceTempView("buckets_sum_view")
df_balance_summary.createOrReplaceTempView("balance_sum_view")

In [None]:
# One row per MSISDN: the three bucket sums plus the latest BALANCE.
# Both views are derived from df_buckets, so the INNER JOIN pairs the two per-MSISDN rows.
df_buckets_balance = spark.sql("""SELECT A.*, B.BALANCE
                             FROM buckets_sum_view A
                             INNER JOIN balance_sum_view B
                                  ON A.MSISDN = B.MSISDN """)

# Join bucket-balance with main dataset

In [None]:
usage_status_demo_pc_stud.createOrReplaceTempView("usage_status_demo_pc_stud_view")
df_buckets_balance.createOrReplaceTempView("buckets_balance_view")

In [None]:
# Attach the bucket sums and balance to the main dataset; MSISDNs without bucket rows
# get NULLs in the four added columns (filled in the next cell).
usage_status_demo_pc_stud_buckets = spark.sql("""SELECT A.*, B.VOICE_BUCKET_SUMMARY, B.DATA_BUCKET_SUMMARY, B.SMS_BUCKET_SUMMARY, B.BALANCE
                                FROM usage_status_demo_pc_stud_view A
                                LEFT JOIN buckets_balance_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
# Fill the NULLs in the columns that came from df_buckets_balance.
# DataFrame.na.fill applies a numeric value only to numeric columns and a string value
# only to string columns, so no manual dtype test is needed. The previous test compared
# Spark dtypes against pandas names ('int64', 'object') and therefore skipped Spark types
# such as 'bigint', 'float' and 'decimal', leaving those columns unfilled at this step.
bucket_balance_columns = df_buckets_balance.columns
usage_status_demo_pc_stud_buckets = (
    usage_status_demo_pc_stud_buckets
    .na.fill(value=0, subset=bucket_balance_columns)
    .na.fill(value="N/A", subset=bucket_balance_columns)
)

### drop_calls3

In [None]:
# Path template of the network CDR dataset, partitioned by year and month.
drop_calls = 'gs://'+ mediatedcdrs_bucket +'/eds_network_cdr/2.0/parquet/year={}/month={}/'

# Load the CDRs for year_val[0] / month_val[0].
df_drop_calls = read_in_data2(drop_calls, year = year_val[0], month = month_val[0])

In [None]:
# Keep only the columns referenced by the dropped-call query below.
df_drop_calls = df_drop_calls.select("SAMPLED","A_NUMBER","FIRST_LAC","LAST_LAC","CELL","LAST_CELL","REC_TYPE","TARIFF",
                               "DURATION", "TERM_CAUSE", "day")

In [None]:
df_drop_calls.createOrReplaceTempView("cdrs_view")

In [None]:
# Count, per MSISDN, the CDR rows that pass the dropped-call filter.
# Inner query (L): keeps rows with REC_TYPE '20'/'30', TARIFF other than '142', an
#   8-character TERM_CAUSE whose first four characters are in the listed set, a non-empty
#   A_NUMBER starting with '69', and one of four FIRST_LAC/LAST_LAC/CELL/LAST_CELL
#   emptiness patterns; LEVEL numbers the rows within each (A_NUMBER, SAMPLED) pair.
# Middle query (K): keeps LEVEL = 1, i.e. one row per (A_NUMBER, SAMPLED).
# Outer query: COUNT(*) per MSISDN, exposed as DROPPED_CALL_COUNT.
# NOTE(review): the normalised FIRST_LAC/LAST_LAC/CELL/LAST_CELL, EOS and day columns are
# computed but never read by the outer queries; the listed TERM_CAUSE prefixes are
# presumably the dropped-call causes — confirm against the CDR specification.
drop_calls_query = spark.sql("""SELECT K.MSISDN,
                                count(*) as DROPPED_CALL_COUNT
                                FROM (
                                    SELECT L.MSISDN, L.SAMPLED, L.YEAR
                                    FROM (
                                       SELECT
                                       A.SAMPLED, A.A_NUMBER AS MSISDN, A.day, YEAR(A.SAMPLED) AS YEAR, SUBSTR(A.TERM_CAUSE,1,4) AS EOS,
                                       CASE WHEN A.FIRST_LAC LIKE '%IE%' THEN A.LAST_LAC  --this condition holds only for calls
                                           WHEN A.FIRST_LAC = '' THEN A.LAST_LAC          --this condition holds only for calls
                                           ELSE A.FIRST_LAC END FIRST_LAC,                --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_LAC = '' THEN A.FIRST_LAC         --this condition holds for both calls and SMS
                                           ELSE A.LAST_LAC END LAST_LAC,                  --this condition holds for both calls and SMS
                                       CASE WHEN A.CELL LIKE '%F%' THEN A.LAST_CELL       --this condition holds only for calls
                                           WHEN A.CELL = '' THEN A.LAST_CELL              --this condition holds only for calls
                                           ELSE A.CELL END CELL,                          --this condition holds for both calls and SMS
                                       CASE WHEN A.LAST_CELL = '' THEN A.CELL             --this condition holds for both calls and SMS
                                           ELSE A.LAST_CELL END LAST_CELL,                --this condition holds for both calls and SMS
                                       ROW_NUMBER() OVER (PARTITION BY A.A_NUMBER, A.SAMPLED ORDER BY A.A_NUMBER, A.SAMPLED) as LEVEL
                                       FROM cdrs_view A
                                       WHERE ((A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL != '') OR  -- this condition holds only for calls
                                       (A.FIRST_LAC = '' AND A.LAST_LAC != '' AND A.CELL = '' AND A.LAST_CELL != '') OR    -- this condition holds only for calls
                                       (A.FIRST_LAC != '' AND A.LAST_LAC = '' AND A.CELL != '' AND A.LAST_CELL = '') OR    -- this condition holds for both calls and SMS
                                       (A.FIRST_LAC != '' AND A.LAST_LAC != '' AND A.CELL != '' AND A.LAST_CELL = ''))     -- this condition holds for both calls and SMS
                                       AND A.REC_TYPE IN ('20','30') AND A.TARIFF != '142'
                                       AND LENGTH(A.TERM_CAUSE) = 8
                                       AND SUBSTR(A.TERM_CAUSE,1,4) IN ('068F','08BF','09A6','09C3','09C5','09C8','09F8','0A0E','0A0F','0AE9','0C15','0CD2',
                                                                        '0CD3','0F7B','0F7C','018F','065D','065E','0700','0701','0702','09A7','09BF','09C0',
                                                                        '09C2','09C4','09C6','09C7','09C9','09F6','09F7','0A0A','0A0B','0A0C','0A0D','0C14',
                                                                        '0C16','0F7D','1C8F','1C90','1C91','1C92','1C9A','1C9B')
                                       AND A.A_NUMBER != '' AND A.A_NUMBER LIKE '69%' 
                                       --ORDER BY A.A_NUMBER, A.SAMPLED
                                       ) AS L
                                    WHERE L.LEVEL = 1
                                    --ORDER BY L.MSISDN, L.SAMPLED ASC
                                ) K
                                GROUP BY K.MSISDN
                                ORDER BY K.MSISDN
                             """)  

# Join drop_calls with main dataset

In [None]:
usage_status_demo_pc_stud_buckets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_view")
drop_calls_query.createOrReplaceTempView("drop_calls_view")

In [None]:
# Attach DROPPED_CALL_COUNT to the main dataset; MSISDNs absent from drop_calls_view
# get NULL (replaced by 0 in the next cell).
usage_status_demo_pc_stud_buckets_dropcalls = spark.sql("""SELECT A.*, B.DROPPED_CALL_COUNT
                                FROM usage_status_demo_pc_stud_buckets_view A
                                LEFT JOIN drop_calls_view B
                                ON A.MSISDN=B.MSISDN""")

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls = usage_status_demo_pc_stud_buckets_dropcalls.na.fill(value=0, subset=["DROPPED_CALL_COUNT"])

### tickets3

In [None]:
# Path template of the ticket / service-request dataset, partitioned by year and month.
ticket_service = 'gs://'+ dhdwh_bucket +'/mobile_sr_tt/1.0/parquet/year={}/month={}/'

# Load year_val[0] / month_val[0] and keep only the two columns used for counting tickets.
df_tickets_requests = (
    read_in_data2(ticket_service, year = year_val[0], month = month_val[0])
    .select("X_MSISDN", "SR_ID")
)

In [None]:
# drop duplicates
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN", "SR_ID"])

In [None]:
# Add a ticket counter per MSISDN: every row gets the number of rows sharing its X_MSISDN.
# Because (X_MSISDN, SR_ID) pairs were de-duplicated in the previous cell, this is the
# number of distinct (X_MSISDN, SR_ID) pairs for that X_MSISDN.
df_tickets_requests = df_tickets_requests.select("X_MSISDN", F.count("X_MSISDN").over(Window.partitionBy("X_MSISDN")).alias("TICKETS_COUNT"))

In [None]:
df_tickets_requests = df_tickets_requests.dropDuplicates(["X_MSISDN"])

# Join tickets with main dataset

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_view")
df_tickets_requests.createOrReplaceTempView("tickets_requests_view")

In [None]:
# Attach TICKETS_COUNT to the main dataset (matched on MSISDN = X_MSISDN); MSISDNs
# without tickets get NULL (replaced by 0 in the next cell).
usage_status_demo_pc_stud_buckets_dropcalls_tickets = spark.sql("""SELECT A.*, B.TICKETS_COUNT
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_view A
                                LEFT JOIN tickets_requests_view B
                                ON A.MSISDN=B.X_MSISDN""")

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls_tickets = usage_status_demo_pc_stud_buckets_dropcalls_tickets.na.fill(value=0, subset=["TICKETS_COUNT"])

### channel3

In [None]:
# NOTE(review): this reloads the ticket dataset and overwrites df_tickets_requests, but
# the rest of the channel section works from df_events and does not read
# df_tickets_requests — this looks like a leftover copy of the tickets cell; confirm
# and remove if unused.
ticket_service = 'gs://'+ dhdwh_bucket +'/mobile_sr_tt/1.0/parquet/year={}/month={}/'

df_tickets_requests = read_in_data2(ticket_service, year = year_val[0], month = month_val[0]).drop('service_file_id')

In [None]:
# NOTE(review): df_events is not created in this section; it has to come from an earlier
# cell — confirm it holds the events of the scoring period and not of a training period.
df_events.createOrReplaceTempView("df_events_view")

In [None]:
# Select account and requesting system for events with EVENT_LABEL = 139 and
# EVENT_RESULT = 169 (presumably successful bundle purchases — confirm the codes).
df_bundle_purchase = spark.sql("""SELECT ACCOUNT_ID, REQUESTING_SYSTEM                             
                             FROM df_events_view 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             """)

In [None]:
# Collapse the digital purchase channels into the single value 'DIGITAL'.
# These are regex replacements on the column VALUES; no column is renamed.
for digital_channel in ('VOP', 'CUapp', 'MCare', 'CUsite'):
    df_bundle_purchase = df_bundle_purchase.withColumn(
        'REQUESTING_SYSTEM',
        regexp_replace('REQUESTING_SYSTEM', digital_channel, 'DIGITAL'),
    )

In [None]:
# Collapse the remaining minor channels into the single value 'OTHER'.
# These are regex replacements on the column VALUES; no column is renamed.
for other_channel in ('TAZAAPP', 'EKIOSK', 'PostpaidToPrepaid', 'XPCVM', 'LMG', 'PEGA', 'SMSVAS', 'CRM'):
    df_bundle_purchase = df_bundle_purchase.withColumn(
        'REQUESTING_SYSTEM',
        regexp_replace('REQUESTING_SYSTEM', other_channel, 'OTHER'),
    )

In [None]:
# One category: VFSHOP (VFShop)
# Another category: telephone (IVR)
# DIGITAL (VOP, MCARE, CUapp, CUsite)
# OTHER

In [None]:
# Count purchases per account and channel: one count column per channel category.
# The four categories are passed to pivot() explicitly so that each of these columns
# exists even when a category is absent from the data (a later cell selects all four by
# name), and so Spark does not need an extra pass to discover the distinct values.
# Combinations with no purchases come out as NULL and are filled with 0 in the next cell.
df_bundle_purchase = (
    df_bundle_purchase
    .groupBy('ACCOUNT_ID')
    .pivot('REQUESTING_SYSTEM', ["DIGITAL", "VFShop", "IVR", "OTHER"])
    .count()
)

In [None]:
# Missing account/channel combinations come out of the pivot as NULL; turn them into 0.
df_bundle_purchase = df_bundle_purchase.na.fill(0, subset=df_bundle_purchase.columns)

In [None]:
df_bundle_purchase = df_bundle_purchase.select("ACCOUNT_ID", "DIGITAL", "VFShop", "IVR", "OTHER")

# Join channel with main dataset

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls_tickets.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_view")
df_bundle_purchase.createOrReplaceTempView("bundle_purchase_view")

In [None]:
# Attach the four channel counts to the main dataset. The join key on the channel side
# is the 10 characters of ACCOUNT_ID starting at position 3.
# NOTE(review): if two ACCOUNT_IDs share that substring, the join duplicates the
# main-dataset row — confirm that ACCOUNT_ID maps 1:1 to MSISDN.
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = spark.sql("""SELECT A.*, B.DIGITAL, B.VFShop, B.IVR, B.OTHER
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_view A
                                LEFT JOIN bundle_purchase_view B
                                ON A.MSISDN= SUBSTR(B.ACCOUNT_ID,3,10)""")

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.na.fill(value=0, subset=["DIGITAL", "VFShop", "IVR", "OTHER"])

### NumberOfBundles3

In [None]:
# Read the event datasets for the periods in year_val[:-2] / month_val[:-2]
# (intended to be the previous 6-month period).
# NOTE(review): the slice yields six periods only when year_val / month_val hold eight
# entries — confirm against the cell that builds them.
events = 'gs://'+ mediatedcdrs_bucket + '/alu_prepay_cdr/2.0/parquet/year={}/month={}/'

monthly_events = [
    read_in_data2(events, year = period_year, month = period_month)
        .select("ACCOUNT_ID", "PTP_COSP_AMA_CODE", "EVENT_LABEL", "EVENT_RESULT")
    for period_year, period_month in zip(year_val[:-2], month_val[:-2])
]

# Bind the six frames used by the following cells to explicit names instead of creating
# variables through locals(), which Python does not support. The unpacking fails right
# here, with a clear error, if fewer than six periods are available.
events_m1, events_m2, events_m3, events_m4, events_m5, events_m6 = monthly_events[:6]

In [None]:
events_m1.createOrReplaceTempView("events_view_m1")
events_m2.createOrReplaceTempView("events_view_m2")
events_m3.createOrReplaceTempView("events_view_m3")
events_m4.createOrReplaceTempView("events_view_m4")
events_m5.createOrReplaceTempView("events_view_m5")
events_m6.createOrReplaceTempView("events_view_m6")

In [None]:
# Month 1: per ACCOUNT_ID, count the non-NULL PTP_COSP_AMA_CODE values among events with
# EVENT_LABEL = 139 and EVENT_RESULT = 169. The cells for months 2-6 below repeat the
# same query against their own view.
df_num_of_bundles_m1 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m1 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m2 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m2 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m3 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m3 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m4 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m4 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m5 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m5 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m6 = spark.sql("""SELECT ACCOUNT_ID, COUNT(PTP_COSP_AMA_CODE) AS BUNDLES_NUM                       
                             FROM events_view_m6 
                             WHERE ((EVENT_LABEL=139) AND (EVENT_RESULT=169)) 
                             GROUP BY ACCOUNT_ID
                             """)

In [None]:
df_num_of_bundles_m1.createOrReplaceTempView("num_of_bundles_view_m1")
df_num_of_bundles_m2.createOrReplaceTempView("num_of_bundles_view_m2")
df_num_of_bundles_m3.createOrReplaceTempView("num_of_bundles_view_m3")
df_num_of_bundles_m4.createOrReplaceTempView("num_of_bundles_view_m4")
df_num_of_bundles_m5.createOrReplaceTempView("num_of_bundles_view_m5")
df_num_of_bundles_m6.createOrReplaceTempView("num_of_bundles_view_m6")

In [None]:
# Combine the six monthly counts into one row per account. The output ACCOUNT_ID is the
# 10 characters of the original ACCOUNT_ID starting at position 3; months without a
# match come out as NULL (filled with 0 in the next cell).
# NOTE(review): month 1 is the driving table of the LEFT JOINs, so accounts with no
# qualifying event in month 1 are absent from the result even if they have events in
# months 2-6 — confirm this is intended and matches how the training data was built.
num_of_bundles_semester = spark.sql("""SELECT substring(A.ACCOUNT_ID, 3 , 10) AS ACCOUNT_ID, A.BUNDLES_NUM AS M1_BUNDLES_NUM, 
                     B.BUNDLES_NUM AS M2_BUNDLES_NUM, C.BUNDLES_NUM AS M3_BUNDLES_NUM, 
                     D.BUNDLES_NUM AS M4_BUNDLES_NUM,
                     E.BUNDLES_NUM AS M5_BUNDLES_NUM, 
                     F.BUNDLES_NUM AS M6_BUNDLES_NUM
                     
                     FROM num_of_bundles_view_m1 A                 
                    
                     left join num_of_bundles_view_m2 B
                          on A.ACCOUNT_ID = B.ACCOUNT_ID 
                     left join num_of_bundles_view_m3 C 
                          on A.ACCOUNT_ID = C.ACCOUNT_ID
                     left join num_of_bundles_view_m4 D 
                          on A.ACCOUNT_ID = D.ACCOUNT_ID
                     left join num_of_bundles_view_m5 E 
                          on A.ACCOUNT_ID = E.ACCOUNT_ID
                     left join num_of_bundles_view_m6 F
                          on A.ACCOUNT_ID = F.ACCOUNT_ID
                          """)

In [None]:
num_of_bundles_semester = num_of_bundles_semester.na.fill(value=0)

In [None]:
# 3-month and 6-month averages of the monthly bundle counts.
months_1_to_3_total = col("M1_BUNDLES_NUM") + col("M2_BUNDLES_NUM") + col("M3_BUNDLES_NUM")
months_4_to_6_total = col("M4_BUNDLES_NUM") + col("M5_BUNDLES_NUM") + col("M6_BUNDLES_NUM")
months_1_to_6_total = months_1_to_3_total + col("M4_BUNDLES_NUM") + col("M5_BUNDLES_NUM") + col("M6_BUNDLES_NUM")

num_of_bundles_semester = (
    num_of_bundles_semester
    # first half of the semester (months 1-3)
    .withColumn("M13_AVG_BUNDLES_NUM", months_1_to_3_total / 3)
    # second half of the semester (months 4-6)
    .withColumn("M46_AVG_BUNDLES_NUM", months_4_to_6_total / 3)
    # all six months
    .withColumn("M16_AVG_BUNDLES_NUM", months_1_to_6_total / 6)
)

# Join number of bundles with main dataset

In [None]:
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel.createOrReplaceTempView("usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view")
num_of_bundles_semester.createOrReplaceTempView("num_of_bundles_semester_view")

In [None]:
# Attach the monthly bundle counts and their averages to the main dataset. The ACCOUNT_ID
# of num_of_bundles_semester_view is already the 10-character substring, so it is compared
# with MSISDN directly. Unmatched MSISDNs get NULLs (filled with 0 in the next cell).
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = spark.sql("""SELECT A.*, B.M1_BUNDLES_NUM, B.M2_BUNDLES_NUM, B.M3_BUNDLES_NUM, 
                                B.M4_BUNDLES_NUM, B.M5_BUNDLES_NUM, B.M6_BUNDLES_NUM, B.M13_AVG_BUNDLES_NUM, B.M46_AVG_BUNDLES_NUM, 
                                B.M16_AVG_BUNDLES_NUM
                                FROM usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_view A
                                LEFT JOIN num_of_bundles_semester_view B
                                ON A.MSISDN= B.ACCOUNT_ID""")

In [None]:
# No subset is given, so every remaining NULL in a numeric column of the dataset becomes 0
# (string columns are not affected by a numeric fill value).
usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.na.fill(value=0)

# -----------------------------------------------------------------------------------------------------------#

# Pandas scoring dataset

In [None]:
# Feature list for scoring: the selected features without "HIGHER_BUNDLE"
# (presumably the training target, which does not exist at scoring time).
# A new list is built so that boruta_features itself is not mutated through an alias and
# the cell can be re-run safely; removing the item from the aliased list raised
# ValueError on a second run.
scoring_boruta_features = [feature for feature in boruta_features if feature != "HIGHER_BUNDLE"]

In [None]:
#len(scoring_boruta_features)

In [None]:
#scoring_boruta_features

In [None]:
final_scoring_df = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.select(scoring_boruta_features).toPandas()

In [None]:
#final_scoring_df.columns

In [None]:
# Collect the names of the object-dtype columns; these are the ones encoded below.
objects = final_scoring_df.select_dtypes(include=['object'])
object_names = list(objects.columns.values)

#print(object_names)
# MSISDN is kept as-is (it is dropped before prediction and reused for the output).
object_names.remove('MSISDN')

In [None]:
le = LabelEncoder()

# Encode every object column: label-encode columns with at most two distinct values,
# one-hot encode (dropping the first level) the columns with more.
# NOTE(review): both encoders are derived from the scoring data itself, so the integer
# codes and the dummy column set depend on the values present in this run — confirm that
# they line up with the columns the model was trained on.
for object_name in object_names:
    distinct_values = final_scoring_df[object_name].nunique()
    if distinct_values <= 2:
        values_as_text = final_scoring_df[object_name].astype(str)
        le.fit(values_as_text)
        final_scoring_df[object_name] = le.transform(values_as_text)
    else:
        enc_pc = pd.get_dummies(final_scoring_df[object_name], drop_first = True)
        final_scoring_df = pd.concat([final_scoring_df.drop(object_name, axis = 1), enc_pc], axis = 1)

### Score <a class="anchor" id="score"></a>

# PREPARE FINAL OUTPUT DATASET

In [None]:
# 1. PROPENSITY
# MSISDN is removed from the model input; it is kept in final_scoring_df for the output.
final_scoring_df_new = final_scoring_df.drop("MSISDN", axis=1)
# Class probabilities from the fitted model; column index 1 is used as the propensity below.
# NOTE(review): the columns of final_scoring_df_new presumably have to match the training
# features in name and order — confirm.
higher_bundle_propensity = lgbm.predict_proba(final_scoring_df_new)

In [None]:
#higher_bundle_propensity

In [None]:
# 2. Bring more info

In [None]:
# Columns carried into the output next to the propensity. STUDENTS_FLAG is taken after
# the encoding step above, so it is no longer the original "Y"/"N" string here.
msisdn = final_scoring_df["MSISDN"]
voice_bucket = final_scoring_df["VOICE_BUCKET_SUMMARY"]
data_bucket = final_scoring_df["DATA_BUCKET_SUMMARY"]
sms_bucket = final_scoring_df["SMS_BUCKET_SUMMARY"]
students_flg = final_scoring_df["STUDENTS_FLAG"] 

In [None]:
# One tuple per customer; the last element is the probability of class 1.
output_list = list(zip(
    msisdn,
    voice_bucket,
    data_bucket,
    sms_bucket,
    students_flg,
    list(higher_bundle_propensity[:,1]),
))
# Highest propensity first (position 5 of each tuple is the propensity).
sortED = sorted(output_list, key=lambda row: row[5], reverse=True)
scored = pd.DataFrame(
    sortED,
    columns = ['MSISDN','VOICE_BUCKET_SUMMARY', 'DATA_BUCKET_SUMMARY', 'SMS_BUCKET_SUMMARY', 'STUDENTS_FLAG','PROPENSITY'],
)

In [None]:
#scored['PROPENSITY'] =  round(scored['PROPENSITY'], 4)
scored['PROPENSITY'] =  scored['PROPENSITY'].round(3)

In [None]:
# quartiles = np.array_split(scored['PROPENSITY'].values,10)
# quarts = []
# for i in list(range(1,11)):
#     temp = [i]*len(quartiles[i-1])
#     quarts.append(temp)
# quart = [item for sublist in quarts for item in sublist]
# scored['PERCENTILE'] = quart

# scored[scored['PROPENSITY'] >= 0.5].shape
# scored['PROPENSITY'] =  round(scored['PROPENSITY'], 4)
# scored.head()

# Add TARIFF PLAN information

In [None]:
#scored.shape

In [None]:
# Fetch only the two columns needed to attach TARIFF_PLAN to the scored rows (the next
# cells read nothing else from help_df). Collecting just these columns avoids pulling the
# complete feature set to the driver a second time.
help_df = usage_status_demo_pc_stud_buckets_dropcalls_tickets_channel_nob.select("MSISDN", "TARIFF_PLAN").toPandas()

In [None]:
#help_df.head()

In [None]:
# Inner merge (the pandas default) of the scores with each MSISDN's TARIFF_PLAN.
# NOTE(review): if MSISDN is not unique on either side, the merge multiplies rows —
# confirm uniqueness.
output_df = pd.merge(scored, help_df[["MSISDN","TARIFF_PLAN"]], on='MSISDN')

In [None]:
#output_df.shape

In [None]:
# Split the "CU" tariff plan into "CURest" and "CUStudent" according to STUDENTS_FLAG.
# NOTE(review): STUDENTS_FLAG is compared with 0/1, i.e. with its label-encoded form;
# this presumably relies on 'N' -> 0 and 'Y' -> 1 — confirm, in particular for a run in
# which only one of the two values occurs.
output_df["TARIFF_PLAN"] = np.where((output_df["TARIFF_PLAN"] == "CU") & (output_df["STUDENTS_FLAG"] == 0), "CURest", output_df["TARIFF_PLAN"])
output_df["TARIFF_PLAN"] = np.where((output_df["TARIFF_PLAN"] == "CU") & (output_df["STUDENTS_FLAG"] == 1), "CUStudent", output_df["TARIFF_PLAN"])

# Join with the last purchased bundle

In [None]:
# Folder holding the daily partitions of the service-status table for one year/month.
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/'

# Find the highest `day` value present under year_val[0] / month_val[0].
# Plain assignments are used because writing through locals() is not supported by Python
# (it only worked at notebook top level, where locals() happens to be the global namespace).
find_day = read_in_data2(status_service, year = year_val[0], month = month_val[0]).select('day')
day_max = find_day.select(F.max(F.col("day")).alias("MAX")).limit(1).collect()[0].MAX
#print(day_max)

In [None]:
# Path template for a single daily partition of the service-status table.
status_service = 'gs://'+ dhdwh_bucket +'/master_status_services/1.0/parquet/year={}/month={}/day={}'

# Load the partition for day_max (the highest day found in the previous cell).
recent_bundles = read_in_data2(status_service, year = year_val[0], month = month_val[0], day=day_max)

In [None]:
recent_bundles.createOrReplaceTempView("recent_bundles_view")

In [None]:
# Keep only the rows whose SERVICE_CODE is one of the bundle codes listed below; the same
# codes receive a PRICE in the next cell.
recent_bundles = spark.sql("""SELECT MSISDN, SERVICE_CODE FROM recent_bundles_view
                WHERE (SERVICE_CODE="BDLEthnicDataH") OR (SERVICE_CODE="BDLDataTazaINT") 
                
                OR (SERVICE_CODE="BDLIntegLPak") OR (SERVICE_CODE="BDLIntegPak") 
                OR (SERVICE_CODE="BDLIntegLInd") OR (SERVICE_CODE="BDLIntegInd")
                OR (SERVICE_CODE="BDLIntegLBang") OR (SERVICE_CODE="BDLIntegBang")
                OR (SERVICE_CODE="BDLAlbania") OR (SERVICE_CODE="BDLVFAlbInt")
                
                OR (SERVICE_CODE="BDLXNetData") OR (SERVICE_CODE="BDLPreCombo")
                
                OR (SERVICE_CODE="BDLTalkText600") OR (SERVICE_CODE="BDLComboMax") OR (SERVICE_CODE="BDLCUComboXL")                
                OR (SERVICE_CODE="BDLPasoComboH") OR (SERVICE_CODE="BDLPasoComboXL") OR (SERVICE_CODE="BDLPasoComboML")
                OR (SERVICE_CODE="BDLPasoComboTL")
                """)

In [None]:
# PRICE assigned to each bundle SERVICE_CODE, evaluated in this order.
bundle_prices = [
    ("BDLTalkText600", 10.9),
    ("BDLComboMax", 13.5),
    ("BDLCUComboXL", 15),
    ("BDLPasoComboH", 8.5),
    ("BDLPasoComboXL", 10),
    ("BDLPasoComboML", 12),
    ("BDLPasoComboTL", 17.5),
    # TAZA
    ("BDLIntegLPak", 5.3),
    ("BDLIntegLInd", 5.3),
    ("BDLIntegLBang", 5.3),
    ("BDLAlbania", 5.3),
    ("BDLIntegPak", 8.5),
    ("BDLIntegInd", 8.5),
    ("BDLIntegBang", 8.5),
    ("BDLVFAlbInt", 8.5),
    # INTERNATIONAL
    ("BDLEthnicDataH", 8.9),
    ("BDLDataTazaINT", 10.5),
    # VFPP
    ("BDLXNetData", 10.9),
    ("BDLPreCombo", 13.5),
]

# Build one CASE WHEN expression from the table; codes outside the table yield NULL.
price_column = None
for service_code, price in bundle_prices:
    matches_code = col("SERVICE_CODE") == service_code
    if price_column is None:
        price_column = when(matches_code, price)
    else:
        price_column = price_column.when(matches_code, price)

recent_bundles = recent_bundles.withColumn("PRICE", price_column)

In [None]:
# Rank each MSISDN's bundles by PRICE, highest first; the next cell keeps rank 1, i.e. the
# highest-priced bundle (see the notebook assumption "keep highest bundle value for
# customers that purchased more than one bundle").
recent_bundles = recent_bundles.select("MSISDN", "SERVICE_CODE", "PRICE", F.row_number().over(Window.partitionBy("MSISDN").orderBy(desc("PRICE"))).alias("row_num"))

In [None]:
last_bundle = recent_bundles.filter(col("row_num") == 1).drop("row_num")

In [None]:
last_bundle_pd =  last_bundle.toPandas()

# Join last bundle with output

In [None]:
output_df = output_df.merge(last_bundle_pd, how='left', on='MSISDN')

In [None]:
# Fill the gaps left by the left merge with last_bundle_pd: 0 for int64/float64 columns,
# the literal string "NULL" for object columns.
for column in output_df.columns.values:
    if (output_df[column].dtype == 'int64' or output_df[column].dtype == 'float64'):
        # numeric column: missing value -> 0
        output_df[column] = output_df[column].fillna(0)
    elif (output_df[column].dtype == 'object' or output_df[column].dtype == 'str'):
        # text column: missing value -> "NULL"
        output_df[column] = output_df[column].fillna("NULL")

In [None]:
output_df['PROPENSITY'] =  output_df['PROPENSITY'].round(3)

In [None]:
# Round the bucket sums and the bundle price to three decimals.
for rounded_column in ('VOICE_BUCKET_SUMMARY', 'DATA_BUCKET_SUMMARY', 'SMS_BUCKET_SUMMARY', 'PRICE'):
    output_df[rounded_column] = output_df[rounded_column].round(3)

In [None]:
#output_df["SERVICE_CODE"].value_counts()

In [None]:
#output_df[output_df["SERVICE_CODE"]== "NULL"].shape

In [None]:
#output_df.head()

In [None]:
output_df.isnull().values.any()

In [None]:
# Final safety net: turn +/-inf and NaN into 0.0, then fill anything still missing with 0.
output_df = output_df.replace([np.inf, -np.inf, np.nan], 0.0)
output_df = output_df.fillna(0)

In [None]:
output_df.isnull().values.any()

In [None]:
#output_df.head()

# VOLUME SCORING

In [None]:
# Exclude users with bundle 15 euro
# output_df = output_df[~((output_df["PROPENSITY"] > 0.5) & (output_df["SERVICE_CODE"] == 'BDLCUComboXL'))]

In [None]:
# People with high propensity to buy higher bundle
#scoring_volume = output_df[output_df["PROPENSITY"] >= 0.5]

In [None]:
#scoring_volume.shape

In [None]:
#scoring_volume["TARIFF_PLAN"].value_counts()

In [None]:
#scoring_volume["SERVICE_CODE"].value_counts()

In [None]:
#test = scoring_volume.groupby(["TARIFF_PLAN", "SERVICE_CODE", "PRICE"])["MSISDN"].count()

In [None]:
#test.head(100)

### Save model <a class="anchor" id="save_model"></a>

In [None]:
# Output path template for the scored dataset; the placeholders are filled with the
# current year, month and day when the data is written below.
file_out = 'gs://'+ output_bucket_new +'/higher_bundles/result/parquet/'+ version +'/year={}/month={}/day={}'

In [None]:
import datetime

# Read the clock once so that year, month and day always belong to the same date;
# three separate today() calls could straddle midnight and produce a mixed-up date.
run_date = datetime.date.today()
currentDay = run_date.day
currentMonth = run_date.month
currentYear = run_date.year

In [None]:
#print(currentYear, currentMonth, currentDay)

In [None]:
output_df_pyspark = spark.createDataFrame(output_df)

In [None]:
output_df_pyspark.printSchema()

In [None]:
output_df_pyspark.limit(10).toPandas()

In [None]:
# Write the scored dataset to today's partition, replacing any existing data at that path.
# NOTE(review): the partition values are plain integers, so month and day are not
# zero-padded (e.g. month=3) — confirm that this matches what downstream readers expect.
output_df_pyspark.write.parquet(file_out.format(currentYear, currentMonth, currentDay), mode='overwrite')

In [None]:
import time
# Timestamp at the end of the run, in seconds since the epoch.
stop_time = time.time()
print(stop_time)

In [None]:
# Total run time in minutes; start_time has to be set by an earlier cell of the notebook.
print(stop_time/60 - start_time/60) 