In [2]:
%%configure -f
{"conf":
 {"spark.driver.cores": "6",
  "spark.driver.memory": "10g",
  "spark.executor.cores": "6",
  "spark.executor.memory": "10g",
  "spark.dynamicAllocation.enabled": "true",
  "spark.dynamicAllocation.minExecutors" : "4",
  "spark.driver.maxResultSize": "4g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1519,application_1620749185953_140308,pyspark,idle,Link,Link,
1520,application_1620749185953_140309,pyspark,idle,Link,Link,
1523,application_1620749185953_140314,pyspark,idle,Link,Link,
1534,application_1620749185953_140546,pyspark,idle,Link,Link,
1535,application_1620749185953_140549,pyspark,idle,Link,Link,
1537,application_1620749185953_140604,pyspark,idle,Link,Link,
1538,application_1620749185953_140730,pyspark,idle,Link,Link,
1539,application_1620749185953_140740,pyspark,idle,Link,Link,
1540,application_1620749185953_140748,pyspark,idle,Link,Link,
1541,application_1620749185953_140752,pyspark,busy,Link,Link,


In [3]:
#%%cleanup -f

In [4]:
# Data Science Packages
import sys
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from  matplotlib import pyplot
import seaborn as sns
import warnings
from datetime import datetime
from datetime import timedelta


#Spark Packages
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import types 
import pyspark.sql.types
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import col, avg, sum

from pyspark.sql import Window 
from pyspark.sql import functions as F

import random
from pyspark.sql.functions import lit

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1556,application_1620749185953_140809,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Matplotlib

#matplotlib.use('agg')
#plt.switch_backend('agg')

# Seaborn Style
sns.set(style='ticks')
sns.set_style({'font.family': 'Hiragino Maru Gothic Pro'})
sns.set_palette("cool")

# Pandas Style
pd.set_option("display.max_column", 9999)
pd.set_option("display.max_row", 9999)

# Ignore annoying warning 
warnings.filterwarnings('ignore')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
config = {
    "spark": {
        "app_name": "pred-care",
        "master": "yarn"
    },

    "path": {
        "prefiltering_output": "hdfs://nameservicedev1//user/tsystems_vkumar/dsl_tickets/dsl_tkts_all_lvls_uniq_n.parquet",
        "datagathering_output": "hdfs://nameservicedev1//user/tsystems_kkeshore/dslfault_data_new_v1.parquet"
    },

    "data_gathering": {
        "seq_len": 32,
        "gap": 1,
        "features": {
            "dslam": {
                "measurements":['dslam_code_of_dominant_modulation', 
                                'dslam_avg_bitrate_us', 'dslam_avg_bitrate_ds', 'dslam_avg_attenuation_ds', 'dslam_avg_attenuation_us',
                                'dslam_avg_power_us', 'dslam_avg_power_ds', 'dslam_avg_att_bitrate_us', 'dslam_avg_att_bitrate_ds', 
                                'dslam_avg_snr_us', 'dslam_avg_snr_ds', 'dslam_avg_bandline_ds', 'dslam_avg_bandline_us', 'dslam_no_counts',
                                'dslam_sum_cv_us', 'dslam_sum_cv_ds', 'dslam_sum_es_ds', 'dslam_sum_es_us', 'dslam_sum_ses_ds', 'dslam_sum_ses_us', 
                                'dslam_sum_fec_ds', 'dslam_sum_fec_us'],
                
                "counters":['dslam_count_cv_us', 'dslam_count_cv_ds', 'dslam_count_es_ds', 'dslam_count_es_us', 'dslam_count_ses_ds', 
                             'dslam_count_ses_us', 'dslam_count_fec_ds', 'dslam_count_fec_us', 'dslam_inits'],
                
                "categoricals":['dslam_platforma', 'dslam_model', 'dslam_servis'],
                "grouping_features": ["dslam_assetid", "dslam_datum", "dslam_servis", 'dslam_platforma', 'dslam_model']
            },

            "snmp": {
                "measurements":[],
                "counters":['snmp_no_lossoflink', 'snmp_no_linkup', 'snmp_no_dyinggasp'],
                "categoricals":["snmp_servis"],
                "grouping_features": ["snmp_assetid", "snmp_datum", "snmp_servis"]
            },

             "acs": {
                "measurements":['acs_xdslline_dslUptime', 'acs_xdslline_cpeUptime', 
                             'acs_xdslline_cpuUsagePercentage', 'acs_xdslline_freeMemoryKb', 'acs_xdslline_totalMemoryKb', 
                             'acs_xdslline_usedMemoryKb', 'acs_xdslline_natCurrent', 'acs_xdslline_natMax', 
                             'acs_xdslline_initErrors',
                             'acs_xdslline_downAttenuationDb', 
                             'acs_xdslline_downCurrRateKbps', 'acs_xdslline_downMaxRateKbps', 'acs_xdslline_downNoiseMarginDb', 'acs_xdslline_downPowerDbm', 
                             'acs_xdslline_upAttenuationDb', 'acs_xdslline_upCurrRateKbps', 'acs_xdslline_upMaxRateKbps', 'acs_xdslline_upNoiseMarginDb', 
                             'acs_xdslline_upPowerDbm'],
                "counters":['acs_xdslline_cellDelinDelta', 'acs_xdslline_erroredSecsDelta', 'acs_xdslline_initErrorsDelta', 
                            'acs_xdslline_initTimeoutsDelta', 'acs_xdslline_severelyErroredSecsDelta', 'acs_xdslline_lossOfFramingDelta', 
                            'acs_xdslline_transmitBlocksDelta', 'acs_xdslline_receiveBlocksDelta', 'acs_xdslline_atucfecdelta', 
                            'acs_xdslline_atuchecdelta', 'acs_xdslline_crcdelta', 'acs_xdslline_fecdelta', 'acs_xdslline_hecdelta'],
                "categoricals":['acs_crm_productClass'], 
                "grouping_features": ["acs_crm_assetId", "acs_datum", "acs_crm_productClass"]
            },

             "tdm": {
                "preprocessing_col1": ['id', 'bssid', 'min_rssi_per_cpe', 'max_rssi_per_cpe', 'avg_rssi_per_cpe', 'perc5_rssi_per_cpe', 'perc25_rssi_per_cpe', 'perc50_rssi_per_cpe', 'perc75_rssi_per_cpe', 'perc95_rssi_per_cpe'],
                "preprocessing_col2": ['id', 'bssid', 'channel', 'channelsinuse', 'operatingfrequencyband', 'ssid', 'operatingstandard', 'status', 'transmitpower', 'radioenable', 'radiostatus', 'ssidenable', 'enable', 'min_rssi_per_cpe', 'max_rssi_per_cpe', 'avg_rssi_per_cpe', 'perc5_rssi_per_cpe', 'perc25_rssi_per_cpe', 'perc50_rssi_per_cpe', 'perc75_rssi_per_cpe', 'perc95_rssi_per_cpe'],
                "preprocessing_col3": ['id', 'networkdeviceid', 'eventdate', 'eventtime', 'bssid', 'channel', 'channelsinuse', 'transmitpower', 'min_rssi_per_cpe', 'max_rssi_per_cpe', 'avg_rssi_per_cpe', 'perc5_rssi_per_cpe', 'perc25_rssi_per_cpe', 'perc50_rssi_per_cpe', 'perc75_rssi_per_cpe', 'perc95_rssi_per_cpe',  'bytesreceived', 'bytessent', 'errorsreceived', 'errorssent', 'packetsreceived', 'packetssent', 'bytesreceiveddelta', 'bytessentdelta', 'errorsreceiveddelta', 'errorssentdelta', 'packetsreceiveddelta', 'packetssentdelta', 'connecteddevicescount'],
                "measurements":['lb_transmitpower', 'lb_min_rssi_per_cpe', 'lb_max_rssi_per_cpe', 'lb_avg_rssi_per_cpe', 'lb_perc5_rssi_per_cpe', 'lb_perc25_rssi_per_cpe', 'lb_perc50_rssi_per_cpe', 'lb_perc75_rssi_per_cpe', 'lb_perc95_rssi_per_cpe',  'lb_bytesreceived', 'lb_bytessent', 'lb_errorsreceived', 'lb_errorssent', 'lb_packetsreceived', 'lb_packetssent', 'lb_connecteddevicescount', 
                                'ub_transmitpower', 'ub_min_rssi_per_cpe', 'ub_max_rssi_per_cpe', 'ub_avg_rssi_per_cpe', 'ub_perc5_rssi_per_cpe', 'ub_perc25_rssi_per_cpe', 'ub_perc50_rssi_per_cpe', 'ub_perc75_rssi_per_cpe', 'ub_perc95_rssi_per_cpe',  'ub_bytesreceived', 'ub_bytessent', 'ub_errorsreceived', 'ub_errorssent', 'ub_packetsreceived', 'ub_packetssent', 'ub_connecteddevicescount'],
                "counters": ['lb_bytesreceiveddelta', 'lb_bytessentdelta', 'lb_errorsreceiveddelta', 'lb_errorssentdelta', 'lb_packetsreceiveddelta', 'lb_packetssentdelta', 'ub_bytesreceiveddelta', 'ub_bytessentdelta', 'ub_errorsreceiveddelta', 'ub_errorssentdelta', 'ub_packetsreceiveddelta', 'ub_packetssentdelta'],
                "categoricals":['bssid', 'channel', 'channelsinuse'],
                "grouping_features": ['lb_networkdeviceid', 'lb_eventdate']
            },
    }
    }
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Util functions
def spark_init():
    """Init Spark session.
    Returns:
        object: spark session.
    """
    spark = SparkSession.builder \
        .master('yarn') \
        .appName('predictive_care') \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark

# Load data for TDM and Raw_table
def data_import(database, table, spark):
    """Function to load data from Hive tables based on start time and end time.
    
    Args:
        database (string): Database name in HIVE.
        table (string): Table name in HIVE.
        spark (obj): Spark session object.
        
    Returns:
        dataframe :  spark dataframe with loaded data.
    """
    
    df = spark.sql("select * from {0}.{1}" \
              .format(database, table))
    return df

def filter_null_values_on_join_features(df, session_id, bssid):
    """Filter Null values on columns which are used to join different tables.
    Args:
        df (dataframe): Input spark df.
        id (string): Session id across different tables.
        bssid (string): BSSID feature from df.
    Returns:
        dataframe: spark dataframe.
    """
    df = df.filter((df[session_id] != '') & (df[bssid] != ''))
    return df

def filter_radio(df, operatingfrequencyband, band):
    """ Function to filter df on radio lower band & upper band.
    Args:
        df (dataframe): Input spark dataframe.
        band (string): Band specifiying 2.4Ghz or 5Ghz.
    Returns:
        dataframe :  spark dataframe.
    """
    df = df.filter(df[operatingfrequencyband] == band)
    return df

def add_prefix_to_cols(df, table):
    """ Function to add table name as prefix to all cols.
    Args:
        df (dataframe): Input spark df.
        table (string): Denotes required prefix, here the table name is added as prefix.
    Returns:
        dataframe :  spark dataframe.
    """
    for cols in df.columns:
        df = df.withColumnRenamed(cols, table + cols)
    return df

def generate_date_series(start, stop):
    return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)] 

def parsing_date_format(datetime_str, target_date_format):
    """Fuction to modify the date format.
    
    Args:
        datetime_str (str): Date in original format. expected "%Y-%m-%d".
        target_date_format (str): Desire date format e.g "%Y%m%d".
    
    Returns
        str: Date in target format.
    """
    date_obj = datetime.strptime(datetime_str, "%Y-%m-%d").date()
    return date_obj.strftime(target_date_format)

def flatten_df(df):
    """This funtion flat a nested dataframe.
    
    Args:
        df (dataframe): Input nested dataframe.
        
    Returns:
        flat_df (dataframe): Output flatted datframe.
    """
    flat_cols = [col[0] for col in df.dtypes if col[1][:6] != 'struct']
    nested_cols = [col[0] for col in df.dtypes if col[1][:6] == 'struct']
    flat_df = df.select(flat_cols +\
                   [F.col(root_col+'.'+nested_col).alias(root_col+'_'+nested_col)\
                    for root_col in nested_cols\
                    for nested_col in df.select(root_col+'.*').columns])
    return flat_df

def rename_cols(df, prefix):
    """Column renameb by adding a prefix.
    
    Args:
        df (dataframe): Input dataframe.
        
    Returns:
        df (dataframe): Output datframe witha ppended prefix to columns.
    """
    for feature in df.columns:
        df = df.withColumnRenamed(feature,prefix+feature)
    return df

def change_data_type(df, cols_to_convert, datatype):
    """ Change data type for columns from one type to another.
    Args:
        df (dataframe): Input spark dataframe. 
        cols_to_convert (list): list of columns.
        datatype(object): Intended data type.
    Returns:
        df (dataframe): Spark df with proper datatype.
    """
    for col_name in cols_to_convert:
        df = df.withColumn(col_name, F.col(col_name).cast(datatype))
    return df

def get_window_end(event_dt, config):
    """Function to extrat the prediction gap to the date in which the ticket was raise.
    
    Arg:
        event_dt (str): Date in which the ticket was raise
        prediction_gap (int): Predction gap in days.
    
    Returns:
        str: date of the window end.
    """
    prediction_gap = config["data_gathering"]["gap"] - 1
    event_dt = datetime.strptime(str(event_dt), "%Y-%m-%d").date()
    end_time_dt = (event_dt - timedelta(days=prediction_gap+1))
    end_time = end_time_dt.strftime("%Y-%m-%d")
    return end_time

def get_window_start(event_dt, window_plus_gap):
    """Function to extrat the prediction gap plus the window of historical data
    to the date in which the ticket was raised.
    
    Arg:
        event_dt (str): Date in which the ticket was raise
        window_plus_gap (int): Prediction gap plus window of historical data in days.
    
    Returns:
        str: date of the window start.
    """
    event_dt = datetime.strptime(str(event_dt), "%Y-%m-%d").date()
    start_time_dt = (event_dt - timedelta(days=window_plus_gap))
    start_time = start_time_dt.strftime("%Y-%m-%d")
    return start_time

def udf_window_end(config):
    """User Defined Funtion to calculate the window end with respect to the
    date in which the ticket was raised"""
    prediction_gap = config["data_gathering"]["gap"] - 1
    return F.udf(lambda l: get_window_end(l,config))

def udf_window_start(window_plus_gap):
    """User Defined Funtion to calculate the window start with respect to the
    date in which the ticket was raised"""
    return F.udf(lambda l: get_window_start(l,window_plus_gap))

# Lets aggregate the data in ACS table per day 
def agg_acs_per_day(df_acs, config):
    mm_list_acs = config['data_gathering']['features']['acs']['measurements']
    counter_list_acs = config['data_gathering']['features']['acs']['counters']
    df_acs_mm = df_acs.groupBy(config['data_gathering']['features']['acs']['grouping_features']).agg(*[avg(c).alias(c) for c in mm_list_acs])
    df_acs_counter= df_acs.groupBy(config['data_gathering']['features']['acs']['grouping_features']).agg(*[sum(c).alias(c) for c in counter_list_acs])
    df_acs = df_acs_mm.join(df_acs_counter, config['data_gathering']['features']['acs']['grouping_features'], how="inner")
    return df_acs

def agg_dslam_per_day(df_dslam, config):
    mm_list_dslam = config['data_gathering']['features']['dslam']['measurements']
    counter_list_dslam = config['data_gathering']['features']['dslam']['counters']
    df_dslam_mm = df_dslam.groupBy(config['data_gathering']['features']['dslam']['grouping_features']).agg(*[avg(c).alias(c) for c in mm_list_dslam])
    df_dslam_counter= df_dslam.groupBy(config['data_gathering']['features']['dslam']['grouping_features']).agg(*[sum(c).alias(c) for c in counter_list_dslam])
    df_dslam = df_dslam_mm.join(df_dslam_counter, config['data_gathering']['features']['dslam']['grouping_features'], how="inner")
    return df_dslam

def agg_snmp_per_day(df_snmp, config):
    mm_list_snmp = config['data_gathering']['features']['snmp']['measurements']
    counter_list_snmp = config['data_gathering']['features']['snmp']['counters']
    df_snmp= df_snmp.groupBy(config['data_gathering']['features']['snmp']['grouping_features']).agg(*[sum(c).alias(c) for c in counter_list_snmp])
    return df_snmp


# Lets aggregate the data in TDM table per day 
def agg_tdm_per_day(df_tdm, config):
    mm_list_tdm = config['data_gathering']['features']['tdm']['measurements']
    counter_list_tdm = config['data_gathering']['features']['tdm']['counters']
    df_stats_client_radio_mm = df_tdm.groupBy(config['data_gathering']['features']['tdm']['grouping_features']).agg(*[avg(c).alias(c) for c in mm_list_tdm])
    df_stats_client_radio_counter = df_tdm.groupBy(config['data_gathering']['features']['tdm']['grouping_features']).agg(*[sum(c).alias(c) for c in counter_list_tdm])
    df_stats_client_radio = df_stats_client_radio_mm.join(df_stats_client_radio_counter, config['data_gathering']['features']['tdm']['grouping_features'], how="inner")
    return df_stats_client_radio


# Create Start date and end date which equates to 32 days for each assetid
def create_windowing_per_assetid(df, config):
    # df is input  from prefiltering
    df_key = df.select('assetid', "incident_date").distinct()
    df_key = df_key.withColumn("end_date", udf_window_end(config)(F.col("incident_date")))\
                                .withColumn("start_date", udf_window_start(config["data_gathering"]["gap"]+config["data_gathering"]["seq_len"])(F.col("incident_date")))

    df_key = df_key.withColumn('start_date', F.col('start_date').cast(types.DateType()))
    df_key = df_key.withColumn('end_date', F.col('end_date').cast(types.DateType()))

    spark.udf.register("generate_date_series", generate_date_series, types.ArrayType(types.DateType()) )
    df_key.createOrReplaceTempView("keydf")
    df_key = spark.sql("SELECT assetid, explode(generate_date_series(start_date, end_date)) as date FROM keydf")
    return df_key

# Load and Preprocess TDM data
def loading_tdm(config, spark):
        df_radio_table_raw = data_import("cdl_tdata_model", "wifiaccesspointrole", spark)
        df_client_table_raw = data_import("cdl_tdata_model", "wificlientperformance", spark)
        df_stats_table_raw = data_import("cdl_tdata_model", "wifiaccesspointperformance", spark)
        #Replace N/A with None
        df_radio_table_raw = df_radio_table_raw.withColumn("bssid", F.when(col("bssid").isin('n/a', 'N/A'), None).otherwise(col("bssid"))) \
                                                .withColumn("id", F.when(col("id").isin('n/a', 'N/A'), None).otherwise(col("id"))) 
        df_client_table_raw = df_client_table_raw.withColumn("bssid", F.when(col("bssid").isin('n/a', 'N/A'), None).otherwise(col("bssid"))) \
                                                .withColumn("id", F.when(col("id").isin('n/a', 'N/A'), None).otherwise(col("id"))) 
        df_stats_table_raw = df_stats_table_raw.withColumn("bssid", F.when(col("bssid").isin('n/a', 'N/A'), None).otherwise(col("bssid"))) \
                                                .withColumn("id", F.when(col("id").isin('n/a', 'N/A'), None).otherwise(col("id"))) 
        # Filtering null values on Join features
        df_radio_clean = filter_null_values_on_join_features(df_radio_table_raw, 'id', 'bssid')
        df_client_clean = filter_null_values_on_join_features(df_client_table_raw, 'id', 'bssid')
        df_stats_clean = filter_null_values_on_join_features(df_stats_table_raw, 'id', 'bssid')
        #dropping missing keys
        df_radio_clean =df_radio_clean.na.drop(subset=['networkdeviceid','eventdate', 'id', 'bssid'])
        df_client_clean =df_client_clean.na.drop(subset=['networkdeviceid','eventdate', 'id', 'bssid' ])
        df_stats_clean = df_stats_clean.na.drop(subset=['networkdeviceid','eventdate', 'id', 'bssid'])
        #Drop multiple samples per day
        df_radio_clean = df_radio_clean.dropDuplicates()
        df_client_clean = df_client_clean.dropDuplicates()
        df_stats_clean = df_stats_clean.dropDuplicates()
    
        quantiles_for_rssi =  "(0.05, 0.25, 0.50, 0.75, 0.95)"
        field = 'signalstrength'
        df_client_clean = df_client_clean.groupby('id', 'bssid').agg(F.expr('percentile('+field+', array'+ str(quantiles_for_rssi) +')'). \
                                alias('rssi_per_cpe'), F.min(field).alias('min_rssi_per_cpe'), F.max(field).alias('max_rssi_per_cpe'), F.avg(field).alias('avg_rssi_per_cpe'))
        df_client_clean = df_client_clean.withColumn('perc5_rssi_per_cpe', df_client_clean.rssi_per_cpe[0]) \
            .withColumn('perc25_rssi_per_cpe', df_client_clean.rssi_per_cpe[1]) \
            .withColumn('perc50_rssi_per_cpe', df_client_clean.rssi_per_cpe[2]) \
            .withColumn('perc75_rssi_per_cpe', df_client_clean.rssi_per_cpe[3]) \
            .withColumn('perc95_rssi_per_cpe', df_client_clean.rssi_per_cpe[4])
    
        df_client_clean =  df_client_clean.select(config['data_gathering']['features']['tdm']['preprocessing_col1'])
        
        df_radio1 = filter_radio(df_radio_clean, 'operatingfrequencyband', '2.4GHz')
        df_radio2 = filter_radio(df_radio_clean, 'operatingfrequencyband', '5GHz')
        
        df_client_radio1 = df_radio1.join(df_client_clean, ['id', 'bssid'], how='left')
        df_client_radio2 = df_radio2.join(df_client_clean, ['id', 'bssid'], how='left')
        
        client_radio_cols = config['data_gathering']['features']['tdm']['preprocessing_col2']
        df_client_radio1 = df_client_radio1.select(client_radio_cols)
        df_client_radio2 = df_client_radio2.select(client_radio_cols)
        
        df_stats_client_radio1 = df_client_radio1.join(df_stats_clean, ['id', 'bssid'], how = 'left')
        df_stats_client_radio2 = df_client_radio2.join(df_stats_clean, ['id', 'bssid'], how = 'left')
        
        final_cols = config['data_gathering']['features']['tdm']['preprocessing_col3']
        
        df_stats_client_radio1 = df_stats_client_radio1.select(final_cols)
        df_stats_client_radio2 = df_stats_client_radio2.select(final_cols)
        df_stats_client_radio1 = add_prefix_to_cols(df_stats_client_radio1, 'lb_')
        df_stats_client_radio2 = add_prefix_to_cols(df_stats_client_radio2, 'ub_')
    
        df_stats_client_radio = df_stats_client_radio1.join(df_stats_client_radio2, (df_stats_client_radio1.lb_id == df_stats_client_radio2.ub_id) \
                    , how='left')
        
        return df_stats_client_radio

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def gather_data (df, config, spark):
    """This Funtion gather the relevant information from HDFS in the relevant time frame.
    
    Args:
        df (dataframe): Output from pre filtering.
        config (dict): Configuration file.
        spark (obj): Spark session object.
        
    Returns:
        dataframe :  spark dataframe with loaded data.
    
    """
    # loading data
    df_dslam = data_import('cdl_blos', 'pol_day_aggregation', spark) #per day
    df_snmp = data_import('cdl_blos', 'snmp_traps_agg', spark) #per day
    df_acs = data_import('cdl_acscoll', 'cdl_acscoll_xdsl_line_prq', spark)# cat, mm, counter/delta
    df_tdm = loading_tdm(config, spark) # Preprocess TDM data with aggregations
    
    #flattening acs
    df_acs = flatten_df(df_acs)
    #Renaming for conditional join
    df_dslam = rename_cols(df_dslam, 'dslam_')
    df_snmp = rename_cols(df_snmp, 'snmp_')
    df_acs = rename_cols(df_acs, 'acs_')
    #change data type 
    df_dslam = change_data_type(df_dslam, cols_to_convert=config["data_gathering"]["features"]["dslam"]["measurements"] + config["data_gathering"]["features"]["dslam"]["counters"], datatype=types.LongType())
    df_snmp = change_data_type(df_snmp, cols_to_convert=config["data_gathering"]["features"]["snmp"]["measurements"] + config["data_gathering"]["features"]["snmp"]["counters"], datatype=types.LongType())
    df_acs = change_data_type(df_acs, cols_to_convert=config["data_gathering"]["features"]["acs"]["measurements"] + config["data_gathering"]["features"]["acs"]["counters"], datatype=types.LongType())
    #aggregating all tables fields
    df_acs = agg_acs_per_day(df_acs, config)
    df_dslam = agg_dslam_per_day(df_dslam, config)
    df_snmp = agg_snmp_per_day(df_snmp, config)     
    df_tdm = agg_tdm_per_day(df_tdm, config) # Lets aggregate the data in TDM table per day 
    
    
    #dropping missing keys
    df_dslam =df_dslam.na.drop(subset=['dslam_assetid','dslam_datum'])
    df_snmp =df_snmp.na.drop(subset=['snmp_assetid','snmp_datum'])
    df_acs = df_acs.na.drop(subset=['acs_crm_assetId','acs_datum'])
    
    # Drop multiple samples per day
    df_dslam = df_dslam.dropDuplicates(['dslam_assetid','dslam_datum'])
    df_snmp = df_snmp.dropDuplicates(['snmp_assetid','snmp_datum'])
    df_acs = df_acs.dropDuplicates(['acs_crm_assetId','acs_datum'])
    
    # create key
    df_key = create_windowing_per_assetid(df, config)
    
    #Join
    df = df_key.join(df_dslam, [(df_key.assetid == df_dslam.dslam_assetid) & (df_key.date == df_dslam.dslam_datum)], how='left')
    df = df.join(df_snmp, [(df.assetid == df_snmp.snmp_assetid) & (df.date == df_snmp.snmp_datum)], how='left')
    df = df.join(df_acs, [(df.assetid == df_acs.acs_crm_assetId) & (df.date == df_acs.acs_datum)], how='left')
    df = df.join(df_tdm, [(df.assetid == df_tdm.lb_networkdeviceid) & (df.date == df_tdm.lb_eventdate)], how='left')
    
    # We drop duplicate column and perform the renaming of columns
    df = df.drop('dslam_assetid', 'acs_crm_assetId','acs_datum','snmp_assetid', 'snmp_datum', 'lb_networkdeviceid', 'ub_networkdeviceid')
    df = df.withColumnRenamed('date','datum')
    df = df.withColumnRenamed('acs_crm_serialnumber', 'cpe_serial_number')
    
    #Sorting by date
    df = df.orderBy(['assetid', 'datum'], asc = True)
    #df = df.withColumn("time_index", counter from 1 to 32 per assetid)
    
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
#def main(args):
#    """Main function"""
    # start Spark application and get Spark session, logger and config
spark = spark_init()
config = config

df = spark.read.parquet(config["path"]["prefiltering_output"])#.limit(5)
# log that main ETL job is starting
print('Starting Data gathering pipeline..')
df = gather_data (df, config, spark)
# log the success and terminate Spark application
print('Data gathering is finished and writing to HDFS..')
df.write.parquet(config["path"]["datagathering_output"], mode='overwrite')
print('Writing to HDFS is finished and spark session is closing..')
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Starting Data gathering pipeline..
Data gathering is finished and writing to HDFS..
Writing to HDFS is finished and spark session is closing..

#### Rough Work

In [21]:
data = spark.read.parquet(config["path"]["datagathering_output"])
data = data.select([i for i in data.columns if not 'lb_' in i and not 'ub_' in i])
tickets = spark.read.parquet(config["path"]["prefiltering_output"])
data_null_count = data.groupby("assetid")\
                      .agg(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in ['dslam_avg_attenuation_ds', 'snmp_no_lossoflink', 'acs_xdslline_downAttenuationDb']))\
                      .sort(['dslam_avg_attenuation_ds', 'acs_xdslline_downAttenuationDb', 'snmp_no_lossoflink'], ascending = False)
