-
Notifications
You must be signed in to change notification settings - Fork 1
/
spark_lr.py
executable file
·115 lines (74 loc) · 2.41 KB
/
spark_lr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
File Name: spark_lr.py
Function: Demo of Spark ML logistic regression — reads a CSV whose last
          column is the label, assembles the remaining columns into a
          feature vector, fits two LogisticRegression models (base params
          and a param-map override), then prints predictions and
          binary-classification metrics (ROC AUC, precision / recall /
          F-measure by threshold, PR curve) to stdout.
Usage: spark-submit spark_lr.py (input path is hard-coded in main())
Input: file:///home/panwenhai/stock_model/data/data1.csv
Output: training summaries, per-row predictions and metrics on stdout
Author: panwenhai
Create Time: 2017-05-31 15:50:27
"""
import sys
# Python 2-only hack: reload(sys) restores the setdefaultencoding() function
# (hidden by site.py) so the process-wide default codec can be forced to
# UTF-8. Neither call exists, nor is needed, on Python 3.
reload(sys)
sys.setdefaultencoding("utf-8")
import os
import time
from datetime import datetime, timedelta
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
# NOTE(review): wildcard import appears unused in this file — verify before
# removing (it may exist for interactive use).
from pyspark.sql.types import *
def main():
    """Train Spark ML logistic-regression models on a CSV dataset and print
    predictions plus binary-classification evaluation metrics.

    The CSV is read with an inferred schema; all columns except the last
    are assembled into a "features" vector and the last column is renamed
    to "label". Two models are fit: one with the constructor parameters
    (maxIter=1000, regParam=0.01) and one with a param-map override
    (maxIter=3000, regParam=0.1, threshold=0.55, probabilityCol
    "myProbability"). The training frame doubles as the test set.
    Everything is reported on stdout; returns None.
    """
    spark = SparkSession.builder.appName("spark machine learning").getOrCreate()
    try:
        training = spark.read.csv("file:///home/panwenhai/stock_model/data/data1.csv", inferSchema=True)
        # Last column is the label; everything before it is a raw feature.
        feature_col = training.columns[:-1]
        label_col = training.columns[-1]
        training = training.withColumnRenamed(label_col, "label")
        vecAssembler = VectorAssembler(inputCols=feature_col, outputCol="features")
        training = vecAssembler.transform(training)
        print("Training Data:")
        # BUG FIX: show() prints the frame itself and returns None, so the
        # original "print X.show()" emitted a spurious "None" line after
        # every table (also fixed for the metric frames below).
        training.show()
        print("")
        lr = LogisticRegression(maxIter=1000, regParam=0.01)
        print("LR params:")
        print(lr.explainParams())
        print("")
        model1 = lr.fit(training)
        print(model1)
        print("model1 params:")
        # BUG FIX: the original printed lr.extractParamMap() — the
        # estimator's params — under the "model1 params" heading; report
        # the fitted model's param map instead.
        print(model1.extractParamMap())
        # Override a few params for a second fit; when the maps are merged,
        # later updates win on conflicting keys.
        param_map = {lr.maxIter: 3000}
        param_map.update({
            lr.regParam: 0.1,
            lr.threshold: 0.55,
        })
        param_map2 = {lr.probabilityCol: "myProbability"}
        params = param_map.copy()
        params.update(param_map2)
        model2 = lr.fit(training, params)
        print(model2)
        print("model2, params:")
        # Same fix as model1: show the fitted model's params, not lr's.
        print(model2.extractParamMap())
        # Reuse the training frame as a stand-in test set (no held-out split).
        test = training
        prediction = model2.transform(test)
        result = prediction.select("features", "label", "myProbability", "prediction").collect()
        for row in result:
            print("features=%s, label=%s --> prob=%s, prediction=%s" % (row.features, row.label, row.myProbability, row.prediction))
        print("weights: ")
        print(model2.coefficients)
        print("intercept: ")
        print(model2.intercept)
        evaluate_result = model2.evaluate(test)
        print("auc: " + str(evaluate_result.areaUnderROC))
        # Threshold-curve accessors are DataFrames; show() prints them.
        evaluate_result.fMeasureByThreshold.show()
        evaluate_result.precisionByThreshold.show()
        evaluate_result.recallByThreshold.show()
        evaluate_result.pr.show()
    finally:
        # Always release the SparkSession, even if training fails.
        spark.stop()
    return
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    main()