# Identification of emerging technologies using NLP-powered patent networks

## Model API

* **Contains the model and its instances in order to be called from other modules**, for instance for robustness checks or applications. 
* More details about the implementation choices and the rationale behind the algorithms can be found in the `main model` notebook.

> Author: **Antoine MATHIEU COLLIN**
* Department of Management, Strategy and Innovation (MSI) of the Faculty of Economics and Business (FEB), KU Leuven
* Department of Computer Science of the Faculty of Engineering Science, KU Leuven
* Leuven.AI, KU Leuven Institute for Artificial Intelligence

In [None]:
# Import libraries

# data wrangling
import pandas as pd
import math
import numpy as np
import igraph
from igraph import Graph
import random

# language processing
import nltk
from nltk import word_tokenize
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
import string

# machine learning
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics import silhouette_samples, silhouette_score

# figures
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud 
from igraph import BoundingBox, palettes
from matplotlib.patches import Patch
from matplotlib.lines import Line2D

# library to parse the xml content of the EP full text database
# library doc: https://docs.python.org/3/library/xml.etree.elementtree.html
import xml.etree.ElementTree as ET  

# disable warnings
import warnings
warnings.filterwarnings("ignore")

In [None]:
class Config:
    """
    Shared configuration of the model.

    Every setting is a plain class attribute, so the values can be
    overwritten when the model is called from outside this notebook.
    """

    # ---- Names of the PATSTAT columns ----
    VAR_APPLN_ID = 'appln_id'
    VAR_DOCDC_FAMILY_ID = 'docdb_family_id'
    VAR_CITED_DOCDB_FAM_ID = 'cited_docdb_family_id'
    VAR_NB_CITING_DOCDB_FAM = 'nb_citing_docdb_fam'
    VAR_APPLN_FILLING_YEAR = 'appln_filing_year'
    VAR_EARLIEST_FILLING_DATE = 'earliest_filing_date'
    VAR_EARLIEST_FILING_YEAR = 'earliest_filing_year'

    # ---- Names of the variables computed by the model ----
    NEW_VAR_CITING_DOCDB_FAM_IDS = 'citing_docdb_families_ids'
    NEW_VAR_NB_CITING_DOCDB_FAM_BY_YEAR = 'nb_citing_docdb_fam_by_year'

    # ---- Magic numbers and codes ----
    # Patent authority code kept by the EP filter
    EP_AUTHORITY = 'EP'
    # Value of the 'granted' column kept by the granted-patent filter
    GRANTED_PATENT_CODE = 1
    # Share of the most cited patents kept per filing year
    PERCENTAGE_TOP_PATENTS = 0.2
    # Bounds of the filing-year window and reference year for yearly citations
    FIRST_YEAR_CONVERAGE_EP_FULL_TEXT = 2000
    LAST_YEAR_COVERAGE_PATSTAT = 2014
    LAST_YEAR_TO_RECEIVE_CITAITONS = 2018

In [None]:
class DataCleaning:
    """
    # Data cleaning methods.

    Each method reads and rewrites the tables stored in ``self.data`` (a dict
    of pandas DataFrames keyed by table name, which the host object must
    provide) and returns ``self``.
    The data should be input in the specific format detailed in the
    'main_model' notebook.
    """

    def __init__(self):
        # `self` was missing from the signature, which made the class
        # impossible to instantiate on its own.
        pass


    def _keep_only_EP_patents(self):
        """Keep only the patents whose 'appln_auth' equals Config.EP_AUTHORITY."""

        df_main = self.data['_table_main_patent_infos']
        df_main = df_main[df_main['appln_auth'] == Config.EP_AUTHORITY]
        # Update the table, then the list of patent/family ids and the other tables
        self.data['_table_main_patent_infos'] = df_main
        return self.__update_patent_fam_ids()


    def _keep_only_granted_patents(self):
        """We keep only patents which have gone through the approval process, i.e. are granted"""

        df_main = self.data['_table_main_patent_infos']
        df_main = df_main[df_main['granted'] == Config.GRANTED_PATENT_CODE]
        # Update the table, then the list of patent/family ids and the other tables
        self.data['_table_main_patent_infos'] = df_main
        return self.__update_patent_fam_ids()


    def _select_time_range(self):
        """
        Keep the patents filed between Config.FIRST_YEAR_CONVERAGE_EP_FULL_TEXT
        and Config.LAST_YEAR_COVERAGE_PATSTAT (both bounds included).
        Earlier patents are dropped because no full text is available for them.
        """

        df_main = self.data['_table_main_patent_infos']
        filing_year = df_main[Config.VAR_APPLN_FILLING_YEAR]
        not_too_early = filing_year >= Config.FIRST_YEAR_CONVERAGE_EP_FULL_TEXT
        not_too_late = filing_year <= Config.LAST_YEAR_COVERAGE_PATSTAT
        # Update the table, then the list of patent/family ids and the other tables
        self.data['_table_main_patent_infos'] = df_main[not_too_early & not_too_late]
        return self.__update_patent_fam_ids()


    # This function is not implemented: it only re-synchronises the ids
    def _normalise(self):
        """Normalisation of the data across years and sectors, to cater for **patent explosion**"""
        # TODO: implement the normalisation itself
        return self.__update_patent_fam_ids()


    def _select_one_patent_per_family(self):
        """
        In order to select only patents of interest, as well as saving
        computational power, we keep only the earliest patent of each family.
        """

        # Sorting by earliest filing date first makes keep='first' select the
        # earliest row of each family. Non in-place calls are used because the
        # stored table may be a filtered slice of another DataFrame.
        df_main = self.data['_table_main_patent_infos']
        df_main = df_main.sort_values(by=Config.VAR_EARLIEST_FILLING_DATE)
        df_main = df_main.drop_duplicates(subset=[Config.VAR_DOCDC_FAMILY_ID],
                                          keep='first')

        # Update the table, then the list of patent/family ids and the other tables
        self.data['_table_main_patent_infos'] = df_main
        return self.__update_patent_fam_ids()


    def _select_breakthrough_patents(self):
        """
        Keep only breakthrough patents: for each earliest filing year, the
        Config.PERCENTAGE_TOP_PATENTS share (rounded up) of the patents with
        the highest number of citing DOCDB families.
        """

        share = Config.PERCENTAGE_TOP_PATENTS
        year_col = Config.VAR_EARLIEST_FILING_YEAR
        df = self.data['_table_main_patent_infos']

        # Collect the yearly selections and concatenate once at the end
        top_patents_by_year = []
        for year in df[year_col].unique().tolist():
            df_year = df[df[year_col] == year].sort_values(
                by=Config.VAR_NB_CITING_DOCDB_FAM, ascending=False)
            nb_top_patent_given_year = int(math.ceil(share * len(df_year)))  # Needs rounding up
            top_patents_by_year.append(df_year.head(nb_top_patent_given_year))

        if top_patents_by_year:
            filtered_df = pd.concat(top_patents_by_year)
        else:
            # Empty input: keep the columns so the id update below still works
            filtered_df = df.iloc[0:0]

        # Update the table, then the list of patent/family ids and the other tables
        self.data['_table_main_patent_infos'] = filtered_df
        return self.__update_patent_fam_ids()


    def __update_patent_fam_ids(self):
        """
        Store the patent ids and family ids of the main table, then filter the
        other tables accordingly:
        - main, CPC and patentee tables are filtered on the list of patent ids;
        - backward and forward citation tables are filtered on the list of
          family ids.
        Tables stored under any other key are left untouched.
        """

        # (1) Update the list of ids (patent ids and family ids)
        df_main = self.data['_table_main_patent_infos']
        self.patent_ids = df_main[Config.VAR_APPLN_ID].unique().tolist()
        self.patent_family_ids = df_main[Config.VAR_DOCDC_FAMILY_ID].unique().tolist()

        # (2) Filter the tables according to the new lists of ids
        def keep_rows(df, var, list_ids):
            """Keep the rows of `df` whose column `var` is in `list_ids`"""
            return df[df[var].isin(list_ids)]

        tables_by_patent_id = ('_table_main_patent_infos', '_table_cpc', '_table_patentees_info')
        tables_by_family_id = ('_table_backward_docdb_citations', '_table_forward_docdb_citations')

        for key in self.data:
            if key in tables_by_patent_id:
                self.data[key] = keep_rows(self.data[key], Config.VAR_APPLN_ID, self.patent_ids)
            elif key in tables_by_family_id:
                self.data[key] = keep_rows(self.data[key], Config.VAR_DOCDC_FAMILY_ID, self.patent_family_ids)

        return self

