Skip to content

Commit

Permalink
@grafana/data: improve the CircularVector api (#18716)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantxu committed Aug 26, 2019
1 parent a540f05 commit 73d9f26
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 44 deletions.
133 changes: 124 additions & 9 deletions packages/grafana-data/src/utils/vector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,131 @@ describe('Check Proxy Vector', () => {
});

describe('Check Circular Vector', () => {
it('should support constant values', () => {
const buffer = [3, 2, 1, 0];
const v = new CircularVector(buffer);
expect(v.length).toEqual(4);
expect(v.toJSON()).toEqual([3, 2, 1, 0]);
it('should append values', () => {
const buffer = [1, 2, 3];
const v = new CircularVector({ buffer }); // tail is default option
expect(v.toArray()).toEqual([1, 2, 3]);

v.add(4);
expect(v.toArray()).toEqual([2, 3, 4]);

v.add(5);
expect(v.toArray()).toEqual([3, 4, 5]);

v.add(6);
expect(v.toArray()).toEqual([4, 5, 6]);

v.add(7);
expect(v.toArray()).toEqual([5, 6, 7]);

v.add(8);
expect(v.toArray()).toEqual([6, 7, 8]);
});

it('should grow buffer until it hits capacity (append)', () => {
const v = new CircularVector({ capacity: 3 }); // tail is default option
expect(v.toArray()).toEqual([]);

v.add(1);
expect(v.toArray()).toEqual([1]);

v.add(2);
expect(v.toArray()).toEqual([1, 2]);

v.add(3);
expect(v.toArray()).toEqual([1, 2, 3]);

v.add(4);
expect(v.toArray()).toEqual([2, 3, 4]);

v.add(5);
expect(v.toArray()).toEqual([3, 4, 5]);
});

it('should prepend values', () => {
const buffer = [3, 2, 1];
const v = new CircularVector({ buffer, append: 'head' });
expect(v.toArray()).toEqual([3, 2, 1]);

v.add(4);
expect(v.toArray()).toEqual([4, 3, 2]);

v.add(5);
expect(v.toArray()).toEqual([5, 4, 3]);

v.add(6);
expect(v.toArray()).toEqual([6, 5, 4]);

v.add(7);
expect(v.toArray()).toEqual([7, 6, 5]);

v.add(8);
expect(v.toArray()).toEqual([8, 7, 6]);
});

it('should expand buffer and then prepend', () => {
const v = new CircularVector({ capacity: 3, append: 'head' });
expect(v.toArray()).toEqual([]);

v.add(1);
expect(v.toArray()).toEqual([1]);

v.add(2);
expect(v.toArray()).toEqual([2, 1]);

v.add(3);
expect(v.toArray()).toEqual([3, 2, 1]);

v.add(4);
expect(v.toArray()).toEqual([4, 3, 2]);

v.add(5);
expect(v.toArray()).toEqual([5, 4, 3]);
});

it('should reduce size and keep working (tail)', () => {
const buffer = [1, 2, 3, 4, 5];
const v = new CircularVector({ buffer });
expect(v.toArray()).toEqual([1, 2, 3, 4, 5]);

v.setCapacity(3);
expect(v.toArray()).toEqual([3, 4, 5]);

v.add(6);
expect(v.toArray()).toEqual([4, 5, 6]);

v.add(7);
expect(v.toArray()).toEqual([5, 6, 7]);
});

it('should reduce size and keep working (head)', () => {
const buffer = [5, 4, 3, 2, 1];
const v = new CircularVector({ buffer, append: 'head' });
expect(v.toArray()).toEqual([5, 4, 3, 2, 1]);

v.setCapacity(3);
expect(v.toArray()).toEqual([5, 4, 3]);

v.add(6);
expect(v.toArray()).toEqual([6, 5, 4]);

v.add(7);
expect(v.toArray()).toEqual([7, 6, 5]);
});

it('change buffer direction', () => {
const buffer = [1, 2, 3];
const v = new CircularVector({ buffer });
expect(v.toArray()).toEqual([1, 2, 3]);

v.setAppendMode('head');
expect(v.toArray()).toEqual([3, 2, 1]);

v.append(4);
expect(v.toJSON()).toEqual([4, 3, 2, 1]);
v.add(4);
expect(v.toArray()).toEqual([4, 3, 2]);

v.append(5);
expect(v.toJSON()).toEqual([5, 4, 3, 2]);
v.setAppendMode('tail');
v.add(5);
expect(v.toArray()).toEqual([3, 4, 5]);
});
});
145 changes: 120 additions & 25 deletions packages/grafana-data/src/utils/vector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,18 @@ export class ScaledVector implements Vector<number> {
}
}

export class CircularVector<T = any> implements Vector<T> {
buffer: T[];
index: number;
length: number;

constructor(buffer: T[]) {
this.length = buffer.length;
this.buffer = buffer;
this.index = 0;
}
/**
* Values are returned in the order defined by the input parameter
*/
export class SortedVector<T = any> implements Vector<T> {
constructor(private source: Vector<T>, private order: number[]) {}

append(value: T) {
let idx = this.index - 1;
if (idx < 0) {
idx = this.length - 1;
}
this.buffer[idx] = value;
this.index = idx;
get length(): number {
return this.source.length;
}

get(index: number): T {
return this.buffer[(index + this.index) % this.length];
return this.source.get(this.order[index]);
}

toArray(): T[] {
Expand All @@ -109,18 +99,123 @@ export class CircularVector<T = any> implements Vector<T> {
}
}

interface CircularOptions<T> {
buffer?: T[];
append?: 'head' | 'tail';
capacity?: number;
}

/**
* Values are returned in the order defined by the input parameter
* Circular vector uses a single buffer to capture a stream of values
* overwriting the oldest value on add.
*
* This supports addting to the 'head' or 'tail' and will grow the buffer
* to match a configured capacity.
*/
export class SortedVector<T = any> implements Vector<T> {
constructor(private source: Vector<T>, private order: number[]) {}
export class CircularVector<T = any> implements Vector<T> {
private buffer: T[];
private index: number;
private capacity: number;
private tail: boolean;

constructor(options: CircularOptions<T>) {
this.buffer = options.buffer || [];
this.capacity = this.buffer.length;
this.tail = 'head' !== options.append;
this.index = 0;

get length(): number {
return this.source.length;
this.add = this.getAddFunction();
if (options.capacity) {
this.setCapacity(options.capacity);
}
}

get(index: number): T {
return this.source.get(this.order[index]);
/**
* This gets the appropriate add function depending on the buffer state:
* * head vs tail
* * growing buffer vs overwriting values
*/
private getAddFunction() {
// When we are not at capacity, it should actually modify the buffer
if (this.capacity > this.buffer.length) {
if (this.tail) {
return (value: T) => {
this.buffer.push(value);
if (this.buffer.length >= this.capacity) {
this.add = this.getAddFunction();
}
};
} else {
return (value: T) => {
this.buffer.unshift(value);
if (this.buffer.length >= this.capacity) {
this.add = this.getAddFunction();
}
};
}
}

if (this.tail) {
return (value: T) => {
this.buffer[this.index] = value;
this.index = (this.index + 1) % this.buffer.length;
};
}

// Append values to the head
return (value: T) => {
let idx = this.index - 1;
if (idx < 0) {
idx = this.buffer.length - 1;
}
this.buffer[idx] = value;
this.index = idx;
};
}

setCapacity(v: number) {
if (this.capacity === v) {
return;
}
// Make a copy so it is in order and new additions can be at the head or tail
const copy = this.toArray();
if (v > this.length) {
this.buffer = copy;
} else if (v < this.capacity) {
// Shrink the buffer
const delta = this.length - v;
if (this.tail) {
this.buffer = copy.slice(delta, copy.length); // Keep last items
} else {
this.buffer = copy.slice(0, copy.length - delta); // Keep first items
}
}
this.capacity = v;
this.index = 0;
this.add = this.getAddFunction();
}

setAppendMode(mode: 'head' | 'tail') {
const tail = 'head' !== mode;
if (tail !== this.tail) {
this.buffer = this.toArray().reverse();
this.index = 0;
this.tail = tail;
this.add = this.getAddFunction();
}
}

/**
* Add the value to the buffer
*/
add: (value: T) => void;

get(index: number) {
return this.buffer[(index + this.index) % this.buffer.length];
}

get length() {
return this.buffer.length;
}

toArray(): T[] {
Expand Down
20 changes: 10 additions & 10 deletions public/app/plugins/datasource/testdata/StreamHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export class StreamWorker {
for (let i = 0; i < append.length; i++) {
const row = append[i];
for (let j = 0; j < values.length; j++) {
values[j].append(row[j]); // Circular buffer will kick out old entries
values[j].add(row[j]); // Circular buffer will kick out old entries
}
}
// Clear any cached values
Expand Down Expand Up @@ -178,8 +178,8 @@ export class SignalWorker extends StreamWorker {
const { speed, buffer } = this.query;
const request = this.stream.request;
const maxRows = buffer ? buffer : request.maxDataPoints;
const times = new CircularVector(new Array<number>(maxRows));
const vals = new CircularVector(new Array<number>(maxRows));
const times = new CircularVector({ capacity: maxRows });
const vals = new CircularVector({ capacity: maxRows });
this.values = [times, vals];

const data = new DataFrameHelper({
Expand All @@ -193,8 +193,8 @@ export class SignalWorker extends StreamWorker {

for (let i = 0; i < this.bands; i++) {
const suffix = this.bands > 1 ? ` ${i + 1}` : '';
const min = new CircularVector(new Array<number>(maxRows));
const max = new CircularVector(new Array<number>(maxRows));
const min = new CircularVector({ capacity: maxRows });
const max = new CircularVector({ capacity: maxRows });
this.values.push(min);
this.values.push(max);

Expand All @@ -209,7 +209,7 @@ export class SignalWorker extends StreamWorker {
for (let i = 0; i < maxRows; i++) {
const row = this.nextRow(time);
for (let j = 0; j < this.values.length; j++) {
this.values[j].append(row[j]);
this.values[j].add(row[j]);
}
time += speed;
}
Expand Down Expand Up @@ -347,8 +347,8 @@ export class LogsWorker extends StreamWorker {

const maxRows = buffer ? buffer : request.maxDataPoints;

const times = new CircularVector(new Array(maxRows));
const lines = new CircularVector(new Array(maxRows));
const times = new CircularVector({ capacity: maxRows });
const lines = new CircularVector({ capacity: maxRows });

this.values = [times, lines];
this.data = new DataFrameHelper({
Expand All @@ -364,8 +364,8 @@ export class LogsWorker extends StreamWorker {
let time = Date.now() - maxRows * speed;
for (let i = 0; i < maxRows; i++) {
const row = this.nextRow(time);
times.append(row[0]);
lines.append(row[1]);
times.add(row[0]);
lines.add(row[1]);
time += speed;
}
}
Expand Down

0 comments on commit 73d9f26

Please sign in to comment.