In [1]:
# Install findspark so that the Jupyter Notebook session knows where to find PySpark dependencies.
import findspark
findspark.init('/home/ubuntu/spark-3.3.1-bin-hadoop3')
findspark.find()

'/home/ubuntu/spark-3.3.1-bin-hadoop3'

## Part 2

In [2]:
from pyspark.sql import SparkSession

# The entry point into all functionality in Spark is the SparkSession class.
spark = (SparkSession
	.builder
	.appName("DS5110: A2 Simple Spark Algorithm")
	.master("spark://172.31.3.14:7077")
	.config("spark.executor.memory", "1024M")
	.getOrCreate())

# Read the data from a file into DataFrames
df = spark.read.csv("hdfs://172.31.3.14:9000/export.csv", header=True, inferSchema=True)

/home/ubuntu/spark-3.3.1-bin-hadoop3/conf/spark-env.sh: line 80: syntax error near unexpected token `newline'
/home/ubuntu/spark-3.3.1-bin-hadoop3/conf/spark-env.sh: line 80: `export SPARK_LOCAL_IP=<172.31.3.14>'
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 07:41:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Order DataFrame by the country code alphabetically (cca2) and then timestamp
df = df.orderBy(["cca2", "timestamp"], ascending=[True, True])

In [4]:
df.show()

+-------------+---------+----+----+--------------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|                  cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+--------------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            5|     1217|  AE| ARE|United Arab Emirates|      501|device-mac-501e4O...|      48|  213.42.16.154|    24.0|yellow|     54.0|Celsius|  16|1458444054343|
|            0|      915|  AR| ARG|           Argentina|      227|meter-gauge-2273p...|      34|  200.71.230.81|   -34.6| green|   -58.38|Celsius|  15|1458444054251|
|            1|     1189|  AR| ARG|           Argentina|      319|meter-gauge-319Y3...|      54| 200.71.236.145|   -34.6|yellow|   -58.38|Celsius|  25|1458444054287|
|   

In [5]:
# Output the new DF results to HDFS as an HDFS file in form of csv
df.write.format("csv").mode("overwrite").save("hdfs://172.31.3.14:9000/sorted_export.csv")

                                                                                

In [6]:
spark.stop()

## Part 3

### Task 1

In [7]:
import re
import sys
from operator import add
from typing import Iterable, Tuple
from pyspark.resultiterable import ResultIterable

"""Helper function to calculate URL contributions to the rank of other URLs"""
def calculateRankContrib(urls: ResultIterable[str], rank: float) -> Iterable[Tuple[str, float]]:
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


"""Helper function to parse a URLs string into URLs pair"""
def parseNeighborURLs(urls: str) -> Tuple[str, str]:
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

spark = (SparkSession
	.builder
	.appName("DS5110: A2 PageRank")
	.master("spark://172.31.3.14:7077")
	.config("spark.executor.memory", "2048M")
	.getOrCreate())

# Load input file into lines RDD
linesRDD = spark.sparkContext.textFile("hdfs://172.31.3.14:9000/web-BerkStan.txt")

In [8]:
# Perform a transformation to define a links RDD by using parseNeighborURLs
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey().cache()

# Initialize a ranks RDD with all ranks set to 1
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0)).persist()

In [9]:
# PageRank Algorithm
for iteration in range(10):
    # Create RDD of contribution values using calculateRankContrib helper function
    contributeRDD = linksRDD.join(ranksRDD).flatMap(lambda url_rank: calculateRankContrib(url_rank[1][0], url_rank[1][1]))
    # Update the ranks according to contribution values
    ranksRDD = contributeRDD.reduceByKey(add).mapValues(lambda conSum: 0.15 + 0.85 * conSum).persist()

# Sort ranksRDD by rank value
sortedRDD = ranksRDD.sortBy(lambda url: url[1], ascending=False)

# Print top 50 ranks
for url, rank in sortedRDD.take(50):
    print(f"{url}: {rank}")

# Save rank results as a Spark DF to HDFS as an HDFS csv file
ranksDf = ranksRDD.toDF()
ranksDf.write.format("csv").save("hdfs://172.31.3.14:9000/pageRanks.csv")

                                                                                

