In [1]:
import os
import pyspark

# Set PYSPARK_PYTHON to "python3" in this process's environment; this runs
# before any SparkContext is created further down in the notebook.
os.environ["PYSPARK_PYTHON"]="python3"

In [2]:
# Reuse `sc` / `sqlContext` when they already exist in the kernel namespace and
# create them only when the name is missing. Only NameError triggers creation:
# any other failure is a real problem and must surface instead of being hidden
# behind a freshly created context.
try:
    if isinstance(sc, pyspark.SparkContext):
        print("sc:", sc)
except NameError:
    sc = pyspark.SparkContext()
    print("Created sc:", sc)

try:
    if isinstance(sqlContext, pyspark.SQLContext):
        print("sqlContext:", sqlContext)
except NameError:
    sqlContext = pyspark.SQLContext(sc)
    print("Created sqlContext:", sqlContext)

Created sc: <pyspark.context.SparkContext object at 0x7f4523710908>
Created sqlContext: <pyspark.sql.context.SQLContext object at 0x7f45237108d0>


In [283]:
import pyspark.ml
import pyspark.mllib.linalg

In [308]:
# Four 2-D sample points; each dense vector is wrapped in a 1-tuple so that it
# becomes one row of a single-column ("feature") DataFrame.
rdd_data = [
    (pyspark.mllib.linalg.Vectors.dense(coords),)
    for coords in ([0.0, 1.0], [1.0, 1.0], [2.0, 3.0], [8.0, 9.0])
]
df_data = sqlContext.createDataFrame(rdd_data, ["feature"])
df_data.show()
df_data.collect()

+---------+
|  feature|
+---------+
|[0.0,1.0]|
|[1.0,1.0]|
|[2.0,3.0]|
|[8.0,9.0]|
+---------+



[Row(feature=DenseVector([0.0, 1.0])),
 Row(feature=DenseVector([1.0, 1.0])),
 Row(feature=DenseVector([2.0, 3.0])),
 Row(feature=DenseVector([8.0, 9.0]))]

In [309]:
# Pull the vectors out of the single-column DataFrame as a plain RDD.
# Go through `.rdd` explicitly: `DataFrame.map` is only available on Spark 1.x,
# while `DataFrame.rdd.map` works there and on later releases as well.
rdd = df_data.select("feature").rdd.map(lambda r: r.feature)
rdd.collect()

[DenseVector([0.0, 1.0]),
 DenseVector([1.0, 1.0]),
 DenseVector([2.0, 3.0]),
 DenseVector([8.0, 9.0])]

In [310]:
# Give every vector an index and put the index first: (id, vector).
# zipWithIndex yields (vector, index) pairs, so a one-to-one map that swaps the
# two elements is all that is needed.
rdd = rdd.zipWithIndex().map(lambda r: (r[1], r[0]))
rdd.collect()

[(0, DenseVector([0.0, 1.0])),
 (1, DenseVector([1.0, 1.0])),
 (2, DenseVector([2.0, 3.0])),
 (3, DenseVector([8.0, 9.0]))]

In [311]:
# Turn the (id, vector) pairs into a two-column DataFrame for the join below.
df = sqlContext.createDataFrame(rdd, schema=["id", "feature"])
df.show()

+---+---------+
| id|  feature|
+---+---------+
|  0|[0.0,1.0]|
|  1|[1.0,1.0]|
|  2|[2.0,3.0]|
|  3|[8.0,9.0]|
+---+---------+



In [312]:
# Join df with itself without a condition so every row is paired with every
# row, then rename the four resulting columns to tell the two sides apart.
pair_columns = ('x_id', 'x_feature', 'y_id', 'y_feature')
df2 = df.join(df).toDF(*pair_columns)
df2.show()
for pair_row in df2.collect():
    print(pair_row)
df2.printSchema()

+----+---------+----+---------+
|x_id|x_feature|y_id|y_feature|
+----+---------+----+---------+
|   0|[0.0,1.0]|   0|[0.0,1.0]|
|   0|[0.0,1.0]|   1|[1.0,1.0]|
|   0|[0.0,1.0]|   2|[2.0,3.0]|
|   0|[0.0,1.0]|   3|[8.0,9.0]|
|   1|[1.0,1.0]|   0|[0.0,1.0]|
|   1|[1.0,1.0]|   1|[1.0,1.0]|
|   1|[1.0,1.0]|   2|[2.0,3.0]|
|   1|[1.0,1.0]|   3|[8.0,9.0]|
|   2|[2.0,3.0]|   0|[0.0,1.0]|
|   2|[2.0,3.0]|   1|[1.0,1.0]|
|   2|[2.0,3.0]|   2|[2.0,3.0]|
|   2|[2.0,3.0]|   3|[8.0,9.0]|
|   3|[8.0,9.0]|   0|[0.0,1.0]|
|   3|[8.0,9.0]|   1|[1.0,1.0]|
|   3|[8.0,9.0]|   2|[2.0,3.0]|
|   3|[8.0,9.0]|   3|[8.0,9.0]|
+----+---------+----+---------+

