# Data Pipeline Modules

This notebook contains all the classes required to build a sustainable product database.

For data preprocessing, the following classes can be used:
- ColumnDropper
- RowDropper
- StringCleaner

For keyword extraction using TF-IDF, the __KeywordExtractor__ class can be used. It includes all the custom functions needed to extract keywords accurately based on a specific vocabulary.

To map keywords to specific sustainability factors using the ontology data, the __KeywordsMapper__ class can be used.

In [1]:
import re
import sqlite3
from contextlib import closing
from itertools import combinations
from string import punctuation, digits

import numpy as np
from pandas import isna, DataFrame, pivot_table, read_sql_query
from scipy.sparse import csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer

In [2]:
# Load the custom stop-word list: one word per line.
# The encoding is explicit so the result does not depend on the platform locale;
# surrounding whitespace is stripped so a word with a stray trailing space still
# matches, and blank lines (including the one produced by a trailing newline)
# are skipped instead of becoming "" entries.
with open("custom_stopwords.txt", "r", encoding="utf-8") as f:
    ENGLISH_STOP_WORDS = [line.strip() for line in f.read().splitlines() if line.strip()]

In [4]:
class ColumnDropper():
    """Keep or remove a fixed set of columns of a DataFrame.

    Follows the scikit-learn ``fit``/``transform`` convention; nothing is
    learned in ``fit``.
    """
    def __init__(self, columns:list, should_drop:bool=False):
        """
        columns: names of the columns to act on. Names absent from the frame
            passed to ``transform`` are ignored.
        should_drop: False (default) keeps only ``columns`` and removes every
            other column; True removes ``columns`` and keeps the rest.
        """
        self.columns = columns
        self.should_drop = should_drop

    def fit(self, X:DataFrame, y=None):
        """No-op; returns ``self`` for chaining."""
        return self

    def transform(self, X:DataFrame):
        """Return a new DataFrame with the selected columns removed."""
        # Listed columns that actually exist in X, in X's column order.
        present = [name for name in X.columns if name in self.columns]
        if self.should_drop:
            to_remove = present
        else:
            # Keep mode: everything that was not listed goes away.
            to_remove = X.columns.difference(present)
        return X.drop(to_remove, axis=1)

In [5]:
class RowDropper():
    """Given a list of columns, this class is used to drop rows with np.nan in any of the listed columns
       and removes any duplicate rows in the subset of given columns (the last occurrence is kept).
       The returned DataFrame has a fresh 0..n-1 index."""
    def __init__(self, columns:list=None):
        # None (or an empty collection) means "use every column of the frame
        # passed to transform".
        self.columns = columns

    def fit(self, X:DataFrame, y=None):
        """No-op; returns ``self`` for chaining."""
        return self

    def transform(self, X:DataFrame):
        """Return X without rows that have NaN in the subset, de-duplicated on the subset."""
        # Resolve the subset per call instead of storing X.columns on self:
        # the instance stays reusable on frames with different columns, and a
        # pandas Index is never truth-tested (which raises ValueError).
        if self.columns is None or len(self.columns) == 0:
            subset = list(X.columns)
        else:
            subset = self.columns
        X = X.dropna(subset=subset, how="any")
        return X.drop_duplicates(subset=subset, keep="last").reset_index(drop=True)