In [None]:
class NewMetrics:
    """Methods to derive new metrics from the data"""

    def __init__(self):
        # `self` was missing from the signature, which made the class
        # impossible to instantiate on its own.
        pass


    def _get_DOCDB_fam_cites_per_year(self):
        """
        Add a variable keeping track of yearly citations by patent family.

        The number of citing DOCDB families is divided by the number of years
        between the application filing year and
        Config.LAST_YEAR_TO_RECEIVE_CITAITONS. The column is added to the main
        table, which is also stored in ``self.TABLE_ALL_PATENTS_INFO``.
        """

        # Unpacking some variables for clarity
        df = self.data['_table_main_patent_infos']
        citations_by_year = Config.NEW_VAR_NB_CITING_DOCDB_FAM_BY_YEAR
        citations_docdb_fam = Config.VAR_NB_CITING_DOCDB_FAM
        year = Config.VAR_APPLN_FILLING_YEAR
        ref_year = Config.LAST_YEAR_TO_RECEIVE_CITAITONS

        # Compute the metric
        # NOTE(review): a patent filed in `ref_year` gives a zero denominator;
        # presumably excluded upstream by the time-range filter - confirm.
        years_to_receive_citations = ref_year - df[year]
        df[citations_by_year] = df[citations_docdb_fam] / years_to_receive_citations

        # Updating the table
        self.TABLE_ALL_PATENTS_INFO = df
        return self

In [None]:
class Patent:
    """A single patent, identified by its application id."""

    def __init__(self, appln_id):
        """Store the application id and start the dictionary of attributes"""

        # Shortcut to the main patent key
        self.appln_id: int = appln_id
        # All the patent's attributes (name -> value); the application id is
        # the first entry
        self.patent_attributes = {Config.VAR_APPLN_ID: appln_id}

In [None]:
class ReshapingToOOP:
    """Methods to assign the data to patent objects"""

    def __init__(self):
        pass


    def _create_patent_objects(self):
        """
        Create a Patent object for each patent id and store them in the list
        ``self.patent_list``
        """
        self.patent_list = [Patent(patent_id) for patent_id in self.patent_ids]
        return self


    def _assign_data_to_patent_obj(self):
        """
        Once the data has been retrieved from PATSTAT and the patent objects
        have been created, we assign the data to the Patent objects:
        (1) every column of the main (merged with backward citations), CPC and
            patentee tables becomes an entry of ``patent.patent_attributes``;
        (2) the families citing the patent's family are stored under
            Config.NEW_VAR_CITING_DOCDB_FAM_IDS.
        """

        def collect_attributes(table):
            """
            Turn the rows of one patent into a dictionary {column: value}.
            Missing values (NaN) are dropped. A column with exactly one
            distinct value is stored as that value; otherwise the distinct
            values are stored as a list (possibly empty).
            """
            attributes = {}
            for col in list(table):
                # `x == x` is False only for NaN-like values
                values = [x for x in table[col].unique().tolist() if x == x]
                attributes[col] = values[0] if len(values) == 1 else values
            return attributes

        # Unpacking some variables
        df_main = self.data['_table_main_patent_infos']
        df_cpc = self.data['_table_cpc']
        df_patentee = self.data['_table_patentees_info']
        df_bwd = self.data['_table_backward_docdb_citations']
        df_fwd = self.data['_table_forward_docdb_citations']

        # (1) Assigning the data contained in the tables to the patents
        # We merge backward citation data to the main table (on family id)
        key = Config.VAR_DOCDC_FAMILY_ID
        df_main = pd.merge(df_main, df_bwd, how='left', left_on=key, right_on=key)

        for patent in self.patent_list:
            for df in [df_main, df_cpc, df_patentee]:
                patent_table = df[df[Config.VAR_APPLN_ID] == patent.appln_id]
                patent.patent_attributes.update(collect_attributes(table=patent_table))

        # (2) Assigning forward citations to the patents
        # The first column is matched against the patent's family id and the
        # second one is read as the citing families. Columns are accessed by
        # position: renaming them in place would also rename the table stored
        # in `self.data` and break any later filtering on the family id.
        family_column = df_fwd.iloc[:, 0]
        citing_column = df_fwd.iloc[:, 1]
        for patent in self.patent_list:
            family_id = patent.patent_attributes[Config.VAR_DOCDC_FAMILY_ID]
            citing_fam = citing_column[family_column == family_id].unique().tolist()
            patent.patent_attributes.update({Config.NEW_VAR_CITING_DOCDB_FAM_IDS: citing_fam})

        return self

