In [368]:
import math
import numpy as np
import pandas as pd
import sys
from collections import defaultdict

In [468]:
class rainflow():
    def __init__(self, binsize_mean = 50, binsize_rng = 50, min_mean=-100, min_rng = -100, max_mean=100, max_rng = 100):
        self.points = []
        self.counts = defaultdict(float)
        self.binsize_mean = binsize_mean
        self.binsize_rng = binsize_rng
        self.min_mean = min_mean
        self.min_rng = min_rng
        self.max_mean = max_mean
        self.max_rng = max_rng
        self.nmax_mean = int(np.ceil((max_mean - min_mean)/binsize_mean))
        self.nmax_rng = int(np.ceil((max_rng - min_rng)/binsize_rng))
        
        for i in range(1, self.nmax_rng):
            for j in range(1, self.nmax_mean):
                self.counts.setdefault((np.round(i * binsize_rng+min_rng,2),np.round(j * binsize_mean+min_mean,2)), 0.0)

        
    def reversals(self, x_next):
        result = False 
        if len(self.points) <= 1:
            result = True
        elif self.points[-1] != x_next:
            d_last = (self.points[-1] - self.points[-2])
            d_next = x_next - self.points[-1]
            if d_last * d_next < 0:
                result = True
        return result
    
    def format_output(point1, point2, count):
        x1 = point1
        x2 = point2
        rng = abs(x1 - x2)
        mean = 0.5 * (x1 + x2)
        return [rng, mean, count, x1, x2]

    def extract_cycles(self, x_next):
        result = 0
        tmp = self.reversals(x_next)
        if len(self.points) > 0 and tmp == False:
            self.points.pop()
            self.points.append(x_next)
        else:
            self.points.append(x_next)
            while len(self.points) >= 4:
                # Form ranges X and Y from the three most recent points
                x1, x2, x3 = self.points[-4], self.points[-3], self.points[-2]
                X = abs(x3 - x2)
                Y = abs(x2 - x1)

                if X < Y:
                    # Read the next point
                    break
                elif len(self.points) == 4:
                    # Y contains the starting point
                    # Count Y as one-half cycle and discard the first point
                    result = rainflow.format_output(self.points[0], self.points[1], 0.5)
                    self.count_cycles(result[0],result[1],result[2])
                    print(result)
                    self.points.pop(0)
                else:
                    # Count Y as one cycle and discard the peak and the valley of Y
                    result = rainflow.format_output(self.points[-4], self.points[-3], 1.0)
                    print(result)
                    self.count_cycles(result[0],result[1],result[2])
                    last = self.points.pop()
                    second_last = self.points.pop()
                    self.points.pop()
                    self.points.pop()
                    self.points.append(second_last)
                    self.points.append(last)

                    
    def count_cycles(self,rng,mean,count):
        if rng < self.min_rng:
            n_rng = -9999
        elif rng > self.max_rng:
            n_rng = 9999
        else:
            n_rng = np.round(np.round((rng-self.min_rng)/self.binsize_rng,0)*self.binsize_rng+self.min_rng,2)

        if mean < self.min_mean:
            n_mean = -9999
        elif mean > self.max_mean:
            n_mean = 9999
        else:
            n_mean = np.round(np.round((mean-self.min_mean)/self.binsize_mean,0)* self.binsize_mean+self.min_mean,2)

        self.counts[(n_rng,n_mean)] += count
    
    def rainflow_count(self):
        return pd.DataFrame([(k[0],k[1],v) for k, v in self.counts.items()], columns = ["Range","Mean","Count"])

In [469]:
time = [4.0 * i / 200 for i in range(1000+1)]
signal = [(0.2 + 0.5 * math.sin(t) + 0.2 * math.cos(10*t) + 0.2 * math.sin(4*t)) for t in time]

signal.append(1000)
signal.append(-1000)
signal.append(1000)
signal.append(-1000)
signal.append(1000)
signal.append(-1000)

tmp = rainflow(binsize_mean = 0.1, binsize_rng = 0.1, min_mean=-1, min_rng = -1, max_mean=1, max_rng = 1)
# series = [-2, 1, -3, -5, -7, 3, -4, 4]
# series = [-5,5,-5,5,-5,5,6,7,-5,6,-4,9]
series = signal
for x in series:
    tmp.extract_cycles(x)


[0.04258965150708488, 0.4212948257535425, 0.5, 0.4, 0.4425896515070849]
[0.11294628078612906, 0.38611651111402034, 0.5, 0.4425896515070849, 0.32964337072095584]
[0.48305748051348263, 0.5711721109776972, 0.5, 0.32964337072095584, 0.8127008512344385]
[0.2057106991158965, 0.5871049987922978, 1.0, 0.6899603483502461, 0.4842496492343496]
[0.5286423866535466, 0.5483796579076652, 0.5, 0.8127008512344385, 0.2840584645808919]
[0.10973439445727551, 0.551753104941295, 1.0, 0.4968859077126573, 0.6066203021699328]
[0.7809330293159786, 0.6745249792388812, 0.5, 0.2840584645808919, 1.0649914938968705]
[0.21467990941625242, 0.14222718600014936, 1.0, 0.034887231292023146, 0.24956714070827557]
[0.4388985979776988, 0.18268137509849586, 1.0, -0.03676792389035352, 0.40213067408734526]
[0.24957617387341174, -0.3903893617355505, 1.0, -0.2656012747988446, -0.5151774486722563]
[0.2805072180382011, -0.15006635727482914, 1.0, -0.2903199662939297, -0.009812748255728587]
[0.4740144922642714, -0.11896103153626192, 1

In [470]:
# tmp = rainflow()
# # series = [-2, 1, -3, 5, -1, 3, -4, 4, -3, 1, -2, 3, 2, 6]
# # series = [-5,5,-5,5,-5,5,6,7,-5,6,-4,9]
# series = signal
# for x in series:
#     print(x,tmp.reversals(x))

In [471]:
# tmp = rainflow()
# series = signal
# for x in series:
#     tmp.extract_cycles(x)


In [472]:
tmp.rainflow_count()

Unnamed: 0,Range,Mean,Count
0,0.1,-0.9,0.0
1,0.1,-0.8,0.0
2,0.1,-0.7,0.0
3,0.1,-0.6,0.0
4,0.1,-0.5,0.0
...,...,...,...
170,0.9,0.9,0.0
171,0.0,0.4,0.5
172,9999.0,0.3,2.5
173,9999.0,9999.0,0.5
