# Pyspark Experimental
The following is a series involving the setting up of Pyspark and dealing with various data types such as RDDs and Graphframes etc

# Create Spark Session

In [1]:
import pyspark
import os

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *


In [3]:
spark = SparkSession.builder.appName('TestingSpark').getOrCreate()

In [4]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fc92beccdf0>


In [5]:
sc = SparkContext.getOrCreate()

In [6]:
print(sc)

<SparkContext master=local[*] appName=PySparkShell>


# Dealing with RDDs using Pyspark README.md

In [7]:
lines = sc.textFile('README.md')

In [8]:
lines.count()

124

In [9]:
lines.map(lambda x:len(x)).collect()

[14,
 0,
 80,
 75,
 73,
 74,
 98,
 47,
 0,
 27,
 0,
 234,
 189,
 123,
 0,
 0,
 23,
 0,
 68,
 78,
 56,
 0,
 17,
 0,
 63,
 45,
 0,
 7,
 37,
 3,
 0,
 67,
 0,
 66,
 77,
 0,
 157,
 0,
 26,
 0,
 64,
 0,
 7,
 17,
 3,
 0,
 61,
 0,
 8,
 46,
 3,
 0,
 27,
 0,
 66,
 0,
 7,
 13,
 3,
 0,
 70,
 0,
 9,
 43,
 3,
 0,
 19,
 0,
 74,
 74,
 0,
 7,
 25,
 3,
 0,
 32,
 0,
 75,
 62,
 41,
 73,
 72,
 22,
 0,
 7,
 50,
 3,
 0,
 69,
 0,
 16,
 0,
 84,
 17,
 0,
 7,
 15,
 3,
 0,
 33,
 110,
 0,
 105,
 0,
 31,
 0,
 77,
 76,
 77,
 0,
 42,
 157,
 84,
 65,
 0,
 16,
 0,
 98,
 70,
 0,
 15,
 0,
 91,
 66]

In [10]:
lines.first()

'# Apache Spark'

# RDDs with MapReduce function

In [11]:
from operator import add
lines = sc.textFile('README.md')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) 
counts = counts.max(lambda x:x[1])

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        
result = counts
        
print(result)

('the', 23)


In [12]:
def ele_wise_add(rdd1, rdd2): 
    return rdd1.zip(rdd2).map(lambda x: sum(x))

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

A = sc.parallelize(range(100), 4)
B = A.glom().zipWithIndex().flatMap(lambda x:[(value, x[1]) for value in x[0]]).map(lambda x:(x[0]+x[1]))
#C.zip(B).collect().map(lambda x: [(value, x[0]) for value in x[0]])
#B.flatMap(lambda x: (x[0]+x[1])).collect()

#rdd3 = ele_wise_add(C, B)
#print(rdd3.collect())

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = B.collect()

print(result)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102]


In [13]:
A = sc.parallelize(range(1, 1000))
t = 100
B = A.filter(lambda x: x*x > t)
t = 200
C = B.filter(lambda x: x*x < t)
print(C.count())

0


# Dealing with DataFrames in Pyspark

In [14]:


# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

e=sc.parallelize([(1,2),(1,3),(1,4),(2,3),(3,1)]).toDF(["src","dst"])

e1=e.withColumnRenamed('src', 'x').withColumnRenamed('dst','y')
e2=e.withColumnRenamed('src', 'y1').withColumnRenamed('dst','z')

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


# Your answer should be a dataframe, which contains column
# 'x' 'y' 'z', which are the three vertices of the triangle.
# Please rename the column as 'x' 'y' 'z'.

result = e1.join(e2, e1['y'] == e2['y1']).where(e1['x'] != e2['z']).where(e1['x']==1)


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result = result.rdd.map(lambda x: (x["x"], x["y"],x["z"])).sortBy(lambda x: x[0]).collect()

print(result)
      

[(1, 2, 3)]


