In [1]:
import os
import shutil
import subprocess

import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import (
    OneHotEncoder,
    SQLTransformer,
    StringIndexer,
    VectorAssembler,
    Word2Vec,
)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import (
    col,
    cos,
    radians,
    sin,
    sqrt,
    to_date,
)

In [2]:
def evaluate_cv_model(cv_model, test_df, evaluator_rmse, evaluator_mae, model_name=""):
    """Score one collected sub-model per hyper-parameter combination on ``test_df``.

    ``cv_model.subModels`` is indexed as ``subModels[fold][param_index]``: one
    inner list per fold, each holding one model per param map. The model for
    the ``i``-th param map trained on the first fold is therefore
    ``subModels[0][i]``.

    Args:
        cv_model: Fitted cross-validation model exposing
            ``getEstimatorParamMaps()`` and ``subModels``.
        test_df: DataFrame passed to each sub-model's ``transform``.
        evaluator_rmse: Evaluator whose ``evaluate(predictions)`` yields RMSE.
        evaluator_mae: Evaluator whose ``evaluate(predictions)`` yields MAE.
        model_name: Label stored in the ``model`` field of every result row.

    Returns:
        A list of ``Row(model, params, rmse, mae)``, one per param map that was
        evaluated successfully. Empty when no sub-models were collected.
    """
    import warnings

    results = []
    param_maps = cv_model.getEstimatorParamMaps()
    sub_models = cv_model.subModels

    if not sub_models:
        return []

    # Take the models trained on the first fold: one per param map.
    first_fold_models = sub_models[0]

    for index, (param_map, model) in enumerate(zip(param_maps, first_fold_models)):
        try:
            predictions = model.transform(test_df)
            rmse = evaluator_rmse.evaluate(predictions)
            mae = evaluator_mae.evaluate(predictions)

            param_str = ", ".join(
                f"{param.name}={value}" for param, value in param_map.items()
            )

            results.append(
                Row(
                    model=model_name,
                    params=param_str,
                    rmse=rmse,
                    mae=mae,
                ),
            )
        except Exception as exc:
            # Stay best-effort (skip this combination) but make the failure visible.
            warnings.warn(
                f"Skipping param map #{index} for model {model_name!r}: {exc!r}",
                stacklevel=2,
            )
    return results