data_null = data_null_count.join(tickets, on = ['assetid'], how='inner')[[data_null_count.columns + ['incident_date']]]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
def get_max_date(cols):
    assetid, incident_date = cols[0], cols[1]
    return str(spark.sql("select max(datum) from cdl_blos.pol_day_aggregation where assetid == {} and incident_date <= {}".format(assetid, incident_date)).rdd.map(lambda x: x[0]).collect()[0])

max_date_udf = F.udf(get_max_date, types.StringType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
data_null.limit(1).withColumn("date", get_max_date(F.struct(data_null['assetid'], data_null['incident_date'])))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
u"\nmismatched input 'incident_date' expecting <EOF>(line 1, pos 145)\n\n== SQL ==\nselect max(datum) from cdl_blos.pol_day_aggregation where assetid == Column<named_struct(assetid, assetid, incident_date, incident_date)[0]> and incident_date <= Column<named_struct(assetid, assetid, incident_date, incident_date)[1]>\n-------------------------------------------------------------------------------------------------------------------------------------------------^^^\n"
Traceback (most recent call last):
  File "<stdin>", line 3, in get_max_date
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 744, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.tar

In [10]:
# df_tdm = loading_tdm(config, spark) # Preprocess TDM data with aggregations
# df_tdm = agg_tdm_per_day(df_tdm, config)
# df_tdm.write.parquet("hdfs://nameservicedev1//user/tsystems_kkeshore/sample_data_tdm.parquet", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
df_tdm = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_kkeshore/sample_data_tdm.parquet")
df = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_vkumar/dsl_tickets/dsl_tkts_all_lvls_uniq_n.parquet")
df_key = create_windowing_per_assetid(df, config)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
df_tdm.join(df_key, on = [df_tdm['']])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------+----------------+-------------------+-------------------+-------------------+---------------------+----------------------+----------------------+----------------------+----------------------+--------------------+--------------------+-----------------+-------------+--------------------+--------------------+------------------------+----------------+-------------------+-------------------+-------------------+---------------------+----------------------+----------------------+----------------------+----------------------+--------------------+--------------------+-----------------+-------------+------------------+--------------------+------------------------+---------------------+-----------------+----------------------+------------------+-----------------------+-------------------+---------------------+-----------------+----------------------+------------------+-----------------------+-------------------+
|lb_networkdeviceid|lb_eventdate|lb_transmitpower|lb

In [7]:
df_key.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+
| assetid|      date|
+--------+----------+
|81641693|2020-01-06|
|81641693|2020-01-07|
|81641693|2020-01-08|
|81641693|2020-01-09|
|81641693|2020-01-10|
|81641693|2020-01-11|
|81641693|2020-01-12|
|81641693|2020-01-13|
|81641693|2020-01-14|
|81641693|2020-01-15|
|81641693|2020-01-16|
|81641693|2020-01-17|
|81641693|2020-01-18|
|81641693|2020-01-19|
|81641693|2020-01-20|
|81641693|2020-01-21|
|81641693|2020-01-22|
|81641693|2020-01-23|
|81641693|2020-01-24|
|81641693|2020-01-25|
+--------+----------+
only showing top 20 rows

In [7]:
data = spark.read.parquet(config["path"]["datagathering_output"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+-----------+--------------------+---------------+------------------+---------------------------------+--------------------+--------------------+------------------------+------------------------+------------------+------------------+------------------------+------------------------+----------------+----------------+---------------------+---------------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------+--------------------+------------------+--------------+-----------------+--------------------+----------------------+----------------------+-------------------------------+-------------------------+--------------------------+-------------------------+-----------------------+-------------------+----------------

In [6]:
df = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_vkumar/dsl_tickets/dsl_tkts_all_lvls_uniq_n.parquet")
#df = df[df['incident_date'] > '2021-03-01'].limit(5)
df_dslam = data_import('cdl_blos', 'pol_day_aggregation', spark) #per day
df_snmp = data_import('cdl_blos', 'snmp_traps_agg', spark) #per day
df_acs = flatten_df(data_import('cdl_acscoll', 'cdl_acscoll_xdsl_line_prq', spark))# cat, mm, counter/delta
#df_tdm = loading_tdm(config, spark) # Preprocess TDM data with aggregations
df_dslam = rename_cols(df_dslam, 'dslam_')
df_snmp = rename_cols(df_snmp, 'snmp_')
df_acs = rename_cols(df_acs, 'acs_')
df_dslam = change_data_type(df_dslam, cols_to_convert=config["data_gathering"]["features"]["dslam"]["measurements"] + config["data_gathering"]["features"]["dslam"]["counters"], datatype=types.LongType())
df_snmp = change_data_type(df_snmp, cols_to_convert=config["data_gathering"]["features"]["snmp"]["measurements"] + config["data_gathering"]["features"]["snmp"]["counters"], datatype=types.LongType())
df_acs = change_data_type(df_acs, cols_to_convert=config["data_gathering"]["features"]["acs"]["measurements"] + config["data_gathering"]["features"]["acs"]["counters"], datatype=types.LongType())
df_acs = agg_acs_per_day(df_acs, config)
df_dslam = agg_dslam_per_day(df_dslam, config)
df_snmp = agg_snmp_per_day(df_snmp, config)     
#df_tdm = agg_tdm_per_day(df_tdm, config)
df_dslam =df_dslam.na.drop(subset=['dslam_assetid','dslam_datum'])
df_snmp =df_snmp.na.drop(subset=['snmp_assetid','snmp_datum'])
df_acs = df_acs.na.drop(subset=['acs_crm_assetId','acs_datum'])
df_dslam = df_dslam.dropDuplicates(['dslam_assetid','dslam_datum'])
df_snmp = df_snmp.dropDuplicates(['snmp_assetid','snmp_datum'])
df_acs = df_acs.dropDuplicates(['acs_crm_assetId','acs_datum'])
df_key = create_windowing_per_assetid(df, config)
df = df_key.join(df_dslam, [(df_key.assetid == df_dslam.dslam_assetid) & (df_key.date == df_dslam.dslam_datum)], how='left')
df = df.join(df_snmp, [(df.assetid == df_snmp.snmp_assetid) & (df.date == df_snmp.snmp_datum)], how='left')
df = df.join(df_acs, [(df.assetid == df_acs.acs_crm_assetId) & (df.date == df_acs.acs_datum)], how='left')
#df = df.join(df_tdm, ((df.assetid == df_tdm.lb_networkdeviceid) & (df.date == df_tdm.lb_eventdate)), how='left')
df.write.parquet("hdfs://nameservicedev1//user/tsystems_kkeshore/sample_data.parquet", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
data = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_kkeshore/sample_data.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
data.coalesce(1).write.parquet("hdfs://nameservicedev1//user/tsystems_kkeshore/sample_data_coalesced.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
data[data['assetid'] == 31102439].show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+-------------+-----------+------------+---------------+-----------+---------------------------------+--------------------+--------------------+------------------------+------------------------+------------------+------------------+------------------------+------------------------+----------------+----------------+---------------------+---------------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------+------------+----------+-----------+------------------+--------------+-----------------+---------------+---------+--------------------+----------------------+----------------------+-------------------------------+-------------------------+--------------------------+-------------------------+--------------------

In [21]:
data[data['dslam_assetid'].isNull()].groupby('assetid').count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-----+
|  assetid|count|
+---------+-----+
|104806498|   15|
|107119604|   22|
|108014473|   33|
|109440723|   15|
|111299703|   26|
| 18173458|   17|
| 27370063|   33|
| 31102439|   33|
| 33447491|   21|
| 35792484|   13|
| 37050266|   18|
| 37367702|    9|
| 43346596|   27|
| 46345615|   33|
| 46625676|    1|
| 47087068|   17|
| 48647794|   11|
| 48705979|   23|
| 48979630|   15|
| 54987910|   21|
+---------+-----+
only showing top 20 rows

In [26]:
df_tickets = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_vkumar/dsl_tickets/dsl_tkts_all_lvls_uniq_n.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
df_tickets[df_tickets['assetid'] == 31102439].show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------------+-------------------+-----+
| assetid|incident_date|              label|level|
+--------+-------------+-------------------+-----+
|31102439|   2020-05-04|DSL synchronization|    3|
+--------+-------------+-------------------+-----+

In [36]:
def data_import(database, table, spark):
    df = spark.sql("select * from {0}.{1}".format(database, table))
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
df = spark.read.parquet("hdfs://nameservicedev1//user/tsystems_vkumar/dsl_tickets/dsl_tkts_all_lvls_uniq_n.parquet")
print("Total distinct AssetIds: {}".format(df[['assetid']].distinct().count()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total distinct AssetIds: 107315

In [38]:
df_dslam = data_import('cdl_blos', 'pol_day_aggregation', spark)
df.join(df_dslam, on = ['assetid'], how = 'inner')[['assetid']].distinct().count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

107315

In [39]:
df_snmp = data_import('cdl_blos', 'snmp_traps_agg', spark) #per day
df.join(df_snmp, on = ['assetid'], how = 'inner')[['assetid']].distinct().count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

107285

In [40]:
df_acs = flatten_df(data_import('cdl_acscoll', 'cdl_acscoll_xdsl_line_prq', spark))
df.join(df_acs, on = (df_acs['crm_assetId'] == df['assetid']), how = 'inner')[['assetid']].distinct().count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

104887

In [17]:
data[['assetid']].distinct().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
| assetid|
+--------+
|33442319|
|48843620|
|48954125|
|33445142|
| 2883935|
+--------+

In [6]:
def gather_data (df, config, spark):
    """This Funtion gather the relevant information from HDFS in the relevant time frame.
    
    Args:
        df (dataframe): Output from pre filtering.
        config (dict): Configuration file.
        spark (obj): Spark session object.
        
    Returns:
        dataframe :  spark dataframe with loaded data.
    
    """
    # loading data
    df_dslam = data_import('cdl_blos', 'pol_day_aggregation', spark) #per day
    df_snmp = data_import('cdl_blos', 'snmp_traps_agg', spark) #per day
    df_acs = data_import('cdl_acscoll', 'cdl_acscoll_xdsl_line_prq', spark)# cat, mm, counter/delta
    df_tdm = loading_tdm(config, spark) # Preprocess TDM data with aggregations
    
    #flattening acs
    df_acs = flatten_df(df_acs)
    #Renaming for conditional join
    df_dslam = rename_cols(df_dslam, 'dslam_')
    df_snmp = rename_cols(df_snmp, 'snmp_')
    df_acs = rename_cols(df_acs, 'acs_')
    #change data type 
    df_dslam = change_data_type(df_dslam, cols_to_convert=config["data_gathering"]["features"]["dslam"]["measurements"] + config["data_gathering"]["features"]["dslam"]["counters"], datatype=types.LongType())
    df_snmp = change_data_type(df_snmp, cols_to_convert=config["data_gathering"]["features"]["snmp"]["measurements"] + config["data_gathering"]["features"]["snmp"]["counters"], datatype=types.LongType())
    df_acs = change_data_type(df_acs, cols_to_convert=config["data_gathering"]["features"]["acs"]["measurements"] + config["data_gathering"]["features"]["acs"]["counters"], datatype=types.LongType())
    #aggregating all tables fields
    df_acs = agg_acs_per_day(df_acs, config)
    df_dslam = agg_dslam_per_day(df_dslam, config)
    df_snmp = agg_snmp_per_day(df_snmp, config)     
    df_tdm = agg_tdm_per_day(df_tdm, config) # Lets aggregate the data in TDM table per day 
    
    
    #dropping missing keys
    df_dslam =df_dslam.na.drop(subset=['dslam_assetid','dslam_datum'])
    df_snmp =df_snmp.na.drop(subset=['snmp_assetid','snmp_datum'])
    df_acs = df_acs.na.drop(subset=['acs_crm_assetId','acs_datum'])
    
    # Drop multiple samples per day
    df_dslam = df_dslam.dropDuplicates(['dslam_assetid','dslam_datum'])
    df_snmp = df_snmp.dropDuplicates(['snmp_assetid','snmp_datum'])
    df_acs = df_acs.dropDuplicates(['acs_crm_assetId','acs_datum'])
    
    # create key
    df_key = create_windowing_per_assetid(df, config)
    
    #Join
    df = df_key.join(df_dslam, [(df_key.assetid == df_dslam.dslam_assetid) & (df_key.date == df_dslam.dslam_datum)], how='left')
    df = df.join(df_snmp, [(df.assetid == df_snmp.snmp_assetid) & (df.date == df_snmp.snmp_datum)], how='left')
    df = df.join(df_acs, [(df.assetid == df_acs.acs_crm_assetId) & (df.date == df_acs.acs_datum)], how='left')
    df = df.join(df_tdm, ((df.assetid == df_tdm.lb_networkdeviceid) & (df.assetid == df_tdm.ub_networkdeviceid) & (df.date == df_tdm.lb_eventdate) & (df.date == df_tdm.ub_eventdate)), how='left')
    
    # We drop duplicate column and perform the renaming of columns
    df = df.drop('dslam_assetid', 'acs_crm_assetId','acs_datum','snmp_assetid', 'snmp_datum', 'lb_networkdeviceid', 'ub_networkdeviceid')
    df = df.withColumnRenamed('date','datum')
    df = df.withColumnRenamed('acs_crm_serialnumber', 'cpe_serial_number')
    
    #Sorting by date
    df = df.orderBy(['assetid', 'datum'], asc = True)
    #df = df.withColumn("time_index", counter from 1 to 32 per assetid)
    
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
#def main(args):
#    """Main function"""
    # start Spark application and get Spark session, logger and config
spark = spark_init()
config = config

df = spark.read.parquet(config["path"]["prefiltering_output"])#.limit(5)
# log that main ETL job is starting
print('Starting Data gathering pipeline..')
df = gather_data (df, config, spark)
# log the success and terminate Spark application
print('Data gathering is finished and writing to HDFS..')
df.write.parquet(config["path"]["datagathering_output"], mode='overwrite')
print('Writing to HDFS is finished and spark session is closing..')
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Starting Data gathering pipeline..
Data gathering is finished and writing to HDFS..
Writing to HDFS is finished and spark session is closing..

In [6]:
spark.read.parquet(config["path"]["datagathering_output"]).coalesce(1).write.parquet(config["path"]["datagathering_output"]+"_coalesced")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# if __name__ == '__main__':
#     main(sys.argv)

The code failed because of a fatal error:
	Session 1426 unexpectedly reached final status 'killed'. See logs:
stdout: 

stderr: 
21/10/20 15:41:02 INFO yarn.Client: Requesting a new application from cluster with 8 NodeManagers
21/10/20 15:41:02 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (16384 MB per container)
21/10/20 15:41:02 INFO yarn.Client: Will allocate AM container, with 11264 MB memory including 1024 MB overhead
21/10/20 15:41:02 INFO yarn.Client: Setting up container launch context for our AM
21/10/20 15:41:02 INFO yarn.Client: Setting up the launch environment for our AM container
21/10/20 15:41:02 INFO yarn.Client: Preparing resources for our AM container
21/10/20 15:41:02 INFO yarn.Client: Uploading resource file:/opt/cloudera/parcels/UT_HALO_TS-1.4.4.1/bin/apache-livy-0.7.0-incubating-bin/rsc-jars/livy-rsc-0.7.0-incubating.jar -> hdfs://nameservicedev1/user/tsystems_kkeshore/.sparkStaging/applicatio