In [1]:
import networkx as nx

### 1.1 使用networkx构建图

In [None]:
# Load the weighted edge list from Hive and build a directed graph from it.
edges = spark.sql('select * from ap_khcp_zhzx.edges')
edges_pd = edges.toPandas()

# from_pandas_edgelist reads each column with its own dtype. The previous
# iterrows() loop returned every row as a single-dtype Series, so when the
# `weight` column is float the src/dst node ids were coerced to floats
# (e.g. 59.0), which later broke int(...) parsing of stored product ids.
# It is also much faster than a Python-level row loop.
G = nx.from_pandas_edgelist(
    edges_pd,
    source='src',
    target='dst',
    edge_attr='weight',
    create_using=nx.DiGraph,
)

### 1.2 找到core_customers已经签约的产品

In [None]:
from collections import defaultdict

# For every core customer, record the product nodes it is directly linked to
# in G (its out-neighbours whose id falls in the product id range).
signed_products = defaultdict()

core_customers = [row.id for row in spark.sql('select id from ap_khcp_zhzx.core_customers').collect()]

for customer in core_customers:
    # Product nodes are identified here by the id range 51..69.
    # NOTE(review): section 1.4 treats 51..83 as the product range; if the
    # range was extended, products 70..83 are not counted as signed here,
    # so they could be re-recommended as "new" in 1.7 — confirm.
    signed_products[customer] = [node for node in G.adj[customer] if 51 <= node <= 69]

### 1.3 将处理好的已签约产品信息保存到Hive

In [None]:
# Persist each core customer's signed products to Hive as one row per customer,
# with the product ids joined into a comma-separated string.
signed_product = sc.parallelize(
    [(k, ','.join(map(str, v))) for k, v in signed_products.items()]
).toDF(['id', 'signed'])
signed_product.createOrReplaceTempView('signed_product')
# `if exists` lets the cell run on a database where the table does not exist yet.
sqlContext.sql('drop table if exists ap_khcp_zhzx.signed_product')
sqlContext.sql('create table ap_khcp_zhzx.signed_product as select * from signed_product')

In [None]:
# Smoke-test nx.shortest_path on one known pair of nodes (59 -> 88303).
path = nx.shortest_path(G, source=59, target=88303)
print(path)

### 1.4 采用page rank推荐

通过多进程并行计算，解决逐个计算 personalized PageRank 速度过慢的问题（以函数形式实现，每个进程负责一部分客户）。

In [None]:
import operator
import multiprocessing
import networkx as nx
from collections import defaultdict
from operator import itemgetter
from multiprocessing import Pool

def calculatePPR(G, target_vertex_list, ppr_result, recommended_items=20,
                 min_product_id=51, max_product_id=83):
    """Compute personalized-PageRank product recommendations for several vertices.

    For each vertex, runs ``nx.pagerank`` with the whole personalization mass on
    that vertex, keeps only nodes whose id lies in
    ``[min_product_id, max_product_id]`` (the product nodes), and stores the top
    ``recommended_items`` of them, highest score first, in ``ppr_result[vertex]``.

    Args:
        G: graph passed to ``nx.pagerank``; edge attribute ``weight`` is used.
        target_vertex_list: vertices (customer ids) to score.
        ppr_result: mapping that receives ``vertex -> [product ids]``; a
            multiprocessing Manager dict when called from worker processes.
        recommended_items: maximum number of products kept per vertex.
        min_product_id: smallest node id treated as a product (inclusive).
        max_product_id: largest node id treated as a product (inclusive).

    Returns:
        None. Results are written into ``ppr_result``.
    """
    for target_vertex in target_vertex_list:
        ppr = nx.pagerank(G, personalization={target_vertex: 1}, weight='weight')
        # Filter to product nodes before ranking. The sort is stable, so ties
        # keep the same relative order as sorting everything and filtering after.
        products = [(node, score) for node, score in ppr.items()
                    if min_product_id <= node <= max_product_id]
        products.sort(key=itemgetter(1), reverse=True)
        ppr_result[target_vertex] = [node for node, _ in products[:recommended_items]]

