# Imports

In [1]:
# - general imports 
from google.colab       import files
from google.colab       import drive
import os
import sys
import numpy                            as np
from numpy.linalg       import norm
import re
import math
import time
import pandas                           as pd
from zipfile            import ZipFile
!pip install nltk
import nltk
nltk.download('punkt')
from nltk.tokenize      import sent_tokenize

# imports for the models
!!pip install bert-extractive-summarizer
!pip install -U sentence-transformers
from summarizer         import Summarizer
from summarizer.sbert   import SBertSummarizer

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentence-transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[K     |████████████████████████████████| 85 kB 3.1 MB/s 
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 30.0 MB/s 
Building wheels for collected packages: sentence-transformers
  Building wheel for sentence-transformers (setup.py) ... [?25l[?25hdone
  Created wheel for sentence-transformers: filename=sentence_transformers-2.2.2-py3-none-any.whl size=125938 sha256=f6b15048f29a9fa5d7ecda9e52a0d6257dad083c95d1d790b4fd5584b1f8f9f2
  Stored in directory: /root/.cache/pip/wheels/5e/6f/8c/d88aec621f3f542d26fac0342bef5e693335d125f4e54aeffe
Successfully built sentence-transformers
Installing collected packages: sentencepiece, sentence-transformers
Successfully installed sentence

# Load the dataset

In [2]:
"""
get the paths from the zip file
"""
def loadFairySum(verbose = True):
  load_mod = "local"
  fairy_tale_texts_path = "./FAIRY_TALE/texts"; short_story_texts_path = "./SHORT_STORY/texts"
  fairy_tale_summaries_path = "./FAIRY_TALE/summaries"; short_story_summaries_path = "./SHORT_STORY/summaries"

  if load_mod == "drive":
    drive.mount("/content/drive", force_remount=True)
    %cd '/content/drive/MyDrive/Colab Notebooks/Narrative understanding and storytelling/homework_1a/FairySum'
  else: # load locally
    files.upload()
    file_name = "./FairySum.zip"
    with ZipFile(file_name, 'r') as zip_file:
      zip_file.extractall()
    %cd FairySum

  if verbose: print("\nLoading texts:")

  # load the texts 
  fairy_tale_textsPaths  = [fairy_tale_texts_path + "/" + text for text in sorted(os.listdir(fairy_tale_texts_path))]
  short_Story_textsPaths = [short_story_texts_path + "/" + text for text in sorted(os.listdir(short_story_texts_path))]

  # merge paths about fairy tales and short stories
  full_list_textsPaths = [*fairy_tale_textsPaths,*short_Story_textsPaths]

  if verbose: 
    for x in full_list_textsPaths: print("- Loaded: {}".format(x))
    print("\nLoading summaries:")

  # load the summaries
  fairy_tale_summariesPaths  = [fairy_tale_summaries_path + "/" + text for text in sorted(os.listdir(fairy_tale_summaries_path))]
  short_Story_summariessPaths = [short_story_summaries_path + "/" + text for text in sorted(os.listdir(short_story_summaries_path))]

  # merge paths about fairy tales and short stories
  full_list_summariesPaths = [*fairy_tale_summariesPaths,*short_Story_summariessPaths]

  if verbose: 
    for x in full_list_summariesPaths: print("- Loaded: {}".format(x))
  
  return full_list_textsPaths,full_list_summariesPaths

full_list_textsPaths,full_list_summariesPaths = loadFairySum()

Saving FairySum.zip to FairySum.zip
/content/FairySum

Loading texts:
- Loaded: ./FAIRY_TALE/texts/bn_00173084n_The Old Dame and her Hen.txt
- Loaded: ./FAIRY_TALE/texts/bn_00187346n_The Dove.txt
- Loaded: ./FAIRY_TALE/texts/bn_00187442n_Corvetto.txt
- Loaded: ./FAIRY_TALE/texts/bn_00608721n_Sun, Moon, and Talia.txt
- Loaded: ./FAIRY_TALE/texts/bn_00790303n_Childe Rowland.txt
- Loaded: ./FAIRY_TALE/texts/bn_01593735n_Town Musicians of Bremen.txt
- Loaded: ./FAIRY_TALE/texts/bn_01899260n_The Raven.txt
- Loaded: ./FAIRY_TALE/texts/bn_02154461n_Herr Korbes.txt
- Loaded: ./FAIRY_TALE/texts/bn_02173122n_Thumbelina.txt
- Loaded: ./FAIRY_TALE/texts/bn_02238889n_The Nightingale.txt
- Loaded: ./FAIRY_TALE/texts/bn_02277659n_The Ugly Duckling.txt
- Loaded: ./FAIRY_TALE/texts/bn_02442758n_Jack the Giant Killer.txt
- Loaded: ./FAIRY_TALE/texts/bn_03300215n_The Wonderful Birch.txt
- Loaded: ./FAIRY_TALE/texts/bn_03301753n_Cap-o_-Rushes.txt
- Loaded: ./FAIRY_TALE/texts/bn_03326399n_The Story of Pret

## Manage data

In [3]:
# extract as list of sentences, used for the document text
def sents_from_file(path):
  with open(path,'r') as file:
    text = []
    for line in file.readlines():
      text.append(line)
      match = re.match(r"^\d+:",line) # to keep track of any problem in the loading of sentences
      if match == None:
         # look if the problem is with an empty line
        if re.match(r'^\s*$', line):
          continue
        else:
          print(f"ID error in the following line:\n{line}")
          raise ValueError("Sentence without ID, wrong extraction")
    # text = [line for line in file.readlines()]
    file.close()
    return text

# extract as single text varible, used for summaries
def text_from_file(path, toggle_filter = True):
    with open(path,'r') as file:
      if toggle_filter:
        text_r = [line for line in file.readlines()]
        text = "".join(filter(lambda x: not re.match(r'^\s*$', x), text_r)) # remove empty lines in the text
      else:
        text = file.read()
    file.close()
    return text

# for each document separate ids from text and store in a dictionary, take the list
# of sentences as input if doc sentences loaded with sents_from_file(path), 
# already taken in account the check for the presence of the ID.
def dict_from_sents(sents):
  doc_dictionary = {}
  for sent in sents:
    match = re.match(r"^\d+:",sent)
    id = match.group().replace(":", "")
    text = "".join(sent.split(":")[1:]).strip()
    doc_dictionary[id] = text
  return doc_dictionary
  
# marge all the sentence to get the full raw text.
# like using text_from_file(path), but here we have separated before ID and text.
def fullText_from_dict(doc_dictionary):
  document_text = ""
  for k,v in doc_dictionary.items():
    document_text += v + "\n" 
  return document_text

# simple method to get the right format for the output
def getResultFormat(array):
  result = []
  for k,vs in y_docs:
    row = [k]
    for v in vs:
      row.append(v)
    result.append(row)
  return result

""" priting table not on the line """
def print_table(table):
  for i,(k,v) in enumerate(table.items()):
    print("{:<4} K: {:<30} V: {:<30}".format(str(i)+")",str(k),str(v)))

""" priting list not on the line """
def print_list(list):
  for i,elem in enumerate(list):
    print("{:<4} {:<30}".format(str(i)+")",str(elem)))

# Extractive summarization

## initialization and auxiliary functions

In [4]:
# choose between the 2 approches proposed
METHODS = ("bert","sbert","s-bert distance")
method = METHODS[1] 

# load the model model
# CUDA is automatically used whether gpu is available.
if method == "bert":
  model = Summarizer()  
elif method == "sbert" or  method == "s-bert distance":
  model = SBertSummarizer('paraphrase-MiniLM-L6-v2')

# start functions 

"""
Estimate the number of sentence as input for the extractive summarizer.
summary -> abstractive summary from the dataset (raw text)
"""
def estimate_nSents(summary, n_sentsDoc):
  # compute the n. of sentences to return based on the summary
  n_sents_summ = len(sent_tokenize(summary))
  multiplier = math.log(1/(n_sents_summ/50))
  if multiplier < 1: multiplier = 1
  out = math.ceil(multiplier*n_sents_summ)

  # choice: a summary cannot have more than half n.sentences present in the doc
  half = math.ceil(n_sentsDoc/2)
  if out> half:
    out = half

  return out 

"""
take the original senteces from the document:
sents_summary -> raw text of the summary
dict_doc -> dictionary of the document: { id_sentence : text_sentence }
"""
def get_referenceSents(summary, dict_doc, verbose = False):
  # first we take the sentences from the summary.
  sents_summary = sent_tokenize(summary)
  n_sents_summary = len(sents_summary)
  if verbose: print(f"extracted from the summary {n_sents_summary} phrases")

  # define the output dictionary
  dict_summary = {}

  # get the all sentences from the document that are in the summary sentence
  def _check_fullSummarySentence(summary_sent,dict_doc,i, verbose = False):
    if verbose: print("\ncheck full summary sentence")

    next_matches = {}
    for j,(k,v_) in enumerate(dict_doc.items()):
      if j<i: continue
      v = v_.strip().lower()
      if summary_sent.find(v)!=-1 or summary_sent==v:
        summary_sent = summary_sent.replace(v,"").strip()
        next_matches[k] = v_
      elif v.find(summary_sent)!=-1:
        summary_sent = ''
        next_matches[k] = v_
      if re.search("[:alpha:]",sent_summary_tmp) == None:
      # if summary_sent.strip()=='':
        return next_matches
    return False

  # check if a summary sentence is part of 2 or more reference sentences (dataset)
  skip_index = 0
  for sent_summary in sents_summary:
    if verbose: print(f"checking for: {sent_summary}")
    sent_summary = sent_summary.strip().lower()
    match = False
    for idx,(k,v_) in enumerate(dict_doc.items()):
      if idx < skip_index: continue
      v = v_.strip().lower()
      if sent_summary in v:
        if k in list(dict_summary.keys()): 
          match = True
          break  # skip since key already inserted, nothing to do
        if verbose: print(f"inserting: {v_} \nwith index {k}\n")
        dict_summary[k] = v_; match = True
        skip_index = idx
        # break

    if match == False: 
      # no suitable sentences has been found you have to check the opposite now
      # if a sentence is composed by 2 or more sentences of reference sentences (dataset)
      sent_summary_tmp = sent_summary
      try: # try to retrieve over the next sentences 
        for i,(k,v_) in enumerate(dict_doc.items()):
          if i < skip_index: continue
          v = v_.strip().lower()
          
          if sent_summary_tmp.find(v) == 0:
            sent_summary_tmp = sent_summary_tmp.replace(v,"").strip()
            next_matches = _check_fullSummarySentence(sent_summary_tmp,dict_doc,i+1, verbose)
            if next_matches:
              if verbose: print(f"inserting: {v_}\nwith index {k}")
              dict_summary[k] = v_; match = True
              for k2,v2 in next_matches.items():
                if verbose: print(f"insert for continuity: {v2} \nwith index {k2}" )
                dict_summary[k2] = v2
              break
            else:
              sent_summary_tmp = sent_summary

        if match == False:
              raise ValueError(f"no matching between for:\n{sent_summary}")
      except: 
          # if some words are cutted, find over the next possible sentences 
          # accepting part of summary not matched (extremely rare case)
          for i,(k,v_) in enumerate(dict_doc.items()):
            if i < skip_index: continue
            v = v_.strip().lower()
            
            if re.search("[:alpha:]",sent_summary_tmp) == None:  # meaningless part of the sentence remained
              break
            else:
              if not(sent_summary_tmp.find(v)==-1) :
                sent_summary_tmp = sent_summary_tmp.replace(v,"").strip()
                if verbose: print(f"inserting: {v_}\nwith index {k}")
                dict_summary[k] = v_; match = True
              if not(v.find(sent_summary_tmp)==-1) :
                sent_summary_tmp = ''
                if verbose: print(f"inserting: {v_}\nwith index {k}")
                dict_summary[k] = v_; match = True
                break

  # just for safe sort everything if not (practically is not used since the
  # retrieving is performed follwign the order)
  keys = list(dict_summary.keys())
  if not(all(int(keys[i]) <= int(keys[i+1]) for i in range(len(keys) - 1))):
    if verbose: 
      print(keys)
      print(f"keys are been wrongly ordered when selected")
    dict_summary = dict(sorted(dict_summary.items(), key=lambda item: int(item[0])))
    # recompute keys after ordering
    keys = list(dict_summary.keys())
    if not(all(int(keys[i]) <= int(keys[i+1]) for i in range(len(keys) - 1))):
      raise ValueError(f"keys are been wrongly ordered when selected")

  if verbose: print(f"final number of reference sentence extracted is {len(keys)}")
  
  return dict_summary

# get embeddings for the summary (dataset)
def get_embeddings(model, summary_doc, verbose = False):
  # list for the embeddings
  embs = []

  # first we take the sentences from the summary.
  sents_summary = sent_tokenize(summary_doc)
  n_sents_summary = len(sents_summary)
  if verbose: print(f"extracted from the summary {n_sents_summary} phrases")

  # loop and compute
  for sent_summary in sents_summary:
    emb = model.run_embeddings(sent_summary, num_sentences = 1, aggregate='mean')
    embs.append(emb)

  return embs

# get embeddings for the text (dataset)
def get_document_emb(model, dict_doc):
  # list for the document embeddings
  emb_doc = {}

  # loop and compute
  for k,v in dict_doc.items():
    doc_emb = model.run_embeddings(v, num_sentences = 1, aggregate='mean')
    emb_doc[k] = doc_emb
  return emb_doc

"""
get sentence of the summary from the embeddings
summ_embs - > list of embedding from summary, [emb_0, ..., emb_n]
doc_embs -> dictionary of embedding from the document {id_0: emb_0, ..., id_n: emb_n}
"""
def get_referenceSents_emb(model, summ_embs,doc_embs, verbose = True):
  # define the output dictionary
  dict_summary = {}

  # define the cosine similarity
  cos_sim = lambda x,y: np.dot(x,y)/(norm(x)*norm(y))

  # retrieve sentences with max cosine similarity from the embeddings
  for summ_emb in summ_embs:
    scores = {}
    for k,v in doc_embs.items():
    # use try-except blocks to avoid errors for too short sentences wrongly 
    # separated in both cases: document of the dataset and SBD of the summary
      try:
        score = cos_sim(summ_emb,v)
      except:
        continue
      scores[k] = score
    try:
      k_max = max(scores, key=scores.get)
    except:
      continue
    dict_summary[k_max] = dict_doc[k_max]

    #ordering keys
    dict_summary = dict(sorted(dict_summary.items(), key=lambda item: int(item[0])))
    
  return dict_summary

Downloading:   0%|          | 0.00/690 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/190 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/3.69k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/629 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/122 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/314 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/229 [00:00<?, ?B/s]

## Main loop

In [8]:
# defined table to save results from the texts
y_docs = []

# save the initial time at beginning of the task
startTime = time.time()

# main loop
for idx,(path, path_summary) in enumerate(zip(full_list_textsPaths, full_list_summariesPaths)):
  try:
    file = " ".join(path.split('/')[3].split('_')[2:])
    print(f"Extractive summarization for the document n°{idx+1}: {file}")
  except:
    print(f"Extractive summarization for the document n°{idx+1}: {path.split('/')[3].split('_')[2]}")

  title= path.split('/')[3].replace(".txt","")

  # extract all information from the txt file
  sentences = sents_from_file(path)
  dict_doc = dict_from_sents(sentences)
  full_text_doc = fullText_from_dict(dict_doc)
  summary_dataset = text_from_file(path_summary, toggle_filter=True)

  # Extractive summarization

  # it's possible to fix a number of sentences to return or just let the system choose the optimal
  n_sents = estimate_nSents(summary_dataset, len(sentences))

  if method == "bert" or method == "sbert":

    # get the extractive summary without "refining" 
    summary_doc = model(full_text_doc, return_as_list= False, num_sentences = n_sents)

    # the model apply internally another SBD, we have to reconstruct the original numered sentences
    # extractive summary
    summary_docDict = get_referenceSents(summary_doc, dict_doc, verbose = False)

  elif method == "s-bert distance":
    # get the embedding for the doc summary, here there isn't an augmenting of the number for the 
    # sentences to return, are in equal number to the ones found by the sentence tokenizer

    # summary (dataset) embedding
    summary_embs_list = get_embeddings(model,summary_dataset)

    # document (dataset) embedding
    document_embs_dict = get_document_emb(model, dict_doc)

    # extractive summary
    summary_docDict = get_referenceSents_emb(model,summary_embs_list,document_embs_dict, verbose = False)
  else:
    raise ValueError("Wrong name method selected.")

  print(f"-> extracted {len(summary_docDict)} sentences")

  # create the result row 
  ids = list(summary_docDict.keys())
  y_doc = [title,ids]
  y_docs.append(y_doc)

print("\nResult:\n")
print_list(y_docs)
print("\nTotal time elapsed for the task: {} [s]".format((time.time() - startTime)))

Extractive summarization for the document n°1: The Old Dame and her Hen.txt
-> extracted 40 sentences
Extractive summarization for the document n°2: The Dove.txt
-> extracted 22 sentences
Extractive summarization for the document n°3: Corvetto.txt
-> extracted 21 sentences
Extractive summarization for the document n°4: Sun, Moon, and Talia.txt
-> extracted 29 sentences
Extractive summarization for the document n°5: Childe Rowland.txt
-> extracted 36 sentences
Extractive summarization for the document n°6: Town Musicians of Bremen.txt
-> extracted 19 sentences
Extractive summarization for the document n°7: The Raven.txt
-> extracted 21 sentences
Extractive summarization for the document n°8: Herr Korbes.txt
-> extracted 11 sentences
Extractive summarization for the document n°9: Thumbelina.txt
-> extracted 19 sentences
Extractive summarization for the document n°10: The Nightingale.txt
-> extracted 12 sentences
Extractive summarization for the document n°11: The Ugly Duckling.txt
-> ext

# generate .tsv file

In [9]:
"""
Directives:
- not include header in the tsv file
- fields: "document_title","sentences_ids"
"""

# editing of the format of the results for the dataframe
result = getResultFormat(y_docs)

# build the dataframe
df = pd.DataFrame(result)

# save the dataframe as .tsv file
df.to_csv(path_or_buf= "result.tsv",sep="\t",header=False, index = False, encoding='utf-16')
# download and save locally the result file 
files.download('result.tsv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>