Skip to content

Commit

Permalink
[ROCKETMQ-171] Initialized the PHP_SDK basic structure closes #9
Browse files Browse the repository at this point in the history
  • Loading branch information
netroby authored and dongeforever committed Apr 11, 2017
1 parent ed9ad51 commit 950af6e
Show file tree
Hide file tree
Showing 17 changed files with 895 additions and 1 deletion.
4 changes: 3 additions & 1 deletion rocketmq-php/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ coverage.out
*.log
tags
temp_parser_file
y.output
y.output
/vendor/
.vscode/
16 changes: 16 additions & 0 deletions rocketmq-php/composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "rocketmq/rocketmq-php-sdk",
"description": "RocketMQ PHP SDK written with pure php code.",
"type": "library",
"license": "Apache-2.0",
"authors": [{
"name": "huzhifeng",
"email": "huzhifeng@douyu.tv"
}],
"require": {},
"autoload": {
"psr-4": {
"RocketMQ\\": "src/"
}
}
}
53 changes: 53 additions & 0 deletions rocketmq-php/example/simple/AsyncProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use RocketMQ\Client\Producer\DefaultMQProducer;
use RocketMQ\Common\Message\Message;
use RocketMQ\Remoting\Common\RemotingHelper;

$producer = new DefaultMQProducer("Jodie_Daily_test");
$producer->start();
$producer->setRetryTimesWhenSendAsyncFailed(0);

for ($i = 0; $i < 10000000; $i++) {

try {
$index = $i;
$msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world" . getBytes(RemotingHelper::DEFAULT_CHARSET));
$producer->send($msg, new class() extends SendCallback() {
public
function onSuccess($sendResult)
{
printf("%-10d OK %s %n", $index, $sendResult->getMsgId());
}

public
function onException($e)
{
printf("%-10d Exception %s %n", $index, $e);
$e->printStackTrace();
}
});
} catch (\Exception $e) {
echo $e->getTraceAsString();
}

}
$producer->shutdown();
36 changes: 36 additions & 0 deletions rocketmq-php/example/simple/Producer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use RocketMQ\Client\Producer\DefaultMQProducer;
use RocketMQ\Common\Message\Message;

$producer = new DefaultMQProducer("ProducerGroupName");
$producer->start();

for ($i = 0; $i < 10000000; $i++) {
try {
$msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world");
$sendResult = $producer->send($msg);
echo $sendResult;
} catch (\Exception $e) {
echo $e->getMessage() . PHP_EOL . $e->getTraceAsString();
}
}
$producer->shutdown();
3 changes: 3 additions & 0 deletions rocketmq-php/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# RocketMQ PHP SDK

This is PHP SDK for RocketMQ. Written with pure PHP language.
27 changes: 27 additions & 0 deletions rocketmq-php/src/Client/Common/ClientErrorCode.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace RocketMQ\Client\Common;

class ClientErrorCode
{
const CONNECT_BROKER_EXCEPTION = 10001;
const ACCESSS_BROKER_TIMEOUT = 10002;
const BROKER_NOT_EXIST_EXCEPTION = 10003;
const NO_NAME_SERVER_EXCEPTION = 10004;
const NOT_FOUND_TOPIC_EXCEPTION = 10005;
}
23 changes: 23 additions & 0 deletions rocketmq-php/src/Client/Exception/MQBrokerException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace RocketMQ\Client\Exception;

class MQBrokerException extends \Exception
{

}
28 changes: 28 additions & 0 deletions rocketmq-php/src/Client/Exception/MQClientException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace RocketMQ\Client\Exception;

class MQClientException extends \Exception
{

public function setResponseCode($code)
{
$this->responseCode = $code;
return $this;
}
}
107 changes: 107 additions & 0 deletions rocketmq-php/src/Client/Latency/MQFaultStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace RocketMQ\Client\Latency;

class MQFaultStrategy
{
public $log;
public $latencyFaultTolerance;
public $sendLatencyFaultEnable = false;
public $latencyMax = [50, 100, 550, 1000, 2000, 3000, 15000];
public $notAvailableDuration = [0, 0, 30000, 60000, 120000, 180000, 600000];
public function __construct()
{
$this->log = ClientLogger.getLog();
$this->latencyFaultTolerance = new LatencyFaultToleranceImpl();
}
public function getNotAvailableDuration()
{
return $this->notAvailableDuration;
}
public function setNotAvailableDuration($notAvailableDuration)
{
$this->notAvailableDuration = $notAvailableDuration;
}

public function getLatencyMax()
{
return $this->latencyMax;
}

public function setLatencyMax($latencyMax)
{
$this->latencyMax = $latencyMax;
}

public function isSendLatencyFaultEnable()
{
return $this->sendLatencyFaultEnable;
}
public function setSendLatencyFaultEnable($sendLatencyFaultEnable)
{
$this->sendLatencyFaultEnable = $sendLatencyFaultEnable;
}
public function selectOneMessageQueue($tpInfo, $lastBrokerName)
{
if ($this->sendLatencyFaultEnable) {
try {
$index = $tpInfo->getSendWhichQueue()->getAndIncrement();
for ($i = 0; $i < strlen($tpInfo->getMessageQueueList()); $i++) {
$pos = abs($index++) % strlen(tpInfo.getMessageQueueList());
if ($pos < 0)
$pos = 0;
$mq = $tpInfo->getMessageQueueList()->get($pos);
if ($this->latencyFaultTolerance->isAvailable($mq->getBrokerName())) {
if (null == $lastBrokerName || $mq->getBrokerName() == $lastBrokerName)
return $mq;
}
}

$notBestBroker = $this->latencyFaultTolerance->pickOneAtLeast();
$writeQueueNums = $tpInfo->getQueueIdByBroker($notBestBroker);
if ($writeQueueNums > 0) {
$mq = $tpInfo->selectOneMessageQueue();
if ($notBestBroker != null) {
$mq->setBrokerName($notBestBroker);
$mq->setQueueId($tpInfo->getSendWhichQueue()->getAndIncrement() % $writeQueueNums);
}
return $mq;
}
else {
$this->latencyFaultTolerance->remove($notBestBroker);
}
}
catch (\Exception $e) {
$this->log->error("Error occurred when selecting message queue", e);
}

return $this->tpInfo->selectOneMessageQueue();
}

return $this->tpInfo->selectOneMessageQueue($lastBrokerName);
}
public function updateFaultItem($brokerName, $currentLatency, $isolation)
{

}

public function computeNotAvailableDuration($currentLatency)
{

}
}
Loading

0 comments on commit 950af6e

Please sign in to comment.