<a href="https://colab.research.google.com/github/BradenAnderson/Twitter-Sentiment-Analysis/blob/main/01_Data_Cleaning_Current_Spellcheck_Issue.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the CSV paths used later in this
# notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install contextualSpellCheck

In [None]:
import spacy
!python -m spacy download en_core_web_md

In [None]:
import pickle
import nltk
import string
import re
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import en_core_web_md
import contextualSpellCheck

from spacy.lang.en.stop_words import STOP_WORDS
from spacy.symbols import ORTH
from spacy.tokenizer import _get_regex_pattern

from sklearn.model_selection import cross_val_score, GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, BaggingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import precision_score, recall_score, accuracy_score, SCORERS, multilabel_confusion_matrix, silhouette_score
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors

In [None]:
# Raise the pandas display limit so up to 1000 rows are shown when a DataFrame is displayed.
pd.set_option('display.max_rows', 1000)

In [None]:
# Read the raw (uncleaned) Twitter data and the emoji lookup table into DataFrames,
# then display the first ten rows of the tweets.

filename = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/train_twitter_sentiment.csv'

# The first CSV column is used as the DataFrame index.
tweet_df = pd.read_csv(filename, index_col=0)

emoji_file = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/emoji_partial.csv'

# Later cells read the 'Codepoint' and 'Sentiment' columns of this table.
emoji_df = pd.read_csv(emoji_file)

# NOTE(review): this repeats the option already set in an earlier cell.
pd.set_option('display.max_rows', 1000)

tweet_df.head(10)

In [None]:
# Preview the emoji lookup table.
emoji_df.head()

# Part 1 - Replace Emojis

Emojis are replaced with text that describes the sentiment of the emoji if possible. If the sentiment for that particular emoji is unknown, the emoji is replaced with an empty string.

In [None]:
def codePointToLatinUnicode(code_point):
  """Convert a code point such as "U+1F600" to its UTF-8 bytes read as Latin-1.

  Every "U" and "+" character is dropped from code_point and the remainder is
  parsed as a base-16 integer. The character with that code point is encoded
  to UTF-8 and the resulting bytes are decoded as Latin-1, so a character
  outside ASCII comes back as several Latin-1 characters.

  Raises ValueError when the remaining text is not a valid hexadecimal number.
  """
  hex_digits = "".join(symbol for symbol in code_point if symbol not in ("U", "+"))

  character = chr(int(hex_digits, 16))   # Base-16 parse, then code point -> character.

  return character.encode('utf-8').decode('latin-1')

In [None]:
# Add a 'Unicode' column holding each code point converted by codePointToLatinUnicode.
emoji_df['Unicode'] = emoji_df['Codepoint'].apply(codePointToLatinUnicode)

emoji_df.head()

In [None]:
# Convert every code point in the emoji table with codePointToLatinUnicode.
code_point_values = list(emoji_df.loc[: , 'Codepoint'].to_numpy())
latin_unicode_values = [codePointToLatinUnicode(code) for code in code_point_values]

# Pair each converted code with the sentiment from the same row.
emoji_sentiments = list(emoji_df.loc[:, 'Sentiment'].to_numpy())
code_to_sentiment = zip(latin_unicode_values, emoji_sentiments)

# Lookup table: converted emoji code -> sentiment. When a code appears in more
# than one row, the last row wins.
emoji_map = dict(code_to_sentiment)

In [None]:
# Module-level list collecting every emoji code for which no sentiment could be found.
unknown_emoji_list = []

#----------------------------------------------------------------------------------------------------------------
# Looks up the sentiment for an emoji code in the emoji_map dictionary.
# When the full code is not a key of emoji_map, the code is scanned one character at a time so that
# several emojis whose codes run together can each be matched.
#----------------------------------------------------------------------------------------------------------------

def getEmojiSentiment(emoji_code):
  """Return the sentiment text for emoji_code, padded with spaces.

  An exact match in emoji_map returns " <sentiment> ". Otherwise characters are
  accumulated left to right; each time the accumulated text is a key of
  emoji_map its sentiment is added to the result and accumulation restarts.
  Characters left over after the last match are discarded. If nothing matched
  at all, emoji_code is appended to unknown_emoji_list and "" is returned.
  """
  global unknown_emoji_list

  # Fast path: the whole code is a known emoji.
  if emoji_code in emoji_map:
    return " " + emoji_map[emoji_code] + " "

  collected = ""
  pending = ""

  for symbol in emoji_code:
    pending += symbol

    if pending in emoji_map:
      collected = " " + collected + " " + emoji_map[pending] + " "
      pending = ""

  if collected == "":
    unknown_emoji_list.append(emoji_code)

  return collected

