Merge pull request #43 from anagainaru/features
Bug fixes
anagainaru committed Jun 18, 2021
2 parents 77c440e + 70f42d5 commit 951eb57
Showing 4 changed files with 84 additions and 33 deletions.
46 changes: 38 additions & 8 deletions analysis/clustering/extract_clusters.py
@@ -18,13 +18,35 @@
# default path to save the cluster information
path = "./test.clusters/"

def get_clustering_columns(df):
log_columns = set([c for c in df.columns if 'perc' in c.lower()
or 'log10' in c.lower()])
feature_list = set()
with open("../features.filter.header") as f:
for line in f:
feature_list.add(line[:-1])
log_columns = log_columns - feature_list
return log_columns

# filter all entries that do very little IO (time or amount)
def fileter_entries(df):
df = df.fillna(0)
df["IO_runtime"] = df.Total_runtime * df.IO_runtime_perc
df["IO_total_bytes"] = df.POSIX_IO_total_bytes + df.MPIIO_IO_total_bytes +\
df.HDF5_IO_total_bytes
df = df[df.IO_runtime > 60]
df = df[df.IO_total_bytes > 1024]
return df

def default_dataset(paths):
df = pd.read_csv(paths[0])
print("Before filter:", len(df))
df = fileter_entries(df)
print("After filter:", len(df))
log_columns = get_clustering_columns(df)
print("Total columns %d, clustering columns %d" %(
len(df.columns), len(log_columns)))
# Use HDBSCAN
log_columns = set([c for c in df.columns if 'perc' in c.lower()
or 'log10' in c.lower()])
log_columns -= set(['Total_procs', 'RAW_runtime', 'users',
'apps', 'apps_short'])
clusterer = hdbscan.HDBSCAN(min_samples=10, cluster_selection_epsilon=5,
metric='manhattan', gen_min_span_tree=True,
core_dist_n_jobs=8)
@@ -35,6 +57,8 @@ def default_dataset(paths):
# for the child described by the column in the dataframe
def get_child_info(df, distance, column_name):
child_id = int(df[df.distance == distance][column_name].values[0])
if len(df[df.parent == child_id]) == 0:
return child_id, 0
count = int(df[df.parent == child_id]["size"].values[0])
return child_id, count

@@ -104,17 +128,21 @@ def analyze_cluster_formation(dataset, df, cluster, split_distances):
"left_child")
right_id, right_count = get_child_info(df, distance,
"right_child")
if left_count == 0 or right_count == 0:
# print("Noisy point, skip distance %f" %(distance))
continue
# identify the left and right cluster ids
print("Split at distance %f" %(distance))

# get the index of the entries from the dataframe that have been split
# at this level and dump the rest of the entries
left_cl_idx = [i for i in range(len(dataset))
if left_id in classification[i]]
if i in classification and left_id in classification[i]]
assert(len(left_cl_idx) == left_count), \
"Error identifying the clusters at distance %f" %(distance)
right_cl_idx = [i for i in range(len(dataset))
if right_id in classification[i]]
if i in classification and
right_id in classification[i]]
assert(len(right_cl_idx) == right_count), \
"Error identifying the clusters at distance %f" %(distance)

@@ -124,10 +152,10 @@ def analyze_cluster_formation(dataset, df, cluster, split_distances):
dataset, left_cl_idx, right_cl_idx, left_count, right_count)
print("Left cluster (%d): %d runs %s" %(
left_id, left_count,
filtered_df[filtered_df.Cluster==0]["apps"].unique()))
filtered_df[filtered_df.Cluster==0]["app_name"].unique()))
print("Right cluster (%d): %d runs %s" %(
right_id, right_count,
filtered_df[filtered_df.Cluster==1]["apps"].unique()))
filtered_df[filtered_df.Cluster==1]["app_name"].unique()))

# make sure the clusters are correct
check_cluster_goodness(cluster, distance, left_cl_idx, right_cl_idx)
@@ -148,6 +176,8 @@ def main(top_apps=6, jobs_per_app=16):
df, cluster = default_dataset(paths=[sys.argv[1]])
# save the cluster with all the labels
save_clusters(0, df, cluster.labels_)
print("Optimal number of clusters:", max(cluster.labels_)+1)
print("Number of noisy points:", len([i for i in cluster.labels_ if i<0]))

# extract the hierarchy of binary splits
tree = cluster.single_linkage_tree_.to_pandas()
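The reworked default_dataset first drops runs with negligible I/O and then restricts clustering to the percentage and log-scaled columns before fitting HDBSCAN. A minimal sketch of that flow, assuming a feature CSV with the column names referenced in the diff (the file path and the exact column set are placeholders):

import hdbscan
import pandas as pd

# hypothetical feature file; one row per run, columns as in the diff
df = pd.read_csv("features.csv").fillna(0)

# keep only runs that spend more than a minute doing I/O
df["IO_runtime"] = df.Total_runtime * df.IO_runtime_perc
df = df[df.IO_runtime > 60]

# cluster on the percentage / log10 columns, as get_clustering_columns does
log_columns = [c for c in df.columns
               if "perc" in c.lower() or "log10" in c.lower()]

clusterer = hdbscan.HDBSCAN(min_samples=10, cluster_selection_epsilon=5,
                            metric="manhattan", gen_min_span_tree=True,
                            core_dist_n_jobs=8)
clusterer.fit(df[log_columns])

print("Optimal number of clusters:", clusterer.labels_.max() + 1)
print("Number of noisy points:", (clusterer.labels_ < 0).sum())

# merge hierarchy walked by analyze_cluster_formation: one row per merge,
# with the child ids, the merge distance and the size of the merged cluster
tree = clusterer.single_linkage_tree_.to_pandas()

This single-linkage tree is what get_child_info and analyze_cluster_formation walk; the new guard that returns a zero count for leaf (single-point) children is what lets noisy splits be skipped at a given distance instead of failing.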
11 changes: 6 additions & 5 deletions analysis/extract_logs/extract_logs.sh
@@ -1,18 +1,19 @@
#!/bin/bash
module load darshan-util

mkdir logs_againaru
for j in {9..12}; do
app=$1
mkdir logs_${app}
for j in {1..12}; do
echo "/gpfs/alpine/darshan/summit/2020/$j"
echo "---"
for i in /gpfs/alpine/darshan/summit/2020/${j}/*/*againaru*; do
for i in /gpfs/alpine/darshan/summit/2020/${j}/*/*_${app}_*; do
ls $i
if [ $? -eq 0 ]; then
filename=$(basename -- "$i")
extension="${filename##*.}"
filename="${filename%.*}"
darshan-parser --file $i > logs_againaru/${filename}.log
darshan-parser $i >> logs_againaru/${filename}.log
darshan-parser --file $i > logs_${app}/${filename}.log
darshan-parser $i >> logs_${app}/${filename}.log
else
continue
fi
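The extraction script is now parameterised by application name instead of being hard-coded to a single user's logs. A rough Python equivalent of the same loop, assuming the directory layout from the script and that darshan-util is loaded so darshan-parser is on the PATH (the application name below is a made-up example):

import glob
import os
import subprocess
import sys

app = sys.argv[1]                       # e.g. "lammps" (made-up name)
out_dir = f"logs_{app}"
os.makedirs(out_dir, exist_ok=True)

for month in range(1, 13):
    pattern = f"/gpfs/alpine/darshan/summit/2020/{month}/*/*_{app}_*"
    for log in glob.glob(pattern):
        stem = os.path.splitext(os.path.basename(log))[0]
        with open(os.path.join(out_dir, f"{stem}.log"), "w") as out:
            # file table first, then the full counter dump, mirroring the
            # two darshan-parser calls in the shell script
            subprocess.run(["darshan-parser", "--file", log], stdout=out)
            subprocess.run(["darshan-parser", log], stdout=out)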
45 changes: 34 additions & 11 deletions analysis/features/extract_aggregated_features.py
@@ -125,6 +125,8 @@ def convert_counters_in_perc(df, total_access, IOtype, agg="AGG_"):

def RW_features(df, total_bytes, IOtype):
feature_list = {}
if total_bytes == 0:
total_bytes = 1
type_ops = ["READ", "WRITTEN"]
for op in type_ops:
feature_list["%s_BYTES_%s_PERC" %(IOtype, op)] = \
@@ -195,20 +197,39 @@ def additional_MPIIO_features(df, total_files):
df[df.Counter=="MPIIO_VIEWS"]["Value"].sum() / total_files
return feature_list

def rank_features(df, IOtype):
def rank_features(df, IOtype, IOranks):
feature_list = {}
IOtypeREAD = df[(df.Counter.str.contains(IOtype)) &
(df.Counter.str.contains("READ"))]["Rank"].unique()
IOtypeWRITE = df[(df.Counter.str.contains(IOtype)) &
(df.Counter.str.contains("WRITE"))]["Rank"].unique()
feature_list[IOtype + "_read_ranks_perc"] = len(IOtypeREAD)
feature_list[IOtype + "_write_ranks_perc"] = len(IOtypeWRITE)
rankList = df[(df.Counter.str.contains(IOtype)) &
(df.Counter.str.contains("READ"))]["Rank"].unique()
feature_list[IOtype + "_read_ranks_perc"] = len(rankList)
# collective operations
if -1 in rankList:
feature_list[IOtype + "_read_ranks_perc"] = IOranks
rankList = df[(df.Counter.str.contains(IOtype)) &
(df.Counter.str.contains("WRITE"))]["Rank"].unique()
feature_list[IOtype + "_write_ranks_perc"] = len(rankList)
# collective operations
if -1 in rankList:
feature_list[IOtype + "_write_ranks_perc"] = IOranks
rankList = df[df.Counter.str.contains("READ")]["Rank"].unique()
feature_list["Total_read_ranks_perc"] = len(rankList)
if -1 in rankList:
feature_list["Total_read_ranks_perc"] = IOranks
rankList = df[df.Counter.str.contains("WRITE")]["Rank"].unique()
feature_list["Total_write_ranks_perc"] = len(rankList)
if -1 in rankList:
feature_list["Total_write_ranks_perc"] = IOranks
return feature_list

def aggregated_features(darshan_file, IOtype_list, total_runtime):
def aggregated_features(darshan_file, IOtype_list,
total_runtime, total_ranks):
df = read_aggregated_log(darshan_file)
feature_list = {}
feature_list["IO_ranks"] = len(df[df.Rank >= 0]["Rank"].unique())
# if the application does IO collective operations
if -1 in df.Rank.unique():
feature_list["IO_ranks"] = total_ranks
else:
feature_list["IO_ranks"] = len(df[df.Rank >= 0]["Rank"].unique())
for IOtype in IOtype_list:
feature_list.update(overall_features(df, IOtype, total_runtime))
feature_list.update(file_features(df, IOtype))
@@ -222,9 +243,11 @@ def aggregated_features(darshan_file, IOtype_list, total_runtime):
if IOtype == "POSIX":
agg = ""
feature_list.update(convert_counters_in_perc(
df, feature_list["%s_IO_total_accesses" %(IOtype)], IOtype, agg=agg))
df, feature_list["%s_IO_total_accesses" %(IOtype)],
IOtype, agg=agg))

feature_list.update(rank_features(df, IOtype))
feature_list.update(rank_features(
df, IOtype, feature_list["IO_ranks"]))

if "MPIIO" in IOtype_list:
feature_list.update(additional_MPIIO_features(
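The main behavioural change in rank_features is how Darshan's rank value of -1 is treated: per the new comments, a -1 marks a record aggregated over all ranks (collective operations), so the per-interface read/write rank counts fall back to the total number of I/O ranks rather than counting -1 as a single rank. A self-contained toy illustration of that rule, with made-up counter names and values:

import pandas as pd

def ranks_doing(df, io_type, op, io_ranks):
    # number of ranks with an <io_type> <op> counter; -1 means all of them
    ranks = df[(df.Counter.str.contains(io_type)) &
               (df.Counter.str.contains(op))]["Rank"].unique()
    return io_ranks if -1 in ranks else len(ranks)

log = pd.DataFrame({
    "Counter": ["POSIX_READS", "POSIX_READS", "MPIIO_COLL_WRITES"],
    "Rank":    [0, 3, -1],
    "Value":   [10, 7, 128],
})
io_ranks = 4  # stand-in for feature_list["IO_ranks"]

print(ranks_doing(log, "POSIX", "READ", io_ranks))   # 2 ranks read via POSIX
print(ranks_doing(log, "MPIIO", "WRITE", io_ranks))  # -1 present, so all 4 ranks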
15 changes: 6 additions & 9 deletions analysis/features/extract_feature_list.py
@@ -32,14 +32,9 @@ def create_empty_dict():
def update_allRank_metrics(feature_list, IOtype_list, total_IO_ranks):
feature_list["IO_ranks_perc"] = feature_list["IO_ranks"] /\
feature_list["Total_procs"]
feature_list["Total_read_ranks_perc"] = 0
feature_list["Total_write_ranks_perc"] = 0
for IOtype in IOtype_list:
feature_list["Total_read_ranks_perc"] += \
feature_list[IOtype + "_read_ranks_perc"]
feature_list["Total_write_ranks_perc"] += \
feature_list[IOtype + "_write_ranks_perc"]
print(IOtype, feature_list[IOtype + "_read_ranks_perc"], feature_list[IOtype + "_write_ranks_perc"], total_IO_ranks)
print(IOtype, feature_list[IOtype + "_read_ranks_perc"],
feature_list[IOtype + "_write_ranks_perc"], total_IO_ranks)
# normalize the features for each IOtype
feature_list[IOtype + "_read_ranks_perc"] /= total_IO_ranks
feature_list[IOtype + "_write_ranks_perc"] /= total_IO_ranks
@@ -68,7 +63,8 @@ def update_allIO_metrics(feature_list, IOtype_list, IOtype_used):
feature_list["%s_write_runtime_perc" %(IOtype)]
feature_list["Metadata_runtime_perc"] += \
feature_list["%s_metadata_runtime_perc" %(IOtype)]
# once the information is no longer needed, normalize it
for IOtype in IOtype_list:
# normalize all the entries
feature_list["%s_read_runtime_perc" %(IOtype)] /= \
feature_list["%s_IO_runtime_perc" %(IOtype)]
feature_list["%s_write_runtime_perc" %(IOtype)] /= \
@@ -111,7 +107,8 @@ def update_allIO_metrics(feature_list, IOtype_list, IOtype_used):

# extract features from the aggregated logs
features, IOtype_used= aggregated_features(
darshan_file, IOtype_list, feature_list["Total_runtime"])
darshan_file, IOtype_list, feature_list["Total_runtime"],
feature_list["Total_procs"])
#print(set(features) - set(feature_list.keys()))
feature_list.update(features)
print("Aggregated features:", len(feature_list))
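After this change, update_allRank_metrics no longer sums the per-interface counts into Total_read_ranks_perc and Total_write_ranks_perc (those now come straight from rank_features); it only normalizes each count by the total number of I/O ranks. A small worked example of that arithmetic, with made-up values and assuming total_IO_ranks corresponds to the IO_ranks feature:

# toy feature dictionary; raw rank counts as returned by rank_features
feature_list = {
    "IO_ranks": 64,
    "Total_procs": 128,
    "POSIX_read_ranks_perc": 64,
    "POSIX_write_ranks_perc": 16,
}
total_IO_ranks = feature_list["IO_ranks"]

feature_list["IO_ranks_perc"] = feature_list["IO_ranks"] / feature_list["Total_procs"]
for IOtype in ["POSIX"]:
    feature_list[IOtype + "_read_ranks_perc"] /= total_IO_ranks
    feature_list[IOtype + "_write_ranks_perc"] /= total_IO_ranks

print(feature_list["IO_ranks_perc"])          # 0.5  (half the processes do I/O)
print(feature_list["POSIX_read_ranks_perc"])  # 1.0  (every I/O rank reads)
print(feature_list["POSIX_write_ranks_perc"]) # 0.25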
