Skip to content

Commit

Permalink
Merge pull request #8 from finaldie/0.8
Browse files Browse the repository at this point in the history
Merge branch 0.8 into master
  • Loading branch information
finaldie committed Jan 1, 2019
2 parents acd952a + 7e93bb7 commit 14dc9dd
Show file tree
Hide file tree
Showing 18 changed files with 160 additions and 82 deletions.
11 changes: 11 additions & 0 deletions .ycm_extra_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,19 @@
# For more information, please refer to <http://unlicense.org/>

import os
import os.path as p
import ycm_core

DIR_OF_THIS_SCRIPT = p.abspath(p.dirname(__file__))

def PythonSysPath(**kwargs):
    """YCM hook: prepend this project's Python dependency dirs to sys_path.

    Mutates the list passed in ``kwargs['sys_path']`` in place and also
    returns it, as YouCompleteMe expects.
    """
    path = kwargs['sys_path']
    extra_dirs = [p.join(DIR_OF_THIS_SCRIPT, 'src', 'common-py')]
    # Insert at the front, preserving the order of extra_dirs.
    for entry in reversed(extra_dirs):
        path.insert(0, entry)
    return path

# These are the compilation flags that will be used in case there's no
# compilation database set (by default, one is not set).
# CHANGE THIS LIST OF FLAGS. YES, THIS IS THE DROID YOU HAVE BEEN LOOKING FOR.
Expand Down
3 changes: 3 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 2019-01-01 0.8.2
* Upgrade `skull` to 1.4.2
* Refine code structure
* 2018-12-23 0.8.1
* Upgrade `skull` to 1.4.1
* Upgrade `proto` to proto3
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2016-2018 Yuzhang Hu
Copyright (c) 2016-2019 Yuzhang Hu

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ DEPLOY_LOG_ROOT := $(DEPLOY_DIR_ROOT)/log
DEPLOY_ETC_ROOT := $(DEPLOY_DIR_ROOT)/etc

# Get all the sub dirs which have Makefile
COMMONS := $(shell find src/common -maxdepth 2 -name Makefile)
COMMON-PY := $(shell test -d src/common-py && find src/common-py -maxdepth 2 -name Makefile)
COMMON-CPP := $(shell test -d src/common-cpp && find src/common-cpp -maxdepth 1 -name Makefile)
MODS := $(shell find src/modules -maxdepth 2 -name Makefile)
SRVS := $(shell find src/services -maxdepth 2 -name Makefile)

SUBS = \
$(COMMONS) \
$(COMMON-PY) \
$(COMMON-CPP) \
$(MODS) \
$(SRVS)

Expand Down
2 changes: 1 addition & 1 deletion deps/http-parser
Submodule http-parser updated 4 files
+2 −2 Makefile
+20 −6 http_parser.c
+2 −2 http_parser.h
+21 −0 test.c
2 changes: 1 addition & 1 deletion deps/skull
Submodule skull updated 43 files
+148 −0 .clang-format
+8 −0 .gitignore
+6 −3 .travis.yml
+11 −0 ChangeLog.md
+1 −1 LICENSE
+1 −0 Makefile
+12 −6 scripts/bin/skull-addr-remap.py
+5 −5 scripts/bin/skull-config
+1 −1 scripts/bin/skull-config-utils.py
+2 −4 scripts/lib/skull_action_common.bash
+1 −1 scripts/lib/skull_action_create.bash
+4 −2 scripts/share/Makefile.tpl
+11 −0 scripts/share/ycm_extra_conf.py
+5 −0 src/Makefile.inc
+5 −3 src/api/sk_time.h
+5 −5 src/api/sk_txn.h
+1 −0 src/api/sk_types.h
+12 −12 src/common/sk_malloc.c
+1 −1 src/common/sk_queue.c
+22 −6 src/common/sk_time.c
+10 −10 src/common/sk_txn.c
+2 −2 src/components/sk_admin.c
+6 −3 src/components/sk_config.c
+14 −14 src/components/sk_ep_pool.c
+5 −5 src/pto/sk_pto_service_task_cb.c
+12 −11 src/pto/sk_pto_workflow_run.c
+3 −3 src/user-cpp/lib/skull_actions.bash
+8 −8 src/user-cpp/share/Makefile.common.targets
+2 −1 src/user-cpp/share/Makefile.inc
+2 −2 src/user-cpp/share/ycm_extra_conf.py
+3 −0 src/user-py/Makefile
+30 −30 src/user-py/lib/skull_actions.bash
+1 −1 src/user-py/setup.py
+67 −54 src/user-py/share/module.py
+3 −2 src/user-py/skull/__init__.py
+42 −10 src/user-py/skull/client.py
+8 −3 src/user-py/skull/descpool.py
+38 −7 src/user-py/skull/http.py
+65 −38 src/user-py/skull/logger.py
+41 −21 src/user-py/skull/module_executor.py
+62 −47 src/user-py/skull/module_loader.py
+61 −39 src/user-py/skull/txn.py
+11 −4 src/user-py/skull/txndata.py
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions src/common-py/common/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# protobuf files
*.pb*.py
*.proto

