Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
chipslays committed Jan 17, 2024
1 parent 5068afe commit 8e71ae5
Show file tree
Hide file tree
Showing 6 changed files with 464 additions and 7 deletions.
25 changes: 23 additions & 2 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Porter\Events\Event;
use Porter\Events\Payload;
use Porter\Events\Bus as EventBus;
use Porter\Server\Channels;
use Porter\Server\Connection;
use Porter\Server\Connections;
use Workerman\Worker;
Expand All @@ -23,6 +24,11 @@ class Server
*/
protected EventBus $eventBus;

/**
* @var Channels
*/
protected Channels $channels;

/**
* @var Closure|null
*/
Expand All @@ -41,6 +47,7 @@ public function __construct(string $ip, int $port, array $context = [], int $pro
$this->registerEventAndMessageCallbacks();

$this->eventBus = new EventBus;
$this->channels = new Channels;
}

protected function createWorkerInstance(string $ip, int $port, array $context = [], int $processes = 1): void
Expand Down Expand Up @@ -78,11 +85,21 @@ public function setWorker(Worker $worker): self
*
* @return EventBus
*/
public function getEventBus(): EventBus
public function events(): EventBus
{
return $this->eventBus;
}

/**
* Gets a channels.
*
* @return Channels
*/
public function channels(): Channels
{
return $this->channels;
}

/**
* The callback function triggered when the client establishes
* a connection with Workerman (after the TCP three-way handshake is completed).
Expand Down Expand Up @@ -139,7 +156,11 @@ public function onConnected(Closure $callback): self
public function onDisconnected(Closure $callback): self
{
$this->worker->onClose = function (TcpConnection $connection) use ($callback) {
call_user_func_array($callback, [new Connection($connection)]);
$connection = new Connection($connection);

call_user_func_array($callback, [$connection]);

$connection->disconnect();
};

return $this;
Expand Down
168 changes: 168 additions & 0 deletions src/Server/Channel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
<?php

namespace Porter\Server;

use Porter\Support\Store;
use Closure;

class Channel
{
protected Connections $connections;

protected Store $store;

/**
* @var Closure|null
*/
protected ?Closure $onJoinHandler = null;

/**
* @var Closure|null
*/
protected ?Closure $onLeaveHandler = null;

/**
* @var Closure|null
*/
protected ?Closure $onDestroyHandler = null;

public function __construct(protected $id)
{
$this->connections = new Connections;
$this->store = new Store;
}

/**
* Returns a channel ID.
*
* @return string
*/
public function id(): string
{
return $this->id;
}

/**
* Returns a channel store.
*
* @return Store
*/
public function store(): Store
{
return $this->store;
}

/**
* Join given connections to channel.
*
* @param Connection|Connection[]|Connections|array $connections
* @return self
*/
public function join(Connection|Connections|array $connections): self
{
if ($connections instanceof Connections) {
$connections = $connections->all();
}

$connections = is_array($connections) ? $connections : [$connections];

foreach ($connections as $connection) {
$this->connections->add($connection);

$connection->channels()->add($this);

if ($this->onJoinHandler) {
call_user_func_array($this->onJoinHandler, [$connection]);
}
}

return $this;
}

/**
* Delete given connection from channel.
*
* @param Connection|Connection[]|Connections|array $connection
* @return self
*/
public function leave(Connection|Connections|array $connections): self
{
if ($connections instanceof Connections) {
$connections = $connections->all();
}

$connections = is_array($connections) ? $connections : [$connections];

foreach ($connections as $connection) {
if (!$this->exists($connection)) {
continue;
};

if ($this->onLeaveHandler) {
call_user_func_array($this->onLeaveHandler, [$connection]);
}

$connection->channels()->remove($this);

$this->connections->remove($connection);
}

return $this;
}

/**
* Checks if given connection exists in channel.
*
* @param Connection|int $connection
* @return bool
*/
public function exists(Connection|int $connection): bool
{
return $this->connections->has($connection);
}

/**
* Get all connections in channel.
*
* @return Connections
*/
public function connections(): Connections
{
return $this->connections;
}

/**
* Fire callback after any connection joining to channel.
*
* @param callable $callback
* @return self
*/
public function onJoin(callable $callback): self
{
$this->onJoinHandler = $callback;

return $this;
}

/**
* Fire callback after any connection leaving channel.
*
* @param callable $callback
* @return self
*/
public function onLeave(callable $callback): self
{
$this->onLeaveHandler = $callback;

return $this;
}

/**
* Fire on destruct object.
*/
public function __destruct()
{
dump('destruct: ' . $this->id());
$this->leave($this->connections);
}
}
127 changes: 127 additions & 0 deletions src/Server/Channels.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
<?php

namespace Porter\Server;

class Channels
{
/**
* Array of channels.
*
* @var Channel[]
*/
protected array $channels = [];

/**
* Get array of channels.
*
* @return Channel[]
*/
public function all(): array
{
return $this->channels;
}

/**
* Get count of channels.
*
* @return int
*/
public function count(): int
{
return count($this->channels);
}

/**
* Create new channel.
*
* @param string $id
* @return Channel
*/
public function create(string $id): Channel
{
$channel = new Channel($id);

$this->channels[$id] = $channel;

return $this->channels[$id];
}

/**
* Create channel if not exists, get channel if exists.
*
* @param string $id
* @return Channel
*/
public function createOrGet(string $id): Channel
{
return $this->channels[$id] ?? $this->create($id);
}

/**
* Delete channel.
*
* @param Channel|string $id
* @return void
*/
public function remove(Channel|string $channel): void
{
if ($channel instanceof Channel) {
$channel = $channel->id();
}

unset($this->channels[$channel]);
}

/**
* Destroy and remove channel.
*
* @param Channel|string $channel
* @return void
*/
public function destroy(Channel|string $channel): void
{
if (is_string($channel)) {
$channel = $this->get($channel);
}

$channel->leave($channel->connections());

unset($this->channels[$channel->id()]);
unset($channel);
}

/**
* Get a channel.
*
* @param string $id
* @return Channel|null
*/
public function get(string $id): ?Channel
{
$channel = $this->channels[$id] ?? null;

return $channel;
}

/**
* @param Channel $channel
* @return self
*/
public function add(Channel $channel): self
{
$this->channels[$channel->id()] = $channel;

return $this;
}

/**
* Checks that the channel exists.
*
* @param string $id
* @return bool
*/
public function has(string $id): bool
{
return isset($this->channels[$id]);
}
}
Loading

0 comments on commit 8e71ae5

Please sign in to comment.