In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 24

warehouse = 'project/hive/warehouse'

spark = SparkSession.builder\
    .appName("teаm {} - spark ML".format(team))\
    .master("yarn")\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

spark

In [2]:
spark.sql("SHOW DATABASES").show(100)
spark.sql("USE team24_projectdb").show()
spark.sql("SHOW TABLES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             retake1|
|             root_db|
|                show|
|     team0_projectdb|
|    team11_projectdb|
|           team12_db|
|team12_hive_proje...|
|    team12_projectdb|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
| team21_projectdb_v2|
| team21_projectdb_v3|
| team21_projectdb_v4|
|    team22_projectdb|
|    team23_projectdb|
|    team24_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team27_projectdb|
|    team28_projectdb|
|    team29_projectdb|
|     team2_projectdb|
|    team30_projectdb|
|    team31_projectdb|
|    team34_projectdb|
|    team36_projectdb|
|            team36db|
|    team37_projectdb|
|    team38_projectdb|
|    team39_projectdb|
|     team3_projectdb|
|     team4_projectdb|
|     team5

In [3]:
airbnb_df = spark.read.format("avro").table(
    'team24_projectdb.airbnb'
)

In [4]:
airbnb_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: float (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- bedrooms: float (nullable = true)
 |-- beds: float (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- square_feet: float (nullable = true)
 |-- price: float (nullable = true)
 |-- number_of_reviews: float (nullable = true)
 |-- review_scores_rating: float (nullable = true)
 |-- review_scores_cleanliness: float (nullable = true)
 |-- review_scores_location: float (nullable = true)
 |-- latitude: decimal(8,6) (nullable = true)
 |-- longitude: decim

In [5]:
airbnb_df.show(5,truncate=False,vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id                        | 13941534                                                                                                                                                                                                                                                                                                                                                                                                                                                      

In [6]:
from pyspark.sql.functions import col, count, when

from pyspark.sql.functions import col, count, when

def check_nulls(df):
    """
    Analyzes null values in a PySpark DataFrame and prints a summary.
    
    Args:
        df: PySpark DataFrame to analyze
    """
    # Calculate total number of rows
    total_rows = df.count()
    
    # Create a list of expressions to count nulls for each column
    exprs = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    
    # Calculate null counts
    null_counts = df.select(*exprs).collect()[0].asDict()
    
    # Calculate null percentages
    null_percentages = {col: (count/total_rows)*100 for col, count in null_counts.items()}
    
    # Create a summary DataFrame
    from pyspark.sql import Row
    summary_data = [Row(column_name=col, 
                       null_count=null_counts[col], 
                       null_percentage=null_percentages[col],
                       non_null_count=total_rows - null_counts[col],
                       non_null_percentage=100 - null_percentages[col]) 
                   for col in df.columns]
    
    summary_df = spark.createDataFrame(summary_data)
    
    # Show the results sorted by null percentage (descending)
    print(f"\nNull Value Analysis for DataFrame (Total Rows: {total_rows})")
    summary_df.orderBy(col("null_percentage").desc()).show(len(df.columns), truncate=False)

# Usage
check_nulls(airbnb_df)


Null Value Analysis for DataFrame (Total Rows: 247252)
+-------------------------+----------+---------------------+--------------+-------------------+
|column_name              |null_count|null_percentage      |non_null_count|non_null_percentage|
+-------------------------+----------+---------------------+--------------+-------------------+
|square_feet              |243059    |98.30415931923704    |4193          |1.6958406807629558 |
|neighborhood_overview    |101550    |41.07145746040477    |145702        |58.92854253959523  |
|transit                  |95562     |38.649636807791246   |151690        |61.350363192208754 |
|neighbourhood            |74827     |30.26345590733341    |172425        |69.73654409266659  |
|review_scores_location   |64403     |26.047514276932038   |182849        |73.95248572306797  |
|review_scores_cleanliness|64158     |25.9484250885736     |183094        |74.0515749114264   |
|review_scores_rating     |63824     |25.81334023587271    |183428        |74.18

In [7]:
### Preprocessing pipeline stage

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, lit, coalesce, when
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, lit, coalesce, when
from pyspark.sql.types import NumericType
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCols
from pyspark.sql import DataFrame
import math
import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF

import math

class GeodeticToECEFTransformer(Transformer):
    def __init__(self, input_cols=["latitude", "longitude"], output_cols=["x", "y", "z"], altitude=0.0):
        super().__init__()
        self.input_cols = input_cols
        self.output_cols = output_cols
        self.altitude = altitude
        self.a = 6378137.0  # WGS-84 semi-major axis
        self.e2 = 6.69437999014e-3  # WGS-84 first eccentricity squared
        self.b = self.a * math.sqrt(1 - self.e2)  # Precompute b as scalar

    def _transform(self, df: DataFrame) -> DataFrame:
        lat = F.radians(df[self.input_cols[0]])
        lon = F.radians(df[self.input_cols[1]])
        alt = F.lit(self.altitude)

        N = self.a / F.sqrt(1 - self.e2 * F.sin(lat)**2)
        x = (N + alt) * F.cos(lat) * F.cos(lon)
        y = (N + alt) * F.cos(lat) * F.sin(lon)
        z = ((self.a**2 / self.b**2) * (1 - self.e2)) * (N + alt) * F.sin(lat)

        return df.withColumn(self.output_cols[0], x) \
                 .withColumn(self.output_cols[1], y) \
                 .withColumn(self.output_cols[2], z)

class CyclicalTimeEncoder(Transformer):
    def __init__(self, input_col="timestamp", output_cols=["sin_time", "cos_time"], period=24):
        super().__init__()
        self.input_col = input_col
        self.output_cols = output_cols
        self.period = period

    def _transform(self, df: DataFrame) -> DataFrame:
        t = df[self.input_col]
        sin_col = F.sin(2 * math.pi * t / self.period)
        cos_col = F.cos(2 * math.pi * t / self.period)
        return df.withColumn(self.output_cols[0], sin_col) \
                 .withColumn(self.output_cols[1], cos_col)

from pyspark.sql.functions import regexp_replace, col

def data_transformation_pipeline(df):
    # Fill nulls in string/text columns
    df = df.na.fill({
        "name": "",
        "summary": "",
        "neighborhood_overview": "",
        "transit": ""
    })

    # Fill nulls in categorical columns
    most_frequent_country = df.groupBy("country").count().orderBy("count", ascending=False).first()[0]
    df = df.withColumn("country", coalesce(col("country"), lit(most_frequent_country)))
    df = df.withColumn("neighbourhood", coalesce(col("neighbourhood"), lit("Unknown")))
    most_frequent_state = df.groupBy("state").count().orderBy("count", ascending=False).first()[0]
    df = df.withColumn("state", coalesce(col("state"), lit(most_frequent_state)))

    # Fill nulls in numerical columns
    review_cols = ["review_scores_rating", "review_scores_cleanliness", "review_scores_location"]
    df = df.na.fill(0, subset=review_cols)
    df = df.withColumn("number_of_reviews", coalesce(col("number_of_reviews"), lit(0)))
    df = df.withColumn("has_square_feet", when(col("square_feet").isNull(), 0).otherwise(1))

    # Clean and cast numerical columns
    numerical_cols = [
        "accommodates", "bathrooms", "bedrooms", "beds", "price",
        "latitude", "longitude", "has_square_feet", "number_of_reviews",
        "review_scores_rating", "review_scores_cleanliness", "review_scores_location"
    ]
    df = df.withColumn("price", regexp_replace(col("price"), "[,$]", "").cast(DoubleType()))
    for col_name in numerical_cols:
        df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

    # Impute latitude and longitude
    df = df.na.fill({"latitude": 0.0, "longitude": 0.0})

    # Text processing
    text_cols = ["name", "summary", "neighborhood_overview", "transit"]
    tokenizers = [Tokenizer(inputCol=col, outputCol=f"{col}_tokens") for col in text_cols]
    removers = [
        StopWordsRemover(
            inputCol=f"{col}_tokens", 
            outputCol=f"{col}_cleaned", 
        ) for col in text_cols
    ]
    count_vectorizers = [
        CountVectorizer(
            inputCol=f"{col}_cleaned",
            outputCol=f"{col}_tf",
            minDF=2,
            minTF=0.001
        ) for col in text_cols
    ]
    idf_transformers = [IDF(inputCol=f"{col}_tf", outputCol=f"{col}_tfidf") for col in text_cols]

    # Categorical encoding
    categorical_cols = ["property_type", "room_type", "neighbourhood", "city", "state"]
    indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep") for col in categorical_cols]
    encoders = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_vec") for col in categorical_cols]

    # Geodetic to ECEF
    geo_transformer = GeodeticToECEFTransformer(input_cols=["latitude", "longitude"], output_cols=["x", "y", "z"])
    
    imputer = Imputer(
        inputCols=[
            "price", "bathrooms", "bedrooms", "review_scores_rating", "x", "y", "z"
        ],
        outputCols=[
            "price", "bathrooms", "bedrooms", "review_scores_rating", "x", "y", "z"
        ]
    ).setStrategy("mean")

    # Feature assembly
    assembler = VectorAssembler(
        inputCols=[
            "property_type_vec", "room_type_vec", "neighbourhood_vec", "city_vec", "state_vec",
            "name_tfidf", "summary_tfidf", "neighborhood_overview_tfidf", "transit_tfidf",
            "price", "bathrooms", "bedrooms", "review_scores_rating", "x", "y", "z"
        ],
        outputCol="raw_features"
    )

    # Standard scaling
    scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)

    # Build pipeline
    stages = (
        indexers + encoders +
        tokenizers + removers +
        count_vectorizers + idf_transformers +
        [geo_transformer, imputer, assembler, scaler]
    )

    pipeline = Pipeline(stages=stages)
    model = pipeline.fit(df)
    transformed_data = model.transform(df)
    return model, transformed_data

pipeline, transformed_data = data_transformation_pipeline(airbnb_df)

In [9]:
# Verify the output
transformed_data.select('features').show(1, truncate=True,vertical=True)

-RECORD 0------------------------
 features | [-1.5614431080929... 
only showing top 1 row



In [10]:
hdfs_train_path = "project/data/train"
hdfs_test_path = "project/data/test"

train_ratio = 0.8  # 80% for training, 20% for testing
train_data, test_data = transformed_data.randomSplit([train_ratio, 1 - train_ratio], seed=42)

print(f"Training set count: {train_data.count()}")
print(f"Test set count: {test_data.count()}")

train_data.write.mode("overwrite").json(hdfs_train_path)

# Save test data
test_data.write.mode("overwrite").json(hdfs_test_path)

print(f"Data saved to HDFS:\n- Training: {hdfs_train_path}\n- Test: {hdfs_test_path}")

Py4JJavaError: An error occurred while calling o2130.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 64.0 failed 4 times, most recent failure: Lost task 1.3 in stage 64.0 (TID 523) (hadoop-04.uni.innopolis.ru executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container from a bad node: container_e43_1745788519616_3093_02_000004 on host: hadoop-04.uni.innopolis.ru. Exit status: 143. Diagnostics: [2025-05-06 01:27:57.239]Container killed on request. Exit code is 143
[2025-05-06 01:27:57.239]Container exited with a non-zero exit code 143. 
[2025-05-06 01:27:57.239]Killed by external signal
.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


In [None]:
hdfs_train_path = "project/data/train"
hdfs_test_path = "project/data/test"

def verify_file_existence():
    hdfs_train_files = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) \
        .listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_train_path))
    hdfs_test_files = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) \
        .listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_test_path))
    
    print("\nHDFS Verification:")
    print(f"Training files found: {len(hdfs_train_files)}")
    print(hdfs_train_files)
    print(f"Test files found: {len(hdfs_test_files)}")
    print(hdfs_test_files)
    
verify_file_existence()

In [None]:
pipeline_path = "project/models/pipeline"
fitted_pipeline_model.write().overwrite().save('pipeline_path')

In [11]:
spark.stop()