**导入相关包**

In [1]:
import pandas as pd
import numpy as np

**读取数据**

In [2]:
content_list_id = sc.textFile('hdfs:///test/content_list_id.txt')
content_list_id = content_list_id.map(lambda line: line.split("\t"))
content_list_id.take(4)

[['uid', 'content_list', 'content_id'],
 ['0', '164423,430922,112513,485726,488385,340139,489273,391258', '112513'],
 ['1',
  '635374,409237,586823,305055,519191,772121,788428,754213',
  '305055,586823,305055,305055'],
 ['2',
  '57518,70020,828660,9511,477360,821209,178443,973485',
  '178443,70020,178443,9511']]

**将数据转化为DF**

In [3]:
content_list_id_df = sqlContext.createDataFrame(content_list_id).toPandas()
content_list_id_df.columns = ['UID','Content_list',"Content_id"]
content_list_id_df = content_list_id_df[1:]
content_list_id_df[:2]

Unnamed: 0,UID,Content_list,Content_id
1,0,"164423,430922,112513,485726,488385,340139,4892...",112513
2,1,"635374,409237,586823,305055,519191,772121,7884...",305055586823305055305055


**计算每个content的CTR**

In [8]:
def ctr(a, b):
    unions = len(set(a).union(set(b)))
    intersections = len(set(a).intersection(set(b)))
    return intersections / unions

def CTR(content_list_id_df):
    R = []
    
    for i in range(content_list_id_df.shape[0]):
        b = content_list_id_df.iat[i,1].split(",")
        a = content_list_id_df.iat[i,2].split(",")
        R.append(ctr(a, b))
    return R

In [9]:
ctr = CTR(content_list_id_df)
content_list_id_df["CTR"] = pd.Series(ctr)

In [10]:
content_list_id_df[:4]

Unnamed: 0,UID,Content_list,Content_id,CTR
1,0,"164423,430922,112513,485726,488385,340139,4892...",112513,0.25
2,1,"635374,409237,586823,305055,519191,772121,7884...",305055586823305055305055,0.375
3,2,"57518,70020,828660,9511,477360,821209,178443,9...",178443700201784439511,0.125
4,3,"542973,871389,914465,513667,536708,646545,9080...",536708,0.25


**杰卡德相似系数Python实现**

In [11]:
def jaccard_sim(a, b):
    unions = len(set(a).union(set(b)))
    intersections = len(set(a).intersection(set(b)))
    return intersections / unions

def jaccard(content_list_id_df):
    js = []
    
    for i in range(content_list_id_df.shape[0]):
        b = content_list_id_df.iat[i,1].split(",")
        a = content_list_id_df.iat[i,2].split(",")
        js.append(jaccard_sim(a, b))
    return js

In [12]:
jcd = jaccard(content_list_id_df)

In [13]:
content_list_id_df["jaccard"] = pd.Series(jcd)

In [14]:
content_list_id_df[:2]

Unnamed: 0,UID,Content_list,Content_id,CTR,jaccard
1,0,"164423,430922,112513,485726,488385,340139,4892...",112513,0.25,0.25
2,1,"635374,409237,586823,305055,519191,772121,7884...",305055586823305055305055,0.375,0.375


**Tanimoto系数Python实现**

In [15]:
def tanimoto_coefficient(p_vec, q_vec):
    """
    This method implements the cosine tanimoto coefficient metric
    :param p_vec: vector one
    :param q_vec: vector two
    :return: the tanimoto coefficient between vector one and two
    """
    pq = np.dot(p_vec, q_vec)
    p_square = np.linalg.norm(p_vec)
    q_square = np.linalg.norm(q_vec)
    return pq / (p_square + q_square - pq)
def tanimoto(content_list_id_df):
    tc = []
    
    for i in range(content_list_id_df.shape[0]):
        b = content_list_id_df.iat[i,1].split(",")
        a = content_list_id_df.iat[i,2].split(",")
        tc.append(jaccard_sim(a, b))
    return tc

In [16]:
tc = tanimoto(content_list_id_df)

In [17]:
content_list_id_df["tanimoto_coefficient"] = pd.Series(tc)

In [18]:
content_list_id_df[:5]

