<a href="https://colab.research.google.com/github/andiub97/CovidPubRank/blob/master/CovidPageRank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Page Ranking Algorithms on Google Cloud Dataproc

- Use the [Cloud Resource Manager](https://cloud.google.com/resource-manager) to create a project if you do not already have one.
- Enable the Dataproc and Cloud Storage services for the project.
- [Enable billing](https://support.google.com/cloud/answer/6293499#enable-billing) for the project.
- See [Google Cloud Storage (GCS) Documentation](https://cloud.google.com/storage/) for more info.


## Set environment variables (optional)
Here you can optionally install the `colab-env` dependency, allow it to access your Google Drive, and create an environment file containing the environment variables (bucket names, cluster properties and so on) used by all later tasks.

In [None]:
!pip install colab-env --upgrade

In [None]:
import colab_env

colab_env.envvar_handler.add_env("GOOGLE_PROJECT_ID", "insert_project-name", overwrite=True)
colab_env.envvar_handler.add_env("DATASET_BUCKET", "insert_input-file-bucket-name", overwrite=True)
colab_env.envvar_handler.add_env("OUTPUT_BUCKET", "insert_output-file-bucket-name", overwrite=True)
colab_env.envvar_handler.add_env("CLUSTER_REGION", "insert_cluster-regione-name", overwrite=True)
colab_env.envvar_handler.add_env("CLUSTER_ZONE", "insert_cluster-zone-name", overwrite=True)

!more gdrive/My\ Drive/vars.env

## Allow Google Cloud access to the notebook and set GC project for this session

In [None]:
import os
from google.colab import auth
# Authenticate the current user through the google.colab auth helper.
auth.authenticate_user()

# Read the project id from the environment and make it the active gcloud project.
# NOTE(review): os.getenv returns None when GOOGLE_PROJECT_ID is unset, so the
# text "None" would be interpolated into the command below - confirm the
# variable is set before running.
project_id = os.getenv("GOOGLE_PROJECT_ID")
!gcloud config set project {project_id}

## Clone the CovidPubRank repo from GitHub, extract the dataset archive and move the files to the `sample_data/datasets` folder

In [None]:
import tarfile
!git clone https://github.com/andiub97/CovidPubRank.git

tar = tarfile.open("./CovidPubRank/data/data.tar.gz")
tar.extractall()
tar.close()

!mkdir ./sample_data/datasets
!mv ./dataset_1015681.txt ./sample_data/datasets
!mv ./dataset_32685.txt ./sample_data/datasets
!mv ./dataset_14924.txt ./sample_data/datasets
!mv ./dataset_9647.txt ./sample_data/datasets

## Create buckets and load datasets and jar file into them
Create a bucket and upload the datasets into it.
Remember to also create a bucket for the jar file and upload the jar to it, using either the GCP web console or a shell; we suggest using the Google Cloud CLI from your own machine.

In [None]:
bucket_name = os.getenv("DATASET_BUCKET")

!gsutil mb -l us-central1 -b on gs://{bucket_name}

# Copy files to new bucket.
!gsutil cp ./sample_data/datasets/dataset_9647.txt gs://{bucket_name}/
!gsutil cp ./sample_data/datasets/dataset_14924.txt gs://{bucket_name}/
!gsutil cp ./sample_data/datasets/dataset_32685.txt gs://{bucket_name}/
!gsutil cp ./sample_data/datasets/dataset_1015681.txt gs://{bucket_name}/

## Create output bucket for storing algorithms statistics

In [None]:
# Bare bucket name (no gs:// prefix) for the job results, read from the environment.
output_bucket_name = os.getenv("OUTPUT_BUCKET")
# Create the bucket in us-central1; "-b on" is passed through to gsutil mb
# (presumably the uniform bucket-level access switch - confirm in the gsutil docs).
!gsutil mb -l us-central1 -b on gs://{output_bucket_name}

# Weak Scalability 
### Run all algorithms on the same cluster while varying the dataset size

## Create single node cluster

In [None]:
# Region and zone for the cluster, read from the environment.
region = os.getenv("CLUSTER_REGION")
zone = os.getenv("CLUSTER_ZONE")

# Create a Dataproc cluster named single-node-cluster with the --single-node flag.
!gcloud dataproc clusters create single-node-cluster \
  --region {region} \
  --zone {zone} \
  --single-node 

## Send Dataproc jobs to single-node cluster for all datasets and algorithms

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=single-node-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "local" "AllAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_9647.txt" {os.getenv("OUTPUT_BUCKET")}"single-node/dataset_9647" "4" "localOnCloud" "0"

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=single-node-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "local" "AllAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_14924.txt" {os.getenv("OUTPUT_BUCKET")}"single-node/dataset_14924" "4" "localOnCloud" "0"

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=single-node-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "local" "AllAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_32685.txt" {os.getenv("OUTPUT_BUCKET")}"single-node/dataset_32685" "4" "localOnCloud" "0"

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=single-node-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "local" "AllAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_1015681.txt" {os.getenv("OUTPUT_BUCKET")}"single-node/dataset_1015681" "4" "localOnCloud" "0"

## Delete cluster

In [None]:
# Delete single-node-cluster in the region read from CLUSTER_REGION.
!gcloud dataproc clusters delete single-node-cluster \
    --region={os.getenv("CLUSTER_REGION")}

# Strong scalability
### Run all algorithms on the same dataset while varying the number of workers per cluster

## Create 2-workers cluster with n1-standard-4 machines

In [None]:
# Create two-workers-cluster: n1-standard-4 master and 2 n1-standard-4 workers,
# each with a boot disk size of 500, in the region/zone read from the environment.
!gcloud dataproc clusters create two-workers-cluster \
  --region {os.getenv("CLUSTER_REGION")} \
  --zone {os.getenv("CLUSTER_ZONE")} \
  --master-machine-type n1-standard-4\
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --num-workers 2 \
  --worker-boot-disk-size 500

## Send Dataproc jobs performing DistributedPageRank and ParallelPageRankLibrary algorithms on "dataset_1015681.txt" dataset

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=two-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "yarn" "DistributedAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_1015681.txt" {os.getenv("OUTPUT_BUCKET")}"two-nodes-4/distributed" "16" "distributedOnCloud" "two_workers_n1_standard_4"

## Delete cluster

In [None]:
# Delete two-workers-cluster in the region read from CLUSTER_REGION.
!gcloud dataproc clusters delete two-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")}

## Create 2-workers cluster with n1-standard-8 machines

In [None]:
# Create two-workers-cluster again, this time with an n1-standard-8 master and
# 2 n1-standard-8 workers, each with a boot disk size of 500.
!gcloud dataproc clusters create two-workers-cluster \
  --region {os.getenv("CLUSTER_REGION")} \
  --zone {os.getenv("CLUSTER_ZONE")} \
  --master-machine-type n1-standard-8\
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-8 \
  --num-workers 2 \
  --worker-boot-disk-size 500

## Send Dataproc jobs performing DistributedPageRank and ParallelPageRankLibrary algorithms on "dataset_1015681.txt" dataset

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=two-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "yarn" "DistributedAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_1015681.txt" {os.getenv("OUTPUT_BUCKET")}"two-nodes-8/distributed" "16" "distributedOnCloud" "two_workers_n1_standard_8"

## Delete cluster

In [None]:
# Delete two-workers-cluster in the region read from CLUSTER_REGION.
!gcloud dataproc clusters delete two-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")}

## Create 4-workers cluster

In [None]:
!gcloud dataproc clusters create four-workers-cluster \
  --region {os.getenv("CLUSTER_REGION")} \
  --zone {os.getenv("CLUSTER_ZONE")} \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --num-workers 4\
  --worker-boot-disk-size 500 \


## Send Dataproc jobs performing DistributedPageRank and ParallelPageRankLibrary algorithms on "dataset_1015681.txt" dataset

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=four-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "yarn" "DistributedAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_1015681.txt" {os.getenv("OUTPUT_BUCKET")}"four-nodes/distributed" "16" "distributedOnCloud" "four_workers_n1_standard_4"

## Delete cluster

In [None]:
# Delete four-workers-cluster in the region read from CLUSTER_REGION.
!gcloud dataproc clusters delete four-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")}

## Create 5-workers cluster

In [None]:
!gcloud dataproc clusters create five-workers-cluster \
  --region {os.getenv("CLUSTER_REGION")} \
  --zone {os.getenv("CLUSTER_ZONE")} \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --num-workers 5 \
  --worker-boot-disk-size 500 \

## Send Dataproc jobs performing DistributedPageRank and ParallelPageRankLibrary algorithms on "dataset_1015681.txt" dataset

In [None]:
!gcloud dataproc jobs submit spark \
    --cluster=five-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")} \
    --jar=gs://covid_program/covidpubrank_2.12-0.1.0-SNAPSHOT.jar \
    -- "yarn" "DistributedAlgorithms" {os.getenv("DATASET_BUCKET")}"dataset_1015681.txt" {os.getenv("OUTPUT_BUCKET")}"five-nodes/distributed" "16" "distributedOnCloud" "five_workers_n1_standard_4"

## Delete cluster

In [None]:
# Delete five-workers-cluster in the region read from CLUSTER_REGION.
!gcloud dataproc clusters delete five-workers-cluster \
    --region={os.getenv("CLUSTER_REGION")}

## Get jobs list and delete job specifying its id

In [None]:
# Set the gcloud property dataproc/region to CLUSTER_REGION so the job
# commands below do not need an explicit --region.
!gcloud config set dataproc/region {os.getenv("CLUSTER_REGION")}
# Capture the command's output lines in a Python list.
# NOTE(review): value(JOB_ID) is expected to yield one job id per line - confirm.
listJob = !gcloud dataproc jobs list --format='value(JOB_ID)'

# Delete every listed job, one gcloud call per id.
for i in listJob:
  !gcloud dataproc jobs delete {i}

## Get job execution output, in other words get ranking algorithms' execution time and plot them

In [None]:
!gsutil -m cp -r "gs://output_bucket_results" .

In [43]:
# Part files produced by the four strong-scalability jobs, in the order they
# are concatenated. The n1-standard-4 two-worker job writes to
# "two-nodes-4/distributed" (see its submit cell), so that is the folder read here.
strong_result_files = [
    "./output_bucket_results/five-nodes/distributed/part-00000",
    "./output_bucket_results/four-nodes/distributed/part-00000",
    "./output_bucket_results/two-nodes-4/distributed/part-00000",
    "./output_bucket_results/two-nodes-8/distributed/part-00000",
]

merged_parts = []
for result_file in strong_result_files:
    with open(result_file) as textfile:
        content = textfile.read()
    # Terminate every non-empty part with a newline so the last row of one
    # file can never fuse with the first row of the next; empty parts add
    # nothing, so no blank rows are introduced.
    if content and not content.endswith("\n"):
        content += "\n"
    merged_parts.append(content)

data1 = "".join(merged_parts)

# Single combined file consumed by the strong-scalability plot cell.
with open("./output_bucket_results/strong-scalability-result.txt", 'w') as textfile5:
    textfile5.write(data1)

In [None]:
# Part files produced by the four single-node jobs, in the order they are
# concatenated.
weak_result_files = [
    "./output_bucket_results/single-node/dataset_1015681/part-00000",
    "./output_bucket_results/single-node/dataset_14924/part-00000",
    "./output_bucket_results/single-node/dataset_32685/part-00000",
    "./output_bucket_results/single-node/dataset_9647/part-00000",
]

merged_parts = []
for result_file in weak_result_files:
    with open(result_file) as textfile:
        content = textfile.read()
    # Add a newline only when a non-empty part lacks one: rows from
    # consecutive files stay separate, and no blank row is created (a blank
    # row would break the field indexing done by the plot cell).
    if content and not content.endswith("\n"):
        content += "\n"
    merged_parts.append(content)

data1 = "".join(merged_parts)
print(data1)

# Single combined file consumed by the weak-scalability plot cell.
with open("./output_bucket_results/weak-scalability-result.txt", 'w') as textfile5:
    textfile5.write(data1)

## Plot execution time of algorithms showing weak scalability

In [None]:
import numpy as np
import matplotlib.pyplot as plt                                                

# Characters stripped from every result row before it is split into fields.
chars = "()\n"

dataset_files = ['dataset_9647.txt', 'dataset_14924.txt',
                 'dataset_32685.txt', 'dataset_1015681.txt']
algorithm_names = ['ranking.DistributedPageRank',
                   'ranking.ParallelPageRankLibrary',
                   'ranking.PageRank',
                   'ranking.PageRankLibrary']
bar_colors = ['r', 'g', 'b', 'yellow']

# results1[dataset][algorithm] -> execution time kept as text; '0.0' is the
# placeholder that remains when the results file has no row for that pair.
results1 = {dataset: {algorithm: '0.0' for algorithm in algorithm_names}
            for dataset in dataset_files}

# After removing parentheses/newlines and turning commas into spaces, field 0
# is used as the algorithm name, field 1 as the time and field 2 as the
# dataset name. The with-block closes the file once parsing is done.
with open("./output_bucket_results/weak-scalability-result.txt", 'r') as f:
    for row in f:
        for c in chars:
            row = row.replace(c, "")
        fields = row.replace(",", " ").split(' ')
        # Blank or truncated rows have no dataset field: skip them instead of
        # failing with IndexError.
        if len(fields) < 3:
            continue
        if fields[2] in results1:
            results1[fields[2]][fields[0]] = fields[1]

# set width of bar
barWidth = 0.1
fig, ax = plt.subplots(figsize=(16, 9))

# One group of bars per algorithm; inside a group, one bar per dataset.
br1 = np.arange(len(results1[dataset_files[0]]))
for offset, (dataset, color) in enumerate(zip(dataset_files, bar_colors)):
    heights = [float(value) for value in results1[dataset].values()]
    plt.bar(br1 + offset * barWidth, heights, color=color, width=barWidth,
            edgecolor='black', label=dataset)

# Axis labels
plt.xlabel('Algorithms Names', fontweight ='bold', fontsize = 10)
plt.ylabel('Execution time [s] in log scale', fontweight ='bold', fontsize = 10)

# Label each group with its algorithm name, centred under the group's bars.
ticks = [name for name in results1[dataset_files[0]] if name]
group_centre = barWidth * (len(dataset_files) - 1) / 2
plt.xticks(br1 + group_centre, ticks)

# Logarithmic y-axis starting at 1: values at or below 1 (including the '0.0'
# placeholders) get no visible bar.
plt.yscale('log')
plt.ylim(1)
plt.legend()
plt.title("Weak scalability performance", fontdict = {'fontsize' : 20})
plt.show()

## Plot execution time of algorithms showing strong scalability

In [None]:
import numpy as np
import matplotlib.pyplot as plt                                                

# Characters stripped from every result row before it is split into fields.
chars = "()\n"

# Cluster label (as written in the result rows) -> legend text.
cluster_labels = {
    "two_workers_n1_standard_4": '2 workers n1-standard-4',
    "two_workers_n1_standard_8": '2 workers n1-standard-8',
    "four_workers_n1_standard_4": '4 workers n1-standard-4',
    "five_workers_n1_standard_4": '5 workers n1-standard-4',
}
algorithm_names = ["ranking.DistributedPageRank",
                   "ranking.ParallelPageRankLibrary"]
bar_colors = ['r', 'g', 'b', 'yellow']

# stats[cluster][algorithm] -> execution time kept as text; "0.0" is the
# placeholder that remains when the results file has no row for that pair.
stats = {cluster: {algorithm: "0.0" for algorithm in algorithm_names}
         for cluster in cluster_labels}

# After removing parentheses/newlines and turning commas into spaces, field 0
# is used as the algorithm name, field 1 as the time and field 3 as the
# cluster label. The with-block closes the file once parsing is done.
with open('./output_bucket_results/strong-scalability-result.txt', 'r') as f:
    for row in f:
        for c in chars:
            row = row.replace(c, "")
        fields = row.replace(",", " ").split(' ')
        # Blank or truncated rows have no cluster field: skip them instead of
        # failing with IndexError.
        if len(fields) < 4:
            continue
        if fields[3] in stats:
            stats[fields[3]][fields[0]] = fields[1]

# set width of bar
barWidth = 0.05
fig, ax = plt.subplots(figsize=(16, 9))

# One group of bars per algorithm; inside a group, one bar per cluster setup.
first_cluster = next(iter(cluster_labels))
br1 = np.arange(len(stats[first_cluster]))
for offset, (cluster, color) in enumerate(zip(cluster_labels, bar_colors)):
    heights = [float(value) for value in stats[cluster].values()]
    plt.bar(br1 + offset * barWidth, heights, color=color, width=barWidth,
            edgecolor='black', label=cluster_labels[cluster])

# Axis labels (linear y-axis: no log scale is applied to this figure).
plt.xlabel('Algorithms Names', fontweight ='bold', fontsize = 10)
plt.ylabel('Execution time [s]', fontweight ='bold', fontsize = 10)

# Label each group with its algorithm name, centred under the group's bars.
ticks = [name for name in stats[first_cluster] if name]
group_centre = barWidth * (len(cluster_labels) - 1) / 2
plt.xticks(br1 + group_centre, ticks)

plt.title("Algorithms result with 75648912 edges dataset", fontdict = {'fontsize' : 20})
plt.legend()
plt.show()
    


