In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

In [None]:
def rm_fillna(df, feature):
    """Impute missing values in the ``feature`` columns with per-column random-forest regressors.

    Columns are processed in the order given. For each column that has gaps, a
    ``RandomForestRegressor`` is trained on the rows where that column is present,
    using the remaining ``feature`` columns as predictors. Its predictions then fill
    the gaps. Values filled for earlier columns are reused as predictors for later ones.

    Args:
        df: Source DataFrame. It is not modified.
        feature: List-like of column labels to impute. Presumably all numeric — TODO confirm.
            At least two columns are needed so that each column has predictors.

    Returns:
        A new DataFrame holding only the ``feature`` columns, with NaNs filled.
    """
    # Explicit copy so the writes below never touch (or warn about) a view of ``df``.
    process_df = df.loc[:, feature].copy()
    for col in feature:
        # Positional mask: label look-ups misbehave when the index has duplicate labels.
        missing = process_df[col].isnull().to_numpy()
        if not missing.any():
            continue
        predictors = process_df.drop(columns=col)
        # scikit-learn forests before 1.4 reject NaN inputs, so give still-missing
        # predictor cells a temporary column-mean fill (used for this model only, never
        # written back into process_df).
        predictors = predictors.fillna(predictors.mean())
        rfr = RandomForestRegressor(random_state=0, n_estimators=200, max_depth=3, n_jobs=-1)
        rfr.fit(predictors[~missing], process_df.loc[~missing, col])
        process_df.loc[missing, col] = rfr.predict(predictors[missing])
    return process_df
    

In [None]:
def model_pca(data, feature, n_components=8):
    """Fit a PCA on ``data`` and derive one aggregate weight per input column.

    For each retained component, its ``components_`` row is divided by the square
    root of that component's explained variance (the "SPSS-style" coefficients).
    Each row is then scaled by the component's share of the retained explained-variance
    ratio. Summing over components gives one weight per original column.

    Args:
        data: DataFrame of numeric features. Its columns label the returned weights.
        feature: Unused; kept for call compatibility.
        n_components: Number of principal components to keep (default 8, as before).

    Returns:
        (data_pca, feature_weight): the PCA-transformed array of shape
        (n_rows, n_components), and a DataFrame with a single ``weight`` column
        indexed by ``data.columns``.
    """
    pca = PCA(n_components=n_components, random_state=0)
    pca.fit(data)
    data_pca = pca.transform(data)
    eigenvalues = pca.explained_variance_
    ratios = pca.explained_variance_ratio_
    # One row per component, one column per feature.
    score_coef = pca.components_ / np.sqrt(eigenvalues)[:, np.newaxis]
    # Re-normalise the retained components' ratios so they sum to 1.
    component_share = ratios / ratios.sum()
    # Sum over components (axis 0) so the result has one entry per FEATURE.
    weights = (score_coef * component_share[:, np.newaxis]).sum(axis=0)
    feature_weight = pd.DataFrame({'weight': weights}, index=data.columns)
    return data_pca, feature_weight
def get_entropy_weight(data, feature):
    """Compute entropy-method weights, one per column of ``data``.

    Steps: min-max normalise each column; turn each column into a distribution
    p_ij = x_ij / sum_i x_ij; entropy e_j = -sum_i p_ij ln p_ij / ln(n) (with 0·ln0 = 0);
    divergence d_j = 1 - e_j; weight w_j = d_j / sum_j d_j.

    Assumes every column is a benefit-type criterion and contains no NaNs — TODO confirm.
    A constant column carries no information: it gets entropy 1 and weight 0. If every
    column is constant the weights are undefined (NaN).

    Args:
        data: DataFrame of numeric criteria (rows = alternatives).
        feature: Unused; kept for call compatibility.

    Returns:
        pd.Series of weights indexed by ``data.columns``, summing to 1.

    Raises:
        ValueError: if ``data`` has fewer than two rows (ln(n) would be 0).
    """
    values = data.to_numpy(dtype=float)
    n_rows = values.shape[0]
    if n_rows < 2:
        raise ValueError('entropy weighting needs at least two rows')
    col_min = values.min(axis=0)
    span = values.max(axis=0) - col_min
    constant = span == 0
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized = (values - col_min) / np.where(constant, 1.0, span)
        p = normalized / normalized.sum(axis=0)
        # 0·ln(0) is taken as 0; the masked-out branch may compute NaN, hence errstate.
        plogp = np.where(p > 0, p * np.log(p), 0.0)
    entropy = -plogp.sum(axis=0) / np.log(n_rows)
    entropy[constant] = 1.0
    divergence = 1.0 - entropy
    return pd.Series(divergence / divergence.sum(), index=data.columns)
