<a href="https://colab.research.google.com/github/emilnebb/TDT4305_Big_Data_Architecture/blob/main/Prosjekt_del_2_Emil_Neby_%2B_Cornelia_Plesner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### I used contents from these sources to create this Colab notebook: 
  1. https://colab.research.google.com/github/asifahmed90/pyspark-ML-in-Colab/blob/master/PySpark_Regression_Analysis.ipynb
  2. https://gist.github.com/dvainrub/b6178dc0e976e56abe9caa9b72f73d4a
  3. https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5

# **OUTCOME: having an environment to develop Spark apps in Python3**

## **Step 0: setting things up in Google Colab**

First, we need to install all the dependencies in the Colab environment: Apache `Spark 3 with Hadoop 2.7`, `Python3`, `Java 11` (and a helper Python package named `Findspark`).

Please note that you might need to update Spark's version to a newer value if, after executing the code in the cell below, you get an error indicating that `wget` cannot find and download `spark-3.0.2-*`.
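
One way to make such a version update less error-prone is to keep the version numbers in one place and derive the archive name and the unpack directory from them. The sketch below is only an illustration and not part of the original setup: `spark_paths` is a hypothetical helper, and the default `base_url` assumes the layout of the Apache release archive rather than the mirror used in the install cell.

```python
def spark_paths(spark_version="3.0.2", hadoop_version="2.7",
                base_url="https://archive.apache.org/dist/spark"):
  """Derive archive name, download URL and SPARK_HOME from the versions."""
  name = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
  archive = f"{name}.tgz"
  # Assumed layout: <base_url>/spark-<version>/<archive>
  url = f"{base_url}/spark-{spark_version}/{archive}"
  # Colab unpacks into /content by default
  spark_home = f"/content/{name}"
  return {"archive": archive, "url": url, "spark_home": spark_home}

paths = spark_paths()
print(paths["archive"])
print(paths["spark_home"])
```

If the version changes, the same values have to be used for `wget`, `tar`, and `SPARK_HOME`, which is why deriving them from one call can help.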

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*
!pip -q install findspark pyspark graphframes



Now that you have installed Spark and Java in Colab, it is time to set some environment variables. We need to set the values for `JAVA_HOME` and `SPARK_HOME` (and `HADOOP_HOME`), as shown below:

In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = os.environ["SPARK_HOME"]

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"
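
A wrong path in one of these variables tends to surface later as a less obvious Spark error, so a quick check can be useful. The following is a minimal sketch under the assumption that each variable should point to an existing directory; `missing_homes` is a hypothetical helper, not part of Spark or Findspark.

```python
import os

def missing_homes(names=("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME"), environ=None):
  """Return the variables that are unset or do not point to a directory."""
  environ = os.environ if environ is None else environ
  return [name for name in names if not os.path.isdir(environ.get(name, ""))]

# An empty list means all three variables point to existing directories
print(missing_homes())
```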

## **Step 1: downloading project's dataset**
Now let's download the project's dataset from GitHub. You can read the dataset for the course's project from `datasets/data/TDT4305_S2021`.

In [None]:
!rm -rf datasets
!git clone --depth=1 -q https://github.com/habedi/datasets
!ls datasets/data/TDT4305_S2021

 badges.csv.gz	  'Description of the data.pdf'   users.csv.gz
 comments.csv.gz   posts.csv.gz


## **Step 2: checking the Spark installation**
Initialize `findspark` so that Python can locate the Spark installation; the local Spark session itself is created in the following steps:

In [None]:
import findspark
findspark.init()

## **Step 3: making a helper method for creating a SparkContext variable**
You can use `init_spark` to create a new `SparkSession` together with its `SparkContext` and use them in your apps.

In [None]:
from pyspark.sql import SparkSession

def init_spark(app_name="HelloWorldApp", execution_mode="local[*]"):
  spark = SparkSession.builder.master(execution_mode).appName(app_name).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

## **Step 4: a HelloWorld Spark app**

Our first Spark application takes a list of numbers, squares each element, and prints the list of squared numbers.

In [None]:
def main1():
  _, sc = init_spark()
  nums = sc.parallelize([1, 2, 3, 4])
  print(nums.map(lambda x: x*x).collect())

if __name__ == '__main__':
  main1()

[1, 4, 9, 16]


## **Step 5: another Spark app that loads a CSV file into an RDD**
Another simple app that prints the first two lines of `users.csv.gz`.

In [None]:
def main2():
  _, sc = init_spark()
  lines = sc.textFile('datasets/data/TDT4305_S2021/users.csv.gz')
  print(lines.take(2))

if __name__ == '__main__':
  main2()

['"Id"\t"Reputation"\t"CreationDate"\t"DisplayName"\t"LastAccessDate"\t"AboutMe"\t"Views"\t"UpVotes"\t"DownVotes"', "-1\t1\t2014-05-13 21:29:22\tCommunity\t2014-05-13 21:29:22\t<p>Hi, I'm not really a person.</p>&#xA;&#xA;<p>I'm a background process that helps keep this site clean!</p>&#xA;&#xA;<p>I do things like</p>&#xA;&#xA;<ul>&#xA;<li>Randomly poke old unanswered questions every hour so they get some attention</li>&#xA;<li>Own community questions and answers so nobody\t3\t819\t1575"]
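
The output above shows that each line is tab-separated and that the header fields are wrapped in double quotes. As a sketch of how such lines could be turned into named fields (the helper names `parse_header` and `to_record` are hypothetical, and the data line is a shortened version of the record shown above):

```python
header = '"Id"\t"Reputation"\t"CreationDate"\t"DisplayName"\t"LastAccessDate"\t"AboutMe"\t"Views"\t"UpVotes"\t"DownVotes"'

def parse_header(line):
  """Split a tab-separated header line and strip the surrounding quotes."""
  return [field.strip('"') for field in line.split("\t")]

def to_record(columns, line):
  """Pair column names with the tab-separated values of one data line."""
  return dict(zip(columns, line.split("\t")))

columns = parse_header(header)
# Shortened record: the 'AboutMe' value is abbreviated for readability
line = "-1\t1\t2014-05-13 21:29:22\tCommunity\t2014-05-13 21:29:22\t<p>Hi</p>\t3\t819\t1575"
record = to_record(columns, line)
print(columns)
print(record["DisplayName"], record["UpVotes"])
```

The same two functions could be passed to `map` on the RDD after filtering out the header line.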


## **Step 6: sample GraphFrames code**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

_, sc = init_spark()
sqlContext = SQLContext(sc)

## the rest of this code (down below) comes from: https://graphframes.github.io/graphframes/docs/_site/quick-start.html#getting-started-with-apache-spark-and-spark-packages

# Create a Vertex DataFrame with unique ID column "id"
v = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])

# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])

# Create a GraphFrame
from graphframes import *
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

# Run PageRank algorithm, and show results.
#results = g.pageRank(resetProbability=0.01, maxIter=10)
#results.vertices.select("id", "pagerank").show()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+



2

### See: https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5 for more examples


## **Step 7 and beyond: create your apps down here (as many as you need)**

## Constructing the graph of terms

In [None]:
#Clone list of stopwords
!git clone --depth=1 -q https://gist.github.com/habedi/c7229ee5bd50bf49f5b2bc404366344d


### Helper functions

In [None]:
import string

#Return a list with stopwords
def get_stopwords():
  #Close the file after reading and strip only the line break from each line
  with open('c7229ee5bd50bf49f5b2bc404366344d/big_list_of_english_stopwords', "r") as f:
    stopwords = [line.rstrip('\n') for line in f]

  return stopwords

#Replace all special characters, except 'DOT', as well as 'TAB' and line breaks with a space
def remove_special_characters(document):
  for i in string.punctuation:
    if i != ".":
      document = document.replace(i, ' ')

  for i in ('\t', '\n', '\r'):
    document = document.replace(i, ' ')
  return document

#Remove words with fewer than three characters
def remove_smaller_3(words):
  return [word for word in words if len(word) >= 3]

#Remove 'DOT' at start/end of token (dots inside a token are kept)
def remove_DOT(words):
  stripped = [word.strip('.') for word in words]
  #Drop tokens that consisted of dots only
  return [word for word in stripped if word]

#Remove stopwords
def remove_stopwords(words):
  #A set makes each membership test constant time
  stopwords = set(get_stopwords())
  return [word for word in words if word not in stopwords]



### Constructing a sequence of terms

In [None]:
#Output: sequence of terms, i.e. a list of strings
#Input arg: string

def sequence_of_terms(document):
  #1. Turn all characters to lower case
  lower = document.lower()

  #2. Remove all the punctuations (like '!' and '?') except 'DOT' characters
  #3. Remove all the symbols (like '$' and '>') and special characters (like 'TAB')
  lower = remove_special_characters(lower)

  #4. Tokenise the output of the previous step (the separator of tokens is the 'WHITESPACE' character); at this stage should have a sequence of tokens
  tokens = lower.split(" ")

  #5. Remove the tokens that are smaller than three characters long from the sequence of the tokens
  sequence = remove_smaller_3(tokens)

  #6. Remove all the 'DOT' characters from the start or the end of each token
  sequence = remove_DOT(sequence)

  #7. Remove the stopwords from the sequence of tokens
  sequence = remove_stopwords(sequence)

  return sequence
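
To see what the seven steps listed in the comments above do to a concrete string, here is a compact, self-contained sketch that does not need the stopword file. It is an illustration under stated assumptions: `terms_sketch` is a hypothetical name, `STOPWORDS` is a tiny made-up set standing in for the much larger list from the gist, and step 6 is read as stripping dots only at the start and end of a token.

```python
import string

# Tiny illustrative stopword set (assumption, not the list used in the notebook)
STOPWORDS = {"the", "and", "are", "for", "with"}

def terms_sketch(document, stopwords=STOPWORDS):
  """Apply the seven cleaning steps to a string and return the list of terms."""
  text = document.lower()                                  # step 1
  # steps 2-3: punctuation except '.', plus tab and newline, becomes a space
  to_space = string.punctuation.replace(".", "") + "\t\n"
  text = text.translate(str.maketrans(to_space, " " * len(to_space)))
  tokens = text.split(" ")                                 # step 4
  tokens = [t for t in tokens if len(t) >= 3]              # step 5
  tokens = [t.strip(".") for t in tokens]                  # step 6
  return [t for t in tokens if t not in stopwords]         # step 7

print(terms_sketch("Data mining and Big Data: are the two fields different?"))
# ['data', 'mining', 'big', 'data', 'two', 'fields', 'different']
```

Note that repeated terms such as `data` stay in the sequence; duplicates only collapse later, when vertices and edges are made distinct.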


### Constructing the graph

In [None]:
import itertools
from graphframes import *
from pyspark.sql.types import StringType

#Output: rdd containing edges
#Input arg: sequence of terms
def edges(terms):
  window = []
  edge_list = []
  terms_que = list(terms)  #work on a copy so the caller's list is not emptied

  #Slide a window of five terms over the sequence, including the last term
  while len(terms_que) > 0:
    if len(window) == 5:
      window.pop(0)
    window.append(terms_que.pop(0))

    if (len(window) == 5):
      #Every ordered pair of distinct terms in the window becomes an edge
      for tup in itertools.permutations(window, 2):
        if not (tup[0] == tup[1]):
          edge_list.append(tup)

  rddE = sc.parallelize(edge_list)
  rddE = rddE.distinct().map(lambda x: (x[0], x[1], "neighboors"))
  return rddE

#Output: rdd containing vertices
#Input arg: sequence of terms 
def vertices(terms):
  rddV = sc.parallelize(terms)
  rddV = rddV.map(lambda x: (x, 1))
  rddV = rddV.reduceByKey(lambda a,b: a+b).zipWithIndex().map(lambda x: ( x[0][0], x[1]))
  return rddV

#Output: graphFrame
#Input arg: vertices as rdd, edges as rdd
def graph(vertices, edges):
  print("Graph is under construction... \n")
  #Create dataframes of vertices and edges
  edges = sqlContext.createDataFrame( edges.collect(), ["src", "dst", "relationship"])
  vertices = sqlContext.createDataFrame( vertices.collect(), ["id", "name"])

  g = GraphFrame(vertices, edges)
  print("Vertices: \n")
  g.vertices.show()
  print("Edges: \n")
  g.edges.show()

  return g
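
The sliding-window idea behind the edge construction can be tried without Spark. The following is a minimal pure-Python sketch, assuming that only full windows produce edges and that an edge is an ordered pair of distinct terms; `window_edges` is a hypothetical name, and `deque(maxlen=...)` is used here so the oldest term drops out automatically.

```python
from collections import deque
from itertools import permutations

def window_edges(terms, window_size=5):
  """Return the set of directed (src, dst) pairs of distinct terms that
  share at least one window of `window_size` consecutive terms."""
  window = deque(maxlen=window_size)
  found = set()
  for term in terms:
    window.append(term)
    if len(window) == window_size:
      found.update((a, b) for a, b in permutations(window, 2) if a != b)
  return found

edges_demo = window_edges(["a", "b", "c", "d", "e", "f"])
print(len(edges_demo))            # 28
print(("a", "f") in edges_demo)   # False
```

With six terms there are two windows, `a..e` and `b..f`. Each contributes 20 ordered pairs, 12 of which overlap, giving 28 edges; `a` and `f` never share a window, so they are not connected.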

### Methods to get body of post with given path and id

In [None]:
import base64
import re

#Output: rdd containing posts
#Input arg: directory path
def loadPosts(path):
  _, sc = init_spark()
  rddP = sc.textFile(path)
  rddP = rddP.map(lambda x: x.split('\t'))
  print("The 'posts.csv.gz' file is loaded into the RDD 'rddP'")
  return rddP

#Output: body as a string
#Input arg: path to 'posts.csv.gz', id of the post (as a string)
def getPostBody(path, id):
  rddP = loadPosts(path)
  postscolumns = ['Id', 'PostTypeId', 'CreationDate','Score','ViewCount',"Body",'OwnerUserId','LastActivityDate',"Title","Tags",'AnswerCount','CommentCount','FavoriteCount','Closedate']
  idIndex = postscolumns.index('Id')
  bodyIndex = postscolumns.index("Body")

  codedBody = rddP.filter(lambda x: x[idIndex] == id).map(lambda x: x[bodyIndex])
  matches = codedBody.take(1)
  if not matches:
    raise ValueError("No post found with Id " + str(id))

  #The body is stored Base64-encoded, so decode it to a UTF-8 string
  body = str(base64.b64decode(matches[0]), "utf-8")
  return body
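
The decoding step in `getPostBody` relies on the `Body` column holding Base64-encoded UTF-8 text. A small round trip with a made-up body shows that step in isolation; `decode_body` is a hypothetical helper, and the sample string is invented for the example rather than taken from the dataset.

```python
import base64

def decode_body(coded_body):
  """Decode a Base64 string into UTF-8 text."""
  return base64.b64decode(coded_body).decode("utf-8")

# Made-up body, encoded here only to have something to decode
sample = base64.b64encode("<p>What is data mining?</p>".encode("utf-8")).decode("ascii")
print(sample)
print(decode_body(sample))
```

The decoded body is still HTML, which is why the cleaning steps replace characters such as `<` and `>` before tokenising.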


### PageRank program
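
As background for the parameters used in the program below, here is a plain power-iteration sketch of PageRank in pure Python. It is not the GraphFrames implementation: `pagerank_sketch` is a hypothetical name, scores are normalised to sum to 1 (GraphFrames may scale them differently), and rank from nodes without outgoing edges is assumed to be spread evenly over all nodes. `reset_probability` plays the role of `resetProbability`, and `tol` stops the iteration once no score changes by more than that amount.

```python
def pagerank_sketch(edges, reset_probability=0.15, tol=1e-4, max_iter=100):
  """Power-iteration PageRank over directed (src, dst) edges."""
  nodes = sorted({n for edge in edges for n in edge})
  if not nodes:
    return {}
  out_links = {n: [] for n in nodes}
  for src, dst in edges:
    out_links[src].append(dst)

  rank = {n: 1.0 / len(nodes) for n in nodes}
  for _ in range(max_iter):
    # Rank held by nodes without out-links is spread over all nodes
    dangling = sum(rank[n] for n in nodes if not out_links[n])
    base = (reset_probability + (1 - reset_probability) * dangling) / len(nodes)
    new_rank = {n: base for n in nodes}
    for src in nodes:
      for dst in out_links[src]:
        new_rank[dst] += (1 - reset_probability) * rank[src] / len(out_links[src])
    change = max(abs(new_rank[n] - rank[n]) for n in nodes)
    rank = new_rank
    if change < tol:
      break
  return rank

# Same toy edges as the GraphFrames example in Step 6
ranks = pagerank_sketch([("a", "b"), ("b", "c"), ("c", "b")])
for node, score in sorted(ranks.items(), key=lambda kv: -kv[1]):
  print(node, round(score, 3))
```

In this toy graph `b` receives links from both `a` and `c`, so it ends up with the highest score, while `a` has no incoming links and keeps only the reset share.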

In [197]:
#Import only what is needed; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import desc

#Input arg: directory path, id
def pagerank_Program(path, id):
  document = getPostBody(path, id)
  doc1 = sequence_of_terms(document)
  doc2 = sequence_of_terms(document)

  edge = edges(doc1)
  vertice = vertices(doc2)

  graph_of_terms = graph(vertice, edge)

  print("Calculating pagerank... \n")
  pagerank = graph_of_terms.pageRank(resetProbability= 0.15, tol= 0.0001)

  print("PageRank score for top 10 nodes: \n")
  pagerank.vertices.select("id","pagerank").orderBy( desc("pagerank")).show(10)

if __name__ == '__main__':
  directory_path = 'datasets/data/TDT4305_S2021/posts.csv.gz'
  post_id = '14'
  
  pagerank_Program(directory_path, post_id)

The 'posts.csv.gz' file is loaded into the RDD 'rddP'
Graph is under construction... 

Vertices: 

+-----------+----+
|         id|name|
+-----------+----+
|    science|   0|
|     fields|   1|
|      large|   2|
|   question|   3|
|     mining|   4|
|   graduate|   5|
|      class|   6|
|      years|   7|
|differences|   8|
| proficient|   9|
|       data|  10|
|  discussed|  11|
|      forum|  12|
|   synonyms|  13|
|   analyzed|  14|
+-----------+----+

Edges: 

+---------+---------+------------+
|      src|      dst|relationship|
+---------+---------+------------+
|     data|discussed|  neighboors|
|     data|    forum|  neighboors|
|     data| synonyms|  neighboors|
|discussed|     data|  neighboors|
|discussed|    forum|  neighboors|
|discussed| synonyms|  neighboors|
|    forum|     data|  neighboors|
|    forum|discussed|  neighboors|
|    forum| synonyms|  neighboors|
| synonyms|     data|  neighboors|
| synonyms|discussed|  neighboors|
| synonyms|    forum|  neighboors|
|  sc