272919: 6531.324623752469
438238: 4335.323158564438
571448: 2383.8976074118896
601656: 2195.3940755967283
316792: 1855.6908757901526
319209: 1632.8193684975693
184094: 1532.2842374483407
571447: 1492.9301630938794
401873: 1436.160093346929
66244: 1261.5783958673323
68949: 1260.7919421349116
284306: 1257.2475650644851
68948: 1251.1723536459208
96070: 1235.298540597624
86238: 1235.298540597624
86239: 1235.298540597624
95551: 1235.298540597624
68947: 1235.2985405976237
68946: 1235.2985405976237
77284: 1235.2985405976237
66909: 1235.2985405976237
95552: 1235.2985405976235
86237: 1235.2985405976235
768: 1225.5975665113076
927: 1117.8383051141836
210376: 920.6701252803678
95527: 919.6797146521081
100130: 916.0190658202678
101163: 912.5380530105941
95018: 911.183108007798
100646: 909.7095673033007
96045: 904.398131580974
66879: 895.7909746044757
210305: 893.0386730972408
319412: 887.9352083382674
571451: 875.7852546255617
570985: 871.5825582573228
544858: 869.6096568148239
184142: 863.2307781

AnalysisException: path hdfs://172.31.3.14:9000/pageRanks.csv already exists.

In [10]:
spark.stop()

### Task 2

In [11]:
spark = (SparkSession
	.builder
	.appName("DS5110: A2 PageRank")
	.master("spark://172.31.3.14:7077")
	.config("spark.executor.memory", "2048M")
	.getOrCreate())

# Load input file into lines RDD
linesRDD = spark.sparkContext.textFile("hdfs://172.31.3.14:9000/web-BerkStan.txt")
# Perform a transformation to define a links RDD by using parseNeighborURLs, and partition into 8 even chunks
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey().partitionBy(8).cache()
# Initialize a ranks RDD with all ranks set to 1, and partition similarly
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0)).partitionBy(8).persist()

# PageRank Algorithm
for iteration in range(10):
    # Create RDD of contribution values using calculateRankContrib helper function
    contributeRDD = linksRDD.join(ranksRDD).flatMap(lambda url_rank: calculateRankContrib(url_rank[1][0], url_rank[1][1]))
    ranksRDD = contributeRDD.reduceByKey(add).mapValues(lambda conSum: 0.15 + 0.85 * conSum).persist()

# Sort ranksRDD by rank value
sortedRDD = ranksRDD.sortBy(lambda url: url[1], ascending=False)
# Print top 50 ranks
for url, rank in sortedRDD.take(50):
    print(f"{url}: {rank}")



272919: 6531.324623752422
438238: 4335.323158564443
571448: 2383.8976074118846
601656: 2195.3940755967296
316792: 1855.6908757901415
319209: 1632.819368497568
184094: 1532.284237448333
571447: 1492.930163093876
401873: 1436.1600933469272
66244: 1261.5783958673355
68949: 1260.7919421349154
284306: 1257.2475650644835
68948: 1251.1723536459237
77284: 1235.2985405976276
95551: 1235.2985405976276
95552: 1235.2985405976272
86237: 1235.2985405976272
68946: 1235.298540597627
86239: 1235.298540597627
68947: 1235.2985405976267
66909: 1235.2985405976267
86238: 1235.2985405976267
96070: 1235.2985405976267
768: 1225.5975665113033
927: 1117.8383051141807
210376: 920.6701252803675
95527: 919.6797146521103
100130: 916.0190658202699
101163: 912.5380530105961
95018: 911.1831080078001
100646: 909.7095673033026
96045: 904.3981315809759
66879: 895.7909746044775
210305: 893.0386730972403
319412: 887.9352083382672
571451: 875.7852546255596
570985: 871.5825582573198
544858: 869.6096568148241
184142: 863.23077

                                                                                

In [12]:
spark.stop()

### Task 3

In [None]:
spark = (SparkSession
	.builder
	.appName("DS5110: A2 PageRank")
	.master("spark://172.31.3.14:7077")
	.config("spark.executor.memory", "2048M")
	.getOrCreate())

# Load input file into lines RDD
linesRDD = spark.sparkContext.textFile("hdfs://172.31.3.14:9000/web-BerkStan.txt")
# Perform a transformation to define a links RDD by using parseNeighborURLs
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey().cache()
# Initialize a ranks RDD with all ranks set to 1
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0)).persist()

# PageRank Algorithm (NOTE: Kill vm2 Worker node around iteration 5)
for iteration in range(10):
    # Create RDD of contribution values using calculateRankContrib helper function
    contributeRDD = linksRDD.join(ranksRDD).flatMap(lambda url_rank: calculateRankContrib(url_rank[1][0], url_rank[1][1]))
    # Update the ranks according to contribution values
    ranksRDD = contributeRDD.reduceByKey(add).mapValues(lambda conSum: 0.15 + 0.85 * conSum).persist()

# Sort ranksRDD by rank value
sortedRDD = ranksRDD.sortBy(lambda url: url[1], ascending=False)
# Print top 50 ranks
for url, rank in sortedRDD.take(50):
    print(f"{url}: {rank}")

[Stage 0:>                                                          (0 + 2) / 2]

In [None]:
spark.stop()