# PySpark Cookbook

### Tomasz Drabas, Denny Lee
#### Version: 0.1
#### Date: 3/10/2018

# Loading the data

In [1]:
# Location of the forest cover-type CSV, relative to this notebook.
forest_path = '../data/forest_coverage_type.csv'

# Read the CSV with a header row and let Spark infer the column types.
forest = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(forest_path)
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark,idle,,,✔


SparkSession available as 'spark'.


In [2]:
# Print the column names and types of the loaded `forest` DataFrame.
forest.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_Rawah: integer (nullable = true)
 |-- Wilderness_Area_Neota: integer (nullable = true)
 |-- Wilderness_Area_Comanche: integer (nullable = true)
 |-- Wilderness_Area_CacheLaPoudre: integer (nullable = true)
 |-- Soil_type_2702: integer (nullable = true)
 |-- Soil_type_2703: integer (nullable = true)
 |-- Soil_type_2704: integer (nullable = true)
 |-- Soil_type_2705: integer (nullable = true)
 |-- Soil_type_2706: integer (nullable = true)
 |-- Soil_type_2

# Introducing Transformers

List of most popular **Transformers**
* Binarizer
* Bucketizer
* ChiSqSelector
* CountVectorizer
* DCT
* ElementwiseProduct
* HashingTF
* IDF
* IndexToString
* MaxAbsScaler
* MinMaxScaler
* NGram
* Normalizer
* OneHotEncoder
* PCA
* PolynomialExpansion
* QuantileDiscretizer
* RegexTokenizer
* RFormula
* SQLTransformer
* StandardScaler
* StopWordsRemover
* StringIndexer
* Tokenizer
* VectorAssembler
* VectorIndexer
* VectorSlicer
* Word2Vec

In [3]:
import pyspark.sql.functions as f
import pyspark.ml.feature as feat
import numpy as np

# Bucketize

In [4]:
# The observed value range is cut into (buckets_no + 1) equal-width
# intervals, i.e. each bucket is `rng / (buckets_no + 1)` wide.
buckets_no = 10

# Smallest and largest observed distance, fetched in a single aggregation.
min_max_row = (
    forest
    .agg(
          f.min('Horizontal_Distance_To_Hydrology')
            .alias('min')
        , f.max('Horizontal_Distance_To_Hydrology')
            .alias('max')
    )
    .first()
)
dist_min_max = (min_max_row['min'], min_max_row['max'])

rng = dist_min_max[1] - dist_min_max[0]
if rng <= 0:
    # Bucketizer needs strictly increasing splits; a constant column
    # cannot be bucketized.
    raise ValueError(
        'Horizontal_Distance_To_Hydrology has no value range to bucketize'
    )

# np.linspace includes both end points, so the column maximum is itself the
# last split and every row falls inside the Bucketizer's bounds.  The
# previous np.arange call excluded its stop value: the edges it produced are
# the same as the ones below, but the top of the range was left uncovered,
# which makes Bucketizer reject the largest values once the whole DataFrame
# is evaluated.  `.tolist()` hands Spark plain Python floats.
splits = np.linspace(
    dist_min_max[0]
    , dist_min_max[1]
    , buckets_no + 2
).tolist()

bucketizer = feat.Bucketizer(
    splits=splits
    , inputCol= 'Horizontal_Distance_To_Hydrology'
    , outputCol='Horizontal_Distance_To_Hydrology_Bkt'
)

# Show the raw value next to the bucket index it was assigned.
(
    bucketizer
    .transform(forest)
    .select(
         'Horizontal_Distance_To_Hydrology'
        ,'Horizontal_Distance_To_Hydrology_Bkt'
    ).show(5)
)

+--------------------------------+------------------------------------+
|Horizontal_Distance_To_Hydrology|Horizontal_Distance_To_Hydrology_Bkt|
+--------------------------------+------------------------------------+
|                             258|                                 2.0|
|                             212|                                 1.0|
|                             268|                                 2.0|
|                             242|                                 1.0|
|                             153|                                 1.0|
+--------------------------------+------------------------------------+
only showing top 5 rows

# Principal Components Analysis

In [5]:
# Pack every column of `forest` into a single vector column named 'feat'.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns,
    outputCol='feat',
)

# PCA with five components, reading the assembled vector column.
pca = feat.PCA(
    k=5,
    inputCol=vectorAssembler.getOutputCol(),
    outputCol='pca_feat',
)

# Fit on the assembled data, then transform a second, freshly assembled copy
# and look at the first row.
pca_model = pca.fit(vectorAssembler.transform(forest))
pca_output = pca_model.transform(vectorAssembler.transform(forest))

pca_output.select('feat', 'pca_feat').take(1)

[Row(feat=SparseVector(55, {0: 2596.0, 1: 51.0, 2: 3.0, 3: 258.0, 5: 510.0, 6: 221.0, 7: 232.0, 8: 148.0, 9: 6279.0, 10: 1.0, 42: 1.0, 54: 5.0}), pca_feat=DenseVector([-3887.7711, 4996.8103, 2323.0932, 1014.5873, -135.1702]))]

# Introducing Estimators