In [None]:
def convertEmojiToSentiment(input_tweet):
  """Replace every run of non-ASCII characters in a tweet with sentiment text.

  Characters whose code point is above 127 are treated as part of an emoji
  code. Consecutive such characters are collected into a single code, which
  is passed to getEmojiSentiment once the run ends (at the next ASCII
  character or at the end of the tweet). Its return value is written to the
  output in place of the run. All ASCII characters (code points 0-127) are
  copied to the output unchanged.

  Returns the rewritten tweet as a string.
  """
  output_parts = []     # Pieces of the output tweet, joined once at the end.
  emoji_code = ""       # Non-ASCII run currently being collected ("" when none).

  for char in input_tweet:

    if ord(char) > 127:
      emoji_code = emoji_code + char
      continue

    # ASCII character (this includes code point 127, which the earlier
    # "> 127" / "< 127" comparisons matched in neither branch and dropped).
    if emoji_code:
      output_parts.append(getEmojiSentiment(emoji_code))   # The run just ended; emit its sentiment.
      emoji_code = ""

    output_parts.append(char)

  # Handle a run that extends to the very end of the tweet.
  if emoji_code:
    output_parts.append(getEmojiSentiment(emoji_code))

  return "".join(output_parts)

In [None]:
# ------------------------------------------------------------------------------------------------------------------------------------
# Creates a new column in tweet_df in which all emojis have been removed. If a sentiment for an emoji exists in the emoji_map
# dictionary, the sentiment is inserted in place of the emoji. If no sentiment exists, the emoji is replaced with an empty string.
# ------------------------------------------------------------------------------------------------------------------------------------
tweet_df['tweet_emoji_cleaned'] = tweet_df['tweet'].apply(convertEmojiToSentiment)

tweet_df.head()

In [None]:
# Export the distinct emoji codes that had no sentiment, together with their decoded form.
# Sorting makes the row order of the exported file reproducible between runs.
unknown_emojis = sorted(set(unknown_emoji_list))

unlisted_emoji_dict = {'Code_From_Tweet' : [], 'Code_Converted' : []}

for emoji in unknown_emojis:

  unlisted_emoji_dict['Code_From_Tweet'].append(emoji)

  try:
    # Reverse the conversion done by codePointToLatinUnicode: Latin-1 bytes read back as UTF-8.
    converted = emoji.encode('latin-1').decode('utf-8')

  except UnicodeError:
    # UnicodeError covers both failure modes: the code contains a character that
    # Latin-1 cannot encode, or the resulting bytes are not valid UTF-8.
    converted = "Code could not be converted."

  unlisted_emoji_dict['Code_Converted'].append(converted)

emojis_no_sentiment_df = pd.DataFrame(unlisted_emoji_dict)

emojis_no_sentiment_df.to_csv(path_or_buf='/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/unknown_emoji.csv')

# Part 2 - Setting up Spacy

In [None]:
# Load the trained spaCy model (en_core_web_md).
nlp = en_core_web_md.load()

# Register the contextualSpellCheck component on the pipeline; spell_checker() below
# reads the doc._ attributes it provides.
contextualSpellCheck.add_to_pipe(nlp)

# Display the pipeline component names to confirm the spell checker was added.
nlp.pipe_names

In [None]:
# Keep one emoji-cleaned tweet (the row whose index equals 1) as a plain string
# for exploring tokenization behavior.
first_tweet_values = tweet_df.loc[tweet_df.index == 1, 'tweet_emoji_cleaned'].to_numpy()

first_tweet = str(first_tweet_values[0])

first_tweet

In [None]:
#-------------------------------------------------------------------------------------------------------------------------------------------------------------
# Remove "#" from the list of default prefixes spacy looks for during tokenization. This allows us to add a token_match regular
# expression that matches a hashtag and its text (e.g. #hashTagsAreCool) as a single token. If '#' is not removed from the prefix list
# then the hashtag will always be a separate token from the text, because prefixes are processed before token_match rules during
# tokenization. Reference the spacy tokenization chart for a visual of how rules are processed: https://spacy.io/usage/linguistic-features#tokenization
#-------------------------------------------------------------------------------------------------------------------------------------------------------------

