In [2]:
import sys
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.tuning import TrainValidationSplitModel

from pyspark.ml.feature import HashingTF, Tokenizer

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

import warnings
warnings.filterwarnings("ignore")

In [3]:
# Reuse the active SparkSession if there is one, otherwise start one named 'PySparkTasks'.
spark = (
    SparkSession.builder
    .appName('PySparkTasks')
    .getOrCreate()
)

21/12/12 23:10:26 WARN Utils: Your hostname, andrei-TH67 resolves to a loopback address: 127.0.1.1; using 192.168.0.102 instead (on interface enp2s0)
21/12/12 23:10:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/12 23:10:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [13]:
# Directory holding train.parquet / test.parquet; change it here to relocate the data.
DATA_DIR = '/home/andrei/data_engineer/Big_Data_Stepik/PySpark/spark_pipeline'
input_train = DATA_DIR + '/train.parquet'
input_test = DATA_DIR + '/test.parquet'

In [14]:
def process(input_train, input_test):
    """
    Train three tree-based regressors, score them on the test set and save
    the one with the lowest RMSE to 'spark_ml_model'.

    Candidates: DecisionTreeRegressor, RandomForestRegressor and GBTRegressor.
    Each is tuned with TrainValidationSplit (trainRatio=0.8) over a small
    hyper-parameter grid. The winning TrainValidationSplitModel is written
    with overwrite, so re-running the notebook does not fail on an existing
    directory.

    Parameters
    ----------
    input_train : str
        Path to the training parquet file.
    input_test : str
        Path to the test parquet file (same schema as the training file).

    Returns
    -------
    float
        Test-set RMSE of the saved model (also printed).
    """
    df_train = spark.read.parquet(input_train)
    df_test = spark.read.parquet(input_test)

    # One assembler built from the *train* schema is applied to both splits.
    # VectorAssembler selects by column name, so test features line up with
    # the training features even if the test file orders its columns differently.
    # NOTE(review): columns[:-1] assumes the label 'ctr' is the last column and
    # keeps every other column (including an id column, if any) as a feature;
    # process_1 relies on the same selection, so keep the two in sync.
    assembler = VectorAssembler(inputCols=df_train.columns[:-1], outputCol='features')
    feature_vector_tr = assembler.transform(df_train)
    feature_vector_tt = assembler.transform(df_test)

    # All candidates share the label/prediction columns, so one evaluator suffices.
    evaluator = RegressionEvaluator(metricName='rmse',
                                    labelCol='ctr',
                                    predictionCol='prediction')

    dtr = DecisionTreeRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction')
    rfr = RandomForestRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction')
    gbtr = GBTRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction')

    # (estimator, hyper-parameter grid) pairs, evaluated in this order.
    candidates = [
        (dtr, ParamGridBuilder()
              .addGrid(dtr.maxDepth, [2, 3, 4, 5])
              .build()),
        (rfr, ParamGridBuilder()
              .addGrid(rfr.maxDepth, [2, 3, 4, 5])
              .addGrid(rfr.numTrees, [3, 6, 9, 12, 15, 18, 21])
              .build()),
        (gbtr, ParamGridBuilder()
               .addGrid(gbtr.maxDepth, [2, 3, 4, 5])
               .build()),
    ]

    scored = []
    for estimator, param_grid in candidates:
        tvs = TrainValidationSplit(estimator=estimator,
                                   estimatorParamMaps=param_grid,
                                   evaluator=evaluator,
                                   trainRatio=0.8)
        tvs_model = tvs.fit(feature_vector_tr)
        rmse = evaluator.evaluate(tvs_model.transform(feature_vector_tt))
        scored.append((rmse, tvs_model))

    # min() always yields a winner (the first candidate on ties). The previous
    # chain of strict '<' checks left `model` unbound (UnboundLocalError)
    # whenever two RMSEs tied for best.
    best_rmse, best_model = min(scored, key=lambda item: item[0])

    # Plain save() raises if the directory already exists (e.g. on a re-run).
    best_model.write().overwrite().save('spark_ml_model')

    print('RMSE = {}'.format(best_rmse))
    return best_rmse
        

In [15]:
process(input_train=input_train, input_test=input_test)

                                                                                

RMSE = 0.0789683491183892


In [53]:
def process_1(input_path, result_path, model_path='spark_ml_model'):
    '''
    Predict with the TrainValidationSplitModel saved by the first version of
    process() and write (ad_id, prediction) rows as CSV.

    Parameters
    ----------
    input_path : str
        Path to the parquet file to score.
    result_path : str
        Output directory for the CSV (a single part file, overwritten if it exists).
    model_path : str, optional
        Directory of the saved TrainValidationSplitModel (default 'spark_ml_model').
    '''
    df = spark.read.parquet(input_path)

    # Must mirror the training-time feature selection in process() (columns[:-1]).
    assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol='features')
    feature_vector = assembler.transform(df)

    model = TrainValidationSplitModel.load(model_path)

    predictions = model.transform(feature_vector).select('ad_id', 'prediction')
    # coalesce(1) -> one CSV part file. Overwrite so re-running the cell does not
    # fail on an existing output directory.
    predictions.coalesce(1).write.mode('overwrite').csv(result_path)

