# Setup

In [1]:
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

In [2]:
import numpy as np
import pandas as pd

In [3]:
import pyspark.sql.functions as F
from pyspark.sql import Row

In [4]:
df = sc \
    .parallelize([(1, 'b', 'A'), (2, 'a', 'B'), (3, 'b', 'B'), (4, 'c', None), (5, 'd', 'B')]) \
    .toDF(["id", "x1", 'x2'])

In [5]:
df.show()

+---+---+----+
| id| x1|  x2|
+---+---+----+
|  1|  b|   A|
|  2|  a|   B|
|  3|  b|   B|
|  4|  c|null|
|  5|  d|   B|
+---+---+----+



# Test String Indexer 

In [6]:
from pyspark.ml.feature import StringIndexer

In [7]:
indexer = StringIndexer(inputCol='x1', outputCol='x1_indexed')

In [8]:
model = indexer.fit(df)

In [9]:
model.labels

[u'b', u'a', u'c', u'd']

In [10]:
model.transform(df).show()

+---+---+----+----------+
| id| x1|  x2|x1_indexed|
+---+---+----+----------+
|  1|  b|   A|       0.0|
|  2|  a|   B|       1.0|
|  3|  b|   B|       0.0|
|  4|  c|null|       2.0|
|  5|  d|   B|       3.0|
+---+---+----+----------+



In [11]:
indexer2 = StringIndexer(inputCol='x2', outputCol='x2_indexed')

In [12]:
model2 = indexer2.fit(df)

In [13]:
model2

StringIndexer_4ecb8fd105ac781e8fbc

In [14]:
model2.labels

[u'B', u'A']

In [15]:
#model2.transform(df).show()

In [16]:
indexer3 = StringIndexer(inputCol='x2', outputCol='x2_indexed', handleInvalid='keep')

In [17]:
model3 = indexer3.fit(df)

In [18]:
model3.labels

[u'B', u'A']

In [19]:
model3.transform(df).show()

+---+---+----+----------+
| id| x1|  x2|x2_indexed|
+---+---+----+----------+
|  1|  b|   A|       1.0|
|  2|  a|   B|       0.0|
|  3|  b|   B|       0.0|
|  4|  c|null|       2.0|
|  5|  d|   B|       0.0|
+---+---+----+----------+



In [20]:
indexer4 = StringIndexer(inputCol='x2', outputCol='x2_indexed', handleInvalid='skip')
model4 = indexer4.fit(df)
model4.labels

[u'B', u'A']

In [21]:
model4.transform(df).show()

+---+---+---+----------+
| id| x1| x2|x2_indexed|
+---+---+---+----------+
|  1|  b|  A|       1.0|
|  2|  a|  B|       0.0|
|  3|  b|  B|       0.0|
|  5|  d|  B|       0.0|
+---+---+---+----------+



# String Indexer + One Hot Encoder

In [22]:
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.pipeline import Pipeline

In [23]:
stages = [
    StringIndexer(inputCol='x1', outputCol='x1_indexed'),
    StringIndexer(inputCol='x2', outputCol='x2_indexed', handleInvalid='keep'),
    OneHotEncoderEstimator(inputCols=['x1_indexed', 'x2_indexed'], outputCols=['x1_encoded', 'x2_encoded'])
]

In [24]:
pipeline_model = Pipeline(stages=stages).fit(df)

In [25]:
pipeline_model.stages

[StringIndexer_407fa5e6261610242351,
 StringIndexer_48cc87a9894d24c2f1ae,
 OneHotEncoderEstimator_45879edd939fdf7684c8]

In [26]:
pipeline_model.stages[0].labels, pipeline_model.stages[1].labels

([u'b', u'a', u'c', u'd'], [u'B', u'A'])

In [27]:
pipeline_model.transform(df).show()

+---+---+----+----------+----------+-------------+-------------+
| id| x1|  x2|x1_indexed|x2_indexed|   x1_encoded|   x2_encoded|
+---+---+----+----------+----------+-------------+-------------+
|  1|  b|   A|       0.0|       1.0|(3,[0],[1.0])|(2,[1],[1.0])|
|  2|  a|   B|       1.0|       0.0|(3,[1],[1.0])|(2,[0],[1.0])|
|  3|  b|   B|       0.0|       0.0|(3,[0],[1.0])|(2,[0],[1.0])|
|  4|  c|null|       2.0|       2.0|(3,[2],[1.0])|    (2,[],[])|
|  5|  d|   B|       3.0|       0.0|    (3,[],[])|(2,[0],[1.0])|
+---+---+----+----------+----------+-------------+-------------+



