In [None]:
from pyspark import SparkContext
# Create the SparkContext first: the GraphFrames archive is registered on it below.
sc = SparkContext()
# Register the GraphFrames jar as a Python dependency. This has to run before the
# `from graphframes import *` line below.
# NOTE(review): presumably the jar bundles the graphframes Python package — confirm.
sc.addPyFile("graphframes-0.7.0-spark2.4-s_2.11.jar")

from graphframes import *
# NOTE(review): this wildcard import brings in functions whose names shadow Python
# builtins (e.g. sum, max, min) for the rest of the notebook.
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# SparkSession used by the DataFrame cells.
spark = SparkSession \
    .builder \
    .getOrCreate()
# Bare expression: shows the session summary when evaluated as the last line of a notebook cell.
spark

# Quiz 1 RDD

## Question 1 
Find the most frequent word. Output this word and its frequency.

In [None]:
from operator import add

# Tokenise README.md on whitespace and tally how often each word occurs.
lines = sc.textFile('data/README.md')
words = lines.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(add)


# Solution 1: let the RDD pick the (word, frequency) pair with the largest frequency.
counts.max(key=lambda word_count: word_count[1])

# Solution 2 (alternative): sort by frequency in descending order and keep the first pair.
#counts.sortBy(lambda x: x[1], ascending=False).first()

## Question 2
Modify the word count example above, so that we only count the frequencies of those words consisting of 5 or more characters.

In [None]:
from operator import add

# Word count restricted to words of at least 5 characters.
# The input path matches the one used in Question 1 ('data/README.md'); every other
# data file in this notebook is read from the data/ directory as well.
lines = sc.textFile('data/README.md')
counts = lines.flatMap(lambda line: line.split()) \
              .filter(lambda word: len(word) >= 5) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(add)
# Show an arbitrary sample of 10 (word, frequency) pairs.
print(counts.take(10))

## Question 3
Consider the following piece of code:
```
A = sc.parallelize(xrange(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print B.count()
t = 10
C = B.filter(lambda x: x > t)
print C.count()
```
What's its output?

In [None]:
# Code from the question, ported to Python 3 syntax (range / print()) so the cell runs
# under the same interpreter as the rest of the notebook. The logic is intentionally
# left as given: both lambdas read the global `t` lazily, which is the pitfall this
# question is about (the second count is expected NOT to be "numbers between 10 and 50").
A = sc.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())

## Question 4
The intent of the code above is to get all numbers below 50 from A and put them into B, and then get all numbers above 10 from B and put them into C.  Fix the code so that it produces the desired behavior, by adding one line of code.  You are not allowed to change the existing code.

In [None]:
# Intended result: B holds the numbers below 50, C the numbers of B above 10.
A = sc.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print (B.count())
# Both lambdas look up the global `t` lazily, so B's filter has to be pinned down while
# t is still 50, i.e. before the reassignment below.
# NOTE(review): presumably cache() works here because it makes PySpark serialize B's
# closure at this point and stops C from re-pipelining B's filter together with its
# own — confirm against the PySpark RDD implementation.
B.cache() # Add this line to fix the bug
t = 10
C = B.filter(lambda x: x > t)
print (C.count())

## Question 5
Modify the PMI example by sending a_dict and n_dict inside the closure. Do not use broadcast variables.

In [None]:
from math import *

lines = sc.textFile('data/adj_noun_pairs.txt')

# Turn every line into a tuple of words and keep only the tuples with exactly two
# words; the data is dirty and some lines do not split into a clean pair.
tokenized = lines.map(lambda line: tuple(line.split()))
pairs = tokenized.filter(lambda words: len(words) == 2)
pairs.cache()
N = pairs.count()

# Frequency of each (adjective, noun) pair; pairs seen fewer than 100 times are dropped.
pair_counts = pairs.map(lambda pair: (pair, 1)) \
                   .reduceByKey(lambda left, right: left + right)
pair_freqs = pair_counts.filter(lambda entry: entry[1] >= 100)

# Frequency of each adjective (first word) and of each noun (second word).
a_freqs = pairs.map(lambda pair: (pair[0], 1)) \
               .reduceByKey(lambda left, right: left + right)
n_freqs = pairs.map(lambda pair: (pair[1], 1)) \
               .reduceByKey(lambda left, right: left + right)

# Collect both frequency tables to the driver as plain dictionaries.
a_dict = a_freqs.collectAsMap()
n_dict = n_freqs.collectAsMap()

