@@ -10,76 +10,65 @@ import tryCatch from '../util/tryCatch';
10
10
import { errorObject } from '../util/errorObject' ;
11
11
import bindCallback from '../util/bindCallback' ;
12
12
13
- export default function windowCount < T > ( windowSize : number , startWindowEvery : number = 0 ) : Observable < Observable < T > > {
13
+ export default function windowCount < T > ( windowSize : number ,
14
+ startWindowEvery : number = 0 ) : Observable < Observable < T > > {
14
15
return this . lift ( new WindowCountOperator ( windowSize , startWindowEvery ) ) ;
15
16
}
16
17
17
18
class WindowCountOperator < T , R > implements Operator < T , R > {
18
19
19
- constructor ( private windowSize : number , private startWindowEvery : number ) {
20
+ constructor ( private windowSize : number ,
21
+ private startWindowEvery : number ) {
20
22
}
21
23
22
24
call ( subscriber : Subscriber < T > ) : Subscriber < T > {
23
25
return new WindowCountSubscriber ( subscriber , this . windowSize , this . startWindowEvery ) ;
24
26
}
25
27
}
26
28
27
- interface WindowObject < T > {
28
- count : number ;
29
- notified : boolean ;
30
- window : Subject < T > ;
31
- }
32
-
33
29
class WindowCountSubscriber < T > extends Subscriber < T > {
34
- private windows : WindowObject < T > [ ] = [
35
- { count : 0 , notified : false , window : new Subject < T > ( ) }
36
- ] ;
30
+ private windows : Subject < T > [ ] = [ new Subject < T > ( ) ] ;
37
31
private count : number = 0 ;
38
32
39
- constructor ( destination : Subscriber < T > , private windowSize : number , private startWindowEvery : number ) {
33
+ constructor ( destination : Subscriber < T > ,
34
+ private windowSize : number ,
35
+ private startWindowEvery : number ) {
40
36
super ( destination ) ;
37
+ destination . next ( this . windows [ 0 ] ) ;
41
38
}
42
39
43
40
_next ( value : T ) {
44
- const count = ( this . count += 1 ) ;
45
41
const startWindowEvery = ( this . startWindowEvery > 0 ) ? this . startWindowEvery : this . windowSize ;
46
42
const windowSize = this . windowSize ;
47
43
const windows = this . windows ;
48
44
const len = windows . length ;
49
45
50
- if ( count % startWindowEvery === 0 ) {
51
- let window = new Subject < T > ( ) ;
52
- windows . push ( { count : 0 , notified : false , window : window } ) ;
53
- }
54
-
55
46
for ( let i = 0 ; i < len ; i ++ ) {
56
- let w = windows [ i ] ;
57
- const window = w . window ;
58
-
59
- if ( ! w . notified ) {
60
- w . notified = true ;
61
- this . destination . next ( window ) ;
62
- }
63
-
64
- window . next ( value ) ;
65
- if ( windowSize === ( w . count += 1 ) ) {
66
- window . complete ( ) ;
67
- }
47
+ windows [ i ] . next ( value ) ;
48
+ }
49
+ const c = this . count - windowSize + 1 ;
50
+ if ( c >= 0 && c % startWindowEvery === 0 ) {
51
+ windows . shift ( ) . complete ( ) ;
52
+ }
53
+ if ( ++ this . count % startWindowEvery === 0 ) {
54
+ let window = new Subject < T > ( ) ;
55
+ windows . push ( window ) ;
56
+ this . destination . next ( window ) ;
68
57
}
69
58
}
70
59
71
60
_error ( err : any ) {
72
61
const windows = this . windows ;
73
62
while ( windows . length > 0 ) {
74
- windows . shift ( ) . window . error ( err ) ;
63
+ windows . shift ( ) . error ( err ) ;
75
64
}
76
65
this . destination . error ( err ) ;
77
66
}
78
67
79
68
_complete ( ) {
80
69
const windows = this . windows ;
81
70
while ( windows . length > 0 ) {
82
- windows . shift ( ) . window . complete ( ) ;
71
+ windows . shift ( ) . complete ( ) ;
83
72
}
84
73
this . destination . complete ( ) ;
85
74
}
0 commit comments