class GeoToECEFTransformer(Transformer):
    """Transformer that appends Cartesian ``x``, ``y`` and ``z`` columns.

    The coordinates are computed from a latitude and a longitude column (both
    converted from degrees with ``radians``) and, optionally, an altitude
    column. The ellipsoid constants used match the WGS 84 values, so the
    outputs are presumably in metres (confirm against the altitude units).
    """

    def __init__(self, lat_col="latitude", lon_col="longitude", alt_col=None) -> None:
        """Store the names of the latitude, longitude and altitude columns."""
        super().__init__()
        self.lat_col, self.lon_col, self.alt_col = lat_col, lon_col, alt_col

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with the ``x``, ``y`` and ``z`` columns added (or replaced)."""
        semi_major_axis = 6378137.0
        eccentricity_sq = 6.69437999014e-3

        # Fall back to a constant zero height when no usable altitude column exists.
        use_altitude = self.alt_col and self.alt_col in df.columns
        height = col(self.alt_col) if use_altitude else F.lit(0.0)

        phi = radians(col(self.lat_col))
        lam = radians(col(self.lon_col))

        # Prime-vertical radius of curvature at latitude phi.
        prime_vertical = semi_major_axis / sqrt(1 - eccentricity_sq * sin(phi) ** 2)
        radial = prime_vertical + height

        ecef_columns = (
            ("x", radial * cos(phi) * cos(lam)),
            ("y", radial * cos(phi) * sin(lam)),
            ("z", (prime_vertical * (1 - eccentricity_sq) + height) * sin(phi)),
        )

        result = df
        for name, expression in ecef_columns:
            result = result.withColumn(name, expression)
        return result


def hdfs_delete_if_exists(hdfs_path) -> None:
    """Run ``hdfs dfs -rm -r -f`` on ``hdfs_path``.

    The command's exit status is not inspected, so a failed removal is not
    reported to the caller.
    """
    command = ["hdfs", "dfs", "-rm", "-r", "-f", hdfs_path]
    subprocess.call(command)

In [3]:
# Locations (relative paths) where the fitted models are persisted.
model_path = "project/models/model1"
model_path2 = "project/models/model2"

# Remove artefacts of earlier runs: first on HDFS, then any local copies.
for stale_path in (model_path, model_path2):
    hdfs_delete_if_exists(stale_path)

for stale_path in (model_path, model_path2):
    if os.path.exists(stale_path):
        shutil.rmtree(stale_path)

warehouse = "project/hive/warehouse"

# Session options, applied in this order on top of the app name and master.
spark_options = {
    "hive.metastore.uris": "thrift://hadoop-02.uni.innopolis.ru:9883",
    "spark.sql.warehouse.dir": warehouse,
    "spark.sql.avro.compression.codec": "snappy",
    "spark.hadoop.hive.metastore.client.socket.timeout": "300",
}

builder = SparkSession.builder.appName("ML_team1").master("yarn")
for option, setting in spark_options.items():
    builder = builder.config(option, setting)

spark = builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

2025-05-19 17:50:15,033 INFO fs.TrashPolicyDefault: Moved: 'hdfs://hadoop-02.uni.innopolis.ru:8020/user/team1/project/models/model1' to trash at: hdfs://hadoop-02.uni.innopolis.ru:8020/user/team1/.Trash/Current/user/team1/project/models/model11747666215025
2025-05-19 17:50:16,694 INFO fs.TrashPolicyDefault: Moved: 'hdfs://hadoop-02.uni.innopolis.ru:8020/user/team1/project/models/model2' to trash at: hdfs://hadoop-02.uni.innopolis.ru:8020/user/team1/.Trash/Current/user/team1/project/models/model21747666216688
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/19 17:50:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/19 17:50:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/19 17:50:20 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop

In [4]:
spark.sql("USE team1_projectdb")
spark.sql("SHOW TABLES").show()

df = (
    spark.read.format("parquet")
    .table("team1_projectdb.records_part")
    # Rows without a target value cannot be used for training or evaluation.
    # dropna already removes null labels, so no separate isNotNull filter is needed.
    .dropna(subset=["review_scores_rating"])
    # host_since is parsed from its string form using the yyyyMMdd pattern.
    .withColumn("host_since", to_date(col("host_since").cast("string"), "yyyyMMdd"))
    # Work on a 0.01% sub-sample; the fixed seed (same value as the train/test
    # split) keeps the sampled rows stable across kernel restarts.
    .sample(fraction=0.0001, seed=42)
)

+---------------+--------------------+-----------+
|      namespace|           tableName|isTemporary|
+---------------+--------------------+-----------+
|team1_projectdb|  evaluation_results|      false|
|team1_projectdb| feature_description|      false|
|team1_projectdb|  feature_importance|      false|
|team1_projectdb|feature_importanc...|      false|
|team1_projectdb|feature_importanc...|      false|
|team1_projectdb| grid_search_results|      false|
|team1_projectdb|        model_params|      false|
|team1_projectdb|      predictions_lr|      false|
|team1_projectdb|      predictions_rf|      false|
|team1_projectdb|          q1_results|      false|
|team1_projectdb|          q2_results|      false|
|team1_projectdb|          q3_results|      false|
|team1_projectdb|          q4_results|      false|
|team1_projectdb|          q5_results|      false|
|team1_projectdb|          q6_results|      false|
|team1_projectdb|        records_part|      false|
+---------------+--------------

In [5]:
# Tokenise the comma-separated `amenities` string; a null value becomes an
# empty string before splitting so the tokens column is never null.
split_amenities = SQLTransformer(
    statement="""
    SELECT *, split(coalesce(amenities, ''), ',\\s*') AS amenities_tokens FROM __THIS__