# Computing the PMI for a pair.
def pmi_score(pair_freq, a_dict, n_dict):
    """Return ``(pmi, (adjective, noun))`` for one pair-frequency record.

    Args:
        pair_freq: a ``((adjective, noun), frequency)`` record.
        a_dict: mapping from adjective to its frequency.
        n_dict: mapping from noun to its frequency.

    Returns:
        A tuple whose first element is the base-2 logarithm of
        ``frequency * N / (a_dict[adjective] * n_dict[noun])`` and whose second
        element is the ``(adjective, noun)`` pair. ``N`` is read from the
        enclosing (module-level) scope.
    """
    (adjective, noun), pair_count = pair_freq
    joint = float(pair_count) * N
    independent = a_dict[adjective] * n_dict[noun]
    return log(joint / independent, 2), (adjective, noun)

# Type note:
# a_dict and n_dict as used here are plain <class 'dict'> objects;
# had they been broadcast, they would be <class 'pyspark.broadcast.Broadcast'> instead.

# No broadcast variables are used: the lambda below closes over a_dict and n_dict
# and hands them to pmi_score, so both dictionaries travel inside the closure.
scored_pairs = pair_freqs.map(
    lambda record: pmi_score(record, a_dict, n_dict)
)

# The 10 highest-scoring (pmi, pair) entries.
scored_pairs.top(10)

## Question 6

The following code creates an RDD with 4 partitions: partition 0, 1, 2, and 3.

    A = sc.parallelize(xrange(100), 4)
For each item in the RDD, add its partition number to it, and write the results to another RDD, i.e., the resulting RDD should contain:
```
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102]
```

In [None]:
# The numbers 0..99 spread over 4 partitions.
A = sc.parallelize(range(100), numSlices=4)
def add_index(index, part):
    """Yield every item of ``part`` increased by ``index``.

    Args:
        index: the number to add to each item (the partition number when used
            with ``mapPartitionsWithIndex``).
        part: an iterable of the partition's items.
    """
    yield from (item + index for item in part)
    
# Run add_index over each partition together with its partition number, then
# bring the shifted values back to the driver and print them.
B = A.mapPartitionsWithIndex(add_index)
shifted = B.collect()
print(shifted)

# Quiz 2 Dataframe

In [None]:
# Sales records; the first CSV line is the header and column types are inferred.
df = spark.read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv('data/sales.csv')

## Question 1-5

In [None]:
# All distinct countries in the sales records.
countries = df.select('Country').distinct()
countries.show()

# Name and Price of the sales records in Brazil.
brazil_sales = df.filter("Country = 'Brazil'")
brazil_sales.select('Name', 'Price').show()

# Total Price per country; Spark's default aggregate column name 'sum(Price)'
# is renamed to 'TotalPrice'.
total_by_country = df.groupBy('Country').sum('Price') \
                     .withColumnRenamed('sum(Price)', 'TotalPrice')
total_by_country.show()

# The same per-country totals, largest total first.
total_by_country.orderBy(desc('TotalPrice')).show()

In [None]:
# Country lookup table; the first CSV line is the header and column types are inferred.
df2 = spark.read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv('data/countries.csv')

In [None]:
# Total Price per country, reported by country ID: aggregate first, then join with
# df2 on 'Country' to pick up the ID column.
price_totals = df.groupBy('Country').sum('Price')
totals_with_ids = price_totals.join(df2, 'Country')
totals_with_ids.withColumnRenamed('sum(Price)', 'TotalPrice') \
               .select('ID', 'TotalPrice') \
               .show()

## Question 6
Rewrite the PageRank example using DataFrame API. 

In [None]:
from pyspark.sql.functions import *

numOfIterations = 10

lines = spark.read.text("data/pagerank_data.txt")
# A larger data set the program can also be tried on:
# lines = spark.read.text("dblp.in")

# Split every input line on a single space; the first two fields are the
# source and the destination of a link.
a = lines.select(split(lines[0], ' '))
edge_fields = a[0]
links = a.select(edge_fields[0].alias('src'), edge_fields[1].alias('dst'))
links.show()

# Number of outgoing links per source.
outdegrees = links.groupBy('src').count().select('src', 'count')

# Every source starts with rank 1 (lit(1) creates a constant column).
ranks = outdegrees.select('src', lit(1).alias('rank'))

# Number of rows in ranks, i.e. the number of distinct sources.
num = ranks.count()