In [None]:
class GetCitations:
    """Methods to compute direct and indirect (BC, CC, LC) citations between the patents"""

    @staticmethod
    def _as_family_list(cited_fam):
        """
        Return `cited_fam` unchanged when it is a list, otherwise wrapped in a
        one-element list. Patent attributes hold a bare value when a column
        has a single distinct value and a list otherwise; this gives both
        cases the same shape.
        """
        return cited_fam if isinstance(cited_fam, list) else [cited_fam]


    def _get_direct_citations(self):
        """
        Get direct backward citations (at the level of the family).
        ``self.direct_citations`` is a list of (citing patent, cited patent)
        pairs: first the pairs of patents citing a single family, then the
        pairs of patents citing several families.
        """

        # Unpacking some variables for clarity
        fam = Config.VAR_DOCDC_FAMILY_ID
        cited_fam = Config.VAR_CITED_DOCDB_FAM_ID

        single_citations = []    # (1) the patent cites only one family
        multiple_citations = []  # (2) the patent cites several families (stored as list)
        for x in self.patent_list:
            cited = x.patent_attributes[cited_fam]
            if isinstance(cited, list):
                multiple_citations.extend((x, y) for y in self.patent_list
                                          if y.patent_attributes[fam] in cited)
            else:
                single_citations.extend((x, y) for y in self.patent_list
                                        if y.patent_attributes[fam] == cited)

        # Concatenating the two lists to have the direct citations
        self.direct_citations = single_citations + multiple_citations
        return self


    def _get_BC_citations(self):
        """
        # (1) Two patents are linked when the lists stored under
        #     Config.NEW_VAR_CITING_DOCDB_FAM_IDS have at least one family in common
        # (2) The produced list is non directed: each pair appears once, in
        #     the order of ``self.patent_list``
        # NOTE(review): bibliographic coupling is usually defined as two works
        #     referencing a common third work, whereas the lists compared here
        #     are the families *citing* each patent - confirm this is intended.
        """
        from itertools import combinations

        citing_key = Config.NEW_VAR_CITING_DOCDB_FAM_IDS

        # One set per patent, built once, so that each pair costs a single
        # set intersection test
        patents_with_citing = [(patent, set(patent.patent_attributes[citing_key]))
                               for patent in self.patent_list]

        # `combinations` yields every unordered pair lazily, without storing
        # the whole list of pairs in memory
        BC = [(patent_1, patent_2)
              for (patent_1, citing_1), (patent_2, citing_2) in combinations(patents_with_citing, 2)
              if not citing_1.isdisjoint(citing_2)]

        # Removing duplicated items while keeping a deterministic order
        self.BC = list(dict.fromkeys(BC))
        return self


    def _get_CC_citations(self):
        """
        # (1) Co-citation is defined as the frequency with which two documents are cited together
        by other documents. If at least one other document cites two documents in common these documents
        are said to be co-cited
        # (2) The produced list is non directed
        # (3) Only the pairs whose two families both belong to a patent of
        ``self.patent_list`` are kept; a family is represented by the first
        patent of the list which belongs to it
        """
        from itertools import combinations

        cited_key = Config.VAR_CITED_DOCDB_FAM_ID
        fam_key = Config.VAR_DOCDC_FAMILY_ID

        # Every pair of families cited together by a patent of the model
        cocited_families = []
        for patent in self.patent_list:
            cited = patent.patent_attributes[cited_key]
            if isinstance(cited, list) and len(cited) > 1:
                cocited_families.extend(combinations(cited, 2))
        pairs = list(set(cocited_families))

        # First patent of the list for each family id
        first_patent_of_family = {}
        for patent in self.patent_list:
            try:
                first_patent_of_family.setdefault(patent.patent_attributes[fam_key], patent)
            except TypeError:
                # Unhashable value (e.g. a list): it can never be equal to a
                # member of a pair, so the patent can safely be ignored
                continue

        # Translating the pairs of families into pairs of patents, dropping
        # the pairs with a family not represented in the model
        CC = []
        for family_1, family_2 in pairs:
            patent1 = first_patent_of_family.get(family_1)
            patent2 = first_patent_of_family.get(family_2)
            if patent1 is not None and patent2 is not None:
                CC.append((patent1, patent2))

        self.CC = CC
        return self


    def _get_LC_citations(self):
        """
        # (1) LC (longitudinal coupling). A cites a document that cites B
        # (2) The produced list IS directed: pairs are (patent_A, patent_B)
        # (3) Can be optimised
        """
        fam_key = Config.VAR_DOCDC_FAMILY_ID
        cited_key = Config.VAR_CITED_DOCDB_FAM_ID

        def patents_cited_by(patent):
            """Patents of the model whose family is cited by `patent`"""
            # A single cited family is stored as a bare value (int or float):
            # it is wrapped in a list so that `in` works in every case
            cited_fam = GetCitations._as_family_list(patent.patent_attributes[cited_key])
            return [candidate
                    for candidate in self.patent_list
                    if candidate.patent_attributes[fam_key] in cited_fam]

        LC = []
        # Identifying all patents cited by a given patent A
        for patent_A in self.patent_list:
            for cited_patent in patents_cited_by(patent_A):
                # Identifying all patents cited by a patent cited by patent A
                for patent_B in patents_cited_by(cited_patent):
                    LC.append((patent_A, patent_B))

        # Removing duplicated items while keeping a deterministic order
        self.LC = list(dict.fromkeys(LC))
        return self

