<a href="https://colab.research.google.com/github/ErnstHolger/jupyter_notebook/blob/main/OSIsoft_PI_Compression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import datetime
import plotly.express as px

In [None]:
def slope(firstValue, lastValue, threshold):
  """Return the slope of the line from firstValue to lastValue shifted by threshold.

  Both points are (timestamp, value) pairs whose timestamps subtract to an
  object exposing total_seconds() (e.g. datetime.datetime). The end value is
  offset by threshold before the slope is taken, and the result is expressed
  in value units per millisecond.

  Raises ZeroDivisionError when both points carry the same timestamp.
  """
  elapsed_ms = (lastValue[0] - firstValue[0]).total_seconds() * 1000
  rise = lastValue[1] + threshold - firstValue[1]
  return rise / elapsed_ms

In [None]:
# Alias used as the return annotation of the calculate() methods below;
# it describes the (timestamp, value) pair they hand back.
timeValue = (float, float)


In [None]:
class baseCompression(object):
  """Base class for sample-by-sample filters.

  Subclasses override calculate(); calculateList() drives it over a whole
  series and collects whatever calculate() returns.
  """

  def calculate(self, timeStamp:datetime.datetime, value:float):
    """Process one sample and return the (timestamp, value) pair to keep.

    The base implementation keeps every sample unchanged. Subclasses return
    None (or any falsy value) when nothing should be emitted for this call.
    """
    return timeStamp,value

  def calculateList(self, x, y, includeLast=True):
    """Run calculate() over paired sequences and return the kept samples.

    Args:
      x: sequence of timestamps.
      y: sequence of values, indexed in step with x.
      includeLast: when true, the final input sample is appended to the
        result unless the last kept timestamp already equals it.

    Returns:
      A tuple (xc, yc) of lists holding the kept timestamps and values.
      Empty input yields ([], []).
    """
    xc=[]
    yc=[]
    for i, timeStamp in enumerate(x):
      pval=self.calculate(timeStamp,y[i])
      if pval:
        xc.append(pval[0])
        yc.append(pval[1])
    # Guard both the empty-input case and the case where calculate() dropped
    # every sample; indexing xc[-1] or x[-1] would otherwise raise IndexError.
    if includeLast and len(x)>0 and (not xc or xc[-1]!=x[-1]):
      xc.append(x[-1])
      yc.append(y[-1])
    return xc, yc

    

In [None]:
class exception(baseCompression):
  """Deadband ("exception") filter.

  A sample is reported once its value differs from the last reported value
  by more than threshold. The sample seen just before it (if one arrived
  since the last report) is reported as well. Because calculate() returns at
  most one sample per call, reported samples are queued and handed out one
  per call in arrival order.
  """

  def __init__(self, threshold):
    """Store the deadband width and start with no history."""
    self.threshold = threshold
    self.lastRecordedValue = None  # last sample that passed the deadband test
    self.previousValue = None      # latest sample that did not pass it
    self.queue = []                # reported samples not yet returned

  def calculate(self, timeStamp:datetime, value:float)->timeValue:
    """Feed one sample; return the next queued (timestamp, value) or None."""
    sample = (timeStamp, value)

    # The very first sample is always reported immediately.
    if not self.lastRecordedValue:
      self.lastRecordedValue = sample
      return sample

    deviation = abs(value - self.lastRecordedValue[1])
    if deviation > self.threshold:
      # Report the preceding in-band sample (when there is one) ahead of
      # the sample that broke out of the deadband.
      pending = [self.previousValue, sample] if self.previousValue else [sample]
      self.queue.extend(pending)
      self.lastRecordedValue = sample
      self.previousValue = None
    else:
      self.previousValue = sample

    return self.queue.pop(0) if self.queue else None


