## Uwaga
Wersja zgodna z gałęzią master repozytorium post_extractor. Przed uruchomieniem notebooka trzeba zrobić checkout tej gałęzi.

In [36]:
import os
# Windows-specific environment setup, currently disabled:
# os.environ['SPARK_HOME'] = 'C:\\Users\\Mateusz\\Downloads\\Spark\\spark-2.2.1-bin-hadoop2.7'
# os.environ['HADOOP_HOME'] = os.environ['PWD']
import findspark
# Initialise findspark with an explicit Spark 2.2.1 install directory.
# NOTE: the absolute path below is specific to one developer machine and has
# to be adjusted locally before running.
findspark.init('/home/marta/Pobrane/spark-2.2.1-bin-hadoop2.7')

In [37]:
import json

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import ArrayType, IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



from post_extractor.modules.posts import (
    SentenceTransformer,
    PostTransformer,
    TranslateTransformer,
    SpeechPartsTransformer,
    SentimentTransformer
)
from post_extractor.modules.features_ import (
    FeatureTransformer
)

from post_extractor.modules.universal import (
    ConvertDictToVectorTransformer,
    SelectRecordsTransformer,
    MaxTransformer,
    MeanTransformer,
    MedianTransformer,
    NumberOfOccurrencesTransformer,
)

# Spark configuration for this notebook: master 'local[*]', app name
# 'PipelineFlow'.
sconf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('PipelineFlow')
)

# Obtain (or create) the SparkContext for that configuration, then build the
# session and SQL context on top of it.
sc = SparkContext.getOrCreate(sconf)
sess = SparkSession(sc)
sqlContext = SQLContext(sc)
    

In [38]:
from pyspark.ml.param import Param
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml import Transformer

class TransformerProxy(Transformer):
    """Transformer that forwards ``transform`` to a swappable delegate.

    The delegate is stored under the ``transformer`` param, which is what the
    param grid in this notebook varies.
    """

    def __init__(self):
        super(TransformerProxy, self).__init__()
        # The param is created per instance, after the base initialiser ran.
        self.transformer = Param(self, "transformer", "")

    def set_transformer(self, transformer):
        """Store ``transformer`` as the delegate and return ``self``."""
        delegate_param = self.transformer
        self._paramMap[delegate_param] = transformer
        return self

    def get_transformer(self):
        """Return the delegate held by the ``transformer`` param."""
        delegate_param = self.transformer
        return self.getOrDefault(delegate_param)

    def _transform(self, dataset):
        """Apply the delegate's ``transform`` to ``dataset`` and return the result."""
        delegate = self.get_transformer()
        return delegate.transform(dataset)

In [39]:
from pyspark.ml import Estimator

class EstimatorProxy(Estimator):
    """Estimator that forwards ``fit`` to a swappable delegate.

    The delegate is stored under the ``estimator`` param, which is what the
    param grid in this notebook varies.
    """

    def __init__(self):
        super(EstimatorProxy, self).__init__()
        # The param is created per instance, after the base initialiser ran.
        self.estimator = Param(self, "estimator", "")

    def set_estimator(self, estimator):
        """Store ``estimator`` as the delegate and return ``self``."""
        delegate_param = self.estimator
        self._paramMap[delegate_param] = estimator
        return self

    def get_estimator(self):
        """Return the delegate held by the ``estimator`` param."""
        delegate_param = self.estimator
        return self.getOrDefault(delegate_param)

    def _fit(self, dataset):
        """Fit the delegate on ``dataset`` and return whatever it returns."""
        delegate = self.get_estimator()
        return delegate.fit(dataset)

In [40]:
def create_estimators_grid(estimator, param_grid):
    """Build one configured copy of ``estimator`` per entry of ``param_grid``.

    Args:
        estimator: Object exposing ``copy()`` and ``setParams(**kwargs)``.
        param_grid: Iterable of mappings from parameter to value. A key may be
            a parameter name (``str``) or an object carrying the name in a
            ``name`` attribute (for example a ``Param``).

    Returns:
        list: The configured copies, in the order of ``param_grid``. Settings
        are applied to the copies returned by ``estimator.copy()``, never to
        ``estimator`` itself.
    """
    result = []
    for param_map in param_grid:
        # ``setParams`` takes keyword arguments, so every key has to be a
        # string; Param-like keys are reduced to their ``name`` first.
        kwargs = {
            getattr(key, 'name', key): value
            for key, value in param_map.items()
        }
        est_copy = estimator.copy()
        est_copy.setParams(**kwargs)
        result.append(est_copy)
    return result

In [41]:
def _file_key(path, suffix):
    """Return the last ``/``-separated component of ``path`` without ``suffix``.

    Only an exact trailing ``suffix`` is removed. ``str.rstrip`` must not be
    used for this: it strips a *set of characters*, so it also eats the end of
    the base name (``'foo.json'.rstrip('.json')`` yields ``'f'``).
    """
    name = path.split('/')[-1]
    if suffix and name.endswith(suffix):
        return name[:len(name) - len(suffix)]
    return name


def load_data(spark_ctx, root):
    """Load posts and feature files and join them on their shared file key.

    Args:
        spark_ctx: Context object providing ``wholeTextFiles(path)``.
        root: Path prefix; ``'posts'`` and ``'features'`` are appended to it
            verbatim, so it is expected to end with a path separator.

    Returns:
        The join of the posts frame (``key``, ``content_post``) with the
        features frame (``key``, ``content_features``) on ``key``.

    Side effects:
        Prints a preview of the features frame via ``show()``.
    """
    # Posts: the key is the file name without '.json', the content is parsed JSON.
    posts_rdd = spark_ctx.wholeTextFiles(root + 'posts')
    posts_rdd = posts_rdd.map(
        lambda x: (_file_key(x[0], '.json'), json.loads(x[1]))
    )
    posts_df = posts_rdd.toDF(['key', 'content_post'])

    # Features: the key is the file name without '.features', the content stays raw text.
    features_rdd = spark_ctx.wholeTextFiles(root + 'features')
    features_rdd = features_rdd.map(
        lambda x: (_file_key(x[0], '.features'), x[1])
    )
    features_df = features_rdd.toDF(['key', 'content_features'])

    # Preview kept so that existing callers still see the same printed output.
    features_df.show()
    return posts_df.join(features_df, 'key')

In [43]:
# Smoke test: load the joined posts/features frame from the relative 'data/'
# directory and print a preview of it.
load_data(sc, 'data/').show()

+--------------------+--------------------+
|                 key|    content_features|
+--------------------+--------------------+
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1

In [44]:
# Two candidate sets of feature names; only the first set is used below.
features_choices = [
    ["leaf", "has-attribute-class"],
    ["contains-adjectives", "contains-date"],
]

# 'content_features' -> 'features'
featurer = FeatureTransformer()
featurer.setInputCol('content_features').setOutputCol('features')

# 'features' -> 'selected_features', configured with the first feature set.
feature_selector = SelectRecordsTransformer(
    keys=features_choices[0],
    element_type=ArrayType(DoubleType()),
)
feature_selector.setInputCol(featurer.getOutputCol()).setOutputCol('selected_features')

# Interchangeable aggregations: each one reads the selector output and writes
# the same column, so the proxy stage can stand in for any of them.
aggregated_features = 'aggregated_features'

max_feature_transformer = MaxTransformer()
mean_feature_transformer = MeanTransformer()
median_feature_transformer = MedianTransformer()
number_of_occurences_feature_transformer = NumberOfOccurrencesTransformer()

feature_aggregation_proxy = TransformerProxy()
feature_aggregation_transformers = [
    max_feature_transformer,
    mean_feature_transformer,
    median_feature_transformer,
    number_of_occurences_feature_transformer,
]
for _aggregator in feature_aggregation_transformers:
    _aggregator.setInputCol(feature_selector.getOutputCol()).setOutputCol(aggregated_features)

# 'aggregated_features' -> 'features_from_file', using the same key set.
features_dict_to_list_converter = ConvertDictToVectorTransformer(keys=features_choices[0])
features_dict_to_list_converter.setInputCol(aggregated_features).setOutputCol('features_from_file')

# Stage order for the feature half of the pipeline; the proxy occupies the
# aggregation slot.
features_stages = [
    featurer,
    feature_selector,
    feature_aggregation_proxy,
    features_dict_to_list_converter,
]


In [45]:
# Post-processing stages. Each transformer reads the column given to
# setInputCol and writes the column given to setOutputCol.
poster = PostTransformer()
poster.setInputCol('content_post').setOutputCol('posts')

translator = TranslateTransformer()
translator.setInputCol('posts').setOutputCol('translated')

