/
StreamExtendingTest.php
126 lines (109 loc) · 2.91 KB
/
StreamExtendingTest.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
<?php
/**
* This file is part of the sci/stream package.
*
* (c) Sascha Schimke <sascha@schimke.me>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Sci\Stream\Tests;
use Sci\Stream\IteratorStream;
use Sci\Stream\Stream;
class StreamExtendingTest extends \PHPUnit_Framework_TestCase
{
/**
* @test
*/
public function it_should_create_csv_stream()
{
$csvStream = CsvStream::from(__DIR__.'/example.csv')
->where(['first_name' => 'Peter'])
->select(['id', 'first_name', 'last_name', 'city'])
->limit(3, 5);
$this->assertInstanceOf(Stream::class, $csvStream);
$cnt = 0;
foreach ($csvStream as $row) {
$this->assertArrayHasKey('id', $row);
$this->assertArrayHasKey('first_name', $row);
$this->assertArrayHasKey('last_name', $row);
$this->assertArrayHasKey('city', $row);
$this->assertEquals('Peter', $row['first_name']);
++$cnt;
}
$this->assertEquals(5, $cnt);
}
}
class CsvStream extends IteratorStream
{
/**
* @param string $filename
*
* @return static
*/
public static function from($filename)
{
return static::create(self::readCsv($filename));
}
/**
* @param array $conditions
*
* @return static
*/
public function where(array $conditions)
{
return $this->filter(function (array $row) use ($conditions) {
foreach ($conditions as $column => $value) {
if (!array_key_exists($column, $row) || $row[$column] !== $value) {
return false;
}
}
return true;
});
}
/**
* @param array|string[] $columns
*
* @return static
*/
public function select(array $columns)
{
return $this->map(function (array $row) use ($columns) {
$result = [];
foreach ($columns as $column) {
if (array_key_exists($column, $row)) {
$result[$column] = $row[$column];
}
}
return $result;
});
}
/**
* @param int $offset
* @param int $limit
*
* @return static
*/
public function limit($offset, $limit)
{
return $this->filter(function (array $row) use ($offset, $limit) {
static $counter = 0;
++$counter;
return $offset < $counter && $counter <= $offset + $limit;
});
}
/**
* @param string $filename
*
* @return \Generator
*/
private static function readCsv($filename)
{
$fd = fopen($filename, 'r');
$header = fgetcsv($fd);
while ($row = fgetcsv($fd)) {
yield array_combine($header, $row);
}
fclose($fd);
}
}