Skip to content
Permalink
Browse files
update
  • Loading branch information
wangjinxi committed Jun 19, 2019
1 parent 17d9666 commit 1c33271076b56ce847da5ba82ab806985eed49c9
Showing 13 changed files with 396,474 additions and 86 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -1,5 +1,5 @@
TARGET = release/agent
CFLAGS = -g -Wall -lpthread -lzookeeper_mt -L./lib -lhiredis -lzlog -I./include
CFLAGS = -g -Wall -lpthread -lzookeeper_mt -L./lib -lhiredis -lzlog -I./include
CC = gcc
$(TARGET):service/fsof_mq.c src/fsof_main.c service/fsof_zookeeper.c common/strmap.c service/fsof_redis.c service/fsof_log.c common/fsof_url.c common/fsof_util.c service/mt_array.c
$(CC) $^ $(CFLAGS) -o $@
Binary file not shown.
Binary file not shown.
@@ -1,6 +1,8 @@
<?php

namespace com\fenqile\fsof\common\protocol\fsof;

use com\fenqile\fsof\consumer\Type;
/**
*
* Dubbo网络协议
@@ -36,7 +38,9 @@ class DubboParser
const DUBBO_PROTOCOL_MAGIC = 0xdabb;

//serialize 方案编号
const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //serialization code
const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //fastjson serialization code

const DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 = 2; //hessian2 serialization code


//fsof协议包头cmd字段含义
@@ -54,34 +58,48 @@ class DubboParser

private $logger;

public function __construct()
public function __construct()
{
$this->logger = \Logger::getLogger(__CLASS__);
}

public function packRequest(DubboRequest &$request)
public function packRequest(DubboRequest $request)
{
$reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
json_encode($request->getService()) . PHP_EOL .
json_encode($request->getVersion()) . PHP_EOL .
json_encode($request->getMethod()) . PHP_EOL ;
json_encode($request->getService()) . PHP_EOL;
if($request->getVersion()){
$reqData .= json_encode($request->getVersion()) . PHP_EOL;
}else{
$reqData .= '""' . PHP_EOL;
}
$reqData .= json_encode($request->getMethod()) . PHP_EOL;
$typeStr = "";
for($i=0;$i < count($request->getTypes());$i++){
$typeStr = $typeStr.$request->getTypes()[$i];
foreach ($request->getTypes() as $type){
$typeStr .= $type;
}
$reqData = $reqData.json_encode($typeStr) . PHP_EOL;
for ($x = 0; $x < count($request->getParams()); $x++) {
$reqData = $reqData . json_encode($request->getParams()[$x]) . PHP_EOL;
$reqData .= json_encode($typeStr) . PHP_EOL;
foreach ($request->getParams() as $value){
if($value instanceof \stdClass){
$value = $value->object;
}elseif($value instanceof Type){
$value = $value->value;
}
$reqData .= json_encode($value) . PHP_EOL;
}

$attach = array();
$attach['path'] = $request->getService();
$attach['interface'] = $request->getService();
$attach['group'] = $request->getGroup();
if($request->getGroup()){
$attach['group'] = $request->getGroup();
}
if($request->getVersion()){
$attach['version'] = $request->getVersion();
}
$attach['timeout'] = $request->getTimeout();
$attach['version'] = $request->getVersion();
$request->setAttach($attach);
$reqData = $reqData . json_encode($request->getAttach());

echo $reqData;
$upper = ($request->getSn() & self::UPPER_MASK) >> 32;
$lower = $request->getSn() & self::LOWER_MASK;
$flag = (self::FLAG_REQUEST | self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
@@ -98,63 +116,62 @@ public function packRequest(DubboRequest &$request)
}


public function parseResponseHeader(DubboResponse &$response)
public function parseResponseHeader(DubboResponse $response)
{
$res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN);
$format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len';
$_arr = unpack($format, $res_header);
print_r($_arr);
$response->setStatus($_arr['status']);
$response->setSn($_arr["upper"] << 32 | $_arr["lower"]);
$flag = $_arr["flag"];
if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
{
if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
$response->setHeartbeatEvent(true);
}
$response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
$response->setLen($_arr["len"]);
return $response;
}

public function parseResponseBody(DubboResponse &$response)
public function parseResponseBody(DubboResponse $response)
{
$_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN);
$response->setResponseBody($_data);
$dataArr = array();
if ($_data){
$dataArr = explode(PHP_EOL,$_data);
}
if ($response->getStatus() == DubboResponse::OK)
{
if ($response->isHeartbeatEvent())
{
$response->setResult(json_decode($dataArr[0], true));
} else
{
switch ($dataArr[0])
{
if (DubboResponse::OK == $response->getStatus()) {
if (self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON == $response->getSerialization()) {
list($status, $content) = explode(PHP_EOL, $_data);
switch ($status) {
case self::RESPONSE_NULL_VALUE:
break;
case self::RESPONSE_VALUE:
$response->setResult(json_decode($dataArr[1], true));
$response->setResult(json_decode($content, true));
break;
case self::RESPONSE_WITH_EXCEPTION:
$exception = json_decode($dataArr[1], true);
if(is_array($exception) && array_key_exists('message', $exception)){
$exception = json_decode($content, true);
if (is_array($exception) && array_key_exists('message', $exception)) {
throw new \Exception($exception['message']);
}else if(is_string($exception)){
} else if (is_string($exception)) {
throw new \Exception($exception);
}else{
} else {
throw new \Exception("provider occur error");
}
break;
default:
return false;
}
} else if(self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == $response->getSerialization() ){
$_data = substr($_data, 1);
$hess_stream = new \HessianStream($_data);
$hess_options = new \HessianOptions();
$hess_factory = new \HessianFactory();
$parser = $hess_factory->getParser($hess_stream, $hess_options);
$content = $parser->parseReply();
$response->setResult($content);
}
return $response;
} else {
throw new \Exception(json_decode($dataArr[0], true));
throw new \Exception($_data);
}
return $response;
}


@@ -165,8 +182,7 @@ public function parseRequestHeader(DubboRequest &$request)
$_arr = unpack($format, $_data);
$flag = $_arr['flag'];
$request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0);
if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
{
if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
$request->setHeartbeatEvent(true);
}
$request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
@@ -178,57 +194,48 @@ public function parseRequestHeader(DubboRequest &$request)

public function parseRequestBody(DubboRequest &$request)
{
if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON)
{
if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON) {
$this->logger->error("unknown serialization type :" . $request->getSerialization());
return false;
}
$cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN);
if ($cliData)
{
if ($request->isHeartbeatEvent())
{
if ($cliData) {
if ($request->isHeartbeatEvent()) {
//心跳请求,不需要数据回送
} else
{
} else {
$dataArr = explode(PHP_EOL, $cliData);
$request->setDubboVersion(json_decode($dataArr[0], true));
$request->setService(json_decode($dataArr[1], true));
$request->setVersion(json_decode($dataArr[2], true));
$methodName = json_decode($dataArr[3],true);
if ($methodName == "\$invoke")
{
$methodName = json_decode($dataArr[3], true);
if ($methodName == "\$invoke") {
//泛化调用
$request->setMethod(json_decode($dataArr[5],true));
$request->setParams(json_decode($dataArr[7],true));
$attach = json_decode($dataArr[8],true);
} else
{
$request->setMethod(json_decode($dataArr[5], true));
$request->setParams(json_decode($dataArr[7], true));
$attach = json_decode($dataArr[8], true);
} else {
//非泛化调用
$request->setMethod($methodName);
$paramTypes = json_decode($dataArr[4], true);
if ($paramTypes == "")
{
if ($paramTypes == "") {
//调用没有参数的方法
$request->setTypes(NULL);
$request->setParams(NULL);
$attach = json_decode($dataArr[5],true);
} else
{
$attach = json_decode($dataArr[5], true);
} else {
$typeArr = explode(";", $paramTypes);
$typeArrLen = count($typeArr);
$request->setParamNum($typeArrLen - 1);
$params = array();
for ($i = 0; $i < $typeArrLen - 1; $i++) {
$params[$i] = json_decode($dataArr[5 + $i],true);
$params[$i] = json_decode($dataArr[5 + $i], true);
}
$request->setParams($params);
$attach = json_decode($dataArr[5 + ($typeArrLen - 1)],true);
$attach = json_decode($dataArr[5 + ($typeArrLen - 1)], true);
}
}
$request->setAttach($attach);
if (array_key_exists('group', $attach))
{
if (array_key_exists('group', $attach)) {
$request->setGroup($attach['group']);
}
return $request;
@@ -241,17 +248,17 @@ public function parseRequestBody(DubboRequest &$request)
public function packResponse(DubboResponse &$response)
{
if ($response->getStatus() != DubboResponse::OK) {
$resData = json_encode($response->getErrorMsg()) ;
$resData = json_encode($response->getErrorMsg());
} else {
if($response->getErrorMsg() != NULL && $response->getErrorMsg() != ""){
if ($response->getErrorMsg() != NULL && $response->getErrorMsg() != "") {
$resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg());
}else if ($response->getResult() == NULL) {
} else if ($response->getResult() == NULL) {
$resData = json_encode(self::RESPONSE_NULL_VALUE);
} else {
$resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult());
}
}
$resData = $resData.PHP_EOL;
$resData = $resData . PHP_EOL;
$upper = ($response->getSn() & self::UPPER_MASK) >> 32;
$lower = $response->getSn() & self::LOWER_MASK;
$flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
@@ -1,6 +1,6 @@
[fsof_container_setting]
;php path
php = '/usr/bin/php'
php = '/usr/local/php-7.1.12/bin/php'

;app's user
user = root
@@ -16,10 +16,10 @@ p2p_mode = false
zklog_level = 0

;zookeepr log path
zklog_path = '/var/fsof/provider/zookeeper.log'
zklog_path = '/home/devops/workspace/dubbo/logs/zookeeper.log'

;zookeeper url list
zk_url_list = http://127.0.0.1:2181
zk_url_list = http://192.168.214.148:2181

;provider overload mode switch
overload_mode = true

0 comments on commit 1c33271

Please sign in to comment.