In [None]:
class RetrieveFullTextData:
    """
    Methods to retrieve the full text data to the selected patents.
    """

    def __init__(self):
        # `self` was missing from the signature, which made the class
        # impossible to instantiate on its own.
        pass


    def _assign_full_text_to_patents(self):
        """
        For each patent contained in the model, assigns in the patent attributes
        under the 'full_text' entry a dataframe containing all the text of this
        patent, in raw format. Patents whose 'publn_nr' is not found in the
        'publication_number' column of the text data get an empty dataframe
        with the same columns.
        """

        df = self.data['_text_data']
        cols = list(df)
        # Publication numbers as strings, in a set for fast membership tests
        known_numbers = {str(x) for x in df['publication_number']}

        for patent in self.patent_list:
            publn_nr = patent.patent_attributes['publn_nr']
            # Only a string can match the string version of the numbers
            if isinstance(publn_nr, str) and publn_nr in known_numbers:
                data = df[df['publication_number'] == int(publn_nr)]
                patent.patent_attributes['full_text'] = data
            else:
                patent.patent_attributes['full_text'] = pd.DataFrame(columns=cols)
        return self


    @staticmethod
    def _translate(text_to_translate, source_language = False, target_language = 'en'):
        """
        Machine translation using the Google translate API.

        `source_language` is a language code, or False (also accepted: the
        string "False") when the language of the input is not known; the
        source language is then left to the translator.
        """

        # initialise the translator object (see the googletrans API docs)
        # NOTE(review): `Translator` is not imported in the visible part of
        # this file; it must be in scope for this method to run.
        translator = Translator()
        # behaviour of the function if the input language is known
        if source_language and source_language != "False":
            translatedText = translator.translate(text_to_translate, src=source_language, dest=target_language)
        # if the language of the input is not known
        else:
            translatedText = translator.translate(text_to_translate, dest=target_language)
        return translatedText.text


    @staticmethod
    def _get_text_claims(data_patent_text):
        """
        For a given dataframe coming from the EP full-text database, returns
        the list of its claims (one string per claim), translated to English
        when needed. Returns ['Unavailable'] if the dataframe has no claims.

        The input dataframe is not modified.
        """

        ## data manipulation

        # select only the claims in the dataframe
        data = data_patent_text[data_patent_text['text_type'] == 'CLAIM']
        # sort by date
        data = data.sort_values(by = 'publication_date', ascending = False)
        # keep only most recent claims by language
        data = data.drop_duplicates(subset = ['language_text_component','text_type'])
        # keep one language, in the order EN, DE, FR, other ('xx') (for best consistency of the translation)
        data = data.copy()  # the column is rewritten: work on a copy, not on a slice
        data['language_text_component'] = pd.Categorical(data['language_text_component'], categories=["en","de","fr","xx"], ordered=True)
        data = data.sort_values(by = 'language_text_component')
        data = data.drop_duplicates(subset = ['text_type'])

        # if the data contains no claims then the data selected is empty
        if len(data) == 0:
            return ['Unavailable']

        # store the language at this point for translation at the end
        language = data.iloc[0]['language_text_component']
        # selection of the field of the pandas dataframe which contains the claims texts
        text_xml = data.iloc[0]['text']

        ## Process the xml to get the raw text

        # removing the tags for bold text
        text_xml_modified = text_xml.replace('<b>', '').replace('</b>', '')
        # wrapping the claims in a single root element to get a well-formed xml
        text_xml_modified = "<data>" + text_xml_modified + '</data>'
        # we parse it with the ElementTree XML API
        root = ET.fromstring(text_xml_modified)
        # and this is how we access the text of the claims
        claims = root.findall("./claim/claim-text")
        # we store the claims in a list; an element without leading text has
        # `text` set to None, replaced by '' so the claims can be joined later
        claims_text = [claim.text or '' for claim in claims]

        ## Translate if the claims are not in EN
        if language != 'en':
            # the source language is only given for DE and FR, otherwise it is
            # left to the translator
            source_language = language if language in ('de', 'fr') else False
            claims_text = [RetrieveFullTextData._translate(text, source_language, 'en') if text else text
                           for text in claims_text]

        return claims_text


    def _attribute_claims(self):
        """For each patent, uses the _get_text_claims function to store the claims text in the object"""
        for patent in self.patent_list:
            full_text = patent.patent_attributes['full_text']
            patent.patent_attributes['full_text_claims'] = RetrieveFullTextData._get_text_claims(full_text)
        return self

In [None]:
class CustomStemmer(BaseEstimator, TransformerMixin):
    """We overwrite the Sklearn BaseEstimator class in order to have more control on the
    text data preprocessing"""

    # Words removed in addition to the NLTK English stop words
    EXTRA_STOP_WORDS = ('claim', 'according', 'preceding', 'characterised')

    def __init__(self, stemmer_type):
        """
        We can use different types of stemmer:
        - 'snowball': English Snowball stemmer;
        - 'no': no preprocessing at all, the documents are returned untouched.
        """

        self.tokenizer = RegexpTokenizer("(?u)\\b[\\w-]+\\b")
        self.stemmer_type = stemmer_type

        if stemmer_type == 'snowball':
            self.stemmer = SnowballStemmer("english")

    def fit(self, documents, labels = None):
        """Overwritten for the sake of completeness, does not perform any action"""
        return self

    def _stop_words(self):
        """
        Return the set of stop words: the NLTK English list and
        EXTRA_STOP_WORDS, together with their stemmed forms.
        """
        # the corpus is downloaded only when it is not available locally,
        # instead of calling the downloader on every transformation
        try:
            words = nltk.corpus.stopwords.words('english')
        except LookupError:
            nltk.download('stopwords')
            words = nltk.corpus.stopwords.words('english')

        stopset = set(words)
        stopset.update(self.EXTRA_STOP_WORDS)
        # The stop words are compared to tokens which are already stemmed:
        # without their stemmed forms, words such as 'according' (stemmed to
        # 'accord') would never be removed
        stopset.update([self.stemmer.stem(word) for word in list(stopset)])
        return stopset

    def transform(self, documents):
        """
        Returns a list with, for each document, its tokens joined by spaces
        after (1) tokenisation, (2) stemming, (3) removal of the stop words and
        (4) removal of the tokens containing a digit.
        If the stemmer type is 'no', the documents are returned unchanged.
        """

        if self.stemmer_type == 'no':
            return documents

        if not hasattr(self, 'stemmer'):
            raise ValueError("Unsupported stemmer type: %r (expected 'snowball' or 'no')"
                             % (self.stemmer_type,))

        tokenize = self.tokenizer.tokenize
        stem = self.stemmer.stem
        stopset = self._stop_words()

        def preprocess(text):
            """Tokenise, stem and filter a single document"""
            tokens = [stem(token) for token in tokenize(text)]
            tokens = [token for token in tokens if token not in stopset]
            tokens = [token for token in tokens if not any(char.isdigit() for char in token)]
            return ' '.join(tokens)

        return [preprocess(text) for text in documents]