default_prefixes = list(nlp.Defaults.prefixes)                      # The default prefixes spacy will look for during tokenization
default_prefixes.remove('#')                                        # Remove hashtags from the default prefix list.
prefix_regex = spacy.util.compile_prefix_regex(default_prefixes)    # Update to use the new prefixes
nlp.tokenizer.prefix_search = prefix_regex.search

# Get the current regex that nlp is using for token matching (None when no token_match is configured).
nlp_token_matching_regex_pre_update = spacy.tokenizer._get_regex_pattern(nlp.tokenizer.token_match)

# Pattern that treats a hashtag and its text as a single token. Raw string so "\w" is not
# interpreted as an (invalid) string escape sequence.
hashtag_pattern = r"#\w+"

# Combine the existing regex with the hashtag pattern. When there is no existing regex, use the hashtag
# pattern alone; interpolating None would create a pattern that matches the literal text "None".
if nlp_token_matching_regex_pre_update is None:
  updated_token_matching_regex = hashtag_pattern
else:
  updated_token_matching_regex = f"({nlp_token_matching_regex_pre_update}|{hashtag_pattern})"

# Update the token matching regex used by nlp with the regex created above.
nlp.tokenizer.token_match = re.compile(updated_token_matching_regex).match

# This is kept as an example of how to use nlp.tokenizer.explain to debug the tokenizer.
# tokenizer.explain tells you the rules being applied to get to the final result.
'''
tweet_doc = nlp.tokenizer.explain(first_tweet)   
for token in tweet_doc: 
  print(token)
'''

# Turn the first_tweet string into a spacy doc object (sequence of tokens).
tweet_doc = nlp(first_tweet)

# Commented out but kept as an example of displaying Spacys behavior when turning 
# strings into doc objects (tokens). This output can also be used to verify that
# the rule we created to keep hashtags as a single token worked as intended.
'''
print(f"Token\t\tLemma\t\tStopword")
print("="*40)
for token in tweet_doc:
    print(f"{token}\t\t{token.lemma_}\t\t{token.is_stop}")
''';

# Part 3 - Tweet Cleaning

In [None]:
#-----------------------------------------------------------------------------------------------------------------------------------------------------------
# Load a csv file listing common contractions next to their "expanded" counterparts and turn it into a python dictionary.
# The dictionary makes it easy to look up the expanded form of any given contraction, and is used to convert
# contractions to their expanded forms during the tweet cleaning process.
#-----------------------------------------------------------------------------------------------------------------------------------------------------------

contraction_file = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/contractions.csv'

contractions_df = pd.read_csv(contraction_file)

contractions = list(contractions_df['Contraction'].to_numpy())
expanded_words = list(contractions_df['Expanded_Word'].to_numpy())

contraction_to_expansion = zip(contractions, expanded_words)

# Keys are lowercased so lookups can be done on the lowercased word.
contraction_map = {contraction.lower(): expansion for contraction, expansion in contraction_to_expansion}

In [None]:
#-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# "SMS language" is a term that refers to abbreviated or slang language that does not consist of proper dictionary words, but is nonetheless
# commonly understood and widely used in digital forms of communication. See wikipedia for more info on "SMS language" https://en.wikipedia.org/wiki/SMS_language
#
# This cell loads a csv file of SMS wordings and their correct wordings into a python dictionary keyed by the lowercased SMS wording.
#-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

sms_speak_file = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/sms_speak.csv'

sms_speak_df = pd.read_csv(sms_speak_file)

correct_wordings = list(sms_speak_df['Correct_Wording'].to_numpy())
sms_abbreviations = list(sms_speak_df['SMS_Wording'].to_numpy())

sms_and_correct_wordings = zip(sms_abbreviations, correct_wordings)

# Each key is passed through str() before lowercasing, so non-string values in the SMS column are accepted as keys.
sms_map = {str(abbreviation).lower(): wording for abbreviation, wording in sms_and_correct_wordings}

In [None]:
# --------------------------------------------------------------------------------------------------------------------------------------
# Sometimes hashtags get stuck together, as in: #hashtagone#hashtagtwo#hashtagthree. This function separates hashtags that are
# stuck together so each one gets treated as its own entity.
# --------------------------------------------------------------------------------------------------------------------------------------
def split_chained_hashtags(word):
  """Return word with chained hashtags separated by single spaces.

  Only a word that starts with '#' and contains more than one '#' is split;
  any other word is returned as is. Trailing whitespace is removed in both cases.
  """
  is_chain = word.startswith('#') and word.count("#") > 1

  if not is_chain:
    return word.rstrip()

  # Splitting on '#' yields empty pieces for the leading '#' and for any "##"; skip those.
  separated = ["#" + tag for tag in word.split("#") if tag != ""]

  return " ".join(separated).rstrip()



