In [12]:
import zipfile
import os

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassificationModel
from pyspark.ml.classification import GBTClassifier
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, collect_list, avg, stddev, max, array, udf
from pyspark.sql.types import ArrayType, FloatType

In [13]:
# Create (or reuse) the Spark session that the rest of the notebook reads from.
spark = (
    SparkSession.builder
    .appName("Time Series Binary Classification")
    .getOrCreate()
)

# Загрузка тестовых данных

In [14]:
# Read the test set and replace nulls with 0 in one step.
# NOTE(review): fillna(0) only touches numeric columns; array columns such as
# `values` keep their nulls -- confirm against the parquet schema.
test_df = spark.read.parquet("/content/test.parquet").fillna(0)

In [15]:
def array_mean(values):
    """Return the arithmetic mean of the non-null entries of ``values``.

    Args:
        values: Sequence of numbers that may contain ``None`` entries, or
            ``None`` itself (a null array column reaches a Python UDF as
            ``None``).

    Returns:
        The mean of the non-null entries, or ``None`` when ``values`` is
        ``None`` or holds no non-null entries.
    """
    # Iterating a null array would raise TypeError and abort the Spark job,
    # so treat it like an empty series.
    if values is None:
        return None
    present = [v for v in values if v is not None]
    if not present:
        return None
    return sum(present) / len(present)

def median(values):
    """Return the median of the non-null entries of ``values``.

    Args:
        values: Sequence of numbers that may contain ``None`` entries, or
            ``None`` itself.

    Returns:
        The median as a ``float`` (average of the two middle entries for an
        even count), or ``None`` when ``values`` is ``None`` or holds no
        non-null entries.
    """
    # A null array cannot be iterated; report "no median" instead of crashing.
    if values is None:
        return None
    ordered = sorted(v for v in values if v is not None)
    if not ordered:
        return None
    count = len(ordered)
    mid = count // 2
    if count % 2:
        middle = ordered[mid]
    else:
        middle = (ordered[mid - 1] + ordered[mid]) / 2
    # This function is registered as a FloatType UDF, so always hand back a
    # float: for an odd count the middle element could otherwise be an int.
    return float(middle)

def moving_average(values, window_size=5):
    """Return the mean of the last ``window_size`` non-null entries.

    Args:
        values: Sequence of numbers that may contain ``None`` entries, or
            ``None`` itself.
        window_size: Number of trailing non-null entries to average.

    Returns:
        The average of the trailing window, or ``None`` when ``values`` is
        ``None`` or has fewer than ``window_size`` non-null entries.
    """
    # A null array would raise TypeError when iterated; treat it as
    # "not enough data".
    if values is None:
        return None
    present = [v for v in values if v is not None]
    if len(present) < window_size:
        return None
    return sum(present[-window_size:]) / window_size

def first_diff(values):
    """Return the differences between consecutive non-null entries.

    Args:
        values: Sequence of numbers that may contain ``None`` entries, or
            ``None`` itself.

    Returns:
        A list of ``float`` differences ``values[i] - values[i - 1]`` computed
        after dropping ``None`` entries. The list is empty when ``values`` is
        ``None`` or has fewer than two non-null entries.
    """
    # A null array has no differences; returning [] (rather than raising)
    # keeps the downstream statistics on the "no data" path.
    if values is None:
        return []
    present = [v for v in values if v is not None]
    # Pair every entry with its successor and compute the differences.
    # float() keeps the elements in line with the ArrayType(FloatType())
    # this function is registered with.
    return [float(curr - prev) for prev, curr in zip(present, present[1:])]

def mean_diff(differences):
    """Return the mean of the non-null entries of ``differences``.

    Args:
        differences: Sequence of numbers that may contain ``None`` entries,
            or ``None`` itself.

    Returns:
        The mean of the non-null entries, or ``None`` when there are none.
    """
    # Tolerate a null array and null elements, like the other array helpers
    # in this cell, instead of raising TypeError inside the UDF.
    if differences is None:
        return None
    present = [d for d in differences if d is not None]
    if not present:
        return None
    return sum(present) / len(present)

def stddev_diff(differences):
    """Return the population standard deviation of ``differences``.

    Args:
        differences: Sequence of numbers that may contain ``None`` entries,
            or ``None`` itself.

    Returns:
        The square root of the mean squared deviation from the mean (divisor
        is the number of non-null entries), or ``None`` when there are no
        non-null entries.
    """
    # Tolerate a null array and null elements, like the other array helpers
    # in this cell, instead of raising TypeError inside the UDF.
    if differences is None:
        return None
    present = [d for d in differences if d is not None]
    if not present:
        return None
    center = sum(present) / len(present)
    squared_error = sum((d - center) ** 2 for d in present)
    return (squared_error / len(present)) ** 0.5

# Wrap the plain-Python helpers as Spark UDFs with their declared return types.
# Four of the names (array_mean, first_diff, mean_diff, stddev_diff) are
# rebound from the Python function to its UDF wrapper, so this block must be
# executed together with the `def`s in the same cell.
array_mean = udf(array_mean, returnType=FloatType())
median_udf = udf(median, returnType=FloatType())
moving_average_udf = udf(moving_average, returnType=FloatType())
first_diff = udf(first_diff, returnType=ArrayType(FloatType()))
mean_diff = udf(mean_diff, returnType=FloatType())
stddev_diff = udf(stddev_diff, returnType=FloatType())