# generated metrics source files
metrics.py

# generated idl header files
protos.py
proto/*

# compiled file for ycm
.ycm_extra_conf.pyc

# python compiled files
*.pyc
File renamed without changes.
File renamed without changes.
19 changes: 12 additions & 7 deletions src/modules/dns_ask/module.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import yaml
import pprint

from skull import *
from skull.txn import *
from skull import logger
from skull.txn import Txn

from common import *
from common.proto import *
from common.proto import service_dns_query_req_pto


##
# Module Init Entry, called during the start phase
Expand All @@ -17,13 +17,15 @@ def module_init(config):
logger.info('ModuleInit', 'config: {}'.format(pprint.pformat(config)))
return True


##
# Module Release Function, called during the shutdown phase
#
def module_release():
logger.debug("dns_ask module release")
return


##
# Module Runnable Entry, called when this module is picked up in the current
# workflow
Expand Down Expand Up @@ -51,6 +53,7 @@ def module_run(txn):
logger.error("DNS_E1", "Dns iocall failed, ret: {}".format(ret))
return False


def _dns_response(txn, iostatus, api_name, request_msg, response_msg):
if iostatus != Txn.IO_OK:
logger.error("DNS_E2", "Dns response IO error: {}".format(iostatus))
Expand All @@ -60,9 +63,11 @@ def _dns_response(txn, iostatus, api_name, request_msg, response_msg):

for record in response_msg.record:
if logger.isDebugEnabled():
logger.debug("For question: {}, qtype: {}, got ip: {}, ttl: {}".format(
sharedData.question, sharedData.qtype, record.ip, record.ttl))
logger.debug(
"For question: {}, qtype: {}, got ip: {}, ttl: {}".format(
sharedData.question, sharedData.qtype, record.ip,
record.ttl))

sharedData.record.add(ip = record.ip, ttl = record.ttl)
sharedData.record.add(ip=record.ip, ttl=record.ttl)

return True
50 changes: 29 additions & 21 deletions src/modules/ranking/module.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import yaml
import pprint
import time

from skull import *
from skull.txn import *
from skull import logger
from skull.txn import Txn

from common import *
from common.proto import *
from common.proto import service_ranking_rank_req_pto

from . import ranking

Expand All @@ -15,6 +13,7 @@
CFG_LOW_LATENCY_BAR = 0
CFG_LATENCY_FACTOR = 1.2


##
# Module Init Entry, called during the start phase
#
Expand All @@ -31,17 +30,19 @@ def module_init(config):
CFG_MIN_TTL = config['min_ttl']
CFG_MAX_TTL = config['max_ttl']
CFG_LOW_LATENCY_BAR = config['low_latency_bar']
CFG_LATENCY_FACTOR = config['latency_factor']
CFG_LATENCY_FACTOR = config['latency_factor']

return True


##
# Module Release Function, called during the shutdown phase
#
def module_release():
logger.debug("py module release")
return


##
# Module Runnable Entry, called when this module is picked up in the current
# workflow
Expand All @@ -52,59 +53,67 @@ def module_release():
# - False if error occurred
def module_run(txn):
sharedData = txn.data()
question = sharedData.question
question = sharedData.question
question_type = sharedData.qtype

# Return record directly when total number <= 1
nrecords = len(sharedData.record)
if nrecords <= 1:
for record in sharedData.record:
sharedData.rankingRecord.add(ip = record.ip, ttl = record.ttl)
sharedData.rankingRecord.add(ip=record.ip, ttl=record.ttl)
return True

# NRecords > 1, then get the final records from ranking services
rank_query = service_ranking_rank_req_pto.rank_req()
rank_query.question = question
rank_query.qtype = question_type
rank_query.qtype = question_type

for record in sharedData.record:
rank_query.rRecord.add(ip = record.ip, expiredTime = int(time.time()) + record.ttl)
rank_query.rRecord.add(
ip=record.ip, expiredTime=int(time.time()) + record.ttl)

ret = txn.iocall('ranking', 'rank', rank_query, api_cb=_ranking_response)
if ret == Txn.IO_OK:
return True
else:
logger.error("Ranking_E1", "Ranking iocall failed, ret: {}".format(ret))
logger.error("Ranking_E1",
"Ranking iocall failed, ret: {}".format(ret))
return False


def _ranking_response(txn, iostatus, api_name, request_msg, response_msg):
if iostatus != Txn.IO_OK:
logger.error("Ranking_E2", "Ranking response IO error: {}".format(iostatus))
logger.error("Ranking_E2",
"Ranking response IO error: {}".format(iostatus))
return False

#print "response: {}".format(response_msg)
# print "response: {}".format(response_msg)
sharedData = txn.data()

# If the total records count <= 1, just store and return
nrecords = len(response_msg.result)
if nrecords <= 1:
for record in response_msg.result:
sharedData.rankingRecord.add(ip = record.ip, ttl = record.ttl)
sharedData.rankingRecord.add(ip=record.ip, ttl=record.ttl)
return True

# Do the score and rank
global CFG_LOW_LATENCY_BAR
global CFG_LATENCY_FACTOR
scoringResults = ranking.score(response_msg.result)
rankingResults, filtered = ranking.rank(scoringResults, CFG_LOW_LATENCY_BAR,
CFG_LATENCY_FACTOR)
rankingResults, filtered = ranking.rank(
scoringResults, CFG_LOW_LATENCY_BAR, CFG_LATENCY_FACTOR)

logger.info("{Ranking}", "question: {} ,total: {} ,filtered {} ,Results: {}".format(
request_msg.question, len(scoringResults), filtered, rankingResults))
logger.info(
"{Ranking}",
"question: {} ,total: {} ,filtered {} ,Results: {}".format(
request_msg.question, len(scoringResults), filtered,
rankingResults))

global CFG_MIN_TTL
for record in rankingResults:
# Recalculate ttl when there is no scoring information and ttl > CFG_MIN_TTL
# Recalculate ttl when there is no scoring information and ttl >
# CFG_MIN_TTL
ttl = record['ttl']
latency = record['avgLatency']
httpInfoCnt = record['httpInfoCnt']
Expand All @@ -118,7 +127,6 @@ def _ranking_response(txn, iostatus, api_name, request_msg, response_msg):
elif ttl > CFG_MAX_TTL:
ttl = CFG_MAX_TTL

sharedData.rankingRecord.add(ip = record['ip'], ttl = ttl)
sharedData.rankingRecord.add(ip=record['ip'], ttl=ttl)

return True

33 changes: 21 additions & 12 deletions src/modules/ranking/ranking.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from skull import *
from skull import logger


def _create_result_item(ip, ttl, latency, httpInfoCnt):
return {
'ip' : ip,
'ip': ip,
'ttl': ttl,
'avgLatency': latency, # Integer
'httpInfoCnt': httpInfoCnt
}


def _item_cmp(left, right):
return left['avgLatency'] - right['avgLatency']


# Score each record, and return a list of record with ascending order latency
def score(records):
scoringTmp = []
Expand All @@ -29,14 +32,15 @@ def score(records):
# 0: OK
# 1: ERROR
# 2. TIMEOUT
status = httpInfo.status
status = httpInfo.status
httpCode = httpInfo.httpCode
latency = httpInfo.latency
latency = httpInfo.latency

if status != 1:
totalLatency += latency
else:
totalLatency += 2000 # For error connection record, increase 2000ms
# For error connection record, increase 2000ms
totalLatency += 2000

validRecords += 1

Expand All @@ -46,19 +50,22 @@ def score(records):
if validRecords > 0:
avgLatency = int(totalLatency / validRecords)

scoringTmp.append(_create_result_item(ip, ttl, avgLatency, validRecords))
scoringTmp.append(
_create_result_item(ip, ttl, avgLatency, validRecords))

# Ranking
if logger.isDebugEnabled():
logger.debug("scoringTmp: {}".format(scoringTmp))
logger.debug("scoringTmp: {}".format(scoringTmp))

scoringResults = sorted(scoringTmp, key = lambda record: record['avgLatency'])
scoringResults = sorted(
scoringTmp, key=lambda record: record['avgLatency'])

if logger.isDebugEnabled():
logger.debug("scoringResults: {}".format(scoringResults))

return scoringResults


# Rank and filter high latency record
def rank(scoringResults, low_latency_bar, latency_factor):
rankingResults = []
Expand All @@ -69,8 +76,8 @@ def rank(scoringResults, low_latency_bar, latency_factor):
if nrecords <= 1:
return scoringResults, 0

# 2. Keep latency <= 20ms, otherwise filter the record shouldn't larger than
# 1.5x of the first record
# 2. Keep latency <= 20ms, otherwise filter the record shouldn't larger
# than 1.5x of the first record
baseRecord = None
baseRecordLatency = 0

Expand All @@ -90,8 +97,10 @@ def rank(scoringResults, low_latency_bar, latency_factor):
if factor <= latency_factor:
rankingResults.append(scoringRecord)
else:
logger.info("{RankingUtil}", "Ranking terminated at latency: {}, factor: {}, base: {}".format(
latency, factor, baseRecordLatency));
logger.info(
"{RankingUtil}",
"Ranking terminated at latency: {}, factor: {}, base: {}".
format(latency, factor, baseRecordLatency))
break

nRankingRecords = len(rankingResults)
Expand Down

0 comments on commit 14dc9dd

Please sign in to comment.