In [2]:
import io
import sys

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Используйте как путь куда сохранить модель
MODEL_PATH = 'spark_ml_model'


def process(spark, train_data, test_data):
    """Tune several regressors with cross-validation and save the best one.

    The ``ctr`` column is the regression target and ``ad_id`` is excluded from
    the features. Four pipelines are tuned with a 3-fold CrossValidator on the
    training data: linear regression, decision tree, random forest and
    gradient-boosted trees. Every tuned model is then scored on the test data
    by RMSE, and the CrossValidatorModel with the LOWEST test RMSE is saved to
    MODEL_PATH.

    Args:
        spark: active SparkSession.
        train_data: path to the parquet file used to fit the models.
        test_data: path to the parquet file used to compare the tuned models.
    """
    # Load the datasets
    trainSet = pqt_into_spark(spark, train_data)
    testSet = pqt_into_spark(spark, test_data)

    # Rename 'ctr' to 'label': the estimators and the evaluator below read 'label'.
    trainSet = trainSet.withColumnRenamed("ctr", "label")
    testSet = testSet.withColumnRenamed("ctr", "label")

    # Assemble every column except the identifier and the target into one vector.
    # Selecting by name (instead of slicing columns[1:-1]) does not depend on column order.
    features = VectorAssembler(inputCols=select_feature_columns(trainSet.columns),
                               outputCol='features')

    # Indexed vector for the tree models; with maxCategories=2, features having at most
    # 2 distinct values (e.g. the 0/1 flags) are indexed as categorical.
    features_indexing = VectorIndexer(inputCol='features', outputCol='indexedFeatures',
                                      maxCategories=2)

    # Linear regression uses the raw vector, the tree-based models the indexed one.
    lr = LinearRegression(featuresCol='features', labelCol='label')
    dt, rf, gbt = [estimator(featuresCol='indexedFeatures', labelCol='label')
                   for estimator in (DecisionTreeRegressor, RandomForestRegressor, GBTRegressor)]

    pipelines = {
        "LR": Pipeline(stages=[features, lr]),
        "DT": Pipeline(stages=[features, features_indexing, dt]),
        "RF": Pipeline(stages=[features, features_indexing, rf]),
        "GBT": Pipeline(stages=[features, features_indexing, gbt]),
    }

    # Hyper-parameter grids; every combination is cross-validated
    # (LR 180, DT 25, RF 125, GBT 75 settings).
    paramGrids = {
        "LR": (ParamGridBuilder()
               .addGrid(lr.maxIter, [10, 20, 40, 80, 150, 300])
               .addGrid(lr.regParam, [0.1, 0.2, 0.4, 0.6, 0.8, 0.9])
               .addGrid(lr.elasticNetParam, [0.5, 0.6, 0.7, 0.8, 0.9])
               .build()),
        "DT": (ParamGridBuilder()
               .addGrid(dt.maxBins, [24, 28, 32, 36, 40])
               .addGrid(dt.maxDepth, [3, 4, 5, 6, 7])
               .build()),
        "RF": (ParamGridBuilder()
               .addGrid(rf.numTrees, [10, 15, 20, 25, 30])
               .addGrid(rf.maxBins, [24, 28, 32, 36, 40])
               .addGrid(rf.maxDepth, [3, 4, 5, 6, 7])
               .build()),
        "GBT": (ParamGridBuilder()
                .addGrid(gbt.maxDepth, [3, 4, 5, 6, 7])
                .addGrid(gbt.maxBins, [24, 28, 32, 36, 40])
                .addGrid(gbt.stepSize, [0.05, 0.1, 0.2])
                .build()),
    }

    # One evaluator for both tuning and the final comparison: RMSE of 'prediction' vs 'label'.
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

    # Cross-validate every model type -- VERY slow
    cv = {modeltype: CrossValidator(estimator=pipelines[modeltype],
                                    estimatorParamMaps=paramGrids[modeltype],
                                    evaluator=evaluator,
                                    numFolds=3).fit(trainSet)
          for modeltype in ("LR", "DT", "RF", "GBT")}

    # Score every tuned model on the test set
    evaluation = {modeltype: evaluator.evaluate(model.transform(testSet))
                  for modeltype, model in cv.items()}

    # Save the model with the lowest test RMSE
    bestModel = cv[best_model_type(evaluation)]
    bestModel.save(MODEL_PATH)


