In [None]:
# Load the semicolon-separated CSV (header row present) and let Spark infer
# column types, then preview the first rows without truncating cell values.
df = (spark.read
      .option('sep', ';')
      .option('header', True)
      .option('inferSchema', True)
      .csv('bank-additional/bank-additional-full.csv'))
df.show(n=5, truncate=False)

In [None]:
from pyspark.sql.functions import col, when

# .toDF(*names) renames every column at once; dots are replaced with
# underscores because a '.' in a column name is otherwise parsed as
# struct-field access and must be back-quoted in expressions.
df = df.toDF(*[c.replace('.', '_') for c in df.columns])

# With inferSchema the target column 'y' is read as the strings 'yes'/'no',
# but the following cells compare it with 0/1, average it, and use it as the
# ML label, so encode it as an integer 0/1 column. The dtype guard makes the
# cell safe to re-run (and a no-op if the data is already numeric).
if dict(df.dtypes)['y'] == 'string':
    df = df.withColumn('y', when(col('y') == 'yes', 1).otherwise(0))

In [None]:
from pyspark.sql.functions import isnull, count, when, col

def data_overview(data=None, n_examples=5):
    """Print a quick summary of a Spark DataFrame.

    Prints the row count, the column count, the total number of null cells,
    and for every column its number of distinct values together with up to
    `n_examples` of those values.

    Parameters
    ----------
    data : pyspark.sql.DataFrame, optional
        Frame to summarise. Defaults to the notebook-level `df`, which is what
        the original zero-argument call summarised.
    n_examples : int, default 5
        How many distinct example values to print per column.
    """
    if data is None:
        data = df

    print('Rows:', data.count())
    print('Columns:', len(data.columns))

    # when(isnull(c), c) is non-null exactly where column c is null, so
    # count() of it is the per-column null count; summing the single result
    # row gives the total number of missing cells.
    null_counts = data.select([count(when(isnull(c), c)).alias(c) for c in data.columns])
    print('Missing', sum(null_counts.first()))

    for c in data.columns:
        distinct_values = data.select(c).distinct()
        # limit() before collect() so only n_examples values reach the driver,
        # instead of every distinct value of high-cardinality columns.
        examples = [row[c] for row in distinct_values.limit(n_examples).collect()]
        print('{:15}: '.format(c), distinct_values.count(), '\t', examples)


data_overview()

In [None]:
# Per-column missing-value counts. For each column c, when(isnull(c), c)
# returns the literal string c (a non-null value) on rows where c is null and
# null on every other row; count() only counts non-null values, so the result
# is the number of nulls in column c.
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [None]:
# Count plot: number of clients per job, split by subscription outcome y.
grouped = df.groupBy('job', 'y').count()

import plotly.tools as tls

# Collect each aggregate once and build x and y from the same rows. Separate
# collect() calls launch separate Spark jobs, and Spark does not guarantee an
# unordered groupBy result comes back in the same row order each time, so x
# and y could otherwise be mis-paired. Sorting by job also fixes the bar order.
grouped_rows = grouped.orderBy('job').collect()
no_rows = [row for row in grouped_rows if row['y'] == 0]
yes_rows = [row for row in grouped_rows if row['y'] == 1]

trace1 = go.Bar(x=[row['job'] for row in no_rows],
                y=[row['count'] for row in no_rows],
                name='No', showlegend=True)

trace2 = go.Bar(x=[row['job'] for row in yes_rows],
                y=[row['count'] for row in yes_rows],
                name='Yes')

# Mean plot: mean of y per job (the subscription rate), rounded to 2 decimals.
grouped_mean = df.groupBy('job').agg({'y': 'mean'})
mean_rows = grouped_mean.orderBy('job').collect()
trace3 = go.Bar(x=[row['job'] for row in mean_rows],
                y=[round(row['avg(y)'], 2) for row in mean_rows],
                name='Mean Subscription Rate', showlegend=False)

fig = tls.make_subplots(rows=1, cols=2,
                        subplot_titles=('Count Plot of Job',
                                        'Mean Subscription Rate of Job'))
fig.append_trace(trace1, 1, 1)
fig.append_trace(trace2, 1, 1)
fig.append_trace(trace3, 1, 2)

py.iplot(fig)

In [None]:
# Left panel: age histograms per outcome. ff.create_distplot is used only for
# its density-normalised histogram traces, which are copied into the subplot
# grid below, so show_curve=False skips computing a KDE curve that would be
# thrown away. Only the 'age' column is collected to the driver.
ages_by_outcome = [
    [row['age'] for row in df.filter(df['y'] == 1).select('age').collect()],
    [row['age'] for row in df.filter(df['y'] == 0).select('age').collect()],
]
age_distplot = ff.create_distplot(ages_by_outcome,
                                  group_labels=['Subscription 1',
                                                'Subscription 0'],
                                  show_curve=False,
                                  show_rug=False)

