In [None]:
# %%time
# Imports and Spark session setup shared by every cell of this notebook.
import os
import re
import time  # used by the wall-clock timers (overall_start_time, ...); was missing -> NameError
import webbrowser

import numpy as np
import pandas as pd

import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (array, col, concat, explode, expr, lit,
                                   lower, split, struct, udf)
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.types import *

# Spark NLP star imports stay after the pyspark imports, as before, so any
# name clash still resolves the same way.
import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

# os.cpu_count() may return None; fall back to 1 rather than building "local[None]".
n_CPUs = os.cpu_count() or 1

overall_start_time = time.time()

# NOTE(review): memory sizes below are tuned for one specific machine; adjust
# before running elsewhere.
spark = (SparkSession
         .builder.master(f"local[{n_CPUs}]")
         .config("spark.driver.cores", str(n_CPUs))
         .config("spark.driver.memory", "58g")
         .config("spark.executor.memory","6g")
         .config("spark.sql.execution.arrow.enabled", "true")
         .config("spark.memory.offHeap.enabled","true")
         .config("spark.memory.offHeap.size","10g")
         .config("spark.python.worker.memory","4g")
         .config("spark.driver.maxResultSize","8g")
         # Set once: an earlier duplicate spark.jars.packages call was silently
         # overridden by this one.
         .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.1.0,com.johnsnowlabs.nlp:spark-nlp-ocr_2.11:2.1.0")
         # HTTPS so the jars cannot be tampered with in transit.
         .config("spark.jars.repositories","https://repo.spring.io/plugins-release/")
         .getOrCreate())


# Open the Spark web UI; uiWebUrl is None when the UI is disabled.
spark_ui_url = spark.sparkContext.uiWebUrl
if spark_ui_url:
    webbrowser.open(spark_ui_url)

## Load Data

In [None]:
from pyspark.sql.functions import (concat_ws, lower, regexp_replace,
                                   when)

# Match the shuffle parallelism to the local core count chosen in the setup cell.
spark.conf.set("spark.sql.shuffle.partitions", f"{n_CPUs}")

# Raw failure records; call All_Failure_Data.printSchema() to inspect the columns.
All_Failure_Data = spark.read.parquet('Source_Data/All_Failure_Data')

## Text Preprocessing pipeline

In [None]:
 %%time
# Fit (or reload) the Spark NLP text-preprocessing pipeline, then apply it to
# the distinct texts.
#
# NOTE(review): `refit_preprocessor` and `distinct_texts_for_preprocessing` are
# not defined in any cell visible here; they must come from an earlier cell.

# Imported outside the `if` so the reload branch does not depend on the refit
# branch (or a transitive star import) having bound these names.
from pyspark.ml import Pipeline, PipelineModel

# One location for both save and load so the two branches cannot drift apart.
preprocess_pipeline_path = "file:///" + os.getcwd() + "/model_data/inputs/preprocess_pipeline/fitted"

# Anything other than 'n' / 'no' (case- and whitespace-insensitive) re-fits.
if str(refit_preprocessor).strip().lower() not in ('n', 'no'):
    print('re-fitting nlp preprocessing pipeline.....')

    from pyspark.ml.feature import RegexTokenizer, NGram, SQLTransformer
    from pyspark.ml.feature import HashingTF, Tokenizer
    from sparknlp.annotator import *
    from sparknlp.common import *
    from sparknlp.base import *

    # Explicit aliases: pyspark.ml.feature also defines `Tokenizer` and
    # `Normalizer`, so the bare names depend on which import ran last.
    from sparknlp.annotator import Tokenizer as nlp_Tokenizer
    from sparknlp.annotator import Normalizer as nlp_Normalizer

    # One exception token per line; blank lines are ignored.
    with open("user_input_files/token_exceptions.txt", "r", encoding="utf8") as f:
        token_exceptions = [t for t in f.read().split('\n') if t]

    documentAssembler = (DocumentAssembler()
                         .setInputCol("orig_value")
                         .setOutputCol("document"))

    ## Tokenizer: Word tokens ##
    # Identifies tokens with tokenization open standards. A few rules will help customizing it if defaults do not fit user needs.
    #https://nlp.johnsnowlabs.com/components.html#Tokenizer
    tokenizer = (nlp_Tokenizer()
                 .setInputCols(["document"])
                 .setOutputCol("token")
                 .setExceptions(token_exceptions)
                 .setCaseSensitiveExceptions(False)
                 .setTargetPattern("\\w+")
                 #each group in each of the infix patterns below will become its own token. those groups starting with (?:...) are non-capturing and are dropped.
                 .setInfixPatterns(["([\\w]*)([0-9_]*)(?<![\\d_])([0-9]{3}_[0-9]{1})(?:[0]{2}[0-9]{1})(?!\\d{0,4}_)(?:[0-9_]*)([\\w]*)",#split words and 7 digit error codes ending with three zeros. Truncate zeros
                                    "([\\w]*)([0-9_]*)(?<![\\d_])([0-9]{3}_[1-9]{2})(?:[0]{1}[0-9]{1})(?!\\d{0,4}_)(?:[0-9_]*)([\\w]*)",#split words and 7 digit error codes ending with two zeros. Truncate zeros
                                    "([\\w]*)([0-9_]*)(?<![\\d_])([0-9]{3}_[1-9]{1,2})(?:[0]{1,2})(?!\\d{1,4}|_)(?:[0-9_]*)([\\w]*)", #capture and truncate codes like 242_200 -> 242_2, 357_10 -> 357_1 or 133_680 -> 133_68
                                    "([\\w]*)([0-9_]*)(?<![\\d_])([0-9]{3}_[0-9]{3})(?!\\d{0,4}_)(?:[0-9_]*)([\\w]*)",#split words and 7 digit error codes. truncate the error code to 6 digits.
                                    "(\\b[A-Za-z]{1}[0-9]{1,6}\\b)",#error codes like c123, r56, etc.
                                    "([A-Za-z]*)([0-9_]{2,20})([A-Za-z]*)", #split general word and number combinations
                                   ])
                )

    ## Normalizer: Text cleaning ##
    # Removes all dirty characters from text following a regex pattern and transforms words based on a provided dictionary
    #https://nlp.johnsnowlabs.com/components.html#Normalizer
    normalizer = (nlp_Normalizer()
                  .setInputCols(["token"])
                  .setOutputCol("normalized")
                  .setLowercase(True)
                  # setCleanupPatterns() does not work in this Spark NLP version, so the
                  # param is set directly. Characters matching the pattern are removed,
                  # i.e. letters, digits, backslash, '/', '-', '_' and ':' are kept
                  # (defaults to [^A-Za-z]). Raw string: same value, no invalid escapes.
                  ._set(cleanupPatterns=[r'[^A-Za-z0-9\\/\-_:]'])
                  # Custom word mapping (e.g. gr8 -> great) using the given delimiter.
                  .setSlangDictionary('user_input_files/BD_Slang_Dictionary.csv', delimiter = ',')
                 )



#     #########################################
#     ### Build Autocorrect Training Corpus ####
#     ### In order to train a Norvig or Symmetric Spell Checkers, we need to get corpus data as a spark dataframe. We can read a plain text file and transforms it to a spark dataset. ###
#     ### This commented-out code documents how misc_input_files/spell_check_train_data (read below) was built.

#     from sparknlp.ocr import OcrHelper
#     from pyspark.sql.functions import rand
#     ocr = OcrHelper()
#     spell_data_ocr = ocr.createDataset(spark, "pdf_text_sources/").withColumnRenamed("value","text")

#     #use the brown corpus of scientific texts to train the autocorrect
#     from nltk.corpus import brown
#     spell_data_brown = spark.createDataFrame([' '.join(i) for i in brown.sents(categories=['learned','government','news','hobbies'])],StringType()).withColumnRenamed("value","text")

#     #training set of the top 500 uncorrected words from the last model run. Each row has all 500 words. There are n number of rows defined below
#     spell_data_seed_top_BD_words = spark.read.text('misc_input_files/autocorrect_train_seed_top_BD_words.txt').withColumnRenamed("value", "text")

#     # combine the lvp text, brown_corpus, and  data into our training set.
#     spell_data = spell_data_ocr.select('text').union(spell_data_brown).union(spell_data_seed_top_BD_words).orderBy(rand())

#     documentAssembler_spl_chk = DocumentAssembler()\
#       .setInputCol("text")\
#       .setOutputCol("document")


#     spell_check_training_data_pipeline = Pipeline(stages=[documentAssembler_spl_chk,tokenizer,normalizer])
#     spell_check_train_fit = spell_check_training_data_pipeline.fit(spell_data)
#     spell_check_train_data = (spell_check_train_fit
#                               .transform(spell_data)
#                               .select('normalized')
#                              )
# #     spell_check_train_data.write.parquet("misc_input_files/spell_check_train_data", mode = "overwrite")
# #     spell_check_train_data.show(10,truncate=150)

#     ### End Build Autocorrect Training Corpus ####
#     ##############################################


    # SpellChecker: Norvig algorithm ##
    # This annotator retrieves tokens and makes corrections automatically if not found on an english dictionary
    # https://nlp.johnsnowlabs.com/components.html#SpellChecker
    spell_check_train_data = spark.read.parquet("misc_input_files/spell_check_train_data")
    spell_checker = (NorvigSweetingApproach()
                     .setInputCols(["normalized"])
                     .setOutputCol("autocorrected")
                     # Raw string: same value as before, without the invalid "\w" escape.
                     .setDictionary('misc_input_files/BD_Dictionary.txt',token_pattern=r"\w+", read_as=ReadAs.LINE_BY_LINE)
                     # .setFrequencyPriority(True)
                     .fit(spell_check_train_data)
                    )


    # # Lemmatizer: Lemmas ##
    # Retrieves lemmas out of words with the objective of returning a base dictionary word
    # https://nlp.johnsnowlabs.com/components.html#Lemmatizer
    lemmatizer = (Lemmatizer()
                  .setInputCols(["autocorrected"])
                  .setOutputCol("lemmas")
                  .setDictionary("user_input_files/BD_lemmas.txt", key_delimiter="->", value_delimiter="\t"))

    custom_stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours'
                 , 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they'
                 , 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are'
                 , 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if'
                 , 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during'
                 , 'before', 'after', 'to', 'from', 'up', 'down', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then'
                 , 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such'
                 , 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've"
                 , 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn'
                 , "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't"
                 , 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"
                 , "nan","na",'xxx','due', "dont", "shouldve", "arent", "couldnt", "didnt", "doesnt", "hadnt", "hasnt", "havent", "isnt", "shouldnt", "wasnt", "werent","wont", "wouldnt", "1","2","3","4","5","6","7","8","9","10"]

    stop_words_remover = (StopWordsCleaner()
            .setInputCols(["lemmas"])
            .setOutputCol("lemmas_no_sw")
            .setCaseSensitive(False)
            .setStopWords(custom_stop_words)
                         )


    # Stemmer: Hard stems ##
    #Returns hard-stems out of words with the objective of retrieving the meaningful part of the word
    #https://nlp.johnsnowlabs.com/components.html#Stemmer
    stemmer = (Stemmer()
      .setInputCols("lemmas_no_sw")
      .setOutputCol("stem_no_sw")
              )


    # Finisher: turn the annotation columns into plain string arrays without metadata.
    finisher = (Finisher()
                .setInputCols(["token","normalized","autocorrected","lemmas","lemmas_no_sw","stem_no_sw"])
                .setOutputCols(["token","normalized","autocorrected","lemmas","lemmas_no_sw","stem_no_sw"])
                .setIncludeMetadata(False))


    # TODO: these NGram stages are slow; check whether running them before the Finisher is faster.
    bigrams = NGram(n=2, inputCol="lemmas", outputCol="bigrams")
    trigrams = NGram(n=3, inputCol="lemmas", outputCol="trigrams")

    ml_bigrams = NGram(n=2, inputCol="stem_no_sw", outputCol="ml_bigrams")

    # Concatenate stemmed unigrams and their bigrams into one token array for vectorization.
    token_and_ngram_assembler = SQLTransformer(statement="SELECT *, concat(stem_no_sw,ml_bigrams) as all_tokens_and_ngrams FROM __THIS__")

    # Order of the preprocessing stages.
    preprocess_pipeline = Pipeline(stages=[documentAssembler,tokenizer,normalizer,spell_checker,lemmatizer,stop_words_remover,stemmer,finisher,bigrams,trigrams,ml_bigrams,token_and_ngram_assembler])

    # Fit the pipeline to the training documents and persist it for later reloads.
    preprocess_pipeline_fit = preprocess_pipeline.fit(distinct_texts_for_preprocessing)
    preprocess_pipeline_fit.write().overwrite().save(preprocess_pipeline_path)
else:
    preprocess_pipeline_fit = PipelineModel.load(preprocess_pipeline_path)

tokenized_model_input = preprocess_pipeline_fit.transform(distinct_texts_for_preprocessing)

tokenized_model_input.cache()

## Count the distinct n-grams in each field
These counts show the scale of each feature space and where its complexity can be reduced.

In [None]:
# %%time
# For each feature column, count how many distinct values pass the same
# frequency filters the CountVectorizers in the next section use. These
# counts become their vocabSize. For the id columns, the stored value is
# the largest key.
#
# NOTE(review): `model_input_df` and `num_failures` come from a cell not shown
# here. The upper bound below assumes num_failures equals the number of rows
# CountVectorizer will see (its fractional maxDF is relative to that) - confirm.
import time

# `count` and `max` are deliberately not imported from pyspark.sql.functions:
# that would shadow Python's built-in `max` notebook-wide. Use F.count / F.max.
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import *
create_spark_model_vec_start_time = time.time()

null_array = udf(lambda z: ["blank"], ArrayType(StringType()))


min_token_count = 3

dict_n_features = {}
# Minimum number of rows (documents) a token must appear in (CountVectorizer minDF).
min_token_count_dict = {'Failure_all_ngrams': 2, 'FailureDescription_all_ngrams': 3, 'ObservedSymptom_all_ngrams': 3, 'Operation_Short_Text_all_ngrams': 3, 'Service_Order_Text_all_ngrams': 5, 'parts_list': 3, 'ReferenceDesignator_all_ngrams': 3
                        , 'Other_Subsystem_Keys':3,'RootCause_all_ngrams':5, 'Investigation_Conclusion_all_ngrams':5, 'Investigation_Summary_all_ngrams':5
                        ,'Concat_Complaint_Text_all_ngrams':5,'Concat_Failure_Text_all_ngrams':2}

# Maximum share of rows a token may appear in (CountVectorizer maxDF).
# NOTE: CountVectorizer reads maxDF >= 1 as an absolute document count, so the
# entries equal to 1 mean "at most one document" there but "all documents" in
# the filter below. None of them is in text_cols today; use a value < 1 before
# vectorizing those columns.
max_token_pct_dict = {'Failure_all_ngrams': .05, 'FailureDescription_all_ngrams': .05, 'ObservedSymptom_all_ngrams': .05, 'Operation_Short_Text_all_ngrams': .05, 'Service_Order_Text_all_ngrams': .05
                        , 'parts_list': .25, 'ReferenceDesignator_all_ngrams': .25, 'Other_Subsystem_Keys':.8, 'RootCause_all_ngrams':1, 'Investigation_Conclusion_all_ngrams':1
                      , 'Investigation_Summary_all_ngrams':1,'Concat_Complaint_Text_all_ngrams':.15,'Concat_Failure_Text_all_ngrams':.25}


text_cols_for_distinct_counts = ['parts_list','ReferenceDesignator_all_ngrams','Concat_Complaint_Text_all_ngrams','Concat_Failure_Text_all_ngrams','Other_Subsystem_Keys']


for column_name in text_cols_for_distinct_counts:
    # array_distinct makes each token count once per row. The result is the
    # document frequency that CountVectorizer's minDF/maxDF filter on. A plain
    # explode would count a token repeated within one row several times.
    min_df = min_token_count_dict[column_name]
    max_df = num_failures * max_token_pct_dict[column_name]
    n_features = (model_input_df
                  .selectExpr(f"explode(array_distinct({column_name})) as value")
                  .groupBy('value')
                  .count()
                  .filter(f"count >= {min_df} and count <= {max_df}")
                  .count())
    dict_n_features[column_name] = n_features


id_cols_for_distinct_counts = ['System_key','Subsystem_key']
for column_name in id_cols_for_distinct_counts:
    # Largest integer key value in the column.
    n_features = (model_input_df
                  .selectExpr(f"{column_name} as x")
                  .withColumn('x', col('x').cast(IntegerType()))
                  .agg({"x": "max"})
                  .collect()[0]["max(x)"])
    dict_n_features[column_name] = n_features

print(dict_n_features)

## Vectorize the data
Count-vectorize the text columns with CountVectorizer and one-hot encode the system and subsystem keys with OneHotEncoderEstimator.

In [None]:
# %%time
# Turn each text column into a count vector and one-hot encode the integer
# system/subsystem keys. Vocabulary sizes and frequency filters come from the
# dictionaries built in the previous cell.
from pyspark.ml import Pipeline
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.ml.feature import (CountVectorizer, HashingTF, IDF, Normalizer,
                                OneHotEncoderEstimator, PCA, StandardScaler,
                                VectorAssembler)
from pyspark.mllib.linalg import SparseVector

# text_cols = ['Failure', 'FailureDescription','ObservedSymptom','Operation_Short_Text','Service_Order_Text','parts_list','ReferenceDesignator']
text_cols = ['parts_list','ReferenceDesignator_all_ngrams','Concat_Complaint_Text_all_ngrams','Concat_Failure_Text_all_ngrams','Other_Subsystem_Keys']

# Combined width of the count vectors and one-hot vectors.
n_features = sum(dict_n_features[colname] for colname in text_cols + id_cols_for_distinct_counts)
# Number of distinct non-null target categories.
n_classes = (model_input_df
             .select("Resultant_Category")
             .dropna()
             .distinct()
             .count())
print(n_features)
print(n_classes)

vectorized_model_input_df = model_input_df

# Earlier TF-IDF variant, kept for reference:
# for column in text_cols:
#     tf = HashingTF(numFeatures=dict_n_features[column], inputCol=column, outputCol=column+"_TFOut")#.setInputCol(column).setOutputCol(column+"_TFOut")
#     idf = IDF(minDocFreq = min_token_count_dict[column], inputCol = column+"_TFOut",outputCol = column+"_TFIDF")#.setInputCol(column+"_TFOut").setOutputCol(column+"_IDFOut")
#     tf_fit = tf.transform(vectorized_model_input_df)
#     vectorized_model_input_df = idf.fit(tf_fit).transform(tf_fit).drop(column+"_TFOut")

for column in text_cols:
    # Fit a CountVectorizerModel on this column and append its "<column>_CV" output.
    cv = CountVectorizer(
        inputCol=column,
        outputCol=f"{column}_CV",
        vocabSize=dict_n_features[column],
        minDF=min_token_count_dict[column],
        maxDF=max_token_pct_dict[column],
        binary=False,
    )
    vectorized_model_input_df = cv.fit(vectorized_model_input_df).transform(vectorized_model_input_df)

encoder = OneHotEncoderEstimator(
    inputCols=['System_key', 'Subsystem_key'],
    outputCols=["system_vector", "subsystem_vector"],
)

# The encoder expects numeric category indices, so cast both keys to int first.
vectorized_model_input_df = vectorized_model_input_df.withColumn(
    'System_key', col('System_key').cast(IntegerType())
).withColumn(
    'Subsystem_key', col('Subsystem_key').cast(IntegerType())
)
encoder_model = None  # placeholder removed below; see next line
del encoder_model
vectorized_model_input_df = encoder.fit(vectorized_model_input_df).transform(vectorized_model_input_df)

## Convert text to vectors via Word2Vec

In [None]:
from pyspark.ml.feature import Word2Vec

# Each Word2Vec model is fit on, and then appends its vector column to, the
# same DataFrame. Cache the input first so the fit and transform do not each
# recompute the upstream plan.

# Complaint text -> 200-dimensional vector in "Complaint_doc_vec".
vectorized_model_input_df.cache()
complaint_w2v = Word2Vec(
    inputCol="Concat_Complaint_Text_unigrams",
    outputCol="Complaint_doc_vec",
    vectorSize=200,
    minCount=6,
    windowSize=5,
    numPartitions=32,
    seed=1,
)
failure_w2v = Word2Vec(
    inputCol="Concat_Failure_Text_unigrams",
    outputCol="Failure_doc_vec",
    vectorSize=100,
    minCount=3,
    windowSize=3,
    numPartitions=32,
    seed=1,
)
mdl_complaint_w2v = complaint_w2v.fit(vectorized_model_input_df)
vectorized_model_input_df = mdl_complaint_w2v.transform(vectorized_model_input_df)

# Failure text -> 100-dimensional vector in "Failure_doc_vec".
vectorized_model_input_df.cache()
mdl_failure_w2v = failure_w2v.fit(vectorized_model_input_df)
vectorized_model_input_df = mdl_failure_w2v.transform(vectorized_model_input_df)

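## Combine all vectors into one feature vector per failure
The assembled vector is then L2-normalized, and `Resultant_Category` is indexed into a numeric `label` column.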

In [None]:
# %%time
# Build the final model matrix. Concatenate every per-column vector into
# "features", scale it into "norm_features", and map the target category
# onto a numeric "label".
from pyspark.ml.feature import HashingTF, IDF,VectorAssembler,OneHotEncoderEstimator, Normalizer,StandardScaler
from pyspark.ml.feature import StringIndexer

assembler = VectorAssembler(
    inputCols=(
        [f"{column}_CV" for column in text_cols]
        + ["system_vector", "subsystem_vector"]
        + ["Complaint_doc_vec", "Failure_doc_vec"]
    ),
    outputCol="features",
)
normalizer = Normalizer(inputCol="features", outputCol="norm_features")

# Labels are ordered alphabetically. handleInvalid="keep" keeps rows with
# invalid labels instead of raising.
indexer = StringIndexer(
    inputCol="Resultant_Category",
    outputCol="label",
    handleInvalid="keep",
    stringOrderType="alphabetAsc",
)

vectorized_model_input_df = normalizer.transform(assembler.transform(vectorized_model_input_df))
vectorized_model_input_df = indexer.fit(vectorized_model_input_df).transform(vectorized_model_input_df)

# cache() is lazy; the count() action materializes the cached data.
vectorized_model_input_df.cache()
vectorized_model_input_df.count()

vectorized_model_input_df.limit(10).toPandas().head()