In [None]:
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
# Helper for the fix_contractions_punctuation_abbreviation function. When the parent function is ready to add a word to the output tweet,
# this function decides whether a contraction or abbreviation should be used to transform the word first.
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
def evaluate_word(word, contraction_mapping=contraction_map, sms_mapping=sms_map):
  """Return the text that should be written to the output tweet for word.

  The lowercased word is looked up first in contraction_mapping and then in
  sms_mapping; a match is returned with one leading space. A non-empty word
  with no match is returned unchanged with one leading space, and an empty
  word gives "". A word made of chained hashtags is replaced by the result of
  split_chained_hashtags. If word contains an apostrophe, all apostrophes are
  removed from the result.
  """
  lowered = word.lower()

  if lowered in contraction_mapping:      # Contraction that can be expanded.
    output_word = " " + contraction_mapping[lowered]
  elif lowered in sms_mapping:            # Abbreviation that can be expanded.
    output_word = " " + sms_mapping[lowered]
  elif word != "":                        # Ordinary word: pass it through.
    output_word = " " + word
  else:
    output_word = ""

  # Chained hashtags override whatever was chosen above.
  if word.startswith('#') and word.count("#") > 1:
    output_word = split_chained_hashtags(word)

  if '\'' in word:
    output_word = output_word.replace('\'', "")

  return output_word

In [None]:
def fix_contractions_punctuation_abbreviation(input_tweet, contraction_mapping=contraction_map, sms_mapping=sms_map):
  """Strip punctuation from a tweet and expand its contractions and abbreviations.

  The tweet is scanned character by character. A space or any character in
  the punctuation set ends the current word; the punctuation character itself
  is dropped. Each finished word is passed to evaluate_word together with the
  supplied mappings, and the results are joined with spaces. The characters
  '#', '@' and the apostrophe are not in the punctuation set, so they stay
  part of the word handed to evaluate_word.

  Parameters:
    input_tweet: the tweet text to clean.
    contraction_mapping: dictionary of lowercased contraction -> expansion.
    sms_mapping: dictionary of lowercased abbreviation -> correct wording.

  Returns the cleaned tweet with leading/trailing whitespace removed and runs
  of spaces collapsed to a single space.
  """
  # Same character set as before; the backslash is now written as "\\" because "\(" is an invalid escape sequence.
  punctuation = ".&\\()*+,-./:;<=>\"!?[\\]^_`{|}~"
  word = ""
  pieces = []

  for char in input_tweet:                                                      # Iterate through every character in the tweet.

    if char == " " or char in punctuation:                                      # A space or punctuation mark ends the current word.
      # Forward the mappings given by the caller; previously they were dropped and the defaults were always used.
      pieces.append(evaluate_word(word, contraction_mapping=contraction_mapping, sms_mapping=sms_mapping))
      word = ""
    else:                                                                       # Otherwise keep building up the word.
      word = word + char

  # If the final character was not a space or punctuation mark, one last word is still pending.
  if word != "":
    pieces.append(evaluate_word(word, contraction_mapping=contraction_mapping, sms_mapping=sms_mapping))

  output_tweet = " ".join(pieces).strip()
  output_tweet = re.sub(" +", " ", output_tweet)

  return output_tweet

In [None]:
#filter = (tweet_df.index == 17) & (tweet_df.index == 18) & (tweet_df.index == 19) & (tweet_df.index == 20) & (tweet_df.index == 21)

#test = tweet_df.iloc[100:150, 2].to_numpy()

#test;

In [None]:
# test = list(test);

# test;

In [None]:
'''
output_list = []

for item in test:

  func_test = fix_contractions_punctuation_abbreviation(item)

  output_list.append(func_test)
''';

In [None]:
# output_list;

In [None]:
'''
doc_list = []

for item in output_list: 

  test_doc = nlp(item)
  doc_list.append(test_doc)
''';

In [None]:
# Module-level bookkeeping for the spell checker.
# spelling_changes_made maps a tweet counter value to a record of the corrections applied to that tweet.
spelling_changes_made = {}

# Number of tweets passed through spell_checker so far.
tweets_processed_for_spelling = 0