for iteration in range(numOfIterations):
    # Attach to each link its source's out-degree and current rank, then let the
    # source hand rank / out-degree to the destination and sum per destination.
    joined = links.join(outdegrees, 'src').join(ranks, 'src')
    share = (ranks['rank'] / outdegrees['count']).alias('contribs')
    contribs = joined.select('*', share) \
                     .withColumnRenamed('dst', 'dst1') \
                     .groupBy('dst1') \
                     .sum('contribs')

    # New rank of each destination: 0.85 * received contributions + 0.15 / num.
    damped = (contribs['sum(contribs)'] * 0.85 + 0.15 / num).alias('rank')
    ranks = contribs.select('dst1', damped).withColumnRenamed('dst1', 'src')


# Final ranks, highest first.
ranks.orderBy(desc('rank')).show()

# Quiz 3 Algorithm Design

## Question 1
Load the adjective–noun pairs data set (`data/adj_noun_pairs.txt`) into Spark and use divide-and-conquer to find the first (adj, noun) pair in which the noun is `'unification'`. Print the corresponding adjective. One solution is to use `filter()` to find all pairs where the noun is `'unification'`, and then report the first one. This is inefficient. A better idea is to find, in parallel, the first such pair in each partition (if one exists), and then take the result from the first partition that returns such a pair.

In [None]:
numPartitions = 10
lines = sc.textFile("data/adj_noun_pairs.txt", numPartitions)
# Split each line into words and keep only the lines that give exactly two words.
tokenized = lines.map(lambda line: tuple(line.split()))
pairs = tokenized.filter(lambda words: len(words) == 2)
pairs.cache()

def find_word(iterator):
    """Yield the first pair whose second word is "unification", if there is one.

    Args:
        iterator: an iterator over word pairs (the contents of one partition).

    Yields:
        At most one pair: the first whose element at index 1 equals
        "unification". Nothing is yielded when no pair matches.
    """
    first_match = next(
        (pair for pair in iterator if pair[1] == "unification"),
        None,
    )
    if first_match is not None:
        yield first_match
# take(1) returns the match from the earliest partition that produced one;
# print the adjective (first word) of that pair.
first_hit = pairs.mapPartitions(find_word).take(1)
print(first_hit[0][0])

## Question 2
Design a parallel divide-and-conquer algorithm for the following problem: Given two strings of equal length, compare them lexicographically. Output '<', '=', or '>', depending on the comparison result. The skeleton code is provided below.  Your code should run on all partitions of the rdd in parallel.

In [None]:
x = 'abcccbcbcacaccacaabb'
y = 'abcccbcccacaccacaabb'

numPartitions = 4
# Pair up the characters at the same position of x and y and distribute the pairs.
char_pairs = zip(x, y)
rdd = sc.parallelize(char_pairs, numPartitions)
rdd.cache()
def char_compare(iterator):
    """Report how the first differing character pair of a partition compares.

    Only the first mismatch inside a partition can influence the lexicographic
    result, so the scan stops there instead of emitting one value for every
    mismatching position.

    Args:
        iterator: an iterator over ``(char_of_x, char_of_y)`` pairs.

    Yields:
        At most one value: -1 when the first differing pair has the smaller
        character on the left, 1 when it has the larger one on the left.
        Nothing is yielded when all pairs are equal.
    """
    for left, right in iterator:
        if left != right:
            yield -1 if left < right else 1
            return
# Per-partition comparison results; no result at all means the strings are equal,
# otherwise the first result decides the comparison.
res = rdd.mapPartitions(char_compare)
if res.isEmpty():
    verdict = "="
else:
    verdict = ">" if res.take(1)[0] == 1 else "<"
print(verdict)

# Quiz 4 Graph and Stream

In [None]:
# Vertices DataFrame: one (id, name, age) row per person.
vertex_rows = [
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60),
]
v = spark.createDataFrame(vertex_rows, ["id", "name", "age"])

# Edges DataFrame: one (src, dst, relationship) row per directed edge.
edge_rows = [
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow"),
]
e = spark.createDataFrame(edge_rows, ["src", "dst", "relationship"])

# GraphFrame built from the two DataFrames above.
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()

## Question 1-4
Find Alice's two-hop neighbors' names, regardless of the edge type.

In [None]:
# Alice's two-hop neighbours, whatever the edge types: match every path a -> b -> c,
# keep the paths that start at Alice and show the name of the end vertex c.
two_hop_paths = g.find("(a)-[]->(b); (b)-[]->(c)")
Alice_two_hop = two_hop_paths.filter("a.name = 'Alice'")
Alice_two_hop.select("c.name").show()