if __name__ == "__main__":
    # 提取数据
    vertex_list = spark.sql('select id from ap_khcp_zhzx.core_customers_sszjc where id>121497').collect()
    vertex_list = [item.id for item in vertexz_list[0:160]]
    
    # construct a manager dict for multiple process to access
    mgr = multiprocessing.Manager()
    ppr_result = mgr.dict()
    jobs = []
    
    # construct 32 processes to calculate ppr
    for i in range(32):
        chunks = [vertex_list[x] for x in range(i, len(vertex_list), 32)]
        jobs.append(multiprocessing.Process(target=calculatePPR, args=(G, chunks, ppr_result))
                    
    for j in jobs:
        j.start()
    
    for j in jobs:
        j.join()
    
    print('The calculated personal pagerank result have ' + str(len(ppr_result.keys())) + 'items.')

### 1.5 保存推荐结果

In [None]:
# Persist the PPR recommendations to Hive: one row per customer, with the
# recommended product ids joined into a comma-separated string.
ppr_df = sc.parallelize(
    [(k, ','.join(map(str, v))) for k, v in ppr_result.items()]
).toDF(['id', 'recommended'])
# createOrReplaceTempView (not createTempView) so re-running the cell does not
# fail because the view already exists.
ppr_df.createOrReplaceTempView('ppr_result')
sqlContext.sql('drop table if exists ap_khcp_zhzx.recommendation')
sqlContext.sql('create table ap_khcp_zhzx.recommendation as select * from ppr_result')

### 1.6 构建绘图函数：传入一个节点，绘制包含最短路径、已签约产品以及公司标签的图

In [None]:
import matplotlib.pyplot as plt
from collections import defaultdict

def draw_relation(G, target_vertex):
    """Draw the sub-graph explaining why products were recommended to a customer.

    The plotted directed sub-graph contains:
      * the edges from ``target_vertex`` to each of its neighbours in ``G``;
      * the shortest path from ``target_vertex`` to every product stored for it
        in ``ap_khcp_zhzx.recommendation``;
      * all product-to-product edges from ``ap_khcp_zhzx.edge_pro_rela``.

    It is drawn with five networkx layouts in a 2x3 grid. The sixth panel shows
    a legend mapping product ids to product names, and the figure is titled
    with the customer's name from ``ap_khcp_zhzx.cust_no_id``.

    Args:
        G: the graph to explain from (presumably the DiGraph built in 1.1).
        target_vertex: id of the customer node.

    Raises:
        IndexError: if no recommendation row or no name row exists for
            ``target_vertex``.
        networkx.NetworkXNoPath: if a recommended product is unreachable
            from ``target_vertex``.
    """
    # Product name -> product id (only a subset of products is listed), inverted
    # to id -> name for the legend.
    product_id = {'单位结算卡签约': 51, '定时资金池签约': 52, '对公一户通签约': 53,
                  '电子商业汇票签约': 59, '现金管理收款签约': 61, '跨境双向池签约': 69}
    id_product = {v: k for k, v in product_id.items()}

    # Legend text: one "id：name" line per product.
    cap_str = "".join("{}：{}\n".format(k, v) for k, v in id_product.items())

    # Edges to the customer's direct neighbours (which include its signed
    # products), plus the shortest path to every recommended product.
    recommended_rows = spark.sql(
        "select recommended from ap_khcp_zhzx.recommendation where id={}".format(target_vertex)
    ).collect()
    recommended_items = [int(item) for item in recommended_rows[0].recommended.split(",") if item]
    edges_list = [(target_vertex, item) for item in G.adj[target_vertex]]
    for item in recommended_items:
        path = nx.shortest_path(G, target_vertex, item)
        edges_list.extend(zip(path, path[1:]))

    # Product-to-product relations.
    # NOTE(review): assumes the table's first two columns are the edge endpoints
    # (third column ignored) — verify against the table schema.
    pro_rela_rows = spark.sql("select * from ap_khcp_zhzx.edge_pro_rela").collect()
    pro_rela_edges = [(src, dst) for src, dst, _ in pro_rela_rows]
    edges_list = list(set(edges_list))
    edges_list.extend(pro_rela_edges)
    print(edges_list)

    tempG = nx.DiGraph()
    tempG.add_edges_from(edges_list)

    cust_name = spark.sql(
        "select * from ap_khcp_zhzx.cust_no_id where id='{}'".format(target_vertex)
    ).collect()[0].name

    # Create the grid first and title that figure. Calling plt.title before
    # plt.subplots put the title on a different (otherwise empty) figure.
    # Chinese text needs a CJK font, e.g. plt.rcParams['font.sans-serif'] = ['SimHei'].
    fig, axes = plt.subplots(2, 3, figsize=(15, 15))
    fig.suptitle(u"产品代号：{name}".format(name=cust_name))

    # Five distinct layouts. The fourth panel previously repeated draw_spring;
    # draw_circular gives the five different styles the section intends.
    layouts = [nx.draw_spring, nx.draw_kamada_kawai, nx.draw_spectral,
               nx.draw_circular, nx.draw_shell]
    for ax, draw in zip(axes.flat, layouts):
        draw(tempG, ax=ax, with_labels=True, font_weight="bold")

    # Sixth panel: the id -> product-name legend.
    legend_ax = axes.flat[5]
    legend_ax.text(.5, .5, cap_str, fontsize=12, horizontalalignment='center', verticalalignment='center')
    legend_ax.axis('off')

    plt.show()

### 1.7 批量输出公司与推荐的结算产品之间的最短路径

In [None]:
from collections import namedtuple

# Load both lookup tables once, instead of issuing two Spark queries per
# customer. Keys are int customer ids; values are comma-separated product ids.
# (The table written in 1.5 is `recommendation`; `recommodation` was a typo.)
recommended_by_id = {
    int(row.id): row.recommended
    for row in spark.sql('select id, recommended from ap_khcp_zhzx.recommendation').collect()
}
signed_by_id = {
    int(row.id): row.signed
    for row in spark.sql('select id, signed from ap_khcp_zhzx.signed_product').collect()
}
target_vertex_list = list(recommended_by_id)

Record = namedtuple('Record', ['id', 'product', 'path'])
record_list = []
for target_vertex in target_vertex_list:
    # Recommended and already-signed products (ids as strings). Empty strings
    # come from customers with no products and are dropped.
    recommended_products = [p for p in (recommended_by_id[target_vertex] or '').split(',') if p]
    signed = {p for p in (signed_by_id.get(target_vertex) or '').split(',') if p}

    # Only products the customer has not signed yet are "new".
    new_products = [p for p in recommended_products if p not in signed]

    # Record the shortest path from the customer to each new product.
    for item in new_products:
        try:
            path = nx.shortest_path(G, target_vertex, int(item))
        except nx.NetworkXNoPath:
            # PPR can still rank unreachable (zero-score) products; there is
            # no path to report for them.
            continue
        record_list.append(Record(target_vertex, item, ",".join(str(p) for p in path)))

In [None]:
# Persist the customer -> new product shortest paths.
# `sc` is a SparkContext (see sc.parallelize above), which has no
# createDataFrame; the SparkSession infers the schema from the Record fields.
record_df = spark.createDataFrame(record_list)
record_df.createOrReplaceTempView('shortest_path')
# NOTE(review): unlike the other tables this one is written to the current
# default database, not ap_khcp_zhzx — confirm that is intended.
sqlContext.sql('drop table if exists shortest_path_recommendation')
sqlContext.sql('create table shortest_path_recommendation as select * from shortest_path')