List of most popular **Estimators**
1. Classification
 * LinearSVC
 * LogisticRegression 
 * DecisionTreeClassifier
 * GBTClassifier
 * RandomForestClassifier
 * NaiveBayes
 * MultilayerPerceptronClassifier
 * OneVsRest
2. Regression
 * AFTSurvivalRegression
 * DecisionTreeRegressor
 * GBTRegressor
 * GeneralizedLinearRegression
 * IsotonicRegression
 * LinearRegression
 * RandomForestRegressor
3. Clustering
 * BisectingKMeans
 * KMeans
 * GaussianMixture
 * LDA

In [2]:
# Count the rows for each distinct CoverType value and print the table.
forest.select('CoverType').groupBy('CoverType').count().show()

+---------+------+
|CoverType| count|
+---------+------+
|        1|211840|
|        6| 17367|
|        3| 35754|
|        5|  9493|
|        4|  2747|
|        7| 20510|
|        2|283301|
+---------+------+

# Linear SVM

In [5]:
import pyspark.ml.classification as cl

# All columns except the last one are assembled into the feature vector.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[0:-1],
    outputCol='features',
)

# Binary label: 1 where CoverType equals 1, 0 otherwise.
is_cover_type_one = (f.col('CoverType') == 1).cast('integer')

assembled_forest = vectorAssembler.transform(forest)
fir_dataset = (
    assembled_forest
    .withColumn('label', is_cover_type_one)
    .select('label', 'features')
)

# Linear support vector classifier fitted on the binary dataset.
svc_obj = cl.LinearSVC(maxIter=10, regParam=0.01)
svc_model = svc_obj.fit(fir_dataset)

svc_model.coefficients

DenseVector([-0.0001, -0.0, -0.0023, -0.0, -0.0001, 0.0, -0.001, -0.0017, -0.0003, -0.0, 0.0, 0.0401, -0.0071, -0.0958, -0.0901, -0.0653, -0.0655, -0.0437, -0.0928, -0.0848, -0.0211, -0.0045, -0.0498, -0.0829, -0.0522, -0.0325, -0.0263, -0.0923, -0.0889, -0.0275, -0.0606, -0.0595, 0.0341, -0.003, 0.0822, 0.0607, 0.0351, 0.0093, 0.0048, -0.0154, 0.0422, -0.0673, -0.0039, -0.0142, 0.0036, 0.0078, 0.0, -0.0117, 0.0283, -0.0002, -0.0463, 0.0394, 0.0292, 0.0358])

# Linear Regression

In [8]:
import pyspark.ml.regression as rg

# Every column after the first one (Elevation) is used as a feature.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[1:],
    outputCol='features',
)

# Elevation, cast to float, is the regression label.
elevation_label = f.col('Elevation').cast('float')

assembled_forest = vectorAssembler.transform(forest)
elevation_dataset = (
    assembled_forest
    .withColumn('label', elevation_label)
    .select('label', 'features')
)

lr_obj = rg.LinearRegression(
    maxIter=10,
    regParam=0.01,
    elasticNetParam=1.00,
)
lr_model = lr_obj.fit(elevation_dataset)

lr_model.coefficients

DenseVector([0.0309, 0.6522, 0.1911, 0.1424, 0.0342, 0.7402, 1.053, -0.0017, -0.0041, 2.7163, 189.0362, 27.8238, -265.8505, -407.4379, -346.0612, -364.3841, -302.6788, -400.5852, -212.9918, -126.1329, -117.7423, -312.0478, -248.7118, -221.4788, -155.1459, -84.5129, -398.0433, -387.8102, -179.4485, -261.3875, -337.7875, 48.0629, -94.7813, 149.8043, 135.144, 80.0901, 64.3659, 124.0233, -115.0126, 119.1285, -181.7498, 10.8056, -42.7849, 65.5441, 102.2562, 36.9865, -48.1163, 379.2091, 256.0169, 497.1714, 313.0607, 337.172, 397.0758, -14.4551])

In [9]:
# Training summary of the fitted linear regression model.
summary = lr_model.summary

# R^2, root mean squared error and mean absolute error, on one line.
fit_metrics = (
    summary.r2,
    summary.rootMeanSquaredError,
    summary.meanAbsoluteError,
)
print(*fit_metrics)

0.7860412464754236 129.50871925702438 103.34079732698483

# Introducing Pipelines

In [25]:
from pyspark.ml import Pipeline

# Features: every column after the first one (Elevation).
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[1:],
    outputCol='features',
)

# Generalized linear model predicting Elevation through an identity link.
lr_obj = rg.GeneralizedLinearRegression(
    labelCol='Elevation',
    maxIter=10,
    regParam=0.01,
    link='identity',
    linkPredictionCol="p",
)

# Chain the assembler and the estimator so they are fitted and applied together.
pip = Pipeline(stages=[vectorAssembler, lr_obj])

glr_pipeline_model = pip.fit(forest)
glr_predictions = glr_pipeline_model.transform(forest)
glr_predictions.select('Elevation', 'prediction').show(5)

