#preprocessing data

In [2]:
# Preprocessing: load the combined feature table, cast every column to double, split into
# train/test and define the shared feature-assembly stages used by all models below.

# Start from a clean cache so a re-run never picks up a stale cached DataFrame.
spark.catalog.clearCache()

# The target `duration_next_week` is renamed to `label`; identifier/date columns that must
# not be used as features are dropped (each only once).
df = (
    sqlContext.read.format('csv')
    .option("header", 'true')
    .load("/FileStore/tables/allCom.csv")
    .withColumnRenamed("duration_next_week", "label")
    .drop("PatientID", "register_NextWeekNR", "assessment_date")
)
df.cache()

from pyspark.ml import Pipeline
# NOTE: OneHotEncoderEstimator was renamed to OneHotEncoder in Spark 3.0; this (currently
# unused) import only resolves on Spark 2.x.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import col  # for indicating a column using a string in the line below
from pyspark.ml.evaluation import RegressionEvaluator

# No schema inference is requested above, so cast every column to double.
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
# Optional outlier filter; the target is already called `label` after the rename above:
# df = df.filter(col('label') < 1000)
df.printSchema()

# Fixed seed so the 70/30 split -- and every score reported in this notebook -- is reproducible.
SPLIT_SEED = 42
train, test = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

from pyspark.ml.feature import VectorAssembler, VectorIndexer
# Every column except the target is a feature. Feature-vector index i corresponds to
# featuresCols[i] (not df.columns[i], which still contains 'label').
featuresCols = df.columns
featuresCols.remove('label')

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# Features with at most 2 distinct values are indexed as categorical.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=2)

#linear regression

In [4]:
## linear regression
# Elastic-net linear regression (regParam=0.3, elasticNetParam=0.8) fitted on the training
# split through the shared assembler + indexer stages.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, lr])
pipelineModel = pipeline.fit(train)

# The fitted regression model is the last pipeline stage.
lrModel = pipelineModel.stages[-1]
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# Training-set diagnostics from the model summary.
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: " + str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
for metricLabel, metricValue in (("RMSE", trainingSummary.rootMeanSquaredError),
                                 ("r2", trainingSummary.r2)):
    print("%s: %f" % (metricLabel, metricValue))

In [5]:
# Score the linear-regression pipeline from In [4] on the held-out test split.
predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
# Re-use the same evaluator, overriding only the metric for each extra score.
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

# Earlier result when the data was restricted to label < 1000
# (see the commented-out filter in the preprocessing cell):
# RMSE on our test set: 144.898
# r2 on our test set: 0.611046
# mae on our test set: 102.653

In [6]:
# Render the linear-regression test predictions: every input column plus the
# rawFeatures/features vectors and the prediction column (see the output below).
display(predictions)

Estimated_CareMoments_Weekly,Estimated_Minutes_Weekly,Case_management,psychosocial_num,physiological_num,healthRelated_num,ass_year_day,label,duration_mean_each_time,duration_median_each_time,care_times_weekly,duration_week_minus,duration_each_time,Age,estimated_care_duration_3_to_6_months,estimated_care_duration_Less_than_3_months,estimated_care_duration_Longer_than_6_months,estimated_care_duration_Unknown,estimated_care_request_Decreasing_Care,estimated_care_request_Increasing_Care,estimated_care_request_Stable_care_demand,FinancerPGB,FinancerWLZ,FinancerZVW,TeamID_0,TeamID_219,TeamID_222,TeamID_313,TeamID_346,TeamID_426,TeamID_461,TeamID_483,TeamID_485,TeamID_508,TeamID_540,TeamID_589,TeamID_603,TeamID_607,TeamID_608,TeamID_635,TeamID_640,TeamID_647,TeamID_745,TeamID_749,TeamID_852,TeamID_854,TeamID_857,TeamID_886,TeamID_970,TeamID_1010,TeamID_1040,ProblemNameCognition,ProblemNameConsciousness,ProblemNameDigestion_and_hydratation,ProblemNameInterpersonal_relationship,ProblemNameMedication_regimen,ProblemNameMental_health,ProblemNameNutrition,ProblemNamePersonal_care,ProblemNameReproductive_function,ProblemNameRespiration,ProblemNameRole_change,ProblemNameSkin,ProblemNameSpeech_and_language,ProblemNameVision,combindSymptonProblemDigestion_and_hydratation_Other,combindSymptonProblemMedication_regimen_Other,combindSymptonProblemMedication_regimen_unable_to_take_medications_without_help,combindSymptonProblemomittedSymptons,combindSymptonProblemPain_expresses_discomfort_pain,combindSymptonProblemPersonal_care_difficulty_with_bathing,combindSymptonProblemSkin_drainage,combindSymptonProblemSkin_lesion_pressure_ulcer,combindSymptonProblemSkin_others,rawFeatures,features,prediction
0.0,0.0,0.0,1.0,1.0,1.0,212.0,135.0,38.0,45.0,5.0,190.0,10.0,66.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,"List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 10.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 10.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))",89.1291846397051
0.0,0.0,0.0,1.0,1.0,1.0,212.0,135.0,38.0,45.0,5.0,190.0,45.0,66.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,"List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 45.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 45.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))",69.19891739231565
0.0,0.0,0.0,1.0,1.0,1.0,212.0,135.0,38.0,45.0,5.0,190.0,45.0,66.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,"List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 45.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 22, 48, 57, 69), List(1.0, 1.0, 1.0, 212.0, 38.0, 45.0, 5.0, 190.0, 45.0, 66.0, 1.0, 1.0, 1.0, 1.0, 1.0))",69.19891739231565
1.0,15.0,0.0,1.0,2.0,0.0,66.0,5.0,16.0,15.0,5.0,80.0,15.0,86.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,"List(0, 73, List(0, 1, 3, 4, 6, 7, 8, 9, 10, 11, 12, 15, 19, 22, 23, 61, 71), List(1.0, 15.0, 1.0, 2.0, 66.0, 16.0, 15.0, 5.0, 80.0, 15.0, 86.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 3, 4, 6, 7, 8, 9, 10, 11, 12, 15, 19, 22, 23, 61, 71), List(1.0, 15.0, 1.0, 2.0, 66.0, 16.0, 15.0, 5.0, 80.0, 15.0, 86.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",50.05491257987501
1.0,15.0,0.0,1.0,2.0,0.0,66.0,5.0,16.0,15.0,5.0,80.0,20.0,86.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 73, List(0, 1, 3, 4, 6, 7, 8, 9, 10, 11, 12, 15, 19, 22, 23, 54, 66), List(1.0, 15.0, 1.0, 2.0, 66.0, 16.0, 15.0, 5.0, 80.0, 20.0, 86.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 3, 4, 6, 7, 8, 9, 10, 11, 12, 15, 19, 22, 23, 54, 66), List(1.0, 15.0, 1.0, 2.0, 66.0, 16.0, 15.0, 5.0, 80.0, 20.0, 86.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",66.5445436138607
1.0,15.0,1.0,0.0,2.0,0.0,142.0,15.0,15.0,15.0,2.0,30.0,15.0,76.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 73, List(0, 1, 2, 4, 6, 7, 8, 9, 10, 11, 12, 14, 17, 22, 43), List(1.0, 15.0, 1.0, 2.0, 142.0, 15.0, 15.0, 2.0, 30.0, 15.0, 76.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 2, 4, 6, 7, 8, 9, 10, 11, 12, 14, 17, 22, 43), List(1.0, 15.0, 1.0, 2.0, 142.0, 15.0, 15.0, 2.0, 30.0, 15.0, 76.0, 1.0, 1.0, 1.0, 1.0))",24.669001035972087
1.0,15.0,1.0,1.0,2.0,2.0,356.0,160.0,31.25,30.0,4.0,125.0,45.0,75.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,"List(0, 73, List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 19, 22, 43, 61, 71), List(1.0, 15.0, 1.0, 1.0, 2.0, 2.0, 356.0, 31.25, 30.0, 4.0, 125.0, 45.0, 75.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 16, 19, 22, 43, 61, 71), List(1.0, 15.0, 1.0, 1.0, 2.0, 2.0, 356.0, 31.25, 30.0, 4.0, 125.0, 45.0, 75.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",168.48375199087275
1.0,30.0,0.0,0.0,0.0,0.0,249.0,10.0,15.0,15.0,4.0,60.0,10.0,87.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 10.0, 87.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 10.0, 87.0, 1.0, 1.0, 1.0))",27.854250194584324
1.0,30.0,0.0,0.0,0.0,0.0,249.0,10.0,15.0,15.0,4.0,60.0,15.0,87.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 15.0, 87.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 15.0, 87.0, 1.0, 1.0, 1.0))",25.00706915924297
1.0,30.0,0.0,0.0,0.0,0.0,249.0,10.0,15.0,15.0,4.0,60.0,15.0,87.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 15.0, 87.0, 1.0, 1.0, 1.0))","List(0, 73, List(0, 1, 6, 7, 8, 9, 10, 11, 12, 16, 22, 23), List(1.0, 30.0, 249.0, 15.0, 15.0, 4.0, 60.0, 15.0, 87.0, 1.0, 1.0, 1.0))",25.00706915924297


#Generalized linear regression

In [8]:
# Generalized linear regression (Gaussian family, identity link) on the shared feature stages,
# followed by the GLR training summary and test-set scores.
from pyspark.ml.regression import GeneralizedLinearRegression

# Other family/link combinations that were tried are recorded in the next cell.
# For family="gamma" the supported links are inverse (canonical), identity and log.
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, glr])
pipelineModel = pipeline.fit(train)
lrModel = pipelineModel.stages[-1]

print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# Training-set diagnostics from the GLR summary.
summary = lrModel.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

# Evaluate on the held-out test split.
predictions = pipelineModel.transform(test)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)


In [9]:
# GLR test-set scores for the family/link combinations that were tried:
# # gaussian, identity
# RMSE on our test set: 463.274
# r2 on our test set: 0.746578
# mae on our test set: 195.02
# # poisson, log
# RMSE on our test set: 601.665
# r2 on our test set: 0.57787
# mae on our test set: 243.454
# # poisson, sqrt
# RMSE on our test set: 531.159
# r2 on our test set: 0.671008
# mae on our test set: 207.285
# # gaussian, log/inverse
# NOTE: these numbers are identical to poisson/sqrt above -- possibly a copy-paste; re-run to confirm.
# RMSE on our test set: 531.159
# r2 on our test set: 0.671008
# mae on our test set: 207.285

#Decision tree regression

In [11]:
# Decision tree regression on the shared feature stages, scored on the test split.
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features")

pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, dt])
pipelineModel = pipeline.fit(train)
# Keep the fitted tree (last pipeline stage) for inspection below.
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

# summary only (treeModel.toDebugString gives the full tree)
print(treeModel)

#Random forest regression

In [13]:
# Random forest regression on the shared feature stages, scored on the test split.
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="features")
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, rf])
pipelineModel = pipeline.fit(train)
# Keep the fitted forest (last pipeline stage) for inspection below.
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

# summary only
print(treeModel)

#Gradient-boosted tree regression

In [15]:
# Gradient-boosted tree regression (10 iterations) on the shared feature stages,
# scored on the test split.
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, gbt])
pipelineModel = pipeline.fit(train)
# Keep the fitted GBT model (last pipeline stage) for inspection below.
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

print(treeModel)

In [16]:
# Score the GBT pipeline from In [15] on the TRAINING split, for comparison with its
# test-set scores (a large gap would indicate over-fitting).
predictions = pipelineModel.transform(train)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our train set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our train set: %g" % r2)
print("mae on our train set: %g" % mae)

print(treeModel)

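#Survival regression
TBD -- the cell below only runs AFT survival regression on a hard-coded toy data set, not on this data.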

In [18]:
# Accelerated failure time (AFT) survival regression -- placeholder run on a hard-coded toy
# data set, not on the patient data (survival modelling is still TBD).
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

# Rows are (label, censor, features).
aftRows = [
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226)),
]
training = spark.createDataFrame(aftRows, ["label", "censor", "features"])

# Also emit the quantiles for probabilities 0.3 and 0.6 in a "quantiles" column.
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")
model = aft.fit(training)

# Coefficients, intercept and scale parameter of the fitted AFT model.
for paramLabel, paramValue in (("Coefficients", model.coefficients),
                               ("Intercept", model.intercept),
                               ("Scale", model.scale)):
    print(paramLabel + ": " + str(paramValue))
model.transform(training).show(truncate=False)

#Isotonic regression
The relationship is not monotonic, so this is tried only for comparison.

In [20]:
# Isotonic regression, tried only for comparison with the other models.
from pyspark.ml.regression import IsotonicRegression

# With a vector features column, isotonic regression fits on the single feature chosen by
# featureIndex (0 = featuresCols[0]); every other feature is ignored. 0 is also the default.
iso = IsotonicRegression(featureIndex=0)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, iso])
pipelineModel = pipeline.fit(train)
# NOTE: stored under `treeModel` although this is not a tree model; later cells that read
# `treeModel` will see this isotonic model (it has no featureImportances).
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

print("Boundaries in increasing order: %s\n" % str(treeModel.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(treeModel.predictions))

#conclusion:
We tested multiple simple regression models.
From the results above, GBT performs best, reaching an r2 of about 0.87.
Next, tune the GBT regressor with a cross-validated grid search.

In [22]:
# Hyper-parameter search for the GBT regressor: 5-fold cross-validation over a
# maxDepth x maxIter grid, scored by RMSE. The pipeline is fitted in the next cell.
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel

gbt = GBTRegressor(labelCol="label")

# 2 x 2 = 4 combinations, each trained once per fold (20 fits in total).
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [10, 20])
             .addGrid(gbt.maxIter, [20, 30])
             .build())
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())
# Fixed seed so the fold assignment -- and therefore the selected parameters -- is reproducible.
CV_SEED = 42
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid,
                    numFolds=5, seed=CV_SEED)

pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [23]:
# Fit the cross-validated GBT pipeline set up in In [22] and evaluate the best model on the
# held-out test split.
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
pipelineModel = pipeline.fit(train)

predictions = pipelineModel.transform(test)
display(predictions.select("label", "prediction", *featuresCols))

# `evaluator` is the RMSE evaluator used for cross-validation in In [22].
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

# Best hyper-parameters. The grid searched maxDepth in [10, 20] and maxIter in [20, 30].
cvModel = pipelineModel.stages[-1]
bestParams = cvModel.bestModel.extractParamMap()
print(bestParams)
# Average cross-validation RMSE per grid point; avgMetrics follows the order of paramGrid,
# so the lowest value marks the combination that was selected.
for gridPoint, avgRmse in zip(paramGrid, cvModel.avgMetrics):
    print("%s -> avg CV RMSE %g" % ({p.name: v for p, v in gridPoint.items()}, avgRmse))

# Persist the fitted pipeline. overwrite() lets the cell be re-run without failing on an
# existing path. (Variable and path say "rf", but this is the CV-tuned GBT pipeline.)
rfPath = "/tmp/mllib-persistence-example/rfModel"
pipelineModel.write().overwrite().save(rfPath)
# pipelineModel = PipelineModel.load(rfPath)

#feature selection:PCA

In [25]:
# 136 features with feature selection
# Recorded test-set scores; each #N header appears to be the number of PCA components k (TODO confirm).
#25
# RMSE on our test set: 346.163
# r2 on our test set: 0.859244
# mae on our test set: 159.356

#40
# RMSE on our test set: 342.78
# r2 on our test set: 0.861982
# mae on our test set: 157.997

#50
# RMSE on our test set: 343.726
# r2 on our test set: 0.861219
# mae on our test set: 158.91

#60
# RMSE on our test set: 342.148
# r2 on our test set: 0.86249
# mae on our test set: 158.672

#70
# RMSE on our test set: 341.94
# r2 on our test set: 0.862657
# mae on our test set: 158.763

Test results:
number of features: R2

3: 0.76 ; 5: 0.79 ; 10: 0.83 ; 15: 0.83 ; 20: 0.84 ; 30: 0.85 ; 40: 0.84 ; 35: 0.85 ; 37: 0.84

Result: PCA does not help.
Next: check the feature importances and try sub-sampling the features.

In [27]:
# Feature importances of the best GBT model selected by cross-validation (In [23]).
# `treeModel` cannot be used here: In [20] last rebound it to the isotonic model, which has
# no featureImportances.
bestGbtModel = pipelineModel.stages[-1].bestModel
importances = bestGbtModel.featureImportances.toArray()
# (vector index, column name, importance), most important first. Index i maps to
# featuresCols[i] because featuresCols is the VectorAssembler input list.
sorted(zip(range(len(featuresCols)), featuresCols, importances), key=lambda row: row[2], reverse=True)

In [28]:
# List every column of df, including 'label'. Feature-vector indices (e.g. from
# featureImportances) refer to featuresCols, which omits 'label', not to this list.
print(df.columns)

In [29]:
# Look up the column names behind feature-vector indices 11, 23, 51 and 54.
# These are positions in the assembled feature vector, so they index featuresCols (the
# VectorAssembler inputs). df.columns still contains 'label' at position 7, so df.columns[i]
# would name the wrong column for every i >= 7.
for featureIdx in (11, 23, 51, 54):
    print(featuresCols[featureIdx])

In [30]:
# Rebuild the feature list without the target and four hand-picked columns, then recreate
# the assembler/indexer stages that the next GBT cell uses.
# NOTE(review): if these names came from feature-vector indices 11/23/51/54 looked up via
# df.columns, they are one column off ('label' sits at df.columns[7]). The matching
# featuresCols entries would be duration_each_time, TeamID_0, ProblemNameConsciousness and
# ProblemNameMedication_regimen -- confirm which set was intended.
featuresCols = df.columns
for droppedCol in ('label',
                   'duration_week_minus',
                   'FinancerZVW',
                   'ProblemNameCognition',
                   'ProblemNameInterpersonal_relationship'):
    featuresCols.remove(droppedCol)

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=2)

In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, gbt])
pipelineModel = pipeline.fit(train)
# Print the coefficients and intercept for linear regression
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print "RMSE on our test set: %g" % rmse
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print "r2 on our test set: %g" % r2
print "mae on our test set: %g" % mae

print(treeModel)

#feature without selection

In [33]:
# Reload the data set WITHOUT the earlier feature selection, then rebuild the cast, the
# train/test split and the feature-assembly stages from scratch.
spark.catalog.clearCache()

# Target renamed to `label`; identifier/date columns dropped (each only once).
df = (
    sqlContext.read.format('csv')
    .option("header", 'true')
    .load("/FileStore/tables/allCom_without_feature_selection.csv")
    .withColumnRenamed("duration_next_week", "label")
    .drop("PatientID", "register_NextWeekNR", "assessment_date")
)
# Fill missing marital-status values with '2' (one fillna call, same result as chaining).
# NOTE(review): these four columns are excluded from featuresCols below, so this fill only
# matters if they are used elsewhere -- confirm intent.
df = df.fillna({
    'Marital_statusAlleenstaand': '2',
    'Marital_statusGehuwd': '2',
    'Marital_statusSamenwonend': '2',
    'Marital_statusSamenwonend_met_contract': '2',
})
df.cache()

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import col  # for indicating a column using a string in the line below
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
# Optional outlier filter; the target is already called `label` after the rename above:
# df = df.filter(col('label') < 1000)
df.printSchema()

# Fixed seed so the split and every score reported below are reproducible.
SPLIT_SEED = 42
train, test = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('label')
featuresCols.remove('Marital_statusAlleenstaand')
featuresCols.remove('Marital_statusGehuwd')
featuresCols.remove('Marital_statusSamenwonend')
featuresCols.remove('Marital_statusSamenwonend_met_contract')

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# Features with at most 3 distinct values are indexed as categorical.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=3)

In [34]:
# from pyspark.sql.functions import isnan, when, count, col
# train.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

In [35]:
# GBT regression (10 iterations) on the data set without feature selection (In [33]),
# scored on the test split.
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, gbt])
pipelineModel = pipeline.fit(train)
# Keep the fitted GBT model (last pipeline stage) for inspection below.
treeModel = pipelineModel.stages[-1]

predictions = pipelineModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

print(treeModel)

use pca

In [37]:
# GBT regression on PCA-reduced features (k=70 components), scored on the test split.
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import PCA

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# Dedicated name so the shared `vectorIndexer` (outputCol="features") is not overwritten.
# Otherwise re-running the plain GBT cell after this one would fail: its GBT reads
# "features", which would then be produced by PCA rather than by the indexer.
pcaVectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="featuresBeforePCA", maxCategories=2)

# NOTE(review): PCA runs on unscaled features (0/1 dummies next to values such as
# ass_year_day in the hundreds), so the large-scale columns dominate the components.
# Consider a StandardScaler stage before PCA.
pca = PCA(k=70, inputCol="featuresBeforePCA", outputCol="features")

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[vectorAssembler, pcaVectorIndexer, pca, gbt])
pipelineModel = pipeline.fit(train)
# Keep the fitted GBT model (last pipeline stage) for inspection below.
treeModel = pipelineModel.stages[-1]
predictions = pipelineModel.transform(test)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("r2 on our test set: %g" % r2)
print("mae on our test set: %g" % mae)

print(treeModel)