In [None]:
from pyspark.sql import SparkSession

# Local Spark session for this notebook: master "local[*]", UI on port 4050.
# NOTE(review): with a local master the executor runs inside the driver JVM,
# so 'spark.executor.memory' may have no effect here (spark.driver.memory
# would be the relevant setting) — confirm.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab_pyspark")
    .config('spark.ui.port', '4050')
    .config('spark.executor.memory', '3g')
    # Disabled options kept for reference.
    # NOTE(review): the last two keys do not look like real Spark settings — verify before enabling.
    # .config('spark.sql.execution.arrow.enabled', 'true')
    # .config('spark."Broadcastsizetable"', '-1')
    # .config('preferSortHashJoin', 'true')
    .getOrCreate()
)

In [None]:
pip install pyspark==2.4.4

Collecting pyspark==2.4.4
  Downloading pyspark-2.4.4.tar.gz (215.7 MB)
[K     |████████████████████████████████| 215.7 MB 60 kB/s 
[?25hCollecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[K     |████████████████████████████████| 197 kB 22.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130392 sha256=356a262c17eec42a202fcfddd88a415c6459960123d75dd1e8d5687acb50dc4b
  Stored in directory: /root/.cache/pip/wheels/11/48/19/c3b6b66e4575c164407a83bc065179904ddc33c9d6500846f0
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


[Transformer base class](https://github.com/apache/spark/blob/v2.4.0/python/pyspark/ml/base.py#L139)

[Estimator base class](https://github.com/apache/spark/blob/v2.4.0/python/pyspark/ml/base.py#L70)

### Создадим базовый трансформатор

In [None]:
from pyspark.ml import Transformer
import pyspark.sql.functions as F

In [None]:
class ConstTransformer(Transformer):
    """Minimal custom transformer that adds one fixed-value column.

    The added column is always named ``mytransformer`` and holds the same
    string literal in every row.
    """

    def _transform(self, dataset):
        """Return ``dataset`` with the constant ``mytransformer`` column added."""
        constant_value = F.lit("I am a constant")
        return dataset.withColumn("mytransformer", constant_value)

In [None]:
df = spark.range(0, 10, numPartitions=1)

In [None]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [None]:
transformer = ConstTransformer()

In [None]:
transformer.transform(df).show()

+---+---------------+
| id|  mytransformer|
+---+---------------+
|  0|I am a constant|
|  1|I am a constant|
|  2|I am a constant|
|  3|I am a constant|
|  4|I am a constant|
|  5|I am a constant|
|  6|I am a constant|
|  7|I am a constant|
|  8|I am a constant|
|  9|I am a constant|
+---+---------------+



### Как специфицируются параметры transformer'а?

[HasOutputCol mixin](https://github.com/apache/spark/blob/v2.4.0/python/pyspark/ml/param/shared.py#L235)

In [None]:
from pyspark.ml.param.shared import HasOutputCol

In [None]:
class ConstTransformer(Transformer, HasOutputCol):
    """Constant-column transformer whose output column name is a Param.

    The column name is read from the ``outputCol`` param provided by the
    ``HasOutputCol`` mixin instead of being hard-coded.
    """

    def __init__(self):
        # Nothing custom to set up; just run the base-class initialisers.
        super(ConstTransformer, self).__init__()

    def _transform(self, dataset):
        """Return ``dataset`` plus a constant string column named by ``outputCol``."""
        target_column = self.getOutputCol()
        constant_value = F.lit("I am a constant")
        return dataset.withColumn(target_column, constant_value)

In [None]:
transformer = ConstTransformer()

In [None]:
transformer.extractParamMap()

{Param(parent='ConstTransformer_95357fafbf7f', name='outputCol', doc='output column name.'): 'ConstTransformer_95357fafbf7f__output'}

In [None]:
transformer.getOutputCol()

'ConstTransformer_95357fafbf7f__output'

In [None]:
transformer.transform(df).show()

+---+-------------------------------------+
| id|ConstTransformer_95357fafbf7f__output|
+---+-------------------------------------+
|  0|                      I am a constant|
|  1|                      I am a constant|
|  2|                      I am a constant|
|  3|                      I am a constant|
|  4|                      I am a constant|
|  5|                      I am a constant|
|  6|                      I am a constant|
|  7|                      I am a constant|
|  8|                      I am a constant|
|  9|                      I am a constant|
+---+-------------------------------------+



In [None]:
transformer.setOutputCol("lalalalala")

ConstTransformer_95357fafbf7f

In [None]:
transformer.transform(df).show()

+---+---------------+
| id|     lalalalala|
+---+---------------+
|  0|I am a constant|
|  1|I am a constant|
|  2|I am a constant|
|  3|I am a constant|
|  4|I am a constant|
|  5|I am a constant|
|  6|I am a constant|
|  7|I am a constant|
|  8|I am a constant|
|  9|I am a constant|
+---+---------------+



In [None]:
from pyspark import keyword_only

In [None]:
class ConstTransformer(Transformer, HasOutputCol):
    """Constant-column transformer configurable at construction time.

    The constructor is wrapped in ``@keyword_only``, so the output column
    has to be passed by name: ``ConstTransformer(outputCol="...")``.
    """

    @keyword_only
    def __init__(self, outputCol=None):
        super(ConstTransformer, self).__init__()
        # Leave the param untouched when no explicit name was supplied.
        if outputCol is None:
            return
        self.setOutputCol(outputCol)

    def _transform(self, dataset):
        """Return ``dataset`` plus a constant string column named by ``outputCol``."""
        target_column = self.getOutputCol()
        constant_value = F.lit("I am a constant")
        return dataset.withColumn(target_column, constant_value)

In [None]:
# Expected failure: the @keyword_only constructor rejects positional arguments.
# The error is caught and printed so that "Run All" can continue past this cell.
try:
    transformer = ConstTransformer("mycolumn")
except TypeError as err:
    print("TypeError:", err)

TypeError: ignored

In [None]:
transformer = ConstTransformer(outputCol="myColumn")

In [None]:
transformer.getOutputCol()

'myColumn'

In [None]:
transformer.transform(df).show()

+---+---------------+
| id|       myColumn|
+---+---------------+
|  0|I am a constant|
|  1|I am a constant|
|  2|I am a constant|
|  3|I am a constant|
|  4|I am a constant|
|  5|I am a constant|
|  6|I am a constant|
|  7|I am a constant|
|  8|I am a constant|
|  9|I am a constant|
+---+---------------+



In [None]:
transformer.setOutputCol("anotherColumn")

ConstTransformer_604ff1cb3ec4

In [None]:
transformer.transform(df).show()

+---+---------------+
| id|  anotherColumn|
+---+---------------+
|  0|I am a constant|
|  1|I am a constant|
|  2|I am a constant|
|  3|I am a constant|
|  4|I am a constant|
|  5|I am a constant|
|  6|I am a constant|
|  7|I am a constant|
|  8|I am a constant|
|  9|I am a constant|
+---+---------------+



### Создадим transformer с заданными input- и output-колонками
[HasInputCol mixin](https://github.com/apache/spark/blob/v2.4.0/python/pyspark/ml/param/shared.py#L189)

In [None]:
from pyspark.ml.param.shared import HasInputCol

In [None]:
class HashTransformer(Transformer, HasInputCol, HasOutputCol):
    """Transformer that stores the MD5 hash of one column in another column.

    ``inputCol`` names the column to hash (it is cast to string first) and
    ``outputCol`` names the column that receives the result.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(HashTransformer, self).__init__()
        # Apply only the params that were explicitly provided.
        provided = ((self.setInputCol, inputCol), (self.setOutputCol, outputCol))
        for setter, value in provided:
            if value is not None:
                setter(value)

    def _transform(self, dataset):
        """Return ``dataset`` with the MD5 of ``inputCol`` written to ``outputCol``."""
        target_column = self.getOutputCol()
        source_as_text = F.col(self.getInputCol()).cast("string")
        return dataset.withColumn(target_column, F.md5(source_as_text))

In [None]:
transformer = HashTransformer(inputCol="id", outputCol="hash")

In [None]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [None]:
transformer.transform(df).show()

+---+--------------------+
| id|                hash|
+---+--------------------+
|  0|cfcd208495d565ef6...|
|  1|c4ca4238a0b923820...|
|  2|c81e728d9d4c2f636...|
|  3|eccbc87e4b5ce2fe2...|
|  4|a87ff679a2f3e71d9...|
|  5|e4da3b7fbbce2345d...|
|  6|1679091c5a880faf6...|
|  7|8f14e45fceea167a5...|
|  8|c9f0f895fb98ab915...|
|  9|45c48cce2e2d7fbde...|
+---+--------------------+



### Как определить кастомный параметр?
[Param](https://github.com/apache/spark/blob/v2.4.0/python/pyspark/ml/param/__init__.py#L37)

In [None]:
from pyspark.ml.param import Param, Params, TypeConverters

In [None]:
class HashTransformer(Transformer, HasInputCol, HasOutputCol):
    """Hash one column into another using a selectable algorithm.

    Params:
        inputCol: column to hash; it is cast to string before hashing.
        outputCol: column that receives the hash.
        algorithm: custom Param naming the hash function, "md5" or "sha1".

    The algorithm name is not checked when it is set; an unsupported name
    raises ``ValueError`` only when the transformer is applied.
    """

    algorithm = Param(Params._dummy(), "algorithm",
                      "hash function to use, must be one of (md5|sha1)",
                      typeConverter=TypeConverters.toString)

    # Explicit whitelist matching the Param documentation. Resolving an
    # arbitrary name with getattr(F, ...) would also accept unrelated
    # functions from pyspark.sql.functions.
    _SUPPORTED_ALGORITHMS = ("md5", "sha1")

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, algorithm="md5"):
        super(HashTransformer, self).__init__()
        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCol is not None:
            self.setOutputCol(outputCol)
        self._set(algorithm=algorithm)

    def get_hash_function(self):
        """Return the ``pyspark.sql.functions`` hash function for ``algorithm``.

        Raises:
            ValueError: if the current algorithm is not "md5" or "sha1".
        """
        name = self.getAlgorithm()
        if name not in self._SUPPORTED_ALGORITHMS:
            raise ValueError("Unsupported algorithm {}".format(name))
        return getattr(F, name)

    def setAlgorithm(self, algorithm):
        """Set the ``algorithm`` param and return ``self``, like ``setOutputCol``."""
        return self._set(algorithm=algorithm)

    def getAlgorithm(self):
        """Return the current value of the ``algorithm`` param."""
        return self.getOrDefault(self.algorithm)

    def _transform(self, dataset):
        """Return ``dataset`` with the hash of ``inputCol`` written to ``outputCol``."""
        hash_col = self.get_hash_function()
        source_as_text = F.col(self.getInputCol()).cast("string")
        return dataset.withColumn(self.getOutputCol(), hash_col(source_as_text))

In [None]:
transformer = HashTransformer(inputCol="id", outputCol="hash", algorithm="lalalal")

In [None]:
print(transformer.explainParams())

algorithm: hash function to use, must be one of (md5|sha1) (current: lalalal)
inputCol: input column name. (current: id)
outputCol: output column name. (default: HashTransformer_24239f508634__output, current: hash)


In [None]:
transformer.getOrDefault("algorithm")

'lalalal'

In [None]:
# Expected failure: "lalalal" is not a supported algorithm, so applying the
# transformer raises ValueError. The error is caught and printed so that
# "Run All" can continue past this cell.
try:
    transformer.transform(df).show()
except ValueError as err:
    print("ValueError:", err)

ValueError: ignored

In [None]:
transformer.setAlgorithm("sha1")

In [None]:
transformer.getOrDefault("algorithm")

'sha1'

In [None]:
transformer.transform(df).show()

+---+--------------------+
| id|                hash|
+---+--------------------+
|  0|b6589fc6ab0dc82cf...|
|  1|356a192b7913b04c5...|
|  2|da4b9237bacccdf19...|
|  3|77de68daecd823bab...|
|  4|1b6453892473a467d...|
|  5|ac3478d69a3c81fa6...|
|  6|c1dfd96eea8cc2b62...|
|  7|902ba3cda18838015...|
|  8|fe5dbbcea5ce7e298...|
|  9|0ade7c2cf97f75d00...|
+---+--------------------+



### Передадим параметр из одного Transformer'а в другой

In [None]:
class HashTransformer(Transformer, HasInputCol, HasOutputCol):
    """Hash one column into another using a selectable algorithm.

    Params:
        inputCol: column to hash; it is cast to string before hashing.
        outputCol: column that receives the hash.
        algorithm: custom Param naming the hash function, "md5" or "sha1".

    Applying the transformer has a side effect: after building the result,
    ``_transform`` sets ``algorithm`` back to "md5" on this instance.
    """

    algorithm = Param(Params._dummy(), "algorithm",
                      "hash function to use, must be one of (md5|sha1)",
                      typeConverter=TypeConverters.toString)

    # Explicit whitelist matching the Param documentation. Resolving an
    # arbitrary name with getattr(F, ...) would also accept unrelated
    # functions from pyspark.sql.functions.
    _SUPPORTED_ALGORITHMS = ("md5", "sha1")

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, algorithm="md5"):
        super(HashTransformer, self).__init__()
        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCol is not None:
            self.setOutputCol(outputCol)
        self._set(algorithm=algorithm)

    def get_hash_function(self):
        """Return the ``pyspark.sql.functions`` hash function for ``algorithm``.

        Raises:
            ValueError: if the current algorithm is not "md5" or "sha1".
        """
        name = self.getAlgorithm()
        if name not in self._SUPPORTED_ALGORITHMS:
            raise ValueError("Unsupported algorithm {}".format(name))
        return getattr(F, name)

    def setAlgorithm(self, algorithm):
        """Set the ``algorithm`` param and return ``self``, like ``setOutputCol``."""
        return self._set(algorithm=algorithm)

    def getAlgorithm(self):
        """Return the current value of the ``algorithm`` param."""
        return self.getOrDefault(self.algorithm)

    def _transform(self, dataset):
        """Return ``dataset`` with the hash of ``inputCol`` written to ``outputCol``.

        The hash function is resolved before the reset below, so the returned
        DataFrame uses the algorithm that was configured when this was called.
        """
        hash_col = self.get_hash_function()
        res = dataset.withColumn(self.getOutputCol(), hash_col(F.col(self.getInputCol()).cast("string")))
        # NOTE(review): this mutates the transformer on every call, so a second
        # transform() on the same instance hashes with md5 regardless of the
        # originally configured algorithm. Kept because a later cell displays
        # the resulting 'md5' value — confirm that this is intentional.
        self._set(algorithm='md5')
        return res

In [None]:
# Two chained hashing stages: transformer1 writes "hash1", which transformer2 reads.
# transformer2 is configured with transformer1's algorithm as it is at this moment;
# the value is passed once here and is not linked to transformer1 afterwards.
transformer1 = HashTransformer(inputCol="id", outputCol="hash1", algorithm="sha1")
transformer2 = HashTransformer(inputCol="hash1", outputCol="hash2", algorithm=transformer1.getAlgorithm())

In [None]:
transformer2.getAlgorithm()

'sha1'

In [None]:
from pyspark.ml import Pipeline

# Run the two hashing transformers in sequence as a single pipeline.
pipeline = Pipeline(stages=[transformer1, transformer2])

In [None]:
pipeline_model = pipeline.fit(df)

In [None]:
pipeline_model.transform(df).show(5)

+---+--------------------+--------------------+
| id|               hash1|               hash2|
+---+--------------------+--------------------+
|  0|b6589fc6ab0dc82cf...|784a97bf1955d5f7a...|
|  1|356a192b7913b04c5...|9c1c01dc3ac1445a5...|
|  2|da4b9237bacccdf19...|f4f59e822581d785b...|
|  3|77de68daecd823bab...|08743582456b52abe...|
|  4|1b6453892473a467d...|6a58b6c7e02f6d921...|
+---+--------------------+--------------------+
only showing top 5 rows



In [None]:
transformer2.getAlgorithm()

'md5'

In [None]:
spark.stop()