### Step 1: Initialize Spark Session

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, concat, concat_ws

# Инициализация Spark Session
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("Personalized_Tariff_Creation") \
    .getOrCreate()

spark

### Step 2: Data understanding

In [3]:
# Загрузка датасета
file_path = "file:///home/jovyan/work/user_experience_data.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

In [4]:
# Выводим первую строку
data.show(1, vertical=True)

-RECORD 0--------------------------------
 ip_country                | ro          
 device_country            | es          
 device_language           | es          
 platform                  | Android     
 platform_store            | Google Play 
 smartphone_brand          | redmi       
 is_organic                | 0           
 zones_place_create_cnt    | 0           
 open_function_history_cnt | 0           
 listen_live_cnt           | 26          
 noise_sent_cnt            | 0           
 sos_push_cnt              | 0           
 invite_second_parent_cnt  | 8           
 subscription_days_amount  | 60          
 total_payments_usd        | 8           
 child_age                 | null        
 child_smartphone_brand    | null        
only showing top 1 row



In [5]:
data.printSchema()

root
 |-- ip_country: string (nullable = true)
 |-- device_country: string (nullable = true)
 |-- device_language: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- platform_store: string (nullable = true)
 |-- smartphone_brand: string (nullable = true)
 |-- is_organic: integer (nullable = true)
 |-- zones_place_create_cnt: integer (nullable = true)
 |-- open_function_history_cnt: integer (nullable = true)
 |-- listen_live_cnt: integer (nullable = true)
 |-- noise_sent_cnt: integer (nullable = true)
 |-- sos_push_cnt: integer (nullable = true)
 |-- invite_second_parent_cnt: integer (nullable = true)
 |-- subscription_days_amount: integer (nullable = true)
 |-- total_payments_usd: integer (nullable = true)
 |-- child_age: integer (nullable = true)
 |-- child_smartphone_brand: string (nullable = true)



In [6]:
# Проверяем значения на null
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show(1, vertical=True)

-RECORD 0-------------------------
 ip_country                | 144  
 device_country            | 23   
 device_language           | 22   
 platform                  | 0    
 platform_store            | 0    
 smartphone_brand          | 44   
 is_organic                | 0    
 zones_place_create_cnt    | 0    
 open_function_history_cnt | 0    
 listen_live_cnt           | 0    
 noise_sent_cnt            | 0    
 sos_push_cnt              | 0    
 invite_second_parent_cnt  | 0    
 subscription_days_amount  | 0    
 total_payments_usd        | 0    
 child_age                 | 1098 
 child_smartphone_brand    | 1142 



In [7]:
# Общая статистика
data.describe().show(1, vertical=True)

-RECORD 0--------------------------
 summary                   | count 
 ip_country                | 7348  
 device_country            | 7469  
 device_language           | 7470  
 platform                  | 7492  
 platform_store            | 7492  
 smartphone_brand          | 7448  
 is_organic                | 7492  
 zones_place_create_cnt    | 7492  
 open_function_history_cnt | 7492  
 listen_live_cnt           | 7492  
 noise_sent_cnt            | 7492  
 sos_push_cnt              | 7492  
 invite_second_parent_cnt  | 7492  
 subscription_days_amount  | 7492  
 total_payments_usd        | 7492  
 child_age                 | 6394  
 child_smartphone_brand    | 6350  
only showing top 1 row



### Step 3: Data cleaning and feature engineering

In [8]:
# Заполнение пропущенных значений
numerical_columns = [col[0] for col in data.dtypes if col[1] in ['int', 'double']]
categorical_columns = [col[0] for col in data.dtypes if col[1] == 'string']

# Заполнение числовых колонок средним значением
for column in numerical_columns:
    mean_value = data.select(avg(col(column))).first()[0]
    data = data.fillna({column: mean_value})

# Заполнение категориальных колонок значением "Unknown"
data = data.fillna({col: "Unknown" for col in categorical_columns})

### Step 4: Feature engineering

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Индексация и кодирование категориальных признаков
categorical_indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep") 
    for col in categorical_columns]

categorical_encoders = [
    OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_vec") 
    for col in categorical_columns]

# Объединение признаков для обучения
training_feature_columns = [f"{col}_vec" for col in categorical_columns] + numerical_columns
assembler_training = VectorAssembler(inputCols=training_feature_columns, outputCol="features")
scaler_training = StandardScaler(inputCol="features", outputCol="scaled_features")

