#(Ref. Big Data Community dan https://spark.apache.org/docs/latest/mllib-linear-methods.html) MK Analisis Big Data Filkom UB (Imam Cholissodin | imamcs@ub.ac.id)

# Regresi Linier (dengan data Simple)

In [2]:
from pyspark.ml.regression import LinearRegression

# Read the training set, stored in LIBSVM text format, into a DataFrame.
training = (
    spark.read
    .format("libsvm")
    .load("data/my/simplereg.txt")
)

# Configure the estimator: at most 10 iterations, regularisation strength 0.3
# and an elastic-net mixing parameter of 0.8.
lr = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Train on the full data set.
lrModel = lr.fit(training)

# Report the learned parameters (%s already applies str() to its argument).
print("Coefficients: %s" % (lrModel.coefficients,))
print("Intercept: %s" % (lrModel.intercept,))

# Report the optimiser trace and goodness-of-fit metrics computed on the
# training data itself.
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % (trainingSummary.objectiveHistory,))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)


Coefficients: [10.754139100117724]
Intercept: 829.9665547533602
numIterations: 3
objectiveHistory: [0.5000000000000142, 0.41689712560766806, 0.14247223398876466]
+-------------------+
|          residuals|
+-------------------+
|-20.886257858187037|
| -30.69006615971739|
| -82.47731806077672|
|  14.30993384028261|
| 103.83476844098914|
| -24.65695335877558|
|  -96.7231789606592|
| 24.030960139458557|
|  56.76854283910552|
|  56.48956913828124|
+-------------------+

RMSE: 60.101266
r2: 0.719013


In [96]:
# SQL entry point built on top of the already-running SparkContext `sc`.
sqlContext = SQLContext(sc)

# Load the CSV file; the first row is the header and column types are
# inferred from the data.
house_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('data/my/simplereg.csv')
)
# Fetch one row, mark the frame for caching, then show its schema.
house_df.take(1)
house_df.cache()
house_df.printSchema()

# Summary statistics, transposed so that each column becomes a row.
house_df.describe().toPandas().transpose()

import pandas as pd

# Keep only the columns whose Spark dtype is int or double.
numeric_features = [name for name, dtype in house_df.dtypes if dtype in ('int', 'double')]

# Draw an ~80% sample (without replacement) and bring it to the driver as a
# pandas DataFrame for plotting.
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()

# scatter_matrix lives in pandas.plotting; the top-level pd.scatter_matrix
# alias was deprecated and later removed from pandas.
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)

# Tidy up the outer axes: the left-most column carries the y labels and the
# bottom row carries the x labels of the n-by-n grid.
for left_ax, bottom_ax in zip(axs[:, 0], axs[n - 1, :]):
    left_ax.yaxis.label.set_rotation(0)
    left_ax.yaxis.label.set_ha('right')
    left_ax.set_yticks(())
    bottom_ax.xaxis.label.set_rotation(90)
    bottom_ax.set_xticks(())

from pyspark.ml.feature import VectorAssembler

# Pack the single predictor column 'x' into a vector column 'features' and
# keep only the columns the regression needs: the features and the label 'y'.
vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
vhouse_df = vectorAssembler.transform(house_df).select('features', 'y')
vhouse_df.show(10)

# All rows go to the first split (weights 1.0 / 0.0), and the same frame is
# used for both training and testing.
# For a real hold-out evaluation use randomSplit([0.7, 0.3]) instead and take
# test_df from splits[1].
splits = vhouse_df.randomSplit([1.0, 0.0])
train_df = test_df = splits[0]
    
from pyspark.ml.regression import LinearRegression

