In [0]:
# Declare the notebook's text widget, then read its current value into `tablename`.
widget_name = "tablename"
dbutils.widgets.text(widget_name, "tablename")
tablename = dbutils.widgets.get(widget_name)

# Libs

In [0]:
import pandas as pd
import numpy as np
import nltk
from nltk.tokenize import word_tokenize
import pickle
from joblib import Parallel, delayed
import multiprocessing
from tqdm import tqdm
from nltk.corpus import wordnet, stopwords
from warnings import filterwarnings
import mlflow
filterwarnings('ignore')

# Download NLTK resources

In [0]:
# Fetch each NLTK resource this notebook asks for, in the same order as before.
for resource in ('punkt', 'stopwords', 'wordnet', 'words', 'averaged_perceptron_tagger'):
  nltk.download(resource)

# Load Data

In [0]:
# Loading Data
# The widget value is resolved as a table identifier via spark.table() instead of
# being spliced into a SQL string, so the parameter cannot change the query text.
data = spark.table(tablename).toPandas()

# replacement dictionary generated from EDA
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load a replace_dict.pkl that comes from a trusted source.
with open('replace_dict.pkl', 'rb') as f:
  replacement_dict = pickle.load(f)

In [0]:
# Preview the first rows of the loaded frame.
data.head()

# Functions

In [0]:
def replace_shortwords(replacement_dict,text):
  """Expand short-hand tokens in ``text`` using a lookup table, e.g. awsm -> awesome.

  replacement_dict : dict : token -> replacement mapping created in the EDA process
  text : string : text which needs to be treated

  Returns the tokens of ``text`` (as produced by ``word_tokenize``) joined by single
  spaces, with every token that is a key of ``replacement_dict`` swapped for its value.
  Each token is looked up exactly once, so a replacement is never itself replaced
  again by another dictionary entry.
  """
  # One dictionary lookup per token, instead of building a pandas Series and scanning
  # it once per dictionary key: O(tokens) rather than O(len(replacement_dict) * tokens).
  return " ".join(replacement_dict.get(word, word) for word in word_tokenize(text))

stopwords_list = stopwords.words('english')
def ReviewProcessing(df):
  """Normalise the 'text_cleaned' column and remove stopwords from it.

  df : pd.DataFrame : Dataframe which contains a 'text_cleaned' string column

  Steps applied to 'text_cleaned', in order: drop every character that is not a
  letter, digit or space; lowercase; split on single spaces; remove tokens found in
  ``stopwords_list``. The column ends up holding a list of tokens per row.

  The frame is modified in place and the same object is returned.
  """
  # Set membership is O(1) per token; the list itself is left untouched for other users.
  stop_set = set(stopwords_list)
  df['text_cleaned'] = (
      df['text_cleaned']
      # regex=True is required: since pandas 2.0 str.replace treats the pattern as a
      # literal string by default, which would silently leave the text unchanged.
      .str.replace('[^a-zA-Z0-9 ]', '', regex=True)
      .str.lower()
      .str.split(' ')
      .apply(lambda tokens: [token for token in tokens if token not in stop_set])
  )
  return df

def get_wordnet_pos(word):
  """Map the POS tag that nltk.pos_tag assigns to ``word`` onto a wordnet POS constant
  word : string : word in a paragraph

  The first letter of the tag selects adjective (J), noun (N), verb (V) or adverb (R);
  any other tag falls back to wordnet.NOUN."""
  pos_by_initial = {
      "J": wordnet.ADJ,
      "N": wordnet.NOUN,
      "V": wordnet.VERB,
      "R": wordnet.ADV,
  }
  # The word is tagged on its own, i.e. without any sentence context.
  tagged = nltk.pos_tag([word])
  initial = tagged[0][1][0].upper()
  if initial in pos_by_initial:
    return pos_by_initial[initial]
  return wordnet.NOUN

lemmatizer = nltk.stem.WordNetLemmatizer()

def get_lemmatize(sent):
  """Lemmatize every token of a sentence and return the tokens re-joined with spaces
  sent : string : sentence"""
  lemmas = []
  for token in nltk.word_tokenize(sent):
    # Each token is lemmatized with the wordnet POS derived for that token alone.
    lemmas.append(lemmatizer.lemmatize(token, get_wordnet_pos(token)))
  return " ".join(lemmas)

# Transformations

## Filtering and Cleaning

In [0]:
# Removing records whose 'text' value is null.
# subset is given as a list: a bare string is only accepted by pandas >= 1.5.
data = data.dropna(subset=['text'])

## Replacing short words with proper words

In [0]:
# Replacing short words with proper words
# The index is reset first so the positional results line up with the frame's rows
# when they are assigned back as a Series.
data = data.reset_index(drop=True)
worker_count = multiprocessing.cpu_count()
pending_jobs = (
    delayed(replace_shortwords)(replacement_dict=replacement_dict, text=x)
    for x in tqdm(data['text'])
)
result = Parallel(verbose=0, n_jobs=worker_count)(pending_jobs)
data['text_cleaned'] = pd.Series(result)

## Lemmatization

In [0]:
# Run ReviewProcessing, glue each row's token list back into one string,
# lemmatize that string into a new column, then discard duplicate rows.
clean_data = ReviewProcessing(data)
clean_data['text_cleaned'] = clean_data['text_cleaned'].apply(lambda tokens: ' '.join(tokens))
clean_data['text_cleaned_lemmatized'] = clean_data['text_cleaned'].apply(get_lemmatize)
clean_data = clean_data.drop_duplicates()

In [0]:
# Preview the first rows after cleaning and lemmatization.
clean_data.head()

# Get Production Customer Sentiment Model

In [0]:
# Load the model referenced by this MLflow model URI.
model_uri = 'models:/Customer_Sentiment_Prediction/Production'
model = mlflow.sklearn.load_model(model_uri)

# Make Predictions

In [0]:
# Predict from the lemmatized text and attach the result as a new column.
model_input = clean_data['text_cleaned_lemmatized']
pred = model.predict(model_input)
clean_data['prediction'] = pred

# Save output to new table

In [0]:
# Remove the intermediate text columns before writing. Reassigning (rather than
# inplace=True) together with errors='ignore' keeps this cell safe to re-run.
clean_data = clean_data.drop(columns=['text_cleaned_lemmatized', 'text_cleaned'], errors='ignore')
sparkdf = spark.createDataFrame(clean_data)
# IF EXISTS handles the "no previous output table" case without a bare except, so
# genuine failures of the drop are raised here instead of being silently swallowed.
spark.sql(f"drop table if exists {tablename}_output")
sparkdf.write.saveAsTable(f"{tablename}_output")