## Serves as a advanced node model which focuses on building model on multiple nodes

### import packages and read data

In [4]:
!pip install /root/git/msspackages/dist/msspackages-0.0.7-py3-none-any.whl
from msspackages import setup_runner
setup_runner(setup_type = 'notebook' , project = 'understanding-eks-data')

Keyring is skipped due to an exception: 'keyring.backends'
Processing /root/git/msspackages/dist/msspackages-0.0.7-py3-none-any.whl
msspackages is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.
[0mb"Hit:1 http://security.debian.org/debian-security buster/updates InRelease\nHit:2 http://deb.debian.org/debian buster InRelease\nHit:3 http://deb.debian.org/debian buster-updates InRelease\nReading package lists...\nBuilding dependency tree...\nReading state information...\n51 packages can be upgraded. Run 'apt list --upgradable' to see them.\nReading package lists...\nBuilding dependency tree...\nReading state information...\nsudo is already the newest version (1.8.27-1+deb10u4).\n0 upgraded, 0 newly installed, 0 to remove and 51 not upgraded.\nReading package lists...\nBuilding dependency tree...\nReading state information...\ndefault-jre is already the newest version (2:1.11-71).\n0 upgraded, 0 newly installed, 

In [5]:
%pip install hmmlearn
from msspackages import Pyspark_data_ingestion
from pyspark import StorageLevel
from pyspark.sql.functions import *
from hmmlearn import hmm,base
from sklearn.preprocessing import StandardScaler as scale
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt 
%matplotlib inline
import numpy as np
# pandas settings 
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

Keyring is skipped due to an exception: 'keyring.backends'
[0mNote: you may need to restart the kernel to use updated packages.


In [4]:
%%time
node_obj = Pyspark_data_ingestion(year = '2022', month = "7", day = "7", hour = -1, filter_column_value ='Node',setup='128gb')
spark_node = node_obj.get_spark()
err, node_data = node_obj.read()

CPU times: user 22.4 ms, sys: 22.6 ms, total: 45 ms
Wall time: 9.71 s


In [None]:
df = node_data.toPandas()

In [None]:
df.InstanceId[:5]

In [None]:
df = df[df.ClusterName=='nk-ndc-eks-cluster-test-dev-usw2-az2-perf'][['NodeName','Timestamp','node_cpu_utilization','node_memory_utilization']]

In [None]:
def clean_data(df):
    df = df.dropna(subset=['node_cpu_utilization','node_memory_utilization'])
    nodelist = df.NodeName.unique()
    subdfs = []
    for node in nodelist:
        
        subdf = df[df.NodeName==node]
        sc_m = scale()
        sc_c = scale()
        c_s = sc_c.fit_transform(subdf['node_cpu_utilization'].values.reshape(-1,1))
        m_s = sc_m.fit_transform(subdf['node_memory_utilization'].values.reshape(-1,1))
        subdf['node_cpu_utilization'] = c_s
        subdf['node_memory_utilization'] = m_s
        subdfs.append(subdf)
    cleaned = subdfs[0]
    
    for i in subdfs[1:]:
        cleaned = pd.concat([cleaned, i], axis=0)
    return cleaned

In [None]:
import warnings
warnings.filterwarnings('ignore')
new_df = clean_data(df)

## feature selection

In [None]:
len(new_df)

In [None]:
fig,ax = plt.subplots(len(new_df.NodeName.unique()),figsize = (10,40),sharex=True)
i = 0
for node in new_df.NodeName.unique():
    sub = new_df[new_df.NodeName==node]
    ax[i].hist(sub.node_cpu_utilization)
    i+=1
plt.show()
    

In [None]:
new_df.reset_index(inplace=True,drop=True)

In [None]:
new_df.head()

In [None]:
def sample_data(df,data_frac= 0.5,random_state = 1, slice_length = 6):
    res_ind = df.sample(frac=data_frac/6, replace=True, random_state=random_state).index.to_list()
    for i in res_ind :
        for j in range(slice_length):
            res_ind.append(i+j)
            
    return df.iloc[res_ind.sort()]
        
        
    

In [None]:
res_ind = new_df.sample(frac=0.5/6, replace=True, random_state=1).index.to_list()

In [None]:

for i in res_ind :
    for j in range(6):
        res_ind.append(i+j)

In [None]:


res_ind[:5]

In [None]:
new_df.iloc[[5,5]]

## model testing

In [40]:
%%time
test = Pyspark_data_ingestion(year = '2022', month = "7", day = "7", hour = "10", filter_column_value ='Node',setup='128gb')
test_node = test.get_spark()
err, test_df = test.read()

CPU times: user 18.2 ms, sys: 1.03 ms, total: 19.2 ms
Wall time: 272 ms


In [6]:
test_df.show(n=1)

+------------+--------------------+--------------------+--------------------+--------------------+-------------------------------+-------------------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+--------------------+--------------------+-------------+----+-------+--------------------+--------------+----------------+--------------------------+---------------------+--------------------+-------------------+--------------------+-----------------+-------------------+--------------------------------+-----------------------------------+-----------------+-----------------------+---------------------+-------------------+----------------------+-------------------+-----------------------------+---------------+----------------+-----------------+-----------------------+-----------------------+---------------------+-----------------------+----------------------+----------------

### preprocess

In [41]:
features = ['node_cpu_utilization','node_memory_utilization']
#filter inital node df based on request features
node_df =test_df.select("Timestamp", "NodeName", 'node_cpu_utilization','node_memory_utilization')
node_df = node_df.withColumn("Datetime",(col("Timestamp")/1000).cast("timestamp"))

# Drop NA
cleaned_node_df = node_df.na.drop(subset=features)


#Quality(timestamp filtered) nodes
quality_filtered_node_df = cleaned_node_df.groupBy("NodeName").agg(count("Timestamp").alias("timestamp_count"))
quality_filtered_nodes = quality_filtered_node_df.filter(col("timestamp_count").between(45,75))

print(3)
#Processed Node DF                                                      
processed_node_df = cleaned_node_df.filter(col("NodeName").isin(quality_filtered_nodes["NodeName"]))



3


### split data

In [42]:
input_df, test_df = processed_node_df .randomSplit(weights=[0.8,0.2], seed=200)

In [48]:
input_df.show(n=1)

+-------------+--------------------+--------------------+-----------------------+--------------------+
|    Timestamp|            NodeName|node_cpu_utilization|node_memory_utilization|            Datetime|
+-------------+--------------------+--------------------+-----------------------+--------------------+
|1657188364512|ip-100-64-71-112....| 0.24938494304307315|     1.2227495249870506|2022-07-07 10:06:...|
+-------------+--------------------+--------------------+-----------------------+--------------------+
only showing top 1 row



### feature engineering

In [43]:
n = 0
samplesize =1
time_steps=12
final_df = np.zeros((samplesize,time_steps,len(features)+1))

In [44]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

In [56]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.ml import Pipeline


In [57]:
import random
##pick random node
random_nodename = random.choice(input_df.select("NodeName").rdd.flatMap(list).collect())
node_df = input_df[(input_df["NodeName"] ==  random_nodename)][["Timestamp", "NodeName"] + features].select('*')
node_df = node_df.sort("Timestamp")
node_df = node_df.na.drop(subset=features)
    


#standardize data from the node


w = Window.partitionBy('NodeName')
for c in features:
    node_df = (node_df.withColumn('mean', F.mean(c).over(w))
        .withColumn('stddev', F.stddev(c).over(w))
        .withColumn(c, ((F.col(c) - F.col('mean')) / (F.col('stddev'))))
        .drop('mean')
        .drop('stddev'))


    

In [67]:
node_df.columns

['Timestamp', 'NodeName', 'node_cpu_utilization', 'node_memory_utilization']

In [72]:
#pick random time slice of 12 timestamps from this node
final_df = None
start = random.choice(range(node_df.count()-time_steps))
node_slice_df = node_df.withColumn('rn', row_number().over(Window.orderBy("Timestamp"))).filter((col("rn") >= start) & (col("rn") < start+time_steps)).select(["Timestamp"] + features)
node_uti_data = node_slice_df.select(*features).rdd.flatMap(list).collect()

if not final_df:
    final_df = node_slice_df 
else:
    final_df = final_df.union(node_slice_df )


In [73]:
final_df = final_df.groupBy("Timestamp").mean()

['Timestamp', 'node_cpu_utilization', 'node_memory_utilization']

In [85]:
vecAssembler2 = VectorAssembler(inputCols=features, outputCol="features")
test_df = vecAssembler2.transform(final_df)

In [87]:
test_df.columns

['Timestamp', 'node_cpu_utilization', 'node_memory_utilization', 'features']

In [91]:
tensor_list = test_df.select("features").rdd.flatMap(list).collect()
        

In [92]:
tensor_list

[DenseVector([0.6247, -0.1377]),
 DenseVector([0.6129, 0.157]),
 DenseVector([-0.1713, 0.2353]),
 DenseVector([1.1362, -0.079]),
 DenseVector([2.2032, 0.1915]),
 DenseVector([0.6544, 0.5208]),
 DenseVector([0.744, 0.7314]),
 DenseVector([0.0606, -0.0444]),
 DenseVector([0.1141, 0.2514]),
 DenseVector([0.291, 0.3067]),
 DenseVector([-0.0118, 0.4379]),
 DenseVector([0.1642, 2.236])]

In [74]:
final_df.columns

['Timestamp', 'node_cpu_utilization', 'node_memory_utilization']

In [66]:

node_slice_df = node_slice_df.select
final_df[n] = node_slice_df.collect()

AttributeError: 'function' object has no attribute 'collect'

In [79]:
train_df.columns

['Timestamp', 'scaled_features']

In [54]:
train_df = train_df.select("Timestamp","scaled_features")
train_df.groupby("Timestamp").agg(mean("scaled_features").alias("scaled_features"))

AnalysisException: cannot resolve 'avg(scaled_features)' due to data type mismatch: function average requires numeric or interval types, not struct<type:tinyint,size:int,indices:array<int>,values:array<double>>;
'Aggregate [Timestamp#1594], [Timestamp#1594, avg(scaled_features#2054) AS scaled_features#2100]
+- Project [Timestamp#1594, scaled_features#2054]
   +- Project [Timestamp#1594, scaled_features#2054]
      +- Project [Timestamp#1594, scaled_features#2054]
         +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619, vectorized_features#2045, UDF(vectorized_features#2045) AS scaled_features#2054]
            +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619, UDF(struct(node_cpu_utilization, node_cpu_utilization#1604, node_memory_utilization, node_memory_utilization#1619)) AS vectorized_features#2045]
               +- Filter atleastnnonnulls(2, node_cpu_utilization#1604, node_memory_utilization#1619)
                  +- Sort [Timestamp#1594 ASC NULLS FIRST], true
                     +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619]
                        +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619]
                           +- Filter (NodeName#1592 = ip-172-27-9-79.ec2.internal)
                              +- Sample 0.0, 0.8, false, 200
                                 +- Sort [Timestamp#1594 ASC NULLS FIRST, NodeName#1592 ASC NULLS FIRST, node_cpu_utilization#1604 ASC NULLS FIRST, node_memory_utilization#1619 ASC NULLS FIRST, Datetime#1863 ASC NULLS FIRST], false
                                    +- Filter NodeName#1592 IN (NodeName#1592)
                                       +- Filter atleastnnonnulls(2, node_cpu_utilization#1604, node_memory_utilization#1619)
                                          +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619, cast((cast(Timestamp#1594 as double) / cast(1000 as double)) as timestamp) AS Datetime#1863]
                                             +- Project [Timestamp#1594, NodeName#1592, node_cpu_utilization#1604, node_memory_utilization#1619]
                                                +- Project [account_id#1537, log_group_name#1538, log_stream_name#1539, record_id#1540, stream_name#1541, record_arrival_stream_timestamp#1542, record_arrival_stream_epochtime#1543L, log_event_timestamp#1544, log_event_epochtime#1545L, log_event_id#1546, AutoScalingGroupName#1587, CloudWatchMetrics#1688, ClusterName#1589, InstanceId#1590, InstanceType#1591, NodeName#1592, Sources#1745, Timestamp#1594, Type#1595, Version#1596, to_json(kubernetes#1597, Some(Etc/UTC)) AS kubernetes#1802, node_cpu_limit#1598L, node_cpu_request#1599L, node_cpu_reserved_capacity#1600, ... 32 more fields]
                                                   +- Project [account_id#1537, log_group_name#1538, log_stream_name#1539, record_id#1540, stream_name#1541, record_arrival_stream_timestamp#1542, record_arrival_stream_epochtime#1543L, log_event_timestamp#1544, log_event_epochtime#1545L, log_event_id#1546, AutoScalingGroupName#1587, CloudWatchMetrics#1688, ClusterName#1589, InstanceId#1590, InstanceType#1591, NodeName#1592, to_json(Sources#1593, Some(Etc/UTC)) AS Sources#1745, Timestamp#1594, Type#1595, Version#1596, kubernetes#1597, node_cpu_limit#1598L, node_cpu_request#1599L, node_cpu_reserved_capacity#1600, ... 32 more fields]
                                                      +- Project [account_id#1537, log_group_name#1538, log_stream_name#1539, record_id#1540, stream_name#1541, record_arrival_stream_timestamp#1542, record_arrival_stream_epochtime#1543L, log_event_timestamp#1544, log_event_epochtime#1545L, log_event_id#1546, AutoScalingGroupName#1587, to_json(CloudWatchMetrics#1588, Some(Etc/UTC)) AS CloudWatchMetrics#1688, ClusterName#1589, InstanceId#1590, InstanceType#1591, NodeName#1592, Sources#1593, Timestamp#1594, Type#1595, Version#1596, kubernetes#1597, node_cpu_limit#1598L, node_cpu_request#1599L, node_cpu_reserved_capacity#1600, ... 32 more fields]
                                                         +- Filter (Type#1595 = Node)
                                                            +- Project [account_id#1537, log_group_name#1538, log_stream_name#1539, record_id#1540, stream_name#1541, record_arrival_stream_timestamp#1542, record_arrival_stream_epochtime#1543L, log_event_timestamp#1544, log_event_epochtime#1545L, log_event_id#1546, log_event_message#1569.AutoScalingGroupName AS AutoScalingGroupName#1587, log_event_message#1569.CloudWatchMetrics AS CloudWatchMetrics#1588, log_event_message#1569.ClusterName AS ClusterName#1589, log_event_message#1569.InstanceId AS InstanceId#1590, log_event_message#1569.InstanceType AS InstanceType#1591, log_event_message#1569.NodeName AS NodeName#1592, log_event_message#1569.Sources AS Sources#1593, log_event_message#1569.Timestamp AS Timestamp#1594, log_event_message#1569.Type AS Type#1595, log_event_message#1569.Version AS Version#1596, log_event_message#1569.kubernetes AS kubernetes#1597, log_event_message#1569.node_cpu_limit AS node_cpu_limit#1598L, log_event_message#1569.node_cpu_request AS node_cpu_request#1599L, log_event_message#1569.node_cpu_reserved_capacity AS node_cpu_reserved_capacity#1600, ... 32 more fields]
                                                               +- Project [account_id#1537, log_group_name#1538, log_stream_name#1539, record_id#1540, stream_name#1541, record_arrival_stream_timestamp#1542, record_arrival_stream_epochtime#1543L, log_event_timestamp#1544, log_event_epochtime#1545L, log_event_id#1546, from_json(StructField(AutoScalingGroupName,StringType,true), StructField(CloudWatchMetrics,ArrayType(StructType(StructField(Dimensions,ArrayType(ArrayType(StringType,true),true),true),StructField(Metrics,ArrayType(StructType(StructField(Name,StringType,true),StructField(Unit,StringType,true)),true),true),StructField(Namespace,StringType,true)),true),true), StructField(ClusterName,StringType,true), StructField(InstanceId,StringType,true), StructField(InstanceType,StringType,true), StructField(NodeName,StringType,true), StructField(Sources,ArrayType(StringType,true),true), StructField(Timestamp,StringType,true), StructField(Type,StringType,true), StructField(Version,StringType,true), StructField(kubernetes,StructType(StructField(host,StringType,true)),true), StructField(node_cpu_limit,LongType,true), StructField(node_cpu_request,LongType,true), StructField(node_cpu_reserved_capacity,DoubleType,true), StructField(node_cpu_usage_system,DoubleType,true), StructField(node_cpu_usage_total,DoubleType,true), StructField(node_cpu_usage_user,DoubleType,true), StructField(node_cpu_utilization,DoubleType,true), StructField(node_memory_cache,LongType,true), StructField(node_memory_failcnt,LongType,true), StructField(node_memory_hierarchical_pgfault,LongType,true), StructField(node_memory_hierarchical_pgmajfault,LongType,true), StructField(node_memory_limit,LongType,true), StructField(node_memory_mapped_file,LongType,true), ... 23 more fields) AS log_event_message#1569, year#1548, month#1549, day#1550, hour#1551, region#1552]
                                                                  +- Relation [account_id#1537,log_group_name#1538,log_stream_name#1539,record_id#1540,stream_name#1541,record_arrival_stream_timestamp#1542,record_arrival_stream_epochtime#1543L,log_event_timestamp#1544,log_event_epochtime#1545L,log_event_id#1546,log_event_message#1547,year#1548,month#1549,day#1550,hour#1551,region#1552] parquet


In [80]:
train_df.columns

['Timestamp', 'scaled_features']

In [None]:
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in features]
scalers = [StandardScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in features]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(node_df)
scaledData = scalerModel.transform(node_df)



In [24]:
feature_list = [col + "_scaled" for col in features]

In [25]:
feature_list

['node_cpu_utilization_scaled', 'node_memory_utilization_scaled']

In [29]:
scaledData.select('Timestamp','node_cpu_utilization_scaled', 'node_memory_utilization_scaled').show(n=1)

+-------------+---------------------------+------------------------------+
|    Timestamp|node_cpu_utilization_scaled|node_memory_utilization_scaled|
+-------------+---------------------------+------------------------------+
|1657188485695|       [11.840651108770663]|           [139.4979764656397]|
+-------------+---------------------------+------------------------------+
only showing top 1 row



In [31]:
 #fill the large dataset
        node_slice_df = node_slice_df.select
        final_df[n] = node_slice_df.collect()

ValueError: setting an array element with a sequence.

In [None]:
    n = 0
    samplesize = input_df.count*weight/time_step
    final_df = np.zeros((samplesize,time_steps,len(features)+1))
    while n < samplesize:


        ##pick random node
        random_nodename = random.choice(input_df.select("NodeName").rdd.flatMap(list).collect())
        node_df = input_df[(input_df["NodeName"] ==  random_nodename)][["Timestamp", "NodeName"] + features].select('*')
        node_df = node_df.sort("Timestamp")
        node_df = node_df.na.drop(subset=features)

        #fix negative number bug 
        if node_df.count()-time_steps<= 0:
            print(f'Exception occurred: not enough data')
            continue
            
        #standardize data from the node
         
        assembler = VectorAssembler(inputCols=features, outputCol="vectorized_features")
        scaler = StandardScaler(inputCol = "vectorized_features", outputCol = "scaled_features", withMean=True, withStd=True)
        pipeline = Pipeline(stages=[assembler, scaler])
        node_df = pipeline.fit(node_df).transform(node_df)
    

        #pick random time slice of 12 timestamps from this node
        start = random.choice(range(node_df.count()-time_steps))
        node_slice_df = node_df.withColumn('rn', row_number().over(Window.orderBy("Timestamp"))).filter((col("rn") >= start) & (col("rn") < start+time_steps)).select(["Timestamp"] + features)

        #fill the large dataset
        node_slice_df = node_slice_df.select
        final_df[n] = node_slice_df.collect()

        print(f'Finished with sample #{n}')

        n += 1

    final_df.reshape(time_steps*samplesize,len(features)+1)
 
    final_df.groupBy("Timestamp").mean("no").show(truncate=False)
    

In [None]:
n = 0
samplesize = 3
features = ['node_cpu_utilization','node_memory_utilization']
input_df = test_df
time_steps = 12
final_df = np.zeros((samplesize,time_steps,len(features)+1))
while n < samplesize:
    ##pick random node
    random_nodename = random.choice(input_df.select("NodeName").rdd.flatMap(list).collect())
    node_df = input_df[(input_df["NodeName"] ==  random_nodename)][["Timestamp", "NodeName"] + features].select('*')
    node_df = node_df.sort("Timestamp")
    node_df = node_df.na.drop(subset=features)

    #fix negative number bug 
    if node_df.count()-time_steps<= 0:
        print(f'Exception occurred: not enough data')
        continue

    #pick random time slice of 12 timestamps from the random node
    start = random.choice(range(node_df.count()-time_steps))
    node_slice_df = node_df.withColumn('rn', row_number().over(Window.orderBy("Timestamp"))).filter((col("rn") >= start) & (col("rn") < start+time_steps)).select(["Timestamp"] + features)
    
    #fill the large dataset
    final_df[n] = node_slice_df.collect()
    
    print(f'Finished with sample #{n}')

    n += 1

final_df = final_df.reshape(time_steps*samplesize,len(features)+1).collect()