def select_feature_columns(columns, exclude=("ad_id", "label")):
    """Return ``columns`` without the names in ``exclude``, preserving their order."""
    excluded = set(exclude)
    return [name for name in columns if name not in excluded]


def best_model_type(evaluation):
    """Return the key of ``evaluation`` whose metric value is the smallest.

    ``min(evaluation.items())`` would compare ``(key, value)`` tuples by key
    first and so return the alphabetically first model name; ranking must use
    the metric value.

    Raises:
        ValueError: if ``evaluation`` is empty.
    """
    if not evaluation:
        raise ValueError("no models were evaluated")
    return min(evaluation, key=evaluation.get)
    
def pqt_into_spark(spark, input_file):
    """Load a parquet file into a Spark DataFrame.

    Args:
        spark: active SparkSession.
        input_file: path to the parquet file.

    Returns:
        The DataFrame produced by ``spark.read.parquet``.
    """
    return spark.read.parquet(input_file)

def main(argv):
    """Command-line entry point for the training job.

    Args:
        argv: argument list without the program name; argv[0] is the path to the
            training data and argv[1] the path to the test data.
    """
    captions = ("Input path to train data: ", "Input path to test data: ")
    paths = []
    for position, caption in enumerate(captions):
        path = argv[position]
        print(caption + path)
        paths.append(path)
    spark = _spark_session()
    process(spark, *paths)


def _spark_session():
    """Return the SparkSession for the training job (app name 'PySparkMLFitJob')."""
    builder = SparkSession.builder.appName('PySparkMLFitJob')
    return builder.getOrCreate()


#if __name__ == "__main__":
#    arg = sys.argv[1:]
#    if len(arg) != 2:
#        sys.exit("Train and test data are require.")
#    else:
#        main(arg)


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [6]:
spark = _spark_session()

In [4]:
def pqt_into_spark(spark, input_file):
    """Read the parquet file at ``input_file`` with ``spark`` and return the DataFrame."""
    reader = spark.read
    return reader.parquet(input_file)

In [7]:
# Paths of the training and evaluation parquet files, relative to the notebook.
train_data, test_data = "./train.parquet", "./test.parquet"

# Load both datasets into Spark DataFrames (train first, then test).
trainSet, testSet = (pqt_into_spark(spark, path) for path in (train_data, test_data))

In [325]:
trainSet.show()

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    1|     10707.2440058622|        1|     1|     0|201.829292651124|       15|0.431740082807281|
|    5|     10643.3872649482|        1|     1|     0|192.577221699704|       15|0.809264519216201|
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
|    7|     10109.3278687796|        1|     1|     0|194.255798599684|       12|0.941221039774456|
|    8|     10665.1119991977|        1|     1|     0|202.658042557742|       14|0.986790019690954|
|    9|     10888.7521785156|        1|     1|     0|197.085338772736|       15|0.995306486518015|
|   11|     9637.20484730933|        1|     1|     0|192.092306095236|       18| 1.02222752080496|
|   12|   

In [8]:
    # Rename the target 'ctr' to 'label', the column the estimators and evaluators read.
    trainSet = trainSet.withColumnRenamed("ctr", "label")
    testSet = testSet.withColumnRenamed("ctr", "label")
    # Assemble every column except the identifier and the target; selecting by name
    # (instead of slicing columns[1:-1]) does not depend on the column order.
    feature_cols = [name for name in trainSet.columns if name not in ("ad_id", "label")]
    features = VectorAssembler(inputCols = feature_cols, outputCol = 'features')
    # Indexed vector for the tree models: features with at most 2 distinct values are treated as categorical.
    features_indexing = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures', maxCategories = 2)

In [9]:
lr = LinearRegression(labelCol='label', featuresCol='features')  # linear model on the raw feature vector

In [10]:
    # Tree-based regressors read the indexed feature vector produced by VectorIndexer.
    dt = DecisionTreeRegressor(featuresCol='indexedFeatures', labelCol='label')
    rf = RandomForestRegressor(featuresCol='indexedFeatures', labelCol='label')
    gbt = GBTRegressor(featuresCol='indexedFeatures', labelCol='label')

In [11]:
    # One pipeline per model type; tree models additionally get the indexing stage.
    pipelines = dict(
        pipelineLR=Pipeline(stages=[features, lr]),
        pipelineDT=Pipeline(stages=[features, features_indexing, dt]),
        pipelineRF=Pipeline(stages=[features, features_indexing, rf]),
        pipelineGBT=Pipeline(stages=[features, features_indexing, gbt]),
    )

