**20L-0961**

# Introduction to Machine Learning in Spark

In [1]:
!pip install -q -U pyspark findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import findspark
findspark.init()
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

Basic Statistics

1. Correlation
---
Calculating the correlation between two series of data is a common operation in Statistics. In
spark.ml we provide the flexibility to calculate pairwise correlations among many series. The
supported correlation methods are currently Pearson’s and Spearman’s correlation.

Correlation computes the correlation matrix for the input Dataset of Vectors using the specified
method. The output will be a DataFrame that contains the correlation matrix of the column of
vectors.

In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("CorrelationExample") \
        .getOrCreate()

    # $example on$
    data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
            (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
            (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
            (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
    df = spark.createDataFrame(data, ["features"])

    r1 = Correlation.corr(df, "features").head()

    # $example off$
    assert r1 is not None
    # $example on$
    print("Pearson correlation matrix:\n" + str(r1[0]))

    r2 = Correlation.corr(df, "features", "spearman").head()

    # $example off$
    assert r2 is not None
    # $example on$
    print("Spearman correlation matrix:\n" + str(r2[0]))
    # $example off$

    spark.stop()



Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


In [4]:
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.stat import Correlation
spark = SparkSession.builder.appName('correlationLab').getOrCreate()
dataset = [[Vectors.dense([1, 0, 0, -2])],
           [Vectors.dense([4, 5, 0, 3])],
           [Vectors.dense([6, 7, 0, 8])],
           [Vectors.dense([9, 0, 0, 1])]]
dataset = spark.createDataFrame(dataset, ['features'])
pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
print(str(pearsonCorr).replace('nan', 'NaN'))




spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
print(str(spearmanCorr).replace('nan', 'NaN'))





DenseMatrix([[1.        , 0.05564149,        NaN, 0.40047142],
             [0.05564149, 1.        ,        NaN, 0.91359586],
             [       NaN,        NaN, 1.        ,        NaN],
             [0.40047142, 0.91359586,        NaN, 1.        ]])
DenseMatrix([[1.        , 0.10540926,        NaN, 0.4       ],
             [0.10540926, 1.        ,        NaN, 0.9486833 ],
             [       NaN,        NaN, 1.        ,        NaN],
             [0.4       , 0.9486833 ,        NaN, 1.        ]])


2. Hypothesis testing
---
Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically

significant, whether this result occurred by chance or not. spark.ml currently supports Pearson’s Chi-
squared ( χ2) tests for independence.

ChiSquareTest
---
ChiSquareTest conducts Pearson’s independence test for every feature against the label. For each
feature, the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared
statistic is computed. All label and feature values must be categorical.

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("ChiSquareTestExample") \
        .getOrCreate()

    # $example on$
    data = [(0.0, Vectors.dense(0.5, 10.0)),
            (0.0, Vectors.dense(1.5, 20.0)),
            (1.0, Vectors.dense(1.5, 30.0)),
            (0.0, Vectors.dense(3.5, 30.0)),
            (0.0, Vectors.dense(3.5, 40.0)),
            (1.0, Vectors.dense(3.5, 40.0))]
    df = spark.createDataFrame(data, ["label", "features"])

    r = ChiSquareTest.test(df, "features", "label").head()

    # $example off$
    assert r is not None
    # $example on$

    print("pValues: " + str(r.pValues))
    print("degreesOfFreedom: " + str(r.degreesOfFreedom))
    print("statistics: " + str(r.statistics))
    # $example off$

    spark.stop()



pValues: [0.6872892787909721,0.6822703303362126]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]


In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest
spark = SparkSession.builder.appName("ChiSquareTestLab").getOrCreate()
dataset = [[0, Vectors.dense([0, 0, 1])],
           [0, Vectors.dense([1, 0, 1])],
           [1, Vectors.dense([2, 1, 1])],
           [1, Vectors.dense([3, 1, 1])]]
dataset = spark.createDataFrame(dataset, ["label", "features"])
chiSqResult = ChiSquareTest.test(dataset, 'features', 'label')
print(chiSqResult.select("degreesOfFreedom").collect()[0])

chiSqResult = ChiSquareTest.test(dataset, 'features', 'label', True)
row = chiSqResult.orderBy("featureIndex").collect()
assert row is not None
print("statistics: ",row[0].statistic)
spark.stop()

Row(degreesOfFreedom=[3, 1, 0])
statistics:  4.0


3. Summarizer
---
We provide vector column summary statistics for Dataframe through Summarizer. Available metrics are
the column-wise max, min, mean, sum, variance, std, and number of nonzeros, as well as the total count.

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("SummarizerExample") \
        .getOrCreate()
    sc = spark.sparkContext

    # $example on$
    df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                         Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()

    # create summarizer for multiple metrics "mean" and "count"
    summarizer = Summarizer.metrics("mean", "count")

    # compute statistics for multiple metrics with weight
    df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

    # compute statistics for multiple metrics without weight
    df.select(summarizer.summary(df.features)).show(truncate=False)

    # compute statistics for single metric "mean" with weight
    df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

    # compute statistics for single metric "mean" without weight
    df.select(Summarizer.mean(df.features)).show(truncate=False)
    # $example off$

    spark.stop()


+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1}                 |
+-----------------------------------+

+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|{[1.0,1.5,2.0], 2}              |
+--------------------------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+



In [11]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("SummarizerLab").getOrCreate()
sc = spark.sparkContext

# another way of using summarizer
df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                         Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()

summarizer = Summarizer.metrics ("sum", "variance")
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

df.select(Summarizer.mean(df.features)).show(truncate=False)

spark.stop()

+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], [0.0,0.0,0.0]}     |
+-----------------------------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+

