In [17]:
%%time

import polars as pl

def pl_replace_from_to(column, from_, to_):
  """
  Build a polars expression that replaces values inside `column`.

  Every value equal to ``from_[i]`` becomes ``to_[i]``; values that match no
  entry of `from_` are kept unchanged. The expression is aliased back to
  `column`, so it can be passed directly to ``with_columns``.

  Parameters
  ----------
  column : str
    Name of the column to rewrite.
  from_ : sequence
    Values to look for.
  to_ : sequence
    Replacement values, paired positionally with `from_` (pairs beyond the
    shorter sequence are ignored, as with ``zip``).

  Returns
  -------
  polars.Expr
    The replacement expression; an identity expression on `column` when
    there is nothing to replace.
  """
  pairs = list(zip(from_, to_))
  if not pairs:
    # Nothing to replace: return the column untouched instead of failing on
    # an index lookup into an empty sequence.
    return pl.col(column).alias(column)

  def _literal(value):
    # Polars >= 0.19 reads a bare str passed to `.then()` as a column name;
    # wrapping in pl.lit keeps it a literal on every version. Expressions are
    # passed through so callers can still map to computed values.
    return value if isinstance(value, pl.Expr) else pl.lit(value)

  # Seed the chain with the first pair and extend it with the rest, so each
  # pair appears exactly once (the first pair used to be emitted twice).
  (first_from, first_to), *rest = pairs
  branch = pl.when(pl.col(column) == first_from).then(_literal(first_to))
  for from_value, to_value in rest:
    branch = branch.when(pl.col(column) == from_value).then(_literal(to_value))
  return branch.otherwise(pl.col(column)).alias(column)

def pl_replace(column, mapping):
  """
  Build a polars expression that maps the values of `column` via `mapping`.

  Keys of `mapping` are the values to replace and its values are the
  replacements; values absent from `mapping` are left unchanged. Delegates
  to ``pl_replace_from_to``.

  Parameters
  ----------
  column : str
    Name of the column to rewrite.
  mapping : dict
    ``{old_value: new_value}`` pairs.

  Returns
  -------
  polars.Expr
  """
  # Dict keys are unique, so any cell value can equal at most one key and the
  # branch order does not matter. Keeping insertion order avoids the previous
  # double sort, which raised TypeError for mixed, non-comparable key types
  # (e.g. None alongside str).
  return pl_replace_from_to(column, list(mapping.keys()), list(mapping.values()))

import os

# Root directory holding the exported experiment data. Override it with the
# EXPDATA_DIR environment variable when running on another machine; the
# default is the original hardcoded location.
DATA_DIR = os.environ.get(
  "EXPDATA_DIR", "/home/ubuntu/dos-californium/deter/expdata/real/final"
)

# The experiment name is used both as the directory and as the file-name
# prefix, so it is defined once here to keep the two in sync.
EXPERIMENT = "cloud_proxy_and_500mbpsattacker_withattacker"

# Trailing `*` is a glob matching every export file of this experiment.
FILE_PATH = f"{DATA_DIR}/{EXPERIMENT}/{EXPERIMENT}*"

# Read all matching files lazily, then materialize them into one DataFrame.
df = (
  pl
  .scan_csv(FILE_PATH, use_pyarrow=True)
  .collect()
)

df

CPU times: user 22.3 ms, sys: 7.2 ms, total: 29.5 ms
Wall time: 6.18 ms


device_name,metric_name,trial,metric_value,observation_timestamp
str,str,i64,f64,i64
"""proxy""","""cpu_utilization""",1,0.6,1648839200
"""proxy""","""cpu_utilization""",1,14.4,1648839201
"""proxy""","""cpu_utilization""",1,20.0,1648839202
"""proxy""","""cpu_utilization""",1,7.2,1648839203
"""proxy""","""cpu_utilization""",1,1.5,1648839204
"""proxy""","""cpu_utilization""",1,0.7,1648839205
"""proxy""","""cpu_utilization""",1,0.4,1648839206
"""proxy""","""cpu_utilization""",1,1.0,1648839207
"""proxy""","""cpu_utilization""",1,0.7,1648839208
"""proxy""","""cpu_utilization""",1,4.4,1648839209


In [18]:
%%time

# Relabel the "server" device as "originserver". Only exact "server" values
# are rewritten, so re-running this cell leaves `df` unchanged.
df = df.with_columns([pl_replace("device_name", {"server": "originserver"})])

# Earliest observation timestamp of each series, where a series is one
# device within one trial (grouping is per (trial, device_name), not just
# per trial).
min_timestamp_df = df.groupby(["trial", "device_name"]).agg([
  pl.col("observation_timestamp").min().alias("min_observation_timestamp"),
])

# Attach each series' start time to its rows, re-express every timestamp as
# an offset from that start, and discard the helper column afterwards.
joined_df = (
  df
  .join(min_timestamp_df, on=["trial", "device_name"])
  .with_columns([
    (
      pl.col("observation_timestamp")
      - pl.col("min_observation_timestamp")
    ).alias("observation_timestamp"),
  ])
  .drop(["min_observation_timestamp"])
)

joined_df

CPU times: user 71.3 ms, sys: 33.2 ms, total: 104 ms
Wall time: 15.4 ms


device_name,metric_name,trial,metric_value,observation_timestamp
str,str,i64,f64,i64
"""proxy""","""cpu_utilization""",1,0.6,0
"""proxy""","""cpu_utilization""",1,14.4,1
"""proxy""","""cpu_utilization""",1,20.0,2
"""proxy""","""cpu_utilization""",1,7.2,3
"""proxy""","""cpu_utilization""",1,1.5,4
"""proxy""","""cpu_utilization""",1,0.7,5
"""proxy""","""cpu_utilization""",1,0.4,6
"""proxy""","""cpu_utilization""",1,1.0,7
"""proxy""","""cpu_utilization""",1,0.7,8
"""proxy""","""cpu_utilization""",1,4.4,9