In [12]:
    # Hyper-parameter grids keyed by "paramGrid" + model type. Every combination is
    # cross-validated: LR 6*6*5 = 180, DT 5*5 = 25, RF 5*5*5 = 125, GBT 5*5*3 = 75 settings.
    paramGrids = dict(
        paramGridLR=(
            ParamGridBuilder()
            .addGrid(lr.maxIter, [10, 20, 40, 80, 150, 300])
            .addGrid(lr.regParam, [0.1, 0.2, 0.4, 0.6, 0.8, 0.9])
            .addGrid(lr.elasticNetParam, [0.5, 0.6, 0.7, 0.8, 0.9])
            .build()
        ),
        paramGridDT=(
            ParamGridBuilder()
            .addGrid(dt.maxBins, [24, 28, 32, 36, 40])
            .addGrid(dt.maxDepth, [3, 4, 5, 6, 7])
            .build()
        ),
        paramGridRF=(
            ParamGridBuilder()
            .addGrid(rf.numTrees, [10, 15, 20, 25, 30])
            .addGrid(rf.maxBins, [24, 28, 32, 36, 40])
            .addGrid(rf.maxDepth, [3, 4, 5, 6, 7])
            .build()
        ),
        paramGridGBT=(
            ParamGridBuilder()
            .addGrid(gbt.maxDepth, [3, 4, 5, 6, 7])
            .addGrid(gbt.maxBins, [24, 28, 32, 36, 40])
            .addGrid(gbt.stepSize, [0.05, 0.1, 0.2])
            .build()
        ),
    )

In [14]:
    # Fit one 3-fold CrossValidator per model type on the training set -- very slow.
    # The "pipeline"/"paramGrid" prefixes select the matching entries of `pipelines` and `paramGrids`.
    # RegressionEvaluator() is used with its default parameters.
    # parallelism=5: number of threads CrossValidator may use to fit models in parallel.
    # Result: dict model type -> fitted CrossValidatorModel.
    cv = {modeltype : CrossValidator(estimator=pipelines["pipeline" + modeltype],
                          estimatorParamMaps=paramGrids["paramGrid" + modeltype],
                          evaluator=RegressionEvaluator(),
                          numFolds=3, parallelism=5).fit(trainSet)
          for modeltype in ["LR", "DT", "RF", "GBT"]}

KeyboardInterrupt: 

In [337]:
cv

{'LR': CrossValidatorModel_42dc5693f990,
 'DT': CrossValidatorModel_4ee4eafea9b2,
 'RF': CrossValidatorModel_3ccdc5a34a8d,
 'GBT': CrossValidatorModel_abe291a4cd13}

In [346]:
cv_predictions = {name: model.transform(testSet) for name, model in cv.items()}  # test-set predictions per model type

In [347]:
# RMSE of the 'prediction' column against the 'label' column.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")

In [350]:
evaluation = {name: evaluator.evaluate(predictions) for name, predictions in cv_predictions.items()}  # test RMSE per model type

In [363]:
# Pick the model type with the lowest test RMSE. min(evaluation.items()) would compare the
# (name, rmse) tuples by name first and always return "DT"; rank by the RMSE value instead.
bestModel = cv[min(evaluation, key=evaluation.get)]

In [367]:
type(bestModel)

pyspark.ml.tuning.CrossValidatorModel

In [5]:
import io
import sys

from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidatorModel

# Используйте как путь откуда загрузить модель
MODEL_PATH = 'spark_ml_model'


def process(spark, input_file, output_file):
    """Predict ctr for a parquet file with the saved model and write the result as CSV.

    Args:
        spark: active SparkSession.
        input_file: path to the parquet file whose ctr should be predicted.
        output_file: path Spark writes the CSV result (columns ad_id, prediction) to;
            no header option is set.
    """
    model = CrossValidatorModel.load(MODEL_PATH)
    to_score = pqt_into_spark(spark, input_file)

    # Apply the model and keep only the identifier and the predicted value
    scored = model.transform(to_score)
    result = scored.select(['ad_id', 'prediction'])

    # coalesce(1): a single partition, so the output is written by one task
    result.coalesce(1).write.csv(output_file)
    