# Right panel: mean of y per age. Collect once and build both axes from the
# same rows (one Spark job instead of two).
grouped_mean = df.groupBy('age').agg({'y': 'mean'}).orderBy('age')
age_rate_rows = grouped_mean.collect()
trace2 = go.Scatter(x=[row['age'] for row in age_rate_rows],
                    y=[row['avg(y)'] for row in age_rate_rows],
                    mode='lines+markers',
                    showlegend=False)

fig = tls.make_subplots(rows=1, cols=2,
                        subplot_titles=('Count Plot of Age',
                                        'Mean Subscription Rate vs. age'))

# A distplot figure cannot be placed in a subplot directly, so each of its
# histogram traces is re-wrapped as go.Histogram and added to the left panel:
# data[1] is the 'Subscription 0' group (blue), data[0] 'Subscription 1' (red).
fig.add_trace(go.Histogram(age_distplot['data'][1],
                           marker_color='blue',
                           marker={'opacity':0.3},
                           showlegend=True), 1, 1)
fig.add_trace(go.Histogram(age_distplot['data'][0],
                           marker_color='red',
                           marker={'opacity':0.4},
                           showlegend=True), 1, 1)

fig.add_trace(trace2, 1, 2)
fig.show()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler

# VectorAssembler is used below but was missing from this cell's imports.
from pyspark.ml.feature import VectorAssembler

# Split predictors by Spark dtype; the target 'y' is excluded from both lists.
continuous_features = [name for name, dtype in df.dtypes if dtype != 'string' and name != 'y']
categorical_features = [name for name, dtype in df.dtypes if dtype == 'string' and name != 'y']

# Every categorical feature gets its own StringIndexer (string -> index) ...
indexers = [StringIndexer(inputCol=c, outputCol='{}_indexed'.format(c))
            for c in categorical_features]
# ... and a OneHotEncoder that reads that indexer's output column.
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                          outputCol='{}_encoded'.format(indexer.getOutputCol()))
            for indexer in indexers]
# One feature vector: the one-hot blocks first, then the continuous columns.
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + continuous_features,
                            outputCol='features')

pipeline = Pipeline(stages=indexers + encoders + [assembler])

model = pipeline.fit(df)
data = model.transform(df)

# Keep only what the estimators need: the assembled vector and y as 'label'.
data = data.withColumn('label', col('y')).select('features', 'label')

In [None]:
# Fixed seed so the 80/20 split, and every metric computed on it, is
# reproducible across kernel restarts.
SPLIT_SEED = 42
trainingData, testData = data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
def _feature_names(predictions):
    """Return the names of the assembled features, ordered by vector index.

    Reads the 'ml_attr' metadata on the 'features' column of `predictions`.
    Attributes there are grouped by type ('binary', 'numeric', ...), so the
    groups are flattened and sorted by each attribute's 'idx' to line up with
    the positions of the model's weight vector. A missing group is treated as
    empty rather than failing on None.
    """
    ml_attr = predictions.schema['features'].metadata.get('ml_attr', {})
    grouped_attrs = ml_attr.get('attrs', {})
    flat_attrs = [attr for group in grouped_attrs.values() for attr in group]
    return [attr['name'] for attr in sorted(flat_attrs, key=lambda attr: attr['idx'])]


