In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark MLLib tutorial") \
    .getOrCreate()

## 1. Basic Statistics

### 1.1 Correlation

In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])

r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))



Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


### 1.2 Summarizer

Available metrics are the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

In [12]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df = spark.sparkContext.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                     Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()



In [13]:
# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")



In [14]:
# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)



+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1}                 |
+-----------------------------------+



In [15]:
# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)



+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|{[1.0,1.5,2.0], 2}              |
+--------------------------------+



In [16]:
# compute statistics for single metric "mean" with weight
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)



+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+



In [17]:
# compute statistics for single metric "mean" without weight
df.select(Summarizer.mean(df.features)).show(truncate=False)

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+



## 2.0 Sample Pipeline

In [4]:
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



In [5]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [6]:
# Fit the pipeline to training documents.
model = pipeline.fit(training)

In [None]:


# Now we can optionally save the fitted pipeline to disk
model.write().overwrite().save("spark-logistic-regression-model")

# And load it back in during production
sameModel = PipelineModel.load("spark-logistic-regression-model")


In [7]:


# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

test.show()

+---+------------------+
| id|              text|
+---+------------------+
|  4|       spark i j k|
|  5|             l m n|
|  6|spark hadoop spark|
|  7|     apache hadoop|
+---+------------------+



In [9]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
prediction.show()

+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
| id|              text|               words|            features|       rawPrediction|         probability|prediction|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  4|       spark i j k|    [spark, i, j, k]|(262144,[19036,68...|[0.52882855227968...|[0.62920984896684...|       0.0|
|  5|             l m n|           [l, m, n]|(262144,[1303,526...|[4.16914139534005...|[0.98477000676230...|       0.0|
|  6|spark hadoop spark|[spark, hadoop, s...|(262144,[173558,1...|[-1.8649814141188...|[0.13412348342566...|       1.0|
|  7|     apache hadoop|    [apache, hadoop]|(262144,[68303,19...|[5.41564427200184...|[0.99557321143985...|       0.0|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [10]:
selected = prediction.select("id", "text", "probability", "prediction")
selected.show()

+---+------------------+--------------------+----------+
| id|              text|         probability|prediction|
+---+------------------+--------------------+----------+
|  4|       spark i j k|[0.62920984896684...|       0.0|
|  5|             l m n|[0.98477000676230...|       0.0|
|  6|spark hadoop spark|[0.13412348342566...|       1.0|
|  7|     apache hadoop|[0.99557321143985...|       0.0|
+---+------------------+--------------------+----------+



In [11]:
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.6292098489668488,0.37079015103315116], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566147,0.8658765165743385], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000


## Class assisgnment 

Take any simple ML problem you solved in you ML classes and use Spark ML to implement the same solution