Row(x_id=0, x_feature=DenseVector([0.0, 1.0]), y_id=0, y_feature=DenseVector([0.0, 1.0]))
Row(x_id=0, x_feature=DenseVector([0.0, 1.0]), y_id=1, y_feature=DenseVector([1.0, 1.0]))
Row(x_id=0, x_feature=DenseVector([0.0, 1.0]), y_id=2, y_feature=DenseVector([2.0, 3.0]))
Row(x_id=0, x_feature=DenseVector([0.0, 1.0]), y_id=3, y_feature=DenseVector([8.0, 9.0]))

In [313]:
# Rebuild every pair row with an extra `distance` field holding the Euclidean
# distance between the two vectors (square root of squared_distance).
# `.rdd.map` is used because `DataFrame.map` is only available on Spark 1.x.
rdd2 = df2.rdd.map(lambda r: pyspark.sql.Row(
    x_id=r.x_id,
    x_feature=r.x_feature,
    y_id=r.y_id,
    y_feature=r.y_feature,
    distance=float(r.x_feature.squared_distance(r.y_feature)) ** 0.5))
df2 = sqlContext.createDataFrame(rdd2)
df2.show()

+------------------+---------+----+---------+----+
|          distance|x_feature|x_id|y_feature|y_id|
+------------------+---------+----+---------+----+
|               0.0|[0.0,1.0]|   0|[0.0,1.0]|   0|
|               1.0|[0.0,1.0]|   0|[1.0,1.0]|   1|
|2.8284271247461903|[0.0,1.0]|   0|[2.0,3.0]|   2|
|11.313708498984761|[0.0,1.0]|   0|[8.0,9.0]|   3|
|               1.0|[1.0,1.0]|   1|[0.0,1.0]|   0|
|               0.0|[1.0,1.0]|   1|[1.0,1.0]|   1|
|  2.23606797749979|[1.0,1.0]|   1|[2.0,3.0]|   2|
| 10.63014581273465|[1.0,1.0]|   1|[8.0,9.0]|   3|
|2.8284271247461903|[2.0,3.0]|   2|[0.0,1.0]|   0|
|  2.23606797749979|[2.0,3.0]|   2|[1.0,1.0]|   1|
|               0.0|[2.0,3.0]|   2|[2.0,3.0]|   2|
|  8.48528137423857|[2.0,3.0]|   2|[8.0,9.0]|   3|
|11.313708498984761|[8.0,9.0]|   3|[0.0,1.0]|   0|
| 10.63014581273465|[8.0,9.0]|   3|[1.0,1.0]|   1|
|  8.48528137423857|[8.0,9.0]|   3|[2.0,3.0]|   2|
|               0.0|[8.0,9.0]|   3|[8.0,9.0]|   3|
+------------------+---------+-

In [314]:
# Print the column names and types of df2 to check the result of the cell above.
df2.printSchema()

root
 |-- distance: double (nullable = true)
 |-- x_feature: vector (nullable = true)
 |-- x_id: long (nullable = true)
 |-- y_feature: vector (nullable = true)
 |-- y_id: long (nullable = true)



In [372]:
import pyspark
import pyspark.ml.param
from pyspark.ml.param.shared import *

