In [0]:
# IMPORTING LIBRARIES

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import col, min, max, avg
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time
from pyspark.sql.types import *
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import warnings; warnings.filterwarnings(action='once')
from matplotlib.axes import Axes

In [0]:
# DATA UNDERSTANDING

In [0]:
# LOADING THE DATASET
# Both files have a header row; Spark infers the column types while reading.
ratings = spark.read.csv("/FileStore/tables/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("/FileStore/tables/movies.csv", header=True, inferSchema=True)

In [0]:
# Column names of the ratings data, followed by its first 5 rows.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly: wrapping it in print() appended a stray "None" line to the output.
print(ratings.columns)
ratings.show(5)

In [0]:
# Column names of the movies data, followed by its first 5 rows.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly: wrapping it in print() appended a stray "None" line to the output.
print(movies.columns)
movies.show(5)

In [0]:
# calculating sparsity
numerator = ratings.select("rating").count()
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()
print(num_users)
print(num_movies)
denominator = num_users * num_movies
sparsity = (1.0 - (numerator *1.0)/denominator)*100

In [0]:
# printing sparsity
print(sparsity)

In [0]:
# distinct userId in ratings dataset
ratings.select("userId").distinct().count()

In [0]:
# distinct movieId in ratings dataset
ratings.select("movieId").distinct().count()

In [0]:
# number of ratings in the ratings dataset
ratings.select("rating").count()

In [0]:
# number of ratings given by each user in ratings dataset
userId_rating_count = ratings.groupBy("userId").count()
userId_rating_count.show()

In [0]:
# number of ratings given by each userId
userId_rating_count.sort("count", ascending = True).show()

In [0]:
# minimum count of rating by a userId
userId_rating_count.select(min("count")).show()

In [0]:
# maximum count of rating by a userId
userId_rating_count.select(max("count")).show()

In [0]:
# average count of rating by a userId
userId_rating_count.select(avg("count")).show()

In [0]:
# number of ratings received by a movie in ratings dataset
movieId_rating_count = ratings.groupBy("movieId").count()
movieId_rating_count.show()

In [0]:
# sorting the ratings received by a movie in ratings dataset
movieId_rating_count.sort("count", ascending = True).show()

In [0]:
# minimum count of rating by a movieId
movieId_rating_count.select(min("count")).show()

In [0]:
# maximum count of rating by a movieId
movieId_rating_count.select(max("count")).show()

In [0]:
# average count of rating by a movieId
movieId_rating_count.select(avg("count")).show()

In [0]:
# total number of movieId in the movies dataset
movies.select("movieId").count()

In [0]:
# DATA PREPARATION

In [0]:
# checking the ratings dataset format
print(ratings.show())

In [0]:
# checking data type of ratings columns
ratings.printSchema()

# checking data type of movies columns
movies.printSchema()

In [0]:
# checking data type of movies columns
movies.printSchema()

In [0]:
# checking if it dropped succesfully
ratings.show()

In [0]:
# joining ratings and movies data set
movies_ratings_data = ratings.join(movies, on='movieId', how='leftouter')

In [0]:
# final dataset
movies_ratings_data.show()

In [0]:
# MODELING

In [0]:
# sample size used for tuning: 100836 rows (labelled 100% of the dataset)
sample_size = [100836]

# per-run bookkeeping; every entry is a (sample size, value) tuple
rank_list, maxIter_list, regParam_list = [], [], []
time_list, RMSE_list = [], []

for s in sample_size:

    # draw s rows without replacement (seed 42), then rebuild a Spark DataFrame from them
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)
    sample = sqlContext.createDataFrame(sample_data)

    # the clock starts after sampling: it covers split, tuning and evaluation
    ALS_start = time.time()

    # 80/20 train/test split, reproducible through the seed
    training_data, test_data = sample.randomSplit([0.8, 0.2], seed=42)

    # base estimator; rank, maxIter and regParam are supplied by the grid below
    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        nonnegative=True,
        implicitPrefs=False,
        coldStartStrategy="drop",
    )

    # 3 x 3 x 3 hyperparameter combinations to try
    param_grid = (
        ParamGridBuilder()
        .addGrid(als.rank, [5, 10, 15])
        .addGrid(als.maxIter, [10, 15, 20])
        .addGrid(als.regParam, [0.01, 0.05, 0.1])
        .build()
    )

    # RMSE between the true rating and the predicted one
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

    # 5-fold cross validation over the grid, fitted on the training split only
    cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
    model = cv.fit(training_data)

    # best model found by cross validation, scored on the held-out test split
    best_model = model.bestModel
    test_predictions = best_model.transform(test_data)
    rmse = evaluator.evaluate(test_predictions)

    # stop the clock and derive the elapsed seconds
    ALS_end = time.time()
    total_time = ALS_end - ALS_start

    # hyperparameters of the best model; maxIter and regParam are read from the
    # parent estimator of the underlying Java model object
    rank_list.append((s, best_model.rank))
    maxIter_list.append((s, best_model._java_obj.parent().getMaxIter()))
    regParam_list.append((s, best_model._java_obj.parent().getRegParam()))

    time_list.append((s, total_time))
    RMSE_list.append((s, rmse))

