In [1]:
%pylab inline

from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

Populating the interactive namespace from numpy and matplotlib


In [2]:
raw_data=sc.textFile("/home/csxion/Desktop/project/data/p9_data_11_16_1q_sed.csv")
records=raw_data.map(lambda line:line.split(","))
records.first()

['가락시장역',
 '2016-01-01',
 '5',
 '0.077606673',
 '0.420556606',
 '0.414592146',
 '0.820293973',
 '0.553426589',
 '0.549782942',
 '0.99369488',
 '78']

In [3]:
subway_dict=records.map(lambda r:r[0]).distinct().zipWithIndex().collectAsMap() # distinct 꼭 사용할 것
time_dict=records.map(lambda r:r[2]).distinct().zipWithIndex().collectAsMap()
#date_dict=records.map(lambda r:r[1]).distinct().zipWithIndex().collectAsMap()

In [4]:
# r[0] : subway station name // Category feature => binary vector
# r[1] : Date(Not used)
# r[2] : Time(5~23) # Category feature => binary vector

def check_error_float(data, econd, subt): #data가 equal_cond와 같을때, subt로 치환한다.
    if(data==econd):
        return subt
    else:
        return float(data)

def extract_features_linear(r): #line별
    name_vec=np.zeros(len(subway_dict)) # 역 이름에 대하여 카테고리 특징을 바이너리 벡터 형태로 변환한다.
    name_vec[subway_dict[r[0]]]=1
    time_vec=np.zeros(len(time_dict)) # 시간 데이터에 대하여(5~23) 카테고리 특징을 바이너리 벡터 형태로 변환한다.
    time_vec[time_dict[r[2]]]=1
    #numbers=np.array([float(r[i]) for i in range(3,len(r)-1)])
    #numbers=np.array([float(e) for e in r[3:]])
    for i in range(3,len(r)-1):
        r[i]=check_error_float(r[i], "#N/A", 0.0)
    numbers=np.array([float(d) for d in r[3:len(r)-1]])
    return np.concatenate((name_vec, time_vec, numbers)) # 역 + 시간 + 특징벡터 값으로 구성되는 numpy.array를 반환한다.
#flatMap을 사용할 필요가 없음

def extract_label(r):
    return float(r[len(r)-1]) # int는 소문자로 지정해야 함
#float

def extract_features_dt(r): # dt모델에 적용할 특징벡터를 추출하는 함수. 여기서는 바이너리 벡터를 생성하지 않는다.
    name=float(subway_dict[r[0]])
    time=float(time_dict[r[2]])
    for i in range(3,len(r)-1):
        r[i]=check_error_float(r[i], "#N/A", 0.0)
    numbers=np.array([float(d) for d in r[3:len(r)-1]])
    return np.concatenate((np.array([name, time]), numbers))

# np.concatenate 함수는 꼭 이중괄호를 해주어야 index 에러가 나지 않는다.
# extract_label 함수의 경우 linearRegression Model과 DecisionTree 모두 동일하게 사용한다

### LinearRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2)

- elasticNetParam = Param(parent='undefined', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.')

- maxIter = Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).')
- regParam = Param(parent='undefined', name='regParam', doc='regularization parameter (>= 0).')

###### https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression

In [6]:
df=spark.createDataFrame(records.map(lambda line:(extract_label(line), Vectors.dense(extract_features_linear(line))))).toDF("label", "features")
(train_df, test_df) = df.randomSplit([0.7, 0.3]) #training Data와 Test Data를 7 : 3의 비율로 나눈다
lr = LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) # LinearRegression 모델의 parameter를 설정한다.
lrModel=lr.fit(df)
lrModel.transform(df.sample(True, 0.2, 42)).collect() # 20%의 데이터, 42는 random seed value

#.transform 메소드를 활용하여 데이터를 테스트 할 수 있다.

# RDD보다 비교적 정확한 데이터를 확인할 수 있었다.