In [None]:
class CustomVectorizer(BaseEstimator, TransformerMixin):
    """We overwrite the Sklearn BaseEstimator class in order to have more control on the vectorisation"""

    def __init__(self, vectorizer_type):
        """
        Two possibilities here: 'count' and 'tfidf'. More can be added if necessary.
        Raises a ValueError for any other type.
        """

        # The init parameter is stored under its own name: BaseEstimator reads
        # it back with getattr in get_params (used by repr and clone)
        self.vectorizer_type = vectorizer_type

        if vectorizer_type == 'count':
            self.vectorizer = CountVectorizer(binary=True)
        elif vectorizer_type == 'tfidf':
            self.vectorizer = TfidfVectorizer(stop_words='english')
        else:
            # fail early instead of with a missing attribute in `transform`
            raise ValueError("Unsupported vectorizer type: %r (expected 'count' or 'tfidf')"
                             % (vectorizer_type,))

    def fit(self, documents, labels = None):
        """Does not perform any action: the vectorizer is fitted in `transform`"""
        return self

    def transform(self, documents):
        """
        Fit the vectorizer on the documents and return the feature space as a
        list of one-dimensional numpy arrays (one per document).
        Note that the vectorizer is re-fitted at every call.
        """
        freqs = self.vectorizer.fit_transform(documents)
        # the matrix is converted to dense form once instead of row by row
        return list(freqs.toarray())

In [None]:
class TextProcessing:
    """
    Methods for text analysis and similarity measures, using the 2 classes above.
    """

    def _index_patents(self):
        """We create an index of patents (patent -> position in the patent list), in order to
        be able to process all the text together in the following text processing steps and
        still be able to access individual patent text data"""

        self.dict_patents_indexes = {patent: position for position, patent in enumerate(self.patent_list)}
        return self


    def _store_vocabulary(self):
        """Store the claims text of each patent in a pandas Series called 'corpus', with one entry
        per patent, in the order of the patent list"""

        # The claims of a patent (a list of strings) are joined in a single string.
        # A text already stored as a string is kept as it is rather than skipped:
        # the corpus must keep one entry per patent to stay aligned with the
        # positions stored by `_index_patents`.
        texts = []
        for patent in self.patent_list:
            claims = patent.patent_attributes['full_text_claims']
            texts.append(claims if isinstance(claims, str) else ' '.join(claims))
        # reshape as a single pandas Series and store in the corpus
        self.corpus = pd.DataFrame(texts, columns=['text']).pop('text')
        return self


    def _stemming(self):
        """Reducing words to their stem word (semantic root), and remove the English stop words"""

        stemmer = CustomStemmer('snowball')
        self.corpus_stemmed = stemmer.transform(self.corpus)
        return self


    def _vectorize(self):
        """Vectorise the patents in a high dimension space (stored as a list in
        ``self.feature_space``) and store the stemmed text of each patent in its attributes"""

        custom_vectorizer = CustomVectorizer('tfidf')
        self.feature_space = custom_vectorizer.fit_transform(self.corpus_stemmed)

        # storing the stemmed text in each patent, using the index of patents
        # (the instance itself is used, not a global variable called `model`)
        for patent in self.patent_list:
            index = self.dict_patents_indexes[patent]
            patent.patent_attributes['stemmed text'] = self.corpus_stemmed[index]
        return self


    def _compute_pairwise_similarities(self):
        """Stores in ``self.cosine_similarities`` a numpy.ndarray containing all pairwise
        similarities between patents"""

        # https://stackoverflow.com/questions/12118720/python-tf-idf-cosine-to-find-document-similarity
        from sklearn.metrics.pairwise import linear_kernel
        # in this case linear_kernel is equivalent to cosine_similarity because the TfidfVectorizer produces normalized vectors.
        # returns an array with all pairwise similarities!
        self.cosine_similarities = linear_kernel(self.feature_space, self.feature_space)
        return self


    def _similarity(self, patent1, patent2):
        """Measure the similarity between a pair of linked patents pair = (patent1, patent2)"""
        i = self.dict_patents_indexes[patent1]
        j = self.dict_patents_indexes[patent2]
        return self.cosine_similarities[i,j]