Unnamed: 0,UID,Content_list,Content_id,CTR,jaccard,tanimoto_coefficient
1,0,"164423,430922,112513,485726,488385,340139,4892...",112513,0.25,0.25,0.25
2,1,"635374,409237,586823,305055,519191,772121,7884...",305055586823305055305055,0.375,0.375,0.375
3,2,"57518,70020,828660,9511,477360,821209,178443,9...",178443700201784439511,0.125,0.125,0.125
4,3,"542973,871389,914465,513667,536708,646545,9080...",536708,0.25,0.25,0.25
5,4,"530817,401690,813927,107595,472415,375159,1135...",530817375159,0.375,0.375,0.375


**pyspark实现TFIDF**

In [1]:
# 导入库
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


In [2]:
jsons=sqlContext.read.json("hdfs:///test/Beauty_5.json")

In [3]:
jsons.show()

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|7806397051|  [3, 4]|    1.0|Very oily and cre...|01 30, 2014|A1YJEY40YUW4SE|              Andrea|Don't waste your ...|    1391040000|
|7806397051|  [1, 1]|    3.0|This palette was ...|04 18, 2014| A60XNB876KYML|          Jessica H.|         OK Palette!|    1397779200|
|7806397051|  [0, 1]|    4.0|The texture of th...| 09 6, 2013|A3G6XNM240RMWA|               Karen|       great quality|    1378425600|
|7806397051|  [2, 2]|    2.0|I really can't te...| 12 8, 2013|A1PQFP6SAJ6D80|               Norah|Do not work on my...|    1386460800|
|7806397051|  [0, 0]|    3.0|It was a little s...|10 19

In [4]:
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
wordsData = tokenizer.transform(jsons)

In [5]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

In [6]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [7]:
rescaledData.select("overall", "features").show(1)