def topsis(data, feature_weight=None):
    """Rank alternatives (rows) with TOPSIS, treating every column as a benefit criterion.

    Columns are vector-normalised (each divided by its L2 norm) and then multiplied by
    their weights. The ideal solution ('best') is the column-wise max of that weighted
    matrix, and the anti-ideal ('worst') is the column-wise min. Each row's score is
    D_worst / (D_best + D_worst), where D_x is its Euclidean distance to row x. A higher
    score means closer to the ideal.

    Args:
        data: DataFrame of numeric criteria (rows = alternatives).
        feature_weight: None (equal weights), a Series indexed by column, a DataFrame
            whose first column holds weights indexed by column (e.g. ``model_pca``
            output), or an array-like with one weight per column in order.

    Returns:
        (result, z, feature_weight): ``result`` is the weighted normalised matrix plus
        'best', 'worst' (distances) and 'score' columns; ``z`` holds the 'best' and
        'worst' reference rows; ``feature_weight`` is returned unchanged.

    Raises:
        ValueError: if the weights do not supply exactly one value per column.
    """
    normalized = data / np.sqrt((data ** 2).sum())
    if feature_weight is None:
        weights = np.ones(data.shape[1])
    elif isinstance(feature_weight, pd.DataFrame):
        weights = feature_weight.iloc[:, 0].reindex(data.columns).to_numpy(dtype=float)
    elif isinstance(feature_weight, pd.Series):
        weights = feature_weight.reindex(data.columns).to_numpy(dtype=float)
    else:
        weights = np.asarray(feature_weight, dtype=float).ravel()
    if weights.shape[0] != data.shape[1] or np.isnan(weights).any():
        raise ValueError('feature_weight must provide one weight per column of data')
    weighted = normalized * weights
    # Benefit criteria: larger is better, so the ideal is the column max.
    z = pd.DataFrame([weighted.max(), weighted.min()], index=['best', 'worst'])
    result = weighted.copy()
    result['best'] = np.sqrt(((weighted - z.loc['best']) ** 2).sum(axis=1))
    result['worst'] = np.sqrt(((weighted - z.loc['worst']) ** 2).sum(axis=1))
    # Relative closeness: 1 at the ideal, 0 at the anti-ideal.
    result['score'] = result['worst'] / (result['best'] + result['worst'])
    return result, z, feature_weight
def ahp_weight(A_arr):
    """Derive AHP priority weights from a pairwise-comparison matrix and report consistency.

    Uses the column-normalisation (sum) method: normalise each column, sum across each
    row, then normalise the row sums to obtain w. λmax is estimated as mean((A·w) / w),
    and the consistency index is CI = (λmax - n) / (n - 1). Intermediate results are
    printed at each step.

    Args:
        A_arr: n×n pairwise-comparison matrix (array-like).

    Returns:
        np.ndarray of n weights summing to 1.

    Raises:
        ValueError: if ``A_arr`` is not a square 2-D matrix.
    """
    A = np.array(A_arr, dtype=float)
    if A.ndim != 2 or A.shape[0] != A.shape[1]:
        raise ValueError('A_arr must be a square pairwise-comparison matrix')
    # Normalise each column.
    B = A / A.sum(axis=0)
    print('归一化后的矩阵为：\n', B)
    b_sum = B.sum(axis=1)
    print('按行求和后的矩阵为：\n', b_sum)
    # Weights.
    w_arr = b_sum / b_sum.sum()
    print('权重为：\n', w_arr)
    # Matrix-vector product A·w (not element-wise), needed for the λmax estimate.
    AW = A @ w_arr
    print('矩阵乘以权重后的矩阵为：\n', AW)
    result = AW / w_arr
    print('最终的矩阵为：\n', result)
    n = A.shape[0]
    lambda_max = result.mean()
    print('最大特征值为：\n', lambda_max)
    # A 1×1 matrix is trivially consistent; avoid dividing by zero.
    CI = (lambda_max - n) / (n - 1) if n > 1 else 0.0
    print('一致性指标CI为：\n', CI)
    return w_arr
    

        


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf


