In [1]:
# Linear regression with the Spark 2.0 DataFrame-based MLlib API

In [1]:
from __future__ import print_function

from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors


# ----------------------------------------------------------
# PCP 20230328
import os
import sys
# Point both the PySpark driver and its Python workers at the interpreter that
# is running this code, so they agree on the Python version/environment.
# These must be set before the SparkSession is built further down.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)
# ----------------------------------------------------------


if __name__ == "__main__":

    # Input file: each non-blank line is "label,feature", both parsed as floats.
    DATA_PATH = "regression_PCP_20230329.txt"
    # Seed for randomSplit so the train/test partition -- and therefore the
    # printed predictions -- is identical on every run.
    SPLIT_SEED = 42

    # Create a SparkSession (Note, the config section is only for Windows!)
    spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("LinearRegression").getOrCreate()

    # Release the session even if loading, training or prediction raises.
    try:
        # Load up our data and convert it to the format MLlib expects:
        # (label: float, features: DenseVector). Blank lines are skipped so a
        # stray empty line does not crash float() on an empty string.
        inputLines = spark.sparkContext.textFile(DATA_PATH)
        data = (
            inputLines
            .filter(lambda line: line.strip())
            .map(lambda line: line.split(","))
            .map(lambda fields: (float(fields[0]), Vectors.dense(float(fields[1]))))
        )

        # Convert this RDD to a DataFrame
        colNames = ["label", "features"]
        df = data.toDF(colNames)

        # Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
        # Perhaps you're importing data from a real database. Or you are using structured streaming
        # to get your data.

        # Split 50/50 into training and test data (seeded for reproducibility).
        trainTest = df.randomSplit([0.5, 0.5], seed=SPLIT_SEED)
        trainingDF, testDF = trainTest

        # Elastic-net regularized linear regression: regParam is the overall
        # penalty strength; elasticNetParam=0.8 leans the penalty towards L1
        # (0 = pure L2/ridge, 1 = pure L1/lasso).
        lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

        # Train the model using our training data
        model = lir.fit(trainingDF)

        # Generate predictions for every row of the test DataFrame; this adds
        # a "prediction" column next to the existing "label" and "features".
        fullPredictions = model.transform(testDF)

        # Pull (prediction, label) pairs back to the driver in a single job.
        # Selecting both columns together guarantees each prediction stays
        # paired with its own label, instead of relying on RDD.zip to align two
        # separately derived RDDs partition-by-partition.
        predictionAndLabel = [
            (row["prediction"], row["label"])
            for row in fullPredictions.select("prediction", "label").collect()
        ]

        # Print out the predicted and actual values for each point
        for prediction in predictionAndLabel:
            print(prediction)
    finally:
        # Stop the session
        spark.stop()


(-2.667622909520904, -3.74)
(-1.82853570474812, -2.58)
(-1.8712011558382617, -2.36)
(-1.5583211811772237, -2.27)
(-1.3521048342415396, -2.12)
(-1.4018811938467048, -2.09)
(-1.4089921023617284, -1.96)
(-1.3236612001814454, -1.91)
(-1.3449939257265162, -1.88)
(-1.4089921023617284, -1.87)
(-1.3094393831513982, -1.8)
(-1.1814430298809735, -1.77)
(-1.0392248595805018, -1.67)
(-1.1601103043359027, -1.6)
(-1.1529993958208793, -1.59)
(-1.1743321213659499, -1.58)
(-1.0036703170053838, -1.46)
(-0.9396721403701717, -1.39)
(-1.0036703170053838, -1.36)
(-1.053446676610549, -1.33)
(-1.0321139510654782, -1.3)
(-0.8116757870997469, -1.29)
(-0.8472303296748649, -1.27)
(-0.8330085126448177, -1.26)
(-0.9396721403701717, -1.25)
(-0.776121244524629, -1.24)
(-0.8472303296748649, -1.23)
(-0.8543412381898885, -1.22)
(-0.8543412381898885, -1.2)
(-0.8827848722499828, -1.17)
(-0.8898957807650064, -1.16)
(-0.7832321530396527, -1.12)
(-0.7690103360096054, -1.1)
(-0.7334557934344875, -1.07)
(-0.7121230678894167, -1