[Row(label=316.0, features=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0947, 0.4206, 0.1634, 0.8203, 0.5534, 0.5498, 0.9937]), prediction=171.75480724513545),
 Row(label=393.0, features=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

In [14]:
lrModel.transform(df.sample(True, 0.2, 42)).select('label', 'prediction').take(20)

[Row(label=316.0, prediction=171.75480724513545),
 Row(label=393.0, prediction=185.4163460872337),
 Row(label=436.0, prediction=2922.2936966438747),
 Row(label=299.0, prediction=433.69399696938126),
 Row(label=1890.0, prediction=8701.20078473309),
 Row(label=1436.0, prediction=10355.271947221085),
 Row(label=6092.0, prediction=10796.494186845342),
 Row(label=6634.0, prediction=11330.16189452806),
 Row(label=5027.0, prediction=10379.733855040584),
 Row(label=3309.0, prediction=8951.335296585392),
 Row(label=2009.0, prediction=4727.127496739394),
 Row(label=4046.0, prediction=4367.843753856287),
 Row(label=4669.0, prediction=4844.001569055005),
 Row(label=4981.0, prediction=5186.249127084634),
 Row(label=1767.0, prediction=2137.6743378388746),
 Row(label=1491.0, prediction=6652.525635785705),
 Row(label=1892.0, prediction=4148.809026256581),
 Row(label=2570.0, prediction=4162.47056509868),
 Row(label=4807.0, prediction=5813.368960906972),
 Row(label=2933.0, prediction=3120.40903986352)]

### Regularization
- none (a.k.a. ordinary least squares)
- L2 (ridge regression)
- L1 (Lasso)
- L2 + L1 (elastic net)

In [21]:
paramMap = {lr.maxIter: 1000, lr.regParam:0.001, lr.elasticNetParam:0.8} # ParamMap이라는 매개변수 구조체로 변수를 넘겨줄 수 있다.
#step size의 경우 RDD에서만 사용하는듯, ml에서는 관련 정보가 없음

#paramMap[lr.maxIter] = 100  # 특정 변수를 지정하여 넘길 수 있다.

lrModel2=lr.fit(df, paramMap) # lr2Model의 경우 변경된 paramter에 대해 학습을 진행한다.
lrModel2.transform(test_df).select('label', 'prediction').take(20)

# Refer : https://spark.apache.org/docs/latest/ml-classification-regression.html#regression
# label이 작은 값일수록 오차가 심함

[Row(label=0.0, prediction=-2871.535880092546),
 Row(label=1.0, prediction=-2719.770007143892),
 Row(label=2.0, prediction=-2399.005854618362),
 Row(label=3.0, prediction=-2568.635193003629),
 Row(label=4.0, prediction=-2962.967584852461),
 Row(label=4.0, prediction=-2827.927814211029),
 Row(label=4.0, prediction=-2588.537103023027),
 Row(label=4.0, prediction=-2739.574578703884),
 Row(label=5.0, prediction=-2740.8035782579936),
 Row(label=5.0, prediction=-2964.32846231469),
 Row(label=6.0, prediction=-2001.5259487717476),
 Row(label=6.0, prediction=-2971.7602858628925),
 Row(label=7.0, prediction=-2585.9683706557025),
 Row(label=8.0, prediction=-2251.805630854895),
 Row(label=8.0, prediction=-1943.663817039962),
 Row(label=8.0, prediction=-2648.142873943969),
 Row(label=9.0, prediction=-2295.5198992195637),
 Row(label=9.0, prediction=-2773.9887320800954),
 Row(label=10.0, prediction=-1594.5811777314166),
 Row(label=10.0, prediction=-2715.6626030883526)]

In [20]:
lrModel2.explainParams()

''

In [27]:
# GLR(GeneralizedLinearRegression) Model 테스트 하기

from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model=glr.fit(df)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues)) # 두 값의 차이를 표본오차로 나눈 값(=두 값의 차이는 표준 오차의 몇 배인가)
#T value의 절대값이 커지게 되면, 해당 샘플과의 모집단 자체가 많이 틀어짐을 의미함
print("P Values: " + str(summary.pValues))
#귀무가설이 맞다는 전제 하에, 통계값이 실제로 관측된 값 이상일 확률
print("Dispersion: " + str(summary.dispersion)) #분산
print("Null Deviance: " + str(summary.nullDeviance)) # The null deviance shows how well the response is predicted by the model with nothing but an intercept.
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