# Redo the previous question, but exclude Alice's two-hop neighbors who have an edge back to Alice.
# The negated motif term `!(c)-[]->(a)` keeps only the paths a -> b -> c for which NO
# edge c -> a exists. (Without the `!`, the motif would require the back edge and
# return exactly the neighbours that are supposed to be excluded.)
# The variable name is kept unchanged for any later cell that may refer to it.
Alice_two_hop_back = g.find("(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)").filter("a.name = 'Alice'")
Alice_two_hop_back.select('c.name').show()

# Everyone who follows Charlie: single edges a -[e]-> b whose target is Charlie
# and whose relationship is 'follow'; show the name of the source vertex a.
single_edges = g.find("(a)-[e]->(b)")
who_follow_Charie = single_edges.filter(
    "b.name = 'Charlie' AND e.relationship = 'follow'"
)
who_follow_Charie.select("a.name").show()

# People followed by at least 2 people: build a graph containing only the 'follow'
# edges, attach each vertex's in-degree in that graph and keep in-degrees >= 2.
e2 = g.edges.filter("relationship = 'follow'")
g2 = GraphFrame(v, e2)
vertices_with_indegree = g2.vertices.join(g2.inDegrees, 'id', 'left_outer')
vertices_with_indegree.where("inDegree >= 2") \
                      .select("name") \
                      .show()

## Question 5

Create a queue of 10 RDDs from the adjective–noun pairs data set (`data/adj_noun_pairs.txt`) and feed it into a Spark Streaming program.  Your Spark Streaming algorithm should maintain a state that keeps track of the longest noun seen so far associated with each distinct adjective. After each RDD, print any 5 adjectives and their associated longest nouns, as well as the longest noun associated with the adjective 'good'. Note that not every line in the data set contains exactly two words, so make sure to clean the data as they are fed into the streaming program.

In [None]:
from pyspark.streaming import StreamingContext

# Streaming context with a batch interval of 5 seconds.
ssc = StreamingContext(sc, 5)
# Stateful transformations require a checkpoint directory.
ssc.checkpoint("checkpoint")

numPartitions = 8
rdd = sc.textFile('data/adj_noun_pairs.txt', numPartitions)
# Ten equal weights -> the RDD is split into 10 RDDs; 123 is the random seed.
split_weights = [1] * 10
rddQueue = rdd.randomSplit(split_weights, 123)
# Feed the 10 RDDs to the streaming program as a queue stream.
lines = ssc.queueStream(rddQueue)

def updateFunc(newNoun, runningNoun):
    """State-update callback for ``updateStateByKey``: keep the longest noun per key.

    Args:
        newNoun: the values that arrived for the key in the current batch.
            ``updateStateByKey`` passes them as a list (possibly empty), not as
            a single string, so they must be compared element by element.
        runningNoun: the longest noun stored so far for the key, or ``None``
            when the key has no state yet.

    Returns:
        The longest noun among the stored one and the new ones. On ties the
        stored noun is kept. ``None`` is returned only when there is neither a
        stored noun nor any new one.
    """
    # The stored noun goes first so that max(), which returns the first maximal
    # element, keeps it when a new noun merely ties with it.
    candidates = [] if runningNoun is None else [runningNoun]
    candidates.extend(newNoun)
    if not candidates:
        return None
    return max(candidates, key=len)

# Clean each incoming line the same way as the other cells that read this data set:
# split on any whitespace (str.split() with no argument) and keep only the lines that
# give exactly two words. Then reduce each batch to the longest noun per adjective
# and merge that into the running state.
pairs = lines.map(lambda l: tuple(l.split())) \
             .filter(lambda p: len(p) == 2) \
             .reduceByKey(lambda a, b: a if len(a) > len(b) else b) \
             .updateStateByKey(updateFunc)

def printResults(rdd):
    """Print 5 entries of the state RDD and the value stored for the key 'good'.

    Args:
        rdd: the RDD handed over by ``foreachRDD``; it must provide ``take``
            and ``lookup``.

    When 'good' is not among the keys yet, ``None`` is printed for it instead
    of failing on an empty lookup result.
    """
    print (rdd.take(5))
    matches = rdd.lookup('good')
    longest = matches[0] if matches else None
    print ('Longest noun associated with good:', longest)

# Report the state after every batch.
pairs.foreachRDD(printResults)

# Start the streaming computation.
ssc.start()
print("Start")
# Wait for at most 150 seconds, then stop streaming without stopping the SparkContext.
ssc.awaitTermination(timeout=150)
ssc.stop(stopSparkContext=False)
print("Finished")