def build_spark_client() -> SparkSession:
    """Create (or reuse) a Hive-enabled SparkSession named 'haozhe_spark'.

    Returns:
        The session from ``SparkSession.builder...getOrCreate()``.
    """
    sc = SparkConf()
    settings = {
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'spark.sql.adaptive.enabled': 'true',
        'spark.speculation': 'true',
        # Key ends in "enabled"; a misspelled key is silently ignored by Spark.
        'spark.sql.execution.arrow.pyspark.enabled': 'true',
        'spark.executor.memory': '20g',
        # NOTE(review): per Spark docs, driver memory set here may have no effect if the
        # driver JVM is already running (client mode) — confirm deployment mode.
        'spark.driver.memory': '20g',
        'spark.driver.maxResultSize': '0',
        'spark.kryoserializer.buffer.max': '1g',
        # Key is "sql.sources" (plural); a misspelled key is silently ignored by Spark.
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
    }
    builder = SparkSession.builder.config(conf=sc).appName('haozhe_spark').enableHiveSupport()
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

def _map_to_pandas(rdds):
    '''needs to be here due to pickling issues'''
    return [pd.DataFrame(list(rdds))]
def topandas(df, n_partitions=None):
    """Collect a Spark DataFrame to pandas by converting each partition on the executors.

    Each partition is turned into a pandas DataFrame by ``_map_to_pandas``. The pieces
    are collected to the driver and concatenated.

    Args:
        df: Spark DataFrame to collect.
        n_partitions: If given, repartition ``df`` to this many partitions first.

    Returns:
        A pandas DataFrame with ``df.columns`` as its columns and a fresh 0..n-1 index.
        If there are no rows, the result is an empty frame with those columns.
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    frames = df.rdd.mapPartitions(_map_to_pandas).collect()
    # Empty partitions yield column-less frames that would break the column rename below.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        return pd.DataFrame(columns=df.columns)
    # Every partition frame starts at index 0, so ignore_index avoids duplicate labels.
    df_pand = pd.concat(frames, ignore_index=True)
    df_pand.columns = df.columns
    return df_pand


In [None]:
# Load haozhe.tianchi_fresh_comp_train_user from Hive and collect it to the driver as pandas.
spark = build_spark_client()
sql = '''
select * from haozhe.tianchi_fresh_comp_train_user
'''
data = topandas(spark.sql(sql))
data.head()

In [None]:
# Create the Parquet-backed target table (partitioned by dt) if needed, then write `data` into it.
sql='''
create table if not exists haozhe.tianchi_fresh_comp_train_user_pca(
user_id string,
item_id string,
behavior_type string
)
partitioned by (dt string comment '日期')
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' with serdeproperties ('serialization.format'='1') stored as inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' tblproperties ('transient_lastDdlTime'='1659514291')
'''
spark.sql(sql)
# insertInto matches columns by POSITION, not by name, so select the table's columns in
# table order (regular columns first, partition column `dt` last) before writing.
# NOTE(review): assumes `data` carries a `dt` column to use as the partition value — confirm
# against the source table. If it does not, this select fails loudly instead of misaligning data.
TARGET_COLUMNS = ['user_id', 'item_id', 'behavior_type', 'dt']
sdf=spark.createDataFrame(data)
sdf.select(*TARGET_COLUMNS).write.mode('overwrite').insertInto('haozhe.tianchi_fresh_comp_train_user_pca')