# schemas for the two result tables: (size, total_time) and (size, rmse)
cSchema_time = StructType([
    StructField("size", IntegerType()),
    StructField("total_time", FloatType()),
])
cSchema_rmse = StructType([
    StructField("size", IntegerType()),
    StructField("rmse", FloatType()),
])

# turning the collected tuples into Spark DataFrames
df_time = spark.createDataFrame(time_list, schema=cSchema_time)
df_rmse = spark.createDataFrame(RMSE_list, schema=cSchema_rmse)

# displaying the time and rmse dataframes
df_time.show()
df_rmse.show()

In [0]:
# MEASURING THE IMPACT OF SCALE ON PERFORMANCE AND QUALITY

In [0]:
# distributing data into 25% size
sample_size = [25209]

# creating empty list
time_list_1 = []
RMSE_list_1 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_1.append((s, total_time))
    RMSE_list_1.append((s, rmse))
    
print(time_list_1)
print(RMSE_list_1)

In [0]:
# distributing data into 50% size
sample_size = [50418]

# creating empty list
time_list_2 = []
RMSE_list_2 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_2.append((s, total_time))
    RMSE_list_2.append((s, rmse))
    
print(time_list_2)
print(RMSE_list_2)

In [0]:
# distributing data into 75% size
sample_size = [75627]

# creating empty list
time_list_3 = []
RMSE_list_3 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_3.append((s, total_time))
    RMSE_list_3.append((s, rmse))

print(time_list_3)
print(RMSE_list_3)

In [0]:
# distributing data into 100% size
sample_size = [100836]

# creating empty list
time_list_4 = []
RMSE_list_4 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_4.append((s, total_time))
    RMSE_list_4.append((s, rmse))

print(time_list_4)
print(RMSE_list_4)

In [0]:
# MEASURING THE IMPACT OF PARALLEL AND NON_PARALLEL COMPUTATION AS SCALE INCREASES ON TIME

In [0]:
# distributing data into 100% size with numBlocks 1
sample_size = [100836]

