In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse, via getOrCreate) the Spark session that every later cell runs on.
spark = (
    SparkSession.builder
    .appName("missing")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/20 13:40:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the source CSV. The first row supplies the column names, and
# inferSchema asks Spark to detect numeric column types instead of reading
# everything as strings (the printed schema below confirms the result).
csv_reader = spark.read
training = csv_reader.csv("test3.csv", inferSchema=True, header=True)

                                                                                

In [4]:
# Preview the loaded rows (show() prints up to 20 rows by default).
training.show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|Anshul| 37|        15|250000|
| Rohan| 33|        11|450000|
|Poorva| 35|         4| 10000|
|Saloni| 31|         1|  9000|
| Pasha| 29|         1|  8000|
|Ruchir|  5|         0|   100|
|Cheeku|  1|         0|     0|
+------+---+----------+------+



In [5]:
# Confirm inferSchema parsed Age / Experience / Salary as numeric columns;
# VectorAssembler below only accepts numeric, boolean, or vector inputs.
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [6]:
# Column names as a Python list, in CSV header order.
training.columns

['Name', 'Age', 'Experience', 'Salary']

In [7]:
from pyspark.ml.feature import VectorAssembler

In [8]:
# Pack the numeric predictors into a single vector column: Spark ML
# estimators read their features from one vector-typed column.
predictor_cols = ["Age", "Experience"]
feature_assembler = VectorAssembler(
    inputCols=predictor_cols,
    outputCol="Independent Features",
)

In [9]:
# Append the "Independent Features" vector column. transform() returns a new
# DataFrame; `training` itself is left unchanged.
output = feature_assembler.transform(training)

In [10]:
# Inspect the assembled feature vectors alongside the original columns.
output.show()

+------+---+----------+------+--------------------+
|  Name|Age|Experience|Salary|Independent Features|
+------+---+----------+------+--------------------+
|Anshul| 37|        15|250000|         [37.0,15.0]|
| Rohan| 33|        11|450000|         [33.0,11.0]|
|Poorva| 35|         4| 10000|          [35.0,4.0]|
|Saloni| 31|         1|  9000|          [31.0,1.0]|
| Pasha| 29|         1|  8000|          [29.0,1.0]|
|Ruchir|  5|         0|   100|           [5.0,0.0]|
|Cheeku|  1|         0|     0|           [1.0,0.0]|
+------+---+----------+------+--------------------+



22/10/20 14:11:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 955706 ms exceeds timeout 120000 ms
22/10/20 14:11:24 WARN SparkContext: Killing executors is not supported by current scheduler.


In [13]:
# Keep only what the regressor consumes: the feature vector and the label.
finalized_data = output.select("Independent Features", "Salary")

In [14]:
# Preview the model input: one feature vector and one Salary label per row.
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [37.0,15.0]|250000|
|         [33.0,11.0]|450000|
|          [35.0,4.0]| 10000|
|          [31.0,1.0]|  9000|
|          [29.0,1.0]|  8000|
|           [5.0,0.0]|   100|
|           [1.0,0.0]|     0|
+--------------------+------+



In [15]:
# Fit a linear regression of Salary on the assembled features
# (regParam is left at its default of 0, i.e. no regularization).
from pyspark.ml.regression import LinearRegression

# Train/test split. randomSplit draws a different partition on every run
# unless it is given a seed, which would make the coefficients, intercept and
# predictions below change after each Restart & Run All. Fix the seed so the
# notebook is reproducible. (Outputs saved before this change came from an
# unseeded split; re-run the cells below to refresh them.)
# NOTE: the dataset has only 7 rows, so a 75/25 split leaves just a row or two
# for testing. Treat the evaluation below as illustrative, not as an accuracy estimate.
SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SEED)
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor_fit = regressor.fit(train_data)

22/10/20 15:27:08 WARN Instrumentation: [ab3b7efe] regParam is zero, which might cause numerical instability and overfitting.
22/10/20 15:27:08 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/10/20 15:27:08 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/10/20 15:27:09 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [16]:
# Fitted weights, one per assembled feature, in inputCols order: [Age, Experience].
regressor_fit.coefficients

DenseVector([-191.9831, 25014.3291])

In [17]:
# Fitted bias term (LinearRegression fits an intercept by default).
regressor_fit.intercept

7134.747760709284

In [18]:
# Prediction: score the held-out split. evaluate() returns a summary object
# whose `.predictions` DataFrame holds the input columns plus a `prediction` column.
pred_result = regressor_fit.evaluate(test_data)

In [20]:
# Compare each held-out row's actual Salary with the model's prediction.
pred_result.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [35.0,4.0]| 10000|100472.6555901053|
+--------------------+------+-----------------+