In [None]:
def non_hashtag_changes(probabilities):
  """Return a copy of probabilities without the entries whose key is a hashtag.

  A key counts as a hashtag when its string form starts with '#'.
  """
  return {
      word: candidates
      for word, candidates in probabilities.items()
      if not str(word).startswith('#')
  }

In [None]:
def keeper_changes(potential_changes, threshold=0.7):
  """Select the suggested corrections that meet the probability threshold.

  Each value of potential_changes is a sequence whose first element is a
  (suggestion, probability) pair. Only that first pair is considered, and it
  is kept when its probability is at least threshold.

  Returns a dictionary mapping each kept key to its suggestion.
  """
  accepted = {}

  for word, candidates in potential_changes.items():

    top_suggestion, top_probability = candidates[0]

    if top_probability >= threshold:
      accepted[word] = top_suggestion

  return accepted



In [None]:
def record_changes_made(changes_being_made, original_tweet):
  """Store the corrections applied to one tweet in spelling_changes_made.

  The record is stored under the current value of tweets_processed_for_spelling
  and holds the original tweet, the original spellings (the keys of
  changes_being_made) and the new spellings (its values), in matching order.
  """
  global tweets_processed_for_spelling
  global spelling_changes_made

  spelling_changes_made[tweets_processed_for_spelling] = {
      'original_tweet' : original_tweet,
      'original_word_spelling' : list(changes_being_made.keys()),
      'new_spelling_after_contextualSpellCheck' : list(changes_being_made.values()),
  }



In [None]:
def parse_changes(probabilities, doc, hash_check = True):
  """Apply the accepted spelling corrections to doc and return the result as a string.

  Parameters:
    probabilities: dictionary of misspelled token -> suggested corrections,
      in the form expected by keeper_changes.
    doc: the tokenized tweet; iterated token by token and converted with str().
    hash_check: when True, suggestions for tokens starting with '#' are
      discarded first via non_hashtag_changes.

  Returns the text of doc unchanged when no suggestion passes keeper_changes.
  Otherwise the changes are logged with record_changes_made and the tokens,
  with corrections substituted, are returned joined by single spaces.
  """
  if hash_check:                                                  # Only necessary when the caller found that hashtags were changed.
    potential_changes = non_hashtag_changes(probabilities)        # Rule out any spelling change suggestions on hashtags.
  else:
    potential_changes = probabilities

  changes_to_implement = keeper_changes(potential_changes)        # Keep changes whose probability is above the desired threshold.

  original_string = str(doc)

  if not changes_to_implement:                                    # Early exit when there are no changes to implement.
    return original_string

  record_changes_made(changes_to_implement, original_string)

  output_tokens = []

  for token in doc:

    if token in changes_to_implement:
      # The original line referenced the misspelled name "changes_to_implment" here,
      # raising NameError whenever a correction was applied.
      output_tokens.append(str(changes_to_implement[token]))
    else:
      output_tokens.append(str(token))

  output_string = " ".join(output_tokens).strip()
  output_string = re.sub(" +", " ", output_string)

  return output_string

In [None]:
def spell_checker(doc):
  """Return doc with the accepted spelling corrections applied.

  Increments the tweets_processed_for_spelling counter on every call. When the
  contextual spell checker reports that it performed corrections, its scores
  are handed to parse_changes and the resulting string is returned. Otherwise
  doc itself is returned, so the return value is either a string or the
  original doc object.
  """
  global tweets_processed_for_spelling
  tweets_processed_for_spelling += 1

  # Check that contextual spellcheck has been added as an extension. This should always pass.
  if not doc._.contextual_spellCheck:
    return doc

  # Nothing to do unless the spell checker identified one or more misspelled words.
  if not (doc._.performed_spellCheck == True):
    return doc

  # Dictionary of the misspelled words, with their corrections and probabilities.
  probabilities = doc._.score_spellCheck

  # Hashtag suggestions only need to be filtered out when at least one hashtag was corrected.
  changed_hashtag = any(str(key).startswith('#') for key in probabilities)

  return parse_changes(probabilities, doc, hash_check=changed_hashtag)

In [None]:
'''first_doc = doc_list[6]

first_check = spell_checker(first_doc)

type(first_check)
'''

