Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lint-actions:
set -uo pipefail
rc=0
actionlint || rc=$?
zizmor .github/workflows || rc=$?
zizmor --offline .github/workflows || rc=$?
exit $rc

# Run static analysis (PHPStan).
Expand All @@ -70,7 +70,29 @@ analyze-mago *args:
src/lib/doctrine-dbal-bulk \
src/lib/snappy \
src/lib/parquet \
src/core/etl
src/core/etl \
src/lib/parquet-viewer \
src/bridge/openapi/specification \
src/bridge/psr3/telemetry \
src/bridge/symfony/http-foundation-telemetry \
src/bridge/telemetry/otlp \
src/bridge/filesystem/async-aws \
src/bridge/filesystem/azure \
src/bridge/monolog/http \
src/bridge/monolog/telemetry \
src/bridge/phpunit/postgresql \
src/bridge/phpunit/telemetry \
src/bridge/postgresql/valinor \
src/bridge/psr18/telemetry \
src/bridge/psr7/telemetry \
src/bridge/symfony/filesystem-bundle \
src/bridge/symfony/filesystem-cache \
src/bridge/symfony/http-foundation \
src/bridge/symfony/postgresql-bundle \
src/bridge/symfony/postgresql-cache \
src/bridge/symfony/postgresql-messenger \
src/bridge/symfony/postgresql-session \
src/bridge/symfony/telemetry-bundle