+-------+--------------------+
|overall|            features|
+-------+--------------------+
|    1.0|(20,[0,3,4,6,7,8,...|
+-------+--------------------+
only showing top 1 row



**Pyspark实现LR**

In [1]:
# 导入包
import pandas as pd
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

In [2]:
# 读取文件
data_file = "hdfs:///test/Titanic/train.csv"
data_all = spark.read.csv(data_file, header=True, inferSchema=True)

In [3]:
# 转换为pandas df 进行数据处理
data_all = data_all.toPandas()

In [4]:
# 数据拼接
data_all[:5]

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [5]:
# 标签
cate_feature = ["Sex","Embarked"]
for i in cate_feature:
    data_all[i] = data_all[i].map(dict(zip(data_all[i].unique(), range(0, data_all[i].nunique()))))

In [6]:
# 删除
data_all = data_all.drop(["Name","Cabin","Ticket","PassengerId"],axis =1)

In [7]:
# 填充
data_all = data_all.fillna(data_all.mean())

In [8]:
data_all.isnull().sum()

Survived    0
Pclass      0
Sex         0
Age         0
SibSp       0
Parch       0
Fare        0
Embarked    0
dtype: int64

In [9]:
# 切分
train = data_all[:491]
test = data_all[492:891]

In [10]:
train.shape,test.shape

((491, 8), (399, 8))

In [11]:
train = spark.createDataFrame(train)
test = spark.createDataFrame(test)

In [12]:
train.show()

+--------+------+---+-----------------+-----+-----+-------+--------+
|Survived|Pclass|Sex|              Age|SibSp|Parch|   Fare|Embarked|
+--------+------+---+-----------------+-----+-----+-------+--------+
|       0|     3|  0|             22.0|    1|    0|   7.25|     0.0|
|       1|     1|  1|             38.0|    1|    0|71.2833|     1.0|
|       1|     3|  1|             26.0|    0|    0|  7.925|     0.0|
|       1|     1|  1|             35.0|    1|    0|   53.1|     0.0|
|       0|     3|  0|             35.0|    0|    0|   8.05|     0.0|
|       0|     3|  0|29.69911764705882|    0|    0| 8.4583|     2.0|
|       0|     1|  0|             54.0|    0|    0|51.8625|     0.0|
|       0|     3|  0|              2.0|    3|    1| 21.075|     0.0|
|       1|     3|  1|             27.0|    0|    2|11.1333|     0.0|
|       1|     2|  1|             14.0|    1|    0|30.0708|     1.0|
|       1|     3|  1|              4.0|    1|    1|   16.7|     0.0|
|       1|     1|  1|             

In [23]:
trainingSet = train.rdd.map(list).map(lambda x:[float(item) for item in x]).map(lambda x:Row(label=x[0], features=Vectors.dense(x[1:]))).toDF() 

In [24]:
trainingSet.take(5)

[Row(features=DenseVector([3.0, 0.0, 22.0, 1.0, 0.0, 7.25, 0.0]), label=0.0),
 Row(features=DenseVector([1.0, 1.0, 38.0, 1.0, 0.0, 71.2833, 1.0]), label=1.0),
 Row(features=DenseVector([3.0, 1.0, 26.0, 0.0, 0.0, 7.925, 0.0]), label=1.0),
 Row(features=DenseVector([1.0, 1.0, 35.0, 1.0, 0.0, 53.1, 0.0]), label=1.0),
 Row(features=DenseVector([3.0, 0.0, 35.0, 0.0, 0.0, 8.05, 0.0]), label=0.0)]

In [33]:
lr = LogisticRegression( regParam=0.3)
LRModel = lr.fit(trainingSet)

In [34]:
print("相关系数: \n" + str(LRModel.coefficients))
print("截距: " + str(LRModel.intercept))


相关系数: 
[-0.24322763358387192,1.024904775215135,-0.008572946912823856,-0.07216838758298957,0.04724060725895138,0.002295553641758597,0.1666864739240968]
截距: -0.15029301794960143


In [35]:

# 2.测试
#2.1读取数据
#2.2 构造测试数据集

In [36]:
testSet = test.rdd.map(list).map(lambda x:[float(item) for item in x]).map(lambda x:Row(label=x[0], features=Vectors.dense(x[1:]))).toDF() 
result = LRModel.transform(testSet)
print(result.show())



+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[1.0,0.0,55.0,0.0...|  0.0|[0.79501834566514...|[0.68890784972023...|       0.0|
|[1.0,0.0,71.0,0.0...|  0.0|[0.72187386182752...|[0.67301952067595...|       0.0|
|[3.0,0.0,21.0,0.0...|  0.0|[1.04152859705436...|[0.73914484221339...|       0.0|
|[3.0,0.0,29.69911...|  0.0|[0.93470860050442...|[0.71802958128616...|       0.0|
|[1.0,1.0,54.0,1.0...|  1.0|[-0.4426284849437...|[0.39111483243997...|       1.0|
|[3.0,0.0,29.69911...|  0.0|[1.09992201765660...|[0.75024549376767...|       0.0|
|[1.0,1.0,25.0,1.0...|  0.0|[-0.7872644322044...|[0.31275635046786...|       1.0|
|[3.0,0.0,24.0,0.0...|  0.0|[1.06783096752856...|[0.74418420703166...|       0.0|
|[3.0,0.0,17.0,0.0...|  0.0|[1.00583078279748...|[0.73220343445482...|       0.0|
|[3.0,1.0,21.0,0

In [37]:

# 评估分类效果
total_amount=result.count()
correct_amount = result.filter(result.label==result.prediction).count()
precision_rate = correct_amount/total_amount
print("预测准确率为:{}".format(precision_rate))


预测准确率为:0.7994987468671679


In [38]:
positive_precision_amount = result.filter(result.label == 1).filter(result.prediction == 1).count()
negative_precision_amount = result.filter(result.label == 0).filter(result.prediction == 0).count()
positive_false_amount = result.filter(result.label == 1).filter(result.prediction == 0).count()
negative_false_amount = result.filter(result.label== 0).filter(result.prediction == 1).count()
 
print("正样本预测准确数量:{},负样本预测准确数量:{}".format(positive_precision_amount,negative_precision_amount))


正样本预测准确数量:90,负样本预测准确数量:229


In [39]:
positive_amount = result.filter(result.label == 1).count()
negative_amount = result.filter(result.label == 0).count()
 
print("正样本数:{},负样本数:{}".format(positive_amount,negative_amount))


正样本数:150,负样本数:249


In [40]:
print("正样本预测错误数量:{},负样本预测错误数量:{}".format(positive_false_amount,negative_false_amount))
 
recall_rate1 = positive_precision_amount/positive_amount
recall_rate2 = negative_precision_amount/negative_amount
 
print("正样本召回率为:{},负样本召回率为:{}".format(recall_rate1,recall_rate2))


正样本预测错误数量:60,负样本预测错误数量:20
正样本召回率为:0.6,负样本召回率为:0.9196787148594378


[参考文献](https://blog.csdn.net/flysky1991/article/details/80182501)