In [None]:
class compression(baseCompression):
  """Slope-corridor (swinging-door style) compression filter.

  The filter keeps an archived sample and a held sample. Every incoming
  sample is tested against a corridor of slopes anchored at the archived
  sample: the upper bound is the smallest slope to (value + threshold) and
  the lower bound the largest slope to (value - threshold) over the samples
  seen since the archived one. While the new sample's slope stays inside the
  corridor it merely replaces the held sample; once it falls outside, the
  held sample is archived and returned, and a new corridor is started.
  """

  def __init__(self, threshold):
    """Store the compression deviation and start with no history."""
    self.threshold=threshold
    self.upperSlope=0
    self.lowerSlope=0
    self.archivedPoint=None      # last sample returned to the caller
    self.heldPoint=None          # latest sample still inside the corridor
    self.lastRecordedValue=None  # sample that triggered the latest archive event

  def _resetSlopes(self):
    """Start a fresh corridor from archivedPoint through heldPoint."""
    self.upperSlope = slope(self.archivedPoint, self.heldPoint, self.threshold)
    self.lowerSlope = slope(self.archivedPoint, self.heldPoint, -self.threshold)

  def calculate(self, timeStamp:datetime.datetime, value:float)->timeValue:
    """Feed one sample; return the newly archived (timestamp, value) or None.

    The first sample is archived and returned immediately. Later calls
    return the previously held sample when the new one leaves the corridor,
    and None otherwise.
    """
    dv=(timeStamp, value)
    if not self.archivedPoint:
      self.archivedPoint = dv
      self.lastRecordedValue = dv
      return dv
    if not self.heldPoint:
      self.heldPoint=dv
      self._resetSlopes()
      return None

    pointSlope = slope(self.archivedPoint, dv, 0)
    if self.lowerSlope <= pointSlope <= self.upperSlope:
      # Narrow the corridor from both sides: the upper bound can only
      # decrease and the lower bound can only increase, so that every sample
      # seen since the archived one stays within threshold of the line.
      self.upperSlope = min(self.upperSlope, slope(self.archivedPoint, dv, self.threshold))
      self.lowerSlope = max(self.lowerSlope, slope(self.archivedPoint, dv, -self.threshold))
      self.heldPoint = dv
      return None

    # The new sample left the corridor: archive the held sample and restart
    # the corridor from it towards the new sample.
    self.archivedPoint = self.heldPoint
    self.heldPoint = dv
    self._resetSlopes()
    self.lastRecordedValue = dv
    return self.archivedPoint

Create some sample data and apply the OSIsoft PI exception and compression algorithms.

In [None]:
def create_plot(x,y,xc,yc, name):
  """Show the raw series as a line with the filtered samples overlaid.

  x, y are the original timestamps and values; xc, yc are the samples kept
  by a filter, drawn as a 'lines+markers' trace labelled with name.
  """
  transparent = 'rgba(0, 0, 0, 0)'
  # Layout uses fully transparent backgrounds and white Courier New text
  # (presumably intended for a dark notebook theme).
  layout = {
    'plot_bgcolor': transparent,
    'paper_bgcolor': transparent,
    'font_family': "Courier New",
    'font_size': 14,
    'font_color': "white",
    'title_font_family': "Courier New",
    'title_font_color': "white",
  }

  figure = px.line(x=x, y=y)
  figure.add_scatter(x=xc, y=yc, name=name, mode='lines+markers')
  figure.update_layout(layout)
  figure.show()

In [None]:
# Sample signal: 400 samples of sin(i/10), one per second starting at
# 2022-01-01 08:00:00.
dt = datetime.datetime(2022, 1, 1, 8, 0, 0)
x = [dt + datetime.timedelta(seconds=i) for i in range(400)]
y = [math.sin(i / 10) for i in range(400)]

# Exception filter with a deadband of 0.15.
exc = exception(0.15)
xc, yc = exc.calculateList(x, y)
create_plot(x, y, xc, yc, 'exception')


# Compression filter with a deviation of 0.005.
comp = compression(0.005)
xc, yc = comp.calculateList(x, y)
create_plot(x, y, xc, yc, 'compression')