### Перепишем через pipeline

In [16]:
def process(train_data, test_data):
    '''
    Train three regressors as Spark ML pipelines, keep the one with the lowest
    test RMSE and save it to 'spark_ml_model_1'.

    Every candidate is Pipeline([VectorAssembler, TrainValidationSplit(model)]),
    so the saved PipelineModel builds its own feature vector and can be applied
    directly to a raw DataFrame with the training schema.
    Candidates: DecisionTreeRegressor, RandomForestRegressor and GBTRegressor.
    Each is tuned over maxDepth in [2, 3, 4, 5] with trainRatio=0.8.

    Parameters
    ----------
    train_data : str
        Path to the training parquet file.
    test_data : str
        Path to the test parquet file (same schema as the training file).

    Returns
    -------
    float
        Test-set RMSE of the saved model (also printed).
    '''
    # Read from the arguments. The previous version ignored them and silently
    # used the global input_train / input_test instead.
    df_train = spark.read.parquet(train_data)
    df_test = spark.read.parquet(test_data)

    # columns[1:-1] skips the first column (ad_id) and the last one.
    # NOTE(review): assumes ad_id is first and the label 'ctr' is last — confirm.
    assembler = VectorAssembler(inputCols=df_train.columns[1:-1], outputCol='features')

    # All candidates share the label/prediction columns, so one evaluator suffices.
    evaluator = RegressionEvaluator(metricName='rmse',
                                    labelCol='ctr',
                                    predictionCol='prediction')

    # Evaluated in this order: decision tree, random forest, gradient-boosted trees.
    regressors = [
        DecisionTreeRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction'),
        RandomForestRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction'),
        GBTRegressor(labelCol='ctr', featuresCol='features', predictionCol='prediction'),
    ]

    scored = []
    for regressor in regressors:
        param_grid = ParamGridBuilder() \
            .addGrid(regressor.maxDepth, [2, 3, 4, 5]) \
            .build()
        tvs = TrainValidationSplit(estimator=regressor,
                                   estimatorParamMaps=param_grid,
                                   evaluator=evaluator,
                                   trainRatio=0.8)
        pipeline_model = Pipeline(stages=[assembler, tvs]).fit(df_train)
        rmse = evaluator.evaluate(pipeline_model.transform(df_test))
        scored.append((rmse, pipeline_model))

    # min() always yields a winner (the first candidate on ties). The previous
    # chain of strict '<' checks left `model` unbound (UnboundLocalError)
    # whenever two RMSEs tied for best.
    best_rmse, best_model = min(scored, key=lambda item: item[0])

    # Plain save() raises if the directory already exists (e.g. on a re-run).
    best_model.write().overwrite().save('spark_ml_model_1')

    print('RMSE = {}'.format(best_rmse))
    return best_rmse

In [17]:
# Directory holding train.parquet / test.parquet; change it here to relocate the data.
# (Re-defines the same values as the earlier config cell, so it is safe to re-run.)
DATA_DIR = '/home/andrei/data_engineer/Big_Data_Stepik/PySpark/spark_pipeline'
input_train = DATA_DIR + '/train.parquet'
input_test = DATA_DIR + '/test.parquet'

In [18]:
process(train_data=input_train, test_data=input_test)

                                                                                

RMSE = 0.25641402586252304


In [22]:
def process_1(input_path, result_path, model_path='spark_ml_model_1'):
    '''
    Predict with the PipelineModel saved by the pipeline version of process()
    and write (ad_id, prediction) rows as CSV.

    The saved PipelineModel contains its own VectorAssembler stage, so the raw
    parquet DataFrame is passed to it as-is.

    Parameters
    ----------
    input_path : str
        Path to the parquet file to score.
    result_path : str
        Output directory for the CSV (a single part file, overwritten if it exists).
    model_path : str, optional
        Directory of the saved PipelineModel (default 'spark_ml_model_1').
    '''
    df = spark.read.parquet(input_path)

    model = PipelineModel.load(model_path)

    predictions = model.transform(df).select('ad_id', 'prediction')
    # coalesce(1) -> one CSV part file. Overwrite so re-running the cell does not
    # fail on an existing output directory.
    predictions.coalesce(1).write.mode('overwrite').csv(result_path)

In [23]:
# Directory holding the test data and the prediction output; change it here to relocate.
DATA_DIR = '/home/andrei/data_engineer/Big_Data_Stepik/PySpark/spark_pipeline'
input_path = DATA_DIR + '/test.parquet'
result_path = DATA_DIR + '/result_p'

In [24]:
process_1(input_path=input_path, result_path=result_path)

                                                                                