# Init Spark and build Spark session

In [None]:
import numpy as np
import pandas as pd
import findspark
# Point findspark at a hard-coded local Spark 2.4.3 installation; this must
# run before `import pyspark` below.
# NOTE(review): the path is machine-specific - adjust it when running elsewhere.
findspark.init('/home/ywx-data/spark/spark-2.4.3-bin-hadoop2.7')

import pyspark
# Create the SparkContext; no application name is supplied (appName=None).
sc = pyspark.SparkContext(appName=None)

In [None]:
from pyspark.sql import SparkSession
# Obtain the SparkSession (getOrCreate reuses an existing session if there is
# one); `spark` is the entry point used for every DataFrame operation below.
spark = SparkSession.builder.appName(None).getOrCreate()

In [None]:
# Display the SparkSession object as the cell output.
spark

# Read and show raw data with Spark DataFrame

In [None]:
# Load the raw CSV (first line is the header, column types are inferred).
df_raw = spark.read.csv("lh_tp_node_ui.csv", header=True, inferSchema=True)
# Working data set: only the rows whose `lv` column equals 1.
input_lh_tp_node_ui = df_raw.filter('lv == 1')

In [None]:
# Print the inferred column names and types of the raw data.
df_raw.printSchema()

In [None]:
# Preview the first rows of the raw data.
df_raw.show()

In [None]:
# Number of distinct values in the `node` column of the filtered data.
cnt = input_lh_tp_node_ui.select("node").distinct().count()
cnt

In [None]:
# One row of seven random numbers, used only to create `secondary_df` with the
# column names given below.
# NOTE(review): presumably this row is a placeholder so that later `union`
# calls have a DataFrame to append to - confirm it is dropped before use.
df = pd.DataFrame(np.random.random((1, 7)))
secondary_df = spark.createDataFrame(df, schema=['node', 'phase', 'rsquare', 'r', 'x', 'b0', 'b1'])
# Accumulator that later cells append to.
primary_list = []
secondary_df

In [None]:
# Display the pandas frame holding the random seed row.
df

In [None]:
# Preview the Spark DataFrame built from the seed row.
secondary_df.show()

In [None]:
# Preview the filtered (lv == 1) working data set.
input_lh_tp_node_ui.show()

# Naive approach: one Spark correlation job per node pair

In [None]:
def get_corr_max_two_node_name(input_lh_tp_node_ui):
    '''
    Find the two nodes (meters) whose voltage readings are most correlated.

    Every unordered pair of distinct nodes is joined on (data_time, l1) and
    the correlation of the two ``u`` columns is computed with one Spark job
    per pair. The correlation is taken over all phases together, so each pair
    is evaluated exactly once.

    :param input_lh_tp_node_ui: Spark DataFrame with at least the columns
        node, data_time, l1 and u
    :return: (node_x, node_y) - names of the pair with the largest
        correlation; on ties the first pair in enumeration order wins
    :raises ValueError: if no pair of nodes has a defined (non-NaN) correlation,
        e.g. when there are fewer than two nodes
    '''
    # The list of distinct node names is assumed to fit on the driver.
    nodes = [row[0] for row in input_lh_tp_node_ui.select("node").distinct().collect()]
    # All unordered pairs (i < j) of distinct nodes.
    node_couple = [(first, second)
                   for i, first in enumerate(nodes)
                   for second in nodes[i + 1:]]
    print('node_num: %s' % len(node_couple))

    node_col = input_lh_tp_node_ui['node']
    best_pair = None
    best_corr = None
    for no1, no2 in node_couple:
        # Column comparison instead of a formatted SQL string, so node names
        # of any type (including strings) are handled correctly.
        node1 = (input_lh_tp_node_ui.filter(node_col == no1)
                 .select('data_time', 'l1', 'u')
                 .withColumnRenamed('u', 'u1'))
        node2 = (input_lh_tp_node_ui.filter(node_col == no2)
                 .select('data_time', 'l1', 'u')
                 .withColumnRenamed('u', 'u2'))
        # Joining on column names avoids the "trivially true equals predicate"
        # warning that `node1.data_time == node2.data_time` triggers when both
        # sides derive from the same DataFrame.
        node_join = node1.join(node2, on=['data_time', 'l1']).coalesce(10)
        corr_value = node_join.corr('u1', 'u2')
        # NaN means the correlation is undefined (e.g. no overlapping rows or
        # a constant series); such pairs can never be the best one.
        if np.isnan(corr_value):
            continue
        # Strict comparison keeps the first pair that reaches the maximum.
        if best_corr is None or corr_value > best_corr:
            best_pair, best_corr = (no1, no2), corr_value

    if best_pair is None:
        raise ValueError("no pair of nodes with a defined voltage correlation")
    node_x, node_y = best_pair
    print("In this circulation, node {} and node {} is best!".format(node_x, node_y))
    return node_x, node_y

