Install PySpark and the other required libraries, then import them.


In [None]:
# Delete any previously unpacked Spark 3.1.1 distribution directory so the run starts clean
# (the directory is the Spark-with-Hadoop bundle, not a separate Hadoop install).
!rm -rf spark-3.1.1-bin-hadoop3.2 #remove hadoop if exists

In [None]:
# Install a headless OpenJDK 8 quietly; apt's normal output is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# The manual download/unpack of the Spark 3.1.1 tarball is disabled; PySpark is installed from pip below.
#!wget -q --show-progress http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
#!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# NOTE(review): neither package is version-pinned, so a fresh run may install a different release.
!pip install -q findspark pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# `evaluate` supplies the metric loader used for ROUGE further down.
!pip install evaluate
# Presumably the scoring backend that the ROUGE metric needs — TODO confirm; version is not pinned.
!pip install rouge_score

Collecting evaluate
  Downloading evaluate-0.4.2-py3-none-any.whl (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-2.19.1-py3-none-any.whl (542 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.0/542.0 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
Collecting dill (from evaluate)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from evaluate)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K     [

In [None]:
import os
# Tell Spark which JVM to use.
# NOTE(review): the path assumes the amd64 layout of the OpenJDK 8 apt package — confirm on other images.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# SPARK_HOME is intentionally left unset; this line belongs to the disabled manual tarball install.
#os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
# Locate the Spark installation before pyspark is imported.
findspark.init()
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session that every function below reads through the global `spark`.
# NOTE(review): spark.jars points at a GraphFrames jar that no cell in this notebook installs
# or uses — confirm whether this setting is still needed.
spark = SparkSession.builder \
    .config("spark.jars", "/usr/local/lib/python3.10/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar") \
    .getOrCreate()

# Property used to format output tables better when a cell evaluates to a DataFrame.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, CountVectorizer
from pyspark.ml import Pipeline
import numpy as np
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as F
import evaluate

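The script is organized into the following functions:
*   `pp_file` preprocesses the input file (sentence splitting, tokenization, TF-IDF)
*   `cosine_sim` calculates the cosine similarity between two sentence vectors
*   `out_deg` counts the out-degree of each vertex
*   `text_rank` runs the TextRank iteration
*   `driver` runs the whole pipeline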



In [None]:
def pp_file(file):
  """Load a text file as sentences and compute a TF-IDF vector for each one.

  Args:
    file: path handed to ``spark.read.text``.

  Returns:
    A DataFrame with the columns ``id`` (0-based sentence position),
    ``sentence``, ``words``, ``features`` and ``tf_idf``.
  """
  # Read in .txt file: every "."-delimited chunk becomes one row, and the
  # newlines inside a chunk are replaced by spaces.
  sentences = (spark.read.text(file, lineSep=".")
               .select(regexp_replace("value", "(\\n)", " ").alias("sentence")))

  # Number the sentences 0..n-1 in reading order. text_rank uses these ids as
  # array indices, so they must be consecutive; monotonically_increasing_id
  # only promises unique, increasing values and leaves gaps as soon as the
  # data spans more than one partition, whereas zipWithIndex is gap-free.
  doc = (sentences.rdd
         .zipWithIndex()
         .map(lambda pair: (pair[1], pair[0].sentence))
         .toDF(["id", "sentence"]))

  # Get vocab size by counting distinct words in sentence column
  vocab_size = doc.rdd.flatMap(lambda x: x.sentence.split(" ")).distinct().count()

  # Create text processing pipeline
  tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
  # Perform TF-IDF for each. numFeatures must be positive, hence the floor of 1.
  hashing_tf = HashingTF(numFeatures=max(vocab_size, 1), inputCol="words", outputCol="features")
  idf = IDF(inputCol="features", outputCol='tf_idf')

  pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])

  processed_doc = pipeline.fit(doc).transform(doc)

  return processed_doc

In [None]:
@F.udf
def cosine_sim(u, v):
  """Cosine similarity of two vectors, with values below 0.01 clamped to 0.0.

  No returnType is passed to ``F.udf``, so the resulting column uses PySpark's
  default UDF return type (string); ``out_deg`` compares against ``'0.0'``
  accordingly.

  NOTE(review): there is no guard for a zero-norm vector — the division is
  then 0/0. Confirm how downstream code should treat that result.
  """
  similarity = float(u.dot(v) / (u.norm(2) * v.norm(2)))
  # Similarities under the cut-off are treated as "no edge".
  return 0.0 if similarity < 0.01 else similarity

In [None]:
def out_deg(adj_list):
  """Count, per source vertex, the edges whose weight is not '0.0'.

  Args:
    adj_list: RDD-like collection of (src_id, dest_id, weight) rows where the
      weight is the string produced by ``cosine_sim``.

  Returns:
    1-D integer NumPy array of out-degrees ordered by ascending vertex id.
    A vertex with no non-zero edge does not appear in the counts, so the
    positions line up with the ids only when every id 0..n-1 has at least
    one such edge.
  """
  # Calc out degrees for each vertex from adjacency list.
  num_degrees = (adj_list
                 .filter(lambda x: x[2] != '0.0')
                 .map(lambda x: (x[0], x[2]))
                 .countByKey())

  # countByKey returns a dict whose iteration order is unrelated to the vertex
  # ids, while callers index the result by id — so emit the counts in id order.
  outdegrees = np.fromiter((num_degrees[vertex] for vertex in sorted(num_degrees)), int)

  return outdegrees


In [None]:
def text_rank(V_tuples, d, tol, max_iter, top_k=5):
  """Rank the vertices (sentences) of a similarity graph with TextRank.

  Args:
    V_tuples: RDD of (src_id, dest_id, weight) rows, one per ordered pair of
      vertices. The ids are used directly as array indices, so they must be
      the integers 0..n-1; weight is anything ``float()`` accepts.
    d: damping factor.
    tol: iteration stops once the L1 change of the score vector is <= tol.
    max_iter: upper bound on the number of iterations.
    top_k: number of best-ranked vertices to return (default 5).

  Returns:
    List of (vertex_id, score) tuples, highest score first, at most top_k long.
  """
  # V_tuples holds every ordered pair, so its size is n squared.
  n = int(round(np.sqrt(V_tuples.count())))
  scores = np.ones(n)

  # Obtain array of out degrees per vertex.
  out = out_deg(V_tuples)

  # Parse the rows once and keep them cached: the loop below re-reads them on
  # every iteration. Zero-weight edges contribute nothing to a score and a NaN
  # weight (e.g. a 0/0 cosine similarity) would turn every score into NaN, so
  # both are dropped here (NaN > 0 is False).
  edges = (V_tuples
           .map(lambda x: (int(x[0]), int(x[1]), float(x[2])))
           .filter(lambda e: e[2] > 0)
           .cache())

  # Start above any tolerance so the first iteration always runs.
  delta = float("inf")
  i = 0

  # Text rank is performed until delta drops below tolerance OR max iterations reached
  try:
    while delta > tol:
      i += 1
      prev_scores = scores

      contributions = (edges
                       .map(lambda e: (e[0], e[2] * prev_scores[e[1]] / out[e[1]]))
                       .reduceByKey(lambda x, y: x + y)
                       .collect())

      # collect() returns the (vertex, sum) pairs in no particular order, so
      # write each score at its vertex id. A vertex that received nothing
      # keeps the base score (1 - d).
      scores = np.full(n, 1 - d, dtype=float)
      for vertex, total in contributions:
        scores[vertex] = d * total + (1 - d)

      # L1 Norm
      delta = np.sum(np.abs(scores - prev_scores))

      if i >= max_iter:
        break
  finally:
    edges.unpersist()

  # Rank from the final scores, whichever way the loop ended.
  ranked = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)

  # Obtain top id's
  return ranked[:top_k]

In [None]:
def driver(file, d=0.85, tol=10**-6, max_iter=100, exp = 1, log_path='log.txt'):
  """Summarize a text file with TextRank, print the summary and log the run.

  Args:
    file: path of the text file to summarize.
    d: damping factor passed to text_rank.
    tol: convergence tolerance passed to text_rank.
    max_iter: iteration cap passed to text_rank.
    exp: experiment number written to the log entry.
    log_path: file the log entry is appended to (default 'log.txt').

  Returns:
    List of the selected sentences, best-ranked first.
  """
  # Preprocess txt file -> Tokenize, TF-IDF Calcs
  p_doc = pp_file(file)
  tf_idf_vals = p_doc.select('id','tf_idf')

  # Perform cosine similarity on tf-idf vals: pair every sentence with every
  # sentence (itself included) and score each pair.
  renamed = tf_idf_vals.withColumnsRenamed({"tf_idf": "tf_idf2", "id":"id2"})
  pairs = tf_idf_vals.crossJoin(renamed)
  similarities = (pairs
                  .withColumn("cos_sim", cosine_sim(F.col("tf_idf"), F.col("tf_idf2")))
                  .select("id", "id2", "cos_sim"))

  # Perform text rank -> cosine similarity = edge weights
  sentence_rank = text_rank(similarities.rdd, d, tol, max_iter)

  # Collect the top-ranked sentences; the rank entries are (sentence index, score).
  all_sentences = [row.sentence for row in p_doc.select('sentence').collect()]
  summary = [all_sentences[index] for index, _ in sentence_rank]

  print("--------------")
  print("Output Summary: ")
  for sentence in summary:
    print(sentence)

  # Append the run to the log. The encoding is explicit because the source
  # texts contain non-ASCII characters (e.g. em dashes).
  with open(log_path, 'a', encoding='utf-8') as f:
    f.write(f"Experiment Number: {exp}\n")
    f.write("Parameters Chosen:\n")
    f.write(f"\tDampening Factor: {d}\n")
    f.write(f"\tTolerance: {tol}\n")
    f.write(f"\tMax Iterations: {max_iter}\n")
    f.write("Output: ")
    for sentence in summary:
      # Sentences were split on ".", so the period is put back here.
      f.write(sentence + '.')
      f.write("\n")

    f.write("-----------------------------")
    f.write("\n")

  return summary







In [None]:
# Full grid of experiment settings: for each iteration cap, every damping
# factor is combined with every tolerance (2 x 3 x 3 = 18 runs).
param_grid = [
    {'d': damping, 'tol': tolerance, 'maxIter': iteration_cap}
    for iteration_cap in (100, 10)
    for damping in (0.85, 0.01, 0.5)
    for tolerance in (10**-6, 10**-1, 10**-10)
]

In [None]:
# Param experiments were done for each of the 3 files; this cell holds the
# sweep for theoutsider.txt. Every run is numbered from 1 and appended to the
# log by driver.
for count, params in enumerate(param_grid, start=1):
  summary = driver(
      '/content/sample_data/theoutsider.txt',
      params['d'],
      params['tol'],
      params['maxIter'],
      exp=count,
  )

--------------
Output Summary: 
 Unhappy is he to whom the memories of childhood bring only fear and sadness
 Wretched is he who looks back upon lone hours in vast and dismal chambers with brown hangings and maddening rows of antique books, or upon awed watches in twilight groves of grotesque, gigantic, and vine-encumbered trees that silently wave twisted branches far aloft
 Such a lot the gods gave to me—to me, the dazed, the disappointed; the barren, the broken
 And yet I am strangely content, and cling desperately to those sere memories, when my mind momentarily threatens to reach beyond to the other
  I know not where I was born, save that the castle was infinitely old and infinitely horrible; full of dark passages and having high ceilings where the eye could find only cobwebs and shadows
--------------
Output Summary: 
 Unhappy is he to whom the memories of childhood bring only fear and sadness
 Wretched is he who looks back upon lone hours in vast and dismal chambers with brown h

Evaluate the output summary of each work by comparing it with a summary written by a Lovecraft expert.

In [None]:
# Summarize "Dagon" with d=0.85, tol=1e-6 and at most 100 iterations.
dagon_summary = driver(
    '/content/sample_data/dagon.txt',
    d=0.85,
    tol=10**-6,
    max_iter=100,
)

File has been processed.
Cosine similarity has been calculated.
Text Rank has been performed.
--------------
Output Summary: 
   The end is near
 I think I  went mad then
   The change happened whilst I slept
 The great war was then at its  very beginning, and the ocean forces of the Hun had not completely sunk to their later  degradation; so that our vessel was made a legitimate prize, whilst we of her crew were  treated with all the fairness and consideration due us as naval prisoners
   It is at night, especially when the moon is gibbous and waning, that I see the thing


In [None]:
# Summarize "Memory" with d=0.85, tol=1e-6 and at most 100 iterations.
memory_summary = driver(
    '/content/sample_data/memory.txt',
    d=0.85,
    tol=10**-6,
    max_iter=100,
)

--------------
Output Summary: 
 These beings were like the waters of the river Than, not to be understood
 And in trees that grow gigantic in crumbling courtyards leap little apes, while in and out of deep treasure-vaults writhe poison serpents and scaly things without a name
 At the very bottom of the valley lies the river Than, whose waters are slimy and filled with weeds
 And within the depths of the valley, where the light reaches not, move forms not meet to be beheld
 Their deeds I recall not, for they were but of the moment


In [None]:
# Summarize "The Outsider" with d=0.85, tol=1e-6 and at most 100 iterations.
outsider_summary = driver(
    '/content/sample_data/theoutsider.txt',
    d=0.85,
    tol=10**-6,
    max_iter=100,
)

--------------
Output Summary: 
 Unhappy is he to whom the memories of childhood bring only fear and sadness
 Wretched is he who looks back upon lone hours in vast and dismal chambers with brown hangings and maddening rows of antique books, or upon awed watches in twilight groves of grotesque, gigantic, and vine-encumbered trees that silently wave twisted branches far aloft
 Such a lot the gods gave to me—to me, the dazed, the disappointed; the barren, the broken
 And yet I am strangely content, and cling desperately to those sere memories, when my mind momentarily threatens to reach beyond to the other
  I know not where I was born, save that the castle was infinitely old and infinitely horrible; full of dark passages and having high ceilings where the eye could find only cobwebs and shadows


In [None]:
# Load the ROUGE metric through the evaluate library; as the cell output shows,
# this downloads the metric's builder script.
rouge = evaluate.load('rouge')

Downloading builder script:   0%|          | 0.00/6.27k [00:00<?, ?B/s]

In [None]:
# Reference summary for "Dagon": split the text on "." and keep the first
# five pieces as a list of strings.
with open('/content/sample_data/dagon_summary.txt', 'r') as f:
    dagon_ref = f.read().split('.')[:5]


In [None]:
# ROUGE scores predictions[i] against references[i]. The extracted sentences are
# ordered by rank and are not aligned with the reference sentences (and the two
# lists may differ in length), so each summary is scored as one whole text.
results = rouge.compute(predictions=[' '.join(dagon_summary)],
                        references=[' '.join(dagon_ref)])
results

{'rouge1': 0.07886524822695035,
 'rouge2': 0.0,
 'rougeL': 0.07787817419639806,
 'rougeLsum': 0.07787817419639806}

In [None]:
# Reference summary for "Memory": split the text on "." and keep the first
# five pieces as a list of strings.
with open('/content/sample_data/memory_summary.txt', 'r') as f:
    memory_ref = f.read().split('.')[:5]

In [None]:
# ROUGE scores predictions[i] against references[i]. The extracted sentences are
# ordered by rank and are not aligned with the reference sentences (and the two
# lists may differ in length), so each summary is scored as one whole text.
results = rouge.compute(predictions=[' '.join(memory_summary)],
                        references=[' '.join(memory_ref)])
results

{'rouge1': 0.2870829172208111,
 'rouge2': 0.12222222222222223,
 'rougeL': 0.24639027652817042,
 'rougeLsum': 0.24639027652817042}

In [None]:
# Reference summary for "The Outsider": split the text on "." and keep the
# first five pieces as a list of strings.
# NOTE(review): unlike the other two reference files this path has no ".txt"
# suffix — confirm that is the real file name.
with open('/content/sample_data/theoutsider_summary', 'r') as f:
    outsider_ref = f.read().split('.')[:5]

In [None]:
# ROUGE scores predictions[i] against references[i]. The extracted sentences are
# ordered by rank and are not aligned with the reference sentences (and the two
# lists may differ in length), so each summary is scored as one whole text.
results = rouge.compute(predictions=[' '.join(outsider_summary)],
                        references=[' '.join(outsider_ref)])
results

{'rouge1': 0.1063167353650595,
 'rouge2': 0.0,
 'rougeL': 0.09463728097460768,
 'rougeLsum': 0.09463728097460766}