# Auto-fix code style (Mago format + lint --fix) and GitHub Actions findings (zizmor --fix).
fix:
Expand Down
48 changes: 15 additions & 33 deletions mago.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ includes = [
"src/lib/postgresql/src/Flow/PostgreSql/Protobuf",
# Generated by Thrift
"src/lib/parquet/src/Flow/Parquet/ThriftModel",
# Generated by protoc (OTLP protocol definitions)
"src/bridge/telemetry/otlp/src/Opentelemetry",
"src/bridge/telemetry/otlp/src/GPBMetadata",
# Symfony
"src/bridge/symfony/filesystem-bundle/src/Flow/Bridge/Symfony/FilesystemBundle/FlowFilesystemBundle.php",
"src/bridge/symfony/postgresql-bundle/src/Flow/Bridge/Symfony/PostgreSqlBundle/FlowPostgreSqlBundle.php",
"src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php",
]
excludes = [
"src/**/vendor/**",
Expand All @@ -30,23 +37,14 @@ enable-short-tags = false
plugins = ["flow-php"]
excludes = [
"**/Flow/Types/PHPStan/**",
# Symfony bundle/config tests accessing deeply-nested processConfiguration() arrays (all mixed-array-access)
"src/bridge/symfony/filesystem-bundle/tests/Flow/Bridge/Symfony/FilesystemBundle/Tests/Unit/FlowFilesystemBundleTest.php",
"src/bridge/symfony/filesystem-bundle/tests/Flow/Bridge/Symfony/FilesystemBundle/Tests/Unit/ConfigurationTest.php",
"src/bridge/symfony/postgresql-bundle/tests/Flow/Bridge/Symfony/PostgreSqlBundle/Tests/Unit/DependencyInjection/ConfigurationTest.php",
"src/bridge/symfony/postgresql-bundle/tests/Flow/Bridge/Symfony/PostgreSqlBundle/Tests/Integration/FlowPostgreSqlExtensionTest.php",
"src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Unit/DependencyInjection/ConfigurationTest.php",
]
ignore = [
{ code = "redundant-docblock-type", in = [
"src/lib/postgresql/src/Flow/PostgreSql/DSL/client.php",
] },
{ code = "unavailable-method", in = [
"src/lib/types/src/Flow/Types/Type/Logical/HTMLElementType.php",
"src/lib/types/src/Flow/Types/Type/Logical/HTMLType.php",
"src/lib/types/src/Flow/Types/Type/Native/StringType.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/Logical/HTMLElementTypeTest.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/Logical/HTMLTypeTest.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/Native/StringTypeTest.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/TypeDetectorTest.php",
"src/core/etl/src/Flow/ETL/Row/Entry/HTMLEntry.php",
"src/core/etl/src/Flow/ETL/Row/Entry/HTMLElementEntry.php",
] },
]
ignore = []

[linter]
integrations = ["symfony", "phpunit"]
Expand Down Expand Up @@ -113,23 +111,7 @@ no-sprintf-concat = { enabled = false }
no-global = { enabled = false }
no-redundant-method-override = { enabled = false }
no-redundant-else = { enabled = true, level = "warning" }
# Bug in Mago 1.28.0 - false positives on $this when passed as a call/constructor argument.
# Minimal repro: https://mago.carthage.software/1.27.1/en/playground/#019e3f91-7e6d-75d3-6c51-f9dae0ed72f8
# Revisit when fixed upstream.
no-redundant-variable = { enabled = true, level = "warning", exclude = [
"**/Flow/ETL/Function/ScalarFunctionChain.php",
"**/Flow/ETL/DataFrame.php",
"**/Flow/ETL/Row/EntryReference.php",
"**/Flow/ETL/Sort/SortAlgorithms.php",
"**/Flow/PostgreSql/QueryBuilder/**",
"**/Flow/PostgreSql/Tests/Unit/QueryBuilder/Expression/MockExpression.php",
"**/Flow/PostgreSql/Explain/Plan/PlanNodeType.php",
"**/Flow/Filesystem/Path.php",
"**/CacheSpyClient.php",
"**/Tests/Unit/ObjectExtractorTest.php",
"**/array-dot/src/Flow/ArrayDot/array_dot.php",
"**/TracableHttpClientTest.php",
] }
no-redundant-variable = { enabled = true, level = "warning" }
strict-types = { enabled = true, level = "error" }
no-fully-qualified-global-function = { enabled = true, level = "error" }
no-fully-qualified-global-class-like = { enabled = true, level = "error" }
Expand Down
25 changes: 14 additions & 11 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -144,26 +144,29 @@ parameters:
message: '#Dom\\(CharacterData|HTMLDocument|HTMLElement|Element)#i'
identifier: class.notFound
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/FlowTelemetryExtension.php
identifier: argument.type
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/FlowTelemetryExtension.php
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php
identifier: offsetAccess.nonOffsetAccessible
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/FlowTelemetryExtension.php
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php
identifier: cast.string
-
path: src/bridge/symfony/filesystem-cache/src/Flow/Bridge/Symfony/FilesystemCache/FlowFilesystemCacheAdapter.php
identifier: cast.string
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/FlowTelemetryExtension.php
identifier: foreach.nonIterable
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/HttpKernel/HttpKernelSpanSubscriber.php
identifier: encapsedStringPart.nonString
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/FlowTelemetryExtension.php
identifier: binaryOp.invalid
path: src/bridge/symfony/postgresql-messenger/tests/Flow/Bridge/Symfony/PostgreSQLMessenger/Tests/Integration/FlowPostgreSqlTransportTest.php
identifier: cast.string
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/DependencyInjection/Compiler/DBALTelemetryPass.php
identifier: cast.int
-
path: src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Doctrine/DBAL/TracingMiddleware.php
identifier: return.type
path: src/bridge/symfony/filesystem-cache/tests/Flow/Bridge/Symfony/FilesystemCache/Tests/Unit/Double/SpyMarshaller.php
identifier: parameterByRef.unusedType
-
path: src/bridge/symfony/postgresql-cache/tests/Flow/Bridge/Symfony/PostgreSQLCache/Tests/Unit/Double/SpyMarshaller.php
identifier: parameterByRef.unusedType
# brick/math 0.12-0.14 vs 0.15+ compatibility shim — branches differ per installed version
-
path: src/core/etl/src/Flow/Calculator/Calculator.php
Expand Down
1 change: 1 addition & 0 deletions src/bridge/filesystem/async-aws/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"require": {
"php": "~8.3.0 || ~8.4.0 || ~8.5.0",
"flow-php/filesystem": "self.version",
"flow-php/types": "self.version",
"async-aws/s3": "^2.6 || ^3.0"
},
"config": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use function fclose;
use function fopen;
use function is_resource;
use function unlink;

final readonly class AsyncAWSS3BlockLifecycle implements BlockLifecycle
Expand Down Expand Up @@ -47,9 +46,7 @@ public function filled(Block $block): void
*/
$etag = $uploadPartResponse->getETag();

if (is_resource($handle)) {
fclose($handle);
}
fclose($handle);

unlink($block->path()->path());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public static function openAppend(
BlockFactory $blockFactory = new NativeLocalFileBlocksFactory(),
int $blockSize = 1024 * 1024 * 4,
): self {
if ($blockSize < 1) {
throw new InvalidArgumentException('Block size must be greater than 0');
}

try {
$objectHead = $s3Client->headObject([
'Bucket' => $bucket,
Expand All @@ -79,7 +75,9 @@ public static function openAppend(
* and append to it. We need to read the file to memory and append it to the new file.
* S3 allows only the last part to be smaller than 5Mb, all other parts needs to be 5Mb+.
*/
if ($objectHead->getContentLength() < SizeUnits::mbToBytes(5)) {
$contentLength = $objectHead->getContentLength() ?? 0;

if ($contentLength < SizeUnits::mbToBytes(5)) {
$blocks = new Blocks(
$blockSize,
$blockFactory,
Expand Down Expand Up @@ -140,10 +138,6 @@ public static function openBlank(
BlockFactory $blockFactory = new NativeLocalFileBlocksFactory(),
int $blockSize = 1024 * 1024 * 4,
): self {
if ($blockSize < 1) {
throw new InvalidArgumentException('Block size must be greater than 0');
}

$response = $s3Client->createMultipartUpload(new CreateMultipartUploadRequest([
'Bucket' => $bucket,
'Key' => ltrim($path->path(), '/'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public function getSystemTmpDir(): Path
return $this->options->tmpDir();
}

/**
* @return \Generator<FileStatus>
*/
public function list(Path $path, Filter $pathFilter = new KeepAll()): Generator
{
$this->mount->supports($path) || throw new InvalidSchemeException($path->protocol(), $this->mount->protocol);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public function isOpen(): bool

public function iterate(int $length = 1): Generator
{
for ($offset = 0; $offset < $this->size(); $offset += $length) {
$size = $this->size() ?? 0;

for ($offset = 0; $offset < $size; $offset += $length) {
yield $this->read($length, $offset);
}
}
Expand All @@ -74,8 +76,9 @@ public function readLines(string $separator = "\n", ?int $length = null): Genera
{
$offset = 0;
$content = '';
$size = $this->size() ?? 0;

while ($offset < $this->size()) {
while ($offset < $size) {
// Read a chunk of the file
$chunk = $this->read($length ?? (1024 * 1024 * 9), $offset);
$offset += strlen($chunk);
Expand All @@ -98,12 +101,12 @@ public function readLines(string $separator = "\n", ?int $length = null): Genera

$content = $lines[$lastIndex];
} elseif (substr_count($content, $separator) === 1) {
// Split the content by the separator
/**
* @phpstan-ignore-next-line
*/
yield substr($content, 0, strpos($content, $separator));
$content = substr($content, strpos($content, $separator) + 1);
$pos = strpos($content, $separator);

if ($pos !== false) {
yield substr($content, 0, $pos);
$content = substr($content, $pos + 1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\Filesystem\Bridge\AsyncAWS\DSL;

use AsyncAws\Core\Configuration;
use AsyncAws\S3\S3Client;
use Flow\ETL\Attribute\DocumentationDSL;
use Flow\ETL\Attribute\Module;
Expand All @@ -13,12 +14,11 @@
use Flow\Filesystem\Mount;

/**
* @param array<string, mixed> $configuration - for details please see https://async-aws.com/clients/s3.html
* @param array<Configuration::OPTION_*, null|string> $configuration - for details please see https://async-aws.com/clients/s3.html
*/
#[DocumentationDSL(module: Module::S3_FILESYSTEM, type: Type::HELPER)]
function aws_s3_client(array $configuration): S3Client
{
/** @phpstan-ignore-next-line */
return new S3Client($configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ public function test_writing_content_from_resource(): void
$stream->fromResource($resource);
$stream->close();

static::assertTrue($fs->status(path('aws-s3://orders.csv'))?->isFile());
static::assertFalse($fs->status(path('aws-s3://orders.csv'))->isDirectory());
$status = $fs->status(path('aws-s3://orders.csv'));
static::assertNotNull($status);
static::assertTrue($status->isFile());
static::assertFalse($status->isDirectory());
static::assertSame(
file_get_contents(__DIR__ . '/Fixtures/orders.csv'),
$fs->readFrom(path('aws-s3://orders.csv'))->content(),
Expand All @@ -48,8 +50,10 @@ public function test_writing_content_smaller_than_block_size_to_s3(): void
$stream->append('Hello, World!');
$stream->close();

static::assertTrue($fs->status(path('aws-s3://file.txt'))?->isFile());
static::assertFalse($fs->status(path('aws-s3://file.txt'))->isDirectory());
$status = $fs->status(path('aws-s3://file.txt'));
static::assertNotNull($status);
static::assertTrue($status->isFile());
static::assertFalse($status->isDirectory());
static::assertSame('Hello, World!', $fs->readFrom(path('aws-s3://file.txt'))->content());

$fs->rm(path('aws-s3://file.txt'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

use Flow\Filesystem\Bridge\AsyncAWS\Options;
use Flow\Filesystem\Bridge\AsyncAWS\Tests\Double\RecordingS3Client;
use Flow\Filesystem\FileStatus;
use Flow\Filesystem\Path\Filter\OnlyFiles;
use Flow\Filesystem\Tests\Double\RejectingFilter;

use function Flow\Filesystem\Bridge\AsyncAWS\DSL\aws_s3_filesystem;
use function Flow\Filesystem\DSL\path;
use function Flow\Types\DSL\type_string;
use function iterator_to_array;

final class AsyncAWSS3FilesystemFileFastPathTest extends AsyncAWSS3TestCase
Expand Down Expand Up @@ -123,6 +125,8 @@ public function test_list_single_file_does_not_yield_prefix_siblings(): void
$statuses = iterator_to_array($fs->list(path('aws-s3://var/orders/file.txt')));

static::assertCount(1, $statuses);
static::assertArrayHasKey(0, $statuses);
static::assertInstanceOf(FileStatus::class, $statuses[0]);
static::assertSame('aws-s3://var/orders/file.txt', $statuses[0]->path->uri());
static::assertSame(1, $client->headObjectCount);
static::assertSame(0, $client->listObjectsV2Count);
Expand All @@ -140,6 +144,8 @@ public function test_list_single_file_yields_via_head_only_when_fast_path_enable
$statuses = iterator_to_array($fs->list(path('aws-s3://var/orders/orders.csv'), new OnlyFiles()));

static::assertCount(1, $statuses);
static::assertArrayHasKey(0, $statuses);
static::assertInstanceOf(FileStatus::class, $statuses[0]);
static::assertSame('aws-s3://var/orders/orders.csv', $statuses[0]->path->uri());
static::assertTrue($statuses[0]->isFile());
static::assertSame(1, $client->headObjectCount, 'exactly one HEAD must be issued');
Expand All @@ -153,14 +159,13 @@ public function test_list_single_file_yields_via_head_only_when_fast_path_enable
private function recordingClient(): RecordingS3Client
{
$configuration = [
'pathStyleEndpoint' => true,
'endpoint' => $_ENV['S3_ENDPOINT'],
'region' => $_ENV['S3_REGION'],
'accessKeyId' => $_ENV['S3_ACCESS_KEY_ID'],
'accessKeySecret' => $_ENV['S3_SECRET_ACCESS_KEY'],
'pathStyleEndpoint' => 'true',
'endpoint' => type_string()->assert($_ENV['S3_ENDPOINT']),
'region' => type_string()->assert($_ENV['S3_REGION']),
'accessKeyId' => type_string()->assert($_ENV['S3_ACCESS_KEY_ID']),
'accessKeySecret' => type_string()->assert($_ENV['S3_SECRET_ACCESS_KEY']),
];

/** @phpstan-ignore-next-line */
return new RecordingS3Client($configuration);
}
}
Loading
Loading