+---------+------------------+
|Elevation|        prediction|
+---------+------------------+
|     2596|2840.7801831411316|
|     2590|2828.7464246669683|
|     2804| 2842.761272955131|
|     2785| 2966.057500325109|
|     2595|2817.1687155114637|
+---------+------------------+
only showing top 5 rows

In [28]:
import matplotlib.pyplot as plt

# Bring the Elevation column to the driver as a pandas DataFrame and plot it.
transformed_df = forest.select('Elevation')
elevation_pdf = transformed_df.toPandas()
elevation_pdf.hist()

# Write the figure to disk, then release all open figures.
plt.savefig('Elevation_histogram.png')
plt.close('all')

# Selecting the most predictive features

## Chi-Square selector

In [40]:
# All columns except the last one are assembled into the feature vector.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[0:-1],
    outputCol='features',
)

# Chi-square selector keeping the ten top features with respect to CoverType.
selector = feat.ChiSqSelector(
    labelCol='CoverType',
    numTopFeatures=10,
    outputCol='selected',
)

pipeline_sel = Pipeline(stages=[vectorAssembler, selector])

selection_model = pipeline_sel.fit(forest)
selected_df = selection_model.transform(forest)
selected_df.select(selector.getOutputCol()).show(5)

+--------------------+
|            selected|
+--------------------+
|(10,[0,1,2,3,5,6,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 5 rows

## Correlation matrix

In [41]:
import pyspark.ml.stat as st

# Assemble every column of `forest` into one vector column.
features_and_label = feat.VectorAssembler(
    inputCols=forest.columns,
    outputCol='features',
)
assembled_all = features_and_label.transform(forest)

# Pearson correlation matrix of the assembled vector column.
corr = st.Correlation.corr(assembled_all, 'features', 'pearson')

# The result is a one-row, one-column DataFrame holding the matrix.
print(corr.collect()[0][0])

DenseMatrix([[ 1.        ,  0.01573494, -0.24269664, ...,  0.19359464,
               0.21261232, -0.26955378],
             [ 0.01573494,  1.        ,  0.07872841, ...,  0.00829428,
              -0.00586558,  0.0170798 ],
             [-0.24269664,  0.07872841,  1.        , ...,  0.09360193,
               0.02563691,  0.14828541],
             ...,
             [ 0.19359464,  0.00829428,  0.09360193, ...,  1.        ,
              -0.01929168,  0.15566826],
             [ 0.21261232, -0.00586558,  0.02563691, ..., -0.01929168,
               1.        ,  0.1283513 ],
             [-0.26955378,  0.0170798 ,  0.14828541, ...,  0.15566826,
               0.1283513 ,  1.        ]])

In [67]:
num_of_features = 10

# Position -> column name lookup for the columns of `forest`.
cols = dict(enumerate(forest.columns))

corr_matrix = corr.collect()[0][0]

# Correlation of every column with column 0, paired with the column's
# position; the leading entry (column 0 against itself) is dropped.
first_column_corr = corr_matrix.toArray()[:, 0]
label_corr_with_idx = list(enumerate(first_column_corr))[1:]

# Strongest absolute correlation first.
label_corr_with_idx_sorted = sorted(
    label_corr_with_idx,
    key=lambda pair: -abs(pair[1]),
)

# Translate positions back to column names, then keep the leading entries.
ranked_names = np.array([
    cols[position]
    for position, _corr_value in label_corr_with_idx_sorted
])
features_selected = ranked_names[0:num_of_features]

features_selected

array(['Wilderness_Area_CacheLaPoudre', 'Soil_type_4703',
       'Horizontal_Distance_To_Roadways',
       'Horizontal_Distance_To_Hydrology', 'CoverType', 'Slope',
       'Wilderness_Area_Neota', 'Soil_type_8771', 'Soil_type_2717',
       'Soil_type_8776'], dtype='<U34')

# Predicting forest coverage type

## Logistic regression

In [69]:
# 70/30 train/test split with a fixed seed.
forest_train, forest_test = forest.randomSplit([0.7, 0.3], seed=666)

# Stage 1: assemble all columns except the last one into a vector.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[0:-1],
    outputCol='features',
)

# Stage 2: keep the ten top features according to the chi-square selector.
selector = feat.ChiSqSelector(
    labelCol='CoverType',
    numTopFeatures=10,
    outputCol='selected',
)

# Stage 3: multinomial logistic regression on the selected features.
logReg_obj = cl.LogisticRegression(
    labelCol='CoverType',
    featuresCol=selector.getOutputCol(),
    regParam=0.01,
    elasticNetParam=1.0,
    family='multinomial',
)

pipeline = Pipeline(stages=[vectorAssembler, selector, logReg_obj])

pModel = pipeline.fit(forest_train)

In [71]:
import pyspark.ml.evaluation as ev

# Score the held-out split with the fitted pipeline.
logReg_predictions = pModel.transform(forest_test)
results_logReg = logReg_predictions.select(
    'CoverType', 'probability', 'prediction'
)

evaluator = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='CoverType',
)

# The evaluator's default metric, then weighted precision, then accuracy.
logReg_default_metric = evaluator.evaluate(results_logReg)
logReg_weighted_precision = evaluator.evaluate(
    results_logReg,
    {evaluator.metricName: 'weightedPrecision'},
)
logReg_accuracy = evaluator.evaluate(
    results_logReg,
    {evaluator.metricName: 'accuracy'},
)