In [28]:
stages2 = [
    StringIndexer(inputCol='x1', outputCol='x1_indexed'),
    StringIndexer(inputCol='x2', outputCol='x2_indexed', handleInvalid='keep'),
    OneHotEncoderEstimator(inputCols=['x1_indexed', 'x2_indexed'], 
                           outputCols=['x1_encoded', 'x2_encoded'],
                           dropLast=False)
]

In [29]:
pipeline_model2 = Pipeline(stages=stages2).fit(df)

In [30]:
pipeline_model2.transform(df).show()

+---+---+----+----------+----------+-------------+-------------+
| id| x1|  x2|x1_indexed|x2_indexed|   x1_encoded|   x2_encoded|
+---+---+----+----------+----------+-------------+-------------+
|  1|  b|   A|       0.0|       1.0|(4,[0],[1.0])|(3,[1],[1.0])|
|  2|  a|   B|       1.0|       0.0|(4,[1],[1.0])|(3,[0],[1.0])|
|  3|  b|   B|       0.0|       0.0|(4,[0],[1.0])|(3,[0],[1.0])|
|  4|  c|null|       2.0|       2.0|(4,[2],[1.0])|(3,[2],[1.0])|
|  5|  d|   B|       3.0|       0.0|(4,[3],[1.0])|(3,[0],[1.0])|
+---+---+----+----------+----------+-------------+-------------+



In [31]:
stages2 = [
    StringIndexer(inputCol='x1', outputCol='x1_indexed'),
    StringIndexer(inputCol='x2', outputCol='x2_indexed', handleInvalid='skip'),
    OneHotEncoderEstimator(inputCols=['x1_indexed', 'x2_indexed'], 
                           outputCols=['x1_encoded', 'x2_encoded'],
                           dropLast=False),
    VectorAssembler(inputCols=['x1_encoded', 'x2_encoded'], outputCol='features')
]

In [32]:
pipeline_model3 = Pipeline(stages=stages2).fit(df)

In [33]:
dataset = pipeline_model3.transform(df)

In [34]:
dataset.show()

+---+---+---+----------+----------+-------------+-------------+-------------------+
| id| x1| x2|x1_indexed|x2_indexed|   x1_encoded|   x2_encoded|           features|
+---+---+---+----------+----------+-------------+-------------+-------------------+
|  1|  b|  A|       0.0|       1.0|(4,[0],[1.0])|(2,[1],[1.0])|(6,[0,5],[1.0,1.0])|
|  2|  a|  B|       1.0|       0.0|(4,[1],[1.0])|(2,[0],[1.0])|(6,[1,4],[1.0,1.0])|
|  3|  b|  B|       0.0|       0.0|(4,[0],[1.0])|(2,[0],[1.0])|(6,[0,4],[1.0,1.0])|
|  5|  d|  B|       3.0|       0.0|(4,[3],[1.0])|(2,[0],[1.0])|(6,[3,4],[1.0,1.0])|
+---+---+---+----------+----------+-------------+-------------+-------------------+



# Transfer Spark Dataframe to Pandas

In [35]:
import functools

In [36]:
feature_names = ['x1', 'x2']
cate_feature_names = []

for n, s in zip(feature_names, pipeline_model3.stages[:2]):
    names = map(lambda l: 'is_{}_{}'.format(n, l), s.labels)
    cate_feature_names.extend(names)
    
cate_feature_names

[u'is_x1_b', u'is_x1_a', u'is_x1_c', u'is_x1_d', u'is_x2_B', u'is_x2_A']

In [37]:
def transform_features(field, r):
    return np.expand_dims(r[field], axis=0)

def merge_features(field, d1, d2):
    return np.concatenate([d1, d2])

def map_spark_features_to_pandas(col_names, field, feature_df):
    map_func = functools.partial(transform_features, field)
    reduce_func = functools.partial(merge_features, field)
    features_array = feature_df.select(field).rdd \
        .map(map_func) \
        .reduce(reduce_func)
    feature_pd = pd.DataFrame(data=features_array, columns=col_names)
    return feature_pd

