In [2]:
import pandas as pd
import numpy as np
import time



In [3]:
%load_ext Cython

In [20]:
!pip install ray

Successfully installed aiohttp-3.7.3 aiohttp-cors-0.7.0 aioredis-1.3.1 async-timeout-3.0.1 blessings-1.7 colorama-0.4.4 colorful-0.5.4 contextvars-2.4 gpustat-0.6.0 hiredis-1.1.0 idna-ssl-1.1.0 immutables-0.14 multidict-5.1.0 opencensus-0.7.11 opencensus-context-0.1.2 py-spy-0.3.3 ray-1.1.0 redis-3.5.3 yarl-1.6.3


In [5]:
def target_mean_v1(data, y_name, x_name):
    result = np.zeros(data.shape[0])
    for i in range(data.shape[0]):
        groupby_result = data[data.index != i].groupby([x_name], as_index=False).agg(['mean', 'count'])
        result[i] = groupby_result.loc[groupby_result.index == data.loc[i, x_name], (y_name, 'mean')]
    return result

In [6]:
def target_mean_v2(data, y_name, x_name):
    result = np.zeros(data.shape[0])
    value_dict = dict()
    count_dict = dict()
    for i in range(data.shape[0]):
        if data.loc[i, x_name] not in value_dict.keys():
            value_dict[data.loc[i, x_name]] = data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] = 1
        else:
            value_dict[data.loc[i, x_name]] += data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] += 1
    for i in range(data.shape[0]):
        result[i] = (value_dict[data.loc[i, x_name]] - data.loc[i, y_name]) / (count_dict[data.loc[i, x_name]] - 1)
    return result

In [7]:
def main():
    y = np.random.randint(2, size=(5000, 1))
    x = np.random.randint(10, size=(5000, 1))
    data = pd.DataFrame(np.concatenate([y, x], axis=1), columns=['y', 'x'])
    result_1 = target_mean_v1(data, 'y', 'x')
    result_2 = target_mean_v2(data, 'y', 'x')
    diff = np.linalg.norm(result_1 - result_2)
    print(diff)
main()

0.0


In [8]:
%%cython
cimport numpy as cnp
cpdef ctarget_mean_v1(data,cnp.ndarray result, str y_name, str x_name):
    value_dict = dict()
    count_dict = dict()
    for i in range(data.shape[0]):
        if data.loc[i, x_name] not in value_dict.keys():
            value_dict[data.loc[i, x_name]] = data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] = 1
        else:
            value_dict[data.loc[i, x_name]] += data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] += 1
    for i in range(data.shape[0]):
        result[i] = (value_dict[data.loc[i, x_name]] - data.loc[i, y_name]) / (count_dict[data.loc[i, x_name]] - 1)
    return result

In [9]:
%%cython
cimport numpy as cnp
cpdef ctarget_mean_v2(cnp.ndarray y,cnp.ndarray x,cnp.ndarray result):
    value_dict = dict()
    count_dict = dict()
    cdef int x_,y_
    for i in range(y.shape[0]):
        x_ = x[i][0]
        y_ = y[i][0]
        if x_ not in value_dict.keys():
            value_dict[x_] = y_
            count_dict[x_] = 1
        else:
            value_dict[x_] += y_
            count_dict[x_] += 1
    for i in range(y.shape[0]):
        x_ = x[i][0]
        y_ = y[i][0]
        result[i] = (value_dict[x_] - y_) / (count_dict[x_] - 1)
    return result

In [18]:
%%cython
cimport numpy as cnp
cpdef ctarget_mean_v3(cnp.ndarray y,cnp.ndarray x,cnp.ndarray result):
    value_dict = dict()
    count_dict = dict()
    for i in range(y.shape[0]):
        x_ = x[i][0]
        y_ = y[i][0]
        if x_ not in value_dict.keys():
            value_dict[x_] = y_
            count_dict[x_] = 1
        else:
            value_dict[x_] += y_
            count_dict[x_] += 1
    for i in range(y.shape[0]):
        x_ = x[i][0]
        y_ = y[i][0]
        result[i] = (value_dict[x_] - y_) / (count_dict[x_] - 1)
    return result

