# Weak Scaling

In [1]:
import json
import time

from pyspark.sql import SparkSession
from nltk.sentiment import SentimentIntensityAnalyzer


In [2]:
def preprocess(rdd):
    """Turn raw JSON lines into whitespace-trimmed comment bodies.

    Every element of ``rdd`` is parsed as a JSON object, the value stored
    under its ``'body'`` key is taken, and leading/trailing whitespace is
    stripped from it.

    Args:
        rdd: Object exposing ``map`` whose elements are JSON-encoded
            strings (in this notebook, the RDD read from HDFS).

    Returns:
        The result of mapping the extraction over ``rdd``.
    """
    def _extract_body(raw_line):
        record = json.loads(raw_line)
        return record['body'].strip()

    return rdd.map(_extract_body)


def classify_comments(rdd):
    """Pair every comment with a sentiment label.

    One ``SentimentIntensityAnalyzer`` is built here and captured by the
    mapped function. The label is derived from the analyzer's
    ``'compound'`` score: ``'positive'`` at 0.05 or above, ``'negative'``
    at -0.05 or below, and ``'neutral'`` otherwise.

    Args:
        rdd: Object exposing ``map`` whose elements are comment strings.

    Returns:
        ``rdd`` mapped to ``(comment, label)`` tuples.
    """
    analyzer = SentimentIntensityAnalyzer()

    def _label(text):
        score = analyzer.polarity_scores(text)['compound']
        if score >= 0.05:
            return 'positive'
        if score <= -0.05:
            return 'negative'
        return 'neutral'

    def _with_label(text):
        return (text, _label(text))

    return rdd.map(_with_label)


def analyze_comments(rdd):
    """Count how often each label occurs, most frequent first.

    Args:
        rdd: Object exposing ``map`` whose elements are pairs; only the
            second item of each pair (the label) is used.

    Returns:
        ``(label, count)`` pairs ordered by descending count.
    """
    def _flip(pair):
        return (pair[1], pair[0])

    def _add(left, right):
        return left + right

    label_ones = rdd.map(lambda item: (item[1], 1))
    label_counts = label_ones.reduceByKey(_add)

    # sortByKey orders on the first tuple slot, so move the count there for
    # the sort and restore the (label, count) layout afterwards.
    by_count_desc = label_counts.map(_flip).sortByKey(ascending=False)
    return by_count_desc.map(_flip)


def rdd_slice(rdd, start, end):
    """Keep the elements whose position lies between ``start`` and ``end``.

    Positions are the indices assigned by ``zipWithIndex``; both bounds are
    inclusive.

    Args:
        rdd: Object exposing ``zipWithIndex``.
        start: Lowest index to keep.
        end: Highest index to keep.

    Returns:
        The selected elements, with their indices dropped again.
    """
    def _in_window(item_and_pos):
        return start <= item_and_pos[1] <= end

    indexed = rdd.zipWithIndex()
    selected = indexed.filter(_in_window)
    return selected.map(lambda item_and_pos: item_and_pos[0])


In [3]:
# Settings shared by every run in this notebook.

# Cluster / input
executor_memory = "3g"      # passed to spark.executor.memory
data_path = "RC_2012-12"    # appended to the HDFS dataset directory when loading

# Per-core baselines; each run multiplies these by its core count.
base_data_size = 250_000    # input lines per core
base_partition = 125        # partitions per core

---

In [4]:
# Core limit for this run (handed to Spark as spark.cores.max).
max_cores = 1

# Weak scaling: the input grows in proportion to the core count.
data_size = max_cores * base_data_size