In [6]:
class StringCleaner():
    """
    Given a list of columns, this class is used to clean the strings in the listed columns with the following steps:
            1. Convert string to lower case
            2. Remove HTML encoded characters (characters starting with &)
            3. Remove XML tags (text between < and >), including closing tags and tags with attributes
            4. Remove white spaces (literal backslash escapes such as "\\n" and runs of whitespace)
            5. Remove punctuations and digits
            6. Replace empty strings with np.nan

    Missing values (NaN/None) stay missing instead of being turned into the
    text "nan"/"none". The listed columns of the given DataFrame are replaced
    in place and the same DataFrame is returned.
    """
    # Compiled once and written as raw strings (no invalid-escape warnings).
    _HTML_ENTITY_RE = re.compile(r'&\w+')
    # A tag starts with "<", an optional "/", a name, then anything up to ">".
    _XML_TAG_RE = re.compile(r'</?\w+[^<>]*>')
    # A literal backslash followed by a word character, e.g. the two characters "\n".
    _LITERAL_ESCAPE_RE = re.compile(r'\\\w')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, columns):
        self.columns = columns
        removable = punctuation + digits
        # Punctuation and digits become spaces so neighbouring words stay separated.
        self.translator = str.maketrans(removable, ' ' * len(removable))

    def fit(self, X, y=None):
        """No-op; returns ``self`` for chaining."""
        return self

    def _clean(self, value):
        """Apply steps 1-6 to one cell; missing or empty results become np.nan."""
        if np.ndim(value) == 0 and isna(value):
            return np.nan
        text = str(value).lower()                       # 1
        text = self._HTML_ENTITY_RE.sub(' ', text)      # 2
        text = self._XML_TAG_RE.sub(' ', text)          # 3
        text = self._LITERAL_ESCAPE_RE.sub(' ', text)   # 4 (escapes)
        text = text.translate(self.translator)          # 5
        # 4/6: collapse every whitespace run (including those created by the
        # translation) and trim; an empty result means "no text".
        text = self._WHITESPACE_RE.sub(' ', text).strip()
        return text if text else np.nan

    def transform(self, X:DataFrame):
        for column in self.columns:
            X[column] = X[column].map(self._clean)
        return X

In [7]:
class KeywordExtractor():
    """Extract ontology-driven bigram/trigram keywords from free text with TF-IDF.

    ``extract_keywords(ontology_data, product_data)`` builds an n-gram
    vocabulary from the ontology texts, scores the product texts with TF-IDF
    over that vocabulary and scores the ontology texts with plain TF over the
    same vocabulary, so both matrices share the same columns (``custom_vocab``).
    """
    # State of the most recent run (instances overwrite these).
    product_tfidf = None      # sparse TF-IDF matrix of the product texts
    ontology_tf = None        # sparse TF matrix of the ontology texts
    trigram_mapping = None    # vocab index of a trigram -> vocab indices of its bigrams
    custom_vocab = []         # sorted numpy array of vocabulary n-grams
    tfidfVectorizer = None    # fitted TfidfVectorizer reused by compute_tfidf

    def __init__(self, stop_words='english', ngram_range=(2,3), word_window_length:int=4):
        """
        stop_words: 'english' (the module-level ENGLISH_STOP_WORDS list), a
            collection of words to ignore, or None to keep every word.
        ngram_range: (min, max) n-gram sizes, both inclusive.
        word_window_length: size of the sliding word window n-grams are drawn from.
        """
        if isinstance(stop_words, str) and stop_words.lower()=='english':
            # Use ENGLISH STOP WORDs by default
            self.stop_words = ENGLISH_STOP_WORDS
        else:
            self.stop_words = stop_words
        self.ngram_range = ngram_range
        self.word_window_length = word_window_length

    def extract_keywords(self, ontology_data, product_data):
        """Run the full pipeline and return ``(product_tfidf, ontology_tf)`` as CSR matrices."""
        # Create vocabulary from ontology data
        self.custom_vocab = self.create_vocab(ontology_data)
        # Compute TF-IDF scores
        self.product_tfidf = self.compute_tfidf(product_data)
        # Remove bigram scores already covered by a trigram of the same product
        self.product_tfidf = self.eliminate_redundant_bigram()
        # Computing term-frequency (TF) for ontology data
        self.ontology_tf = self.compute_tf(ontology_data)
        # Where a term has an exact ontology match, drop its partial matches
        self.ontology_tf = self.eliminate_partial_mapping_tf()
        return self.product_tfidf, self.ontology_tf

    def custom_ngram_generator(self, s: str):
        """Return the distinct n-grams of ``s`` (sizes taken from ``ngram_range``).

        Single-character words and stop words are dropped, a trailing "s" or
        "ing" is stripped from each remaining word, and n-grams are formed from
        every in-order combination of words inside a sliding window of
        ``word_window_length`` words (the words need not be adjacent).
        """
        # Set lookup instead of scanning a list per word; None means "no stop words".
        stop_words = set(self.stop_words) if self.stop_words is not None else set()
        words = [re.sub(r'(s|ing)$', '', word) for word in s.split()
                 if len(word) > 1 and word not in stop_words]
        min_ngram, max_ngram = self.ngram_range[0], self.ngram_range[1]
        n_grams = set()
        for start in range(len(words)):
            window = words[start:start + self.word_window_length]
            for size in range(min_ngram, max_ngram + 1):
                n_grams.update(' '.join(gram) for gram in combinations(window, size))
        return list(n_grams)

    def create_vocab(self, data):
        """Build, store and return the sorted array of distinct n-grams found in ``data``.

        The cached vectorizer and trigram mapping describe the previous
        vocabulary's columns, so they are discarded here and rebuilt on demand.
        """
        vocab = set()
        for text in data:
            vocab.update(self.custom_ngram_generator(text))
        self.custom_vocab = np.array(sorted(vocab))
        self.tfidfVectorizer = None
        self.trigram_mapping = None
        return self.custom_vocab

    def compute_tfidf(self, data):
        """Return (and store) the TF-IDF matrix of ``data``.

        The vectorizer is created and fitted on the first call only; later
        calls reuse its IDF weights until ``create_vocab`` builds a new vocabulary.
        """
        if self.tfidfVectorizer is None:
            options = {"analyzer": self.custom_ngram_generator}
            if len(self.custom_vocab) != 0:
                options["vocabulary"] = self.custom_vocab
            self.tfidfVectorizer = TfidfVectorizer(**options)
            self.tfidfVectorizer.fit(data)
        # Compute TF-IDF for product descriptions
        self.product_tfidf = self.tfidfVectorizer.transform(data)
        return self.product_tfidf

    def compute_tf(self, data):
        """Return (and store) term frequencies of ``data`` without IDF weighting.

        TfidfVectorizer's default L2 row normalisation still applies, so a row
        whose only vocabulary term is ``t`` scores 1 for ``t``.
        """
        options = {"analyzer": self.custom_ngram_generator, "use_idf": False}
        if len(self.custom_vocab) != 0:
            options["vocabulary"] = self.custom_vocab
        self.ontology_tf = TfidfVectorizer(**options).fit_transform(data)
        return self.ontology_tf

    def eliminate_partial_mapping_tf(self):
        """Zero partial ontology matches in columns that have an exact match.

        For every column where at least one ontology row scores exactly 1, all
        scores below 1 in that column are set to 0; other columns are untouched.
        """
        tf = self.ontology_tf.toarray()
        has_exact_match = (tf == 1).any(axis=0)
        tf[(tf < 1) & has_exact_match] = 0
        self.ontology_tf = csr_matrix(tf)
        return self.ontology_tf

    def get_vocab(self):
        return self.custom_vocab

    def create_trigram_mapping(self):
        """Map each trigram's vocab index to the vocab indices of its bigrams.

        Bigrams that are not in the vocabulary (e.g. when ``ngram_range``
        excludes them) are skipped; a trigram with none of its bigrams in the
        vocabulary gets no entry.
        """
        position = {}
        for idx, term in enumerate(self.custom_vocab):
            position.setdefault(term, idx)
        self.trigram_mapping = {}
        for idx, feature in enumerate(self.custom_vocab):
            tokens = feature.split()
            if len(tokens) != 3:
                continue
            bigrams = (' '.join(pair) for pair in combinations(tokens, 2))
            bigram_indices = [position[bigram] for bigram in bigrams if bigram in position]
            if bigram_indices:
                self.trigram_mapping[idx] = bigram_indices

    def eliminate_redundant_bigram(self):
        """Zero each product's bigram scores that belong to a trigram it also contains."""
        if not self.trigram_mapping:
            self.create_trigram_mapping()

        scores = self.product_tfidf.toarray()
        for row_idx in np.unique(scores.nonzero()[0]):
            row = scores[row_idx]  # a view: writes go straight into ``scores``
            feature_indices = row.nonzero()[0]
            # Cheap skip: a row holding a trigram also holds its bigrams
            # (generated from the same window), so it has more than 2 features.
            if len(feature_indices) <= 2:
                continue
            trigram_indices = [idx for idx in feature_indices if idx in self.trigram_mapping]
            for trigram_idx in trigram_indices:
                row[np.array(self.trigram_mapping[trigram_idx])] = 0
        self.product_tfidf = csr_matrix(scores)
        return self.product_tfidf

        