In [38]:
data_pd = map_spark_features_to_pandas(cate_feature_names, 'features', dataset)

In [39]:
data_pd

Unnamed: 0,is_x1_b,is_x1_a,is_x1_c,is_x1_d,is_x2_B,is_x2_A
0,1.0,0.0,0.0,0.0,0.0,1.0
1,0.0,1.0,0.0,0.0,1.0,0.0
2,1.0,0.0,0.0,0.0,1.0,0.0
3,0.0,0.0,0.0,1.0,1.0,0.0


# Custom String Indexing to OneHot in individual columns

In [40]:
from collections import OrderedDict

In [41]:
from pyspark import keyword_only
from pyspark.ml.pipeline import Estimator, Model, Pipeline, PipelineModel
from pyspark.ml.param.shared import *
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

In [42]:
field = 'x1'
field_values = df.rdd.map(lambda x: x[field]).distinct().collect()
field_values

[u'a', u'c', u'b', u'd']

In [43]:
field = 'x1'
field_values = df \
    .groupBy(field) \
    .agg(F.count('*').alias('count')) \
    .orderBy(F.desc('count')) \
    .rdd.map(lambda r: r[field]) \
    .collect()
    
index_map = {v: i for i, v in enumerate(field_values)}
index_map

{u'a': 3, u'b': 0, u'c': 2, u'd': 1}

In [44]:
test_row = df.take(1)[0]

In [45]:
test_row

Row(id=1, x1=u'b', x2=u'A')

In [46]:
test_row[field], field_values

(u'b', [u'b', u'd', u'c', u'a'])

In [47]:
onehot_vals = [('is_{}_{}'.format(field, v), float(test_row[field]==v))
               for v in field_values]
onehot_dict = OrderedDict(onehot_vals)
onehot_dict

OrderedDict([(u'is_x1_b', 1.0),
             (u'is_x1_d', 0.0),
             (u'is_x1_c', 0.0),
             (u'is_x1_a', 0.0)])

In [48]:
for k in onehot_dict: print(k)

is_x1_b
is_x1_d
is_x1_c
is_x1_a


In [49]:
Row(**onehot_dict)

Row(is_x1_a=0.0, is_x1_b=1.0, is_x1_c=0.0, is_x1_d=0.0)

## Custom Estimator 

In [50]:
class HasOutputColsPrefix(Params):

    output_prefix = Param(Params._dummy(), "output_prefix", 
                         "prefix for every output column name",
                         typeConverter=TypeConverters.toString)

    def __init__(self):
        super(HasOutputColsPrefix, self).__init__()
        self._setDefault(output_prefix='is')
    
    def setOutputColsPrefix(self, value):
        return self._set(output_prefix=value)

    def getOutputColsPrefix(self):
        return self.getOrDefault(self.output_prefix)

In [51]:
class HasFieldValues(Params):

    field_values = Param(Params._dummy(), "field_values", 
                         "all possible values for a field",
                         typeConverter=TypeConverters.toList)

    def __init__(self):
        super(HasFieldValues, self).__init__()

    def setFieldValues(self, value):
        return self._set(field_values=value)

    def getFieldValues(self):
        return self.getOrDefault(self.field_values)

In [52]:
class StringDisassembleModel(Model, HasInputCol, 
                             HasOutputColsPrefix, HasFieldValues, 
                             DefaultParamsReadable, DefaultParamsWritable):
    
    def get_new_cols(self):
        x = self.getInputCol()
        prefix = self.getOutputColsPrefix()
        field_values = self.getFieldValues()
        return ['{}_{}_{}'.format(prefix, x, v) for v in field_values]
    
    def disassemble_row(self, field_values, row):
        x = self.getInputCol()
        prefix = self.getOutputColsPrefix()
        new_data = {'{}_{}_{}'.format(prefix, x, v): float(row[x]==v)
                     for v in field_values}
        data = row.asDict()
        data.update(new_data)
        return Row(**data) 
    
    def _transform(self, dataset):
        cols = dataset.columns + self.get_new_cols()
        field_values = self.getFieldValues()
        disassemble_func = functools.partial(self.disassemble_row, field_values)
        return dataset.rdd \
            .map(disassemble_func) \
            .toDF() \
            .select(*cols)

