In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
import time
import pandas as pd
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

nltk.download('vader_lexicon')
sid = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    sentiment_score = sid.polarity_scores(text)
    return sentiment_score['compound']

# Register UDF once, outside loop to avoid re-registration overhead
sentiment_udf = udf(analyze_sentiment, FloatType())

results = []
chunk_amounts = [100, 600, 3600]
executor_cores_options = [1, 2]
executor_nodes_options = [1, 2]
iterations = 2

for datasize in chunk_amounts:
    file_paths = [f"hdfs://master:9000/path/in/hdfs/JsonFiles/corpus-webis-shortened_chunk{i}.json" for i in range(1, datasize + 1)]
    
    for executor_nodes in executor_nodes_options:
        for executor_cores in executor_cores_options:
            # Initialize Spark session with specified configuration
            spark_session = SparkSession.builder\
                           .appName(f"STRONG_SCAL_DF_cores_{executor_cores}_instances_{executor_nodes}_datasize_{datasize}")\
                           .master("spark://192.168.2.230:7077")\
                           .config("spark.dynamicAllocation.enabled", True)\
                           .config("spark.executor.instances", executor_nodes)\
                           .config("spark.executor.cores", executor_cores)\
                           .config("spark.dynamicAllocation.minExecutors","1")\
                           .config("spark.dynamicAllocation.maxExecutors","1")\
                           .config("spark.driver.port", 9999)\
                           .config("spark.blockManager.port", 10005)\
                           .getOrCreate()

            # Read data outside of the iteration loop and cache it for reuse
            df = spark_session.read.option("multiline", "true").json(file_paths).cache()
            
            # Apply sentiment analysis once to cache the results
            df_with_sentiment = df.withColumn("sentiment_score", sentiment_udf(col("content"))).cache()

            for iteration in range(iterations):
                start_time = time.time()
                
                # Use the cached dataframe with sentiment analysis
                positive_comments = df_with_sentiment.filter(col("sentiment_score") > 0).count()
                negative_comments = df_with_sentiment.filter(col("sentiment_score") < 0).count()
                
                end_time = time.time()
                processing_time = end_time - start_time
                
                # Append the results to your results list
                results.append({
                    'Workers': executor_nodes,
                    'Cores/W': executor_cores,
                    'Total Cores': executor_cores * executor_nodes,
                    'Chunks': datasize,
                    'Iteration': iteration + 1,
                    'Time (s)': processing_time,
                    'Positive Comments': positive_comments,
                    'Negative Comments': negative_comments
                })
            
            # Clean up cached data and stop Spark session to free resources
            df.unpersist()
            df_with_sentiment.unpersist()
            spark_session.stop()

# Convert the collected results into a pandas DataFrame for analysis
results_df = pd.DataFrame(results)
print(results_df)


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/14 18:57:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/14 18:57:59 ERROR DAGScheduler: Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 1
org.apache.spark.SparkException: EOF reached before Python server acknowledged
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:751)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1694)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1685)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at sc

    Workers  Cores/W  Total Cores  Chunks  Iteration    Time (s)  \
0         1        1            1     100          1   20.115071   
1         1        1            1     100          2    0.560871   
2         1        2            2     100          1   12.566884   
3         1        2            2     100          2    0.427228   
4         2        1            2     100          1   12.538368   
5         2        1            2     100          2    0.504156   
6         2        2            4     100          1    8.108554   
7         2        2            4     100          2    0.368404   
8         1        1            1     600          1   99.718052   
9         1        1            1     600          2    0.748781   
10        1        2            2     600          1   53.806358   
11        1        2            2     600          2    0.571788   
12        2        1            2     600          1   53.068240   
13        2        1            2     600       

24/03/14 19:47:36 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from /192.168.2.10:52514 is closed
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@3df2fa93 rejected from java.util.concurrent.ThreadPoolExecutor@4736a4be[Shutting down, pool size = 51, active threads = 0, queued tasks = 0, completed tasks = 82]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted