# spark sharing


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.classification import LogisticRegressionSummary
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorSlicer
from pyspark.sql.functions import regexp_replace
# from pyspark.sql.functions import when
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vector

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer

import numpy as np
import pandas as pd

## 1、basis

### 1.1 SparkSession是读取数据、处理元数据、配置会话和管理集群资源的入口

In [None]:
# SparkSession is the main entry point for programming Spark with the DataFrame API.
# It can create DataFrames, register them as tables, run SQL over tables, cache tables,
# and read parquet files.
# enableHiveSupport() turns on Hive integration (persistent metastore, Hive SerDes, Hive UDFs).
# No extra settings are registered here; add real ones with .config("<key>", "<value>")
# before getOrCreate() when needed (an empty key carries no meaning).

spark = (
    SparkSession.builder
    .master("local")
    .appName("spark_sharing")
    .enableHiveSupport()
    .getOrCreate()
)


In [None]:
# Echo the session object as the cell result.
spark

### 1.2 读取数据文件

In [None]:
# The file has a .txt extension, but going through the CSV reader lets us set the separator.
# The path is a raw string so the Windows backslashes stay literal instead of
# depending on invalid escape sequences (\s, \d, \m, \i), which newer Pythons warn about.
# No header/schema options are passed; later cells address the columns as _c0.._c4.
read_data = spark.read.csv(r'C:\spark\data\my_data\iris_data.txt', sep=',')
read_data.head(5)

In [None]:
## Reading from / writing to Hive (example left disabled)
# df = spark.sql('select * from xxx')

### 1.3 拆分数据集

In [None]:
# 70/30 split of the raw data; the fixed seed (1) is passed explicitly by keyword.
train_data, test_data = read_data.randomSplit(weights=[0.7, 0.3], seed=1)

In [None]:
# Echo the training DataFrame object as the cell result.
train_data

In [None]:
# Echo the test DataFrame object as the cell result.
test_data

### 1.4 操作dataframe

#### A distributed collection of data grouped into named columns. A DataFrame is equivalent to a relational table in Spark SQL

In [None]:
# Print up to 50 rows of train_data as a text table.
train_data.show(50)

In [None]:
# Echo the DataFrame object itself (contrast with show() in the previous cell).
train_data

In [None]:
# List the names of all columns.
train_data.columns

In [None]:
# Count the rows in the training split.
train_data.count()

In [None]:
# Filter: keep rows whose _c3 equals '0.1'.
# The comparison value is a string because the columns are only cast to double further down.
train_data.filter(train_data._c3=='0.1').show()

In [None]:
# Substring match: keep rows whose _c4 contains 'virginica', then print 5 of them.
train_data.filter(train_data._c4.contains('virginica')).show(5)

In [None]:
# Dict-style aggregation maps column name -> aggregate function name.
# The available aggregate functions are avg, max, min, sum, count.
train_data.agg({'_c3': 'avg'}).show()

In [None]:
# Create or replace a local temporary view named 'train_table' so the data can be queried with SQL.
train_data.createOrReplaceTempView('train_table')

In [None]:
# Query the temporary view with SQL: select only the 'Iris-setosa' rows.
train_1 = spark.sql("select * from train_table where _c4=='Iris-setosa'")
train_1.show()

In [None]:
# Descriptive statistics for every column; summary() also reports quartile information.
train_data.summary().show()

In [None]:
# Restrict the summary to one column (_c0) and one statistic (mean).
train_data.select('_c0').summary('mean').show()

In [None]:
# select() alone yields another DataFrame; nothing is printed until an action such as show() is called.
train_data.select('_c0')

In [None]:
# Print up to 10 values of column _c0.
train_data.select('_c0').show(10)

In [None]:
# collect() pulls every row of the selection back to the driver.
train_data.select('_c0').collect()

In [None]:
# head(5) fetches only the first 5 rows.
train_data.select('_c0').head(5)

In [None]:
# Inspect the Python type of what head(5) returns.
type(train_data.select('_c0').head(5))

In [None]:
# take(5) also fetches the first 5 rows.
train_data.select('_c0').take(5)

In [None]:
# Index [0] picks the first element of the take(5) result.
train_data.select('_c0').take(5)[0]

In [None]:
# Inspect the Python type of a single element returned by take().
type(train_data.select('_c0').take(5)[0])