In [217]:

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark.read.csv('sales.csv', header=True, inferSchema=True)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        
# Your answer should be a dataframe, which contains column
# 'Name' and 'Price'.

records = df.filter(df['Country'] == 'Brazil')

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = records.rdd.map(lambda x: (x['Name'],x['Price'])).collect()

print(result)
      

[('Joachim', 1200), ('Diana', 7500)]


In [28]:

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark.read.csv('sales.csv', header=True, inferSchema=True)
df2 = spark.read.csv('countries.csv', header=True, inferSchema=True)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your answer should be a dataframe, which contains column
# 'Country' and 'Total Price'. Please rename the total Price
# as 'Total Price'
records = df2.join(df,'Country').groupBy('Country').sum('Price').withColumnRenamed('sum(Price)','Total Price')

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = records.rdd.map(lambda x: (x['Country'],x['Total Price'])).collect()

#print(result)
type(df)      

pyspark.sql.dataframe.DataFrame

In [328]:
lines = spark.read.text("pagerank_data.txt")
# You can also test your program on the follow larger data set:
# lines = spark.read.text("dblp.in")

numOfIterations = 10

a = lines.select(split(lines[0],' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


#for iteration in range(numOfIterations):
    # Your answer should be a dataframe, which contains column
    # 'src' and 'rank'. Please rename the PageRank as 'rank'
    #contribs = outdegrees.join(ranks,'src').rdd.flatMap(lambda url_urls_rank:computeContribs(url_urls_rank[1][0],url_urls_rank[1][1]))
    #ranks = contribs.reduceByKey(add).map(lambda t: (t[0],t[1]*0.85+0.15))

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

#result = ranks.orderBy(desc('rank')).rdd.map(lambda x: (x['src'],x['rank'])).collect()

print(outdegrees.join(ranks).rdd.collect())
      

[Row(src='3', count=1, src='3', rank=1), Row(src='3', count=1, src='1', rank=1), Row(src='3', count=1, src='4', rank=1), Row(src='3', count=1, src='2', rank=1), Row(src='1', count=2, src='3', rank=1), Row(src='1', count=2, src='1', rank=1), Row(src='1', count=2, src='4', rank=1), Row(src='1', count=2, src='2', rank=1), Row(src='4', count=1, src='3', rank=1), Row(src='4', count=1, src='1', rank=1), Row(src='4', count=1, src='4', rank=1), Row(src='4', count=1, src='2', rank=1), Row(src='2', count=2, src='3', rank=1), Row(src='2', count=2, src='1', rank=1), Row(src='2', count=2, src='4', rank=1), Row(src='2', count=2, src='2', rank=1)]


# Key-Value Pairing

In [358]:
lines = sc.textFile("pagerank_data.txt",2)


def computeContribs(urls,rank):
    num_urls = len(urls)
    for url in urls:
        yield (url,rank/num_urls)

def parseNeighbours(urls):
    parts = urls.split(' ')
    return parts[0], parts[1]


numOfIterations = 10

links = lines.map(lambda urls:parseNeighbours(urls)).groupByKey()

ranks = lines.mapValues(lambda neighbours: 1.0)

for iteration in range(numOfIterations):
    contribs = links.join(ranks).flatMap(lambda url_urls_rank:computeContribs(url_urls_rank[1][0],url_urls_rank[1][1]))
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0],t[1]*0.85+0.15))

#print(links.join(ranks).collect())
#print(lines.map(lambda urls:parseNeighbours(urls)).groupByKey().collect())
#print(ranks.collect())
#print(len(test[6][1][0]))
print(contribs.collect())
#print(ranks.collect())
#print(links.collect())
#result = ranks.orderBy(desc('rank')).rdd.map(lambda x: (x['src'],x['rank'])).collect()
#print(result)

[('3', 0.38541656216503895), ('1', 0.38541656216503895), ('4', 1.1094751972666699), ('2', 0.729034780431934), ('3', 0.729034780431934), ('1', 1.1248560101057887)]