In [None]:
class BuildNetwork():
    """Builds a weighted network based on backwards citations and text similarity"""

    @staticmethod
    def _filter_symmetric_duplicates(links):
        """
        Return a new list without the symmetric duplicates of a list of tuples:
        a pair is dropped when its reverse appeared earlier in the list.
        Eg [(1,2), (2,1)] -> [(1,2)]

        The input list is not modified. Exact duplicates which are not
        symmetric to an earlier pair are kept.
        """
        seen_reversed = set()
        kept = []
        for pair in links:
            if pair not in seen_reversed:
                kept.append(pair)
            seen_reversed.add(tuple(reversed(pair)))
        return kept


    def _create_network(self):
        """Create the weighted and undirected network with igraph"""

        # defining all possible links between any pair of patents
        links = self.direct_citations + self.CC + self.BC + self.LC

        # definition of the links, weighted by the text similarity of the two patents
        links = BuildNetwork._filter_symmetric_duplicates(links)
        weighted_links = [(p1, p2, TextProcessing._similarity(self, p1, p2)) for (p1, p2) in links]

        # creation of the graph
        self.graph = Graph.TupleList(weighted_links, weights=True)

        return self


    def _simplify_network(self):
        """
        Removing (1) multiple edges (i.e. several links between patents) and (2) loops (i.e. links
        between a patent and itself).

        When multiple edges are removed, they are replaced by a single edge with the weight
        of the maximum weight of the previous edges (normally all equal, since it is a
        similarity measure of the nodes).
        """
        self.graph = self.graph.simplify(multiple=True, loops=True, combine_edges=max)
        return self


    def _select_resolution_parameter(self):
        """
        Selecting the resolution parameter which maximises the modularity of the graph with the
        Leiden algorithm, and storing it in ``self.best_resolution_parameter``:
        # display the figure showing the result
        # show the table of values (20 best values)
        """

        # looping over a wide range of possible resolution parameters to find the one
        # which maximises the modularity of the graph (more or less the goodness of the fit
        # of the partition of the graph in communities/clusters)
        # NOTE(review): the clustering uses the edge weights, but the modularity
        # is computed without passing them - confirm this is intended.
        graph = self.graph  # the graph of this instance, not of a global `model`
        results = []
        for resolution_parameter in [x * 0.01 for x in range(0, 350)]:
            comms = graph.community_leiden(objective_function = 'modularity',
                                           n_iterations = -1,
                                           weights = graph.es['weight'],
                                           resolution_parameter = resolution_parameter)
            results.append({'resolution_parameter': resolution_parameter,
                            'modularity': graph.modularity(comms),
                            'nb_clusters': len(comms)})
        df = pd.DataFrame(results)
        # selection of the best performing resolution parameter and storing
        # it as an attribute of the model 'best_resolution_parameter'
        self.best_resolution_parameter = df.sort_values(by = 'modularity', ascending = False).reset_index(drop=True)['resolution_parameter'][0]

        # plotting the figure showing the selection

        # size
        sns.set(rc={'figure.figsize':(12,4)})
        # plotting aesthetics
        sns.set_style('white')
        # plot the line for modularity
        ax = df.plot(x="resolution_parameter", y="modularity", legend=False)
        # on a second axis, plot the line for the number of clusters
        ax2 = ax.twinx()
        df.plot(x="resolution_parameter", y="nb_clusters", ax=ax2, legend=False, color="r")
        # legend ('bottom left' is not a valid matplotlib location)
        ax.figure.legend(loc='lower left')
        # display
        plt.show()
        # display the table of results
        best_results = df.sort_values(by = 'modularity', ascending = False).head(20)
        try:
            display(best_results)
        except NameError:
            # `display` only exists inside IPython/Jupyter
            print(best_results)

        return self


    def _fit_Leiden_clustering_algorithm(self):
        """Get the community structure of the graph using the Leiden clustering algorithm
        defined in https://www.nature.com/articles/s41598-019-41695-z"""

        self.community_structure = self.graph.community_leiden(
            # the objective function is the graph modularity
            # https://en.wikipedia.org/wiki/Modularity_(networks)
            objective_function = 'modularity',
            # the algorithm iterates until convergence (coded by -1 steps in the API)
            n_iterations = -1,
            # the weights of the edges are the ones computed by the text similarity metric
            weights = self.graph.es['weight'],
            # the resolution parameter is the one previously computed, which
            # maximises the modularity of the community structure (with iterations until
            # convergence as well)
            resolution_parameter = self.best_resolution_parameter
            )
        return self

In [None]:
class SummaryStatistics:
    """Summary statistics for the data section.

    Can also be used to compare the data before and after the data cleaning.
    """

    def _print_nb_patents(self):
        """Print the number of distinct values found in the `Config.VAR_APPLN_ID`
        column of the '_table_main_patent_infos' table"""
        main_table = self.data['_table_main_patent_infos']
        distinct_ids = main_table[Config.VAR_APPLN_ID].unique().tolist()
        print('..Nb of patents:', len(distinct_ids))


    def _plot_patent_filling_over_time(self):
        """Display a line plot of the number of patents per application filing year"""

        # one filing year per patent object of the model
        filing_years = [patent.patent_attributes['appln_filing_year']
                        for patent in self.patent_list]
        years_df = pd.DataFrame(filing_years, columns=['appln_filing_year'])

        # number of patents per year, ordered chronologically
        counts = years_df['appln_filing_year'].value_counts().to_frame().reset_index()
        counts.columns = ['appln_filing_year', 'count']
        counts = counts.sort_values(by='appln_filing_year')

        axes = sns.lineplot(data=counts, x="appln_filing_year", y='count')
        axes.set_title('Patent filling over time', fontsize=14)