In [None]:
# Second index [0] extracts the first field of that first element, i.e. the raw cell value.
train_data.select('_c0').take(5)[0][0]

In [None]:
# Inspect the Python type of the raw cell value.
type(train_data.select('_c0').take(5)[0][0])

### 1.5 操作column

In [None]:
# Pick a column from a DataFrame via attribute access.
train_data._c0

In [None]:
# Pick the same column via key (bracket) access.
train_data['_c0']

In [None]:
# Intentional failure demo: calling collect() on a column object is expected to raise
# the error "'Column' object is not callable". Use DataFrame.select(...).collect() instead.
train_data['_c0'].collect()

In [None]:
# Build a two-column DataFrame to demonstrate column-level calculations.
operate_data = train_data.select(['_c0', '_c1'])
# NOTE(review): this bare expression is not the last statement of the cell, so a notebook
# with default settings presumably does not echo it; only show(5) below produces output.
operate_data
operate_data.show(5)

In [None]:
# Sum of two columns expressed as a column expression (attribute and key access mixed on purpose).
# NOTE(review): _c0/_c1 have not been cast to double yet at this point, so the addition
# presumably relies on Spark's implicit numeric cast of string columns; confirm.
add_col = operate_data._c0 + operate_data['_c1']
add_col

In [None]:
# Attach the column expression to the DataFrame as a new column named 'sum'.
add_operate_data = operate_data.withColumn('sum', add_col)

In [None]:
# Print 5 rows including the new 'sum' column.
add_operate_data.show(5)

## 2、LR

In [None]:
# Quick look at the training rows before feature/label preparation.
train_data.show(10)

In [None]:
# Standalone StringIndexer demo: encode the string class column _c4 as a numeric 'indexedLabel'.
stringIndexer = StringIndexer(inputCol='_c4', outputCol='indexedLabel')
# fit() learns the string -> index mapping from the training split ...
stringIndexer_model = stringIndexer.fit(train_data)
# ... and transform() appends the 'indexedLabel' column.
train_data_2 = stringIndexer_model.transform(train_data)
train_data_2.show(20)

In [None]:
# Check the string -> index mapping produced by the StringIndexer.
# De-duplicate inside Spark with distinct() so that only one row per (label, index)
# pair is brought back to the driver, instead of collecting every training row.
sorted(
    ((row[0], row[1]) for row in train_data_2.select(['_c4', 'indexedLabel']).distinct().collect()),
    key=lambda pair: pair[0],
)

In [None]:
# Convert the four feature columns of the training split from string to double, one at a time.
for feature_col in ('_c0', '_c1', '_c2', '_c3'):
    train_data = train_data.withColumn(feature_col, train_data[feature_col].cast('double'))

In [None]:
##### pipeline #####

# Stage 1: pack the four numeric columns into a single vector column named "features".
input_col = ['_c0', '_c1', '_c2', '_c3']
vecAssembler = VectorAssembler(
    inputCols=input_col,
    outputCol="features",
)
# Stage 2: index the string class column "_c4" into a numeric "label" column.
stringIndexer = StringIndexer(
    inputCol="_c4",
    outputCol="label",
)
pipeline = Pipeline(stages=[vecAssembler, stringIndexer])
# Fit on the training split only; the fitted pipeline is reused later for the test split.
pipelineFit = pipeline.fit(train_data)
new_train_data = pipelineFit.transform(train_data)

In [None]:
# Print 20 transformed rows, now including the 'features' and 'label' columns.
new_train_data.show(20)

In [None]:
# Select only the assembled feature-vector column.
new_train_data.select('features')

In [None]:
# Take the feature vector of the first row and convert it with toArray() for inspection.
new_train_data.select('features').collect()[0][0].toArray()

In [None]:
# Prepare the test split exactly like the training split:
# first cast the four feature columns from string to double ...
for feature_col in ('_c0', '_c1', '_c2', '_c3'):
    test_data = test_data.withColumn(feature_col, test_data[feature_col].cast('double'))

# ... then apply the pipeline that was already fitted on the training data.
new_test_data = pipelineFit.transform(test_data)

In [None]:
# Print 5 transformed test rows.
new_test_data.show(5)

#### train model