# The next three stages all read the translated text.
sentencer = SentenceTransformer()
sentencer.setInputCol('translated').setOutputCol('sentences')

speech_parter = SpeechPartsTransformer()
speech_parter.setInputCol('translated').setOutputCol('speech_parts')

sentimenter = SentimentTransformer()
sentimenter.setInputCol('translated').setOutputCol('sentiments')

# Part-of-speech tags selected from the 'speech_parts' column.
# NOTE(review): if these are Penn Treebank tags, 'NNP' (singular proper noun)
# is absent while 'NNPS' is present — confirm the omission is intentional.
tags = [
    'NN',
    'NNS',
    'NNPS',
]

aggregated_nouns_col = 'aggregated_nouns'
nouns_col = 'nouns'

# The selector writes to nouns_col (previously the literal 'nouns' was repeated
# here and the constant was left unused).
speech_parts_selector = SelectRecordsTransformer(keys=tags, element_type=ArrayType(IntegerType()))
speech_parts_selector.setInputCol(speech_parter.getOutputCol()).setOutputCol(nouns_col)

# Interchangeable aggregations: each one reads the selector output and writes
# the same column, so the proxy stage can stand in for any of them.
max_nouns_transformer = MaxTransformer()
max_nouns_transformer.setInputCol(speech_parts_selector.getOutputCol()).setOutputCol(aggregated_nouns_col)

mean_nouns_transformer = MeanTransformer()
mean_nouns_transformer.setInputCol(speech_parts_selector.getOutputCol()).setOutputCol(aggregated_nouns_col)

median_nouns_transformer = MedianTransformer()
median_nouns_transformer.setInputCol(speech_parts_selector.getOutputCol()).setOutputCol(aggregated_nouns_col)

post_aggregation_proxy = TransformerProxy()
post_aggregation_transformers = [max_nouns_transformer, mean_nouns_transformer, median_nouns_transformer]

# 'aggregated_nouns' -> 'post_features', using the same tag list as keys.
posts_dict_to_list_converter = ConvertDictToVectorTransformer(keys=tags)
posts_dict_to_list_converter.setInputCol(aggregated_nouns_col).setOutputCol('post_features')

# Stage order for the post half of the pipeline; the proxy occupies the
# aggregation slot.
post_stages = [
    poster,
    translator,
    sentencer,
    speech_parter,
    sentimenter,
    speech_parts_selector,
    post_aggregation_proxy,
    posts_dict_to_list_converter,
]

In [46]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
class DenseVectorTransformer(Transformer, HasInputCol, HasOutputCol):
    """Add an output column holding ``Vectors.dense`` of the input column."""

    def __init__(self):
        super(DenseVectorTransformer, self).__init__()

    def _transform(self, dataset):
        """Return ``dataset`` with the output column added.

        Every value of the input column is passed through ``Vectors.dense``
        by a UDF whose declared return type is ``VectorUDT``.
        """
        to_dense_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())
        target_col = self.getOutputCol()
        source_col = self.getInputCol()
        return dataset.withColumn(target_col, to_dense_udf(source_col))
    
# One DenseVectorTransformer per dict-to-vector converter: each reads that
# converter's output column and writes a new '*_dv' column.
features_dv = DenseVectorTransformer().setInputCol(features_dict_to_list_converter.getOutputCol()).setOutputCol('features_dv')
posts_dv = DenseVectorTransformer().setInputCol(posts_dict_to_list_converter.getOutputCol()).setOutputCol('posts_dv')
# NOTE(review): hotfix_stages is not part of the stage list passed to Pipeline
# further down in this file — confirm whether it is still needed.
hotfix_stages = [features_dv, posts_dv]

In [47]:
# Columns fed to the assembler: the outputs of both dict-to-vector converters,
# file features first, post features second.
all_features = [
    converter.getOutputCol()
    for converter in (features_dict_to_list_converter, posts_dict_to_list_converter)
]

vector_assembler = VectorAssembler(
    inputCols=all_features,
    outputCol='feature_vector',
)

#classifier = DecisionTreeClassifier(featuresCol=vector_assembler.getOutputCol())

#classification_stages = [vector_assembler, classifier]

In [48]:
# Stages that join the feature branch and the post branch into one vector.
pipeline_join_stages = [vector_assembler]

In [49]:
label_col = 'label'

