In [1]:
import findspark
findspark.init()


def extract_year(input_string):
    if len(input_string) < 4:
        return None
    return input_string[:4]

def calculate_mape(predictions):
    return predictions.withColumn("abs_diff", spark_abs(col("Impact") - col("prediction"))) \
        .withColumn("mape", spark_abs(col("Impact") - col("prediction")) / col("Impact")) \
        .agg(mean("mape")) \
        .collect()[0][0]


import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import time
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import NGram
from pyspark.ml.feature import RegexTokenizer
from pyspark.conf import SparkConf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, size,split
from pyspark.sql.functions import abs as spark_abs, mean
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import mean, abs as spark_abs, col
from pyspark.sql.functions import col, when
from pyspark.sql import functions as F
import bisect
from pyspark.ml.feature import QuantileDiscretizer


start_time = time.time()


spark_conf = SparkConf()
spark_conf.set("spark.executor.instances","2")  
spark_conf.set("spark.executor.cores", "2")      

# Create SparkSession with the configured parameters
spark = SparkSession.builder \
    .appName("Spark in Jupyter") \
    .config(conf=spark_conf) \
    .getOrCreate()


# Load dataset
pandas_df = pd.read_csv('books_task.csv')
pandas_df.drop('Unnamed: 0',inplace=True,axis=1)

df = spark.createDataFrame(pandas_df)
df = df.na.drop()


#Author
author_counts = df.groupBy("authors").count()


categorized_df_author = author_counts.withColumn("author_category", 
                        when(col("count") == 1, 1)
                        .when(col("count") == 2, 2)
                        .otherwise(3))

encoder = OneHotEncoder(inputCols=["author_category"], outputCols=["author_category_encoded"])
encoder_model = encoder.fit(categorized_df_author)
encoded_df_author = encoder_model.transform(categorized_df_author)


df = df.join(encoded_df_author, on="authors", how="left")


#Publisher
publisher_counts = df.groupBy("publisher").count()

bins = [0, 5, 30, 150, 250, 1000, float('inf')]
labels = [1, 2, 3, 4, 5, 6]

bucketizer = F.udf(lambda count: labels[min(bisect.bisect_right(bins, count), len(labels) - 1)], IntegerType())
categorized_df_publisher = publisher_counts.withColumn("publisher_bucket", bucketizer("count"))

encoder = OneHotEncoder(inputCols=["publisher_bucket"], outputCols=["publisher_category_encoded"])
encoder_model = encoder.fit(categorized_df_publisher)
encoded_df_publisher = encoder_model.transform(categorized_df_publisher)

df = df.join(encoded_df_publisher.select("publisher", "publisher_category_encoded"), on="publisher", how="left")

#Category
category_counts = df.groupBy("categories").count()


num_buckets = 5

discretizer = QuantileDiscretizer(numBuckets=num_buckets, inputCol="count", outputCol="category_bucket")
discretized_df = discretizer.fit(category_counts).transform(category_counts)

encoder = OneHotEncoder(inputCols=["category_bucket"], outputCols=["category_encoded"])
encoder_model = encoder.fit(discretized_df)
encoded_df_categories = encoder_model.transform(discretized_df)

df = df.join(encoded_df_categories.select("categories", "category_encoded"), on="categories", how="left")


#Title

df = df.withColumn("Title_Word_Count", 
                   udf(lambda x: len(x.split()), IntegerType())("Title"))

df = df.withColumn("Title_Character_Count", 
                   udf(lambda x: len(x), IntegerType())("Title"))

df = df.withColumn("Title_Avg_Word_Length", 
                   udf(lambda x: sum(len(word) for word in x.split()) / len(x.split()), DoubleType())("Title"))
#Description

df = df.withColumn("Description_Word_Count", 
                   udf(lambda x: len(x.split()), IntegerType())("description"))

df = df.withColumn("Description_Character_Count", 
                   udf(lambda x: len(x), IntegerType())("description"))

df = df.withColumn("Description_Avg_Word_Length", 
                   udf(lambda x: sum(len(word) for word in x.split()) / len(x.split()), DoubleType())("description"))
#publishedDate

extract_year_udf = udf(extract_year, StringType())
df = df.withColumn("Year", extract_year_udf("publishedDate"))
df = df.withColumn("Year", col("Year").cast("integer"))

#authors

df = df.withColumn("authors", split(col("authors"), ","))
df = df.withColumn("NumAuthors", size(col("authors")))



Processed_Featured=['author_category_encoded','publisher_category_encoded','category_encoded','Title_Word_Count','Title_Character_Count','Title_Avg_Word_Length','Description_Word_Count','Description_Character_Count','Description_Avg_Word_Length','Year','NumAuthors','Impact']

processed_df = df.select(Processed_Featured)


# Model
assembler = VectorAssembler(inputCols=['author_category_encoded','publisher_category_encoded','category_encoded',
                                       'Title_Word_Count', 'Title_Character_Count', 'Title_Avg_Word_Length',
                                       'Description_Word_Count', 'Description_Character_Count',
                                       'Description_Avg_Word_Length', 'Year', 'NumAuthors'],
                            outputCol='features', handleInvalid="skip")
rf = RandomForestRegressor(featuresCol="features", labelCol="Impact")

pipeline = Pipeline(stages=[assembler, rf])

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10,]) \
    .build()



# Create CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="Impact"),
                          numFolds=3,
                          collectSubModels=True)

# Run cross-validation and choose the best set of parameters
cvModel = crossval.fit(processed_df)

training_time = time.time() - start_time
# Get best model from CrossValidator
best_model = cvModel.bestModel

# Make predictions
predictions = best_model.transform(processed_df)

# Calculate MAPE manually
mape = calculate_mape(predictions)



print(mape)

print(training_time)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/08 21:24:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/08 21:24:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/02/08 21:24:50 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/02/08 21:24:50 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
24/02/08 21:24:50 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
24/02/08 21:25:02 WARN TaskSetManager: Stage 0 contains a task of very large size (12602 KiB). The maximum recommended task size is 1000 KiB.
24/02/08 21:25:10 WARN TaskSetManager: Stage 3 contains a task of very large size (12602 KiB). The maximum recommended task size is 1000 KiB.
24/02/08 21:25:15 WARN TaskSetManager: Stage 6 

0.05950649145940839
224.77991890907288


                                                                                