<a href="https://colab.research.google.com/github/Abhishekravindran/PySpark-Mlib/blob/main/pySpark_mlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Spark MLlib offers two APIs: the RDD-based API (pyspark.mllib) and the DataFrame-based API (pyspark.ml); this notebook uses the DataFrame-based API

We will take a simple use case and solve it with PySpark: predicting salary from age and experience.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=9ec0ffd41bad2c38a7313398ac471acf7cf4bd447fd0cc25cc99967f0684808a
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Mlib').getOrCreate()

In [5]:
spark

In [6]:
training=spark.read.csv('/content/test1.csv',header=True,inferSchema=True)
training.show()

+--------+---+----------+------+
|    Name|age|Experience|Salary|
+--------+---+----------+------+
|Abhishek| 31|        10| 30000|
| lavisha| 30|         8| 25000|
|     Tin| 29|         4| 20000|
|   Bittu| 24|         3| 20000|
|   manoj| 21|         1| 15000|
|    raks| 23|         2| 18000|
+--------+---+----------+------+
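The file `test1.csv` is not included with the notebook. To reproduce the run outside Colab, a minimal sketch like the following recreates a CSV with the same six rows shown above. The helper name `write_sample_csv` and the temp-directory location are assumptions for illustration; in Colab the notebook reads `/content/test1.csv`.

```python
import csv
import os
import tempfile

# Rows copied from the training.show() output above: (Name, age, Experience, Salary).
ROWS = [
    ("Abhishek", 31, 10, 30000),
    ("lavisha", 30, 8, 25000),
    ("Tin", 29, 4, 20000),
    ("Bittu", 24, 3, 20000),
    ("manoj", 21, 1, 15000),
    ("raks", 23, 2, 18000),
]


def write_sample_csv(path):
    """Hypothetical helper: write the six sample rows, with a header line, to `path`."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Name", "age", "Experience", "Salary"])
        writer.writerows(ROWS)
    return path


# Assumption: a temp directory stands in for Colab's /content folder.
csv_path = write_sample_csv(os.path.join(tempfile.gettempdir(), "test1.csv"))
print(csv_path)
```

The resulting path can then be passed to `spark.read.csv(csv_path, header=True, inferSchema=True)` in place of `/content/test1.csv`.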



In [7]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)
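The integer types above come from `inferSchema=True`, which makes Spark scan the data to guess each column's type; without it, every CSV column is read as a string. The sketch below is a simplified, hypothetical illustration of that idea in plain Python (try integer, then floating point, else string). Spark's real inference is more involved (it also handles nulls, booleans, timestamps, and wider numeric types), so treat this only as intuition.

```python
def infer_type(values):
    """Return 'integer', 'double' or 'string' for a column of CSV strings.

    A simplified sketch of the idea behind inferSchema=True, not Spark's algorithm.
    """

    def all_parse(cast):
        # True only if every value in the column can be converted with `cast`.
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


# Raw string values of each column, as they appear in the CSV file.
columns = {
    "Name": ["Abhishek", "lavisha", "Tin", "Bittu", "manoj", "raks"],
    "age": ["31", "30", "29", "24", "21", "23"],
    "Experience": ["10", "8", "4", "3", "1", "2"],
    "Salary": ["30000", "25000", "20000", "20000", "15000", "18000"],
}
schema = {name: infer_type(vals) for name, vals in columns.items()}
print(schema)
```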



In [8]:
training.columns

['Name', 'age', 'Experience', 'Salary']

In scikit-learn we pass the independent variables (the features) and the dependent variable (the label) to the model separately. Spark ML follows a slightly different mechanism: the independent variables must first be combined into a single vector column, which is what a VectorAssembler does.
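As a rough mental model, the pure-Python sketch below mimics what the assembler does to each row: the chosen input columns are cast to float and packed, in order, into one list stored in a new column, while the other columns are kept. The `assemble` function is hypothetical and only illustrates the idea; Spark stores the result as an ML `Vector`, not a Python list.

```python
# Two of the sample rows, represented as plain dicts.
rows = [
    {"Name": "Abhishek", "age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "manoj", "age": 21, "Experience": 1, "Salary": 15000},
]


def assemble(row, input_cols, output_col):
    """Hypothetical helper: return a copy of `row` with `input_cols` packed into `output_col`."""
    out = dict(row)
    out[output_col] = [float(row[c]) for c in input_cols]
    return out


assembled = [assemble(r, ["age", "Experience"], "Independent_features") for r in rows]
for r in assembled:
    print(r["Independent_features"], r["Salary"])
```

The packed column plays the role of scikit-learn's `X`, and `Salary` plays the role of `y`.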

In [12]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['age','Experience'],outputCol='Independent_features')

In [13]:
output=featureassembler.transform(training)

In [14]:
output.show()

+--------+---+----------+------+--------------------+
|    Name|age|Experience|Salary|Independent_features|
+--------+---+----------+------+--------------------+
|Abhishek| 31|        10| 30000|         [31.0,10.0]|
| lavisha| 30|         8| 25000|          [30.0,8.0]|
|     Tin| 29|         4| 20000|          [29.0,4.0]|
|   Bittu| 24|         3| 20000|          [24.0,3.0]|
|   manoj| 21|         1| 15000|          [21.0,1.0]|
|    raks| 23|         2| 18000|          [23.0,2.0]|
+--------+---+----------+------+--------------------+



In [15]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent_features']

In [16]:
final_data=output.select('Independent_features','Salary')

In [17]:
final_data.show()

+--------------------+------+
|Independent_features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



# Train/test split and model fitting

In [18]:
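from pyspark.ml.regression import LinearRegression
# Split roughly 75/25 into train/test; no seed is set, so the split (and all results below) can vary between runs.
train_dt,test_dt=final_data.randomSplit([0.75,0.25])
# Estimator: predict Salary from the assembled feature vector.
regressor=LinearRegression(featuresCol='Independent_features',labelCol='Salary')
# fit() returns a LinearRegressionModel; the notebook reuses the name `regressor` for it.
regressor=regressor.fit(train_dt)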
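`randomSplit` does not guarantee exact 75/25 sizes: each row is assigned randomly, so with only six rows the split sizes can vary from run to run. The sketch below is a simplified, hypothetical illustration of weighted random assignment in plain Python; Spark's actual sampler works per partition and differs in detail. It also shows why passing a seed makes the split repeatable.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight.

    A simplified sketch of the idea behind DataFrame.randomSplit, not Spark's code.
    """
    total = float(sum(weights))
    # Cumulative boundaries, e.g. [0.75, 0.25] -> [0.75, 1.0].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


data = list(range(6))  # stand-ins for the six rows of final_data
train, test = random_split(data, [0.75, 0.25], seed=42)
print(len(train), len(test))
```

In PySpark the equivalent repeatable call would be `final_data.randomSplit([0.75, 0.25], seed=42)`; the seed value here is arbitrary.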

In [19]:
regressor.coefficients

DenseVector([-2087.7193, 3561.4035])

In [20]:
regressor.intercept

59140.35087715416
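With the default `regParam=0.0`, `LinearRegression` fits ordinary least squares, so on data this small the coefficients and intercept can be checked by hand. The sketch below assumes the training split consisted of the four rows that do not appear in the test predictions further down (the test set there contains `[21.0,1.0]` and `[29.0,4.0]`). It centers the data and solves the 2×2 normal equations with Cramer's rule; `ols_two_features` is a hypothetical helper, not part of Spark.

```python
# Assumed training rows: all six rows minus the two test rows. (age, Experience, Salary)
train = [
    (31, 10, 30000),
    (30, 8, 25000),
    (24, 3, 20000),
    (23, 2, 18000),
]


def ols_two_features(rows):
    """Fit y = b0 + b1*x1 + b2*x2 by ordinary least squares.

    Centers x1, x2 and y, then solves the 2x2 normal equations with Cramer's rule.
    """
    n = len(rows)
    m1 = sum(r[0] for r in rows) / n
    m2 = sum(r[1] for r in rows) / n
    my = sum(r[2] for r in rows) / n
    s11 = sum((r[0] - m1) ** 2 for r in rows)
    s22 = sum((r[1] - m2) ** 2 for r in rows)
    s12 = sum((r[0] - m1) * (r[1] - m2) for r in rows)
    s1y = sum((r[0] - m1) * (r[2] - my) for r in rows)
    s2y = sum((r[1] - m2) * (r[2] - my) for r in rows)
    det = s11 * s22 - s12 ** 2
    b1 = (s22 * s1y - s12 * s2y) / det
    b2 = (s11 * s2y - s12 * s1y) / det
    b0 = my - b1 * m1 - b2 * m2  # intercept: the fitted plane passes through the means
    return b0, b1, b2


b0, b1, b2 = ols_two_features(train)
print(round(b1, 4), round(b2, 4), round(b0, 4))  # → -2087.7193 3561.4035 59140.3509
```

The match with `regressor.coefficients` and `regressor.intercept` also shows why age gets a negative weight: with four training rows and three parameters, the fit is driven by a handful of points and should not be read as a general trend.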

In [21]:
predict=regressor.evaluate(test_dt)


In [22]:
predict.predictions.show()

+--------------------+------+------------------+
|Independent_features|Salary|        prediction|
+--------------------+------+------------------+
|          [21.0,1.0]| 15000|18859.649122805153|
|          [29.0,4.0]| 20000|12842.105263165002|
+--------------------+------+------------------+
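Each value in the `prediction` column is just the intercept plus the dot product of the coefficients with the feature vector. The sketch below reproduces both test predictions from the printed numbers; because the coefficients are shown rounded to four decimals, the results agree to about 0.01. The function name `linear_predict` is hypothetical.

```python
# Coefficients and intercept as printed above (coefficients rounded to 4 decimals).
coefficients = [-2087.7193, 3561.4035]  # weights for [age, Experience]
intercept = 59140.35087715416


def linear_predict(features):
    """Linear model prediction: intercept + dot(coefficients, features)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


for features in ([21.0, 1.0], [29.0, 4.0]):
    print(features, round(linear_predict(features), 2))  # 18859.65, then 12842.11
```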



In [23]:
predict.meanAbsoluteError

5508.7719298200755

In [24]:
predict.meanSquaredError

33066174.20739037
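Both metrics can be recomputed from the two rows of the prediction table: MAE is the mean of the absolute errors and MSE the mean of the squared errors. The sketch below also computes RMSE and R²; the regression summary object exposes these too, as `rootMeanSquaredError` and `r2`. With only two test rows, all of these numbers are very noisy, and here R² comes out negative, meaning the model does worse on the test rows than simply predicting their mean salary.

```python
import math

# (label, prediction) pairs taken from predict.predictions.show() above.
pairs = [
    (15000.0, 18859.649122805153),
    (20000.0, 12842.105263165002),
]

errors = [p - y for y, p in pairs]
mae = sum(abs(e) for e in errors) / len(errors)  # mean absolute error
mse = sum(e * e for e in errors) / len(errors)  # mean squared error
rmse = math.sqrt(mse)  # root mean squared error, in salary units

# R^2 = 1 - SS_res / SS_tot, with SS_tot measured around the mean test label.
mean_y = sum(y for y, _ in pairs) / len(pairs)
ss_tot = sum((y - mean_y) ** 2 for y, _ in pairs)
ss_res = mse * len(pairs)
r2 = 1 - ss_res / ss_tot

print(mae, mse, rmse, r2)
```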

In [None]:
# The above is a simple linear regression on a small dataset using PySpark MLlib

If we need a CI/CD pipeline, we can use Databricks, which also works with MLflow.