# creating empty list
time_list_1 = []
RMSE_list_1 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(numUserBlocks = 1, numItemBlocks = 1, userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_1.append((s, total_time))
    RMSE_list_1.append((s, rmse))
    
print(time_list_1)
print(RMSE_list_1)

In [0]:
# distributing data into 100% size with 2=numBlocks 2
sample_size = [100836]

# creating empty list
time_list_2 = []
RMSE_list_2 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(numUserBlocks = 2, numItemBlocks = 2, userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_2.append((s, total_time))
    RMSE_list_2.append((s, rmse))
    
print(time_list_2)
print(RMSE_list_2)

In [0]:
# distributing data into 100% size with numBlocks 3
sample_size = [100836]

# creating empty list
time_list_3 = []
RMSE_list_3 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(numUserBlocks = 3, numItemBlocks = 3, userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_3.append((s, total_time))
    RMSE_list_3.append((s, rmse))
    
print(time_list_3)
print(RMSE_list_3)

In [0]:
# distributing data into 100% size with numBlocks 4
sample_size = [100836]

# creating empty list
time_list_4 = []
RMSE_list_4 = []

# creating a for loop for modeling
for s in sample_size:
    
    # taking sample through takeSample function
    sample_data = movies_ratings_data.rdd.takeSample(False, s, 42)

    # creating spark dataframes of sample_data
    sample = sqlContext.createDataFrame(sample_data)
    
    # recording the start time
    ALS_start = time.time()
    
    # spliting the dataset 
    (training_data, test_data) = sample.randomSplit([0.8, 0.2], seed = 42)
    
    # building a generic ALS model without hyperparameters
    als = ALS(numUserBlocks = 4, numItemBlocks = 4, userCol="userId", itemCol="movieId", ratingCol="rating", rank = 10, maxIter = 20, regParam = 0.1, nonnegative = True,\
              implicitPrefs = False, coldStartStrategy = "drop")
    
    # fitting cross validator on the training data
    model = als.fit(training_data)
  
    # generate test_data predictions
    test_predictions = model.transform(test_data)
    
    # telling spark to evaluate predictions
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
   
    # evaluating the test_predictions using RMSE
    rmse = evaluator.evaluate(test_predictions)
    
    # recording the end time
    ALS_end = time.time()
    
    # calculating the total_time
    total_time = ALS_end - ALS_start
    
    # appending time and rmse in the list
    time_list_4.append((s, total_time))
    RMSE_list_4.append((s, rmse))
    
print(time_list_4)
print(RMSE_list_4)

In [0]:
# RECOMMENDATIONS

In [0]:
# generate 5 recommendations for all users
ALS_recommendations = model.recommendForAllUsers(5)

In [0]:
# showing the recommendations
ALS_recommendations.show(5)

In [0]:
# creating a temporary table
ALS_recommendations.registerTempTable('ALS_recs_temp')

In [0]:
# selecting data from temporary table
new_recs = spark.sql("SELECT userId,\
                               movieIds_and_ratings.movieId AS movieId,\
                               movieIds_and_ratings.rating AS prediction\
                       FROM ALS_recs_temp\
                       LATERAL VIEW explode(recommendations) exploded_table\
                       AS movieIds_and_ratings")

In [0]:
# showing records of the newtable created
new_recs.show(5)

In [0]:
# joining the new records table with the movies table using left join
new_recs_2 = new_recs.join(movies, ['movieId'], "left")
new_recs_2.show(5)

In [0]:
# joining the table created above with the ratings table and setting the filter to suggest movies which the userId has not seen
final_recs = new_recs_2.join(ratings, ["userId", 'movieId'], "left").filter(ratings.rating.isNull())

In [0]:
# showing the final recommendations
final_recs.show(5)

In [0]:
# seeing the movies rated by userId 60
movies_ratings_data.filter(col("userId") == 60).show()

In [0]:
# seeing the recommednations for userId 60
clean_recs.filter(col("userId") == 60).show()

In [0]:
# ANALYSIS

In [0]:
# IMPACT OF SCALE ON PERFORMANCE (TIME)

In [0]:
# building a dataframe of time on local machine
# one runtime per dataset scale; `size` holds the scale in percent as strings
time_local = [197.27, 209.52, 226.90, 237.77]
size = ['25', '50', '75', '100']
# fixed: the frame was built from the undefined name `time_local_1` (NameError)
# instead of the `time_local` list defined just above
df_time_local = pd.DataFrame(list(zip(size, time_local)))
df_time_local.columns = ['size', 'time']
df_time_local = df_time_local.set_index('size')
df_time_local

In [0]:
# building a dataframe of time on aws
# one runtime per dataset scale; `size` holds the scale in percent as strings
size = ['25', '50', '75', '100']
time_aws = [11.92, 13.47, 19.18, 22.38]
df_time_aws = pd.DataFrame({'size': size, 'time': time_aws}).set_index('size')
df_time_aws

In [0]:
# plotting the graph to measure the impact of scale on performance (time)
# Local and cloud runtimes share one set of axes; each series is drawn as a
# line plus markers at the measured points.
fig, ax = plt.subplots(figsize=(7, 5), dpi=120)
ax.plot(df_time_local.index, df_time_local['time'], color='darkorange', label='Local', lw=2)
ax.plot(df_time_aws.index, df_time_aws['time'], color='darkblue', label='Cloud', lw=2)
ax.legend(loc="upper left")
ax.scatter(df_time_local.index, df_time_local['time'], color='darkorange', lw=2)
ax.scatter(df_time_aws.index, df_time_aws['time'], color='darkblue', lw=2)
ax.set_xlabel('SCALE', fontsize=12)
ax.set_ylabel('RUNTIME (SECONDS)', fontsize=12)
ax.set_title('IMPACT OF SCALE ON PERFORMANCE (TIME)', fontdict={'size': 14})
ax.grid(alpha=0.5)
# fixed: `labels` used to be built but never applied to the axis.
# The x values are strings, which matplotlib places at positions 0..n-1 in
# order of first appearance, so the percent labels go on those positions.
labels = ['25%', '50%', '75%', '100%']
ax.set_xticks(range(len(labels)))
ax.set_xticklabels(labels)
plt.show()



In [0]:
# IMPACT OF SCALE ON QUALITY (RMSE)

In [0]:
# building a dataframe for RMSE generated on local
# one RMSE value per dataset scale (scale in percent)
size = [25, 50, 75, 100]
rmse_local = [1.05, 0.96, 0.91, 0.87]
df_rmse_local = pd.DataFrame(list(zip(size, rmse_local)), columns=['Size', 'RMSE']).set_index('Size')
df_rmse_local

In [0]:
# building a dataframe for RMSE generated on aws
# one RMSE value per dataset scale (scale in percent)
size1 = [25, 50, 75, 100]
rmse_aws = [1.07, 0.94, 0.91, 0.88]
df_rmse_aws = pd.DataFrame({'Size': size1, 'RMSE': rmse_aws}).set_index('Size')
df_rmse_aws

In [0]:
# plotting the graph to measure the impact of scale on quality (rmse)
# Local and cloud RMSE share one set of axes; each series is drawn as a line
# plus markers at the measured points.
fig, ax = plt.subplots(figsize=(7, 5), dpi=120)
ax.plot(df_rmse_local.index, df_rmse_local['RMSE'], color='darkorange', label='Local', lw=2)
ax.plot(df_rmse_aws.index, df_rmse_aws['RMSE'], color='darkblue', label='Cloud', lw=2)
ax.legend(loc="upper right")
ax.scatter(df_rmse_local.index, df_rmse_local['RMSE'], color='darkorange', lw=2)
ax.scatter(df_rmse_aws.index, df_rmse_aws['RMSE'], color='darkblue', lw=2)
ax.set_xlabel('SCALE', fontsize=12)
ax.set_ylabel('RMSE', fontsize=12)
ax.set_title('IMPACT OF SCALE ON QUALITY', fontdict={'size': 14})
ax.grid(alpha=1)
# fixed: `labels` used to be built but never applied to the axis; the ticks now
# sit exactly on the measured scales and carry the percent labels
labels = ['25%', '50%', '75%', '100%']
ax.set_xticks(list(df_rmse_local.index))
ax.set_xticklabels(labels)
plt.show()

In [0]:
# IMPACT OF PARALLEL COMPUTATION AS SCALE INCREASES COMPARED TO NON-PARALLEL EXECUTION

In [0]:
# building a dataframe with respect to time for 100% data on local
# `number` is the x value later plotted as 'Number'; `time2` is the matching runtime
number = [1, 2, 3, 4]
# fixed: the last element was written as 100' (stray quote), a SyntaxError that
# prevented the cell from running at all
size1 = [100, 100, 100, 100]
time2 = [125.85, 109.13, 102.65, 99.86]
df_time_local2 = pd.DataFrame(list(zip(size1, number, time2)))
df_time_local2.columns = ['Size of the Dataset', 'Number', 'Time']
df_time_local2 = df_time_local2.set_index('Size of the Dataset')
df_time_local2

In [0]:
# building a dataframe with respect to time for 100% data on aws
# `number` is the x value later plotted as 'Number'; `time3` is the matching runtime
size3 = [100, 100, 100, 100]
number = [1, 2, 3, 4]
time3 = [33.39, 22.75, 16.10, 10.96]
df_time_aws3 = pd.DataFrame(
    {'Size of the Dataset': size3, 'Number': number, 'Time': time3}
).set_index('Size of the Dataset')
df_time_aws3

In [0]:
# plotting a graph to study the impact of parallel and non-parallel computation as scale increases
# Runtime against the 'Number' column for the 100% frames (df_time_local2 and
# df_time_aws3); each series is drawn as a line plus markers.
fig, ax = plt.subplots(figsize=(7, 5), dpi=120)
ax.plot(df_time_local2['Number'], df_time_local2['Time'], color='darkorange', label='Local', lw=2)
ax.plot(df_time_aws3['Number'], df_time_aws3['Time'], color='darkblue', label='Cloud', lw=2)
ax.legend(loc="upper right")
ax.scatter(df_time_local2['Number'], df_time_local2['Time'], color='darkorange', lw=2)
ax.scatter(df_time_aws3['Number'], df_time_aws3['Time'], color='darkblue', lw=2)
ax.set_xlabel('NUMBER', fontsize=12)
ax.set_ylabel('RUNTIME (SECONDS)', fontsize=12)
ax.set_title('IMPACT OF PARALLEL COMPUTATION ON PERFORMANCE (TIME)', fontdict={'size': 14})
ax.grid(alpha=0.5)
# fixed: `labels` used to be built but never applied to the axis; the ticks now
# sit exactly on the plotted 'Number' values and carry those labels
labels = ['1', '2', '3', '4']
ax.set_xticks(list(df_time_local2['Number']))
ax.set_xticklabels(labels)
plt.show()

In [0]:
# building a dataframe with respect to time for 25% data on local
# NOTE: this rebinds df_time_local2, replacing the 100% frame built earlier under the same name
size = [25, 25, 25, 25]
number = [1, 2, 3, 4]
time1_local = [26.95, 30.69, 36.65, 41.68]
df_time_local2 = pd.DataFrame(
    list(zip(size, number, time1_local)),
    columns=['Size of the Dataset', 'Number', 'Time'],
).set_index('Size of the Dataset')
df_time_local2

In [0]:
# building a dataframe with respect to time for 25% data on aws
# `number` is the x value later plotted as 'Number'; `time_aws2` is the matching runtime
size = [25, 25, 25, 25]
number = [1, 2, 3, 4]
time_aws2 = [26.09, 18.01, 16.35, 10.73]
df_time_aws2 = pd.DataFrame(
    {'Size of the Dataset': size, 'Number': number, 'Time': time_aws2}
).set_index('Size of the Dataset')
df_time_aws2

In [0]:
# plotting a graph to study the impact of parallel and non-parallel computation as scale increases
# Runtime against the 'Number' column for df_time_local2 and df_time_aws2;
# each series is drawn as a line plus markers.
fig, ax = plt.subplots(figsize=(7, 5), dpi=120)
ax.plot(df_time_local2['Number'], df_time_local2['Time'], color='darkorange', label='Local', lw=2)
ax.plot(df_time_aws2['Number'], df_time_aws2['Time'], color='darkblue', label='Cloud', lw=2)
ax.legend(loc="upper left")
ax.scatter(df_time_local2['Number'], df_time_local2['Time'], color='darkorange', lw=2)
ax.scatter(df_time_aws2['Number'], df_time_aws2['Time'], color='darkblue', lw=2)
ax.set_xlabel('NUMBER', fontsize=12)
ax.set_ylabel('RUNTIME (SECONDS)', fontsize=12)
ax.set_title('IMPACT OF PARALLEL COMPUTATION ON PERFORMANCE (TIME)', fontdict={'size': 14})
ax.grid(alpha=0.5)
# fixed: `labels` used to be built but never applied to the axis; the ticks now
# sit exactly on the plotted 'Number' values and carry those labels
labels = ['1', '2', '3', '4']
ax.set_xticks(list(df_time_local2['Number']))
ax.set_xticklabels(labels)
plt.show()