@pyspark.mllib.common.inherit_doc
class PairwiseEuclideanDistance(pyspark.ml.pipeline.Transformer, HasInputCol, HasOutputCol):
    """
    Transformer that pairs every row of the input column with every row
    (itself included) and computes the Euclidean distance of each pair.

    The values of the input column must provide ``squared_distance(other)``.
    The result has the columns ``x_id``, ``x_<inputCol>``, ``y_id``,
    ``y_<inputCol>`` and ``<outputCol>``, where the ids are the indices
    assigned by ``zipWithIndex``. When ``squared`` is true the squared
    distance is emitted instead of its square root.
    """

    # a placeholder to make it appear in the generated doc
    squared = pyspark.ml.param.Param(Params._dummy(), "squared",
                                     "boolean to indicate squared Euclidean distance")

    @pyspark.ml.util.keyword_only
    def __init__(self, squared=False, inputCol="feature", outputCol="distance"):
        """
        __init__(self, squared=False, inputCol="feature", outputCol="distance")
        """
        super(PairwiseEuclideanDistance, self).__init__()
        # Re-create the param with this instance as its parent; the class-level
        # attribute above is only a placeholder.
        self.squared = pyspark.ml.param.Param(self, "squared",
                                              "boolean to indicate squared Euclidean distance")
        self._setDefault(squared=False, inputCol="feature", outputCol="distance")
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @pyspark.ml.util.keyword_only
    def setParams(self, squared=False, inputCol="feature", outputCol="distance"):
        """
        setParams(self, squared=False, inputCol="feature", outputCol="distance")
        Sets params for this PairwiseEuclideanDistance.
        """
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setSquared(self, value):
        """
        Sets the value of :py:attr:`squared`.
        """
        # Go through _set, like setParams does, instead of writing to the
        # param map directly.
        self._set(squared=value)
        return self

    def getSquared(self):
        """
        Gets the value of squared or its default value.
        """
        return self.getOrDefault(self.squared)

    def _transform(self, dataset):
        """
        Transforms the input dataset.

        :param dataset: DataFrame that contains the input column.
        :return: DataFrame with one row per ordered pair of input rows and the
                 columns x_id, x_<inputCol>, y_id, y_<inputCol>, <outputCol>.
        """
        input_col = self.getInputCol()
        # Resolve the flag here so the function shipped to the workers captures
        # a plain bool rather than `self`.
        take_root = not self.getSquared()

        def to_pair_row(r):
            # After the self-join a row is (x_vector, x_id, y_vector, y_id).
            dist = float(r[0].squared_distance(r[2]))
            return [r[1], r[0], r[3], r[2], dist ** 0.5 if take_root else dist]

        # `.rdd.map` rather than `DataFrame.map`: the latter is only available
        # on Spark 1.x, the former works there and on later releases.
        indexed = dataset.select(input_col).rdd.map(lambda r: r[0]).zipWithIndex().toDF()
        # Join without a condition: every row is paired with every row.
        # NOTE(review): some Spark versions may need cross joins enabled
        # explicitly for this - confirm against the target release.
        pairs = indexed.join(indexed).rdd.map(to_pair_row)
        return pairs.toDF(["x_id", "x_" + input_col, "y_id", "y_" + input_col,
                           self.getOutputCol()])
    

# Same four 2-D sample points as a single-column ("feature") DataFrame; each
# dense vector is wrapped in a 1-tuple so that it forms one row.
rdd_data = [
    (pyspark.mllib.linalg.Vectors.dense(coords),)
    for coords in ([0.0, 1.0], [1.0, 1.0], [2.0, 3.0], [8.0, 9.0])
]
df_data = sqlContext.createDataFrame(rdd_data, ["feature"])
df_data.show()
df_data.collect()

ped = PairwiseEuclideanDistance(squared=False, inputCol="feature", outputCol="distance")

# First run: squared distances. The getters are checked with assertions so that
# a regression in the param plumbing fails loudly instead of passing unnoticed.
ped.setSquared(True)
assert ped.getSquared()
assert ped.getInputCol() == "feature"
assert ped.hasDefault("squared")
print(ped.explainParams())
ped.transform(df_data).show()

# Second run: plain Euclidean distances, using the same `ped` instance.
ped.setSquared(False)
print(ped.explainParams())
ped.transform(df_data).show()

+---------+
|  feature|
+---------+
|[0.0,1.0]|
|[1.0,1.0]|
|[2.0,3.0]|
|[8.0,9.0]|
+---------+

inputCol: input column name. (default: feature, current: feature)
outputCol: output column name. (default: distance, current: distance)
squared: boolean to indicate squared Euclidean distance (default: False, current: True)
+----+---------+----+---------+--------+
|x_id|x_feature|y_id|y_feature|distance|
+----+---------+----+---------+--------+
|   0|[0.0,1.0]|   0|[0.0,1.0]|     0.0|
|   0|[0.0,1.0]|   1|[1.0,1.0]|     1.0|
|   0|[0.0,1.0]|   2|[2.0,3.0]|     8.0|
|   0|[0.0,1.0]|   3|[8.0,9.0]|   128.0|
|   1|[1.0,1.0]|   0|[0.0,1.0]|     1.0|
|   1|[1.0,1.0]|   1|[1.0,1.0]|     0.0|
|   1|[1.0,1.0]|   2|[2.0,3.0]|     5.0|
|   1|[1.0,1.0]|   3|[8.0,9.0]|   113.0|
|   2|[2.0,3.0]|   0|[0.0,1.0]|     8.0|
|   2|[2.0,3.0]|   1|[1.0,1.0]|     5.0|
|   2|[2.0,3.0]|   2|[2.0,3.0]|     0.0|
|   2|[2.0,3.0]|   3|[8.0,9.0]|    72.0|
|   3|[8.0,9.0]|   0|[0.0,1.0]|   128.0|
|   3|[8.0,9.0]|   1|[1

In [325]:
# Extract the transformer's input column as an RDD of its values.
# The column is read by position because its name comes from the `ped`
# transformer; `.rdd.map` is used because `DataFrame.map` is only available on
# Spark 1.x.
rdd = df_data.select(ped.getInputCol()).rdd.map(lambda r: r[0])
rdd.collect()


[DenseVector([0.0, 1.0]),
 DenseVector([1.0, 1.0]),
 DenseVector([2.0, 3.0]),
 DenseVector([8.0, 9.0])]