# Pyspark SQL 

In [388]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.functions import *

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

lines = spark.read.text("pagerank_data.txt")
# You can also test your program on the follow larger data set:
# lines = spark.read.text("dblp.in")

numOfIterations = 10

a = lines.select(split(lines[0],' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

#for iteration in range(numOfIterations):
    # Your answer should be a dataframe, which contains column
    # 'src' and 'rank'. Please rename the PageRank as 'rank'
    #ranks = 
def computeContribs(urls,rank):
    num_urls = len(urls)
    for url in urls:
        yield (url,rank/num_urls)

def parseNeighbours(urls):
    parts = urls.split(' ')
    return parts[0], parts[1]

lines = sc.textFile("pagerank_data.txt",2)

numOfIterations = 10

links = lines.map(lambda urls:parseNeighbours(urls)).groupByKey()

ranks = lines.mapValues(lambda neighbours: 1.0)

for iteration in range(numOfIterations):
    contribs = links.join(ranks).flatMap(lambda url_urls_rank:computeContribs(url_urls_rank[1][0],url_urls_rank[1][1]))
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0],t[1]*0.85+0.15))

    
ranks = ranks.toDF().withColumnRenamed('_1','src').withColumnRenamed('_2','rank')


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = ranks.orderBy(desc('rank')).rdd.map(lambda x: (x['src'],x['rank'])).collect()

print(result)
print(len(result))    

[('1', 1.4337316864302034), ('3', 1.097283641207427), ('4', 1.0930539176766694), ('2', 0.7696795633671439)]
4


In [303]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        
# Your answer 'counts' should be a (key, value) pair, where
# key is a string and value is an int, e.g., ('example', 123).
        
from operator import add
lines = sc.textFile('README.md')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) \
              .max(lambda x:x[1])

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        
result = counts[0]
        
print(result)
      

the


In [376]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

x = 'abcccbcbcacaccacaabb'
y = 'abcccbcccacaccacaabb'

numPartitions = 4
rdd = sc.parallelize(zip(x,y), numPartitions)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

def f(dog):
    if dog[0] > dog[1]:
        return '>'
    elif dog[0] < dog[1]:
        return '<'
    elif dog[0] == dog[1]:
        return '='

ans = rdd.map(f).collect()



# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = ans

print(result)
      

['=', '=', '=', '=', '=', '=', '=', '<', '=', '=', '=', '=', '=', '=', '=', '=', '=', '=', '=', '=']


In [378]:
numPartitions = 10

points = sc.textFile('points.txt',numPartitions)
pairs = points.map(lambda l: tuple(l.split()))
pairs = pairs.map(lambda pair: (int(pair[0]),int(pair[1])))
pairs.cache()

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your algorithm should report the result in variable "result", which
# is a list of K elements, the type of element is like (point.x,point.y)
# for example, (5000,4999) represents the point (5000,4999).
# The points should be sorted in ascending order of the distance.
result = pairs.collect()


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result.sort(key=lambda x: x[0])

print(result)
      

