In [5]:
import matplotlib.pyplot as plt
import numpy as np
import statsmodels.api as sm
from matplotlib.ticker import PercentFormatter, ScalarFormatter
from pyspark.sql import SparkSession

# Root directory of the raw trace tables (each table is stored as "<name>_parquet").
data_path = "hdfs://10.1.4.11:9000/user/hduser/"

# One local Spark session shared by every cell below. getOrCreate() returns the
# already-running session when this cell is re-executed instead of starting another.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TraceAnalysis")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)


In [None]:
def _load_table(parquet_dir, view_name):
    """Load one trace table and expose it to Spark SQL.

    Reads the parquet directory ``data_path + parquet_dir``, prints a
    ``"<view_name>:"`` header followed by a preview of the rows, registers the
    DataFrame as the temp view ``view_name`` (so ``spark.sql`` queries can use
    it), and returns the DataFrame.
    """
    table = spark.read.parquet(data_path + parquet_dir)
    print(view_name + ":")
    table.show()
    table.createOrReplaceTempView(view_name)
    return table


# Read the batch_task and batch_instance parquet tables, in that order.
df_batch_task = _load_table("batch_task_parquet", "batch_task")
df_batch_instance = _load_table("batch_instance_parquet", "batch_instance")


In [None]:
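######################## Write staging results to HDFS ####################################
# Disabled cell: uncomment to (re)generate the per-job / per-task / per-instance total
# durations that the next cell reads. Note that each statement rebinds
# df_batch_task / df_batch_instance, replacing the raw DataFrames loaded above.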

# # Write job total time to staging results
# df_batch_task = spark.sql("SELECT job_name, SUM(end_time - start_time) AS duration FROM batch_task GROUP BY job_name")
# df_batch_task.write.parquet("batch_task_staging/job_duration_parquet")
# 
# # Write task total time to staging results
# df_batch_task = spark.sql("SELECT task_name, SUM(end_time - start_time) AS duration FROM batch_task GROUP BY task_name")
# df_batch_task.write.parquet("batch_task_staging/task_duration_parquet")
# 
# # Write instance total time to staging results
# df_batch_instance = spark.sql("SELECT instance_name, SUM(end_time - start_time) AS duration FROM batch_instance GROUP BY instance_name")
# df_batch_instance.write.parquet("batch_instance_staging/instance_duration_parquet")

###########################################################################################


In [1]:

######################## Read staging results ##############################################

# Per-job total durations. Only non-negative totals are kept (negative sums
# presumably indicate inconsistent start/end timestamps in the trace).
duration = (
    spark.read.parquet("batch_task_staging/job_duration_parquet")
    .filter("duration >= 0")
)

# Flatten the single "duration" column into a 1-d numpy array with one Spark
# action; deriving the length from the array avoids a separate count() job.
data = np.array([row["duration"] for row in duration.select("duration").collect()])
rows = data.size

# Fail early with a clear message: the ECDF / min / max used for plotting
# cannot work on an empty array.
if rows == 0:
    raise ValueError("no non-negative job durations in batch_task_staging/job_duration_parquet")


NameError: name 'spark' is not defined

In [None]:
# Empirical CDF of per-job durations on a log-scale x-axis, with the 99th
# percentile marked by a red dashed line.
XMIN, XMAX = 0.5, 10 ** 6 + 100  # x-axis limits in seconds
QUANTILE = 0.99                  # fraction of jobs marked by the dashed line

ecdf = sm.distributions.ECDF(data)

# Sample the CDF on a log-spaced grid so the curve is evenly resolved on the log
# axis (np.linspace would place almost every sample in the top decade). Starting
# at XMIN also avoids x <= 0, which a log axis cannot display.
x = np.geomspace(XMIN, max(np.max(data), XMIN), num=1000)
y = ecdf(x)

# Smallest observed duration whose CDF value reaches QUANTILE. This is a
# duration in seconds, not an index into `x`, so it can be drawn and labelled.
values = np.sort(data)
x_99 = values[np.argmax(ecdf(values) >= QUANTILE)]

# Layout
fig, ax = plt.subplots(figsize=(11, 4))
ax.set_xscale("log")
ax.set_xlabel("Duration(seconds)")
ax.set_ylabel("CDF")
ax.set_xlim(XMIN, XMAX)
ax.set_ylim(0, 1.01)
# Show the y-axis as percentages; unlike set_yticklabels this stays correct
# if the tick positions change.
ax.yaxis.set_major_formatter(PercentFormatter(xmax=1))

# Plot
ax.plot(x, y, label="job")
ax.vlines(x_99, 0, QUANTILE, colors="r", linestyles="dashed", label="99% job")
ax.text(x_99, 0.5, "{:.0f}".format(x_99), size=18)
ax.legend(loc="center right")
ax.grid()
plt.show()



# NOTE(review): everything between the triple quotes below is an inert string
# literal holding unfinished draft code; none of it executes. Because the string
# is the last expression of this cell, Jupyter echoes its text as the cell's
# output. Consider moving the draft to a markdown cell or a separate file.
# Known problems in the draft, to fix if it is revived:
#   - `job_duration` is never defined (the live cell above calls it `duration`).
#   - "batch_instance_staging/instance_duration" does not match the path the
#     disabled staging cell writes ("batch_instance_staging/instance_duration_parquet").
#   - `.collect` on the ins_data line is missing its call parentheses.
#   - `min(job_ecdf)` / `max(job_ecdf)` should take the data array, not the ECDF object.
#   - `job_99 =` is an incomplete statement (a SyntaxError outside the string).
#   - `df_container_meta` is not defined in any visible cell.
#   - the 99% marker at x=101 and its "101" label are hard-coded, not computed.
"""

task_duration = spark.read.parquet("batch_task_staging/task_duration_parquet")
ins_duration = spark.read.parquet("batch_instance_staging/instance_duration")

# Reshape the list to 1-d array
job_rows = job_duration.count()
task_rows = task_duration.count()
ins_rows = ins_duration.count()

job_data = np.reshape(job_duration.select("duration").collect(), job_rows)
task_data = np.reshape(task_duration.select("duration").collect(), task_rows)
ins_data = np.reshape(ins_duration.select("duration").collect, ins_rows)


# Compute ecdf and values of x and y
job_ecdf = sm.distributions.ECDF(job_data)
task_ecdf = sm.distributions.ECDF(task_data)
ins_ecdf = sm.distributions.ECDF(ins_data)

job_x = np.linspace(min(job_ecdf), max(job_ecdf))
job_y = job_ecdf(job_x)
job_99 =









df_container_meta = df_container_meta\
    .select("container_id", "app_du")\
    .dropDuplicates()\
    .groupBy("app_du")\
    .agg({"app_du": "count"})

df_container_meta.show()

# Reshape the list to 1-d array
rows = df_container_meta.count()
data = np.reshape(df_container_meta.select("count(app_du)").collect(), rows)

# Compute ecdf and values of x and y
ecdf = sm.distributions.ECDF(data)
x = np.linspace(min(data), max(data))
y = ecdf(x)

# Set layout
plt.figure(figsize=[11, 4])
plt.xscale('log')
plt.xlabel("Container")
plt.ylabel("CDF")
plt.xlim(1, 600)
plt.ylim(0, 1.01)
plt.xticks(2 ** np.arange(10))
plt.yticks(np.arange(0, 1.2, 0.2))

ax = plt.gca()
ax.xaxis.set_major_formatter(ScalarFormatter())
ax.set_yticklabels(["{:.0f}%".format(y * 100) for y in ax.get_yticks()])

# Plot
plt.plot(x, y, label="service")
plt.vlines(101, 0, 0.99, colors="r", linestyles="dashed", label="99% service")
plt.text(101, 0.99, "101", size=18, position=(101, 0.5))
plt.legend(loc='upper right')
plt.grid()
plt.show()
"""
