* построить распределение статей в датасете по `rating` с `bin_size = 10`
* написать функцию `ratingToClass(rating: Int): String`, которая определяет категорию статьи (A, B, C, D) на основе рейтинга. Границы для классов подобрать самостоятельно.
* добавить к датасету категориальную фичу `rating_class`. При добавлении колонки использовать `udf` из функции в предыдущем пункте
* построить модель логистической регрессии `one vs all` для классификации статей по рассчитанным классам.
* получить `F1 score` для получившейся модели

In [2]:
%pyspark

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Load the Habr articles CSV (first row is the header, column types are
# inferred) and cast `rating` to an integer column.
# The result is cached because the following cells query it repeatedly.
habrData = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/user/admin/habr_data.csv")
    .withColumn("rating", col("rating").cast(IntegerType()))
    .cache()
)

habrData.printSchema()

In [3]:
%pyspark

# Display the loaded DataFrame through the notebook's `z` helper
# (presumably the Zeppelin context provided by the %pyspark interpreter).
z.show(habrData)

In [4]:
%pyspark

# Number of articles for every distinct rating value, sorted by rating.
# The sort is applied to the aggregated result: groupBy/count redistributes
# rows, so an orderBy placed before it does not define the output order and
# only adds a sort over the whole dataset.
z.show(
    habrData
        .groupBy("rating")
        .count()
        .select("rating", "count")
        .orderBy(col("rating"))
)

In [5]:
%pyspark

from pyspark.sql.functions import round

# Rating distribution with a bin size of 10: each rating is rounded to the
# nearest multiple of 10 and articles are counted per bin.
# NOTE: `round` here is pyspark.sql.functions.round (imported in this cell),
# not the Python builtin.
z.show(
    habrData
        # Rows without a rating cannot be binned; drop them before aggregating.
        .where(col("rating").isNotNull())
        .withColumn("rating_10", round(col("rating") / 10) * 10)
        .groupBy("rating_10")
        .count()
        .select("rating_10", "count")
        # Sort the aggregated bins; sorting before groupBy would not carry
        # over to the result.
        .orderBy("rating_10")
)


In [6]:
%pyspark
from pyspark.sql.functions import udf, col

def ratingToClass(rating):
    """Map an article rating to its class label.

    Thresholds:
        rating > 40        -> '4'
        20 < rating <= 40  -> '3'
        10 <= rating <= 20 -> '2'
        rating < 10        -> '1' (negative ratings included)

    Args:
        rating: numeric rating of the article, or None when it is missing.

    Returns:
        The class label as a string, or None when ``rating`` is None so that
        a missing rating is not silently assigned to a class (and does not
        raise TypeError on comparison under Python 3).
    """
    if rating is None:
        return None
    if rating > 40:
        return '4'
    if rating > 20:
        return '3'
    if rating >= 10:
        return '2'
    return '1'
    
# Expose ratingToClass to Spark as a UDF; no return type is passed, so the
# udf() default applies.
rating_class = udf(ratingToClass)

# Derive the "class" column from each row's rating.
habrDataWithClass = habrData.withColumn("class", rating_class(habrData["rating"]))

# Preview the first 20 rows without truncating long titles.
habrDataWithClass.select("title", "rating", "class").show(n=20, truncate=False)

In [7]:
%pyspark

# Number of articles per class, listed in class order.
z.show(
    habrDataWithClass
    .groupBy("class")
    .count()
    .select("class", "count")
    .orderBy("class")
)

In [8]:
%pyspark
# 80/20 split with a fixed seed.
trainDF, testDF = habrDataWithClass.randomSplit([.8, .2], seed=42)

# Persist both parts as tables (replacing any previous contents), written
# from at most two partitions each.
trainDF.coalesce(2).write.mode("overwrite").saveAsTable("habr.train")
testDF.coalesce(2).write.mode("overwrite").saveAsTable("habr.test")

# Report the size of each part.
print(
    "В обучающей выборке {} строк, в тестовой {} строк".format(
        trainDF.count(), testDF.count()
    )
)

In [9]:
%pyspark
from pyspark.ml.feature import RegexTokenizer, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Read the saved splits, keeping only the article text and the label cast to
# a numeric type; rows with a null in either column are dropped.
train = spark.table("habr.train").selectExpr("description", "cast(class as Long) class").na.drop("any")
test = spark.table("habr.test").selectExpr("description", "cast(class as Long) class").na.drop("any")

# Split the text on every character that is not a lowercase Latin or
# Cyrillic letter and keep tokens of at least 3 characters.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="description_words", pattern="[^a-zа-яё]", gaps=True).setMinTokenLength(3)

# Term frequencies via feature hashing, then IDF re-weighting.
hashingTF = HashingTF(inputCol="description_words", outputCol="rawFeatures", numFeatures=200000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# One-vs-rest wrapper around a binary logistic regression.
lr = LogisticRegression(labelCol="class", featuresCol="features")
ovr = OneVsRest(classifier=lr, labelCol="class", featuresCol="features")

# Every stage is fitted exactly once, inside pipeline.fit(). Running the
# tokenizer / hashing / IDF steps by hand on `train` beforehand would only
# repeat the same passes over the data without affecting the model.
pipeline = Pipeline(stages=[regexTokenizer, hashingTF, idf, ovr])
model = pipeline.fit(train)


In [10]:
%pyspark
# Score the test set and preview predicted vs. actual class for 20 rows.
predictions = model.transform(test)
# The preview must read from `predictions`; no `result` variable is defined.
predictions.select("prediction", "class").show(20, True)

In [11]:
%pyspark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Apply the fitted pipeline to the test set.
predictions = model.transform(test)

# F1 evaluator comparing the "prediction" column (the evaluator's default)
# against the "class" label column.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("class")
    .setMetricName("f1")
)
f1Score = evaluator.evaluate(predictions)

print("F1 Score: ")
print(f1Score)