In [10]:
class KeywordsMapper():
    """Score products against ontology entries and aggregate the scores.

    Intended call order: ``map_keywords`` -> ``integrate_ontology`` -> ``aggregate_mapping``.
    """
    def __init__(self, keyword_extractor:"KeywordExtractor"):
        # The matrices are captured now; re-running the extractor later does
        # not update this mapper.
        self.ontology_tf = keyword_extractor.ontology_tf
        self.product_tfidf = keyword_extractor.product_tfidf

    def map_keywords(self):
        """Return a DataFrame of non-zero product x ontology importance scores.

        Columns: ``product_idx`` (row of product_tfidf), ``onto_idx`` (row of
        ontology_tf) and ``imp_score`` (their dot product). The frame is empty,
        with those columns, when nothing matches.
        """
        # Sparse matrix product; ``@`` instead of np.dot, which SciPy documents
        # as unreliable for sparse inputs.
        self.importance = (self.product_tfidf @ self.ontology_tf.T).tocoo()
        self.mapping = DataFrame({
            "product_idx": self.importance.row.astype(np.int64),
            "onto_idx": self.importance.col.astype(np.int64),
            "imp_score": self.importance.data,
        })
        return self.mapping

    def integrate_ontology(self, ontology_data):
        """Left-join ontology rows onto the mapping and weight scores by association.

        ``ontology_data`` is joined on its index against ``onto_idx`` and must
        provide an ``association`` column; ``association_imp_score`` =
        ``association * imp_score`` is added.

        Raises ValueError (an Exception subclass) if some ``onto_idx`` has no
        matching row in ``ontology_data``.
        """
        # Every mapped ontology index must exist, otherwise the left join would
        # silently produce NaN associations.
        if (~self.mapping["onto_idx"].isin(ontology_data.index)).any():
            raise ValueError("Invalid Ontology Data (Size Mismatch)")
        self.ontology_data = ontology_data
        self.mapping = self.mapping.merge(ontology_data, how="left", left_on="onto_idx", right_index=True)
        self.mapping["association_imp_score"] = self.mapping["association"] * self.mapping["imp_score"]
        return self.mapping

    def aggregate_mapping(self, by:str="preference category", conditional:bool=None):
        """Pivot mean ``association_imp_score`` per product (rows) and ``by`` value (columns).

        If ``conditional`` is given, only mapping rows whose ``conditional``
        column equals it are aggregated.
        """
        filtered_df = self.mapping
        if conditional is not None:
            filtered_df = filtered_df[filtered_df["conditional"] == conditional]
        # "mean" is what np.average without weights computes, without pandas'
        # FutureWarning about passing numpy callables.
        return pivot_table(filtered_df,
                           values=["association_imp_score"],
                           index=["product_idx"],
                           columns=[by],
                           aggfunc="mean")
        
        
        