In [16]:
# Span of the series in days: latest date minus earliest date.
last_date = F.to_date(F.array_max("dates"), "yyyy-MM-dd")
first_date = F.to_date(F.array_min("dates"), "yyyy-MM-dd")

# Derive one row of summary features per series. The order of the steps
# matters: `stddev_value` reads `mean_value`, and `mean_diff` / `stddev_diff`
# read the `first_diff` array column.
stats_test_df = (
    test_df
    .withColumn("mean_value", array_mean("values"))
    # Population standard deviation around `mean_value`.
    # NOTE(review): size(values) counts null elements while `mean_value`
    # skips them -- confirm this matches how the model was trained.
    .withColumn("stddev_value", F.expr("sqrt(aggregate(values, 0D, (acc, x) -> acc + pow(x - mean_value, 2)) / size(values))"))
    .withColumn("min_value", F.array_min("values"))
    .withColumn("max_value", F.array_max("values"))
    .withColumn("median_value", median_udf("values"))
    # lag1 / lag2: the last and second-to-last observations.
    .withColumn("lag1", F.expr("values[size(values) - 1]"))
    .withColumn("lag2", F.expr("values[size(values) - 2]"))
    .withColumn("moving_average", moving_average_udf("values"))
    .withColumn("count_dates", F.size("dates"))
    .withColumn("date_diff", F.datediff(last_date, first_date))
    .withColumn("first_diff", first_diff("values"))
    .withColumn("mean_diff", mean_diff("first_diff"))
    .withColumn("stddev_diff", stddev_diff("first_diff"))
)

In [17]:
# Columns packed into the model's input vector, in this exact order.
# NOTE(review): the order presumably has to match the one used at training
# time -- confirm against the training notebook.
feature_columns = [
    "mean_value",
    "stddev_value",
    "min_value",
    "max_value",
    "median_value",
    "lag1",
    "lag2",
    "moving_average",
    "count_dates",
    "date_diff",
    "mean_diff",
    "stddev_diff",
]

vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Replace missing numeric features with 0, then assemble the feature vector.
# Dropping a stale "features" column first keeps this cell re-runnable:
# VectorAssembler refuses to transform a frame that already has its output
# column, and drop() is a no-op when the column is absent.
stats_test_df = vector_assembler.transform(
    stats_test_df.drop("features").fillna(0)
)

# Загрузка модели

In [18]:
# Make sure the working directory exists before unpacking into it.
os.makedirs("/content", exist_ok=True)

# Unpack the zipped model into /content/model.
with zipfile.ZipFile("/content/model.zip", mode='r') as model_archive:
    model_archive.extractall(path="/content/model")

In [19]:
# Load the saved GBT classifier from the directory the archive was unpacked
# into, then score the assembled test features.
model = GBTClassificationModel.load(path="/content/model")
predictions = model.transform(dataset=stats_test_df)

# Сохранение результатов

In [20]:
def vector_to_array(v):
    """Convert a vector-like object into a plain Python list.

    Args:
        v: Object exposing ``toArray()`` whose result exposes ``tolist()``,
            or ``None``.

    Returns:
        ``v.toArray().tolist()``, or ``None`` when ``v`` is ``None``.
    """
    if v is None:
        return None
    dense_values = v.toArray()
    return dense_values.tolist()

vector_to_array_udf = udf(vector_to_array, returnType=ArrayType(FloatType()))

# Turn the model's probability vector into an array column and keep its second
# entry (index 1) as the score.
# NOTE(review): index 1 is presumably the positive-class probability -- confirm
# against the label encoding used in training.
probability_values = vector_to_array_udf(col("probability"))
predictions = (
    predictions
    .withColumn("probability_array", probability_values)
    .withColumn("score", col("probability_array")[1])
)

# Keep only the identifier and its score, and preview the first rows.
answer_df = predictions.select("id", "score")
answer_df.show()

+-----+-----------+
|   id|      score|
+-----+-----------+
| 6125|  0.1321214|
|26781| 0.08898953|
|13333|  0.4680012|
|53218|0.026135966|
|84204| 0.33392382|
|69997|  0.7856886|
|99301|  0.9422647|
| 4361| 0.90713954|
|46607| 0.16508186|
|29836|  0.5854949|
|59154|0.035277657|
|80632|  0.8092701|
|33723| 0.06686582|
|  663| 0.04800655|
|  764|0.101208024|
|11059| 0.87399036|
| 5358|0.023375642|
|92154|  0.7972783|
|73744|  0.5476228|
|62536| 0.48465627|
+-----+-----------+
only showing top 20 rows



In [21]:
# Collect the scores to the driver and write them as CSV.
# index=False keeps the pandas row index out of the file, so the submission
# holds exactly the two selected columns: id and score.
answer_df.toPandas().to_csv('/content/submission.csv', index=False)