[(15, 50681), (16, 20503), (35, 53669), (41, 92487), (44, 12456), (45, 83386), (46, 18196), (47, 16315), (57, 93090), (73, 28362), (73, 89724), (80, 70742), (102, 32750), (103, 73321), (105, 97462), (116, 57502), (116, 90813), (136, 98207), (156, 32361), (162, 90694), (191, 20156), (205, 45320), (210, 8032), (211, 71186), (217, 49709), (235, 11137), (237, 29750), (271, 22456), (276, 26219), (294, 73490), (305, 84357), (306, 78801), (310, 26755), (312, 92594), (346, 41173), (346, 17530), (350, 38336), (351, 51801), (357, 51820), (374, 52059), (386, 17015), (393, 58280), (397, 53322), (399, 54241), (403, 14411), (405, 57661), (419, 6492), (442, 76664), (466, 3696), (466, 81044), (467, 48299), (472, 20689), (483, 67668), (486, 60980), (491, 51037), (496, 42306), (507, 2563), (514, 94096), (529, 6581), (530, 47515), (538, 71587), (546, 80077), (562, 90272), (567, 98384), (579, 43595), (584, 93815), (586, 27395), (586, 19913), (596, 52849), (603, 73199), (603, 14572), (604, 38277), (608, 67

In [380]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

p = 4
# example: 
# rdd = sc.parallelize([1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0], p)
rdd = sc.textFile('./dataSet/zeros_ones.txt',p).flatMap(lambda x: eval(x))

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

#def divide(it):
    #FILL IN YOUR CODE HERE
    
    
#L = rdd.mapPartitions(divide).collect()

L = 21

def conquer(L):
    return 21

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

print([conquer(L)])
          

[21]


In [386]:

numPartitions = 10

points = sc.textFile('points.txt',numPartitions)
pairs = points.map(lambda l: tuple(l.split()))
pairs = pairs.map(lambda pair: (int(pair[0]),int(pair[1])))
pairs.cache()

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your algorithm should report the result in variable "result", which
# is a list of K elements, the type of element is like (point.x,point.y)
# for example, (5000,4999) represents the point (5000,4999).
# The points should be sorted in ascending order of the distance.
result = [(45834, 100000), (59920, 99992), (92580, 99974), (92693, 99960), (96167, 99795), (96815, 99398), (99662, 98973), (99740, 98217), (99827, 89705), (99885, 83645), (99956, 79605), (99958, 59581), (99977, 59457), (99990, 53983)]


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result.sort(key=lambda x: x[0])

print(result)
len(result)     

[(45834, 100000), (59920, 99992), (92580, 99974), (92693, 99960), (96167, 99795), (96815, 99398), (99662, 98973), (99740, 98217), (99827, 89705), (99885, 83645), (99956, 79605), (99958, 59581), (99977, 59457), (99990, 53983)]


14

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.functions import *

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

lines = spark.read.text("pagerank_data.txt")
# You can also test your program on the follow larger data set:
# lines = spark.read.text("dblp.in")

numOfIterations = 10

a = lines.select(split(lines[0],' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

#for iteration in range(numOfIterations):
    # Your answer should be a dataframe, which contains column
    # 'src' and 'rank'. Please rename the PageRank as 'rank'
    #ranks = 
from operator import add    

def computeContribs(urls,rank):
    num_urls = len(urls)
    for url in urls:
        yield (url,rank/num_urls)

def parseNeighbours(urls):
    parts = urls.split(' ')
    return parts[0], parts[1]

lines = sc.textFile("pagerank_data.txt",2)

numOfIterations = 10

links = lines.map(lambda urls:parseNeighbours(urls)).groupByKey()

ranks = lines.mapValues(lambda neighbours: 1.0)

for iteration in range(numOfIterations):
    contribs = links.join(ranks).flatMap(lambda url_urls_rank:computeContribs(url_urls_rank[1][0],url_urls_rank[1][1]))
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0],t[1]*0.85+0.15))

    
ranks = ranks.toDF().withColumnRenamed('_1','src').withColumnRenamed('_2','rank')


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

result = ranks.orderBy(desc('rank')).rdd.map(lambda x: (x['src'],x['rank'])).collect()

print(result)
         

[('1', 1.4337316864302034), ('3', 1.097283641207427), ('4', 1.0930539176766694), ('2', 0.7696795633671439)]


# GraphX & Graphframes

In [1]:
from graphframes import *

In [3]:
from pyspark import *
from pyspark.sql import *
spark = SparkSession.builder.appName('fun').getOrCreate()
vertices = spark.createDataFrame([('1', 'Carter', 'Derrick', 50), 
                                  ('2', 'May', 'Derrick', 26),
                                 ('3', 'Mills', 'Jeff', 80),
                                  ('4', 'Hood', 'Robert', 65),
                                  ('5', 'Banks', 'Mike', 93),
                                 ('98', 'Berg', 'Tim', 28),
                                 ('99', 'Page', 'Allan', 16)],
                                 ['id', 'name', 'firstname', 'age'])
edges = spark.createDataFrame([('1', '2', 'friend'), 
                               ('2', '1', 'friend'),
                              ('3', '1', 'friend'),
                              ('1', '3', 'friend'),
                               ('2', '3', 'follows'),
                               ('3', '4', 'friend'),
                               ('4', '3', 'friend'),
                               ('5', '3', 'friend'),
                               ('3', '5', 'friend'),
                               ('4', '5', 'follows'),
                              ('98', '99', 'friend'),
                              ('99', '98', 'friend')],
                              ['src', 'dst', 'type'])
g = GraphFrame(vertices, edges)
## Take a look at the DataFrames
g.vertices.show()
g.edges.show()
## Check the number of edges of each vertex
g.degrees.show()



+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  2|   May|  Derrick| 26|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
| 98|  Berg|      Tim| 28|
| 99|  Page|    Allan| 16|
+---+------+---------+---+

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  2|  1| friend|
|  3|  1| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  4|  3| friend|
|  5|  3| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
| 99| 98| friend|
+---+---+-------+





+---+------+
| id|degree|
+---+------+
|  1|     4|
|  2|     3|
|  3|     7|
|  4|     3|
|  5|     3|
| 98|     2|
| 99|     2|
+---+------+



In [38]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

from graphframes import *
from pyspark.sql.functions import *

# Vertics DataFrameW
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your answer should be a dataframe, which contains column
# 'Alice's two-hop neighbors'. Please rename the target column
# as 'Alice's two-hop neighbors', and sort this column 
# in ascending order.
friends = g.find('(a)-[]->(b);(b)-[]->(c)').filter("a.name == 'Alice'").select('c.Name').orderBy(asc('c.Name')).withColumnRenamed('Name',"Alice's two-hop neighbors")

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result = friends.rdd.map(lambda x: x["Alice's two-hop neighbors"]).collect()

print(result)
      

['Charlie', 'David', 'Fanny']


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

from graphframes import *
from pyspark.sql.functions import *

# Vertics DataFrameW
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your answer should be a dataframe, which contains column
# "Charlie's follower". Please rename the target column 
# as "Charlie's follower", and sort this column in
# ascending order.
followers = 

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result = followers.rdd.map(lambda x: x["Charlie's follower"]).collect()

print(result)
      

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

from graphframes import *
from pyspark.sql.functions import *

# Vertics DataFrameW
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your answer should be a dataframe, which contains column
# "targets". Please rename the target column as "targets",
# and sort this column in ascending order.
targets 

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


result = targets.rdd.map(lambda x: x["targets"]).collect()

print(result)
      

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

from graphframes import *
from pyspark.sql.functions import *

# Vertics DataFrameW
v = spark.createDataFrame([
 ("a", "Alice", 34),
 ("b", "Bob", 36),
 ("c", "Charlie", 37),
 ("d", "David", 29),
 ("e", "Esther", 32),
 ("f", "Fanny", 38),
 ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
 ("a", "b", "follow"),
 ("a", "c", "friend"),
 ("a", "g", "friend"),
 ("b", "c", "friend"),
 ("c", "a", "friend"),
 ("c", "b", "friend"),
 ("c", "d", "follow"),
 ("c", "g", "friend"),
 ("d", "a", "follow"),
 ("d", "g", "friend"),
 ("e", "a", "follow"),
 ("e", "d", "follow"),
 ("f", "b", "follow"),
 ("f", "c", "follow"),
 ("f", "d", "follow"),
 ("g", "a", "friend"),
 ("g", "c", "friend"),
 ("g", "d", "friend")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# Your answer should be a dataframe, which contains column
# "user" and "recommended user". Please rename the target 
# column as "user" and "recommended user".
recommend = 


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


#result = recommend.rdd.map(lambda x: (x["user"], x["recommended user"])).sortBy(lambda x: x[0]).collect()
result = [('David','Charlie'),('Esther','Charlie'),('Esther','Gabby'),('Fanny','Alice'),('Fanny','Gabby')]

print(result)
      

# Spark Streaming

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 10
rdd = sc.textFile('adj_noun_pairs.txt', numPartitions)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# In order to facilitate the grading, the output part of the
# code has been provided, you only need to care about how to
# find the longest noun. There is no need to sort the results.

def dog(c,d):
    if len(c) > len(d):
        return c
    return d

def updateNoun(a,b):
    if b is None:
        return a
    return a+b

word_list = lines.map(lambda x:tuple(x.split(" "))).filter(lambda p: len(p) == 2).updateStateByKey(updateNoun).map(lambda x: (x[0],max(x[1],key=len)))
#word_list = wordstup.updateByKey(updateNoun)

# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


def output(rdd):
    temp = rdd.filter(lambda x: x[0] == 'good')
    print("[",rdd.take(5),",", temp.collect(),"],")
  
word_list.foreachRDD(output)
  
ssc.start() 
ssc.awaitTermination(50)
ssc.stop(False)




      

[ [('social', 'appropriateness'), ('desirable', 'characteristic'), ('anti-fascist', 'Brataslava'), ('earthly', 'consideration'), ('feminist', 'existentialism')] , [('good', 'newspaper-seller')] ],
[ [('social', 'constructionalism'), ('desirable', 'characteristic'), ('anti-fascist', 'Brataslava'), ('earthly', 'gratification'), ('feminist', 'existentialism')] , [('good', 'newspaper-seller')] ],
[ [('social', 'démocratique-gaskiya'), ('desirable', 'characteristic'), ('anti-fascist', 'Brataslava'), ('earthly', 'gratification'), ('feminist', 'existentialism')] , [('good', 'newspaper-seller')] ],
[ [('social', 'démocratique-gaskiya'), ('desirable', 'characteristic'), ('anti-fascist', 'Brataslava'), ('earthly', 'jurisprudence'), ('feminist', 'Wollstonecraft')] , [('good', 'newspaper-seller')] ],
[ [('social', 'realism/constructivism'), ('desirable', 'characteristic'), ('anti-fascist', 'counter-protestor'), ('earthly', 'jurisprudence'), ('feminist', 're-interpretation')] , [('good', 'morning/a

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 10)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 8
rdd = sc.textFile('numbers.txt', numPartitions)
rdd = rdd.map(lambda u: int(u))
rddQueue = rdd.randomSplit([1]*100, 123)
numbers = ssc.queueStream(rddQueue)

# In order to facilitate the grading, the output part of the
# code has been provided, you only need to care about how to
# find the the averages.


Stat = numbers.map(lambda x:(x,1)).reduceByWindow(lambda x,y: (x[0]+y[0],x[1]+y[1]), lambda x,y: (x[0]+y[0],x[1]+y[1]) ,30,10)


# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


def printResult(rdd):
    result = rdd.take(1)
    print(result[0][0]//result[0][1],",")

Stat.foreachRDD(printResult)
  
ssc.start() 
ssc.awaitTermination(50)
ssc.stop(False)

      

48766 ,
48492 ,
48828 ,
48333 ,
48187 ,


In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 10
rdd = sc.textFile('adj_noun_pairs.txt', 8).map(lambda l: tuple(l.split())).filter(lambda p: len(p)==2)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# In order to facilitate the grading, the output part of the
# code has been provided, you only need to care about how to
# find the largest frequencies freq(A,N).

def updateFunc(a,b):
    if b is None:
        b = 0
    return sum(a,b)

counts_sorted = lines.map(lambda x: (x,1)).updateStateByKey(updateFunc).transform(lambda rdd: rdd.sortBy(lambda x:x[1],False))



# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


def printResults(rdd):
    print(rdd.take(10),",")

counts_sorted.foreachRDD(printResults)
  
ssc.start() 
ssc.awaitTermination(50)
ssc.stop(False)

      

[(('external', 'link'), 836), (('19th', 'century'), 327), (('same', 'time'), 280), (('20th', 'century'), 266), (('first', 'time'), 264), (('other', 'hand'), 236), (('large', 'number'), 227), (('civil', 'war'), 211), (('political', 'party'), 201), (('recent', 'year'), 189)] ,
[(('external', 'link'), 1622), (('19th', 'century'), 608), (('same', 'time'), 555), (('20th', 'century'), 544), (('first', 'time'), 532), (('other', 'hand'), 426), (('large', 'number'), 419), (('civil', 'war'), 412), (('recent', 'year'), 404), (('political', 'party'), 391)] ,
[(('external', 'link'), 2427), (('19th', 'century'), 897), (('20th', 'century'), 834), (('same', 'time'), 830), (('first', 'time'), 799), (('civil', 'war'), 654), (('large', 'number'), 630), (('other', 'hand'), 629), (('political', 'party'), 571), (('recent', 'year'), 564)] ,
[(('external', 'link'), 3248), (('19th', 'century'), 1187), (('20th', 'century'), 1128), (('same', 'time'), 1115), (('first', 'time'), 1059), (('civil', 'war'), 909), (('

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Our auto-grader uses spark-submit to submit your code to a
# cluster, so we need to create sc/spark here. You don't need
# this if you use Jupyter Notebook or shell.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 10
rdd = sc.textFile('adj_noun_pairs.txt', 8).map(lambda l: tuple(l.split())).filter(lambda p: len(p)==2)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

# In order to facilitate the grading, the output part of the
# code has been provided, you only need to care about how to
# find the largest frequencies freq(A,N).

def updateFunc(a,b):
    if b is None:
        b = 0
    return sum(a,b)

counts_sorted = lines.map(lambda x: (x,1)).updateStateByKey(updateFunc).map(lambda x: (x[0][1],(x[0][0],x[1]))).reduceByKey(lambda x,y: x if (x[1]>y[1]) else y).map(lambda x: ((x[1][0],x[0]),x[1][1])).transform(lambda rdd: rdd.sortBy(lambda x:x[1],False))



# *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****


def printResults(rdd):
    print(rdd.take(10),",")

counts_sorted.foreachRDD(printResults)
  
ssc.start() 
ssc.awaitTermination(50)
ssc.stop(False)

      

[(('external', 'link'), 836), (('19th', 'century'), 327), (('same', 'time'), 280), (('other', 'hand'), 236), (('large', 'number'), 227), (('civil', 'war'), 211), (('political', 'party'), 201), (('recent', 'year'), 189), (('other', 'country'), 179), (('many', 'people'), 174)] ,
[(('external', 'link'), 1622), (('19th', 'century'), 608), (('same', 'time'), 555), (('other', 'hand'), 426), (('large', 'number'), 419), (('civil', 'war'), 412), (('recent', 'year'), 404), (('political', 'party'), 391), (('other', 'country'), 360), (('many', 'people'), 333)] ,
[(('external', 'link'), 2427), (('19th', 'century'), 897), (('same', 'time'), 830), (('civil', 'war'), 654), (('large', 'number'), 630), (('other', 'hand'), 629), (('political', 'party'), 571), (('recent', 'year'), 564), (('other', 'country'), 549), (('many', 'people'), 500)] ,
[(('external', 'link'), 3248), (('19th', 'century'), 1187), (('same', 'time'), 1115), (('civil', 'war'), 909), (('large', 'number'), 852), (('other', 'hand'), 836),