model.transform(testData).map(lambda d:(d['label'], d['prediction']).collect()

Coefficients: [-2340.84973224,-102.071197864,-1185.4469747,-656.697476055,-1374.67191933,-81.0970801758,185.302367084,-925.00862181,772.879166214,3930.43861945,2469.75834729,-2129.77393692,-404.757381634,3797.51838649,-1818.01174074,4829.60969311,2213.75458749,1990.62104646,-274.025418591,-490.641177289,-1388.01716045,-1238.18638721,5366.15470376,-1737.66383046,1468.97082819,-1723.53938985,1813.05275766,-1620.8965715,-1757.70441692,2469.49783957,2563.30133011,100.181931213,-456.131012434,-1967.81666097,-1721.64132694,711.952608128,-892.370287695,-748.850665958,-1491.92531431,-1620.16830009,584.850256696,1914.49136587,-1143.90233173,-2411.94015905,5602.90905205,1365.95600377,708.286176002,42.6657373839,778.064991873,1256.2198431,-590.151232409,-450.839192364,2724.98854883,8598.44238621,544.585166214,608.673561971,1263.11617085,845.656913032,1205.97262702,-2290.99957174,-250.913469023,-1255.78033807,-2231.36796086,-1557.13439572,-738.228342986,-1067.53830521,-560.572372837,1671.51006812,

[Row(label=4.0, features=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.7035, 0.492, 0.1992, 0.7756, 0.7587, 0.7787, 0.7724]), prediction=-1892.9471314037269),
 Row(label=4.0, features=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

In [30]:
model.transform(testData).select('label', 'prediction').collect()
# dataframe의 경우 map 함수가 아닌 select함수를 이용하여 거른다.

[Row(label=4.0, prediction=-1892.9471314037269),
 Row(label=4.0, prediction=-2391.265787517062),
 Row(label=6.0, prediction=-2340.8048242557356),
 Row(label=8.0, prediction=-2064.370819500888),
 Row(label=9.0, prediction=-1889.526747209526),
 Row(label=10.0, prediction=-2226.67939843861),
 Row(label=11.0, prediction=-2259.3218281877903),
 Row(label=11.0, prediction=-2201.74428718639),
 Row(label=13.0, prediction=-1797.3709669983796),
 Row(label=14.0, prediction=-1523.0622739738142),
 Row(label=15.0, prediction=-2471.707389878165),
 Row(label=16.0, prediction=-1857.808335626773),
 Row(label=17.0, prediction=-386.5179775995507),
 Row(label=18.0, prediction=-1314.8506711050554),
 Row(label=18.0, prediction=-1764.058515857378),
 Row(label=19.0, prediction=356.0686320824175),
 Row(label=19.0, prediction=356.0686320824175),
 Row(label=20.0, prediction=-881.7613744029964),
 Row(label=20.0, prediction=-1274.4980869269352),
 Row(label=20.0, prediction=-1505.6738285830597),
 Row(label=20.0, pred

In [None]:
# DecisionTree
# VectorIndexer에 대해 알아봐야할듯 
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("/usr/local/spark/data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)

In [None]:
trainingData.take(3)

In [32]:
#VectorIndexer test code
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

vi = spark.createDataFrame([(Vectors.dense([-1.0, 0.0]),),(Vectors.dense([0.0, 1.0]),), (Vectors.dense([0.0, 2.0]),)], ["a"])
indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
model = indexer.fit(vi)
model.transform(vi).head().indexed
model.numFeatures

2

model.categoryMaps #무언가 카테고리를 나누려 했음. 

In [None]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df) # indexedFeatures라는 새로운 열 생성
dt = DecisionTreeRegressor(featuresCol="features")
pipeline = Pipeline(stages=[featureIndexer, dt])
#https://spark.apache.org/docs/latest/ml-pipeline.html : 파이프라인에 대한 설명
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)

In [None]:
trainingData.take(5)

In [None]:
testData.take(10)

In [None]:
trainingData.take(78)

In [None]:
df.take(78)