(logReg_default_metric, logReg_weighted_precision, logReg_accuracy)

(0.6638467009427569, 0.6632784396900246, 0.691296432850954)

## Random Forest classifier

In [77]:
# Random forest classifier reading the chi-square-selected feature column.
rf_obj = cl.RandomForestClassifier(
    labelCol='CoverType',
    featuresCol=selector.getOutputCol(),
    minInstancesPerNode=10,
    numTrees=10,
)

# Reuses the assembler and selector already defined in the notebook.
pipeline = Pipeline(stages=[vectorAssembler, selector, rf_obj])

pModel = pipeline.fit(forest_train)

rf_predictions = pModel.transform(forest_test)
results_rf = rf_predictions.select('CoverType', 'probability', 'prediction')

evaluator = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='CoverType',
)

# The evaluator's default metric, then weighted precision, then accuracy.
rf_default_metric = evaluator.evaluate(results_rf)
rf_weighted_precision = evaluator.evaluate(
    results_rf,
    {evaluator.metricName: 'weightedPrecision'},
)
rf_accuracy = evaluator.evaluate(
    results_rf,
    {evaluator.metricName: 'accuracy'},
)

(rf_default_metric, rf_weighted_precision, rf_accuracy)

(0.6638467009427569, 0.6632784396900246, 0.691296432850954)

# Estimating forest elevation

## Random Forest regression

In [17]:
# Features: every column after the first one (Elevation).
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[1:],
    outputCol='features',
)

# Random forest regressor with Elevation as the label.
rf_obj = rg.RandomForestRegressor(
    labelCol='Elevation',
    maxDepth=10,
    minInstancesPerNode=10,
    minInfoGain=0.1,
    numTrees=10,
)

pip = Pipeline(stages=[vectorAssembler, rf_obj])

# Fit and score on the same DataFrame.
rf_reg_model = pip.fit(forest)
results = rf_reg_model.transform(forest).select('Elevation', 'prediction')

# R^2 of the predictions against the Elevation column.
evaluator = ev.RegressionEvaluator(labelCol='Elevation')
evaluator.evaluate(results, {evaluator.metricName: 'r2'})

0.8264236722093034

## Gradient Boosted Trees regression

In [18]:
# Gradient-boosted trees regressor with Elevation as the label.
gbt_obj = rg.GBTRegressor(
    labelCol='Elevation',
    minInstancesPerNode=10,
    minInfoGain=0.1,
)

# Reuses the `vectorAssembler` already defined in the notebook.
pip = Pipeline(stages=[vectorAssembler, gbt_obj])

# Fit and score on the same DataFrame.
gbt_model = pip.fit(forest)
results = gbt_model.transform(forest).select('Elevation', 'prediction')

# R^2 of the predictions against the Elevation column.
evaluator = ev.RegressionEvaluator(labelCol='Elevation')
evaluator.evaluate(results, {evaluator.metricName: 'r2'})

0.833598109692272

# Clustering forest cover type

In [79]:
import pyspark.ml.clustering as clust

# All columns except the last one are assembled into the feature vector.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[:-1],
    outputCol='features',
)

# Seven clusters, with a fixed seed.
kmeans_obj = clust.KMeans(k=7, seed=666)

pip = Pipeline(stages=[vectorAssembler, kmeans_obj])

In [80]:
# Fit the clustering pipeline and assign every row to a cluster.
kmeans_model = pip.fit(forest)
clustered = kmeans_model.transform(forest)

# Keep the feature vector, the CoverType column and the assigned cluster.
results = clustered.select('features', 'CoverType', 'prediction')

results.show(5)

