# Install Java, Spark, and Findspark
This installs Apache Spark 2.3.4, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [0]:
!pip install pyspark




In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark
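Roughly speaking, `findspark.init()` locates the Spark installation (through `SPARK_HOME`) and puts Spark's Python bindings on `sys.path`. The sketch below is a hypothetical manual equivalent, assuming the usual layout of a Spark binary distribution (a `python/` directory plus a `python/lib/py4j-*-src.zip` archive); it illustrates the idea and is not findspark's actual code.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Hypothetical helper: prepend Spark's Python bindings to sys.path.

    Assumes the usual binary-distribution layout:
      <spark_home>/python
      <spark_home>/python/lib/py4j-<version>-src.zip
    Returns the list of paths that were added, in sys.path order.
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError("no py4j zip found under " + spark_python)
    added = [spark_python, py4j_zips[0]]
    # insert in reverse so that spark_python ends up first on sys.path
    for path in reversed(added):
        if path not in sys.path:
            sys.path.insert(0, path)
    return added
```

Once `SPARK_HOME` is set, calling `add_spark_to_path(os.environ["SPARK_HOME"])` would make `import pyspark` resolve to the extracted distribution; in practice `findspark.init()` does this for you.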

# Set Environment Variables
Set the locations where Spark and Java are installed.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"
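Because `SPARK_HOME` must name the exact directory produced by `tar` above, a typo or a version mismatch is a common cause of failures in `findspark.init()`. A small check like the following (`missing_homes` is a hypothetical helper, not part of findspark or PySpark) reports which variables are unset or point to a directory that does not exist.

```python
import os


def missing_homes(names=("JAVA_HOME", "SPARK_HOME")):
    """Return the environment variables that are unset or not an existing directory."""
    # os.path.isdir("") is False, so unset variables are reported too
    return [name for name in names
            if not os.path.isdir(os.environ.get(name, ""))]


problems = missing_homes()
if problems:
    print("Check these variables:", problems)
else:
    print("JAVA_HOME and SPARK_HOME point to existing directories")
```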

# Start a SparkSession
This starts a local Spark session; `local[*]` runs Spark on the current machine with one worker thread per logical core.

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Use Spark!
That's all there is to it: you're ready to use Spark!

In [0]:
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)

In [0]:
# stop the session for now; the next section creates its own SparkContext
spark.stop()

# Word count in Spark

Let's start with a simple PySpark example: computing a histogram of words. Your task is to count the number of occurrences of each word in a short Python string.


In [0]:
from pyspark import SparkContext
sc = SparkContext("local", "Wordcount")

In [0]:
string = ["le petit chat est mort, le petit chat est mort ce soir.\
            Il était confiné dans son panier."]
# English: "the little cat is dead, the little cat is dead this evening.
#           He was confined to his basket."

# first, create the RDD from the string
words = ...  # TODO

# take care of removing the punctuation: ',' and '.'

# count the occurrences of each word
wordCounts = ...  # TODO
print(wordCounts.collect())
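One possible approach, sketched under the assumption that `sc` is the `SparkContext` created above: split each string into words with `flatMap`, stripping `,` and `.` in a plain Python function, then map every word to `(word, 1)` and sum the ones per word with `reduceByKey`. The names `tokenize` and `count_words` are illustrative, not part of PySpark.

```python
import re
from operator import add


def tokenize(line):
    """Replace ',' and '.' with spaces, then split on whitespace."""
    return re.sub(r"[,.]", " ", line).split()


def count_words(sc, lines):
    """Word histogram with the RDD API: returns an RDD of (word, count) pairs."""
    return (sc.parallelize(lines)          # one RDD element per input string
              .flatMap(tokenize)           # one element per word
              .map(lambda word: (word, 1))
              .reduceByKey(add))           # sum the 1s for each word
```

With this, `wordCounts = count_words(sc, string)` fills in the exercise; `sorted(wordCounts.collect())` makes the output easier to read. Case is kept as-is, so `Il` and `il` would count as different words; add `.lower()` in `tokenize` if that matters.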


In [0]:
# shutdown the context
sc.stop()

# PageRank with Spark

We use the [Zachary Karate Club dataset](https://en.wikipedia.org/wiki/Zachary%27s_karate_club) for this part of the practical session. First, let's inspect the node degrees.

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx

G = nx.karate_club_graph() # load Karate Club graph
pos = nx.spring_layout(G) # needed to always display the graph in the same way
print("Node Degree")
for v in G:
    print('%s %s' % (v, G.degree(v)))

In [0]:

def draw_graph(G, pos, ranks=None, with_labels=True):
  # color the nodes by rank when a non-empty ranks sequence is given
  if ranks is not None and len(ranks) > 0:
    nx.draw(G, pos, node_color=ranks, cmap=plt.cm.Blues,
            with_labels=with_labels)
  else:
    nx.draw(G, pos, with_labels=with_labels)
  plt.show()

draw_graph(G,pos)

Note that you can pass a `ranks` array of float values; it is used to color the nodes, which lets you display the rankings computed by PageRank.

In [0]:
ranks = np.random.rand(G.number_of_nodes())
draw_graph(G,pos,ranks=ranks,with_labels=False)

In [0]:
from pyspark import SparkContext
sc = SparkContext("local", "PageRank")

In [0]:
vertices = set(G)  # set of vertices
verticeRDD = ...  # TODO: create an RDD of vertices
neighRDD = ...  # TODO: create an RDD of neighbors, associating each node index with its list of neighbors
rankRDD = ...  # TODO: create an RDD of ranks, associating each node index with its rank
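A sketch of one way to build these RDDs, assuming `sc` and `G` from the cells above: each node is paired with the list of its neighbors, and every rank starts at the uniform value 1/N. The helpers `adjacency_pairs` and `initial_ranks` are hypothetical names; they produce plain Python lists that `sc.parallelize` turns into pair RDDs.

```python
def adjacency_pairs(graph):
    """[(node, [neighbors...]), ...] for a mapping-like graph.

    Works with a networkx Graph (graph[v] iterates over v's neighbors)
    and with a plain dict of lists.
    """
    return [(v, sorted(graph[v])) for v in graph]


def initial_ranks(nodes):
    """Uniform starting ranks: every node gets 1/N."""
    nodes = list(nodes)
    return [(v, 1.0 / len(nodes)) for v in nodes]


# Usage in the notebook (requires the SparkContext `sc`):
# verticeRDD = sc.parallelize(sorted(vertices))
# neighRDD = sc.parallelize(adjacency_pairs(G)).cache()  # reused at every iteration
# rankRDD = sc.parallelize(initial_ranks(G))
```

Caching `neighRDD` is a design choice: the graph structure never changes, so keeping it in memory avoids recomputing it at each PageRank iteration.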

In [0]:
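def getRanks(rankRDD):
  # collect the (node, rank) pairs and sort them by node id, so that the
  # ranks line up with the node order of G used by draw_graph
  tab = []
  for (node, rank) in sorted(rankRDD.collect()):
    tab.append(rank)
  return tab

# a simple example to understand what the function above does
test = rankRDD.collect()
print(test)
print(getRanks(rankRDD))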

Now you are ready to implement the PageRank method in PySpark!


In [0]:
# TODO: add helper functions if needed

def pagerank(neighRDD, rankRDD, alpha=0.9, NUM_ITERATIONS=50, display=True, verbose=True):
  for i in range(NUM_ITERATIONS):  # is there a better way to test for convergence than running all the iterations?
    if verbose:
      print("Iteration {}".format(i))
      print(getRanks(rankRDD))

    # TODO

  # draw and return only once the loop has finished
  if display:
    draw_graph(G, pos, ranks=getRanks(rankRDD), with_labels=False)
  return rankRDD

result = pagerank(neighRDD, rankRDD)
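Here is a minimal sketch of the loop body, assuming the classic damped update that `nx.pagerank` also uses on a graph without dangling nodes: $r_v = \frac{1-\alpha}{N} + \alpha \sum_{u \in N(v)} \frac{r_u}{\deg(u)}$. Each iteration joins the neighbor lists with the current ranks, splits every node's rank evenly among its neighbors, sums the shares per node with `reduceByKey`, and applies the damping. The stopping rule (total absolute change below a tolerance) is one possible answer to the convergence question in the comment above. The names `contributions`, `pagerank_step`, `l1_change`, and `pagerank_until` are hypothetical.

```python
from operator import add


def contributions(neighbors, rank):
    """Split a node's rank evenly among its neighbors: yields (neighbor, share)."""
    share = rank / len(neighbors)
    for n in neighbors:
        yield (n, share)


def pagerank_step(neighRDD, rankRDD, alpha, n_nodes):
    """One damped update on pair RDDs (node, [neighbors]) and (node, rank)."""
    return (neighRDD.join(rankRDD)                                  # (node, ([nbrs], rank))
            .flatMap(lambda kv: contributions(kv[1][0], kv[1][1]))  # (neighbor, share)
            .reduceByKey(add)                                       # total incoming share
            .mapValues(lambda s: (1 - alpha) / n_nodes + alpha * s))


def l1_change(oldRDD, newRDD):
    """Sum over nodes of |old rank - new rank|."""
    return oldRDD.join(newRDD).map(lambda kv: abs(kv[1][0] - kv[1][1])).sum()


def pagerank_until(neighRDD, rankRDD, alpha=0.9, max_iter=50, tol=1e-6):
    """Iterate until the L1 change drops below tol, or max_iter is reached."""
    n_nodes = rankRDD.count()
    for _ in range(max_iter):
        # cache: newRDD is used both for the convergence test and the next step
        newRDD = pagerank_step(neighRDD, rankRDD, alpha, n_nodes).cache()
        delta = l1_change(rankRDD, newRDD)
        rankRDD = newRDD
        if delta < tol:
            break
    return rankRDD
```

Inside the `pagerank` function above, the `# TODO` could become `rankRDD = pagerank_step(neighRDD, rankRDD, alpha, rankRDD.count())`; alternatively `result = pagerank_until(neighRDD, rankRDD)` followed by `draw_graph(G, pos, ranks=getRanks(result), with_labels=False)`. The update assumes every node has at least one neighbor, which holds for the Karate Club graph.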

You can compare the result you obtained with the [pagerank](https://networkx.github.io/documentation/networkx-1.10/reference/generated/networkx.algorithms.link_analysis.pagerank_alg.pagerank.html) method from NetworkX.


In [0]:
pr = nx.pagerank(G, alpha=0.9)
draw_graph(G,pos,ranks=list(pr.values()),with_labels=False)
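To compare numerically rather than only visually, one option is the largest absolute difference between the two rankings. This sketch assumes `result` holds (node, rank) pairs (for example `result.collect()`) and that the reference is the dict returned by `nx.pagerank`; `max_rank_difference` is a hypothetical helper. Note that `nx.pagerank` uses the `weight` edge attribute by default, and recent NetworkX versions may attach edge weights to the Karate Club graph, so passing `weight=None` gives an unweighted reference that matches an unweighted Spark implementation.

```python
def max_rank_difference(rank_pairs, reference):
    """Largest |rank - reference rank| over all nodes.

    rank_pairs: iterable of (node, rank), e.g. result.collect()
    reference:  dict node -> rank, e.g. nx.pagerank(G, alpha=0.9, weight=None)
    """
    ranks = dict(rank_pairs)
    if set(ranks) != set(reference):
        raise ValueError("the two rankings cover different nodes")
    return max(abs(ranks[v] - reference[v]) for v in reference)
```

For example, `max_rank_difference(result.collect(), nx.pagerank(G, alpha=0.9, weight=None))`. Both methods are iterative approximations with their own stopping criteria, so a small nonzero difference is expected.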