In [None]:
# Importing all needed libraries.
import nltk
from nltk.tokenize import word_tokenize
import os
from os.path import join
import re
from nltk.probability import FreqDist
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from functools import reduce
from operator import add
import numpy as np

# Downloading the stopwords corpus.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Getting the English stopword list from the NLTK 'stopwords' corpus (downloaded
# above) and a Porter stemmer instance; both are passed to the CorpusReader later.
stop_words = stopwords.words('english')
porter = PorterStemmer()

In [None]:
# The Corpus Reader class.
class CorpusReader:
    '''
        Reads a corpus stored as <folder_path>/<class name>/<text file> and turns
        it into preprocessed documents with their class labels.

        Public attributes:
            classes_        - sorted list of class (sub-folder) names.
            cls_dict        - {<class> : [<paths of the files of this class>]}.
            stemmer         - the stemmer passed to the constructor (may be None).
            named_entities  - set by process(); underscore-joined named entities.
            common_hapaxes  - set by process(); hapaxes of all classes, concatenated.
    '''
    def __init__(self, folder_path, stop_words = None, stemmer = None):
        '''
            The Corpus Reader constructor
                :param folder_path: str
                    The path to the folder with files.
                :param stop_words: list
                    The list of stopwords. When None, no stopwords are removed.
                :param stemmer: obj
                    The stemmer to apply on the text data (any object with a
                    stem(word) method). When None, the text is not stemmed.
        '''
        # Setting up the parameters.
        self.__folder_path = folder_path
        # A set gives constant-time membership tests during stopword removal.
        self.__stop_words = None if stop_words is None else set(stop_words)
        self.stemmer = stemmer
        self.__named_entities = set()

        # Only sub-folders are classes; stray files in the corpus root are skipped.
        # Sorting makes the order of the classes (and of X, y) reproducible.
        self.classes_ = sorted(
            entry for entry in os.listdir(self.__folder_path)
            if os.path.isdir(join(self.__folder_path, entry))
        )

        # Building up the mapping from every class to the paths of its files.
        self.cls_dict = {
            cls : [join(self.__folder_path, cls, file_name)
                   for file_name in sorted(os.listdir(join(self.__folder_path, cls)))]
            for cls in self.classes_
        }

    def __getitem__(self, cls):
        '''
            This function allows accessing the texts in the corpus by name of the class
            and the index of the file.
                :param cls: tuple
                    This argument is passed as a tuple with 2 values:
                        1. Name of the class.
                        2. Index of the file.
                :return: str
                    The raw content of the requested file.
        '''
        # The context manager closes the file handle as soon as it is read.
        with open(self.cls_dict[cls[0]][cls[1]], 'r', encoding='utf-8') as file:
            return file.read()

    def __normalize(self, text):
        '''
            This function normalizes the text.
                :param text: str
                    The text that should be normalized.
                :return: str
                    The lower-cased text reduced to its runs of the letters a-z,
                    separated by single spaces.
        '''
        # Replacing the new line symbols with space and lower-casing the text.
        text = text.replace('\n', ' ').lower()

        # Extracting only the words from the text.
        return ' '.join(re.findall(r'[a-z]+', text))

    def __eliminate_stopwords(self, text):
        '''
            This function is deleting the stopwords from text.
                :param text: str
                    The text from which we should remove the stopwords.
                :return: str
                    The text without stopwords; the text itself when the reader
                    was created without a stopword list.
        '''
        if self.__stop_words is None:
            return text
        return ' '.join(word for word in word_tokenize(text)
                        if word not in self.__stop_words)

    def __get_hapaxes(self, dictionary):
        '''
            This function is responsible for finding the hapaxes (words that occur
            exactly once) of every class.
                :param dictionary: dict
                    The dictionary with all the texts separated by classes in the
                    following structure:
                        {<class> : [<list of texts for this class>]}
        '''
        # One frequency distribution per class, accumulated over all its texts.
        self.__hapaxes_per_classes = {}
        for cls in self.classes_:
            fdist = FreqDist()
            for text in dictionary[cls]:
                fdist.update(word_tokenize(text))
            self.__hapaxes_per_classes[cls] = fdist.hapaxes()

    def __stem(self, text):
        '''
            This function applies the stemmer on every word in the text.
                :param text: str
                    The text that should be stemmed.
                :return: str
                    The stemmed text; the text itself when no stemmer was given.
        '''
        if self.stemmer is None:
            return text
        return ' '.join(self.stemmer.stem(word) for word in word_tokenize(text))

    def __named_identity_extraction(self, text):
        '''
            This function finds out all named entities in a text and adds them to
            the internal set of named entities.
                :param text: str
                    The text from which we should extract the named entities.
        '''
        # NOTE(review): process() calls this on already normalized (lower-cased,
        # letters-only) text; the chunker presumably benefits from the original
        # casing and punctuation - confirm whether raw text was intended.
        for sent in sent_tokenize(text):
            for chunk in nltk.ne_chunk(nltk.pos_tag(word_tokenize(sent))):
                # Only the chunk sub-trees carry a label; plain tokens are tuples.
                if hasattr(chunk, 'label'):
                    self.__named_entities.add(' '.join(c[0] for c in chunk))

    def __join_named_entities(self, text_data):
        '''
            Replaces, in place, the spaces inside every known named entity with
            underscores so that the entity survives tokenization as one word.
                :param text_data: dict
                    {<class> : [<list of texts for this class>]}
        '''
        for cls in self.classes_:
            for i, text in enumerate(text_data[cls]):
                for entity in self.named_entities:
                    text = text.replace(entity.replace('_', ' '), entity)
                text_data[cls][i] = text

    def __clean(self, text_data):
        '''
            Applies, in place, the last preprocessing steps on every text:
            hapax removal, stopword removal, stemming and removal of the words
            with fewer than 3 letters.
                :param text_data: dict
                    {<class> : [<list of texts for this class>]}
        '''
        # Built once: membership tests against the list would be linear per word.
        hapaxes = set(self.common_hapaxes)
        for cls in self.classes_:
            for i, text in enumerate(text_data[cls]):
                # Eliminating the hapaxes.
                text = ' '.join(word for word in word_tokenize(text)
                                if word not in hapaxes)

                # Eliminating the stopwords.
                text = self.__eliminate_stopwords(text)

                # Stemming the text.
                text = self.__stem(text)

                # Eliminating words with fewer than 3 letters.
                text_data[cls][i] = ' '.join(word for word in word_tokenize(text)
                                             if len(word) >= 3)

    def __to_arrays(self, text_data):
        '''
            Flattens the per-class texts into the X and y arrays.
                :param text_data: dict
                    {<class> : [<list of texts for this class>]}
                :return: tuple
                    (X, y) - numpy arrays with the texts and the class name of
                    every text, in the order of self.classes_.
        '''
        X = np.array([text for cls in self.classes_ for text in text_data[cls]])
        y = np.array([cls for cls in self.classes_ for _ in text_data[cls]])
        return X, y

    def process(self):
        '''
            This function is setting up the corpus reader based on the corpus sent in the
            constructor of the reader. It learns the named entities and the hapaxes
            of the corpus (stored in named_entities and common_hapaxes).
                :return: tuple
                    (X, y) - numpy arrays with the preprocessed texts and the
                    class name of every text.
        '''
        # Creating a dictionary with all texts from the corpus separated by classes.
        text_data = {cls : [] for cls in self.classes_}
        for cls in self.classes_:
            for i in range(len(self.cls_dict[cls])):
                # Loading the corpus in the dictionary.
                text = self.__normalize(self[cls, i])
                text_data[cls].append(text)

                # Adding the named entities extracted from the text.
                self.__named_identity_extraction(text)

        # Gathering all named entities (at least 2 characters long) in a list, with
        # their spaces replaced by underscores. Longest first, so that an entity
        # contained in a longer one cannot break the longer match, and in a fixed
        # order so that repeated runs give the same result.
        self.named_entities = sorted(
            (entity.replace(' ', '_') for entity in self.__named_entities
             if len(entity) >= 2),
            key = lambda entity: (-len(entity), entity)
        )

        # Replacing spaces in named entities with underscores in every text.
        self.__join_named_entities(text_data)

        # Getting the hapaxes of every class, concatenated in one list.
        self.__get_hapaxes(text_data)
        self.common_hapaxes = [word for cls in self.classes_
                               for word in self.__hapaxes_per_classes[cls]]

        # Applying the last preprocessing on the text.
        self.__clean(text_data)

        # Generating the X matrix and the y vector for the Machine Learning Pipeline.
        return self.__to_arrays(text_data)

    def apply(self, path):
        '''
            This function preprocesses another corpus with the named entities and
            the hapaxes learned by process(), which has to be called first.
                :param path: str
                    The path to a folder with one sub-folder per class in
                    self.classes_.
                :return: tuple
                    (X, y) - numpy arrays with the preprocessed texts and the
                    class name of every text.
        '''
        # Creating a dictionary with all texts from the corpus separated by classes.
        text_data = {cls : [] for cls in self.classes_}

        for cls in self.classes_:
            # Loading the corpus in the dictionary.
            for file_name in sorted(os.listdir(join(path, cls))):
                with open(join(path, cls, file_name), 'r', encoding='utf-8') as file:
                    text_data[cls].append(self.__normalize(file.read()))

        # Replacing spaces in named entities with underscores in every text.
        self.__join_named_entities(text_data)

        # Applying the last preprocessing on the text.
        self.__clean(text_data)

        # Generating the X matrix and the y vector for the Machine Learning Pipeline.
        return self.__to_arrays(text_data)

