In [1]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=23c1df278a85b3cb8812e36ec0919162355e31769636163e9d44441f7a3adf4b
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


# Application 1 - Logistic Regression using text data

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

spark = SparkSession.builder.getOrCreate()

# Toy labeled corpus: label is 1.0 when the sentence mentions "spark", else 0.0.
training = spark.createDataFrame(
    [
        (0, "This is a testing for spark", 1.0),
        (1, "kudu ozone", 0.0),
        (2, "spark is in-memory engine", 1.0),
        (3, "hive is data warehouse", 0.0),
        (4, "hadoop is mapreduce for batch", 0.0),
    ],
    schema=["id", "text", "label"],
)

**Configure the Spark machine learning pipeline**

This Spark machine learning pipeline consists of three stages:

*   The tokenizer Transformer stage takes in raw text and splits it into a column of words.
*   The hashingTF Transformer stage takes those words and turns them into a feature vector column.
*   Finally, the logistic regression Estimator takes in the feature vectors and fits a new model to them; the fitted model is itself a Transformer.



In [3]:
# Three-stage ML pipeline: tokenizer -> hashingTF -> lr.
# Stage 1 (Transformer): split the raw "text" column into a "words" column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# Stage 2 (Transformer): hash the tokenizer's output into a "features" vector column.
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="features",
)
# Stage 3 (Estimator): logistic regression fitted on the "features" column.
lr = LogisticRegression(regParam=0.001, maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [4]:
# Fit all pipeline stages on the training documents; the result is a fitted
# model that acts as a Transformer.
model = pipeline.fit(training)

# Inspect the training data (first 5 rows, columns not truncated).
training.show(n=5, truncate=False)

# Sentences containing the word "spark" are labeled 1.0; all others are labeled 0.0.

+---+-----------------------------+-----+
|id |text                         |label|
+---+-----------------------------+-----+
|0  |This is a testing for spark  |1.0  |
|1  |kudu ozone                   |0.0  |
|2  |spark is in-memory engine    |1.0  |
|3  |hive is data warehouse       |0.0  |
|4  |hadoop is mapreduce for batch|0.0  |
+---+-----------------------------+-----+



**Prepare data set to run against trained model**


*   The model takes an id and a text column as input.
*   The model predicts whether each sentence contains the word "spark" (1.0) or not (0.0).



In [8]:
# Import 'Row' class to create row objects in our spark data frame

from pyspark.sql import Row

# Get the SparkContext that backs the existing SparkSession (this does not start
# a new context); it is used below to build an RDD.
sc = spark.sparkContext

In [9]:
# Unlabeled test documents, built as Row objects with fields (id, text).
# The DataFrame is created directly with spark.createDataFrame, the same way the
# training data is, rather than through an RDD round-trip (parallelize/map/toDF).
Document = Row("id", "text")
test = spark.createDataFrame([
    Document(5, "K O 1"),
    Document(6, "spark hadoop spark impala"),
    Document(7, "apache spark open-source"),
    Document(8, "spark is a platform"),
    Document(9, "Hadoop is for Big Data"),
])

**Make prediction on test data**


*   `model` is the new Transformer produced by fitting the pipeline.
*   I will now apply the `transform` method of this fitted model to the test data.



In [11]:
# Make predictions on the test documents and print the columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack into `pred` rather than `prediction`, so the loop does not overwrite
    # the `prediction` DataFrame created above with a float.
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

(5, K O 1) --> prob=[0.9719231922061782,0.028076807793821823], prediction=0.000000
(6, spark hadoop spark impala) --> prob=[0.30207973983131137,0.6979202601686887], prediction=1.000000
(7, apache spark open-source) --> prob=[0.6450881900811496,0.3549118099188504], prediction=0.000000
(8, spark is a platform) --> prob=[0.064777768484776,0.935222231515224], prediction=1.000000
(9, Hadoop is for Big Data) --> prob=[0.9922736248651602,0.007726375134839758], prediction=0.000000




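*   Using the 0.5 threshold for classification, the model performs reasonably well at predicting whether a sentence contains "spark": 4 of the 5 test sentences are classified correctly. The exception is (7, "apache spark open-source"), which contains "spark" but gets P(1) ≈ 0.35 and is predicted 0.0, probably because its other words never appear in the tiny training set.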



# Application 2 - Logistic Regression using vectors

In [12]:
from pyspark.ml.linalg import Vectors

In [13]:
# Labeled training data: each row is (label, dense feature vector of length 3).
training = spark.createDataFrame(
    [
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    schema=["label", "features"],
)

In [14]:
# Create a LogisticRegression instance (an Estimator, until it is fitted).
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Fit a model with the parameters stored in lr; the fitted model is a Transformer.
model1 = lr.fit(training)

In [15]:
# Display the training data (up to 10 rows, columns not truncated) to verify it
# before moving on.
training.show(10, truncate=False)

# The model fitted in the previous cell is a Transformer produced by the `lr` Estimator.

+-----+--------------+
|label|features      |
+-----+--------------+
|1.0  |[0.0,1.1,0.1] |
|0.0  |[2.0,1.0,-1.0]|
|0.0  |[2.0,1.3,1.0] |
|1.0  |[0.0,1.2,-0.5]|
+-----+--------------+



In [16]:
# Build a param map step by step: start with maxIter=20, then overwrite it with 30.
paramMap = {lr.maxIter: 20}
paramMap.update({lr.maxIter: 30})
# Further params can be added to the same map.
paramMap[lr.regParam] = 0.1
paramMap[lr.threshold] = 0.55
# A second map renames the probability output column.
paramMap2 = {lr.probabilityCol: "myProbability"}  # type: ignore
# Merge the two maps into a new dict (keys from paramMap2 win on conflict).
paramMapCombined = {**paramMap, **paramMap2}  # type: ignore

# Params passed to fit() take precedence over any set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)

In [17]:
# Test data: (label, features) rows. The label is kept only so predictions can be
# compared against it.
test = spark.createDataFrame(
    [
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5])),
    ],
    schema=["label", "features"],
)

# model2.transform() reads only the 'features' column. Because probabilityCol was
# set to "myProbability" in the fit-time param map, the output contains a
# "myProbability" column instead of the usual 'probability' column.
prediction = model2.transform(test)
result = (
    prediction
    .select("features", "label", "myProbability", "prediction")
    .collect()
)

In [18]:
# Show each test row's features and true label next to the model's class
# probabilities and predicted label.
for row in result:
    print(
        f"features={row.features}, label={row.label} "
        f"-> prob={row.myProbability}, prediction={row.prediction}"
    )

features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.05707304993572537,0.9429269500642746], prediction=1.0
features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238521956443227,0.07614780435567725], prediction=0.0
features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972780286187774,0.8902721971381222], prediction=1.0
