diff --git a/src/QueryBuilder/Core/DBFactory.php b/src/QueryBuilder/Core/DBFactory.php index 61f9f41..38df04f 100644 --- a/src/QueryBuilder/Core/DBFactory.php +++ b/src/QueryBuilder/Core/DBFactory.php @@ -183,6 +183,28 @@ public function streamQuery(string $query): StreamEventHandler return $connection->streamQuery($query); } + /** + * @throws \Saraf\QB\QueryBuilder\Exceptions\DBFactoryException + */ + public function streamQueryRaw(string $query): ReadableStreamInterface + { + $isWrite = true; + if (str_starts_with(strtolower($query), "select") + || str_starts_with(strtolower($query), "show") + ) $isWrite = false; + + $bestConnections = $this->getBestConnection(); + + $connection = $isWrite + ? $this->writeConnections[$bestConnections['write']] + : $this->readConnections[$bestConnections['read']]; + + if (!($connection instanceof DBWorker)) + throw new DBFactoryException("Connections Not Instance of Worker / Restart App"); + + return $connection->streamQueryRaw($query); + } + /** * @throws DBFactoryException */ diff --git a/src/QueryBuilder/Core/DBWorker.php b/src/QueryBuilder/Core/DBWorker.php index 591233e..f39e61e 100644 --- a/src/QueryBuilder/Core/DBWorker.php +++ b/src/QueryBuilder/Core/DBWorker.php @@ -40,6 +40,11 @@ public function streamQuery(string $query): StreamEventHandler })); } + public function streamQueryRaw(string $query): ReadableStreamInterface + { + return $this->connection->queryStream($query); + } + protected function handleResult(QueryResult $result): array { if (!is_null($result->resultRows)) { diff --git a/src/QueryBuilder/Core/EQuery.php b/src/QueryBuilder/Core/EQuery.php index 1d54442..347913d 100644 --- a/src/QueryBuilder/Core/EQuery.php +++ b/src/QueryBuilder/Core/EQuery.php @@ -36,6 +36,11 @@ public function stream(): StreamEventHandler return $this->factory->streamQuery($this->query); } + public function streamRaw(): ReadableStreamInterface + { + return $this->factory->streamQueryRaw($this->query); + } + public function getQuery(): Promise { return new Promise(function (callable $resolve) {