# Decision tree reading the assembled feature vector.
decision_tree_classifier = DecisionTreeClassifier(
    featuresCol=vector_assembler.getOutputCol(),
    labelCol=label_col,
)

# The grid keys are plain strings because create_estimators_grid passes each
# map to setParams(**param_map).
dtParamGrid = (
    ParamGridBuilder()
    .baseOn({"labelCol": label_col})
    .addGrid("maxDepth", [2, 3])
    .addGrid("maxBins", [5, 10])
    .build()
)

# Start the shared list of pre-configured estimators with the tree variants.
estimators_grid = create_estimators_grid(decision_tree_classifier, dtParamGrid)

In [50]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression reading the assembled feature vector.
lrc = LogisticRegression(
    featuresCol=vector_assembler.getOutputCol(),
    labelCol=label_col,
)

# The grid keys are plain strings because create_estimators_grid passes each
# map to setParams(**param_map).
lrParamGrid = (
    ParamGridBuilder()
    .baseOn({"labelCol": label_col})
    .addGrid("maxIter", [10, 50])
    .addGrid("regParam", [0, 0.1])
    .build()
)

# Append the logistic-regression variants to the shared estimator list.
estimators_grid += create_estimators_grid(lrc, lrParamGrid)

In [51]:
# Placeholder estimator stage; the concrete estimator is supplied through its
# 'estimator' param by the param grid.
est_proxy = EstimatorProxy()

In [52]:
# Full pipeline: feature stages, post stages, vector assembly, and finally the
# estimator proxy.
pipeline = Pipeline(
    stages=features_stages + post_stages + pipeline_join_stages + [est_proxy]
)

# Search space: every combination of feature aggregation, noun aggregation
# and pre-configured estimator.
param_grid = (
    ParamGridBuilder()
    .addGrid(feature_aggregation_proxy.transformer, feature_aggregation_transformers)
    .addGrid(post_aggregation_proxy.transformer, post_aggregation_transformers)
    .addGrid(est_proxy.estimator, estimators_grid)
    .build()
)

evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol='label',
)

In [53]:
# Cross-validate the whole pipeline over every entry of param_grid, scoring
# each candidate with the multiclass evaluator.
cross_validator = CrossValidator(
    estimator=pipeline, 
    estimatorParamMaps=param_grid, 
    evaluator=evaluator)
# Original note: add numFolds = 3 if CV crashes.
# NOTE(review): pyspark documents 3 as the default numFolds, so passing that
# value explicitly should not change anything — confirm before relying on it.

In [54]:
from pyspark.sql.functions import rand, floor
# Load the joined data (load_data also prints a preview of the features frame).
input_data = load_data(sc, 'data/')
# Attach a placeholder 'label' column computed as floor(rand() * 3), cast to
# double. The labels are random and unseeded, so they differ between runs.
labeled_data = input_data.withColumn('label', floor(rand() * 3).cast(DoubleType()))

+--------------------+--------------------+
|                 key|    content_features|
+--------------------+--------------------+
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1,4,1,1,...|
|kascysko.blogspot...|{"path":[1

In [None]:
# Fit the cross-validator on the labelled data; the result is kept in cv_result.
cv_result = cross_validator.fit(labeled_data)

In [29]:
# Print a preview of the joined input frame (key, content_post, content_features).
input_data.show()

+---+------------+----------------+
|key|content_post|content_features|
+---+------------+----------------+
+---+------------+----------------+



In [20]:
# Apply the fitted cross-validation model to the labelled frame built earlier
# in this notebook. The name `data` used here before is never defined in this
# file, so the cell raised NameError on a fresh run.
prediction = cv_result.transform(labeled_data)

In [21]:
# Show the per-row outcome. The identifier column produced by load_data is
# 'key'; the previously selected "file" column is not created anywhere in
# this notebook.
selected = prediction.select("key", "speech_parts", "probability", "prediction")
for row in selected.collect():
    print(row)

In [22]:
# Last stage of the best pipeline model found by cross-validation.
estimator = cv_result.bestModel.stages[-1]
print("Chosen estimator: ", type(estimator).__name__)
# NOTE(review): r"\n\t" is a raw string (literal backslash characters), not a
# newline followed by a tab, so this split most likely never matches and the
# whole param-map string is printed — confirm what was intended.
print(estimator._java_obj.extractParamMap().toString().split(r"\n\t")[0])