In [None]:
# Creating the corpus reader over the training folder, with the English stopwords
# and the Porter stemmer defined earlier.
# NOTE(review): the recorded run of this cell ended in FileNotFoundError, so this
# local Windows path was presumably not available where the notebook was executed.
reader = CorpusReader(r'D:\NLP\BIO_CS_DATA\TRAIN', stop_words, porter)

FileNotFoundError: ignored

In [None]:
# Peeking at the first text of the 'biology' class. This must be an indexing
# expression: assigning the list to `reader` would replace the corpus reader
# and break the reader.process() call below.
reader['biology', 0]

In [None]:
# Processing the training corpus: returns the preprocessed texts and the class
# label of every text.
X_train, y_train = reader.process()

In [None]:
# Applying the same preprocessing to the test corpus, reusing the named entities
# and the hapaxes stored on the reader by process().
X_test, y_test = reader.apply(r'D:\NLP\BIO_CS_DATA\TEST')

In [None]:
import pandas as pd

In [None]:
# Training set as a two-column frame: the preprocessed text and its class label.
train_df = pd.DataFrame({'text' : X_train, 'class' : y_train})

In [None]:
# Saving the training frame to train.csv, tab-separated and without the index column.
train_df.to_csv('train.csv', sep = '\t', index = False)