In [None]:
class Visualisation:
    """Visualisation methods: graph of the communities, word clouds and S-curves.

    The instance methods read the attributes `graph`, `community_structure`,
    `best_resolution_parameter` and `corpus_stemmed` of the object they are
    called on, and return that object.
    """

    @staticmethod
    def _select_cluster_colors(nb_clusters):
        """Return a list of `nb_clusters` matplotlib color names, one per cluster.

        The base palette holds 13 colors; it is cycled when there are more
        clusters than colors so that every cluster index has a color.
        """
        colors = ['blue', 'paleturquoise', 'green', 'gold', 'red', 'grey', 'fuchsia', 'black', 'ivory',
                  'firebrick', 'lime', 'bisque', 'lightgrey']
        return [colors[i % len(colors)] for i in range(nb_clusters)]


    @staticmethod
    def _plot_cluster_legend(colors_selected):
        """Draw a standalone figure holding the legend 'Cluster i' -> color"""
        legend_elements = [Line2D([0], [0], marker='o', color='w', label='Cluster ' + str(i + 1),
                                  markerfacecolor=c, markersize=15)
                           for i, c in enumerate(colors_selected)]
        fig, ax = plt.subplots()
        ax.legend(handles=legend_elements, loc='center')
        plt.axis('off')
        plt.show()


    def _draw_graph_with_communities(self, kind = 'leiden', nb_iterations = -1):
        """Plot the graph with a custom layout and showing the communities:
        - kind = 'louvain' display the result according to the Louvain algorithm
        by Blondel et al.
        - kind = 'leiden' display the result according to the Leiden algorithm
        - nb_iterations = -1 displays the community structure already stored in the
        model (`self.community_structure`); any other value runs the Leiden
        algorithm again with that number of iterations.

        Any other value of `kind` draws nothing.

        Returns:
            the model itself (`self`).
        """

        # the graph of the instance is used, not the one of a global `model` variable
        graph = self.graph
        weights = graph.es['weight']

        # defining the visual layout for the graph
        visual_style = {
            # size of the vertex (nodes)
            "vertex_size": 5,
            # the thickness of the edges grows with the weight of the link between the patents
            "edge_width": [int(2 * weight) + 0.01 for weight in weights],
        }

        if kind == 'louvain':
            # LOUVAIN: https://igraph.org/r/doc/cluster_louvain.html
            # this function is called 'community_multilevel' in igraph
            print('Using the Louvain algorithm:')
            comms = graph.community_multilevel(weights = weights)
            # plotting the graph with the communities and the visual style
            display(igraph.plot(comms, mark_groups = True, **visual_style))

        elif kind == 'leiden' and nb_iterations != -1:
            # LEIDEN: https://www.nature.com/articles/s41598-019-41695-z
            print('Using the Leiden algorithm, {} iterations:'.format(nb_iterations))
            comms = graph.community_leiden(objective_function = 'modularity',
                                           weights = weights,
                                           # the requested number of iterations is actually
                                           # passed to the algorithm
                                           n_iterations = nb_iterations,
                                           resolution_parameter = self.best_resolution_parameter)
            # plotting the graph with the communities and the visual style
            display(igraph.plot(comms, mark_groups = True, **visual_style))

        elif kind == 'leiden':
            print('Using the Leiden algorithm, iterating until convergence:')
            print('This is the final model!')

            # nickname for clarity
            community_structure = self.community_structure

            # get the number of clusters and store the value in the model
            self.nb_clusters = len(community_structure)
            # creation of a color palette according to the number of clusters
            colors_selected = Visualisation._select_cluster_colors(self.nb_clusters)
            pal = igraph.PrecalculatedPalette(colors_selected)

            # the graph is drawn twice: first without, then with the polygons
            # marking the communities
            for mark_groups in (False, True):
                # setting the random seed so that to fix the output
                random.seed(1234)
                if mark_groups:
                    display(igraph.plot(community_structure,
                                        palette = pal,
                                        mark_groups = True,
                                        **visual_style))
                else:
                    display(igraph.plot(community_structure,
                                        palette = pal,
                                        **visual_style))
                # plot the legend
                Visualisation._plot_cluster_legend(colors_selected)

        return self


    @staticmethod
    def normalise_voc(voc_cluster, voc_corpus):
        """Score the words of a cluster according to how specific they are to it.

        Args:
            voc_cluster: the texts (one string per patent) of the cluster
            voc_corpus: the texts (one string per patent) of the whole corpus

        Returns:
            a Pandas serie indexed by word holding
            (summed TF-IDF in the cluster)^2 / (summed TF-IDF in the corpus),
            sorted in decreasing order. Words whose summed TF-IDF in the cluster
            is below 0.1 are dropped. The 10 first rows of the comparison table
            are displayed as a side effect.
        """

        def vectorise_voc(data):
            """For a given corpus, compute the summed TF-IDF weight of each word"""
            vectorizer = TfidfVectorizer(stop_words='english')
            vecs = vectorizer.fit_transform(data)
            # `get_feature_names` was removed in scikit-learn 1.2, its replacement
            # `get_feature_names_out` exists since 1.0: support both
            if hasattr(vectorizer, 'get_feature_names_out'):
                feature_names = vectorizer.get_feature_names_out()
            else:
                feature_names = vectorizer.get_feature_names()
            # sum over the documents, computed on the sparse matrix so that the
            # (documents x words) matrix never has to be densified
            totals = np.asarray(vecs.sum(axis=0)).ravel()
            return pd.Series(totals, index=feature_names).sort_values(ascending = False)

        # create a table to compare the frequency of a term inside the cluster
        # and in the total vocabulary
        df1 = vectorise_voc(voc_corpus).to_frame()
        df1.columns = ['freq_corpus']
        df2 = vectorise_voc(voc_cluster).to_frame()
        df2.columns = ['freq_cluster']
        result_df = pd.concat([df1, df2], axis=1, sort=False)

        # floor to eliminate corner cases (also drops the words absent from the cluster);
        # the copy makes the column assignment below act on an independent table
        result_df = result_df[result_df['freq_cluster'] >= 0.1].copy()

        # create a new variable to select the most decisive words for each cluster
        # = squared frequency in the cluster / frequency in the total vocabulary
        result_df['squared_freq_cluster_div_freq_corpus'] = (
            result_df['freq_cluster'] * result_df['freq_cluster'] / result_df['freq_corpus'])
        result_df = result_df.sort_values(by = 'squared_freq_cluster_div_freq_corpus', ascending = False)
        # show the table
        display(result_df.head(10))

        return result_df['squared_freq_cluster_div_freq_corpus']


    @staticmethod
    def show_wordcloud(voc_cluster,voc_corpus):
        """Create and show a WordCloud (50 words at most) from the word scores
        returned by `normalise_voc` for the given cluster and corpus"""

        data = Visualisation.normalise_voc(voc_cluster, voc_corpus)
        Cloud = WordCloud(background_color="white", max_words=50).generate_from_frequencies(data)

        # controlling figure aesthetics
        fig = plt.figure(1, figsize=(12, 12))
        plt.axis('off')
        plt.imshow(Cloud)
        plt.show()


    def _display_cluster_word_clouds(self):
        """Compute for each cluster the most original words it contains,
        and display the result in a WordCloud.

        Returns:
            the model itself (`self`).
        """

        # the community structure
        comms = self.community_structure

        # looping over the clusters (each one is the list of its vertex ids)
        for i, ids_patents in enumerate(comms):
            print('Cluster {}:'.format(i+1))

            # gathering the stemmed text of every patent of the cluster to get
            # the cluster specific corpus
            voc_cluster = [comms.graph.vs[id_patent]['name'].patent_attributes['stemmed text']
                           for id_patent in ids_patents]
            # showing the wordcloud, once compared the corpus of the cluster with the
            # corpus of the entire model (all patents)
            Visualisation.show_wordcloud(voc_cluster, voc_corpus = self.corpus_stemmed)

        return self


    @staticmethod
    def _cumulative_counts_per_cluster(df):
        """Count, for each cluster and each year, the patents of the cluster
        filed during that year or before.

        Args:
            df: table with one row per patent and the columns
                'appln_filing_year' and 'cluster'

        Returns:
            a table with the columns 'Cluster:', 'appln_filing_year' and 'count',
            holding one row per (cluster, year) pair; every year present in `df`
            is used for every cluster.
        """
        all_years = df['appln_filing_year'].unique().tolist()
        rows = []
        # sort=False keeps the clusters in their order of first appearance
        for cluster, cluster_years in df.groupby('cluster', sort=False)['appln_filing_year']:
            for year in all_years:
                rows.append([cluster, year, int((cluster_years <= year).sum())])
        return pd.DataFrame(rows, columns=['Cluster:', 'appln_filing_year', 'count'])


    def _display_S_curves(self):
        """Show the propagation of the different technologies: one line per
        cluster giving the cumulative number of its patents over the filing years.

        Returns:
            the model itself (`self`).
        """

        # the community structure
        comms = self.community_structure

        # looping over the clusters
        dfs = []
        for i, ids_patents in enumerate(comms):
            # the application filing year of each patent in the cluster
            years = [comms.graph.vs[id_patent]['name'].patent_attributes['appln_filing_year']
                     for id_patent in ids_patents]
            df = pd.DataFrame(years, columns=['appln_filing_year'])
            df['cluster'] = 'Cluster {}'.format(i+1)
            dfs.append(df)
        df = pd.concat(dfs)

        # cumulative number of patents per cluster and year
        df = Visualisation._cumulative_counts_per_cluster(df)
        sns.lineplot(data=df,
                     x="appln_filing_year",
                     y= 'count',
                     hue="Cluster:",
                     style="Cluster:",
                     markers=True,
                     dashes=False).set_title('S-curves for the different technologies identified',
                                             fontsize = 14)

        return self