In [None]:
"""
class pyspark.ml.classification.LogisticRegression(featuresCol='features', labelCol='label', predictionCol='prediction', 
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-06, fitIntercept=True, threshold=0.5, thresholds=None,probabilityCol='probability', 
rawPredictionCol='rawPrediction', standardization=True, weightCol=None, aggregationDepth=2, family='auto', 
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None)
"""

In [None]:
# featuresCol / labelCol are not given, so the estimator reads its default
# 'features' and 'label' columns from the prepared training data.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0.7,
)
lr_model = lr.fit(new_train_data)
lr_model

In [None]:
# Training summary attached to the fitted model; used by the next cells.
model_summary = lr_model.summary

In [None]:
# Print the objective value recorded at each training iteration, one per line.
objectiveHistory = model_summary.objectiveHistory
print("objectiveHistory:")
for iteration_objective in objectiveHistory:
    print(iteration_objective)

In [None]:
# Report the true positive rate for each label index.
print("True positive rate by label:")
for label_idx, tpr in enumerate(model_summary.truePositiveRateByLabel):
    line = "label %d: %s" % (label_idx, tpr)
    print(line)

#### predict

In [None]:
# Score the prepared test split with the fitted logistic regression model.
prediction = lr_model.transform(new_test_data)

In [None]:
# Print 10 scored rows.
prediction.show(10)

In [None]:
# rawPrediction may vary between algorithms, but it intuitively gives a measure of
# confidence in each possible label (larger = more confident).
# Here: the rawPrediction value of the first scored row.
prediction.select('rawPrediction').collect()[0][0]

In [None]:
# The 'probability' value of the first scored row.
prediction.select('probability').collect()[0][0]

In [None]:
# metricName options: f1 | weightedPrecision | weightedRecall | accuracy
# Evaluator configured for the F1 score.
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol="prediction",
    metricName='f1',
)

In [None]:
# F1 score of the baseline model on the test split.
f1_on_test = evaluator_f1.evaluate(prediction)
print('f1-score: {}'.format(f1_on_test))

In [None]:
# Evaluator configured for plain accuracy.
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol="prediction",
    metricName='accuracy',
)

In [None]:
# Accuracy of the baseline model on the test split.
accuracy_on_test = evaluator_acc.evaluate(prediction)
print('accuracy: {}'.format(accuracy_on_test))

#### grid search + cross validation

In [None]:
# Fresh estimator with default settings; the values to try come from the grid below.
lr_new = LogisticRegression()
# Search space: 2 regParam values x 4 maxIter values x 1 elasticNetParam value = 8 combinations.
grid = (
    ParamGridBuilder()
    .addGrid(lr_new.regParam, [0.3, 0.5])
    .addGrid(lr_new.maxIter, [10, 15, 50, 100])
    .addGrid(lr_new.elasticNetParam, [0.7])
    .build()
)

In [None]:
# Evaluator used for model selection; no arguments, so columns and metric are the library defaults.
evaluator_new = MulticlassClassificationEvaluator()

In [None]:
# Cross-validated grid search over `grid`, scored by `evaluator_new`.
# The number of folds and the seed are not set here, so library defaults apply.
cv = CrossValidator(estimator=lr_new, estimatorParamMaps=grid, evaluator=evaluator_new)

In [None]:
# Run the search on the training data and keep only the winning model.
# Note: despite its name, `cvModel` holds `.bestModel`, not the cross-validation result object itself.
cvModel = cv.fit(new_train_data).bestModel

In [None]:
# regParam value carried by the selected model.
cvModel.getOrDefault('regParam')

In [None]:
# maxIter value carried by the selected model.
cvModel.getOrDefault('maxIter')

In [None]:
# elasticNetParam value carried by the selected model.
cvModel.getOrDefault('elasticNetParam')

In [None]:
# Score the test split with the model chosen by cross-validation.
new_prediction = cvModel.transform(new_test_data)

In [None]:
# Print scored rows (show() with its default row limit).
new_prediction.show()

In [None]:
# F1 score of the cross-validated model on the test split.
cv_f1_on_test = evaluator_f1.evaluate(new_prediction)
print('f1-score: {}'.format(cv_f1_on_test))

In [None]:
# Accuracy of the cross-validated model on the test split.
cv_accuracy_on_test = evaluator_acc.evaluate(new_prediction)
print('accuracy: {}'.format(cv_accuracy_on_test))