def _plot_model_performance(conf_matrix, fpr, tpr, weight_pairs):
    """Plot confusion matrix, ROC curve and per-feature weights in one figure.

    `weight_pairs` is a list of (feature name, weight) tuples, drawn as
    horizontal bars in the given order.
    """
    names = [name for name, _ in weight_pairs]
    weights = [weight for _, weight in weight_pairs]

    # plot confusion matrix
    trace1 = go.Heatmap(z=conf_matrix,
                        x=['Not Subscribe', 'Subscribe'],
                        y=['Not Subscribe', 'Subscribe'],
                        showscale=False, colorscale='Picnic',
                        name='Confusion_matrix')

    # plot roc curve, plus the diagonal of a random classifier for reference
    trace2 = go.Scatter(x=fpr, y=tpr,
                        line=dict(color=('rgb(205, 12, 24)'), width=2))

    trace3 = go.Scatter(x=[0, 1], y=[0, 1],
                        line=dict(color=('rgb(205, 12, 24)'), width=2,
                                  dash='dot'))

    # per-feature weights, bar colour follows the weight value
    trace4 = go.Bar(y=names,
                    x=weights,
                    orientation='h',
                    marker=dict(color=weights,
                                colorscale='Picnic',
                                line=dict(width=0.6, color='black')))

    fig = tls.make_subplots(rows=2, cols=2, specs=[[{}, {}], [{'colspan': 2}, None]],
                            subplot_titles=('Confusion Matrix',
                                            'Receiver Operating Characteristic'))
    fig.append_trace(trace1, 1, 1)
    fig.append_trace(trace2, 1, 2)
    fig.append_trace(trace3, 1, 2)
    fig.append_trace(trace4, 2, 1)

    fig['layout'].update(showlegend=False, title='Model performance',
                         autosize=False, height=900, width=1000,
                         plot_bgcolor='rgba(240, 240, 240, 0.95)',
                         paper_bgcolor='rgba(240, 240, 240, 0.95)',
                         margin=dict(b=195))
    fig['layout']['xaxis2'].update(dict(title='false positive rate'))
    fig['layout']['yaxis2'].update(dict(title='true positive rate'))
    fig['layout']['xaxis3'].update(dict(showgrid=True, tickfont=dict(size=10)),
                                   tickangle=0)
    py.iplot(fig)


def select_model_prediction(algo, train, test, cf='coefficient'):
    """Fit `algo` on `train`, evaluate it on `test`, and plot the results.

    Prints accuracy plus precision, recall and F1 of the positive class
    (label 1.0), then draws a confusion matrix, the ROC curve and a bar chart
    of per-feature weights. Returns nothing.

    Parameters
    ----------
    algo : pyspark.ml estimator
        Its fitted model must add 'prediction' and 'probability' columns;
        binary classification is assumed (probability[1] is the score used
        for the ROC curve).
    train, test : pyspark.sql.DataFrame
        Frames with a 'features' vector column and a 'label' column.
    cf : {'coefficient', 'features'}
        'coefficient' plots model.coefficients (e.g. logistic regression);
        'features' plots model.featureImportances (tree-based models).

    Raises
    ------
    ValueError
        If `cf` is not one of the values above, or the model's weight vector
        does not match the feature names found in the column metadata.
    """
    # Validate before the (expensive) fit; an unknown value used to leave the
    # weights unbound and fail only after training.
    if cf == 'coefficient':
        weight_attr = 'coefficients'
    elif cf == 'features':
        weight_attr = 'featureImportances'
    else:
        raise ValueError("cf must be 'coefficient' or 'features', got {!r}".format(cf))

    model = algo.fit(train)
    predictions = model.transform(test)

    feature_names = _feature_names(predictions)
    # toArray() densifies both DenseVector and SparseVector (featureImportances).
    weights = getattr(model, weight_attr).toArray().tolist()
    if len(weights) != len(feature_names):
        raise ValueError('model has {} weights but the features metadata names {} features'
                         .format(len(weights), len(feature_names)))
    # Ascending by weight; both lists are already on the driver, so sort
    # locally instead of round-tripping through a Spark DataFrame.
    weight_pairs = sorted(zip(feature_names, weights), key=lambda pair: pair[1])

    accuracy = eval_result('accuracy', predictions.select('label', 'prediction'))
    pred_and_label = (predictions
                      .withColumn('label', col('label').cast(FloatType()))
                      .select('prediction', 'label'))
    metrics = MulticlassMetrics(pred_and_label.rdd.map(tuple))

    # Per-class metrics for the positive label 1.0: the label-less variants
    # are removed in Spark 3 (TypeError) and in Spark 2 were deprecated
    # aliases that all equal overall accuracy.
    print(type(algo).__name__)
    print('-' * 80)
    print('Accuracy: {:5.2f}'.format(accuracy))
    print('F1: {:11.2f}'.format(metrics.fMeasure(1.0)))
    print('Precision: {:4.2f}'.format(metrics.precision(1.0)))
    print('Recall: {:7.2f}'.format(metrics.recall(1.0)))

    conf_matrix = metrics.confusionMatrix().toArray()

    # (positive-class probability, true label) pairs for the ROC curve.
    scored = (predictions.select('label', 'probability').rdd
              .map(lambda row: (float(row['probability'][1]), float(row['label'])))
              .collect())
    y_score, y_true = zip(*scored)
    fpr, tpr, _ = roc_curve(y_true, y_score, pos_label=1)

    _plot_model_performance(conf_matrix, fpr, tpr, weight_pairs)