""",
)

# Embed the amenity tokens into an 8-dimensional vector. The seed is pinned so
# the learned embeddings (and everything trained on them) are reproducible.
word2vec = Word2Vec(
    inputCol="amenities_tokens",
    outputCol="amenities_vec",
    vectorSize=8,
    minCount=1,
    seed=42,
)

geo_transformer = GeoToECEFTransformer()

# Adds x/y/z to `df` directly; the same transformer is also a stage of the
# pipeline built later, where it recomputes (overwrites) these columns.
df = geo_transformer.transform(df)

# Columns that are string-indexed and one-hot encoded.
categorical_cols = [
    "host_response_time", "neighbourhood", "property_type", "room_type",
    "bed_type", "cancellation_policy", "month",
]

# Columns fed to the assembler as-is and treated as boolean flags.
# The order matters: it determines the feature positions.
boolean_cols = [
    # Host / booking flags.
    "host_is_superhost", "host_has_profile_pic", "host_identity_verified",
    "instant_bookable", "require_guest_profile_picture",
    # Remaining flags (presumably per-amenity indicators).
    "kitchen", "wifi", "essentials", "tv", "air_conditioning", "elevator",
    "washer", "hangers", "iron", "laptop_friendly_workspace",
    "family_kid_friendly", "hot_water", "cable_tv",
    "free_parking_on_premises", "hair_dryer", "smoking_allowed", "doorman",
    "dishes_and_silverware", "buzzer_wireless_intercom", "refrigerator",
]

# Numeric columns; x/y/z are the coordinates produced by GeoToECEFTransformer.
numerical_cols = [
    "x", "y", "z",
    "accommodates", "bathrooms", "bedrooms", "beds",
    "price", "security_deposit", "cleaning_fee",
    "guests_included", "extra_people",
    "minimum_nights", "maximum_nights",
]

In [6]:
# One indexer + one-hot encoder per categorical column. handleInvalid="keep"
# maps labels unseen at fit time to an extra index instead of failing.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in categorical_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_vec")
    for name in categorical_cols
]

encoded_cols = [encoder.getOutputCol() for encoder in encoders]
assembler_inputs = encoded_cols + boolean_cols + numerical_cols + ["amenities_vec"]

assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

pipeline = Pipeline(
    stages=[
        split_amenities,
        word2vec,
        geo_transformer,
        *indexers,
        *encoders,
        assembler,
    ],
)

# Split BEFORE fitting: the pipeline contains estimators (Word2Vec,
# StringIndexer, OneHotEncoder), so fitting it on the full data would leak
# information from the test rows into the features used for training.
train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=42)
pipeline_model = pipeline.fit(train_raw)
train_df = pipeline_model.transform(train_raw)
test_df = pipeline_model.transform(test_raw)

# Fully transformed dataset, kept under its original name for later cells.
df_prepared = pipeline_model.transform(df)

# train_df.select("features", "review_scores_rating").write.mode("overwrite").json(
#     "project/data/train",
# )
# test_df.select("features", "review_scores_rating").write.mode("overwrite").json(
#     "project/data/test",
# )

                                                                                

In [7]:
lr = LinearRegression(featuresCol="features", labelCol="review_scores_rating")

# 3 x 3 grid over regularisation strength and the L1/L2 mixing parameter.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# RMSE drives model selection in the cross-validator below.
evaluator = RegressionEvaluator(
    labelCol="review_scores_rating",
    predictionCol="prediction",
    metricName="rmse",
)

# MAE evaluator on the same label/prediction columns.
mae_evaluator = RegressionEvaluator(
    labelCol="review_scores_rating",
    predictionCol="prediction",
    metricName="mae",
)

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    # Keep every fold's model so individual param maps can be inspected later.
    collectSubModels=True,
    # Pin the fold assignment so the selected model is reproducible.
    seed=42,
)

cv_model = cv.fit(train_df)
best_model = cv_model.bestModel

# Overwrite so re-running this cell does not fail on an already existing path.
best_model.write().overwrite().save(model_path)

                                                                                

In [10]:
# Display the hyper-parameter combinations that the cross-validator evaluated.
cv_model.getEstimatorParamMaps()

[{Param(parent='LinearRegression_7411c9be5f09', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
  Param(parent='LinearRegression_7411c9be5f09', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0},
 {Param(parent='LinearRegression_7411c9be5f09', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
  Param(parent='LinearRegression_7411c9be5f09', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5},
 {Param(parent='LinearRegression_7411c9be5f09', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
  Param(parent='LinearRegression_7411c9be5f09', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0},
 {Param(paren

In [12]:
# Display the models retained because collectSubModels=True was set: a list of
# lists where each inner list holds one model per hyper-parameter combination.
cv_model.subModels

[[LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85],
 [LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearRegressionModel: uid=LinearRegression_7411c9be5f09, numFeatures=85,
  LinearReg