# Убираем целевые переменные из признаков
feature_columns = [f"{col}_vec" for col in categorical_columns if col not in [
    "zones_place_create_cnt", "noise_sent_cnt", "listen_live_cnt", "open_function_history_cnt",
    "sos_push_cnt", "invite_second_parent_cnt"]
                  ] + ["is_organic", "child_age"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [9]:
pip install numpy

Collecting numpy
  Downloading numpy-1.19.5-cp36-cp36m-manylinux2010_x86_64.whl (14.8 MB)
     |################################| 14.8 MB 996 kB/s            
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.19.5
Note: you may need to restart the kernel to use updated packages.


### Step 5: Multi-output modeling

In [11]:
from pyspark.sql.functions import col, when

# Целевые переменные
function_mapping = {
    "function_map": "zones_place_create_cnt",
    "function_ignore_sound": "noise_sent_cnt",
    "function_listen_sound": "listen_live_cnt",
    "function_detailed_routes": "open_function_history_cnt",
    "function_sos_signal": "sos_push_cnt",
    "function_invite_second_parent": "invite_second_parent_cnt"
}

for function, column in function_mapping.items():
    data = data.withColumn(function, when(col(column) > 0, 1).otherwise(0))
    
# Проверка результата
data.select(list(function_mapping.keys())).show(1)

+------------+---------------------+---------------------+------------------------+-------------------+-----------------------------+
|function_map|function_ignore_sound|function_listen_sound|function_detailed_routes|function_sos_signal|function_invite_second_parent|
+------------+---------------------+---------------------+------------------------+-------------------+-----------------------------+
|           0|                    0|                    1|                       0|                  0|                            1|
+------------+---------------------+---------------------+------------------------+-------------------+-----------------------------+
only showing top 1 row



### Step 6: Train-test split

In [12]:
# Разделение на тренировочные и тестовые данные
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

### Step 7: Define models for multi-output prediction

In [18]:
# Модели для регрессии (цена и срок подписки)
cost_model = GBTRegressor(
    featuresCol="scaled_features",
    labelCol="total_payments_usd",
    predictionCol="predicted_cost",
    maxIter=50,  # Количество итераций
    maxDepth=6,  # Глубина дерева
    stepSize=0.1,  # Шаг обучения
    subsamplingRate=0.8,  # Подвыборка данных
    minInstancesPerNode=5,  # Минимальное количество экземпляров на узел
    minInfoGain=0.1,  # Минимальное улучшение информации для разделения узлов
    lossType="squaredError",  # Тип функции потерь
    maxBins=32  # Количество бинов
)

subscription_model = GBTRegressor(
    featuresCol="scaled_features",
    labelCol="subscription_days_amount",
    predictionCol="predicted_subscription",
    maxIter=50,
    maxDepth=6,
    stepSize=0.1,
    subsamplingRate=0.8,
    minInstancesPerNode=5,
    minInfoGain=0.1,
    lossType="squaredError",
    maxBins=32
)

# Модели для классификации (функции тарифа)
function_classifiers = [
    GBTClassifier(  # Для классификации используем GBTClassifier
        featuresCol="scaled_features",
        labelCol=function,
        predictionCol=f"predicted_{function}",
        maxIter=50,
        maxDepth=5,  # Для классификации может быть достаточно меньшей глубины
        stepSize=0.1,
        subsamplingRate=0.8,
        minInstancesPerNode=5,
        minInfoGain=0.1,
        lossType="logLoss",  # Для классификации используется logLoss
        maxBins=32
    ) for function in function_mapping.keys()
]

In [None]:
from pyspark.ml import Pipeline

# Pipeline
pipeline = Pipeline(stages=categorical_indexers + categorical_encoders +
                    [assembler, scaler, cost_model, subscription_model] + function_classifiers)

### Step 8: Train the pipeline

In [19]:
# Обучение модели
pipeline_model = pipeline.fit(train_data)

### Step 9: Evaluate the models

In [20]:
# Предсказания
predictions = pipeline_model.transform(test_data)

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

# Оценка моделей
# Оценка регрессии
cost_evaluator = RegressionEvaluator(labelCol="total_payments_usd", predictionCol="predicted_cost", metricName="rmse")
subscription_evaluator = RegressionEvaluator(labelCol="subscription_days_amount",
                                             predictionCol="predicted_subscription", metricName="rmse")
rmse_cost = cost_evaluator.evaluate(predictions)
rmse_subscription = subscription_evaluator.evaluate(predictions)

print(f"RMSE для предсказания стоимости: {rmse_cost}")
print(f"RMSE для предсказания срока подписки: {rmse_subscription}")

# Оценка классификации
for function in function_mapping.keys():
    evaluator = BinaryClassificationEvaluator(labelCol=function, rawPredictionCol=f"predicted_{function}",
                                              metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)
    print(f"AUC для {function}: {auc}")


RMSE для предсказания стоимости: 6.171183161774798
RMSE для предсказания срока подписки: 12.403555036988
AUC для function_map: 0.8691419164334303
AUC для function_ignore_sound: 0.8117484138495623
AUC для function_listen_sound: 0.9481908580965188
AUC для function_detailed_routes: 0.9083479327259986
AUC для function_sos_signal: 0.8966833765160904
AUC для function_invite_second_parent: 0.878556481767349



### Step 10: Save the model for deployment

In [22]:
# Сохранение модели
pipeline_model.write().overwrite().save("enhanced_tariff_model")

### Step 11: Model check and real-time data processing simulation

In [23]:
from pyspark.ml.pipeline import PipelineModel

# Загрузка сохраненной модели
deployed_model = PipelineModel.load("enhanced_tariff_model")

In [24]:
# Симуляция входящих данных
real_time_data = spark.createDataFrame([
    {
        "id": 12345,
        "ip_country": "ru",
        "device_country": "ru",
        "device_language": "ru",
        "platform": "iOS",
        "platform_store": "App Store",
        "smartphone_brand": "iphone",
        "is_organic": 1,
        "child_age": 13,
        "child_smartphone_brand": "realme"
    }
])



In [33]:
# Обработка данных вручную для реального времени
processed_data = real_time_data
for stage in deployed_model.stages:
    processed_data = stage.transform(processed_data)

In [36]:
# Прогноз для новых данных
real_time_predictions = processed_data
real_time_predictions.show(truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 child_age                               | 13                                                                                                                                                                                                               
 child_smartphone_brand                  | realme                                                                                                                                                                                                           
 device_country                          | ru                                                                                                                                                                                                    

In [46]:
from pyspark.sql.functions import col, when, concat, lit


# Map predicted functions to descriptions
results = real_time_predictions.withColumn(
    "function_map", when(col("predicted_function_map") > 0.5, "10 мест на карте вместо 3").otherwise("")
).withColumn(
    "function_ignore_sound", when(col("predicted_function_ignore_sound") > 0.5, "Игнорирование режима 'без звука'").otherwise("")
).withColumn(
    "function_listen_sound", when(col("predicted_function_listen_sound") > 0.5, "Послушать звук вокруг ребенка").otherwise("")
).withColumn(
    "function_detailed_routes", when(col("predicted_function_detailed_routes") > 0.5, "Подробная карта маршрутов").otherwise("")
).withColumn(
    "function_sos_signal", when(col("predicted_function_sos_signal") > 0.5, "Сигнал SOS").otherwise("")
).withColumn(
    "function_invite_second_parent", when(col("predicted_function_invite_second_parent") > 0.5, "Сигнал SOS").otherwise("")
)


# Concatenate selected functions into a single string
results = results.withColumn(
    "selected_functions",
    concat_ws(", ",
        col("function_map"),
        col("function_ignore_sound"),
        col("function_listen_sound"),
        col("function_detailed_routes"),
        col("function_sos_signal"),              
        col("function_invite_second_parent")
    )
)

# Формирование поля tariff_description
real_time_tariffs = results.withColumn(
    "tariff_description",
    when(
        col("selected_functions").isNull() | (col("selected_functions") == ""),
        concat(
            lit("Обязательные функции: Блокировка приложений, Защита от незнакомых звонков, Время в приложениях. Дополнительные функции: "),
            lit("нет")
        )
    ).otherwise(
        concat(
            lit("Обязательные функции: Блокировка приложений, Защита от незнакомых звонков, Время в приложениях. Дополнительные функции: "),
            col("selected_functions")
        )
    )
)

# Вывод результата
real_time_tariffs.select(
    "id",
    "predicted_cost",
    "predicted_subscription",
    "predicted_function_map",
    "predicted_function_ignore_sound",
    "predicted_function_listen_sound",
    "predicted_function_detailed_routes",
    "predicted_function_sos_signal",
    "predicted_function_invite_second_parent",
    "selected_functions",
    "tariff_description"
).show(truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id                                      | 12345                                                                                                                                                                                                                              
 predicted_cost                          | 15.463137991766507                                                                                                                                                                                                                 
 predicted_subscription                  | 525.9684748297383                                                                                                                               

In [47]:
spark.stop()