# Linear regression of label 'y' on the assembled 'features' vector, using
# the same hyper-parameters as the LIBSVM example in this notebook.
lr = LinearRegression(
    featuresCol='features',
    labelCol='y',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

# Print the fitted parameters, one per line.
for title, value in (
    ("Coefficients: ", lr_model.coefficients),
    ("Intercept: ", lr_model.intercept),
):
    print(title + str(value))

# Summary statistics of the training frame.
train_df.describe().show()

root
 |-- id: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- x: integer (nullable = true)

+--------+----+
|features|   y|
+--------+----+
|  [41.0]|1250|
|  [54.0]|1380|
|  [63.0]|1425|
|  [54.0]|1425|
|  [48.0]|1450|
|  [46.0]|1300|
|  [62.0]|1400|
|  [61.0]|1510|
|  [64.0]|1575|
|  [71.0]|1650|
+--------+----+

Coefficients: [10.754139100117724]
Intercept: 829.9665547533602
+-------+----------------+
|summary|               y|
+-------+----------------+
|  count|              10|
|   mean|          1436.5|
| stddev|119.514062398996|
|    min|            1250|
|    max|            1650|
+-------+----------------+



# Regresi Linier

In [34]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    """Convert one text record into a LabeledPoint.

    Fields may be separated by commas and/or any run of whitespace. The first
    number is the label and the remaining numbers are the features.

    Raises:
        ValueError: if the line holds no numbers or a field is not numeric.
    """
    # split() with no argument collapses repeated blanks and ignores
    # leading/trailing whitespace, so no empty token ever reaches float().
    values = [float(token) for token in line.replace(',', ' ').split()]
    if not values:
        raise ValueError("cannot parse an empty record: %r" % (line,))
    return LabeledPoint(values[0], values[1:])

# Read the raw text file and turn every line into a LabeledPoint.
data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Train with stochastic gradient descent: 100 iterations, step size 1e-8.
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=1e-8)

# Pair each true label with the model's prediction on the training data,
# then average the squared differences.
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
squaredErrors = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2)
MSE = squaredErrors.reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

# Save and load model
#model.save(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
#sameModel = LinearRegressionModel.load(sc, "target/tmp/pythonLinearRegressionWithSGDModel")


Mean Squared Error = 7.4510328101026015


# Logistic Regression

In [2]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    """Convert one text record into a LabeledPoint.

    Fields may be separated by commas and/or any run of whitespace. The first
    number is the label and the remaining numbers are the features.

    Raises:
        ValueError: if the line holds no numbers or a field is not numeric.
    """
    # split() with no argument collapses repeated blanks and ignores
    # leading/trailing whitespace, so no empty token ever reaches float().
    values = [float(token) for token in line.replace(',', ' ').split()]
    if not values:
        raise ValueError("cannot parse an empty record: %r" % (line,))
    return LabeledPoint(values[0], values[1:])

# Read the raw text file and turn every line into a LabeledPoint.
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Train a logistic regression classifier with the L-BFGS optimiser.
model = LogisticRegressionWithLBFGS.train(parsedData)

# Training error = share of training points whose predicted label differs
# from the true label.
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
misclassified = labelsAndPreds.filter(lambda lp: lp[0] != lp[1])
trainErr = misclassified.count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Persist the trained model, then read it back from the same location.
# NOTE(review): re-running this cell may fail if the target path already
# exists from a previous run - confirm and clean up the directory if needed.
modelPath = "target/tmp/pythonLogisticRegressionWithLBFGSModel"
model.save(sc, modelPath)
sameModel = LogisticRegressionModel.load(sc, modelPath)

Training Error = 0.36645962732919257


# Note: Logistic Regression (lainnya)

Mencoba implementasi kustom menggunakan DataFrame (dibandingkan dengan Logistic Regression dari Spark MLlib yang menggunakan RDD).

### Konsep:

DataFrame: API Machine Learning ini menggunakan DataFrame dari Spark SQL sebagai dataset Machine Learning, yang dapat menampung berbagai jenis data. Misalnya, DataFrame dapat memiliki kolom berbeda dalam menyimpan teks, vektor fitur, label aktual, dan prediksi.

Transformer: algoritma yang dapat mengubah satu DataFrame menjadi DataFrame lain. Misalnya, model Machine Learning adalah suatu Transformer yang mengubah DataFrame berisi fitur-fitur (input) menjadi DataFrame berisi prediksi (output).

Estimator: algoritma yang dapat dilatih (fit) pada sebuah DataFrame untuk menghasilkan Transformer. Misalnya, algoritma pembelajaran adalah suatu Estimator yang melakukan proses pelatihan pada DataFrame dan menghasilkan model yang siap digunakan untuk proses pengujian.

Pipeline: Sebuah Pipeline memadukan beberapa Transformer dan Estimator bersama-sama untuk menentukan alur kerja dari suatu Machine Learning.

(sumber: spark.apache.org)