In [9]:
# Show which Java runtime is on PATH; Spark runs on the JVM, so a working JDK is required.
# NOTE(review): the output below reports Java 24 -- confirm the installed PySpark release supports it.
!java -version

java version "24.0.2" 2025-07-15
Java(TM) SE Runtime Environment (build 24.0.2+12-54)
Java HotSpot(TM) 64-Bit Server VM (build 24.0.2+12-54, mixed mode, sharing)


In [3]:
# Install a Java Development Kit for Spark -- Debian/Ubuntu (e.g. Colab) only.
# apt-get does not exist on Windows/macOS; install a JDK manually there instead.
# `-y` avoids the interactive confirmation prompt, which a notebook cannot answer.
#!apt-get install -y openjdk-8-jdk

'apt-get' is not recognized as an internal or external command,
operable program or batch file.


In [2]:
import os

# Spark needs JAVA_HOME to point at a JDK. This path is where Debian/Ubuntu
# (e.g. Colab) place OpenJDK 8 after `apt-get install openjdk-8-jdk`. It does not
# exist on other systems (e.g. Windows), and pointing JAVA_HOME at a missing
# directory is likely to make Spark fail to launch -- so only override it when
# the directory is actually present; otherwise keep whatever JAVA_HOME is set.
JAVA_HOME_PATH = "/usr/lib/jvm/java-8-openjdk-amd64"
if os.path.isdir(JAVA_HOME_PATH):
    os.environ["JAVA_HOME"] = JAVA_HOME_PATH

In [3]:
# Current working directory. os.getcwd() works on every platform, whereas
# `!pwd` only works in a POSIX shell (it fails under Windows cmd).
os.getcwd()

/content


In [4]:
# Show JAVA_HOME as seen by this kernel (the environment Spark will inherit).
# Reading it from Python is portable; `!echo $JAVA_HOME` is not expanded by Windows cmd.
os.environ.get("JAVA_HOME")

/usr/lib/jvm/java-8-openjdk-amd64


In [17]:
!pip install pyspark



In [None]:
# Optional: install a specific PySpark release instead of the latest one.
# `==3.5` pins exactly 3.5.0; use `==3.5.*` to get the newest 3.5.x patch release.
#!pip install pyspark==3.5

In [19]:
!pip install --upgrade cloudpickle



In [20]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [21]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Foundational Code Snippets: StringIndexer

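`StringIndexer` assigns an index to each category based on how often it appears in the input column (the default `stringOrderType="frequencyDesc"`). The most frequent category gets the lowest index (0.0), the next most frequent gets 1.0, and so on; categories with equal frequency are ordered alphabetically. In the example below, "dog" (3 rows) becomes 0.0, "cat" (2 rows) becomes 1.0, and "rabbit" (1 row) becomes 2.0.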

In [None]:
# Start the Spark session (or reuse one already running in this kernel).
spark = (
    SparkSession.builder
    .appName("StringIndexerExample")
    .getOrCreate()
)

# Toy dataset with a single categorical column: "dog" occurs three times,
# "cat" twice and "rabbit" once, giving StringIndexer a clear frequency order.
animal_rows = [
    (0, "cat"),
    (1, "dog"),
    (2, "dog"),
    (3, "cat"),
    (4, "rabbit"),
    (5, "dog"),
]
data = spark.createDataFrame(animal_rows, ["id", "animal"])

data.show()

In [None]:

# Map each animal name to a numeric index (most frequent label -> 0.0).
indexer = StringIndexer(inputCol="animal", outputCol="animalIndex")

# fit() learns the label -> index mapping from `data`; transform() appends "animalIndex".
indexer_model = indexer.fit(data)
indexed_data = indexer_model.transform(data)
indexed_data.show()

In [None]:
# One-Hot Encoding
# NOTE(review): heading-only code cell -- consider converting it to a markdown cell.

In PySpark, `OneHotEncoder` converts categorical variables into a binary (one-hot) encoded format, which many machine learning algorithms require. It takes a column of numeric category indices (for example, the output of `StringIndexer`) and produces a single vector column, in which each position represents one category with a binary value (0 or 1).


In PySpark, `OneHotEncoder` (OHE) outputs a sparse vector, which is displayed as:


(size, [indices], [values])


Where:

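size is the length of the vector: the number of categories when `dropLast=False` (as in the example below), or one fewer with the default `dropLast=True`, where the last category is encoded as an all-zero vector.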

indices is a list of positions where the vector has non-zero elements.

values is a list of the actual non-zero values.

In [None]:
# One-hot encode the indexed column. dropLast=False keeps one slot per category,
# so every vector has length 3 (dog, cat, rabbit) in the output below.
encoder = OneHotEncoder(
    inputCol="animalIndex",
    outputCol="animalVec",
    dropLast=False,
)
encoder_model = encoder.fit(indexed_data)
encoded_data = encoder_model.transform(indexed_data)
encoded_data.show()

+---+------+-----------+-------------+
| id|animal|animalIndex|    animalVec|
+---+------+-----------+-------------+
|  0|   cat|        1.0|(3,[1],[1.0])|
|  1|   dog|        0.0|(3,[0],[1.0])|
|  2|   dog|        0.0|(3,[0],[1.0])|
|  3|   cat|        1.0|(3,[1],[1.0])|
|  4|rabbit|        2.0|(3,[2],[1.0])|
|  5|   dog|        0.0|(3,[0],[1.0])|
+---+------+-----------+-------------+



# VectorAssembler
`VectorAssembler` in PySpark is a feature transformer that combines multiple columns (numeric, boolean, or vector) into a single vector column. It is particularly useful in machine learning pipelines, because Spark ML models expect their input features as one vector column. The output of `VectorAssembler` can then be fed directly into machine learning algorithms.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# getOrCreate() returns the session already running in this kernel if there is
# one, rather than starting a second Spark application.
spark = SparkSession.builder.appName("VectorAssemblerExample").getOrCreate()

# Four people described only by numeric columns.
people_rows = [
    (0, 18.0, 1.0, 5.0),
    (1, 20.0, 0.0, 3.0),
    (2, 22.0, 1.0, 8.0),
    (3, 25.0, 0.0, 2.0),
]
people_cols = ["id", "age", "gender", "experience"]
data = spark.createDataFrame(people_rows, people_cols)

data.show()


+---+----+------+----------+
| id| age|gender|experience|
+---+----+------+----------+
|  0|18.0|   1.0|       5.0|
|  1|20.0|   0.0|       3.0|
|  2|22.0|   1.0|       8.0|
|  3|25.0|   0.0|       2.0|
+---+----+------+----------+



In [None]:
# Step 1: describe which columns to pack into the single "features" vector.
feature_inputs = ["age", "gender", "experience"]
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")

# Step 2: VectorAssembler needs no fit step -- transform() directly appends "features".
output = assembler.transform(data)

# Show the original columns next to the assembled feature vector.
output.show()


+---+----+------+----------+--------------+
| id| age|gender|experience|      features|
+---+----+------+----------+--------------+
|  0|18.0|   1.0|       5.0|[18.0,1.0,5.0]|
|  1|20.0|   0.0|       3.0|[20.0,0.0,3.0]|
|  2|22.0|   1.0|       8.0|[22.0,1.0,8.0]|
|  3|25.0|   0.0|       2.0|[25.0,0.0,2.0]|
+---+----+------+----------+--------------+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# Step 1: get the Spark session (getOrCreate() returns the already-running
# session if there is one rather than starting a second application).
spark = SparkSession.builder.appName("PipelineWithOHEExample").getOrCreate()

# Toy training data: numeric features, one categorical column ("gender"),
# and the regression target ("salary").
salary_rows = [
    (0, 18.0, "male", 5.0, 35000),
    (1, 20.0, "female", 3.0, 45000),
    (2, 22.0, "male", 8.0, 58000),
    (3, 25.0, "female", 2.0, 62000),
]
salary_cols = ["id", "age", "gender", "experience", "salary"]
data = spark.createDataFrame(salary_rows, salary_cols)

data.show()

+---+----+------+----------+------+
| id| age|gender|experience|salary|
+---+----+------+----------+------+
|  0|18.0|  male|       5.0| 35000|
|  1|20.0|female|       3.0| 45000|
|  2|22.0|  male|       8.0| 58000|
|  3|25.0|female|       2.0| 62000|
+---+----+------+----------+------+



In [None]:
# Stages run in order: gender -> genderIndex -> genderOHE -> features -> prediction.

# Steps 2-3: turn the "gender" strings into indices, then into a one-hot vector.
# The encoder keeps its default dropLast, so the two genders become a length-1
# vector (see the genderOHE column in the output below).
indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
encoder = OneHotEncoder(inputCol="genderIndex", outputCol="genderOHE")

# Step 4: merge the numeric columns and the one-hot vector into "features".
assembler = VectorAssembler(
    inputCols=["age", "genderOHE", "experience"],
    outputCol="features",
)

# Step 5: the regression model that predicts "salary" from "features".
lr = LinearRegression(featuresCol="features", labelCol="salary")

# Steps 6-7: chain the stages and fit them all on `data` in one call.
stages = [indexer, encoder, assembler, lr]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(data)

# Step 8: score rows with the fitted pipeline.
# NOTE: these are the same 4 rows used for fitting, and the feature vector has 3
# entries (plus, presumably, the default intercept), so the fit is essentially
# exact, as the output shows. This says nothing about generalization.
predictions = model.transform(data)
predictions.show()


+---+----+------+----------+------+-----------+-------------+--------------+------------------+
| id| age|gender|experience|salary|genderIndex|    genderOHE|      features|        prediction|
+---+----+------+----------+------+-----------+-------------+--------------+------------------+
|  0|18.0|  male|       5.0| 35000|        1.0|    (1,[],[])|[18.0,0.0,5.0]| 35000.00000000013|
|  1|20.0|female|       3.0| 45000|        0.0|(1,[0],[1.0])|[20.0,1.0,3.0]|45000.000000000306|
|  2|22.0|  male|       8.0| 58000|        1.0|    (1,[],[])|[22.0,0.0,8.0]| 57999.99999999991|
|  3|25.0|female|       2.0| 62000|        0.0|(1,[0],[1.0])|[25.0,1.0,2.0]| 61999.99999999968|
+---+----+------+----------+------+-----------+-------------+--------------+------------------+



In [None]:
# TODO (task): write code that computes evaluation metrics for the linear regression
# model above -- e.g. RMSE, MAE and R^2 via pyspark.ml.evaluation.RegressionEvaluator
# applied to `predictions` (labelCol="salary", predictionCol="prediction").
# NOTE: `predictions` were computed on the training rows themselves, so the metrics
# will look near-perfect; use a train/test split for a meaningful estimate.
# Resources: PySpark documentation, Stack Overflow, ChatGPT