In [None]:
# `q` is a counter that is only echoed in the log messages of this and later cells.
q = 0
print("{} correlation data num of partition : {}".format(q, input_lh_tp_node_ui.rdd.getNumPartitions()))
# Select the best-correlated pair of nodes with the per-pair Spark implementation.
node_x, node_y = get_corr_max_two_node_name(input_lh_tp_node_ui)

# Cost matrix (pairwise voltage correlation between nodes)

In [None]:
# Preview the working data set again before building the correlation matrix.
input_lh_tp_node_ui.show()

In [None]:
# Correlation between the U and IR columns over the whole working data set.
input_lh_tp_node_ui.corr('U', 'IR')

In [None]:
def get_corr_matrix(input_lh_tp_node_ui):
    '''
    Compute the node-by-node correlation matrix of the voltage column U.

    The rows of each node are grouped together, every ordered pair of nodes
    (including a node with itself) is formed with an RDD cartesian product,
    and for each pair the two groups are merged on (DATA_TIME, L1) in pandas
    before correlating their U values.

    :param input_lh_tp_node_ui: Spark DataFrame whose rows expose the fields
        NODE, DATA_TIME, L1 and U (Row field access is case-sensitive)
    :return: square numpy array of floats; rows and columns follow the sorted
        order of the node names, cells without a result are NaN
    '''
    header = ['DATA_TIME', 'L1', 'U']

    # (node, [(DATA_TIME, L1, U), ...]) - all readings of one node in one record.
    node_rows = (
        input_lh_tp_node_ui.rdd
        .map(lambda r: (r['NODE'], (r['DATA_TIME'], r['L1'], r['U'])))
        .groupByKey()
        .mapValues(list)
    )

    def rdd_struct_corr(structs):
        # `structs` is ((node1, rows1), (node2, rows2)) from the cartesian product.
        (node1, rows1), (node2, rows2) = structs
        df1 = pd.DataFrame(rows1, columns=header)
        df2 = pd.DataFrame(rows2, columns=header)
        # `on` is given as a list: the documented way to name several key columns.
        df_join = df1.merge(df2, on=['DATA_TIME', 'L1'], suffixes=('_l', '_r'))
        corr = df_join[['U_l', 'U_r']].corr().iloc[0, 1]
        return node1, node2, float(corr)

    # The n*n result triples are assumed to fit in driver memory.
    triples = node_rows.cartesian(node_rows).map(rdd_struct_corr).collect()

    # Place every value by node name instead of sorting on the cluster and
    # reshaping blindly; the resulting layout is the same sorted-node order.
    ordered_nodes = sorted({node1 for node1, _, _ in triples})
    position = {node: idx for idx, node in enumerate(ordered_nodes)}
    node_count = len(ordered_nodes)
    corrs = np.full((node_count, node_count), np.nan)
    for node1, node2, corr in triples:
        corrs[position[node1], position[node2]] = corr
    return corrs
    
# Build the node-by-node correlation matrix for the working data set and print it.
corrs = get_corr_matrix(input_lh_tp_node_ui)
print(corrs)

In [None]:
# Same steps as the earlier "naive" cell: reset the counter, log the partition
# count and select the best-correlated pair of nodes.
q = 0
print("{} correlation data num of partition : {}".format(q, input_lh_tp_node_ui.rdd.getNumPartitions()))
node_x, node_y = get_corr_max_two_node_name(input_lh_tp_node_ui)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
def linear_regression(node_x, node_y, node_join_xy, phase='A'):
    '''
    Fit a linear regression for a single phase of a joined node pair.

    The label is ``u2``; the features are ``u1, ir1, ix1, ir2, ix2``.

    :param node_x: name of the first node (only echoed into the result)
    :param node_y: name of the second node (only echoed into the result)
    :param node_join_xy: Spark DataFrame of the joined pair with the columns
        l1, u1, ir1, ix1, u2, ir2 and ix2
    :param phase: value of the ``l1`` column selecting the phase
    :return: list [node_x, node_y, phase, r2, intercept, followed by the five
        coefficients in feature order]
    '''
    feature_cols = ["u1", "ir1", "ix1", "ir2", "ix2"]

    # Keep only the rows of the requested phase and drop the identifier columns.
    single_phase = node_join_xy.filter('l1 == "%s"' % phase).drop('node1', 'node2', 'l1')

    # Pack the feature columns into one vector column and name the target `label`.
    assembled = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(single_phase)
    training = assembled.select("features", "u2").toDF('features', 'label')

    model = LinearRegression(maxIter=5, elasticNetParam=0.8).fit(training)

    result = [node_x, node_y, phase, model.summary.r2, model.intercept]
    result.extend(model.coefficients[i] for i in range(len(feature_cols)))
    return result