+--------------------+---------+----------+
|            features|CoverType|prediction|
+--------------------+---------+----------+
|(54,[0,1,2,3,5,6,...|        5|         1|
|(54,[0,1,2,3,4,5,...|        5|         1|
|(54,[0,1,2,3,4,5,...|        2|         1|
|(54,[0,1,2,3,4,5,...|        2|         1|
|(54,[0,1,2,3,4,5,...|        5|         1|
+--------------------+---------+----------+
only showing top 5 rows

In [21]:
# Score the clustering held in `results` with a default-configured
# ClusteringEvaluator; the value of the last expression is the cell output.
clustering_ev = ev.ClusteringEvaluator()
clustering_ev.evaluate(results)

0.4999826131644061

# Tuning hyperparameters

## Grid search

In [82]:
import pyspark.ml.tuning as tune

# Feature preparation: assemble all columns except the last one, then keep
# the five top features according to the chi-square selector.
vectorAssembler = feat.VectorAssembler(
    inputCols=forest.columns[0:-1],
    outputCol='features',
)

selector = feat.ChiSqSelector(
    labelCol='CoverType',
    numTopFeatures=5,
    outputCol='selected',
)

# Estimator whose parameters are tuned below.
logReg_obj = cl.LogisticRegression(
    labelCol='CoverType',
    featuresCol=selector.getOutputCol(),
    family='multinomial',
)

# 2 x 2 grid over regParam and elasticNetParam.
grid_builder = tune.ParamGridBuilder()
grid_builder = grid_builder.addGrid(logReg_obj.regParam, [0.01, 0.1])
grid_builder = grid_builder.addGrid(logReg_obj.elasticNetParam, [1.0, 0.5])
logReg_grid = grid_builder.build()

logReg_ev = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='CoverType',
)

cross_v = tune.CrossValidator(
    estimator=logReg_obj,
    estimatorParamMaps=logReg_grid,
    evaluator=logReg_ev,
)

# The feature stages are fitted once, outside the cross-validator, and the
# cross-validator is trained on their output.
pipeline = Pipeline(stages=[vectorAssembler, selector])
data_trans = pipeline.fit(forest_train)

train_prepared = data_trans.transform(forest_train)
logReg_modelTest = cross_v.fit(train_prepared)

In [83]:
# Prepare the test split with the fitted feature stages, then score it with
# the model returned by the cross-validator.
data_trans_test = data_trans.transform(forest_test)
results = logReg_modelTest.transform(data_trans_test)

# One line per metric, in this order.
for metric_name in ('weightedPrecision', 'weightedRecall', 'accuracy'):
    print(logReg_ev.evaluate(results, {logReg_ev.metricName: metric_name}))

0.6024281861281453
0.6602048575905612
0.6602048575905614

## Train-validation splitting

In [27]:
# Train/validation-split tuning over the same estimator, parameter grid and
# evaluator that the grid-search cell defined; up to four models are fitted
# in parallel.
train_v = tune.TrainValidationSplit(
    estimator=logReg_obj
    , estimatorParamMaps=logReg_grid
    , evaluator=logReg_ev
    , parallelism=4
)

# Fit on the prepared training data.  The opening parenthesis of this
# expression was previously never closed, which made the cell a SyntaxError.
logReg_modelTrainV = (
    train_v
    .fit(data_trans.transform(forest_train))
)

# `data_trans_test` is the prepared test split from the grid-search cells.
results = logReg_modelTrainV.transform(data_trans_test)

print(logReg_ev.evaluate(results, {logReg_ev.metricName: 'weightedPrecision'}))
print(logReg_ev.evaluate(results, {logReg_ev.metricName: 'weightedRecall'}))
print(logReg_ev.evaluate(results, {logReg_ev.metricName: 'accuracy'}))

0.6024281861281453
0.6602048575905612
0.6602048575905614

# Feature engineering - NLP

In [28]:
# Three short text passages, one per row, in a single-column DataFrame
# whose column is named 'text'.
some_text = spark.createDataFrame([
    ['''
    Apache Spark achieves high performance for both batch
    and streaming data, using a state-of-the-art DAG scheduler, 
    a query optimizer, and a physical execution engine.
    ''']
    , ['''
    Apache Spark is a fast and general-purpose cluster computing 
    system. It provides high-level APIs in Java, Scala, Python 
    and R, and an optimized engine that supports general execution 
    graphs. It also supports a rich set of higher-level tools including 
    Spark SQL for SQL and structured data processing, MLlib for machine 
    learning, GraphX for graph processing, and Spark Streaming.
    ''']
    , ['''
    Machine learning is a field of computer science that often uses 
    statistical techniques to give computers the ability to "learn" 
    (i.e., progressively improve performance on a specific task) 
    with data, without being explicitly programmed.
    ''']
], ['text'])

## Tokenizer

In [35]:
# Split the 'text' column on runs of whitespace and on commas, periods and
# double quotes.  The pattern is a raw string so that `\s` reaches the regex
# engine as written; in a normal string literal `\s` is an invalid escape
# sequence that Python only passes through with a deprecation warning.
# The pattern's value is unchanged.
splitter = feat.RegexTokenizer(
    inputCol='text'
    , outputCol='text_split'
    , pattern=r'\s+|[,."]'
)

splitter.transform(some_text).select('text_split').take(1)

[Row(text_split=['apache', 'spark', 'achieves', 'high', 'performance', 'for', 'both', 'batch', 'and', 'streaming', 'data', 'using', 'a', 'state-of-the-art', 'dag', 'scheduler', 'a', 'query', 'optimizer', 'and', 'a', 'physical', 'execution', 'engine'])]

## Stop-words removal

In [45]:
# Stop-word filter reading the tokenizer's output column.
sw_remover = feat.StopWordsRemover(
    inputCol=splitter.getOutputCol(),
    outputCol='no_stopWords',
)

tokenized_text = splitter.transform(some_text)
sw_remover.transform(tokenized_text).select('no_stopWords').take(1)

[Row(no_stopWords=['apache', 'spark', 'achieves', 'high', 'performance', 'batch', 'streaming', 'data', 'using', 'state-of-the-art', 'dag', 'scheduler', 'query', 'optimizer', 'physical', 'execution', 'engine'])]

## Hashing trick

In [54]:
# Hash the stop-word-free tokens into a 20-dimensional term-frequency vector.
hasher = feat.HashingTF(
    inputCol=sw_remover.getOutputCol(),
    outputCol='hashed',
    numFeatures=20,
)

tokenized_text = splitter.transform(some_text)
filtered_text = sw_remover.transform(tokenized_text)
hasher.transform(filtered_text).select('hashed').take(1)

[Row(hashed=SparseVector(20, {2: 2.0, 3: 2.0, 4: 2.0, 5: 3.0, 8: 1.0, 9: 1.0, 15: 3.0, 16: 1.0, 18: 1.0, 19: 1.0}))]

## Term Frequency-Inverse Document Frequency

In [60]:
# IDF estimator reading the hashed term-frequency vectors.
idf = feat.IDF(
    inputCol=hasher.getOutputCol(),
    outputCol='features',
)

# Fit on the hashed documents ...
hashed_for_fit = hasher.transform(
    sw_remover.transform(splitter.transform(some_text))
)
idfModel = idf.fit(hashed_for_fit)

# ... and apply the fitted model to the same documents, hashed again.
hashed_for_transform = hasher.transform(
    sw_remover.transform(splitter.transform(some_text))
)
idfModel.transform(hashed_for_transform).select('features').take(1)

[Row(features=SparseVector(20, {2: 0.0, 3: 0.0, 4: 0.0, 5: 0.863, 8: 0.2877, 9: 0.0, 15: 0.0, 16: 0.6931, 18: 0.2877, 19: 0.0}))]

In [62]:
# The four text stages chained into one pipeline: tokenize, drop stop words,
# hash, then apply IDF.
nlp_stages = [splitter, sw_remover, hasher, idf]
pipeline = Pipeline(stages=nlp_stages)

pipelineModel = pipeline.fit(some_text)

tfidf_df = pipelineModel.transform(some_text)
tfidf_df.select('text', 'features').take(1)

[Row(text='\n    Apache Spark achieves high performance for both batch\n    and streaming data, using a state-of-the-art DAG scheduler, \n    a query optimizer, and a physical execution engine.\n    ', features=SparseVector(20, {2: 0.0, 3: 0.0, 4: 0.0, 5: 0.863, 8: 0.2877, 9: 0.0, 15: 0.0, 16: 0.6931, 18: 0.2877, 19: 0.0}))]

## Word-2-Vec model

In [57]:
# Word2Vec with five-dimensional vectors and minCount=2, reading the
# stop-word-free tokens.
w2v = feat.Word2Vec(
    vectorSize=5,
    minCount=2,
    inputCol=sw_remover.getOutputCol(),
    outputCol='vector',
)

# Fit on the filtered tokens ...
tokens_for_fit = sw_remover.transform(splitter.transform(some_text))
model = w2v.fit(tokens_for_fit)

# ... and transform the same text, tokenized and filtered again.
tokens_for_transform = sw_remover.transform(splitter.transform(some_text))
model.transform(tokens_for_transform).select('vector').take(1)

[Row(vector=DenseVector([0.0187, -0.0121, -0.0208, -0.0028, 0.002]))]

# Discretizing continuous variables

In [13]:
# Read the signal CSV with a header row and inferred column types.
signal_df = spark.read.csv(
    '../data/fourier_signal.csv',
    header=True,
    inferSchema=True,
)

# Discretizer with ten buckets, from 'signal' into 'discretized'.
steps = feat.QuantileDiscretizer(
    numBuckets=10,
    inputCol='signal',
    outputCol='discretized',
)

# Fit the discretizer, then apply it to the same data.
steps_model = steps.fit(signal_df)
transformed = steps_model.transform(signal_df)

In [161]:
import matplotlib.pyplot as plt

transformed_df = transformed.toPandas()

# Two y-axes sharing one x-axis.
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# Left axis: the original signal, in black.
ax1.plot(transformed_df['signal'], 'k')
ax1.set_ylabel('original', color='k')
ax1.set_ylim((-55, 35))

# Right axis: the bucket index, in blue.
ax2.plot(transformed_df['discretized'], 'b-')
ax2.set_ylabel('discretized', color='b')
ax2.set_ylim((-2, 12))

fig.tight_layout()

# Write the figure to disk, then release all open figures.
plt.savefig('discretized.png')
plt.close('all')

# Standardizing continuous variables

In [24]:
# Print summary statistics of `signal_df` (count, mean, stddev, min, max).
signal_df.describe().show()

+-------+--------------------+
|summary|              signal|
+-------+--------------------+
|  count|                 256|
|   mean|-5.59448321002520...|
| stddev|   8.056325329550202|
|    min|    -39.878842775021|
|    max|  15.718058116309553|
+-------+--------------------+

In [22]:
from pyspark.ml import Pipeline
# StandardScaler reads a vector column, so wrap 'signal' in a one-element
# vector first.
vec = feat.VectorAssembler(
    inputCols=['signal'],
    outputCol='signal_vec',
)

# Scaler with both centering (withMean) and scaling (withStd) switched on.
norm = feat.StandardScaler(
    inputCol=vec.getOutputCol(),
    outputCol='signal_norm',
    withMean=True,
    withStd=True,
)

norm_pipeline = Pipeline(stages=[vec, norm])

# Fit the pipeline, then apply it to the same data.
norm_model = norm_pipeline.fit(signal_df)
signal_norm = norm_model.transform(signal_df)

signal_norm.take(1)

[Row(signal=0.5233399378711634, signal_vec=DenseVector([0.5233]), signal_norm=DenseVector([0.065]))]

In [173]:
normalized_df = signal_norm.toPandas()

# Unpack the one-element 'signal_norm' vector of every row into a scalar
# column.  The column is addressed by name: positional lookup (`row[2]`) on
# a label-indexed pandas Series is deprecated.
normalized_df['normalized'] = (
    normalized_df['signal_norm'].apply(lambda vector: vector[0])
)

# Two y-axes sharing one x-axis.
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

ax1.plot(normalized_df['signal'], 'k')
ax2.plot(normalized_df['normalized'], 'b-')

ax1.set_ylabel('original', color='k')
# The right axis shows the normalized signal; it was previously labelled
# 'discretized', carried over from the discretization plot.
ax2.set_ylabel('normalized', color='b')

ax1.set_ylim((-105, 30))
ax2.set_ylim((-6, 12))
fig.tight_layout()

# Write the figure to disk, then release all open figures.
plt.savefig('normalized.png')

plt.close('all')

# Topic mining

In [25]:
# Nine short encyclopedia-style passages in a three-column DataFrame:
# 'articles' holds the text, 'Topic' a coarse category (Galaxy, Geography,
# Animal) and 'Object' the subject of the passage.
articles = spark.createDataFrame([
    ('''
        The Andromeda Galaxy, named after the mythological 
        Princess Andromeda, also known as Messier 31, M31, 
        or NGC 224, is a spiral galaxy approximately 780 
        kiloparsecs (2.5 million light-years) from Earth, 
        and the nearest major galaxy to the Milky Way. 
        Its name stems from the area of the sky in which it 
        appears, the constellation of Andromeda. The 2006 
        observations by the Spitzer Space Telescope revealed 
        that the Andromeda Galaxy contains approximately one 
        trillion stars, more than twice the number of the 
        Milky Way’s estimated 200-400 billion stars. The 
        Andromeda Galaxy, spanning approximately 220,000 light 
        years, is the largest galaxy in our Local Group, 
        which is also home to the Triangulum Galaxy and 
        other minor galaxies. The Andromeda Galaxy's mass is 
        estimated to be around 1.76 times that of the Milky 
        Way Galaxy (~0.8-1.5×1012 solar masses vs the Milky 
        Way's 8.5×1011 solar masses).
    ''','Galaxy', 'Andromeda')
    , ('''
        The Milky Way is the galaxy that contains our Solar 
        System. The descriptive "milky" is derived from the 
        appearance from Earth of the galaxy – a band of light 
        seen in the night sky formed from stars that cannot be 
        individually distinguished by the naked eye. The term 
        Milky Way is a translation of the Latin via lactea, from 
        the Greek. From Earth, the Milky Way appears as a band 
        because its disk-shaped structure is viewed from within. 
        Galileo Galilei first resolved the band of light into 
        individual stars with his telescope in 1610. Observations 
        by Edwin Hubble showed that the Milky 
        Way is just one of many galaxies.
    ''','Galaxy','Milky Way')
    , ('''
        Australia, officially the Commonwealth of Australia, 
        is a sovereign country comprising the mainland of the 
        Australian continent, the island of Tasmania and numerous 
        smaller islands. It is the largest country in Oceania and 
        the world's sixth-largest country by total area. The 
        neighbouring countries are Papua New Guinea, Indonesia and 
        East Timor to the north; the Solomon Islands and Vanuatu to 
        the north-east; and New Zealand to the south-east. Australia's 
        capital is Canberra, and its largest city is Sydney.
    ''','Geography', 'Australia')
    , ('''
        The United States of America (USA), commonly known as the United 
        States (U.S.) or America, is a federal republic composed of 50 
        states, a federal district, five major self-governing territories, 
        and various possessions. At 3.8 million square miles (9.8 million 
        km2) and with over 325 million people, the United States is the 
        world's third- or fourth-largest country by total area and the 
        third-most populous country. The capital is Washington, D.C., and 
        the largest city by population is New York City. Forty-eight states 
        and the capital's federal district are contiguous and in North America 
        between Canada and Mexico. The State of Alaska is in the northwest 
        corner of North America, bordered by Canada to the east and across 
        the Bering Strait from Russia to the west. The State of Hawaii is 
        an archipelago in the mid-Pacific Ocean. The U.S. territories are 
        scattered about the Pacific Ocean and the Caribbean Sea, stretching 
        across nine official time zones. The extremely diverse geography, 
        climate, and wildlife of the United States make it one of the world's 
        17 megadiverse countries.
    ''','Geography', 'USA')
    , ('''
        China, officially the People's Republic of China (PRC), is a unitary 
        sovereign state in East Asia and, with a population of around 1.404 
        billion, the world's most populous country. Covering 9,600,000 
        square kilometers (3,700,000 sq mi), China has the most borders of 
        any country in the world. Governed by the Communist Party of China, 
        it exercises jurisdiction over 22 provinces, five autonomous regions, 
        four direct-controlled municipalities (Beijing, Tianjin, Shanghai, and 
        Chongqing), and the special administrative regions of Hong Kong and Macau.
    ''','Geography', 'China')
    , ('''
        Poland, officially the Republic of Poland, is a country located in 
        Central Europe. It is divided into 16 administrative subdivisions, 
        covering an area of 312,679 square kilometres (120,726 sq mi), and has 
        a largely temperate seasonal climate. With a population of approximately 
        38.5 million people, Poland is the sixth most populous member state of 
        the European Union. Poland's capital and largest metropolis is 
        Warsaw.
    ''','Geography', 'Poland')
    , ('''
        The domestic dog (Canis lupus familiaris when considered a subspecies 
        of the gray wolf or Canis familiaris when considered a distinct species) 
        is a member of the genus Canis (canines), which forms part of the 
        wolf-like canids, and is the most widely abundant terrestrial carnivore.
        The dog and the extant gray wolf are sister taxa as modern wolves are 
        not closely related to the wolves that were first domesticated, which 
        implies that the direct ancestor of the dog is extinct. The dog was 
        the first species to be domesticated and has been selectively bred over 
        millennia for various behaviors, sensory capabilities, and physical attributes.
    ''','Animal', 'Dog')
    , ('''
        The origin of the domestic dog is not clear. It is known that the dog was 
        the first domesticated species. The domestic dog is a member of the genus 
        Canis (canines), which forms part of the wolf-like canids, and is the most 
        widely abundant terrestrial carnivore. The closest living relative of the 
        dog is the gray wolf and there is no evidence of any other canine 
        contributing to its genetic lineage. The dog and the extant gray wolf 
        form two sister clades, with modern wolves not closely related to the 
        wolves that were first domesticated. The archaeological record shows 
        the first undisputed dog remains buried beside humans 14,700 years ago, 
        with disputed remains occurring 36,000 years ago. These dates imply 
        that the earliest dogs arose in the time of human hunter-gatherers 
        and not agriculturists.
    ''','Animal', 'Dog')
    , ('''
        Washington, officially the State of Washington, is a state in the Pacific 
        Northwest region of the United States. Named after George Washington, 
        the first president of the United States, the state was made out of the 
        western part of the Washington Territory, which was ceded by Britain in 
        1846 in accordance with the Oregon Treaty in the settlement of the 
        Oregon boundary dispute. It was admitted to the Union as the 42nd state 
        in 1889. Olympia is the state capital. Washington is sometimes referred 
        to as Washington State, to distinguish it from Washington, D.C., the 
        capital of the United States, which is often shortened to Washington.
    ''','Geography', 'Washington State')    
], ['articles', 'Topic', 'Object'])

In [36]:
import pyspark.ml.clustering as clust

# Split the 'articles' column on runs of whitespace and on commas, periods
# and double quotes.  The pattern is a raw string so that `\s` reaches the
# regex engine as written; in a normal string literal `\s` is an invalid
# escape sequence that Python only passes through with a deprecation
# warning.  The pattern's value is unchanged.
splitter = feat.RegexTokenizer(
    inputCol='articles'
    , outputCol='articles_split'
    , pattern=r'\s+|[,."]'
)

# Drop stop words from the token lists.
sw_remover = feat.StopWordsRemover(
    inputCol=splitter.getOutputCol()
    , outputCol='no_stopWords'
)

# Turn each remaining token list into a term-count vector.
count_vec = feat.CountVectorizer(
    inputCol=sw_remover.getOutputCol()
    , outputCol='vector'
)

# LDA with three topics, trained with the online optimizer on the counts.
lda_clusters = clust.LDA(
    k=3
    , optimizer='online'
    , featuresCol=count_vec.getOutputCol()
)

topic_pipeline = Pipeline(
    stages=[
        splitter
        , sw_remover
        , count_vec
        , lda_clusters
    ]
)

# For each article print its Topic, its Object, the index of its most likely
# LDA topic and the full topic distribution.
for topic in ( 
        topic_pipeline
        .fit(articles)
        .transform(articles)
        .select('Topic','Object','topicDistribution')
        .take(10)
):
    print(
        topic.Topic
        , topic.Object
        , np.argmax(topic.topicDistribution)
        , topic.topicDistribution
    )


Galaxy Andromeda 2 [0.003053456550444906,0.0033317477861422363,0.9936147956634129]
Galaxy Milky Way 2 [0.004752646858051239,0.0050467276024757125,0.9902006255394731]
Geography Australia 1 [0.00632938201257351,0.9877519489900843,0.005918668997342191]
Geography USA 1 [0.002525770470526258,0.9951088020926291,0.002365427436844653]
Geography China 1 [0.0051541381704948135,0.6008937537867546,0.3939521080427506]
Geography Poland 1 [0.006814345676648856,0.986849415140345,0.006336239183006135]
Animal Dog 0 [0.9901640623662747,0.005226762717124236,0.004609174916600995]
Animal Dog 0 [0.9926300349445092,0.003938103061207765,0.0034318619942831073]
Geography Washington State 1 [0.005261811808175384,0.9898606664191076,0.004877521772717041]