In [21]:
import ray

In [48]:
@ray.remote
def func2(i,dict_s):

  value_dict = dict_s['r_value_dict']
  count_dict = dict_s['r_count_dict']
  data = dict_s['r_data']
  x_name = dict_s['r_x_name']
  y_name = dict_s['r_y_name']
  return (value_dict[data.loc[i, x_name]] - data.loc[i, y_name]) / (count_dict[data.loc[i, x_name]] - 1)

In [49]:
def target_mean_v3(data, y_name, x_name):
    # result = np.zeros(data.shape[0])
    value_dict = dict()
    count_dict = dict()
    for i in range(data.shape[0]):
        if data.loc[i, x_name] not in value_dict.keys():
            value_dict[data.loc[i, x_name]] = data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] = 1
        else:
            value_dict[data.loc[i, x_name]] += data.loc[i, y_name]
            count_dict[data.loc[i, x_name]] += 1
    ray.shutdown()
    ray.init()
    dict_s = ray.put({
        "r_value_dict":value_dict,
        "r_count_dict":count_dict,
        "r_data":data,
        "r_x_name":x_name,
        "r_y_name":y_name,
    })

    futures = [func2.remote(i,dict_s) for i in range(data.shape[0])]
    result = ray.get(futures)

    return result

In [50]:
# 验证和源代码的一致性
def cmain():
  y = np.random.randint(2, size=(5000, 1))
  x = np.random.randint(10, size=(5000, 1))
  
  data = pd.DataFrame(np.concatenate([y, x], axis=1), columns=['y', 'x'])
  result= np.zeros(data.shape[0])
  result_1 = target_mean_v1(data, 'y', 'x')
  result_2 = ctarget_mean_v1(data,result, 'y', 'x')
  result_3 = ctarget_mean_v2(y,x,result)
  result_4 = target_mean_v3(data, 'y', 'x')
  diff1 = np.linalg.norm(result_1 - result_2)
  diff2 = np.linalg.norm(result_1 - result_3)
  diff3 = np.linalg.norm(result_1 - np.array(result_4))

  print(diff1,diff2,diff3)
cmain()


2021-01-06 10:00:16,310	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


0.0 0.0 0.0


In [12]:
y = np.random.randint(2, size=(5000, 1))
x = np.random.randint(10, size=(5000, 1))
data = pd.DataFrame(np.concatenate([y, x], axis=1), columns=['y', 'x'])
result= np.zeros(data.shape[0])

In [13]:
#原代码
%%timeit -r 10
target_mean_v1(data, 'y', 'x')

1 loop, best of 10: 23.8 s per loop


In [14]:
#老师临时指导版
%%timeit -r 10
target_mean_v2(data, 'y', 'x')

1 loop, best of 10: 271 ms per loop


In [15]:
#光cython装饰器
%%timeit -r 10
ctarget_mean_v1(data, result, 'y', 'x')

1 loop, best of 10: 274 ms per loop


In [16]:
# 加了类型注释
%%timeit -r 10
ctarget_mean_v2(y ,x ,result )

100 loops, best of 10: 10.3 ms per loop


In [19]:
#取消两个中间变量的类型注释
%%timeit -r 10
ctarget_mean_v3(y ,x ,result)

100 loops, best of 10: 9.55 ms per loop


In [44]:
# 使用多进程ray
%%timeit -r 10
target_mean_v3(data, 'y', 'x')

2021-01-06 09:47:48,085	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:48:01,781	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:48:15,477	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:48:29,714	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:48:44,368	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:48:59,010	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:49:12,673	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:49:26,857	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-01-06 09:49:40,442	INFO services.py:1173 -- View the Ray dashboard 

1 loop, best of 10: 13.4 s per loop