In [11]:
class DatabaseWriter():
    """Create, fill and read the tables of the SQLite product database.

    Every public method opens its own connection and always closes it again.
    Write operations commit on success and roll back on error. Failures are
    reported with a print and then re-raised.
    """
    # Table name -> ordered list of the columns written by ``insert_data``.
    # With ``include_index=True`` the DataFrame index supplies the first listed column.
    column_dict = {"ontology_data": ["idx", "tag", "sustainability_preference", 
                                     "preference_category", "association", "conditional"],
                   "vocabulary": ["idx", "vocab"],
                   "product_data": ["idx", "source", "product_code", "product_title",
                                    "product_description", "product_category", "brand", "price"],
                   "product_ontology_mapping": ["source", "product_idx", "onto_idx", "imp_score"],
                   "product_keywords": ["source", "product_idx", "vocab_idx"]}

    def __init__(self, db_file_path: str="sustainable_product_db.db"):
        self.db_file_path = db_file_path

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit.

        ``with sqlite3.connect(...)`` on its own only commits or rolls back; it
        never closes the connection, hence the ``closing`` wrapper.
        """
        return closing(sqlite3.connect(self.db_file_path))

    def create_tables(self, sql_script_path: str="sustainable_product_db_tables.sql"):
        """Create the tables by running the SQL script at ``sql_script_path``."""
        try:
            # Outer ``with`` closes the connection; inner ``with conn`` commits/rolls back.
            with self._connect() as conn, conn:
                with open(sql_script_path, "r") as script_file:
                    script = script_file.read()
                conn.cursor().executescript(script)
                print("Successfully created tables")
        except Exception:
            print("Failed to create tables!!!")
            raise

    def insert_data(self, df, table_name:str, include_index:bool=True):
        """Insert (or replace) the rows of ``df`` into ``table_name``.

        Only tables listed in ``column_dict`` are accepted (KeyError otherwise),
        so ``table_name`` cannot inject arbitrary SQL here. With
        ``include_index`` the DataFrame index is written as the first listed
        column and the remaining listed columns come from ``df``; otherwise all
        listed columns must be present in ``df``.
        """
        try:
            columns = self.column_dict[table_name]
            placeholders = ", ".join(["?"] * len(columns))
            query = f'insert or replace into {table_name} ({", ".join(columns)}) values ({placeholders});'
            if include_index:
                values = list(df[columns[1:]].itertuples(index=True))
            else:
                values = list(df[columns].itertuples(index=False))
            with self._connect() as conn, conn:
                conn.cursor().executemany(query, values)
                print(f"Successfully inserted (or updated) {table_name} table")
        except Exception:
            print("Failed to insert data into table!")
            raise

    def execute_script(self, query_script:str="generate_pivot_query.sql", should_generate_query:bool=False):
        """Execute the SQL script stored in the file ``query_script``.

        With ``should_generate_query`` the file is run as a single query whose
        first row's first column is itself an SQL script; that generated script
        is printed and then executed.
        """
        try:
            with self._connect() as conn, conn:
                cursor = conn.cursor()
                with open(query_script, "r") as script_file:
                    script = script_file.read()
                if should_generate_query:
                    script = cursor.execute(str(script)).fetchone()[0]
                    print(f"Pivot table query generated: {script}")
                cursor.executescript(script)
                print(f"Script {query_script} executed succesfully!")
        except Exception:
            print(f"Failed to execute script {query_script}")
            raise

    def read_table(self, table_name):
        """Return the full contents of ``table_name`` as a DataFrame.

        NOTE(review): ``table_name`` is interpolated into the SQL text; only
        pass trusted table names.
        """
        try:
            with self._connect() as conn:
                df = read_sql_query(f"select * from {table_name}", conn)
                print(f"Successfully loaded data from {table_name}!")
                return df
        except Exception:
            print(f"Failed fetch data from {table_name}")
            raise
        
            