def pqt_into_spark(spark, input_file):
    """Return the DataFrame obtained by reading ``input_file`` as parquet via ``spark``."""
    load_parquet = spark.read.parquet
    return load_parquet(input_file)

def main(argv):
    """Command-line entry point for the prediction job.

    Args:
        argv: argument list without the program name; argv[0] is the parquet file
            to score and argv[1] the output path for the predictions.
    """
    labels = ("Input path to file: ", "Output path to file: ")
    paths = []
    for idx, label in enumerate(labels):
        value = argv[idx]
        print(label + value)
        paths.append(value)
    spark = _spark_session()
    process(spark, *paths)


def _spark_session():
    """Return the SparkSession for the prediction job (app name 'PySparkMLPredict')."""
    builder = SparkSession.builder.appName('PySparkMLPredict')
    return builder.getOrCreate()


#if __name__ == "__main__":
#    arg = sys.argv[1:]
#    if len(arg) != 2:
#        sys.exit("Input and Target path are require.")
#    else:
#        main(arg)



In [1]:
import io
import sys

from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidatorModel

In [2]:
MODEL_PATH = 'spark_ml_model'
input_file = "./test.parquet"

In [6]:
spark = _spark_session()

In [7]:
bestModel = CrossValidatorModel.load(MODEL_PATH)

In [8]:
def pqt_into_spark(spark, input_file):
    """Load ``input_file`` (parquet) through ``spark.read`` and hand back the DataFrame."""
    dataframe = spark.read.parquet(input_file)
    return dataframe

In [9]:
predictionData = pqt_into_spark(spark, input_file)

In [10]:
 predictedValues = bestModel.transform(predictionData).select(['ad_id', 'prediction'])

In [13]:
predictedValues.tail(20)

[Row(ad_id=199909, prediction=7.137802258670191),
 Row(ad_id=199915, prediction=7.137802258670191),
 Row(ad_id=199919, prediction=7.137802258670191),
 Row(ad_id=199921, prediction=7.165715294275506),
 Row(ad_id=199922, prediction=7.165715294275506),
 Row(ad_id=199926, prediction=7.165715294275506),
 Row(ad_id=199927, prediction=7.165715294275506),
 Row(ad_id=199936, prediction=7.071821188068931),
 Row(ad_id=199941, prediction=7.071821188068931),
 Row(ad_id=199943, prediction=7.137802258670191),
 Row(ad_id=199946, prediction=7.137802258670191),
 Row(ad_id=199953, prediction=7.165715294275506),
 Row(ad_id=199957, prediction=7.137802258670191),
 Row(ad_id=199960, prediction=7.165715294275506),
 Row(ad_id=199961, prediction=7.165715294275506),
 Row(ad_id=199962, prediction=7.165715294275506),
 Row(ad_id=199979, prediction=7.137802258670191),
 Row(ad_id=199982, prediction=7.195304985144779),
 Row(ad_id=199987, prediction=7.165715294275506),
 Row(ad_id=199997, prediction=7.165715294275506)]

In [15]:
predictionData.tail(20)

[Row(ad_id=199909, target_audience_count=2726.00483717456, has_video=1, is_cpm=1, is_cpc=0, ad_cost=202.85202451412, day_count=15, ctr=8.27266867036885),
 Row(ad_id=199915, target_audience_count=3249.00937186662, has_video=1, is_cpm=1, is_cpc=0, ad_cost=203.086682185664, day_count=15, ctr=8.29511095173232),
 Row(ad_id=199919, target_audience_count=2168.74700461789, has_video=1, is_cpm=1, is_cpc=0, ad_cost=202.399669590209, day_count=15, ctr=8.30892779651573),
 Row(ad_id=199921, target_audience_count=2232.58001844152, has_video=1, is_cpm=1, is_cpc=0, ad_cost=198.771291912571, day_count=15, ctr=8.3125568979919),
 Row(ad_id=199922, target_audience_count=2905.90745280996, has_video=1, is_cpm=1, is_cpc=0, ad_cost=200.430681059033, day_count=18, ctr=8.31504202384652),
 Row(ad_id=199926, target_audience_count=1504.33904732107, has_video=1, is_cpm=1, is_cpc=0, ad_cost=199.565148016933, day_count=17, ctr=8.33828596983834),
 Row(ad_id=199927, target_audience_count=2434.71225604191, has_video=1, 