def _redis_bulk_len(text):
    """Return the length of *text* in bytes, as a RESP bulk-string header needs."""
    if isinstance(text, bytes):
        # Python 2 ``str`` is already a byte string.
        return len(text)
    # Python 3 ``str``: the header must count encoded bytes, not characters,
    # otherwise non-ASCII label names produce a corrupt protocol stream.
    return len(text.encode('utf-8'))


def gen_redis_proto(*args):
    r"""Encode one Redis command as a RESP request string.

    Every argument is converted with ``str()`` and emitted as a bulk string,
    after an array header holding the argument count, e.g.
    ``gen_redis_proto('SET', 'k', 1)`` returns
    ``'*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\n1\r\n'``.

    Args:
        *args: The command name followed by its arguments.

    Returns:
        The encoded command, suitable for feeding to ``redis-cli --pipe``.
    """
    parts = ['*' + str(len(args)) + '\r\n']
    for arg in args:
        text = str(arg)
        parts.append('$' + str(_redis_bulk_len(text)) + '\r\n' + text + '\r\n')
    return ''.join(parts)


def _build_user_commands(user_file, user_label_index, user_labels, now):
    """Return the RESP-encoded SADD/ZADD commands for every row of *user_file*.

    Rows are tab-separated; column 0 is the user id and column ``i`` holds the
    value of label ``user_label_index[i]``. Rows whose user id is ``'NULL'``
    are skipped. Labels whose ``rule_value_type`` is ``'datetime_range'`` have
    their ``%Y%m%d`` value replaced by the whole number of days between that
    date and *now* (a ``time.time()`` timestamp).
    """
    commands = []
    with open(user_file, 'r') as f:
        for line in f:
            user_prop = line.strip().split('\t')
            user_id = user_prop[0]
            if user_id == 'NULL':
                continue
            for i, label_value in enumerate(user_prop[1:], 1):
                label_name = user_label_index[i]
                if (label_value != 'NULL' and
                        user_labels[label_name]['rule_value_type'] == 'datetime_range'):
                    # Store date labels as "days elapsed", truncated toward zero.
                    elapsed = now - time.mktime(time.strptime(label_value, "%Y%m%d"))
                    label_value = int(elapsed / (24 * 3600))
                # Set of users sharing this value of the label.
                redis_key = '{}:{}'.format(label_name, label_value)
                commands.append(gen_redis_proto('sadd', redis_key, user_id))
                # Per-label sorted set of its value keys, scored by the value.
                # NOTE(review): ZADD only accepts numeric scores, so 'NULL' or
                # other non-numeric values are rejected by Redis — confirm intent.
                commands.append(gen_redis_proto('zadd', label_name, label_value, redis_key))
    return commands


def _run_redis_pipe(conf, payload, source_name):
    """Feed *payload* to ``redis-cli --pipe`` and return its exit code.

    The executable, port and password come from the ``[redis]`` section of
    *conf*. A non-zero exit is reported on stderr together with the tool's
    output, tagged with *source_name*.
    """
    import shlex
    import sys

    if not isinstance(payload, bytes):
        # Python 3 pipes carry bytes; on Python 2 ``str`` already is bytes.
        payload = payload.encode('utf-8')
    # Pass an argv list instead of a shell string, so the password is never
    # interpreted by /bin/sh (metacharacters, or an empty password that made
    # ``-a`` swallow ``--pipe``). shlex.split keeps an exec_path that carries
    # extra arguments working as it did under the shell.
    argv = shlex.split(os.path.expanduser(conf.get('redis', 'exec_path')))
    argv += ['-p', conf.get('redis', 'port'),
             '-a', conf.get('redis', 'passwd'),
             '--pipe']
    p = subprocess.Popen(argv, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate(payload)
    if p.returncode != 0:
        sys.stderr.write('redis-cli --pipe exited with code {} for {}\n'.format(
            p.returncode, source_name))
        for stream in (out, err):
            if stream:
                sys.stderr.write(stream if isinstance(stream, str)
                                 else stream.decode('utf-8', 'replace'))
    return p.returncode


def load_user_to_redis_from_file(cfg_file, user_file):
    """Load the user rows of one split file into Redis.

    The label layout is read from the flag file
    ``<run.base_dir>/<run.sync_dir>/<run.habo_flag_file>``. The last component
    is formatted with the module-level ``user_data_date`` as ``%Y%m%d``. Line
    ``i`` of the flag file names the label stored in column ``i`` of each row.
    For every label value of every user, the function queues
    ``SADD <label>:<value> <user_id>`` and
    ``ZADD <label> <value> <label>:<value>``. It then sends them all through a
    single ``redis-cli --pipe`` run.

    Args:
        cfg_file: Path of the INI configuration file.
        user_file: Path of the tab-separated user file to load.
    """
    conf = ConfigParser.ConfigParser()
    conf.read(cfg_file)
    flag_file = os.path.join(
        conf.get('run', 'base_dir'),
        conf.get('run', 'sync_dir'),
        conf.get('run', 'habo_flag_file').format(user_data_date.strftime("%Y%m%d")))
    with open(flag_file, 'r') as f:
        user_label_index = [line.strip() for line in f]
    user_labels = get_user_labels(cfg_file)

    # One reference instant for the whole file, so every "days elapsed"
    # value is computed consistently.
    commands = _build_user_commands(user_file, user_label_index, user_labels, time.time())
    if commands:
        _run_redis_pipe(conf, ''.join(commands), user_file)


def load_user_to_redis(cfg_file):
    """Reset Redis and load every split user file into it in parallel.

    The function connects with ``[redis] port/passwd`` from *cfg_file* and
    issues FLUSHALL. It then loads each file found under
    ``<run.base_dir>/<run.split_dir>`` (recursively) with
    ``load_user_to_redis_from_file``, using a pool of ``process_pool_count``
    worker processes.

    Raises:
        RuntimeError: If loading any split file raised in its worker; each
            failure is also written to stderr.
    """
    import sys

    conf = ConfigParser.ConfigParser()
    conf.read(cfg_file)
    r = redis.StrictRedis(port=conf.getint('redis', 'port'),
                          password=conf.get('redis', 'passwd'))
    # NOTE(review): FLUSHALL clears every database on this Redis instance,
    # not just the keys this loader writes — confirm that is intended.
    r.flushall()

    split_dir = os.path.join(conf.get('run', 'base_dir'), conf.get('run', 'split_dir'))
    splited_user_files = [os.path.join(root, name)
                          for root, _dirnames, filenames in os.walk(split_dir)
                          for name in filenames]
    print(len(splited_user_files))

    pool = multiprocessing.Pool(processes=process_pool_count)
    try:
        results = [(path, pool.apply_async(load_user_to_redis_from_file, (cfg_file, path)))
                   for path in splited_user_files]
    finally:
        pool.close()
        pool.join()

    # apply_async hides worker exceptions until .get() is called; surface them
    # instead of silently leaving a partially loaded Redis behind.
    failed = []
    for path, res in results:
        try:
            res.get()
        except Exception as e:
            failed.append(path)
            sys.stderr.write('failed to load {}: {}\n'.format(path, e))
    if failed:
        raise RuntimeError('{} of {} split files failed to load into redis'.format(
            len(failed), len(results)))