# from pyspark.ml.regression import LinearRegression
def get_linear_regression_param_list(data, node_x, node_y):
    '''
    Run the per-phase linear regression (phases A, B and C) for two nodes.

    :param data: Spark DataFrame with the columns node, data_time, l1, u, ir and ix
    :param node_x: name of the first node
    :param node_y: name of the second node
    :return: (param_dfs, node_join_xy) - a Spark DataFrame with one row of
        regression results per phase, and the joined data of the two nodes
    '''
    renames_x = [('node', 'node1'), ('u', 'u1'), ('ir', 'ir1'), ('ix', 'ix1')]
    renames_y = [('node', 'node2'), ('l1', 'l2'), ('u', 'u2'),
                 ('ir', 'ir2'), ('ix', 'ix2'), ('data_time', 'data_time2')]

    # Rows of each node, with the columns suffixed so both sides can coexist.
    left = data.filter("node == {}".format(node_x))
    right = data.filter("node == {}".format(node_y))
    for old_name, new_name in renames_x:
        left = left.withColumnRenamed(old_name, new_name)
    for old_name, new_name in renames_y:
        right = right.withColumnRenamed(old_name, new_name)

    # Align the two nodes on timestamp and phase.
    same_time = left['data_time'] == right.data_time2
    same_phase = left['l1'] == right.l2
    joined = left.join(right, (same_time & same_phase))
    joined = joined.select('node1', 'node2', 'data_time', 'l1',
                           'u1', 'ir1', 'ix1', 'u2', 'ir2', 'ix2')

    # Flip the sign of the second node's current columns.
    for current_col in ("ir2", "ix2"):
        joined = joined.withColumn(current_col, joined[current_col] * (-1))
    joined = joined.coalesce(10)

    # One regression per phase, collected into a Spark DataFrame.
    rows = [linear_regression(node_x, node_y, joined, phase=p) for p in ['A', 'B', 'C']]
    columns = ['node1', 'node2', 'phase', 'rsquare', 'b0', 'b1', 'r1', 'x1', 'r2', 'x2']
    param_dfs = spark.createDataFrame(pd.DataFrame(columns=columns, data=rows))
    return param_dfs, joined

In [None]:
# Get the regression coefficients of the two selected nodes as a Spark DataFrame (param_dfs).
# Warning observed when running this step:
# 19/07/14 03:34:09 WARN Column: Constructing trivially true equals predicate, 'data_time#13 = data_time#13'. Perhaps you need to use aliases.
print("{} regression data num of partition : {}".format(q, input_lh_tp_node_ui.rdd.getNumPartitions()))
param_dfs, node_join_xy = get_linear_regression_param_list(input_lh_tp_node_ui, node_x, node_y)
# NOTE(review): get_linear_regression_param_list already coalesces its joined
# result to 10 partitions, so this second coalesce looks redundant.
node_join_xy = node_join_xy.coalesce(10)

In [None]:
# Preview the per-phase regression results and the joined data of the two nodes.
param_dfs.show()
node_join_xy.show()

In [None]:
# Update the input data: remove the two child nodes from the raw data and add
# the new parent node.
# NOTE(review): `updata_node_couple`, `get_primary_secondary_single_data`,
# `time` and the start timestamp `a` are not defined in the visible part of
# this file - confirm they are defined/imported before this cell runs.
input_lh_tp_node_ui = updata_node_couple(input_lh_tp_node_ui, node_join_xy, param_dfs, node_x, node_y, q)
print("before modify,input_lh_tp_node_ui num of partition : {}".format(input_lh_tp_node_ui.rdd.getNumPartitions()))
# Reduce the updated data set to at most 10 partitions.
input_lh_tp_node_ui = input_lh_tp_node_ui.coalesce(10)
print("after modify,input_lh_tp_node_ui's partition: {}".format(input_lh_tp_node_ui.rdd.getNumPartitions()))
print('%s loop new_data lines_num:%s' % (q, input_lh_tp_node_ui.count()))
print('other running time: %s Seconds' % (time.time() - a))

# Generate the single records for the primary and secondary tables and append them.
s1_dfs, s2_dfs, p1_list, p2_list = get_primary_secondary_single_data(param_dfs, node_x, node_y, q)
primary_list.append(p1_list)
primary_list.append(p2_list)
secondary_df = secondary_df.union(s1_dfs)
secondary_df = secondary_df.union(s2_dfs)
secondary_df = secondary_df.coalesce(10)
print('%s all running time: %s' % (q, time.time() - a))