In [None]:
import requests
import pandas as pd
import re 
import json


# Elastic connection
ELASTIC_URL = "<cluster:port>"
API_KEY = "<API Key>"


headers = {
    "Authorization": f"ApiKey {API_KEY}"
}

# function for converting the store size in integer bytes
def size_to_bytes(size_str):
    """
    Converts a data size string (e.g., '112kb', '2gb', '55mb') to bytes.

    Args:
        size_str: The data size string.

    Returns:
        The size in bytes as an integer, or 0 if the format is invalid.
    """
    size_str = size_str.lower().strip()
    
    # Regular expression to extract the numerical value and the unit
    match = re.match(r'^(\d+\.?\d*)\s*([kmgtp]?b?)$', size_str)
    
    if not match:
        return 0

    value_str, unit = match.groups()
    value = float(value_str)

    # Conversion factors (powers of 1024 for binary prefixes)
    units = {
        'b': 1,
        'kb': 1024,
        'mb': 1024**2,
        'gb': 1024**3,
        'tb': 1024**4,
        'pb': 1024**5,
    }
    
    # Handle cases where only the letter is provided (e.g., 'k' for 'kb')
    if len(unit) == 1 and unit != 'b':
        unit += 'b'

    return int(value * units.get(unit, 1))

# Conversion factor to logsdb
# The settings call below retrieve 2 values , index.mode and source mode, both values are not always there: 
# conversion calculation has therefore a mixed approach

def conversion_factor(index_mode):
   lookup={"logsdb":1 , "time_series":1.0, "standard":0.5}
   
   return lookup.get(index_mode,1)



In [2]:
# Get a list of nodes 
api_url = ELASTIC_URL+"/_cat/nodes?format=json"
response = requests.get(api_url, headers=headers)
response.raise_for_status()

nodes_json = response.json()
df_nodes = pd.DataFrame(nodes_json)

#add note.type column
df_nodes["node.type"]=""

#Calculate node type
for i in range(len(df_nodes)):
  target_row=df_nodes.index[i]
  # filtering for only hot,warm and cold nodes ! Adjust if you want to take along frozen nodes !!
  df_nodes.at[target_row,'node.type']=re.sub("d|f|i|l|m|r|s|t|v|\-","",df_nodes.at[target_row,'node.role'])

df_hwc=df_nodes[['name','node.role','node.type']]

mask=df_hwc['node.type']!=''
df_hwc=df_hwc[mask]
df_hwc=df_hwc.rename(columns={'name':'node'})



In [3]:
# Get a list of shards
api_url = ELASTIC_URL + "/_cat/shards?format=json"
response = requests.get(api_url, headers=headers)
response.raise_for_status()

shards_json = response.json()
df_shards = pd.DataFrame(shards_json)
# Filter for data streams (.ds* indices) only !!!
mask = (df_shards['index'].str.contains('.ds')) & (df_shards['docs'].astype(int) > 0)
df_ds = df_shards[mask].copy() 
df_ds['store_bytes'] = df_ds['store'].astype(str).apply(size_to_bytes)


In [4]:
# Get Settings

api_url = ELASTIC_URL + "/_settings/?include_defaults&filter_path=*.settings.index.*"
response = requests.get(api_url, headers=headers)
response.raise_for_status()

response_json = response.json()
df_settings=pd.DataFrame(columns=['index','index.mode'])

for key in response_json:
    index_mode = response_json[key].get('settings', {}).get('index', {}).get('mode', 'standard')
    
    df_settings.loc[len(df_settings)] = [key, index_mode]



In [5]:
# Merge the nodes, shards , and settings dataframes to one , and calculate per shard the impact of a conversion

df_merged = pd.merge(pd.merge(df_hwc, df_ds, on='node', how='inner'), df_settings, on='index', how='inner')
df_merged['store_optimised'] = [
	conversion_factor(str(row['index.mode']))
	* row['store_bytes']
	for _, row in df_merged.iterrows()
]


In [6]:
# aggregate per instance store size and optimised size, calculate percentual benefit and convert values to a friendly format
agg_dict={
    'node' :'max',
    'node.type' : 'max',
    'store_bytes': 'sum',
    'store_optimised': 'sum'
}
df_summary=df_merged.groupby(['node','node.type']).agg(agg_dict)

df_summary['perc']=round((df_summary['store_bytes']- df_summary['store_optimised'])/df_summary['store_bytes']*100,1)
df_summary['store_gb']=round(df_summary['store_bytes']/1024/1024)
df_summary['optimised_gb']=round(df_summary['store_optimised']/1024/1024)

# Display result

output_table = df_summary[['node', 'node.type','store_gb','optimised_gb','perc']].rename(columns={
    'node': 'Instance',
     'node.type': 'Tier',
     'store_gb' : 'Current Volume (gb)',
     'optimised_gb': 'Est. Volume (gb)',
     'perc' : '% Change'
     })

display(output_table)

Unnamed: 0_level_0,Unnamed: 1_level_0,Instance,Tier,Current Volume (gb),Est. Volume (gb),% Change
node,node.type,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
instance-0000000000,h,instance-0000000000,h,452.0,447.0,1.0
instance-0000000003,h,instance-0000000003,h,452.0,447.0,1.0