In [None]:
class Model(Config, DataCleaning, NewMetrics, ReshapingToOOP, GetCitations, RetrieveFullTextData,
            TextProcessing, BuildNetwork, SummaryStatistics, Visualisation):
    """
    Creation of a model which inherits several building blocks

    Each method below groups the steps of one building block and calls them
    in a fixed order on the model.
    """

    # Attributes of the model
    # They are declared at class level so that they are recorded in
    # `Model.__annotations__`: written as bare annotations inside `__init__` they
    # would be local-variable annotations, which are neither evaluated nor stored.
    # No value is assigned here: the attributes only exist once a method sets them.
    # The annotations are strings so that none of the type names is evaluated.
    data: "dict" # datasets
    patent_list: "list" # patent objects
    dict_patents_indexes: "dict" # mapping of patents objects and their indexes
    patent_ids: "list" # list of patent ids contained in the model
    patent_family_ids: "list" # list of DOCDB family ids contained in the model
    direct_citations: "list" # directed list of simple citations
    CC: "list" # undirected list of co-citations
    BC: "list" # undirected list of bibliographical coupling
    LC: "list" # directed list of longitudinal citations
    corpus: "pandas.core.series.Series" # contains all claim text of each patent (raw)
    corpus_stemmed: "pandas.core.series.Series" # contains all claim text of each patent (stemmed)
    feature_space: "list" # the feature space, the high dimension representation of the text data
    cosine_similarities: "numpy.ndarray" # contains all the pairwise similarities between patents
    graph: "igraph.Graph" # Igraph network
    best_resolution_parameter: "int" # best resolution parameter for the Leiden algorithm
    community_structure: "igraph.clustering.VertexClustering" # community structure identified


    def __init__(self):
        """Create an empty model: no attribute is set at this stage"""


    def _input_data(self, data):
        """Getting the data in the model (stored in `self.data`)"""
        self.data = data


    # NOTE(review): in the methods below, `self` is rebound to the value returned
    # by each step; the steps visible in this file return the model itself --
    # presumably all of them do, to be confirmed for the other building blocks.

    def _compute_new_metrics(self):
        """Adding new variables in the dataset"""
        self = NewMetrics._get_DOCDB_fam_cites_per_year(self)


    def _data_cleaning(self):
        """Data cleaning using the DataCleaning class methods"""
        self = DataCleaning._keep_only_EP_patents(self)
        self = DataCleaning._keep_only_granted_patents(self)
        self = DataCleaning._select_time_range(self)
        # the normalisation step is currently disabled
        #self = DataCleaning._normalise(self)
        self = DataCleaning._select_one_patent_per_family(self)
        self = DataCleaning._select_breakthrough_patents(self)


    def _fit_to_object_oriented_design(self):
        """We reshape the data from a tabular form to an object oriented form"""
        self = ReshapingToOOP._create_patent_objects(self)
        self = ReshapingToOOP._assign_data_to_patent_obj(self)


    def _get_citations(self):
        """Identify direct and indirect citations that link the patents"""
        self = GetCitations._get_direct_citations(self)
        self = GetCitations._get_CC_citations(self)
        self = GetCitations._get_BC_citations(self)
        self = GetCitations._get_LC_citations(self)


    def _get_full_text(self):
        """Retrieve the full text data corresponding to the patents in the model,
        extract the claims and attribute them to the patent objects"""
        self = RetrieveFullTextData._assign_full_text_to_patents(self)
        self = RetrieveFullTextData._attribute_claims(self)


    def _text_preprocessing(self):
        """Computing text similarities between linked patents

        # 1. index all the patents in the 'dict_patents_indexes' dictionary
        # 2. get all the vocabulary stored in a single Pandas serie
        # 3. text preprocessing
        # 4. vectorisation and create the feature space
        # 5. compute the array of all pairwise similarities from the feature space
        """
        self = TextProcessing._index_patents(self)
        self = TextProcessing._store_vocabulary(self)
        self = TextProcessing._stemming(self)
        self = TextProcessing._vectorize(self)
        self = TextProcessing._compute_pairwise_similarities(self)


    def _build_patent_network(self):
        """We build the patent network (weighted directed graph), select the
        resolution parameter and fit the Leiden clustering algorithm"""
        self = BuildNetwork._create_network(self)
        self = BuildNetwork._simplify_network(self)
        self = BuildNetwork._select_resolution_parameter(self)
        self = BuildNetwork._fit_Leiden_clustering_algorithm(self)


    def _display_summary_statistics(self):
        """Summary statistics about the model created and its data"""
        SummaryStatistics._print_nb_patents(self)
        SummaryStatistics._plot_patent_filling_over_time(self)


    def _visualise(self):
        """Show the different visualisations: graph with its communities,
        word clouds of the clusters and S-curves"""
        self = Visualisation._draw_graph_with_communities(self)
        self = Visualisation._display_cluster_word_clouds(self)
        self = Visualisation._display_S_curves(self)