In [5]:
# Build a session against the master at spark://master:7077, capped at
# `max_cores` cores.
spark_session = (
    SparkSession.builder
    .master("spark://master:7077")
    .appName("haodong_zhao_comment_classification_timing_weak_core1")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.cores.max", max_cores)
    .config("spark.executor.memory", executor_memory)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# The RDD API is reached through the session's SparkContext.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

# Read the dataset from HDFS, keep the lines at indices 0..data_size-1, and
# spread them over a partition count that grows with the core count.
rc_lines = rdd_slice(
    spark_context.textFile('hdfs://master:9000/dataset/' + data_path),
    0,
    data_size - 1,
).repartition(base_partition * max_cores)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/17 14:59:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/17 14:59:21 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
                                                                                

In [6]:
# Time the whole pipeline, from the first transformation to collect().
# perf_counter() is a monotonic clock, so the measured interval cannot be
# skewed by system clock adjustments that happen during a multi-minute run.
# NOTE(review): rc_lines is not cached when it is built, so this action also
# evaluates the HDFS read, the slice and the repartition — confirm that is
# meant to be part of the measurement.
start = time.perf_counter()

results = analyze_comments(classify_comments(preprocess(rc_lines))).collect()

end = time.perf_counter()
elapsed = end - start  # seconds

                                                                                

In [7]:
# Report the measured duration, then the (label, count) pairs.
print("Elapsed time: {}s".format(elapsed))
print("Results: ", results, sep=" ")

Elapsed time: 283.74720001220703s
Results:  [('positive', 107450), ('neutral', 83054), ('negative', 59496)]


In [8]:
# Stop this run's session; the next run builds its own with a different
# spark.cores.max, and getOrCreate() would otherwise hand back this one.
spark_session.stop()


---

In [9]:
# Core limit for this run (handed to Spark as spark.cores.max).
max_cores = 2

# Weak scaling: the input grows in proportion to the core count.
data_size = max_cores * base_data_size

In [10]:
# Build a session against the master at spark://master:7077, capped at
# `max_cores` cores.
spark_session = (
    SparkSession.builder
    .master("spark://master:7077")
    .appName("haodong_zhao_comment_classification_timing_weak_core2")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.cores.max", max_cores)
    .config("spark.executor.memory", executor_memory)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# The RDD API is reached through the session's SparkContext.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

# Read the dataset from HDFS, keep the lines at indices 0..data_size-1, and
# spread them over a partition count that grows with the core count.
rc_lines = rdd_slice(
    spark_context.textFile('hdfs://master:9000/dataset/' + data_path),
    0,
    data_size - 1,
).repartition(base_partition * max_cores)


                                                                                

In [11]:
# Time the whole pipeline, from the first transformation to collect().
# perf_counter() is a monotonic clock, so the measured interval cannot be
# skewed by system clock adjustments that happen during a multi-minute run.
# NOTE(review): rc_lines is not cached when it is built, so this action also
# evaluates the HDFS read, the slice and the repartition — confirm that is
# meant to be part of the measurement.
start = time.perf_counter()

results = analyze_comments(classify_comments(preprocess(rc_lines))).collect()

end = time.perf_counter()
elapsed = end - start  # seconds

                                                                                

In [12]:
# Report the measured duration, then the (label, count) pairs.
print("Elapsed time: {}s".format(elapsed))
print("Results: ", results, sep=" ")


Elapsed time: 237.2197871208191s
Results:  [('positive', 213214), ('neutral', 165836), ('negative', 120950)]


In [13]:
# Stop this run's session; the next run builds its own with a different
# spark.cores.max, and getOrCreate() would otherwise hand back this one.
spark_session.stop()


---

In [14]:
# Core limit for this run (handed to Spark as spark.cores.max).
max_cores = 4

# Weak scaling: the input grows in proportion to the core count.
data_size = max_cores * base_data_size

In [15]:
# Build a session against the master at spark://master:7077, capped at
# `max_cores` cores.
spark_session = (
    SparkSession.builder
    .master("spark://master:7077")
    .appName("haodong_zhao_comment_classification_timing_weak_core4")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.cores.max", max_cores)
    .config("spark.executor.memory", executor_memory)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# The RDD API is reached through the session's SparkContext.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

# Read the dataset from HDFS, keep the lines at indices 0..data_size-1, and
# spread them over a partition count that grows with the core count.
rc_lines = rdd_slice(
    spark_context.textFile('hdfs://master:9000/dataset/' + data_path),
    0,
    data_size - 1,
).repartition(base_partition * max_cores)

                                                                                

In [16]:
# Time the whole pipeline, from the first transformation to collect().
# perf_counter() is a monotonic clock, so the measured interval cannot be
# skewed by system clock adjustments that happen during a multi-minute run.
# NOTE(review): rc_lines is not cached when it is built, so this action also
# evaluates the HDFS read, the slice and the repartition — confirm that is
# meant to be part of the measurement.
start = time.perf_counter()

results = analyze_comments(classify_comments(preprocess(rc_lines))).collect()

end = time.perf_counter()
elapsed = end - start  # seconds

                                                                                

In [17]:
# Report the measured duration, then the (label, count) pairs.
print("Elapsed time: {}s".format(elapsed))
print("Results: ", results, sep=" ")

Elapsed time: 200.67379927635193s
Results:  [('positive', 423836), ('neutral', 332980), ('negative', 243184)]


In [18]:
# Stop this run's session; the next run builds its own with a different
# spark.cores.max, and getOrCreate() would otherwise hand back this one.
spark_session.stop()

---

In [19]:
# Core limit for this run (handed to Spark as spark.cores.max).
max_cores = 8

# Weak scaling: the input grows in proportion to the core count.
data_size = max_cores * base_data_size

In [20]:
# Build a session against the master at spark://master:7077, capped at
# `max_cores` cores.
spark_session = (
    SparkSession.builder
    .master("spark://master:7077")
    .appName("haodong_zhao_comment_classification_timing_weak_core8")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.cores.max", max_cores)
    .config("spark.executor.memory", executor_memory)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# The RDD API is reached through the session's SparkContext.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

# Read the dataset from HDFS, keep the lines at indices 0..data_size-1, and
# spread them over a partition count that grows with the core count.
rc_lines = rdd_slice(
    spark_context.textFile('hdfs://master:9000/dataset/' + data_path),
    0,
    data_size - 1,
).repartition(base_partition * max_cores)

                                                                                

In [21]:
# Time the whole pipeline, from the first transformation to collect().
# perf_counter() is a monotonic clock, so the measured interval cannot be
# skewed by system clock adjustments that happen during a multi-minute run.
# NOTE(review): rc_lines is not cached when it is built, so this action also
# evaluates the HDFS read, the slice and the repartition — confirm that is
# meant to be part of the measurement.
start = time.perf_counter()

results = analyze_comments(classify_comments(preprocess(rc_lines))).collect()

end = time.perf_counter()
elapsed = end - start  # seconds

                                                                                

In [22]:
# Report the measured duration, then the (label, count) pairs.
print("Elapsed time: {}s".format(elapsed))
print("Results: ", results, sep=" ")

Elapsed time: 179.45079278945923s
Results:  [('positive', 849362), ('neutral', 658150), ('negative', 492488)]


In [23]:
# Stop the Spark session created for this run.
spark_session.stop()

---