In [None]:
'''
spelling_checked_docs = []
loop_count = 0
for doc_obj in doc_list:
  spell_check_result = spell_checker(doc_obj)

  loop_count = loop_count + 1
  print("--------Loop ", str(loop_count), "----------------")

  try: 
    spelling_corrected_doc = nlp(spell_check_result)
    spelling_checked_docs.append((spelling_corrected_doc, 'Changed'))
  except TypeError:
    spelling_corrected_doc = spell_check_result
    spelling_checked_docs.append((spelling_corrected_doc, 'Original'))
'''

In [None]:
#spelling_changes_made;

In [None]:
#spelling_checked_docs;

In [None]:
#doc_list;

In [None]:
#out = spell_checker(test_doc);

In [None]:
#type(out)

In [None]:
# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# Tweet Cleaner Function. When applied to a tweet, this function will do the following:
#
# - Remove punctuation marks.
# - Convert common contractions and abbreviations to their expanded forms.
# - Convert the tweet to a Spacy doc object (a set of tokens).
# - Apply the spelling corrections accepted by spell_checker.
# - Use Spacys lemmatization process to group words together and reduce the overall number of unique tokens
#   Note: Reducing the number of unique tokens is desired because that will help reduce the dimensionality of the matrix we use when building models, 
#         and will therefore help mitigate the negative effects of working in high dimensional spaces (curse of dimensionality).
#
# - Convert all words to lowercase, to further reduce the number of unique words in the output.
# - Remove all stop words as defined in Spacys list of default stop words. Use nlp.Defaults.stop_words for a list.
# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

def tweet_cleaner(dirty_tweet, nlp):
  """Clean one tweet and return it as a list of lowercase lemma strings.

  Parameters:
    dirty_tweet: the tweet text to clean.
    nlp: the spaCy pipeline used to tokenize and lemmatize the tweet.

  Returns a list of lemmas with '@user' tokens, stop words and empty lemmas removed.
  """
  partially_clean_tweet = fix_contractions_punctuation_abbreviation(dirty_tweet)
  stop_words = nlp.Defaults.stop_words
  partially_clean_tweet_doc = nlp(partially_clean_tweet)

  spell_check_result = spell_checker(partially_clean_tweet_doc)

  # spell_checker returns a string when corrections were applied and the original doc otherwise.
  # Only a string needs to be tokenized again. The type is checked explicitly rather than relying
  # on nlp() raising TypeError for a doc, which does not hold where nlp() accepts doc objects.
  if isinstance(spell_check_result, str):
    tweet_doc = nlp(spell_check_result)
  else:
    tweet_doc = spell_check_result

  clean_tweet_tokens = []
  for token in tweet_doc:
    if token.lemma_ == "-PRON-":                                # -PRON- indicates that Spacys lemmatizer flagged this word as a generic pronoun.
      clean_tweet_tokens.append(token.lemma_.lower().strip('-'))  # Strip off the "-"'s and make the word lowercase.
      continue

    # Normalize before filtering so capitalized lemmas are compared against the lowercase stop word list.
    lemma = token.lemma_.lower().strip()

    # Ignore empty lemmas, @user tokens and stopwords. They are generic and are unlikely to help provide insight.
    if lemma == "" or lemma == '@user' or lemma in stop_words:
      continue

    clean_tweet_tokens.append(lemma)

  return clean_tweet_tokens

In [None]:
# Run the full cleaning pipeline on every emoji-cleaned tweet and store the resulting token lists in a new column.
tweet_df['Fully_Clean_Tweet_Tokenized'] = tweet_df['tweet_emoji_cleaned'].apply(tweet_cleaner, args=(nlp,))

filepath = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/tweet_df_incorrect_spellcheck.csv'
# The keyword is path_or_buf; the previous spelling "path_or_buff" raised TypeError, so the file was never written.
tweet_df.to_csv(path_or_buf=filepath)

In [None]:
# Inspect the first 30 rows, including the new Fully_Clean_Tweet_Tokenized column.
tweet_df.head(30)

In [None]:
# One row per tweet that had corrections applied; the columns are the fields stored by record_changes_made.
changes_df = pd.DataFrame(spelling_changes_made).T

filepath = '/content/drive/MyDrive/Programming/Colab Notebooks/Coding_Dojo/Twitter_Sentiment_Project/Contextual_SpellCheck_Incorrect_Output.csv'

# The keyword is path_or_buf; the previous spelling "path_or_buff" raised TypeError, so the file was never written.
changes_df.to_csv(path_or_buf=filepath)


In [None]:
# Review the first 50 recorded spelling corrections.
changes_df.head(50)