Skip to content

Commit

Permalink
为分布式增加参数 (Add parameters for distributed mode: make server IP, port, and execute flag configurable via command-line arguments)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferventdesert committed Jun 30, 2016
1 parent 7bc3486 commit 58485eb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
25 changes: 16 additions & 9 deletions distributed.py
@@ -1,10 +1,10 @@
import sys;
from queue import Queue
from multiprocessing.managers import BaseManager
import etl;
import json
import extends;
import time;
rpc_port=8888
authkey= "etlpy".encode('utf-8')
timeout=1;

Expand Down Expand Up @@ -80,20 +80,19 @@ def start(self):

class Slave:

def __init__(self,execute):
def __init__(self):
# 派发出去的作业队列
self.dispatched_job_queue = Queue()
# 完成的作业队列
self.finished_job_queue = Queue()
self.execute=execute
def start(self):
def start(self,execute= True,serverip='127.0.0.1',port=8888):
# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_dispatched_job_queue')
BaseManager.register('get_finished_job_queue')

server = '127.0.0.1'
server = serverip;
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, rpc_port), authkey=authkey)
manager = BaseManager(address=(server, port), authkey=authkey)
manager.connect()
# 使用上面注册的方法获取队列
dispatched_jobs = manager.get_dispatched_job_queue()
Expand All @@ -112,16 +111,24 @@ def start(self):
project= etl.LoadProject_dict(project);
module= project.modules[job.jobname];
count=0
generator= etl.parallel_reduce(module,[ job.config],self.execute)
generator= etl.parallel_reduce(module,[ job.config],execute)
for r in generator:
print(r)
count+=1;
resultjob= JobResult(job.jobname,count,job.id)

finished_jobs.put(resultjob)


if __name__ == '__main__':
slave= Slave(execute=False);
slave.start();
ip='127.0.0.1'
port=8888;
argv=sys.argv;
if len(argv)>1:
ip=argv[1];
if len(argv)>2:
port=int(argv[2]);
slave= Slave();
slave.start(True,ip,port);


15 changes: 11 additions & 4 deletions etl.py
Expand Up @@ -77,7 +77,9 @@ class Executor(ETLTool):
def execute(self,data):
pass;
def process(self,data):
return self.execute(data)
for r in data:
self.execute(r);
yield r;


class Filter(ETLTool):
Expand Down Expand Up @@ -642,10 +644,14 @@ class BfsGE(Generator):
class DictTF(Transformer):
pass;

class FileExistFT(Filter):
def filter(self, item):
class FileExistFT(Transformer):
def __init__(self):
super(FileExistFT,self).__init__();
self.Script = '';
self.OneInput = True;
def transform(self,data):
import os;
return str(os.path.exists(item));
return str(os.path.exists(data));

class MergeRepeatTF(Transformer):
pass;
Expand Down Expand Up @@ -740,6 +746,7 @@ def LoadProject_dict(dic):
task.CrawItems.append(crawlitem)
task.HttpItem= etl_factory(spider.HTTPItem(),proj)
extends.dict_copy_poco(task.HttpItem,module['HttpItem'])
task.HttpItem.Headers=module['HttpItem']["Headers"];
if task is not None:
proj.modules[key]=task;

Expand Down
2 changes: 1 addition & 1 deletion extends.py
Expand Up @@ -123,6 +123,6 @@ def dict_to_poco_type(obj):
def dict_copy_poco(obj,dic):
for key,value in obj.__dict__.items():
if key in dic:
if isinstance(value, (str,int,float)):
if isinstance(dic[key], (str,int,float)):

setattr(obj,key,dic[key])

0 comments on commit 58485eb

Please sign in to comment.