In [53]:
class StringDisassembler(Estimator, HasInputCol, HasOutputColsPrefix):
    
    def _fit(self, dataset):
        x = self.getInputCol()
        field_values = dataset.rdd \
            .map(lambda r: r[x]) \
            .distinct() \
            .filter(lambda v: v is not None) \
            .collect()
        model = StringDisassembleModel() \
            .setInputCol(self.getInputCol()) \
            .setOutputColsPrefix(self.getOutputColsPrefix()) \
            .setFieldValues(field_values)

        return model

In [54]:
df.show()

+---+---+----+
| id| x1|  x2|
+---+---+----+
|  1|  b|   A|
|  2|  a|   B|
|  3|  b|   B|
|  4|  c|null|
|  5|  d|   B|
+---+---+----+



In [55]:
disamb_model = StringDisassembler() \
    .setInputCol('x1') \
    .fit(df)

In [56]:
disamb_model.transform(df).show()

+---+---+----+-------+-------+-------+-------+
| id| x1|  x2|is_x1_a|is_x1_c|is_x1_b|is_x1_d|
+---+---+----+-------+-------+-------+-------+
|  1|  b|   A|    0.0|    0.0|    1.0|    0.0|
|  2|  a|   B|    1.0|    0.0|    0.0|    0.0|
|  3|  b|   B|    0.0|    0.0|    1.0|    0.0|
|  4|  c|null|    0.0|    1.0|    0.0|    0.0|
|  5|  d|   B|    0.0|    0.0|    0.0|    1.0|
+---+---+----+-------+-------+-------+-------+



In [57]:
StringDisassembler() \
    .setInputCol('x2') \
    .setOutputColsPrefix('is') \
    .fit(df).transform(df).show()

+---+---+----+-------+-------+
| id| x1|  x2|is_x2_A|is_x2_B|
+---+---+----+-------+-------+
|  1|  b|   A|    1.0|    0.0|
|  2|  a|   B|    0.0|    1.0|
|  3|  b|   B|    0.0|    1.0|
|  4|  c|null|    0.0|    0.0|
|  5|  d|   B|    0.0|    1.0|
+---+---+----+-------+-------+



In [58]:
disamb_pipeline = Pipeline(stages=[
    StringDisassembler().setInputCol('x1'),
    StringDisassembler().setInputCol('x2')
])

In [59]:
disamb_pipeline_model = disamb_pipeline.fit(df)

In [60]:
disamb_pipeline_model.transform(df).show()

+---+---+----+-------+-------+-------+-------+-------+-------+
| id| x1|  x2|is_x1_a|is_x1_c|is_x1_b|is_x1_d|is_x2_A|is_x2_B|
+---+---+----+-------+-------+-------+-------+-------+-------+
|  1|  b|   A|    0.0|    0.0|    1.0|    0.0|    1.0|    0.0|
|  2|  a|   B|    1.0|    0.0|    0.0|    0.0|    0.0|    1.0|
|  3|  b|   B|    0.0|    0.0|    1.0|    0.0|    0.0|    1.0|
|  4|  c|null|    0.0|    1.0|    0.0|    0.0|    0.0|    0.0|
|  5|  d|   B|    0.0|    0.0|    0.0|    1.0|    0.0|    1.0|
+---+---+----+-------+-------+-------+-------+-------+-------+



In [61]:
disamb_pipeline_model.save('tmp/onehot')

In [62]:
loaded_pipeline_model = PipelineModel.load('tmp/onehot')

In [63]:
loaded_pipeline_model

PipelineModel_4f91bf847ef4ac9dac55

In [64]:
x1_cnt = df.groupBy('x1').count().orderBy(F.desc('count')).toPandas()
x1_cnt = x1_cnt.set_index('x1')

In [65]:
x1_cumratio = (x1_cnt['count'].cumsum() / x1_cnt['count'].sum())

In [66]:
x1_outlier = list(x1_cumratio[x1_cumratio > 0.5].index)

In [67]:
if len(x1_outlier) > 1:
    x1_outlier = x1_outlier[1:]

In [68]:
x1_outlier

[u'c', u'a']