In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [None]:
# Silence warnings so the notebook output stays readable.
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments, returns None."""
    return None


# Install the 'ignore' filter and replace warnings.warn with the no-op above,
# so calls made through warnings.warn are dropped as well.
warnings.filterwarnings('ignore')
warnings.warn = warn

# FindSpark simplifies the process of using Apache Spark with Python.
# findspark.init() runs before the pyspark imports below; presumably it makes
# the Spark installation importable -- TODO confirm against the findspark docs.

import findspark
findspark.init()

# SparkSession is used below to create the DataFrames in this notebook.
from pyspark.sql import SparkSession
# NOTE(review): `rand` is not referenced in the visible part of this notebook.
from pyspark.sql.functions import rand

In [None]:
# Start (or reuse) the Spark session for this notebook.
# Any warnings printed while the session starts can be ignored.
spark = (
    SparkSession.builder
    .appName("Feature Extraction and Transformation using Spark")
    .getOrCreate()
)

# Tokenizer - breaks a sentence into words

In [None]:
#import tokenizer
from pyspark.ml.feature import Tokenizer

In [None]:
# Build a three-row DataFrame of example sentences.
sentenceDataFrame = spark.createDataFrame(
    data=[
        (1, "Spark is a distributed computing system."),
        (2, "It provides interfaces for multiple languages"),
        (3, "Spark is built on top of Hadoop"),
    ],
    schema=["id", "sentence"],
)

In [None]:
# Show the sample rows with full, untruncated sentences.
sentenceDataFrame.show(truncate=False)

In [None]:
# Configure the tokenizer:
#   inputCol  - column holding the text to split
#   outputCol - column that will receive the resulting tokens
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)

In [None]:
# Run the tokenizer over the sample sentences.
token_df = tokenizer.transform(dataset=sentenceDataFrame)

In [None]:
# Show the tokenized rows (up to 20, the default limit) without truncating cells.
token_df.show(n=20, truncate=False)

# CountVectorizer - converts text into a numerical format. It gives the count of each word in a given document.

In [None]:
#import CountVectorizer
from pyspark.ml.feature import CountVectorizer

In [None]:
# Build a DataFrame of (id, words) rows by splitting each sample text on
# whitespace, then display it.
textdata = spark.createDataFrame(
    [
        (doc_id, text.split())
        for doc_id, text in (
            (1, "I love Spark Spark provides Python API "),
            (2, "I love Python Spark supports Python"),
            (3, "Spark solves the big problem of big data"),
        )
    ],
    ["id", "words"],
)

textdata.show(truncate=False)

In [None]:
# Configure the CountVectorizer:
#   inputCol  - column holding the word lists to count
#   outputCol - column that will receive the count vectors
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
)

In [None]:
# Fit the CountVectorizer on the sample word lists.
model = cv.fit(dataset=textdata)

In [None]:
# Convert each row's word list into its bag-of-words count vector.
result = model.transform(dataset=textdata)

In [None]:
# Show the count vectors (up to 20 rows, the default limit) without truncating cells.
result.show(n=20, truncate=False)

# TF - IDF

## Term Frequency-Inverse Document Frequency is used to quantify the importance of a word in a document. TF-IDF is computed by multiplying the number of times a word occurs in a document by the inverse document frequency of the word.

In [None]:
#import necessary classes for TF-IDF calculation
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


In [None]:
# Build the example sentences for the TF-IDF demo and display them.
sentenceData = spark.createDataFrame(
    data=[
        (1, "Spark supports python"),
        (2, "Spark is fast"),
        (3, "Spark is easy"),
    ],
    schema=["id", "sentence"],
)

sentenceData.show(truncate=False)

In [None]:
# Split each "sentence" into tokens stored in a new "words" column, then display the result.
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
wordsData = tokenizer.transform(dataset=sentenceData)
wordsData.show(truncate=False)

In [None]:
# Configure HashingTF to read the "words" column and write its output to
# "rawFeatures", with numFeatures set to 10, then apply it and display the result.
hashingTF = HashingTF(
    numFeatures=10,
    inputCol="words",
    outputCol="rawFeatures",
)
featurizedData = hashingTF.transform(dataset=wordsData)

featurizedData.show(truncate=False)

In [None]:
# Fit an IDF model on the "rawFeatures" column, then apply it so the
# resulting values are written to the "features" column.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
idfModel = idf.fit(dataset=featurizedData)
tfidfData = idfModel.transform(dataset=featurizedData)

In [None]:
# Show each sentence next to its TF-IDF vector, without truncating cells.
(
    tfidfData
    .select("sentence", "features")
    .show(truncate=False)
)

# StopWordsRemover 
### StopWordsRemover is a transformer that filters out stop words such as "a", "an", and "the".

In [None]:
#import StopWordsRemover
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Build a DataFrame whose "sentence" column holds pre-tokenized word lists, then display it.
textData = spark.createDataFrame(
    data=[
        (1, ['Spark', 'is', 'an', 'open-source', 'distributed', 'computing', 'system']),
        (2, ['IT', 'has', 'interfaces', 'for', 'multiple', 'languages']),
        (3, ['It', 'has', 'a', 'wide', 'range', 'of', 'libraries', 'and', 'APIs']),
    ],
    schema=["id", "sentence"],
)

textData.show(truncate=False)

In [None]:
# Remove stop words from the "sentence" column and store the result in the
# "filtered_sentence" column.
# textData is reassigned here, so on a second run of this cell it already
# contains "filtered_sentence" and the transformer rejects an input that has
# its output column. Dropping that column first keeps the cell re-runnable;
# drop() is a no-op when the column is absent, so the first run is unchanged.
remover = StopWordsRemover(inputCol="sentence", outputCol="filtered_sentence")
textData = remover.transform(textData.drop("filtered_sentence"))

In [None]:
# Show the word lists before and after stop-word removal, without truncating cells.
textData.show(truncate=False)

# StringIndexer

In [None]:
#import StringIndexer
from pyspark.ml.feature import StringIndexer

In [None]:
# Build a DataFrame of (id, color) rows and display it.
colors = spark.createDataFrame(
    data=[
        (0, "red"),
        (1, "red"),
        (2, "blue"),
        (3, "yellow"),
        (4, "yellow"),
        (5, "yellow"),
    ],
    schema=["id", "color"],
)

colors.show()

In [None]:
# Map each string in "color" to a numeric index stored in "colorIndex":
# fit the indexer on the data, then apply the fitted model to the same data.
indexer = StringIndexer(
    inputCol="color",
    outputCol="colorIndex",
)
indexed = (
    indexer
    .fit(colors)
    .transform(colors)
)

In [None]:
# Show the colors alongside their assigned indexes (up to 20 rows, the default limit).
indexed.show(n=20)

# StandardScaler

### StandardScaler transforms the data so that each feature has a mean of 0 and a standard deviation of 1.

In [None]:
#import StandardScaler
from pyspark.ml.feature import StandardScaler


In [None]:
# Build a sample DataFrame of (id, dense feature vector) rows and display it.
from pyspark.ml.linalg import Vectors

data = [
    (row_id, Vectors.dense(values))
    for row_id, values in (
        (1, [70, 170, 17]),
        (2, [80, 165, 25]),
        (3, [65, 150, 135]),
    )
]
df = spark.createDataFrame(data, schema=["id", "features"])

df.show()

In [None]:
# Configure the StandardScaler to read "features" and write "scaledFeatures",
# with both mean-centering (withMean) and standard-deviation scaling (withStd) enabled.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)

In [None]:
# Fit the scaler on the sample feature vectors.
scalerModel = scaler.fit(dataset=df)

In [None]:
# Apply the fitted scaler, adding the "scaledFeatures" column.
scaledData = scalerModel.transform(dataset=df)

In [None]:
# Display the original and scaled feature vectors without truncating cells.
scaledData.show(truncate=False)

In [None]:
# Stop the SparkSession created earlier in this notebook.
spark.stop()