# Predictive Care Initiative - Labeling

The main goal of this notebook is to write a PySpark script that preprocesses and creates a data set combining statistics from the access network (`DSLAM`) and the customer premises devices (`CPE`). The script should:

* Generate a filtered data set for a specified time range.
* Ensure that measurements per line/port/CPE are collected at regular intervals.

In a second stage, CPEs that report any kind of issue must be labeled.

**Business Impact:**
The data generated by this notebook (train and test) is used as input for ML modeling of the CPE replacement use case.

**Key Insights:**
* Data for CPEs with issues is generated.
* Data for healthy CPEs is generated.
* Issue and healthy CPE data are concatenated, and stratified sampling is done in a 90%:10% train:test ratio.
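
To make the last point concrete, here is a minimal pure-Python sketch of a stratified 90%:10% split. It is an illustration only and not the Spark code used in section 9.8; the function name `stratified_split`, the `label` key and the toy rows are hypothetical.

```python
import random
from collections import defaultdict


def stratified_split(rows, label_key, test_fraction=0.1, seed=42):
    """Split rows into (train, test) so each label keeps roughly the same share."""
    by_label = defaultdict(list)
    for row in rows:
        by_label[row[label_key]].append(row)

    rng = random.Random(seed)
    train, test = [], []
    for label in sorted(by_label):
        group = by_label[label][:]
        rng.shuffle(group)
        # Take the test share separately inside every label group.
        n_test = round(len(group) * test_fraction)
        test.extend(group[:n_test])
        train.extend(group[n_test:])
    return train, test


# Toy data (hypothetical): 80 healthy CPEs and 20 CPEs with an issue.
rows = [{"assetid": i, "label": "healthy"} for i in range(80)]
rows += [{"assetid": 80 + i, "label": "issue"} for i in range(20)]

train, test = stratified_split(rows, "label")
print(len(train), len(test))
```

Sampling inside each label group is what keeps the rare issue class represented in the test set in the same proportion as in the train set.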

**Future Work:**
* We have to sort out this pipeline so that technical data is gathered on a daily basis in HT prod, reusing part of this notebook.


### Table of Contents

* [1. Import packages and config](#chapter1)
    * [1.1 Spark configuration](#section_1_1)
    * [1.2 Import packages](#section_1_2)
    * [1.3 Let's setup some style!](#section_1_3)
* [2. Gather data](#chapter2)
* [3. `CPE` replacement data](#chapter3)
* [4. Tickets data](#chapter4)
* [5. `CPE` refurbishment data](#chapter5)
* [6. Join CPE refurbishment, tickets and replacement data](#chapter6)
* [7. Building dataset independent of `CPE` replacement events - healthy dataset](#chapter7)
* [8. Building dataset related with `CPE` replacement events - issue dataset](#chapter8)
    * [8.1 Creating Windowing](#section_8_1)
    * [8.2 Join `CPE` replacement with history data](#section_8_2)
* [9. Combine healthy + issue CPEs and label them](#chapter9)
    * [9.1 Read CPEs with issue and healthy](#section_9_1)
    * [9.2 Add dummy columns to df_healthy](#section_9_2)
    * [9.3 Concatenate issue and healthy CPEs](#section_9_3)
    * [9.4 Check for No thunderstorm](#section_9_4)
    * [9.5 Get list of distinct issue types, to add new columns](#section_9_5)
    * [9.6 Add columns to the df and assign default values for now](#section_9_6)
    * [9.7 Fill labels based on issues](#section_9_7)
    * [9.8 Stratified split](#section_9_8)
* [10. Appendix](#chapter10)
    * [A CPEs without any issues](#section_10_1)

# 1. Import packages and config  <a class="anchor" id="chapter1"></a>

Here is just some stuff we are gonna need!!

##  1.1 Spark configuration  <a class="anchor" id="section_1_1"></a>

Let's configure our spark session.

In [1]:
%%configure -f
{"conf":
 {"spark.driver.cores": "6",
  "spark.driver.memory": "10g",
  "spark.executor.cores": "6",
  "spark.executor.memory": "10g",
  "spark.dynamicAllocation.enabled": "true",
  "spark.dynamicAllocation.minExecutors" : "4",
  "spark.driver.maxResultSize": "4g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
237,application_1620749185953_46577,pyspark,idle,Link,Link,
276,application_1620749185953_47232,pyspark,idle,Link,Link,
277,application_1620749185953_47247,pyspark,idle,Link,Link,
279,application_1620749185953_47264,pyspark,idle,Link,Link,
280,application_1620749185953_47288,pyspark,idle,Link,Link,
285,application_1620749185953_47417,pyspark,idle,Link,Link,
286,application_1620749185953_47418,pyspark,idle,Link,Link,
287,application_1620749185953_47424,pyspark,idle,Link,Link,


In [2]:
%%cleanup -f

## 1.2 Import packages <a class="anchor" id="section_1_2"></a>

In [2]:
# Data Science Packages
import sys
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from  matplotlib import pyplot
import seaborn as sns
import warnings
# Import the classes only; a later `import datetime` would rebind the name to the module
from datetime import datetime
from datetime import timedelta


#Spark Packages
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import types 
import pyspark.sql.types
from pyspark.sql.functions import UserDefinedFunction

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
289,application_1620749185953_47449,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



## 1.3  Let's setup some style! <a class="anchor" id="section_1_3"></a>

This is not a requirement, but you know what they say:
> "Fashions fade, style is eternal." <br/>
_Yves Saint Laurent_

In [3]:
# Matplotlib

#matplotlib.use('agg')
#plt.switch_backend('agg')

# Seaborn Style
sns.set(style='ticks')
sns.set_style({'font.family': 'Hiragino Maru Gothic Pro'})
sns.set_palette("cool")

# Pandas Style
pd.set_option("display.max_columns", 9999)
pd.set_option("display.max_rows", 9999)

# Ignore annoying warning 
warnings.filterwarnings('ignore')


# 2. Gather data <a class="anchor" id="chapter2"></a>

In this section we load the required data (for more information, see `1.0-lfva-gather-data.ipynb`).

In [4]:
from datetime import datetime
from datetime import timedelta
#import datetime

#Spark Packages
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types  # DateType, ArrayType and LongType are used in gather_data

def spark_init():
    """Init Spark session.
    Returns:
        object: spark session.
    """
    spark = SparkSession.builder \
        .master('yarn') \
        .appName('predictive_care') \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark


def data_import(database, table, start_time, end_time, spark):
    """Function to load data from Hive tables based on start time and end time.
    
    Args:
        database (string): Database name in HIVE.
        table (string): Table name in HIVE.
        start_time (str): Start time for the query.
        end_time (str): End time for the query.
        spark (obj): Spark session object.
        
    Returns:
        dataframe :  spark dataframe with loaded data.
    """
    start_time = parsing_date_format(start_time, "%Y-%m-%d")
    end_time = parsing_date_format(end_time, "%Y-%m-%d")

    df = spark.sql("select * from {0}.{1} where {2} between '{3}' and '{4}' " \
              .format(database, table, 'datum', start_time, end_time))
    return df

def generate_date_series(start, stop):
    """Return the list of dates from start to stop, both inclusive, in one-day steps."""
    return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]

def parsing_date_format(datetime_str, target_date_format):
    """Function to modify the date format.
    
    Args:
        datetime_str (str): Date in original format, expected "%Y-%m-%d".
        target_date_format (str): Desired date format, e.g. "%Y%m%d".
    
    Returns:
        str: Date in target format.
    """
    date_obj = datetime.strptime(datetime_str, "%Y-%m-%d").date()
    return date_obj.strftime(target_date_format)

def flatten_df(df):
    """Flatten a nested dataframe by promoting struct fields to top-level columns.
    
    Args:
        df (dataframe): Input nested dataframe.
        
    Returns:
        flat_df (dataframe): Output flattened dataframe.
    """
    flat_cols = [col[0] for col in df.dtypes if col[1][:6] != 'struct']
    nested_cols = [col[0] for col in df.dtypes if col[1][:6] == 'struct']
    flat_df = df.select(flat_cols +\
                   [F.col(root_col+'.'+nested_col).alias(root_col+'_'+nested_col)\
                    for root_col in nested_cols\
                    for nested_col in df.select(root_col+'.*').columns])
    return flat_df

def rename_cols(df, prefix):
    """Rename all columns by adding a prefix.
    
    Args:
        df (dataframe): Input dataframe.
        prefix (str): Prefix to prepend to every column name.
        
    Returns:
        df (dataframe): Output dataframe with the prefix prepended to all columns.
    """
    for feature in df.columns:
        df = df.withColumnRenamed(feature,prefix+feature)
    return df

def change_data_type(df, cols_to_convert, datatype):
    """ Change data type for columns from one type to another.
    Args:
        df (dataframe): Input spark dataframe. 
        cols_to_convert (list): list of columns.
        datatype(object): Intended data type.
    Returns:
        df (dataframe): Spark df with proper datatype.
    """
    for col_name in cols_to_convert:
        df = df.withColumn(col_name, F.col(col_name).cast(datatype))
    return df

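def fix_cpetype(df):
    """Fill `cpetype` on every row of an asset from the rows where it is known.

    Assets without any known `cpetype` are dropped. If an asset reports several
    types, dropDuplicates keeps one of them.
    """
    # One (assetid, cpetype) pair per asset, taken from rows where cpetype is set
    df_inventory = df.filter(F.col('cpetype').isNotNull()).select('assetid','cpetype').groupBy('assetid', 'cpetype').count()
    df_inventory = df_inventory.select('assetid','cpetype')
    df_inventory = df_inventory.dropDuplicates(subset = ['assetid'])
    # Replace the sparse column with the per-asset value
    df = df.drop('cpetype')
    df = df.join(df_inventory, on=['assetid'], how='left')
    df = df.filter(F.col('cpetype').isNotNull())
    return df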

def gather_data(start_time, end_time, spark):
    """Gather the relevant information from HDFS for the given time frame.
    
    Args:
        start_time (str): Start time for the query.
        end_time (str): End time for the query.
        spark (obj): Spark session object.
        
    Returns:
        dataframe :  spark dataframe with loaded data.
    
    """
    #loading data
    df_dslam = data_import('cdl_blos', 'pol_day_aggregation', start_time, end_time, spark) #per day
    df_snmp = data_import('cdl_blos', 'snmp_traps_agg', start_time, end_time, spark) #per day
    df_acs = data_import('cdl_acscoll', 'cdl_acscoll_xdsl_line_prq',start_time, end_time, spark)# cat, mm, counter/delta
    # Delta/counter values in df_acs don't make sense
    
    # create key
    df_key = df_dslam.select('assetid').distinct()
    df_key = df_key.withColumn('start', F.lit(start_time)).withColumn('stop', F.lit(end_time))
    df_key = df_key.withColumn('start', F.col('start').cast(types.DateType()))
    df_key = df_key.withColumn('stop', F.col('stop').cast(types.DateType()))
    
    spark.udf.register("generate_date_series", generate_date_series, types.ArrayType(types.DateType()) )
    df_key.createOrReplaceTempView("keydf")

    df_key = spark.sql("SELECT assetid, explode(generate_date_series(start, stop)) as date FROM keydf")
    # printSchema() prints directly and returns None, so call it on its own
    print('df_key - printSchema')
    df_key.printSchema()
    
    #flattening nested fields
    df_acs = flatten_df(df_acs)
    
    #dropping missing keys
    df_dslam =df_dslam.na.drop(subset=['assetid','datum'])
    df_snmp =df_snmp.na.drop(subset=['assetid','datum'])
    df_acs = df_acs.na.drop(subset=['crm_assetId','datum'])
    
    # Drop multiple samples per day
    df_dslam = df_dslam.dropDuplicates(['assetid','datum'])
    df_snmp = df_snmp.dropDuplicates(['assetid','datum'])
    df_acs = df_acs.dropDuplicates(['crm_assetId','datum'])
    
    #Renaming for conditional join
    df_dslam = rename_cols(df_dslam, 'dslam_')
    df_snmp = rename_cols(df_snmp, 'snmp_')
    df_acs = rename_cols(df_acs, 'acs_')
    
    
    #Join
    
    df = df_key.join(df_dslam, [(df_key.assetid == df_dslam.dslam_assetid) & (df_key.date == df_dslam.dslam_datum)], how='left')
    df = df.join(df_snmp, [(df.assetid == df_snmp.snmp_assetid) & (df.date == df_snmp.snmp_datum)], how='left')
    df = df.join(df_acs, [(df.assetid == df_acs.acs_crm_assetId) & (df.date == df_acs.acs_datum)], how='left')
    df = df.drop('dslam_assetid', 'acs_crm_assetId','acs_datum','snmp_assetid', 'snmp_datum')
    df = df.withColumnRenamed('date','datum')
  
    
    #change data types as all types are string
    cols_to_convert_to_long = ['acs_xdslline_dslUptime', 'acs_xdslline_cpeUptime','acs_xdslline_cpuUsagePercentage',
                           'acs_xdslline_freeMemoryKb', 'acs_xdslline_totalMemoryKb', 'acs_xdslline_usedMemoryKb',
                           'acs_xdslline_natCurrent', 'acs_xdslline_natMax', 'acs_xdslline_crc', 'acs_xdslline_fec',
                           'acs_xdslline_hec', 'acs_xdslline_cellDelin', 'acs_xdslline_erroredSecs', 'acs_xdslline_initErrors',
                           'acs_xdslline_initTimeOuts', 'acs_xdslline_severelyErroredSecs', 'acs_xdslline_linkRetrain',
                           'acs_xdslline_lossOfFraming', 'acs_xdslline_lossOfPower','acs_xdslline_transmitBlocks', 'acs_xdslline_receiveBlocks', 
                           'acs_xdslline_downAttenuationDb', 'acs_xdslline_downCurrRateKbps', 'acs_xdslline_downMaxRateKbps', 
                           'acs_xdslline_downNoiseMarginDb', 'acs_xdslline_downPowerDbm', 'acs_xdslline_upAttenuationDb', 'acs_xdslline_upCurrRateKbps', 'acs_xdslline_upMaxRateKbps', 
                           'acs_xdslline_upNoiseMarginDb', 'acs_xdslline_upPowerDbm', 'acs_xdslline_atuccrc',
                            'acs_xdslline_atucfec','acs_xdslline_atuchec']
    df = change_data_type(df, cols_to_convert=cols_to_convert_to_long, datatype=types.LongType())
    
    #feature list
    feature_list = ['dslam_distinct_modulation', 'dslam_no_of_dominant_modulation', 'dslam_code_of_dominant_modulation', 
     'dslam_avg_bitrate_us', 'dslam_avg_bitrate_ds', 'dslam_avg_attenuation_ds', 'dslam_avg_attenuation_us',
     'dslam_avg_power_us', 'dslam_avg_power_ds', 'dslam_avg_att_bitrate_us', 'dslam_avg_att_bitrate_ds', 
     'dslam_avg_snr_us', 'dslam_avg_snr_ds', 'dslam_no_counts', 'dslam_avg_bandline_ds', 'dslam_avg_bandline_us', 
     'dslam_sum_cv_us', 'dslam_count_cv_us', 'dslam_sum_cv_ds', 'dslam_count_cv_ds', 'dslam_sum_es_ds', 
     'dslam_count_es_ds', 'dslam_sum_es_us', 'dslam_count_es_us', 'dslam_sum_ses_ds', 'dslam_count_ses_ds', 
     'dslam_sum_ses_us', 'dslam_count_ses_us', 'dslam_sum_fec_ds', 'dslam_count_fec_ds', 'dslam_sum_fec_us', 
     'dslam_count_fec_us', 'dslam_inits', 'dslam_port_inst_id', 'dslam_rg_port_inst_id', 'dslam_prim_izvod_inst_id', 
     'snmp_no_lossoflink', 'snmp_no_linkup', 'snmp_no_dyinggasp', 'acs_xdslline_dslUptime', 'acs_xdslline_cpeUptime', 
     'acs_xdslline_cpuUsagePercentage', 'acs_xdslline_freeMemoryKb', 'acs_xdslline_totalMemoryKb', 
     'acs_xdslline_usedMemoryKb', 'acs_xdslline_natCurrent', 'acs_xdslline_natMax', 'acs_xdslline_crc', 
     'acs_xdslline_fec', 'acs_xdslline_hec', 'acs_xdslline_cellDelin', 'acs_xdslline_erroredSecs', 'acs_xdslline_initErrors',
     'acs_xdslline_initTimeOuts', 'acs_xdslline_severelyErroredSecs', 'acs_xdslline_linkRetrain', 'acs_xdslline_lossOfFraming', 
     'acs_xdslline_lossOfPower', 'acs_xdslline_transmitBlocks', 'acs_xdslline_receiveBlocks', 'acs_xdslline_downAttenuationDb', 
     'acs_xdslline_downCurrRateKbps', 'acs_xdslline_downMaxRateKbps', 'acs_xdslline_downNoiseMarginDb', 'acs_xdslline_downPowerDbm', 
     'acs_xdslline_upAttenuationDb', 'acs_xdslline_upCurrRateKbps', 'acs_xdslline_upMaxRateKbps', 'acs_xdslline_upNoiseMarginDb', 
     'acs_xdslline_upPowerDbm', 'acs_xdslline_atuccrc', 'acs_xdslline_atucfec', 'acs_xdslline_atuchec', 'datum', 'assetid', 
     'dslam_date_inserted', 'dslam_platforma', 'dslam_model', 'dslam_card_type', 'dslam_status_porta', 'dslam_bandwidth', 
     'dslam_vdsl_adsl', 'dslam_vendor', 'dslam_status', 'dslam_internet', 'dslam_iptv', 'dslam_voip', 'dslam_regija', 
     'acs_@type', 'acs_status', 'acs_description', 'acs_crm_accessNetwork', 'acs_crm_dslamFrame', 'acs_crm_dslamSlot', 
     'acs_crm_serviceName', 'acs_crm_productClass', 'acs_xdslline_manufacturer', 'acs_xdslline_manufacturerOui', 'acs_xdslline_softwareVersion', 
     'acs_xdslline_additionalSoftwareVersion', 'acs_xdslline_anexType', 'acs_xdslline_dslType', 'acs_xdslline_linkEncapsulationType', 
     'acs_xdslline_pppoEConnectionStatus', 'acs_xdslline_pppoELastConnectionError', 'acs_xdslline_pppoENATEnabled', 
     'acs_xdslline_pppoEUsername', 'acs_createdmsec']
    
    df = df.select(*feature_list)
    df = df.withColumnRenamed('acs_crm_productClass', 'cpetype')
    
    #Sorting by date
    df = df.orderBy(df.datum.asc())
    df = fix_cpetype(df)
    return df


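
As a quick illustration of the key-building step in `gather_data`, the sketch below runs the notebook's `generate_date_series` and `parsing_date_format` helpers in plain Python, without Spark. The asset IDs and dates are made-up sample values, and the list comprehension only mimics what `explode` does in the SQL query; it is not the Spark code path itself.

```python
from datetime import date, datetime, timedelta


def generate_date_series(start, stop):
    """Return every calendar day from start to stop, both inclusive."""
    return [start + timedelta(days=x) for x in range(0, (stop - start).days + 1)]


def parsing_date_format(datetime_str, target_date_format):
    """Re-format a "%Y-%m-%d" date string into target_date_format."""
    date_obj = datetime.strptime(datetime_str, "%Y-%m-%d").date()
    return date_obj.strftime(target_date_format)


start, stop = date(2021, 1, 30), date(2021, 2, 2)
days = generate_date_series(start, stop)

# Mimic explode(): one (assetid, date) row per asset and per day.
asset_ids = [101, 102]  # hypothetical asset IDs
key_rows = [(asset, day) for asset in asset_ids for day in days]

print(len(days), len(key_rows))
print(parsing_date_format("2021-01-30", "%Y%m%d"))
```

Because every asset gets a row for every day, the left joins that follow keep the days on which a source reported nothing; those show up as nulls rather than as missing rows.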

# 3. `CPE` replacement data <a class="anchor" id="chapter3"></a>

Let's load the `CPE replacement dataset`. It records the day a CPE was replaced due to issues and the ticket ID for which the replacement happened.

In [13]:
path_cpe_replacement1 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/cpe_replacement.csv"
df_cpe_replacement1 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_replacement1,header = True,sep=";")



In [14]:
# New CPE replacement data for Jan to May 2020, Jan to July 2021


In [15]:
path_cpe_replacement2 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/replacement_new_data/CPE replacement_2020_01-06.csv"
df_cpe_replacement2 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_replacement2,header = True,sep=";")



In [16]:
path_cpe_replacement3 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/replacement_new_data/CPE replacement_2021_01-07.csv"
df_cpe_replacement3 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_replacement3,header = True,sep=";")
# Keep only months 1 to 4 (January to April)
df_cpe_replacement3 = df_cpe_replacement3.filter(F.col('Month').isin(1, 2, 3, 4))


In [17]:
df_cpe_replacement1 = df_cpe_replacement1.select('TICKET_ID','ASSET_ID', 'CPE_SERIAL_NUMBER')
df_cpe_replacement2 = df_cpe_replacement2.select('TICKET_ID','ASSET_ID', 'CPE_SERIAL_NUMBER')
df_cpe_replacement3 = df_cpe_replacement3.select('TICKET_ID','ASSET_ID', 'CPE_SERIAL_NUMBER')


In [18]:
df_cpe_replacement1.count()


65522

In [19]:
df_cpe_replacement2.count()


20265

In [20]:
df_cpe_replacement3.count()


23633

In [21]:
df_cpe_replacement = df_cpe_replacement1.union(df_cpe_replacement2).union(df_cpe_replacement3)


In [22]:
df_cpe_replacement.count()


109420

In [24]:
65522 + 20265 + 23633


109420

In [25]:
df_cpe_replacement = df_cpe_replacement.dropDuplicates()


In [26]:
df_cpe_replacement.count()


109237

In [27]:
df_cpe_replacement.printSchema()


root
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- CPE_SERIAL_NUMBER: string (nullable = true)

# 4. Tickets data <a class="anchor" id="chapter4"></a>


Let's load the `Tickets dataset`.

The tickets are split across four files, so we load each one individually.

In [28]:
path_tickets_1 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/tickets/Smetnje_0107-1510_2020.csv"
df_tickets_1 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets_1,header = True,sep=";")
df_tickets_1 = df_tickets_1.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')

path_tickets_2 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/tickets/FAULTS_E2E_01102020-31122020.csv"
df_tickets_2 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets_2,header = True,sep=";")
df_tickets_2 = df_tickets_2.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')

path_tickets_3 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/tickets/FAULTS_01012020-30062020.csv"
df_tickets_3 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets_3,header = True,sep=";")
df_tickets_3 = df_tickets_3.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')

path_tickets_4 = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/tickets/FAULTS_01012021-30042021.csv"
df_tickets_4 = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets_4,header = True,sep=";")
df_tickets_4 = df_tickets_4.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')



Now we can adjust the timestamp.

In [29]:
df_tickets_1 = df_tickets_1.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')
df_tickets_1 = df_tickets_1.withColumn('START_DATE_TICKET',F.unix_timestamp('START_DATE_TIME_DONAT', "yyyy-MM-dd'T'HH:mm:ss").cast(types.TimestampType()))
df_tickets_1 = df_tickets_1.withColumn('START_DATE_TICKET',F.date_format(F.col("START_DATE_TICKET"), "yyyy-MM-dd HH:mm:ss"))
df_tickets_1.show(5)


+---------+---------------------+---------+--------------------+-------------------+
|TICKET_ID|START_DATE_TIME_DONAT| ASSET_ID|       Uzrok_smetnje|  START_DATE_TICKET|
+---------+---------------------+---------+--------------------+-------------------+
| 45450341|  2020-07-01T00:18:32| 77725692|Neispravna oprema...|2020-07-01 00:18:32|
| 45450966|  2020-07-01T06:13:03| 93950417|Neispravna oprema...|2020-07-01 06:13:03|
| 45450975|  2020-07-01T06:23:56| 84248683|           Nepoznato|2020-07-01 06:23:56|
| 45451012|  2020-07-01T07:16:33| 46489303|Neispravna oprema...|2020-07-01 07:16:33|
| 45451016|  2020-07-01T07:21:50|110578838|Neispravna oprema...|2020-07-01 07:21:50|
+---------+---------------------+---------+--------------------+-------------------+
only showing top 5 rows

In [30]:
df_tickets_2 = df_tickets_2.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')
df_tickets_2 = df_tickets_2.withColumn('START_DATE_TICKET',F.unix_timestamp('START_DATE_TIME_DONAT', "yyyy-MM-dd HH:mm:ss").cast(types.TimestampType()))
df_tickets_2 = df_tickets_2.withColumn('START_DATE_TICKET',F.date_format(F.col("START_DATE_TICKET"), "yyyy-MM-dd HH:mm:ss"))
df_tickets_2.show(5)


+---------+---------------------+---------+--------------------+-------------------+
|TICKET_ID|START_DATE_TIME_DONAT| ASSET_ID|       Uzrok_smetnje|  START_DATE_TICKET|
+---------+---------------------+---------+--------------------+-------------------+
| 45910206|  2020-10-01 00:52:52| 44780554|      Grmljavina - 7|2020-10-01 00:52:52|
| 45912033|  2020-10-01 06:46:19|104882889|Neispravna oprema...|2020-10-01 06:46:19|
| 45912181|  2020-10-01 07:42:24| 47060490|        Blokada - 39|2020-10-01 07:42:24|
| 45912199|  2020-10-01 07:44:49| 67918605|      Grmljavina - 7|2020-10-01 07:44:49|
| 45912247|  2020-10-01 08:03:04| 45621657|      Grmljavina - 7|2020-10-01 08:03:04|
+---------+---------------------+---------+--------------------+-------------------+
only showing top 5 rows

In [31]:
df_tickets_3 = df_tickets_3.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')
df_tickets_3 = df_tickets_3.withColumn('START_DATE_TICKET',F.unix_timestamp('START_DATE_TIME_DONAT', "yyyy-MM-dd'T'HH:mm:ss").cast(types.TimestampType()))
df_tickets_3 = df_tickets_3.withColumn('START_DATE_TICKET',F.date_format(F.col("START_DATE_TICKET"), "yyyy-MM-dd HH:mm:ss"))
df_tickets_3.show(5)


+---------+---------------------+---------+--------------------+-------------------+
|TICKET_ID|START_DATE_TIME_DONAT| ASSET_ID|       Uzrok_smetnje|  START_DATE_TICKET|
+---------+---------------------+---------+--------------------+-------------------+
| 44533445|  2020-01-01T08:38:03| 80381835|Prekid; odvod; kr...|2020-01-01 08:38:03|
| 44533446|  2020-01-01T08:41:15| 44873887|Neispravna oprema...|2020-01-01 08:41:15|
| 44533451|  2020-01-01T08:54:18| 63048511|Neispravna oprema...|2020-01-01 08:54:18|
| 44533452|  2020-01-01T08:56:10|103753032|Neispravna oprema...|2020-01-01 08:56:10|
| 44533457|  2020-01-01T09:03:49| 62359651|Neispravna oprema...|2020-01-01 09:03:49|
+---------+---------------------+---------+--------------------+-------------------+
only showing top 5 rows

In [32]:
df_tickets_4 = df_tickets_4.select('TICKET_ID','START_DATE_TIME_DONAT','ASSET_ID','Uzrok_smetnje')
df_tickets_4 = df_tickets_4.withColumn('START_DATE_TICKET',F.unix_timestamp('START_DATE_TIME_DONAT', "yyyy-MM-dd'T'HH:mm:ss").cast(types.TimestampType()))
df_tickets_4 = df_tickets_4.withColumn('START_DATE_TICKET',F.date_format(F.col("START_DATE_TICKET"), "yyyy-MM-dd HH:mm:ss"))
df_tickets_4.show(5)


+---------+---------------------+---------+--------------------+-------------------+
|TICKET_ID|START_DATE_TIME_DONAT| ASSET_ID|       Uzrok_smetnje|  START_DATE_TICKET|
+---------+---------------------+---------+--------------------+-------------------+
| 46602934|  2021-01-01T01:15:38|104147316|Neispravna oprema...|2021-01-01 01:15:38|
| 46603675|  2021-01-01T08:19:32| 80580807|     Dotrajalost - 1|2021-01-01 08:19:32|
| 46603738|  2021-01-01T09:22:58| 53887615|Nije utvrðen uzro...|2021-01-01 09:22:58|
| 46603822|  2021-01-01T10:41:08|101461236|     Oksidacija - 13|2021-01-01 10:41:08|
| 46603829|  2021-01-01T10:45:18| 49508282|      Grmljavina - 7|2021-01-01 10:45:18|
+---------+---------------------+---------+--------------------+-------------------+
only showing top 5 rows
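
The four cells above handle two timestamp layouts: files 1, 3 and 4 use an ISO-style `T` separator, while file 2 uses a space. A small pure-Python sketch of the same normalisation is shown below. `normalize_ticket_timestamp` is a hypothetical helper, not part of the notebook, and it uses `datetime.strptime` patterns rather than Spark's.

```python
from datetime import datetime

# The two layouts seen in the ticket exports (strptime equivalents of the Spark patterns).
SOURCE_FORMATS = ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")


def normalize_ticket_timestamp(raw):
    """Return raw as 'YYYY-MM-DD HH:MM:SS', or None if no layout matches."""
    for fmt in SOURCE_FORMATS:
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue
    return None


print(normalize_ticket_timestamp("2020-07-01T00:18:32"))
print(normalize_ticket_timestamp("2020-10-01 00:52:52"))
print(normalize_ticket_timestamp("not a date"))
```

Bringing every file to one layout before the union is what lets the combined `START_DATE_TICKET` column be compared and sorted consistently.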

Here we combine (union) the four ticket datasets.

In [33]:
df_tickets = df_tickets_1.union(df_tickets_2)
df_tickets = df_tickets.union(df_tickets_3)
df_tickets = df_tickets.union(df_tickets_4)
df_tickets = df_tickets.dropDuplicates()
df_tickets = df_tickets.withColumnRenamed('Uzrok_smetnje', 'TICKET_TENTATIVE_ROOT_CAUSE')
df_tickets = df_tickets.drop('START_DATE_TIME_DONAT')


In [34]:
df_tickets.select('ASSET_ID').distinct().count()


658190

Let's check the time range of the available tickets

In [35]:
df_tickets.describe().select('START_DATE_TICKET').show()


+-------------------+
|  START_DATE_TICKET|
+-------------------+
|            1250315|
|               null|
|               null|
|2020-01-01 00:13:34|
|2021-04-30 23:57:23|
+-------------------+

The tickets now match the time range of the `CPE replacement dataset`.

# 5. `CPE` refurbishment data <a class="anchor" id="chapter5"></a>

Dictionary:

* _ID_ = internal ID, not relevant
* _Serijski_broj_ = CPE serial number, used to connect to the replacement dataset
* _SAP_sifra_ = unique for the type of the CPE
* _Model_opreme_ = CPE type
* _Garancija_ = warranty: yes / no / unknown / empty
* _Iz_tablice_serijskih_brojeva_ = whether it comes from the serial number table
* _Datum_kraja_garancije_ = end of warranty date
* _Datum_ulaznog_skeniranja_ = entry scan date, when the CPE came to the refurbishment center; should be after the fault was closed
* _Broj_ponovljenih_skeniranja_ = number of repeated scans (the CPE was in the refurbishment center more than once)
* _Potrebna_detaljna_analiza_ = not relevant
* _Ulazno_skeniranje_obavio_ = who did the scanning
* _Izvor_opreme_ = code of the warehouse
* _Hardverski_osteceno_ = hardware damage / physically broken
* _Status_Tehnicara_ = status / what to do
* _Status_tehnicara_odredio_
* _Datum_statusa_tehnicara_ = date when the status was entered
* _Izlazno_skeniranje_obavio_ = scanned at the end
* _Skladiste_izlaz_ = destination / where it went from here
* _Datum_izlaznog_skeniranja_ = date of scanning when it left the refurbishment center
* _Vrsta_kvara_ = what the issue with the CPE was


Let's load the data and take a look

In [36]:
path_cpe_refurbishment = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/CPE_refurbishment_2020_2021.csv"
df_cpe_refurbishment = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_refurbishment,header = True,sep=",")
df_cpe_refurbishment.limit(1).show(200,truncate=False, vertical=True)


-RECORD 0---------------------------------------------------------------
 Datum_ulaznog_skeniranja    | 14/01/2020                               
 Izvor_opreme                | XI03                                     
 Serijski_broj               | j835bh004425                             
 SAP_sifra                   | 3410023305                               
 Model_opreme                | MAX WLAN Speedport Plus Sercomm          
 Datum_kraja_garancije       | 07/11/2020                               
 Broj_ponovljenih_skeniranja | 0                                        
 Hardverski_osteceno         | null                                     
 Datum_statusa_tehnicara     | 14/01/2020                               
 Status_Tehnicara            | Otpis                                    
 Vrsta kvara                 | Manje fiziÄko ili toplinsko oÅ¡teÄenje 
 Datum_izlaznog_skeniranja   | 14/01/2020                               
 Skladiste_izlaz             | Centralno skladiÅ¡te
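
The garbled characters in `Vrsta kvara` and `Skladiste_izlaz` look like UTF-8 text that was decoded as ISO-8859-1. The sketch below reproduces that effect on one value from the output above. It is only a diagnostic under that assumption; whether the file really is UTF-8 should be checked against the source system before changing the `encoding` option.

```python
# Value exactly as shown in the output above.
garbled = "Centralno skladiÅ¡te"

# Undo the wrong decoding: back to raw bytes via Latin-1, then decode as UTF-8.
repaired = garbled.encode("latin-1").decode("utf-8")
print(repaired)

# The reverse direction shows how the garbling arises in the first place.
assert "Centralno skladište".encode("utf-8").decode("latin-1") == garbled
```

If the assumption holds, the values in `Vrsta kvara` are affected in the same way, which matters later if issue types are matched by their text.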

In [37]:
from pyspark.sql.types import *

# when(x.isNotNull(), x).otherwise(None) is equivalent to x, so the wrapper is dropped
df_cpe_refurbishment = df_cpe_refurbishment.withColumn('DATE_SCAN_END', F.unix_timestamp('Datum_izlaznog_skeniranja', "dd/MM/yyyy").cast(TimestampType()))\
                                            .withColumn('DATE_SCAN_START', F.unix_timestamp('Datum_ulaznog_skeniranja', "dd/MM/yyyy").cast(TimestampType()))

df_cpe_refurbishment = df_cpe_refurbishment.withColumn('DATE_SCAN_END',F.date_format(F.col("DATE_SCAN_END"), "yyyy-MM-dd HH:mm:ss"))\
                                            .withColumn('DATE_SCAN_START',F.date_format(F.col("DATE_SCAN_START"), "yyyy-MM-dd HH:mm:ss"))

df_cpe_refurbishment = df_cpe_refurbishment.withColumnRenamed('Vrsta kvara', 'Vrsta_kvara')

In [38]:
df_cpe_refurbishment.select('Datum_ulaznog_skeniranja','DATE_SCAN_START', "Datum_izlaznog_skeniranja", "DATE_SCAN_END").limit(3).show(200,truncate=False, vertical=True)

-RECORD 0----------------------------------------
 Datum_ulaznog_skeniranja  | 14/01/2020          
 DATE_SCAN_START           | 2020-01-14 00:00:00 
 Datum_izlaznog_skeniranja | 14/01/2020          
 DATE_SCAN_END             | 2020-01-14 00:00:00 
-RECORD 1----------------------------------------
 Datum_ulaznog_skeniranja  | 14/01/2020          
 DATE_SCAN_START           | 2020-01-14 00:00:00 
 Datum_izlaznog_skeniranja | 14/01/2020          
 DATE_SCAN_END             | 2020-01-14 00:00:00 
-RECORD 2----------------------------------------
 Datum_ulaznog_skeniranja  | 14/01/2020          
 DATE_SCAN_START           | 2020-01-14 00:00:00 
 Datum_izlaznog_skeniranja | 14/01/2020          
 DATE_SCAN_END             | 2020-01-14 00:00:00
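
To make the conversion above concrete, here is a minimal plain-Python sketch of the same idea: parse a `dd/MM/yyyy` string and emit `yyyy-MM-dd HH:mm:ss`, returning `None` when the value cannot be parsed. The helper name `to_scan_timestamp` is hypothetical and is not part of the notebook, and Python's `strptime` is only an approximation of Spark's parser.

```python
from datetime import datetime
from typing import Optional


def to_scan_timestamp(raw: Optional[str], fmt: str = "%d/%m/%Y") -> Optional[str]:
    """Parse a day/month/year string into 'YYYY-MM-DD HH:MM:SS', or None if unparseable."""
    if raw is None:
        return None
    try:
        return datetime.strptime(raw, fmt).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Mirrors the Spark behaviour of producing null for bad input
        return None


print(to_scan_timestamp("14/01/2020"))  # 2020-01-14 00:00:00
print(to_scan_timestamp("31/02/2020"))  # None (there is no 31 February)
```

Since the source column carries no time of day, every parsed value lands on midnight, which matches the `00:00:00` suffix in the records above.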

In [39]:
df_cpe_refurbishment.select('DATE_SCAN_START',  "DATE_SCAN_END").describe().show()

+-------+-------------------+-------------------+
|summary|    DATE_SCAN_START|      DATE_SCAN_END|
+-------+-------------------+-------------------+
|  count|              90920|              90920|
|   mean|               null|               null|
| stddev|               null|               null|
|    min|2020-01-02 00:00:00|2020-01-14 00:00:00|
|    max|2021-04-23 00:00:00|2021-04-30 00:00:00|
+-------+-------------------+-------------------+

In [40]:
# Old refurbishment data 
"""path_cpe_refurbishment = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/cpe_refurbishment.csv"
df_cpe_refurbishment = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_refurbishment,header = True,sep=";")
df_cpe_refurbishment.limit(1).show(200,truncate=False, vertical=True)

df_cpe_refurbishment = df_cpe_refurbishment.withColumn('DATE_SCAN_END',F.when(F.unix_timestamp('Datum_izlaznog_skeniranja', "dd.MM.yyyy HH:mm").cast(TimestampType()).isNotNull (),\
                                                                      F.unix_timestamp('Datum_izlaznog_skeniranja', "dd.MM.yyyy HH:mm").cast(TimestampType()))\
                                                                .otherwise(None))\
                                            .withColumn('DATE_SCAN_START',F.when(F.unix_timestamp('Datum_ulaznog_skeniranja', "dd.MM.yyyy HH:mm").cast(TimestampType()).isNotNull (),\
                                                                      F.unix_timestamp('Datum_ulaznog_skeniranja', "dd.MM.yyyy HH:mm").cast(TimestampType()))\
                                                                .otherwise(None))

df_cpe_refurbishment = df_cpe_refurbishment.withColumn('DATE_SCAN_END',F.date_format(F.col("DATE_SCAN_END"), "yyyy-MM-dd HH:mm:ss"))\
                                            .withColumn('DATE_SCAN_START',F.date_format(F.col("DATE_SCAN_START"), "yyyy-MM-dd HH:mm:ss"))

df_cpe_refurbishment.select('Datum_ulaznog_skeniranja','DATE_SCAN_START', "Datum_izlaznog_skeniranja", "DATE_SCAN_END").limit(3).show(200,truncate=False, vertical=True)

df_cpe_refurbishment.select('DATE_SCAN_START',  "DATE_SCAN_END").describe().show()"""

Let's filter out the rows where the issue type is `NULL`.

In [41]:
df_cpe_refurbishment = df_cpe_refurbishment.filter(df_cpe_refurbishment.Vrsta_kvara != 'NULL')


Let's translate the issue type to English.

In [42]:
df_cpe_refurbishment = df_cpe_refurbishment.withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Napajanje', 'power_supply')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'ZnaÄajno fiziÄko oÅ¡teÄenje - nepopravljivo', 'significant_physical_damage_irreparable')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Ne prijavljuje se u ACS', 'does_not_log_into_acs')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Reset ne radi', 'reset_not_work')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Manje fiziÄko ili toplinsko oÅ¡teÄenje', 'minor_physical_thermal_damage')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Ispravan', 'no_problem')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'OptiÄki port', 'optical_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'SIM utor', 'sim_slot')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'WIFI', 'wifi')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Blokiran', 'blocked')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Resetira se sam', 'reset_itself')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Internet ne radi', 'internet_not_working')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'DSL port', 'dsl_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Pregrijavanje ureÄaja', 'device_overheating')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'LAN port', 'lan_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Slaba brzina', 'poor_speed')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Telefon port', 'telefon_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Software', 'software'))

In [43]:
df_cpe_refurbishment.select('Vrsta_kvara').distinct().show(100, truncate=False)

+----------------------------------------------+
|Vrsta_kvara                                   |
+----------------------------------------------+
|telefon_port/ internet_not_working            |
|does_not_log_into_acs/ lan_port               |
|no_problem                                    |
|telefon_port/ lan_port                        |
|minor_physical_thermal_damage                 |
|telefon_port/ power_supply/ dsl_port          |
|lan_port/ dsl_port                            |
|reset_itself/ does_not_log_into_acs           |
|reset_itself/ dsl_port                        |
|wifi                                          |
|does_not_log_into_acs                         |
|minor_physical_thermal_damage/ lan_port       |
|reset_itself/ power_supply                    |
|device_overheating/ lan_port                  |
|software                                      |
|does_not_log_into_acs/ blocked                |
|wifi/ telefon_port                            |
|reset_not_work/ dev
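
The chain of `regexp_replace` calls above can also be read as a lookup table. The following plain-Python sketch illustrates that reading on a handful of labels. `ISSUE_TRANSLATIONS` and `translate_issue` are hypothetical names, the mapping is deliberately incomplete, and unlike `regexp_replace` it matches whole `/`-separated parts rather than substrings. It also drops the space after `/` in combined labels.

```python
# Hypothetical, partial mapping: only a few of the ASCII-only source labels are listed.
ISSUE_TRANSLATIONS = {
    "Napajanje": "power_supply",
    "Ne prijavljuje se u ACS": "does_not_log_into_acs",
    "Reset ne radi": "reset_not_work",
    "Ispravan": "no_problem",
    "DSL port": "dsl_port",
    "LAN port": "lan_port",
}


def translate_issue(label):
    """Translate each '/'-separated part of a label; unknown parts are kept unchanged."""
    if label is None:
        return None
    parts = [part.strip() for part in label.split("/")]
    return "/".join(ISSUE_TRANSLATIONS.get(part, part) for part in parts)


print(translate_issue("Napajanje/ DSL port"))  # power_supply/dsl_port
```

Keeping the translations in one dictionary makes it easier to spot a missing label than scanning a long chain of `withColumn` calls.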

Finally, let's rename some columns, remove the space after `/` in the issue labels, and select the columns of interest.

In [44]:
df_cpe_refurbishment = df_cpe_refurbishment.withColumnRenamed('Vrsta_kvara', 'CPE_ISSUE')\
                                            .withColumnRenamed('Serijski_broj', 'CPE_SERIAL_NUMBER')
                                            



In [45]:
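# Remove the space after "/" in combined issue labels (e.g. "a/ b" -> "a/b"); nulls pass through unchanged
spaceDeleteUDF = F.udf(lambda s: s.replace("/ ", "/") if s is not None else None, StringType())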
df_cpe_refurbishment = df_cpe_refurbishment.withColumn("CPE_ISSUE", spaceDeleteUDF("CPE_ISSUE"))

In [46]:
df_cpe_refurbishment = df_cpe_refurbishment.select('CPE_SERIAL_NUMBER', 'DATE_SCAN_START','DATE_SCAN_END','CPE_ISSUE')
df_cpe_refurbishment.limit(3).show(200,truncate=False, vertical=True)

-RECORD 0------------------------------------------
 CPE_SERIAL_NUMBER | j835bh004425                  
 DATE_SCAN_START   | 2020-01-14 00:00:00           
 DATE_SCAN_END     | 2020-01-14 00:00:00           
 CPE_ISSUE         | minor_physical_thermal_damage 
-RECORD 1------------------------------------------
 CPE_SERIAL_NUMBER | J833BH004104                  
 DATE_SCAN_START   | 2020-01-14 00:00:00           
 DATE_SCAN_END     | 2020-01-14 00:00:00           
 CPE_ISSUE         | minor_physical_thermal_damage 
-RECORD 2------------------------------------------
 CPE_SERIAL_NUMBER | J902BH013132                  
 DATE_SCAN_START   | 2020-01-14 00:00:00           
 DATE_SCAN_END     | 2020-01-14 00:00:00           
 CPE_ISSUE         | minor_physical_thermal_damage

# 6. Join `CPE` refurbishment, tickets and replacement data <a class="anchor" id="chapter6"></a>


First, we are going to join `df_tickets` and `df_cpe_replacement` on the fields `TICKET_ID` and `ASSET_ID`.

In [47]:
df_tickets.printSchema()

root
 |-- TICKET_ID: string (nullable = true)
 |-- ASSET_ID: string (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)

`START_DATE_TICKET`: the date on which the user called customer care.

In [48]:
df_cpe_replacement.printSchema()

root
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- CPE_SERIAL_NUMBER: string (nullable = true)

In [49]:
df_cpe_replacement_tickets = df_cpe_replacement.join(df_tickets, on=['TICKET_ID','ASSET_ID'], how='inner')


In [50]:
df_cpe_replacement_tickets.printSchema()

root
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)

In [51]:
df_cpe_replacement_tickets = df_cpe_replacement_tickets.na.drop(subset=["CPE_SERIAL_NUMBER","START_DATE_TICKET"]) 


In [52]:
df_cpe_replacement_tickets.printSchema()

root
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)

Let's take a look at the joined DataFrame.

In [53]:
df_cpe_replacement_tickets.show(5)

+---------+--------+-----------------+---------------------------+-------------------+
|TICKET_ID|ASSET_ID|CPE_SERIAL_NUMBER|TICKET_TENTATIVE_ROOT_CAUSE|  START_DATE_TICKET|
+---------+--------+-----------------+---------------------------+-------------------+
| 45921147|50661075|       J537018077|             Grmljavina - 7|2020-10-02 10:23:27|
| 45922430|47121661| FYN6R19309909486|       Prekid; odvod; kr...|2020-10-02 12:37:57|
| 45924364|28383510|      CP1040NT1PT|             Grmljavina - 7|2020-10-02 19:07:18|
| 45931074|89449348|       J730007258|             Grmljavina - 7|2020-10-05 09:21:44|
| 45952326|77890535|       J724006004|       Prekid; odvod; kr...|2020-10-08 14:46:36|
+---------+--------+-----------------+---------------------------+-------------------+
only showing top 5 rows

How many unique `ASSET_ID` values do we get?

In [54]:
df_cpe_replacement_tickets.select('ASSET_ID').distinct().count()

86891

Let's join with the `df_cpe_refurbishment` dataset on the `CPE_SERIAL_NUMBER` field.

In [55]:
df_cpe_refurbishment.printSchema()

root
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- DATE_SCAN_START: string (nullable = true)
 |-- DATE_SCAN_END: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)

In [56]:
df_cpe_replacement_tickets.printSchema()

root
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)

In [57]:
df_cpe_replace_scan = df_cpe_replacement_tickets.join(df_cpe_refurbishment, on=['CPE_SERIAL_NUMBER'], how="inner")

df_cpe_replace_scan.limit(1).show(200,truncate=False, vertical=True)

-RECORD 0------------------------------------------
 CPE_SERIAL_NUMBER           | J533008527          
 TICKET_ID                   | 46064204            
 ASSET_ID                    | 48337008            
 TICKET_TENTATIVE_ROOT_CAUSE | Grmljavina - 7      
 START_DATE_TICKET           | 2020-10-26 17:11:38 
 DATE_SCAN_START             | 2020-12-08 00:00:00 
 DATE_SCAN_END               | 2021-01-26 00:00:00 
 CPE_ISSUE                   | no_problem

In [58]:
df_cpe_replace_scan.select('ASSET_ID').distinct().count()

27458

Now we need to keep only the cases where both `DATE_SCAN_START` and `DATE_SCAN_END` fall after `START_DATE_TICKET`, in order to filter out the CPEs that had previously been in the refurbishment center.

In [62]:
df_cpe_replace_scan = df_cpe_replace_scan.filter(F.col('DATE_SCAN_END')>F.col('START_DATE_TICKET'))\
                                        .filter(F.col('DATE_SCAN_START')>F.col('START_DATE_TICKET'))
df_cpe_replace_scan.limit(1).show(200,truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 CPE_SERIAL_NUMBER           | J820005367             
 TICKET_ID                   | 45469275               
 ASSET_ID                    | 78851136               
 TICKET_TENTATIVE_ROOT_CAUSE | Neispravna oprema - 33 
 START_DATE_TICKET           | 2020-07-03 12:03:04    
 DATE_SCAN_START             | 2020-10-27 00:00:00    
 DATE_SCAN_END               | 2020-12-14 00:00:00    
 CPE_ISSUE                   | power_supply/dsl_port

In [63]:
df_cpe_replace_scan.select('ASSET_ID').distinct().count()

26039
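
Note that `START_DATE_TICKET`, `DATE_SCAN_START` and `DATE_SCAN_END` are all strings in the schemas above. The `>` comparison still behaves chronologically because the zero-padded `yyyy-MM-dd HH:mm:ss` format sorts the same way lexicographically as it does in time. A minimal plain-Python sketch with made-up rows (the serial numbers are placeholders):

```python
rows = [
    # (CPE_SERIAL_NUMBER, START_DATE_TICKET, DATE_SCAN_START, DATE_SCAN_END): toy values
    ("cpe-a", "2020-07-03 12:03:04", "2020-10-27 00:00:00", "2020-12-14 00:00:00"),
    ("cpe-b", "2020-10-26 17:11:38", "2020-03-01 00:00:00", "2020-04-01 00:00:00"),
    ("cpe-c", "2020-10-26 17:11:38", "2020-09-30 00:00:00", "2020-11-02 00:00:00"),
]


def scanned_after_ticket(row):
    """True when both scan dates are strictly later than the ticket date."""
    _, ticket, scan_start, scan_end = row
    return scan_start > ticket and scan_end > ticket


kept = [row[0] for row in rows if scanned_after_ticket(row)]
print(kept)  # ['cpe-a']
```

`cpe-c` is dropped even though its scan ended after the ticket, because it entered the refurbishment center before the ticket was opened.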

Now, to avoid CPEs with multiple tickets, we keep a single row per `ASSET_ID`: when a CPE has more than one replacement entry, `dropDuplicates` retains one arbitrary row and discards the rest.

In [64]:
df_cpe_replace_scan = df_cpe_replace_scan.dropDuplicates(["ASSET_ID"])

In [65]:
df_cpe_replace_scan.select('ASSET_ID').distinct().count()

26039
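
`dropDuplicates(["ASSET_ID"])` keeps one row per asset, but Spark does not guarantee which one. If a deterministic choice matters, one option is to keep the earliest ticket per asset. The sketch below shows that rule in plain Python on toy rows. `keep_earliest_ticket` is a hypothetical helper, not something the notebook defines.

```python
def keep_earliest_ticket(rows):
    """Keep, for each ASSET_ID, the row with the earliest START_DATE_TICKET."""
    earliest = {}
    for row in rows:
        current = earliest.get(row["ASSET_ID"])
        if current is None or row["START_DATE_TICKET"] < current["START_DATE_TICKET"]:
            earliest[row["ASSET_ID"]] = row
    return list(earliest.values())


# Toy rows: asset 1 has two tickets, asset 2 has one
rows = [
    {"ASSET_ID": 1, "START_DATE_TICKET": "2020-10-05 09:21:44"},
    {"ASSET_ID": 1, "START_DATE_TICKET": "2020-07-03 12:03:04"},
    {"ASSET_ID": 2, "START_DATE_TICKET": "2020-10-02 10:23:27"},
]
deduped = keep_earliest_ticket(rows)
print(len(deduped))  # 2
```

In Spark, the same rule is usually expressed with a window partitioned by `ASSET_ID` and ordered by the ticket date, keeping the first row of each partition.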

We can now count how many valid cpe replacement cases we have.

In [66]:
df_cpe_replace_scan.select('START_DATE_TICKET','DATE_SCAN_START','DATE_SCAN_END').describe().show()

+-------+-------------------+-------------------+-------------------+
|summary|  START_DATE_TICKET|    DATE_SCAN_START|      DATE_SCAN_END|
+-------+-------------------+-------------------+-------------------+
|  count|              26039|              26039|              26039|
|   mean|               null|               null|               null|
| stddev|               null|               null|               null|
|    min|2020-01-01 08:38:03|2020-01-10 00:00:00|2020-02-25 00:00:00|
|    max|2021-03-30 14:45:44|2021-04-12 00:00:00|2021-04-30 00:00:00|
+-------+-------------------+-------------------+-------------------+

Distribution of issue types across the valid replacement cases:

In [67]:
df_cpe_replace_scan_stat = df_cpe_replace_scan.groupBy('CPE_ISSUE').agg(F.count('CPE_ISSUE').alias('CPE_ISSUE_COUNT'))
df_cpe_replace_scan_stat = df_cpe_replace_scan_stat.withColumn('CPE_ISSUE_PECENT', 100*F.col('CPE_ISSUE_COUNT')/df_cpe_replace_scan.count())
df_cpe_replace_scan_stat.sort(F.col("CPE_ISSUE_PECENT").desc()).show(100, truncate = False)

+-------------------------------------------+---------------+--------------------+
|CPE_ISSUE                                  |CPE_ISSUE_COUNT|CPE_ISSUE_PECENT    |
+-------------------------------------------+---------------+--------------------+
|no_problem                                 |13371          |51.349898229578706  |
|dsl_port                                   |3904           |14.992895272475902  |
|power_supply                               |3697           |14.197933868428127  |
|telefon_port                               |1471           |5.649218479972349   |
|minor_physical_thermal_damage              |1238           |4.754406851261569   |
|reset_itself                               |616            |2.365682245861976   |
|power_supply/dsl_port                      |345            |1.3249356734129576  |
|lan_port                                   |264            |1.0138638196551326  |
|significant_physical_damage_irreparable    |207            |0.7949614040477745  |
|doe

In [68]:
df_cpe_replace_scan_no_thunder = df_cpe_replace_scan.filter(F.col('TICKET_TENTATIVE_ROOT_CAUSE')!='Grmljavina - 7')

In [69]:
df_cpe_replace_scan_no_thunder.select('ASSET_ID').distinct().count()

16106

In [70]:
df_cpe_replace_scan_stat = df_cpe_replace_scan_no_thunder.groupBy('CPE_ISSUE').agg(F.count('CPE_ISSUE').alias('CPE_ISSUE_COUNT'))
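# Note: the denominator is the full df_cpe_replace_scan count, so the percentages are shares of all cases, not only of the non-thunderstorm ones
df_cpe_replace_scan_stat = df_cpe_replace_scan_stat.withColumn('CPE_ISSUE_PECENT', 100*F.col('CPE_ISSUE_COUNT')/df_cpe_replace_scan.count())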
df_cpe_replace_scan_stat.sort(F.col("CPE_ISSUE_PECENT").desc()).show(100, truncate = False)

+---------------------------------------+---------------+--------------------+
|CPE_ISSUE                              |CPE_ISSUE_COUNT|CPE_ISSUE_PECENT    |
+---------------------------------------+---------------+--------------------+
|no_problem                             |11264          |43.258189638618994  |
|power_supply                           |1095           |4.205230615615039   |
|dsl_port                               |1065           |4.090018817926956   |
|minor_physical_thermal_damage          |1056           |4.0554552786205305  |
|telefon_port                           |663            |2.54618072890664    |
|reset_itself                           |237            |0.9101732017358578  |
|wifi                                   |153            |0.5875801682092247  |
|lan_port                               |131            |0.5030915165712969  |
|significant_physical_damage_irreparable|93             |0.35715657283305813 |
|does_not_log_into_acs                  |92         
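
Both tables divide by `df_cpe_replace_scan.count()`, so the percentages in the second table are shares of all 26039 cases rather than of the non-thunderstorm subset. The arithmetic below shows the difference for `no_problem`, assuming the subset has one row per `ASSET_ID` (16106 rows) after the deduplication step. `issue_share` is a hypothetical helper.

```python
def issue_share(issue_count, denominator):
    """Return the share of an issue type, in percent of the given denominator."""
    return 100 * issue_count / denominator


no_problem_no_thunder = 11264  # 'no_problem' rows once thunderstorm tickets are removed
all_cases = 26039              # rows in df_cpe_replace_scan
no_thunder_cases = 16106       # assumed row count of df_cpe_replace_scan_no_thunder

share_of_all = issue_share(no_problem_no_thunder, all_cases)
share_of_subset = issue_share(no_problem_no_thunder, no_thunder_cases)
print(round(share_of_all, 2), round(share_of_subset, 2))
```

Which denominator is appropriate depends on the question: the first number describes the whole replacement population, the second describes only the cases that were not attributed to thunderstorms.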

This gives us the number of issues per type, which we can use later in training.

# 7. Building dataset independent of `CPE` replacement events - healthy dataset. <a class="anchor" id="chapter7"></a>

To keep control over the possible class imbalance that would arise if we trained with all available CPEs, we are going to write a function that creates a dataset with a specific number of CPEs that are not present in the CPE replacement list.

> **Warning:** when dealing with an imbalanced dataset, if the classes are not well separable with the given variables and our goal is the best possible accuracy, the best classifier can be a “naive” one that always answers the majority class. Resampling methods can be used but have to be thought through carefully: they should not be used as standalone solutions but have to be coupled with a rework of the problem to serve a specific goal. Reworking the problem itself is often the best way to tackle an imbalanced-classes problem: the classifier and the decision rule have to be set with respect to a well-chosen goal, for example minimising a cost.
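
As a toy illustration of the warning above, the snippet below computes the accuracy of a classifier that always predicts the majority class. The labels are made up for the example and do not come from the notebook's data. `majority_baseline_accuracy` is a hypothetical helper.

```python
from collections import Counter


def majority_baseline_accuracy(labels):
    """Accuracy obtained by always predicting the most frequent label."""
    counts = Counter(labels)
    return counts.most_common(1)[0][1] / len(labels)


# Made-up toy labels: 95 healthy CPEs and 5 CPEs with an issue
labels = ["healthy"] * 95 + ["issue"] * 5
print(majority_baseline_accuracy(labels))  # 0.95
```

A model that never detects a single faulty CPE still scores 95% accuracy here, which is why accuracy alone is a poor target for this kind of problem.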

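Here we are going to create a blacklist of CPEs that appear in the tickets data, so that the healthy dataset contains only CPEs that are not associated with a CPE replacement. First, let's look at the number of distinct assets with tickets per month.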

In [71]:
df_tickets_per_month = df_tickets.withColumn('START_DATE_TICKET_MONTH',F.month('START_DATE_TICKET')).withColumn('START_DATE_TICKET_YEAR',F.year('START_DATE_TICKET'))
df_group_tickets = df_tickets_per_month.groupBy('START_DATE_TICKET_MONTH', 'START_DATE_TICKET_YEAR').agg(F.countDistinct("ASSET_ID"))
df_group_tickets = df_group_tickets.dropna()
df_group_tickets.orderBy('START_DATE_TICKET_MONTH').show()

+-----------------------+----------------------+------------------------+
|START_DATE_TICKET_MONTH|START_DATE_TICKET_YEAR|count(DISTINCT ASSET_ID)|
+-----------------------+----------------------+------------------------+
|                      1|                  2020|                   59083|
|                      1|                  2021|                   63646|
|                      2|                  2021|                   54704|
|                      2|                  2020|                   59158|
|                      3|                  2021|                   58819|
|                      3|                  2020|                   71186|
|                      4|                  2021|                   53476|
|                      4|                  2020|                   60142|
|                      5|                  2020|                   64201|
|                      6|                  2020|                   71707|
|                      7|             

Now we are going to select the data from CPEs that were not reported in the replacement list.
>**Note:** keep in mind the date ranges for which data is available in each of the required tables:


| Hive Table / Dataset | Start date | End date |
| :- | -: | :-: |
| cdl_acscoll_wifi_params_prq | 2019-03-07 | 2020-09-04 |
| cdl_acscoll_xdsl_line_prq | 2020-01-01 | 2020-10-28 |
| tickets | 2020-06-0 | 2020-12-2 |
| scan start | 2020-01-08 | 2021-01-25 |
| scan end | 2020-06-18 | 2021-01-29 |

All three tables used in `gather_data` have data from January 2020 to April 2021, as does the tickets data.

In [72]:
start_time, end_time = '2020-01-01', '2021-04-30' #jan 2020 to april 2021
df_stats  = gather_data (start_time, end_time,spark)

df_cpe_blacklist = df_tickets.select("ASSET_ID").distinct()
df_cpe_blacklist= df_cpe_blacklist.withColumnRenamed('ASSET_ID','assetid')

df_cpe_healthy = df_stats.join(df_cpe_blacklist, on=['assetid'], how='left_anti')


root
 |-- assetid: integer (nullable = true)
 |-- date: date (nullable = true)

('df_key - printSchema', None)

In [73]:
df_cpe_healthy = df_cpe_healthy.withColumn('assetid', F.col('assetid').cast(types.StringType()))
df_cpe_healthy = df_cpe_healthy.withColumn('datum',  F.to_date(F.unix_timestamp('datum', 'yyyy-MM-dd').cast('timestamp')))


How many unique healthy CPEs do we have from Jan 2020 to Apr 2021?

In [74]:
df_cpe_healthy.select('assetid').distinct().count()

250597
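
The `left_anti` join above behaves like a set difference on `assetid`: it keeps the rows of `df_stats` whose id never appears in `df_tickets`. A plain-Python sketch on toy ids (`healthy_asset_ids` is a hypothetical helper, and the ids are made up):

```python
def healthy_asset_ids(stats_asset_ids, ticket_asset_ids):
    """Return the asset ids that have statistics but never appear in a ticket."""
    blacklist = set(ticket_asset_ids)
    return sorted(set(stats_asset_ids) - blacklist)


stats_ids = [101, 102, 103, 103, 104]  # toy ids with measurements (103 has two rows)
ticket_ids = [102, 104, 999]           # toy ids that raised at least one ticket
print(healthy_asset_ids(stats_ids, ticket_ids))  # [101, 103]
```

Unlike this sketch, the Spark join keeps every measurement row of the surviving CPEs, which is why the distinct count is taken afterwards.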

Now we can select 20K of those CPEs (the variable names below still say `10k`, but the limit is 20000).

In [75]:
cpe_healty_unique_10k = df_cpe_healthy.select('assetid').distinct().limit(20000)


In [76]:
cpe_healty_unique_10k.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/cpe_healty_unique_20k_new")

In [77]:
cpe_healty_unique_10k = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/cpe_healty_unique_20k_new')

Join with df_cpe_healthy to get all information 

In [78]:
df_cpe_healthy_10k = cpe_healty_unique_10k.join(df_cpe_healthy, on=['assetid'], how='inner')


In [79]:
df_cpe_healthy_10k.select('assetid').distinct().count()

20000

#### Get random start date and end date

In [80]:
from datetime import timedelta, date

def daterange(date1, date2):
    for n in range(int ((date2 - date1).days)+1):
        yield date1 + timedelta(n)
        

In [81]:
# Range of dates available for selecting the end date. We start from Feb 2, 2020 because each window covers 32 days and the data begins in Jan 2020
start_dt = date(2020, 2, 2)
end_dt = date(2021, 4, 30)
lst=[]
for dt in daterange(start_dt, end_dt):
    new_dt = dt.strftime("%Y-%m-%d")
    lst.append(new_dt)

#### Get start_date

In [82]:
from datetime import datetime, timedelta

def generate_start_date(end_date, n_days):
    """Compute the start date of a query window from its end date.

    Args:
        end_date (str): end date of the window, formatted as 'YYYY-MM-DD'.
        n_days (int): number of days to step back from end_date.
    Returns:
        str: start date of the window, formatted as 'YYYY-MM-DD'.
    """
    end_date_1 = datetime.strptime(end_date, "%Y-%m-%d").date()
    start_date = end_date_1 - timedelta(days=n_days)
    return start_date.strftime("%Y-%m-%d")

Collect the distinct CPE ids into a pandas DataFrame so that we can assign a start and an end date to each of them.

In [83]:
healthy = df_cpe_healthy_10k.select('assetid').distinct().toPandas()

In [84]:
healthy['end_date'] = healthy.apply(lambda x: np.random.choice(lst),axis=1)
healthy['start_date'] = healthy.apply(lambda x: generate_start_date(x['end_date'],31),axis=1) # 31 days back gives a 32-day window (both ends included)

In [85]:
healthy.head(5)

     assetid    end_date  start_date
0   37323317  2020-11-28  2020-10-28
1   43434736  2021-01-01  2020-12-01
2   66811827  2021-02-05  2021-01-05
3   95041058  2020-02-06  2020-01-06
4  104098584  2020-09-29  2020-08-29
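
The date-list, random-choice and start-date steps above can be condensed into a single helper. This is a sketch under assumptions, not the notebook's code: `random_window` is a hypothetical name, and it uses Python's `random` module with a fixed seed instead of `np.random.choice` so that the draw is reproducible.

```python
import random
from datetime import date, timedelta


def random_window(first_end, last_end, n_days, rng):
    """Pick an end date uniformly in [first_end, last_end] and step back n_days for the start."""
    span = (last_end - first_end).days
    end = first_end + timedelta(days=rng.randint(0, span))
    return end - timedelta(days=n_days), end


rng = random.Random(42)  # fixed seed (an assumption) so the same window is drawn on every run
start, end = random_window(date(2020, 2, 2), date(2021, 4, 30), 31, rng)
window_days = (end - start).days + 1  # both endpoints are included
print(start, end, window_days)
```

Stepping back 31 days from the end date yields a 32-day window, and the earliest possible start date is 2020-01-02, which stays inside the available data range.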

In [86]:
healthy['assetid'] = healthy['assetid'].astype('string')
healthy['end_date'] = healthy['end_date'].astype('string')
healthy['start_date'] = healthy['start_date'].astype('string')

In [87]:
healthy.dtypes

assetid       object
end_date      object
start_date    object
dtype: object

In [88]:
mySchema = StructType([ StructField("assetid", StringType(), True)\
                        ,StructField("end_date", StringType(), True)\
                        ,StructField("start_date", StringType(), True)
                      
                      ])


In [89]:
df_healthy = spark.createDataFrame(healthy, schema = mySchema)

In [90]:
df_healthy.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/df_healthy_new")


In [91]:
df_healthy = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/df_healthy_new')

In [92]:
df_healthy.printSchema()

root
 |-- assetid: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- start_date: string (nullable = true)

In [93]:
df_healthy.count()

20000

In [94]:
df_healthy = df_healthy.withColumn('start_date', F.col('start_date').cast(types.DateType()))
df_healthy = df_healthy.withColumn('end_date', F.col('end_date').cast(types.DateType()))

In [95]:
spark.udf.register("generate_date_series", generate_date_series, types.ArrayType(types.DateType()) )
df_healthy.createOrReplaceTempView("keydf")

In [96]:
df_date_expanded = spark.sql('select *, explode(generate_date_series(start_date, end_date)) as date FROM keydf ')

In [97]:
#df_date_expanded.show(40)

In [98]:
df_date_expanded = df_date_expanded.select('assetid', 'date').withColumnRenamed('date', 'datum')

In [99]:
df_date_expanded.printSchema()

root
 |-- assetid: string (nullable = true)
 |-- datum: date (nullable = true)

In [100]:
df_date_expanded = df_date_expanded.join(df_cpe_healthy_10k, ['assetid', 'datum'], how = 'inner')

In [101]:
df_date_expanded.count()

640000

In [102]:
20000*32

640000
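
The row count above matches one row per healthy CPE per day of a 32-day window. A minimal sketch of that arithmetic, assuming `generate_date_series` returns every date from start to end inclusive (its real definition lies outside this section, so the function below is a hypothetical stand-in and the dates are made up):

```python
from datetime import date, timedelta

def generate_date_series(start, end):
    """Hypothetical stand-in: every date from start to end, both inclusive."""
    n_days = (end - start).days + 1
    return [start + timedelta(days=i) for i in range(n_days)]

# One illustrative window; these dates are made up for the sketch.
series = generate_date_series(date(2020, 7, 25), date(2020, 8, 25))
rows_per_cpe = len(series)             # days in the window
expected_rows = 20000 * rows_per_cpe   # one row per CPE per day
print(rows_per_cpe, expected_rows)
```

If the inner join with the measurement table returned fewer rows than this product, some CPEs would be missing measurements for at least one day of their window.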

In [103]:
df_date_expanded.printSchema()

root
 |-- assetid: string (nullable = true)
 |-- datum: date (nullable = true)
 |-- dslam_distinct_modulation: long (nullable = true)
 |-- dslam_no_of_dominant_modulation: long (nullable = true)
 |-- dslam_code_of_dominant_modulation: float (nullable = true)
 |-- dslam_avg_bitrate_us: double (nullable = true)
 |-- dslam_avg_bitrate_ds: double (nullable = true)
 |-- dslam_avg_attenuation_ds: double (nullable = true)
 |-- dslam_avg_attenuation_us: double (nullable = true)
 |-- dslam_avg_power_us: double (nullable = true)
 |-- dslam_avg_power_ds: double (nullable = true)
 |-- dslam_avg_att_bitrate_us: double (nullable = true)
 |-- dslam_avg_att_bitrate_ds: double (nullable = true)
 |-- dslam_avg_snr_us: double (nullable = true)
 |-- dslam_avg_snr_ds: double (nullable = true)
 |-- dslam_no_counts: long (nullable = true)
 |-- dslam_avg_bandline_ds: double (nullable = true)
 |-- dslam_avg_bandline_us: double (nullable = true)
 |-- dslam_sum_cv_us: double (nullable = true)
 |-- dslam_count_cv

In [104]:
df_date_expanded.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_healthy/df_healty_sel_features_20k_new")

# 8. Building dataset related with `CPE` replacement events - issue dataset
<a class="anchor" id="chapter8"></a>

Now we want to build a dataset that contains all the `cpes` that were replaced. The major challenge is that the data must be filtered to a fixed window (e.g. one month before the issue was raised).

We are going to select the data from the `cpes` that were reported in the `replacement list`.
>**Note:** the required tables only contain data for the following date ranges:


| Hive Table / Dataset | Start date | End date |
| :- | -: | :-: |
| cdl_acscoll_wifi_params_prq | 2019-03-07 | 2020-09-04 |
| cdl_acscoll_xdsl_line_prq | 2020-01-01 | 2020-10-28 |
| tickets | 2020-06-0 | 2020-12-2 |
| scan start | 2020-01-08 | 2021-01-25 |
| scan end | 2020-06-18 | 2021-01-29 |

In this case the constraint is `cdl_acscoll_wifi_params_prq` because it only has data until `2020-09-04`. Therefore, we can only work with tickets that were raised until `2020-09-04`.
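
The usable range is the intersection of the table ranges: the latest start and the earliest end. A small sketch of that reasoning, using only the two measurement tables from the table above whose dates are complete (the variable names are illustrative, not part of the notebook):

```python
from datetime import date

# Date ranges copied from the table above (measurement tables only).
coverage = {
    "cdl_acscoll_wifi_params_prq": (date(2019, 3, 7), date(2020, 9, 4)),
    "cdl_acscoll_xdsl_line_prq": (date(2020, 1, 1), date(2020, 10, 28)),
}

# The common range starts at the latest start and ends at the earliest end.
common_start = max(start for start, _ in coverage.values())
common_end = min(end for _, end in coverage.values())

# The table with the earliest end date is the limiting one.
limiting_table = min(coverage, key=lambda name: coverage[name][1])
print(common_start, common_end, limiting_table)
```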

In [105]:
df_cpe_replace_scan.printSchema()

root
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)
 |-- DATE_SCAN_START: string (nullable = true)
 |-- DATE_SCAN_END: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)

In other words, we need to filter `df_cpe_replace_scan` by selecting the records where `START_DATE_TICKET` is earlier than `2020-09-04`.
Let's do it!

In [110]:
# Note: with a 30-day window, data was only found after 2020-05-14.
df_cpe_repl_flt = df_cpe_replace_scan.filter(F.col("START_DATE_TICKET") < "2021-04-30")\
                                    .filter(F.col("START_DATE_TICKET") > "2020-02-02")

## 8.1 Creating Windowing <a class="anchor" id="section_8_1"></a>

Great! Now we need to define a `history_window` that represents the amount of historical data (in days) we use to predict the event `cpe damage` (excluding those cpes that were replaced but where no issue was found). We also need to set a `prediction_gap` that reflects how far in advance we want to make our prediction (e.g. one week ahead).

In [111]:
history_window = 32
prediction_gap = 3

Now, let's define some `UDF`s to calculate the `WINDOW_START` and `WINDOW_END` with respect to `START_DATE_TICKET`.

In [112]:
from datetime import datetime, timedelta

def get_window_end(event_dt, prediction_gap):
    """Subtract the prediction gap plus one day from the date on which the ticket was raised.
    
    Args:
        event_dt (str): Timestamp at which the ticket was raised ("%Y-%m-%d %H:%M:%S").
        prediction_gap (int): Prediction gap in days.
    
    Returns:
        str: Timestamp of the window end (midnight of that day).
    """
    event_dt = datetime.strptime(str(event_dt), "%Y-%m-%d %H:%M:%S").date()
    end_time_dt = (event_dt - timedelta(days=prediction_gap+1))
    end_time = end_time_dt.strftime("%Y-%m-%d %H:%M:%S")
    return end_time


def get_window_start(event_dt, window_plus_gap):
    """Subtract the prediction gap plus the window of historical data
    from the date on which the ticket was raised.
    
    Args:
        event_dt (str): Timestamp at which the ticket was raised ("%Y-%m-%d %H:%M:%S").
        window_plus_gap (int): Prediction gap plus window of historical data, in days.
    
    Returns:
        str: Timestamp of the window start (midnight of that day).
    """
    event_dt = datetime.strptime(str(event_dt), "%Y-%m-%d %H:%M:%S").date()
    start_time_dt = (event_dt - timedelta(days=window_plus_gap))
    start_time = start_time_dt.strftime("%Y-%m-%d %H:%M:%S")
    return start_time

def udf_window_end(prediction_gap):
    """User Defined Function to calculate the window end with respect to the
    date on which the ticket was raised"""
    return F.udf(lambda l: get_window_end(l,prediction_gap))


def udf_window_start(window_plus_gap):
    """User Defined Function to calculate the window start with respect to the
    date on which the ticket was raised"""
    return F.udf(lambda l: get_window_start(l,window_plus_gap))


df_cpe_repl_flt = df_cpe_repl_flt.withColumn("WINDOW_END", udf_window_end(prediction_gap)(F.col("START_DATE_TICKET")))\
                                .withColumn("WINDOW_START", udf_window_start(prediction_gap+history_window)(F.col("START_DATE_TICKET")))

df_cpe_repl_flt.select('START_DATE_TICKET', 'WINDOW_START', 'WINDOW_END').show(5)

+-------------------+-------------------+-------------------+
|  START_DATE_TICKET|       WINDOW_START|         WINDOW_END|
+-------------------+-------------------+-------------------+
|2020-08-29 13:51:36|2020-07-25 00:00:00|2020-08-25 00:00:00|
|2020-10-09 08:39:58|2020-09-04 00:00:00|2020-10-05 00:00:00|
|2020-06-26 20:44:39|2020-05-22 00:00:00|2020-06-22 00:00:00|
|2020-07-02 11:09:45|2020-05-28 00:00:00|2020-06-28 00:00:00|
|2020-08-18 13:59:42|2020-07-14 00:00:00|2020-08-14 00:00:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows
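
The first row of this output can be checked by hand. The sketch below repeats the same date arithmetic in plain Python for the ticket raised on `2020-08-29` and counts the days in the resulting window, which shows why the end is shifted by `prediction_gap + 1` while the start is shifted by `prediction_gap + history_window`: counted inclusively, the window then spans exactly `history_window` days. The variable names other than `history_window` and `prediction_gap` are illustrative.

```python
from datetime import datetime, timedelta

history_window = 32
prediction_gap = 3

# Ticket timestamp taken from the first row of the output above.
ticket = datetime.strptime("2020-08-29 13:51:36", "%Y-%m-%d %H:%M:%S").date()

window_end = ticket - timedelta(days=prediction_gap + 1)
window_start = ticket - timedelta(days=prediction_gap + history_window)

# Both bounds are inclusive, hence the +1.
days_in_window = (window_end - window_start).days + 1
print(window_start, window_end, days_in_window)
```

Between `window_end` and the ticket date there are `prediction_gap` full days (here 26, 27 and 28 August) that are deliberately left out of the history.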

In [113]:
df_cpe_repl_flt.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_cpe_repl_flt_new")


## 8.2 Join `CPE` replacement  with history data <a class="anchor" id="section_8_2"></a>

Let's join the data, apply the respective windowing and filter out thunderstorm events.

In [114]:
start_time, end_time = '2020-01-01', '2021-04-30'
df_stats  = gather_data (start_time, end_time,spark)
df_stats.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_stats_new")



root
 |-- assetid: integer (nullable = true)
 |-- date: date (nullable = true)

('df_key - printSchema', None)

In [115]:
df_cpe_repl_flt =  spark.read.parquet("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_cpe_repl_flt_new")
df_stats = spark.read.parquet("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_stats_new")


In [116]:
df_cpe_repl_flt.printSchema()

root
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)
 |-- DATE_SCAN_START: string (nullable = true)
 |-- DATE_SCAN_END: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)
 |-- WINDOW_END: string (nullable = true)
 |-- WINDOW_START: string (nullable = true)

In [146]:
#df_stats.printSchema()

In [117]:
df_issues  = df_cpe_repl_flt.join(df_stats, [(df_cpe_repl_flt.ASSET_ID==df_stats.assetid)], how='inner')#df_stats.crm_assetId


In [118]:
df_issues = df_issues.withColumn('WINDOW_END',F.unix_timestamp(F.col("WINDOW_END"), "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
                     .withColumn('WINDOW_START',F.unix_timestamp(F.col("WINDOW_START"), "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
                     .withColumn('datum',F.unix_timestamp(F.col("datum"), "yyyy-MM-dd").cast(TimestampType()))
        
df_issues = df_issues.filter(F.col("WINDOW_START")<=F.col("datum"))\
                        .filter(F.col("WINDOW_END")>= F.col("datum")) 

df_issues = df_issues.filter(F.col('TICKET_TENTATIVE_ROOT_CAUSE')!='Grmljavina - 7')
df_issues = df_issues.dropDuplicates()
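
The three steps of this cell (keep measurements inside the window, drop thunderstorm tickets, remove duplicates) can be illustrated on a handful of made-up rows in plain Python. This is only a sketch of the logic, not the Spark code: the tuples and their values are invented, and only the root-cause strings are taken from the notebook.

```python
from datetime import date

ws, we = date(2020, 7, 25), date(2020, 8, 25)  # window start / end, inclusive

# Made-up rows: (asset_id, datum, window_start, window_end, root_cause)
rows = [
    (1, date(2020, 7, 24), ws, we, "NetCool alarm"),   # one day before the window
    (1, date(2020, 7, 25), ws, we, "NetCool alarm"),   # first day of the window
    (1, date(2020, 8, 25), ws, we, "NetCool alarm"),   # last day of the window
    (1, date(2020, 8, 25), ws, we, "NetCool alarm"),   # exact duplicate
    (1, date(2020, 8, 26), ws, we, "NetCool alarm"),   # one day after the window
    (2, date(2020, 8, 1), ws, we, "Grmljavina - 7"),   # thunderstorm ticket
]

in_window = [r for r in rows if r[2] <= r[1] <= r[3]]
no_thunderstorm = [r for r in in_window if r[4] != "Grmljavina - 7"]
deduplicated = sorted(set(no_thunderstorm))
print(len(rows), len(in_window), len(no_thunderstorm), len(deduplicated))
```

One difference worth keeping in mind: in Spark SQL a `!=` comparison against a null value evaluates to null, so rows whose `TICKET_TENTATIVE_ROOT_CAUSE` is null are also dropped by that filter. The plain-Python sketch does not model this.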


It looks great! Let's save the DataFrame as parquet!

In [119]:
df_issues.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_issues_Jan2020_Apr2021_new")


# 9. Combine healthy +  issue CPE's and label them <a class="anchor" id="chapter9"></a>

## 9.1 Read CPE's  with issue and healthy <a class="anchor" id="section_9_1"></a>

In [120]:
df_issues = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_issues_Jan2020_Apr2021_new')

df_healthy = spark.read.parquet("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_healthy/df_healty_sel_features_20k_new")

In [121]:
df_issues.printSchema()

root
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)
 |-- DATE_SCAN_START: string (nullable = true)
 |-- DATE_SCAN_END: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)
 |-- WINDOW_END: timestamp (nullable = true)
 |-- WINDOW_START: timestamp (nullable = true)
 |-- assetid: integer (nullable = true)
 |-- dslam_distinct_modulation: long (nullable = true)
 |-- dslam_no_of_dominant_modulation: long (nullable = true)
 |-- dslam_code_of_dominant_modulation: float (nullable = true)
 |-- dslam_avg_bitrate_us: double (nullable = true)
 |-- dslam_avg_bitrate_ds: double (nullable = true)
 |-- dslam_avg_attenuation_ds: double (nullable = true)
 |-- dslam_avg_attenuation_us: double (nullable = true)
 |-- dslam_avg_power_us: double (nullable = true)
 |-- dslam_avg_power_ds: double (nullable = 

In [122]:
df_issues.select('asset_id').distinct().count()

13853

In [275]:
#a = df_issues.groupby('asset_id').agg(F.count('asset_id').alias('cnt'))

In [281]:
#a = a.filter(a.cnt < 32).select('asset_id', 'cnt')

In [282]:
#a.show()

+--------+---+
|asset_id|cnt|
+--------+---+
|32075047| 29|
|77081221| 31|
|87525043| 31|
|61615062| 29|
|46717835| 31|
|66094686| 30|
|84293478| 30|
+--------+---+

In [124]:
print(len(df_issues.columns))

118

## 9.2 Add dummy columns to df_healthy <a class="anchor" id="section_9_2"></a>

We add these columns to make sure the schema of `df_healthy` is aligned with `df_issues`. This is required because we will union the healthy and issue dataframes later. We also create the label `healthy` under `CPE_ISSUE` for healthy cpes. This is the label for the ML model.

In [125]:
from pyspark.sql.functions import lit
df_healthy = df_healthy.withColumn('CPE_SERIAL_NUMBER', lit(None).cast(types.StringType()))\
                        .withColumn('TICKET_ID', lit(None).cast(types.IntegerType()))\
                        .withColumn('ASSET_ID', lit(None).cast(types.IntegerType()))\
                        .withColumn('TICKET_TENTATIVE_ROOT_CAUSE', lit(None).cast(types.StringType()))\
                        .withColumn('START_DATE_TICKET', lit(None).cast(types.StringType()))\
                        .withColumn('DATE_SCAN_START', lit(None).cast(types.StringType()))\
                        .withColumn('DATE_SCAN_END', lit(None).cast(types.StringType()))\
                        .withColumn('CPE_ISSUE', lit('healthy').cast(types.StringType()))\
                        .withColumn('WINDOW_END', lit(None).cast(types.StringType()))\
                        .withColumn('WINDOW_START', lit(None).cast(types.StringType()))
                    

In [126]:
print(len(df_healthy.columns))

118

In [127]:
df_healthy = df_healthy.select(df_issues.columns)
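
The reordering matters because Spark's `DataFrame.union` matches columns by position, not by name (`unionByName` is the name-based alternative). Having the same number of columns, as checked above, is therefore necessary but not sufficient. A small sketch of a stricter check on plain column lists follows; the helper `missing_and_extra` and the shortened column lists are hypothetical and only serve the illustration.

```python
def missing_and_extra(reference_cols, candidate_cols):
    """Return (columns missing from candidate, columns not present in reference)."""
    reference, candidate = set(reference_cols), set(candidate_cols)
    missing = [c for c in reference_cols if c not in candidate]
    extra = [c for c in candidate_cols if c not in reference]
    return missing, extra

# Shortened, made-up stand-ins for df_issues.columns and df_healthy.columns.
issues_cols = ["CPE_SERIAL_NUMBER", "CPE_ISSUE", "assetid", "datum"]
healthy_cols = ["assetid", "datum", "CPE_ISSUE", "CPE_SERIAL_NUMBER"]

missing, extra = missing_and_extra(issues_cols, healthy_cols)
same_order = healthy_cols == issues_cols

# Equivalent of df_healthy.select(df_issues.columns): adopt the reference order.
reordered = [c for c in issues_cols if c in set(healthy_cols)]
print(missing, extra, same_order, reordered == issues_cols)
```

Here both lists hold the same names, yet a positional union without the reordering step would pair `assetid` with `CPE_SERIAL_NUMBER`.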

## 9.3 Concatenate issue and healthy CPE's <a class="anchor" id="section_9_3"></a>

In [128]:
df_con = df_healthy.union(df_issues)


In [129]:
df_con.select('assetid').distinct().count()

33853

## 9.4 Check for No thunderstorm <a class="anchor" id="section_9_4"></a>

Thunderstorm events were removed in the previous section to reduce effort, but let's make sure there are no thunderstorm events left.

In [130]:
df_con.select('TICKET_TENTATIVE_ROOT_CAUSE').distinct().show(100, truncate=False)

+--------------------------------------------------------------------------------+
|TICKET_TENTATIVE_ROOT_CAUSE                                                     |
+--------------------------------------------------------------------------------+
|slab/lo mobilni signal jer korisnik ne dozvoljava instalaciju vanjske antene - |
|Nije utvrðen uzrok smetnje - 19                                                 |
|slab/lo mobilni signal - 83                                                    |
|NetCool alarm                                                                   |
|Padaline (kia, snijeg, tuèa) - 45                                              |
|null                                                                            |
|Blokada - 39                                                                    |
|Pogreka u konfiguraciji - 44                                                   |
|Greka korisnièkog raèuna - 31                                                  |
|Nep

## 9.5 Get list of distinct issue types, to add new columns <a class="anchor" id="section_9_5"></a>

In [131]:
from pyspark.sql.functions import udf

#Read the refurb data
path_cpe_refurbishment = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/replacement/CPE_refurbishment_2020_2021.csv"
df_refurb = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_cpe_refurbishment,header = True,sep=",")
df_refurb = df_refurb.withColumnRenamed('Vrsta kvara', 'Vrsta_kvara')

#Remove spaces in issue types
#spaceDeleteUDF = udf(lambda s: s.replace("/ ", "/"), types.StringType())
#df_refurb = df_refurb.withColumn("Vrsta_kvara", spaceDeleteUDF("Vrsta_kvara"))

#Remove null type issues
df_refurb = df_refurb.filter(df_refurb.Vrsta_kvara != 'NULL')

#Translate to English
df_refurb = df_refurb.withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Napajanje', 'power_supply')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'ZnaÄajno fiziÄko oÅ¡teÄenje - nepopravljivo', 'significant_physical_damage_irreparable')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Ne prijavljuje se u ACS', 'does_not_log_into_acs')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Reset ne radi', 'reset_not_work')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Manje fiziÄko ili toplinsko oÅ¡teÄenje', 'minor_physical_thermal_damage')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Ispravan', 'no_problem')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'OptiÄki port', 'optical_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'SIM utor', 'sim_slot')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'WIFI', 'wifi')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Blokiran', 'blocked')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Resetira se sam', 'reset_itself')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Internet ne radi', 'internet_not_working')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'DSL port', 'dsl_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Pregrijavanje ureÄaja', 'device_overheating')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'LAN port', 'lan_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Slaba brzina', 'poor_speed')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Telefon port', 'telefon_port')) \
     .withColumn('Vrsta_kvara', F.regexp_replace('Vrsta_kvara', 'Software', 'software'))

#Convert column to array type holding one or more issues
df_refurb = df_refurb.withColumn("Vrsta_kvara", F.split("Vrsta_kvara", "/ ")).select('Vrsta_kvara').distinct() #/

#Collect all column values to list
issue_list = df_refurb.select(F.collect_set('Vrsta_kvara').alias('Vrsta_kvara')).first()['Vrsta_kvara']

#Flatten the list
issue_flat_list = ['healthy']
for sublist in issue_list:
    for item in sublist:
        issue_flat_list.append(item)
issue_flat_list = list(set(issue_flat_list))

#issue_flat_list is the list of new columns that has to be added
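
The flattening step at the end of this cell can also be written with `itertools.chain`. The sketch below uses a made-up `issue_list` in place of the collected Spark result and sorts the output so that the order is reproducible, which the `set`-based version above does not guarantee.

```python
from itertools import chain

# Made-up stand-in for the collected list of issue arrays.
issue_list = [["power_supply", "lan_port"], ["dsl_port"], ["lan_port", "wifi"]]

# Flatten, de-duplicate, add the 'healthy' label and sort for a stable order.
issue_flat_list = sorted(set(chain.from_iterable(issue_list)) | {"healthy"})
print(issue_flat_list)
```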


In [132]:
issue_flat_list

[u'internet_not_working', u'optical_port', u'reset_not_work', u'sim_slot', u'no_problem', 'healthy', u'reset_itself', u'wifi', u'significant_physical_damage_irreparable', u'lan_port', u'telefon_port', u'blocked', u'device_overheating', u'poor_speed', u'does_not_log_into_acs', u'software', u'dsl_port', u'minor_physical_thermal_damage', u'power_supply']

## 9.6 Add columns to the df and assign default values for now <a class="anchor" id="section_9_6"></a>

In [134]:
for new_col in issue_flat_list:
    df_con = df_con.withColumn('label_'+new_col, F.lit(0))


In [135]:
df_con.printSchema()

root
 |-- CPE_SERIAL_NUMBER: string (nullable = true)
 |-- TICKET_ID: integer (nullable = true)
 |-- ASSET_ID: integer (nullable = true)
 |-- TICKET_TENTATIVE_ROOT_CAUSE: string (nullable = true)
 |-- START_DATE_TICKET: string (nullable = true)
 |-- DATE_SCAN_START: string (nullable = true)
 |-- DATE_SCAN_END: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)
 |-- WINDOW_END: string (nullable = true)
 |-- WINDOW_START: string (nullable = true)
 |-- assetid: string (nullable = true)
 |-- dslam_distinct_modulation: long (nullable = true)
 |-- dslam_no_of_dominant_modulation: long (nullable = true)
 |-- dslam_code_of_dominant_modulation: float (nullable = true)
 |-- dslam_avg_bitrate_us: double (nullable = true)
 |-- dslam_avg_bitrate_ds: double (nullable = true)
 |-- dslam_avg_attenuation_ds: double (nullable = true)
 |-- dslam_avg_attenuation_us: double (nullable = true)
 |-- dslam_avg_power_us: double (nullable = true)
 |-- dslam_avg_power_ds: double (nullable = true)
 

## 9.7 Fill labels based on issues <a class="anchor" id="sectoin_9_7"></a>

In [136]:
df_con = df_con.withColumn('label_internet_not_working', F.col("CPE_ISSUE").rlike("internet_not_working").cast("Integer")) \
     .withColumn('label_optical_port', F.col("CPE_ISSUE").rlike("optical_port").cast("Integer")) \
     .withColumn('label_reset_not_work', F.col("CPE_ISSUE").rlike("reset_not_work").cast("Integer")) \
     .withColumn('label_sim_slot', F.col("CPE_ISSUE").rlike("sim_slot").cast("Integer")) \
     .withColumn('label_no_problem', F.col("CPE_ISSUE").rlike("no_problem").cast("Integer")) \
     .withColumn('label_does_not_log_into_acs', F.col("CPE_ISSUE").rlike("does_not_log_into_acs").cast("Integer")) \
     .withColumn('label_reset_itself',F.col("CPE_ISSUE").rlike("reset_itself").cast("Integer")) \
     .withColumn('label_wifi',F.col("CPE_ISSUE").rlike("wifi").cast("Integer")) \
     .withColumn('label_significant_physical_damage_irreparable', F.col("CPE_ISSUE").rlike("significant_physical_damage_irreparable").cast("Integer")) \
     .withColumn('label_lan_port',F.col("CPE_ISSUE").rlike("lan_port").cast("Integer")) \
     .withColumn('label_telefon_port', F.col("CPE_ISSUE").rlike("telefon_port").cast("Integer")) \
     .withColumn('label_device_overheating', F.col("CPE_ISSUE").rlike("device_overheating").cast("Integer")) \
     .withColumn('label_poor_speed',F.col("CPE_ISSUE").rlike("poor_speed").cast("Integer")) \
     .withColumn('label_blocked', F.col("CPE_ISSUE").rlike("blocked").cast("Integer")) \
     .withColumn('label_software', F.col("CPE_ISSUE").rlike("software").cast("Integer")) \
     .withColumn('label_dsl_port', F.col("CPE_ISSUE").rlike("dsl_port").cast("Integer")) \
     .withColumn('label_minor_physical_thermal_damage', F.col("CPE_ISSUE").rlike("minor_physical_thermal_damage").cast("Integer")) \
     .withColumn('label_power_supply',F.col("CPE_ISSUE").rlike("power_supply").cast("Integer"))\
     .withColumn('label_healthy',F.col("CPE_ISSUE").rlike("healthy").cast("Integer")) 

In [137]:
df_con.columns

['CPE_SERIAL_NUMBER', 'TICKET_ID', 'ASSET_ID', 'TICKET_TENTATIVE_ROOT_CAUSE', 'START_DATE_TICKET', 'DATE_SCAN_START', 'DATE_SCAN_END', 'CPE_ISSUE', 'WINDOW_END', 'WINDOW_START', 'assetid', 'dslam_distinct_modulation', 'dslam_no_of_dominant_modulation', 'dslam_code_of_dominant_modulation', 'dslam_avg_bitrate_us', 'dslam_avg_bitrate_ds', 'dslam_avg_attenuation_ds', 'dslam_avg_attenuation_us', 'dslam_avg_power_us', 'dslam_avg_power_ds', 'dslam_avg_att_bitrate_us', 'dslam_avg_att_bitrate_ds', 'dslam_avg_snr_us', 'dslam_avg_snr_ds', 'dslam_no_counts', 'dslam_avg_bandline_ds', 'dslam_avg_bandline_us', 'dslam_sum_cv_us', 'dslam_count_cv_us', 'dslam_sum_cv_ds', 'dslam_count_cv_ds', 'dslam_sum_es_ds', 'dslam_count_es_ds', 'dslam_sum_es_us', 'dslam_count_es_us', 'dslam_sum_ses_ds', 'dslam_count_ses_ds', 'dslam_sum_ses_us', 'dslam_count_ses_us', 'dslam_sum_fec_ds', 'dslam_count_fec_ds', 'dslam_sum_fec_us', 'dslam_count_fec_us', 'dslam_inits', 'dslam_port_inst_id', 'dslam_rg_port_inst_id', 'dslam_

In [138]:
df_con.filter((df_con.label_dsl_port == 1) & (df_con.label_lan_port == 1)).select('CPE_ISSUE', 'label_internet_not_working', 'label_optical_port', 'label_reset_not_work', 'label_sim_slot', 'label_no_problem', 'label_healthy', 'label_reset_itself', 'label_wifi', 'label_significant_physical_damage_irreparable', 'label_lan_port', 'label_telefon_port', 'label_blocked', 'label_device_overheating', 'label_poor_speed', 'label_does_not_log_into_acs', 'label_software', 'label_dsl_port', 'label_minor_physical_thermal_damage', 'label_power_supply' ).show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------
 CPE_ISSUE                                     | power_supply/lan_port/dsl_port 
 label_internet_not_working                    | 0                              
 label_optical_port                            | 0                              
 label_reset_not_work                          | 0                              
 label_sim_slot                                | 0                              
 label_no_problem                              | 0                              
 label_healthy                                 | 0                              
 label_reset_itself                            | 0                              
 label_wifi                                    | 0                              
 label_significant_physical_damage_irreparable | 0                              
 label_lan_port                                | 1                              
 label_telefon_port         
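
The record above shows the intended multi-hot encoding: a `CPE_ISSUE` value such as `power_supply/lan_port/dsl_port` sets several label columns to 1. A plain-Python sketch of the same idea, which splits the string on `/` and tests exact membership instead of using a regular-expression match, is given below. The helper `multi_hot` and the shortened `issue_types` list are hypothetical.

```python
# Shortened list of issue types for the illustration.
issue_types = ["dsl_port", "lan_port", "power_supply", "healthy"]

def multi_hot(cpe_issue, issue_types):
    """Map a '/'-separated issue string to a {label_<type>: 0 or 1} dict."""
    present = set(cpe_issue.split("/"))
    return {"label_" + t: int(t in present) for t in issue_types}

labels = multi_hot("power_supply/lan_port/dsl_port", issue_types)
print(labels)
```

`rlike` performs a substring match, so it would also fire if one issue name were contained in another. In the issue names listed in `issue_flat_list` no name appears to be a substring of another, so both approaches should give the same labels here.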

In [139]:
df_con =  df_con.withColumn('assetid', F.col('assetid').cast(types.StringType()))

In [140]:
df_con.select('assetid').printSchema()

root
 |-- assetid: string (nullable = true)

In [141]:
df_con.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_wo_split_new")


## 9.8 Stratified split <a class="anchor" id="chapter_9_8"></a>

Let's split the data in a stratified fashion into train and test sets.

In [142]:
df_distinct = df_con.select('assetid', 'CPE_ISSUE')

In [143]:
df_distinct.printSchema()

root
 |-- assetid: string (nullable = true)
 |-- CPE_ISSUE: string (nullable = true)

In [144]:
df_distinct = df_distinct.dropDuplicates()

In [145]:
df_distinct.count()

33853

In [146]:
df_distinct.select('assetid').distinct().count()

33853

Let's set 90% for the train set and 10% for the test set.

In [147]:
fractions = df_distinct.select("CPE_ISSUE").distinct().withColumn("fraction", lit(0.9)).rdd.collectAsMap()


In [148]:
fractions

{u'does_not_log_into_acs/lan_port': 0.9, u'no_problem': 0.9, u'reset_itself': 0.9, u'telefon_port/power_supply/lan_port': 0.9, u'telefon_port/power_supply': 0.9, u'power_supply/lan_port': 0.9, u'poor_speed': 0.9, u'reset_not_work': 0.9, u'telefon_port/internet_not_working': 0.9, u'dsl_port': 0.9, u'minor_physical_thermal_damage': 0.9, u'power_supply': 0.9, u'reset_itself/dsl_port': 0.9, u'telefon_port/power_supply/dsl_port': 0.9, u'telefon_port/dsl_port': 0.9, u'reset_itself/power_supply': 0.9, u'device_overheating/power_supply': 0.9, u'device_overheating': 0.9, u'blocked': 0.9, u'device_overheating/dsl_port': 0.9, u'minor_physical_thermal_damage/dsl_port': 0.9, u'lan_port': 0.9, u'telefon_port/lan_port': 0.9, u'reset_itself/device_overheating': 0.9, u'internet_not_working': 0.9, u'lan_port/dsl_port': 0.9, u'telefon_port': 0.9, u'healthy': 0.9, u'wifi': 0.9, u'significant_physical_damage_irreparable': 0.9, u'power_supply/lan_port/dsl_port': 0.9, u'power_supply/dsl_port': 0.9, u'does_no

In [149]:
train = df_distinct.sampleBy("CPE_ISSUE", fractions=fractions, seed=10)

In [150]:
train.select('assetid').distinct().count()

30453
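
Note that `sampleBy` does not draw exactly 90% of each class. As far as the Spark documentation describes it, each row is kept independently with the probability given for its stratum, so the realized share fluctuates, and very small classes can end up entirely in train or entirely in test. The plain-Python sketch below (no Spark required) mimics that behaviour; `sample_by` and the toy rows are hypothetical and only serve as an illustration.

```python
import random
from collections import Counter

def sample_by(rows, key, fractions, seed):
    """Keep each row independently with probability fractions[row[key]].

    Strata missing from `fractions` are treated as fraction 0.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions.get(row[key], 0.0)]

# Toy data (made up for illustration): one large class and one tiny class.
rows = [{"assetid": str(i), "CPE_ISSUE": "healthy"} for i in range(2000)]
rows += [{"assetid": str(2000 + i), "CPE_ISSUE": "software"} for i in range(3)]

# Same fraction for every class, as in the notebook.
fractions = {label: 0.9 for label in {row["CPE_ISSUE"] for row in rows}}
train = sample_by(rows, "CPE_ISSUE", fractions, seed=10)

total = Counter(row["CPE_ISSUE"] for row in rows)
kept = Counter(row["CPE_ISSUE"] for row in train)
for label in sorted(total):
    print(label, kept[label], "of", total[label])
```

The large class should land near 90%, whereas the three-row class can only end up at 0, 1, 2 or 3 rows, so its share cannot be close to 90% in general.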

Let's save df_distinct and train as parquet files, because the test set is obtained by joining df_distinct with train. Surprisingly, this join does not work correctly unless both DataFrames are saved and re-read first, probably because the lazily evaluated sample is recomputed when the join runs. So let's save them.

In [151]:
df_distinct.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_distinct_new")


In [152]:
train.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/train_new")


In [101]:
# Read the above files again

In [153]:
df_distinct = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_distinct_new')
train = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/train_new')
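
One plausible reason why the round trip through parquet helps (an assumption, not something verified in this notebook) is lazy evaluation. `train` is only a recipe, so the sample may be recomputed when the join runs. If the rows then arrive in a different order, for example after the shuffle caused by `dropDuplicates`, a seeded sampler can select different rows. Writing to parquet and reading back freezes one concrete result. The sketch below imitates the effect in plain Python; `lazy_sample` is a hypothetical stand-in, not a Spark API.

```python
import random

def lazy_sample(rows, fraction, seed):
    """Seeded sample whose result depends on the order in which rows arrive."""
    rng = random.Random(seed)
    return {row for row in rows if rng.random() < fraction}

ids = [str(i) for i in range(1000)]

# Two evaluations of the same "recipe" over differently ordered input.
first_run = lazy_sample(ids, 0.9, seed=10)
second_run = lazy_sample(list(reversed(ids)), 0.9, seed=10)
print("same rows selected:", first_run == second_run)

# Materializing once (here: simply keeping the result) gives a stable split.
train = first_run
test = set(ids) - train
print("split covers all ids:", len(train) + len(test) == len(ids))
```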

Generate the test set

In [154]:
test = df_distinct.join(train, ['assetid'], how='left_anti')

Let's check whether the class distribution is stratified across train and test:

* df_distinct - the whole base from which train and test are split
* train - the train set
* test - the test set

In [156]:
df_distinct.groupby('CPE_ISSUE').count().show()

+--------------------+-----+
|           CPE_ISSUE|count|
+--------------------+-----+
|power_supply/lan_...|   15|
|             healthy|20000|
|          no_problem| 9689|
|minor_physical_th...|  808|
|power_supply/dsl_...|   64|
|telefon_port/powe...|    3|
|telefon_port/inte...|    1|
|                wifi|  129|
|does_not_log_into...|   75|
|reset_itself/dsl_...|    3|
|telefon_port/dsl_...|    3|
|telefon_port/lan_...|    2|
|            software|    1|
|        power_supply|  974|
|reset_itself/powe...|    1|
|            dsl_port|  987|
|            lan_port|  118|
|  device_overheating|   25|
|telefon_port/powe...|    5|
|minor_physical_th...|    1|
+--------------------+-----+
only showing top 20 rows

In [157]:
train.groupby('CPE_ISSUE').count().show()

+--------------------+-----+
|           CPE_ISSUE|count|
+--------------------+-----+
|power_supply/lan_...|   14|
|             healthy|17971|
|          no_problem| 8724|
|minor_physical_th...|  723|
|power_supply/dsl_...|   58|
|telefon_port/powe...|    3|
|                wifi|  117|
|does_not_log_into...|   73|
|reset_itself/dsl_...|    3|
|telefon_port/dsl_...|    3|
|telefon_port/lan_...|    2|
|            software|    1|
|        power_supply|  884|
|reset_itself/powe...|    1|
|            dsl_port|  898|
|            lan_port|  105|
|  device_overheating|   20|
|telefon_port/powe...|    5|
|minor_physical_th...|    1|
|          poor_speed|    1|
+--------------------+-----+
only showing top 20 rows

In [158]:
test.groupby('CPE_ISSUE').count().show()

+--------------------+-----+
|           CPE_ISSUE|count|
+--------------------+-----+
|power_supply/lan_...|    1|
|             healthy| 2029|
|          no_problem|  965|
|minor_physical_th...|   85|
|power_supply/dsl_...|    6|
|telefon_port/inte...|    1|
|                wifi|   12|
|does_not_log_into...|    2|
|        power_supply|   90|
|            dsl_port|   89|
|            lan_port|   13|
|  device_overheating|    5|
|reset_itself/devi...|    1|
|internet_not_working|    3|
|telefon_port/powe...|    2|
|significant_physi...|    8|
|   lan_port/dsl_port|    1|
|power_supply/lan_...|    1|
|        reset_itself|   22|
|        telefon_port|   62|
+--------------------+-----+
only showing top 20 rows
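
The three tables can be compared class by class. The following sketch takes the counts of a few classes whose labels are not truncated in the output and computes the realized train share. It only re-uses numbers printed above; the dictionary names are chosen for illustration.

```python
# Counts copied from the three tables above (only classes with untruncated labels).
total = {"healthy": 20000, "no_problem": 9689, "wifi": 129, "power_supply": 974,
         "dsl_port": 987, "lan_port": 118, "device_overheating": 25}
train = {"healthy": 17971, "no_problem": 8724, "wifi": 117, "power_supply": 884,
         "dsl_port": 898, "lan_port": 105, "device_overheating": 20}
test = {"healthy": 2029, "no_problem": 965, "wifi": 12, "power_supply": 90,
        "dsl_port": 89, "lan_port": 13, "device_overheating": 5}

train_share = {}
for label, n in total.items():
    # Every asset of a class must be in exactly one of the two splits.
    assert train[label] + test[label] == n
    train_share[label] = train[label] / n
    print(f"{label:20s} train share = {train_share[label]:.3f}")
```

The large classes land close to 0.9, while a small class such as device_overheating (20 of 25) deviates more, which is expected when each row is sampled independently.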

Let's check whether any assetid appears in both the train and test sets. The count of the inner join should be 0.

In [159]:
train.join(test, ['assetid'], how = 'inner').count()

0

OK, there are no overlapping assetid values between the train and test sets.
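
The same guarantee follows from how `test` was built: a left anti join keeps exactly the rows of df_distinct whose assetid has no match in train, so the two sets are disjoint and together cover the whole base. A plain-Python sketch with made-up ids (the set names are illustrative only):

```python
all_ids = {"a1", "a2", "a3", "a4", "a5"}   # stands in for df_distinct
train_ids = {"a1", "a2", "a4", "a5"}       # stands in for train

# Left anti join on assetid: ids of the base that have no match in train.
test_ids = {asset for asset in all_ids if asset not in train_ids}

overlap = train_ids & test_ids             # inner join on assetid
print(len(overlap))                        # 0
print(train_ids | test_ids == all_ids)     # True
```

The counts reported in this notebook add up in the same way: 30453 train assets plus 3400 test assets give the 33853 assets of df_distinct.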

Let us join the asset ids with df_con to get the technical statistics data.

In [160]:
df_train = train.select('assetid')
df_test = test.select('assetid')


In [161]:
df_train = df_train.join(df_con, ['assetid'], how = 'inner')
df_test = df_test.join(df_con, ['assetid'], how = 'inner')

In [162]:
df_train.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_train_new")


In [164]:
df_test.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_test_new")


Now let's filter out the assets that have fewer than 32 records in train and test.

In [261]:
train_new = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_train_new')

In [262]:
train_new_grp = train_new.groupby('assetid').count().select(F.col("assetid"),F.col("count").alias("cnt"))
#.alias('cnt')


In [263]:
train_new_grp.count()

30453

In [264]:
train_new_grp = train_new_grp.filter(train_new_grp.cnt < 32)

In [265]:
train_new_list = train_new_grp.select("assetid").rdd.flatMap(lambda x: x).collect()

In [266]:
train_new_list

[u'105647124', u'46989894', u'40035791', u'42345339', u'104416957', u'47884552', u'100384047', u'46682428', u'28787722', u'75972343', u'32226468', u'51510167', u'64957179', u'36600151', u'33217095', u'104214304', u'36377824', u'62190746', u'63863421', u'103066490', u'54010030', u'93328799', u'45984635', u'44514478', u'55411804', u'68381420', u'52236398', u'80320355', u'52956518', u'71675586', u'97314226', u'49532656', u'99966878']

In [268]:
train_new = train_new.filter(~train_new.assetid.isin(train_new_list))

In [269]:
train_new.count()

973440

In [270]:
train_new.select('assetid').distinct().count()

30420

In [271]:
30420*32

973440
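
The filtering steps above (count the records per assetid, collect the ids with fewer than 32 records, drop them) can be sketched in plain Python as follows. The window length of 32 comes from the notebook; the toy records and the helper name `drop_incomplete` are assumptions for illustration. In Spark, a `left_anti` join against `train_new_grp` would presumably achieve the same result without collecting the ids to the driver.

```python
from collections import Counter

WINDOW = 32  # expected number of records per asset

def drop_incomplete(records, window=WINDOW):
    """Remove all records of assets that have fewer than `window` records."""
    counts = Counter(asset for asset, _ in records)
    incomplete = {asset for asset, n in counts.items() if n < window}
    return [rec for rec in records if rec[0] not in incomplete]

# Toy data: asset "A" has a full window, asset "B" only 5 records.
records = [("A", day) for day in range(32)] + [("B", day) for day in range(5)]
filtered = drop_incomplete(records)

print(len(filtered))                             # 32
print(sorted({asset for asset, _ in filtered}))  # ['A']
```

The counts in the notebook are consistent with this: the list above holds 33 ids, 30453 - 33 = 30420 assets remain, and 30420 * 32 = 973440 records.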

In [272]:
train_new.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_train_new_filtered")


Now the same filter for the test set.

In [276]:
test_new = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_test_new')

In [277]:
test_new_grp = test_new.groupby('assetid').count().select(F.col("assetid"),F.col("count").alias("cnt"))


In [278]:
test_new_grp.count()

3400

In [279]:
test_new_grp = test_new_grp.filter(test_new_grp.cnt < 32)

In [280]:
test_new_grp.count()

3

In [281]:
test_new_list = test_new_grp.select("assetid").rdd.flatMap(lambda x: x).collect()

In [282]:
test_new_list

[u'78214434', u'75831186', u'83560337']

In [283]:
test_new = test_new.filter(~test_new.assetid.isin(test_new_list))

In [284]:
test_new.count()

108704

In [285]:
test_new.select('assetid').distinct().count()

3397

In [286]:
3397*32

108704

In [287]:
test_new.repartition(1).write.format("parquet").mode('overwrite').save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_test_new_filtered")


In [None]:
## Check

In [241]:
a = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w32_g3/df_train_new')
b = spark.read.parquet('hdfs://nameservicedev1//user/dt_srajan/predictive_care/cpe_replacement/data_w30_g3/df_test')

In [242]:
print(len(b.columns))

137

In [169]:
a.columns == b.columns

True

In [16]:
c = spark.sql('select * from cdl_tdata_model.networkdevice2 limit 2')

In [17]:
c.show(2)

+--------+-------------------+------------+--------------------+-----------------+--------------------+---------------+-----+-----------------+---------+----+
|      id|          eventtime|productclass|        serialnumber|customerproductid|             eventid|softwareversion|model|      description|eventdate|type|
+--------+-------------------+------------+--------------------+-----------------+--------------------+---------------+-----+-----------------+---------+----+
|19645353|2021-05-06 01:08:06|        NULL|8fec70258857f1eb2...|             null|002ad72b0d451eb80...|            346|VQE-S|MAXTV set top box| 20210506| STB|
|19645353|2021-05-06 13:00:10|        NULL|8fec70258857f1eb2...|             null|002ad72b0d451eb80...|            346|VQE-S|MAXTV set top box| 20210506| STB|
+--------+-------------------+------------+--------------------+-----------------+--------------------+---------------+-----+-----------------+---------+----+

In [252]:
#a.printSchema()

In [292]:
a.select('assetid').distinct().count()

26097

# Appendix

## A. `CPEs` without any issue

Here we select the relevant data only for those `CPEs` that have not raised any ticket.

In [None]:
path_tickets = "hdfs://nameservicedev1//user/dt_srajan/predictive_care/tickets/Smetnje_0107-1510_2020.csv"
df_tickets = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets,header = True,sep=";")

Let's take a look into the schema

In [None]:
df_tickets.printSchema()

Now we can check how many unique `ASSET_ID` values are in this dataset, and also create a list for further filtering.

In [None]:
cpe_tickets = spark.read.option("encoding", "ISO-8859-1").option("inferSchema", "true").csv(path_tickets,header = True,sep=";").select("ASSET_ID").distinct().toPandas().values
cpe_tickets = cpe_tickets.T[0].tolist()
cpe_tickets = [str(cpe_id) for cpe_id in  cpe_tickets]
len(cpe_tickets)
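
Casting the ids to `str` keeps them consistent with string-typed asset ids (the schema earlier in this notebook shows assetid as a string). Whether `crm_assetId` is also a string, and whether Spark would coerce the types on its own, are assumptions not checked here. In plain Python the mismatch is easy to see; the ids below are made up:

```python
# Made-up ids: ticket ids parsed as integers, stats ids stored as strings.
ticket_ids_raw = [101, 102, 103]
stats_rows = [("101", 0.5), ("104", 0.7), ("105", 0.9)]

# Without normalization nothing matches, because 101 != "101" in Python.
unmatched = [row for row in stats_rows if row[0] not in set(ticket_ids_raw)]

# Cast to str first, then keep only assets that never raised a ticket.
ticket_ids = {str(asset) for asset in ticket_ids_raw}
healthy_rows = [row for row in stats_rows if row[0] not in ticket_ids]

print(len(unmatched))   # 3
print(healthy_rows)     # [('104', 0.7), ('105', 0.9)]
```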

The only overlapping months across all datasets are July and August. For the following example we are going to use July only.

In [None]:
start_time, end_time = '2020-07-01', '2020-07-30'
df_stats_healty_any_issue = gather_data(start_time, end_time, spark)
df_stats_healty_any_issue = df_stats_healty_any_issue.filter(~df_stats_healty_any_issue.crm_assetId.isin(cpe_tickets)).cache()
df_stats_healty_any_issue.count()

How many unique healthy `CPEs` do we have in July?

In [None]:
df_stats_healty_any_issue.select('crm_assetId').distinct().count()

Let's print some results

In [None]:
df_stats_healty_any_issue.limit(10).show(200,truncate=False, vertical=True)

Saving parquet file in `HDFS`

In [None]:
df_stats_healty_any_issue.write.format("parquet").mode('overwrite').option("header","true").save("hdfs://nameservicedev1//user/dt_srajan/predictive_care/healthy/cpe_healty_any_issue")