In [None]:
# Test set in the same two-column layout as the training frame.
test_df = pd.DataFrame({'text' : X_test, 'class' : y_test})

# Homework

In [1]:
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances


In [2]:
class TextNormalizer(BaseEstimator, TransformerMixin):
    '''
        Stateless transformer that replaces line breaks with spaces and
        lower-cases every document.
    '''
    def fit(self, X, y = None):
        '''
            Nothing is learned; present only to satisfy the transformer protocol.
                :param X: sequence of str
                    The documents (unused).
                :param y: any
                    The labels (unused). Optional, so that fit_transform(X)
                    works without labels.
                :return: TextNormalizer
                    The transformer itself.
        '''
        return self

    def transform(self, X):
        '''
            Normalizes every document.
                :param X: list or array of str
                    The documents, addressable by position (X[i]).
                :return: same container type as X
                    A copy of X with '\\n' and '\\r' replaced by spaces and all
                    text lower-cased. The input itself is left untouched.
        '''
        # Working on a copy: a transformer should not silently modify the
        # caller's data.
        normalized = X.copy()
        for i in range(len(normalized)):
            normalized[i] = normalized[i].replace('\n', ' ').replace('\r', ' ').lower()
        return normalized

In [3]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [7]:
from nltk.tokenize import word_tokenize
class WordsExtractor(BaseEstimator, TransformerMixin):
    '''
        Transformer that removes the stopwords and the hapaxes (words seen
        exactly once in the data passed to fit) from every document.
    '''
    def __init__(self, stopwords_list):
        '''
            :param stopwords_list: list
                The words that should always be removed.
        '''
        # Stored under the parameter's own name: BaseEstimator.get_params() (and
        # therefore clone and Pipeline) looks the constructor arguments up by name.
        self.stopwords_list = stopwords_list

    def fit(self, X, y = None):
        '''
            Collects the hapaxes of the documents.
                :param X: iterable of str
                    The documents.
                :param y: any
                    The labels (unused).
                :return: WordsExtractor
                    The fitted transformer.
        '''
        # Imported here so the cell also works when run on its own.
        from nltk.probability import FreqDist

        # A FreqDist accumulates the counts over all documents; a plain dict
        # would overwrite them and has no hapaxes() method.
        fdist = FreqDist()
        for text in X:
            fdist.update(word_tokenize(text))
        self.__hapaxes = set(fdist.hapaxes())
        return self

    def transform(self, X, y = None):
        '''
            Removes the stopwords and the fitted hapaxes from every document.
                :param X: iterable of str
                    The documents.
                :param y: any
                    The labels (unused).
                :return: numpy.ndarray
                    The filtered documents, words separated by single spaces.
        '''
        stop_words = set(self.stopwords_list or ())
        return np.array([
            ' '.join(word for word in word_tokenize(text)
                     if word not in self.__hapaxes and word not in stop_words)
            for text in X
        ])
 # or
 #     X[i] = ' '.join([word for word in word_tokenize(X[i]) if word not in self.__hapaxes and word not in self.stopwords_list])

In [15]:
from nltk.tokenize import word_tokenize
class ApplyStemmer(BaseEstimator, TransformerMixin):
    '''
        Stateless transformer that stems every word of every document.
    '''
    def __init__(self, stemmer):
        '''
            :param stemmer: obj
                Any object with a stem(word) method.
        '''
        # Stored under the parameter's own name: BaseEstimator.get_params() (and
        # therefore clone and Pipeline) looks the constructor arguments up by name.
        self.stemmer = stemmer

    def fit(self, X, y = None):
        '''
            Nothing is learned; present only to satisfy the transformer protocol.
                :param X: iterable of str
                    The documents (unused).
                :param y: any
                    The labels (unused).
                :return: ApplyStemmer
                    The transformer itself.
        '''
        return self

    def transform(self, X, y = None):
        '''
            Stems every document word by word.
                :param X: iterable of str
                    The documents.
                :param y: any
                    The labels (unused).
                :return: numpy.ndarray
                    The stemmed documents, words separated by single spaces.
                    The input is left untouched.
        '''
        # The stems are joined per document; joining a single stem would put
        # spaces between its characters.
        return np.array([
            ' '.join(self.stemmer.stem(word) for word in word_tokenize(text))
            for text in X
        ])
# or
#  X[i] = ' '.join([self.__stemmer.stem(word) for word in word_tokenize(X[i])])