In [48]:
# start SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("modeling").getOrCreate()

In [49]:
import numpy as np
import pandas as pd
import pyspark.sql.functions as f
import pyspark.ml.regression as rg
import pyspark.ml.clustering as clust
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, DoubleType, StringType
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import RFormula
from sklearn.metrics import roc_curve,auc

In [50]:
#Read data
data = spark.read\
.option('header', 'true')\
.option('inferSchema', 'true')\
.parquet('s3://chingsez/Final/data')

In [51]:
data.printSchema()

root
 |-- score: integer (nullable = true)
 |-- descendants: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- active_user: string (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- from_top_web: string (nullable = true)
 |-- text_length: integer (nullable = true)
 |-- title_hot_words: integer (nullable = true)
 |-- title_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_vec: vector (nullable = true)
 |-- if_highscore: integer (nullable = true)
 |-- score_scaled: double (nullable = true)
 |-- if_highscore_scaled: integer (nullable = true)
 |-- des_scaled: double (nullable = true)



## Linear regression model

In [32]:
#Change data type
convert = udf(lambda x: float(np.max(x)), FloatType())
change = data.withColumn('active_user', f.col('active_user').cast('double'))
change = change.withColumn('from_top_web', f.col('from_top_web').cast('double'))
change = change.withColumn('title_vec', convert("title_vec"))

In [33]:
#Get a correlation matrix for numerical variables
cols = ['score', 'descendants', 'year', 'dayofweek', 'month', 'text_length', 'title_hot_words', 'title_vec', 'active_user', 'from_top_web']
assemble = VectorAssembler(inputCols=cols,
                           outputCol='feat')
#Normailize features
norm = StandardScaler(
    inputCol= 'feat'
    , outputCol='normed'
    , withMean=True
    , withStd=True
)

pipeline = Pipeline(
    stages=[
        assemble
        , norm
    ])

vector = pipeline.fit(change).transform(change).select('normed')
matrix = Correlation.corr(vector, 'normed')

In [34]:
#Fix spark bug
#Ref:https://www.cnblogs.com/bonelee/p/10976253.html
matrix.sql_ctx.sparkSession._jsparkSession = spark._jsparkSession
matrix._sc = spark._sc
corrmatrix = matrix.collect()[0][0].toArray().tolist()

In [35]:
corr_df = pd.DataFrame(corrmatrix)
corr_df.index, corr_df.columns = cols, cols
corr_df

Unnamed: 0,score,descendants,year,dayofweek,month,text_length,title_hot_words,title_vec,active_user,from_top_web
score,1.0,0.008808,0.049505,-0.007856,-0.001137,-0.036419,0.005488,0.02342,-0.001841,0.011461
descendants,0.008808,1.0,0.000111,0.000832,-0.000591,0.001905,-0.000309,0.00018,-0.000714,-0.000218
year,0.049505,0.000111,1.0,-0.008611,-0.14063,-0.102557,0.034602,-0.028228,-0.134061,0.061733
dayofweek,-0.007856,0.000832,-0.008611,1.0,-0.006231,0.014633,-0.00056,0.00595,-0.000506,0.000444
month,-0.001137,-0.000591,-0.14063,-0.006231,1.0,0.002194,-0.007511,0.004458,-0.010427,-0.000185
text_length,-0.036419,0.001905,-0.102557,0.014633,0.002194,1.0,-0.034018,0.069131,-0.085365,-0.067538
title_hot_words,0.005488,-0.000309,0.034602,-0.00056,-0.007511,-0.034018,1.0,-0.07769,-0.033775,0.028547
title_vec,0.02342,0.00018,-0.028228,0.00595,0.004458,0.069131,-0.07769,1.0,-0.036292,-0.04704
active_user,-0.001841,-0.000714,-0.134061,-0.000506,-0.010427,-0.085365,-0.033775,-0.036292,1.0,0.05709
from_top_web,0.011461,-0.000218,0.061733,0.000444,-0.000185,-0.067538,0.028547,-0.04704,0.05709,1.0


In [36]:
#Tranform features and split data
lin_cols = ['descendants', 'year', 'dayofweek', 'month', 'text_length', 'title_hot_words', 'title_vec']

features = VectorAssembler(inputCols=lin_cols
    , outputCol="features")

lr_feat = features.transform(change)
lr_feat.printSchema()

splitted_data = lr_feat.randomSplit([0.8, 0.2], 24)
train = splitted_data[0]
test = splitted_data[1]
print(train.count())
print(test.count())

root
 |-- score: integer (nullable = true)
 |-- descendants: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- active_user: double (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- from_top_web: double (nullable = true)
 |-- text_length: integer (nullable = true)
 |-- title_hot_words: integer (nullable = true)
 |-- title_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_vec: float (nullable = true)
 |-- if_highscore: integer (nullable = true)
 |-- score_scaled: double (nullable = true)
 |-- if_highscore_scaled: integer (nullable = true)
 |-- des_scaled: double (nullable = true)
 |-- features: vector (nullable = true)

1296793
324443


In [37]:
# #这个是 normalize 了 features 的
# #import pyspark.ml.regression as rg
# Scale numeric variables first 

# lin_cols = ['descendants', 'year', 'dayofweek', 'month', 'text_length', 'title_hot_words', 'title_vec']
# scale_feat = VectorAssembler(inputCols=lin_cols
#     , outputCol="norm")

# features = VectorAssembler(inputCols=['normed']
#     , outputCol="features")

# norm = StandardScaler(inputCol='norm',
#                     outputCol='normed')

# score_feat = VectorAssembler(inputCols=['score']
#     , outputCol="score_feat")

# score_norm = StandardScaler(inputCol='score_feat',
#                     outputCol='score_normed')

# pipeline = Pipeline(
#     stages=[
#         scale_feat
#         , score_feat
#         , score_norm
#         , norm
#         , features
#     ])

# lr_feat = pipeline.fit(change).transform(change)

In [38]:
# #Normalize feature
# lr_feat =  lr_feat.withColumn('score_normed', convert("score_normed"))
# lr_feat.printSchema()

# splitted_data = lr_feat.randomSplit([0.8, 0.2], 24)
# train = splitted_data[0]
# test = splitted_data[1]
# print(train.count())
# print(test.count())

In [39]:
#Fit linear regression model
lr_obj = rg.LinearRegression(
    maxIter=10
    , regParam=0.01
    , elasticNetParam=1.00
    , labelCol = 'score')
lr_model =  lr_obj.fit(train)

In [40]:
#Get model data
summary = lr_model.summary

print(
    summary.r2
    , summary.rootMeanSquaredError
    , summary.meanAbsoluteError
)

0.00426367869877331 42.4308516092833 15.657403785230473


In [41]:
#Predict and evaluate model
lr_predictions = lr_model.transform(test)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="score",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

R Squared (R2) on test data = 0.00469586


## K-means cluster

In [16]:
#Build k-means model
vectorAssembler = VectorAssembler(
    inputCols= cols
    , outputCol='features')

kmeans_obj = clust.KMeans(k=5, seed=666)

kmeans_data = vectorAssembler.transform(change)

In [17]:
#Split data
splitted_data = kmeans_data.randomSplit([0.8, 0.2], 24)
train = splitted_data[0]
test = splitted_data[1]
print(train.count())
print(test.count())

1296855
324381


In [18]:
#Get kmeans result
results = (
    kmeans_obj
    .fit(train)
    .transform(test)
    .select('features', 'prediction')
)

results.show(5)

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,-1.0,2011.0,...|         0|
|[1.0,-1.0,2009.0,...|         0|
|[1.0,-1.0,2011.0,...|         0|
|[1.0,-1.0,2011.0,...|         0|
|[1.0,-1.0,2011.0,...|         0|
+--------------------+----------+
only showing top 5 rows



In [20]:
#Evaluate kmeans model performance
clustering_ev = ClusteringEvaluator()
clustering_ev.evaluate(results)

0.9322713415689663

In [19]:
#Get cluster center data
model = kmeans_obj.fit(train)
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[3.75706087e+00 8.11430996e-01 2.01201610e+03 3.99475979e+00
 6.43605662e+00 2.25988702e+00 4.15220888e-01 1.79913875e-01
 4.63950981e-01 2.64218020e-01]
[1.00000000e+00 2.37210300e+06 2.01100000e+03 6.00000000e+00
 4.00000000e+00 3.70000000e+01 0.00000000e+00 1.30351201e-01
 0.00000000e+00 0.00000000e+00]
[7.56431357e+02 2.64022746e+02 2.01312998e+03 3.88383428e+00
 6.31356621e+00 3.60682372e-01 4.06986190e-01 1.99093302e-01
 3.82615760e-01 2.73761170e-01]
[2.78486291e+02 1.24671569e+02 2.01286374e+03 3.94026255e+00
 6.36665005e+00 3.03423064e-01 4.31040213e-01 1.89016017e-01
 4.34446660e-01 2.83649053e-01]
[9.03691200e+01 4.50790665e+01 2.01239419e+03 3.93944900e+00
 6.45897199e+00 2.94826903e-01 4.39349894e-01 1.81391813e-01
 4.79436443e-01 2.91308800e-01]


## Random forest

In [52]:
#Encode variables
sI1 = StringIndexer(inputCol="active_user", outputCol="active_userIndex"); 
en1 = OneHotEncoder(dropLast=False, inputCol="active_userIndex", outputCol="active_userVec");
sI2 = StringIndexer(inputCol="from_top_web", outputCol="from_top_webIndex"); 
en2 = OneHotEncoder(dropLast=False, inputCol="from_top_webIndex", outputCol="from_top_webVec");

encodedFinal = Pipeline(stages=[sI1, en1, sI2, en2]).fit(data).transform(data);

In [53]:
#Split dataset
train_data, test_data = encodedFinal.randomSplit([0.7, 0.3], seed=1234);
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

Number of training records: 1135713
Number of testing records : 485523


In [63]:
# using score


# define fomular
regFormula = RFormula(formula="score ~ des_scaled + year + month + dayofweek + title_length + text_length + title_hot_words + title_vec + active_userVec + from_top_webVec")

# define indexer for categorical variables
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", handleInvalid='skip', maxCategories=32)

# define random forest estimator
randForest = RandomForestRegressor(featuresCol = 'indexedFeatures', labelCol = 'label', numTrees=20, 
                                   featureSubsetStrategy="auto",impurity='variance', maxDepth=8, maxBins=200)

# fit model with formula and other ransformations
model = Pipeline(stages=[regFormula, featureIndexer, randForest]).fit(train_data)
# predict
predictions = model.transform(test_data)
predictionAndLabels = predictions.select("label","prediction").rdd

testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

RMSE = 24.91817371612741
R-sqr = 0.3669494235262043


In [60]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
r2

0.6628565631473219

## Logistic regression

In [65]:
logReg = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)


classFormula = RFormula(formula="if_highscore_scaled ~ des_scaled + year + month + dayofweek + title_length + text_length + title_hot_words + title_vec + active_userVec + from_top_webVec")


model = Pipeline(stages=[classFormula, logReg]).fit(train_data)


predictions = model.transform(test_data)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 1.0


In [66]:
spark.stop()