In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr
from sentence_transformers import SentenceTransformer
import csv
from io import StringIO


# Create a Spark session
spark = SparkSession.builder.appName("BookEmbeddings1").getOrCreate()


# Read data into RDD using textFile
rdd = spark.sparkContext.textFile("books_task.csv")

# Use csv.reader to parse each line
header = rdd.first()
rdd = rdd.filter(lambda line: line != header).map(lambda x: next(csv.reader(StringIO(x))))

# Convert RDD to DataFrame
columns = ["Index","Title", "description", "authors", "publisher", "publishedDate", "categories", "Impact"]
df = spark.createDataFrame(rdd, columns)

df = df.withColumn("text", expr("concat(Title, ' ', description)"))

embeddings_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Define a PySpark UDF to apply the sentence embeddings model
def embed_sentences(text):
    embeddings = embeddings_model.encode([text])
    return embeddings.tolist()[0]

# Register the UDF
spark.udf.register("embed_sentences", embed_sentences, "array<double>")

# Apply the UDF to create a new column with embeddings
df_with_embeddings = df.withColumn("embeddings", expr("embed_sentences(text)"))

# Show the resulting DataFrame
df_with_embeddings.select("Title", "text", "embeddings").show(truncate=False)

output_path_for_first_10_rows = "output5"
df_with_embeddings.write.parquet(output_path_for_first_10_rows)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sklearn.metrics import mean_absolute_error
import numpy as np
# Initialize Spark session
spark = SparkSession.builder.appName("YourAppName").getOrCreate()


# Read the Parquet file into a DataFrame
df_sample = spark.read.parquet("output5")


df_read_parquet = df_sample.limit(5000)

# Assuming you have a variable named embedding_length
embedding_length = 384  # Replace with your actual embedding length


# Show the DataFrame
df_read_parquet.show(truncate=False)

# Check if 'Impact' is float and convert it to double
df_read_parquet = df_read_parquet.withColumn('Impact', col('Impact').cast('double'))

# Check the schema after conversion
df_read_parquet.printSchema()

# Create separate columns for each dimension of the embeddings
for i in range(embedding_length):
    col_name = f"embedding_{i + 1}"
    df_read_parquet = df_read_parquet.withColumn(col_name, df_read_parquet["embeddings"][i])

# Check the updated schema
df_read_parquet.printSchema()

# Select embedding variables and 'Impact' for modeling
feature_columns = [f"embedding_{i + 1}" for i in range(embedding_length)]
feature_columns += ['Impact']

df_model = df_read_parquet.select(feature_columns)

# Check the statistics of the 'Impact' column
df_model.describe('Impact').show()



# Train-test split
train_df, test_df = df_model.randomSplit([0.7, 0.3], seed=42)

# Prepare feature vector
assembler = VectorAssembler(inputCols=feature_columns[:-1], outputCol="features")

# Create a Linear Regression model
lr = LinearRegression(labelCol="Impact", featuresCol="features")

# Create a pipeline
pipeline = Pipeline(stages=[assembler, lr])

# Define a parameter grid for tuning
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1, 0.01]).build()

# Create a cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="Impact"),
                          numFolds=3)

# Fit the model
cv_model = crossval.fit(train_df)

# Predict on the training set
train_predictions = cv_model.transform(train_df)

# Predict on the test set
test_predictions = cv_model.transform(test_df)

# Calculate MAPE for training set
train_mape = np.mean(np.abs((train_predictions.select("Impact", "prediction").toPandas()["Impact"] -
                             train_predictions.select("Impact", "prediction").toPandas()["prediction"]) /
                            train_predictions.select("Impact", "prediction").toPandas()["Impact"])) * 100

# Calculate MAPE for test set
test_mape = np.mean(np.abs((test_predictions.select("Impact", "prediction").toPandas()["Impact"] -
                            test_predictions.select("Impact", "prediction").toPandas()["prediction"]) /
                           test_predictions.select("Impact", "prediction").toPandas()["Impact"])) * 100

# Print MAPE for training and test sets
print(f"MAPE on training set: {train_mape:.2f}%")
print(f"MAPE on test set: {test_mape:.2f}%")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("BookEmbeddings1").getOrCreate()

# Read data directly into DataFrame with appropriate CSV options
csv_path = "books_task.csv"
df = spark.read.option("header", "true").option("quote", "\"").option("escape", "\"").csv(csv_path)

# Rename the '_c0' column to 'Index'
df = df.withColumnRenamed("_c0", "Index")



In [None]:
# Convert 'publishedDate' to date format
df = df.withColumn("publishedDate", col("publishedDate").cast("date"))

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType
# Define a UDF to convert string representation of list to actual list
@udf(ArrayType(StringType()))
def convert_to_list(authors_str):
    return (authors_str[1:-1].replace("'", "").split(',')) if authors_str is not None else []

# Apply the UDF to the 'authors' column
df = df.withColumn("authors_list", convert_to_list(col("authors")))

df = df.withColumn("categories_list", convert_to_list(col("categories")))


In [None]:
# Count missing values in each column
missing_counts = [df.where(col(c).isNull()).count() for c in df.columns]

# Create a dictionary to map column names to missing value counts
missing_dict = dict(zip(df.columns, missing_counts))

# Display the missing value counts for each column
for column, missing_count in missing_dict.items():
    print(f"Column '{column}': {missing_count} missing values")

# Alternatively, you can display the missing values in a more tabular format
missing_df = spark.createDataFrame([(column, missing_count) for column, missing_count in missing_dict.items()],
                                   ["Column", "Missing Values"])
missing_df.show(truncate=False)


In [None]:
from pyspark.sql.functions import col, min, max

# Calculate the range of 'publishedDate'
date_range = df.agg(min("publishedDate").alias("min_date"), max("publishedDate").alias("max_date")).collect()[0]

# Display the range
print("Range of publishedDate:")
print(f"Minimum Date: {date_range['min_date']}")
print(f"Maximum Date: {date_range['max_date']}")

In [None]:
df.printSchema()


In [None]:
from pyspark.sql.functions import col

df = df.withColumn('Impact', col('Impact').cast('double'))


In [None]:
df.printSchema()


In [None]:
df.select('categories_list').show()

In [None]:
df.show()

In [None]:
from pyspark.ml.feature import CountVectorizer

# Create a CountVectorizer instance
cv = CountVectorizer(inputCol="authors_list", outputCol="authors_features")


In [None]:
model = cv.fit(df)
df_features = model.transform(df)

# Display the resulting DataFrame with features
df_features.select("Index", "authors_list", "authors_features").show(truncate=False)

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT

# Function to create MultiHot Encoding
@udf(VectorUDT())
def multi_hot_encoding(authors):
    unique_authors = list(set(authors))
    encoding = [1.0 if author in unique_authors else 0.0 for author in all_authors]
    return Vectors.dense(encoding)

# Extract all unique authors
all_authors = df.select("authors_list").rdd.flatMap(lambda x: x[0]).distinct().collect()

# Apply the MultiHot Encoding UDF
df_encoded = df.withColumn("multi_hot_encoding", multi_hot_encoding("authors_list"))


In [None]:
df_encoded.select('multi_hot_encoding').show(5)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from sentence_transformers import SentenceTransformer

# Create a Spark session
spark = SparkSession.builder.appName("BookEmbeddings1").getOrCreate()

# Read data directly into DataFrame
csv_path = "books_task.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)

df = df.withColumn("text", expr("concat(Title, ' ', description)"))

embeddings_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Define a PySpark UDF to apply the sentence embeddings model
def embed_sentences(text):
    embeddings = embeddings_model.encode([text])
    return embeddings.tolist()[0]

# Register the UDF
spark.udf.register("embed_sentences", embed_sentences, "array<double>")

# Apply the UDF to create a new column with embeddings
df_with_embeddings = df.withColumn("embeddings", expr("embed_sentences(text)"))

# Show the resulting DataFrame
df_with_embeddings.select("Title", "text", "embeddings").show(truncate=False)

output_path_for_first_10_rows = "output5"
df_with_embeddings.write.parquet(output_path_for_first_10_rows)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, coalesce, current_date, year, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from sentence_transformers import SentenceTransformer
from pyspark.sql.types import ArrayType, StringType




# Create a Spark session
spark = SparkSession.builder.appName("BookEmbeddings1").getOrCreate()



  from .autonotebook import tqdm as notebook_tqdm
Setting default log level to "

In [None]:
# Read data directly into DataFrame with appropriate CSV options
csv_path = "books_task.csv"
df = spark.read.option("header", "true").option("quote", "\"").option("escape", "\"").csv(csv_path)

In [None]:
pandas_df = df.toPandas()


In [None]:
total_rows = len(pandas_df)
part1_end = int(total_rows * 0.2)
part2_end = int(total_rows * 0.4)
part3_end = int(total_rows * 0.6)
part4_end = int(total_rows * 0.8)

# Split the DataFrame into five parts
part1 = pandas_df.iloc[:part1_end, :]
part2 = pandas_df.iloc[part1_end:part2_end, :]
part3 = pandas_df.iloc[part2_end:part3_end, :]
part4 = pandas_df.iloc[part3_end:part4_end, :]
part5 = pandas_df.iloc[part4_end:, :]

# Save each part to a separate file (e.g., CSV)
part1.to_csv('part1.csv', index=False)
part2.to_csv('part2.csv', index=False)
part3.to_csv('part3.csv', index=False)
part4.to_csv('part4.csv', index=False)
part5.to_csv('part5.csv', index=False)

In [4]:
csv_path = "part1.csv"
df = spark.read.option("header", "true").option("quote", "\"").option("escape", "\"").csv(csv_path)

In [5]:
df.show(2)

+---+--------------------+--------------------+----------------+--------------------+-------------+--------------------+-----------------+
|_c0|               Title|         description|         authors|           publisher|publishedDate|          categories|           Impact|
+---+--------------------+--------------------+----------------+--------------------+-------------+--------------------+-----------------+
|  0|Its Only Art If I...|                NULL|['Julie Strain']|Smithsonian Insti...|         1996|['Comics & Graphi...|784.3039243054303|
|  1|Dr. Seuss: Americ...|Philip Nel takes ...|  ['Philip Nel']|           A&C Black|   2005-01-01|['Biography & Aut...|825.4655354138016|
+---+--------------------+--------------------+----------------+--------------------+-------------+--------------------+-----------------+
only showing top 2 rows



In [6]:
df.printSchema()

# Rename the '_c0' column to 'Index'
df = df.withColumnRenamed("_c0", "Index")

# Convert 'publishedDate' to date format
df = df.withColumn("publishedDate", col("publishedDate").cast("date"))

# Count missing values in each column
missing_counts = [df.where(col(c).isNull()).count() for c in df.columns]

# Create a dictionary to map column names to missing value counts
missing_dict = dict(zip(df.columns, missing_counts))

# Display the missing value counts for each column
for column, missing_count in missing_dict.items():
    print(f"Column '{column}': {missing_count} missing values")
    
# Replace missing values with a default date 
default_date = "1900-01-01"
df = df.withColumn("publishedDate", when(col("publishedDate").isNull(), default_date).otherwise(col("publishedDate")))

df.printSchema()

# Count missing values in each column
missing_counts = [df.where(col(c).isNull()).count() for c in df.columns]

# Create a dictionary to map column names to missing value counts
missing_dict = dict(zip(df.columns, missing_counts))

# Display the missing value counts for each column
for column, missing_count in missing_dict.items():
    print(f"Column '{column}': {missing_count} missing values")
    

# Define a UDF to convert string representation of list to actual list
@udf(ArrayType(StringType()))
def convert_to_list(authors_str):
    return (authors_str[1:-1].replace("'", "").split(',')) if authors_str is not None else []

# Apply the UDF to the 'authors' column
df = df.withColumn("authors_list", convert_to_list(col("authors")))

# Apply the UDF to the 'categories' column
df = df.withColumn("categories_list", convert_to_list(col("categories")))


#Count missing values in each column
missing_counts = [df.where(col(c).isNull()).count() for c in df.columns]

# Create a dictionary to map column names to missing value counts
missing_dict = dict(zip(df.columns, missing_counts))

# Display the missing value counts for each column
for column, missing_count in missing_dict.items():
    print(f"Column '{column}': {missing_count} missing values")
    
df.printSchema()

# Extract all unique authors
all_authors = df.select("authors_list").rdd.flatMap(lambda x: x[0]).distinct().collect()

# Apply the MultiHot Encoding UDF
print(len(all_authors))


# Print unique publishers
unique_publishers = df.select("publisher").distinct().rdd.flatMap(lambda x: x).collect()
print(len(unique_publishers))


df = df.withColumn("text", coalesce(col("Title"), col("description")))


# Assuming df is your DataFrame
df = df.withColumn("publishedYear", year("publishedDate"))

# calculate the age
df = df.withColumn("age", year(current_date()) - year("publishedDate")) 


df = df.withColumn("Impact", col("Impact").cast("double"))




embeddings_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

def embed_sentences(text):
    embeddings = embeddings_model.encode([text])
    return embeddings.tolist()[0]

# Register the UDF
spark.udf.register("embed_sentences", embed_sentences, "array<double>")

# Apply the UDF to create a new column with embeddings
df = df.withColumn("text_embeddings", expr("embed_sentences(text)"))


embedding_length= 384

# Create separate columns for each dimension of the text_embeddings
for i in range(embedding_length):
        col_name = f"text_embeddings{i + 1}"
        df = df.withColumn(col_name, df["text_embeddings"][i])

all_categories = df.select("categories_list").rdd.flatMap(lambda x: x[0]).distinct().collect()



for i in range(len(all_categories)):
    # Use the 'when' function to create a binary column
    df = df.withColumn(f"category_{i}", when(array_contains(col("categories_list"), all_categories[i]), 1).otherwise(0))

df.show(2)


df = df.withColumn("title_word_count", size(split(col("Title"), " ")))
df = df.withColumn("title_avg_word_length", length(col("Title")) / col("title_word_count"))
df = df.withColumn("char_count", length(col("Title")))
df = df.withColumn("contains_digits", when(col("Title").rlike("\\d"), 1).otherwise(0))
df = df.withColumn("contains_uppercase", when(col("Title").rlike("[A-Z]"), 1).otherwise(0))
df = df.withColumn("contains_punctuation", when(col("Title").rlike("[^\w\s]"), 1).otherwise(0))


# Define a UDF for sentiment analysis using TextBlob
def analyze_sentiment(title):
    blob = TextBlob(title)
    return blob.sentiment.polarity

sentiment_udf = udf(analyze_sentiment, StringType())

# Apply the UDF to the 'Title' column
df = df.withColumn("sentiment_score", sentiment_udf(col("Title")))

df = df.withColumn("sentiment_score", col("sentiment_score").cast("double"))

columns_to_drop = ['description','categories_list', 'categories','publishedDate', 'publishedYear','authors_list', 'authors','Title','text','text_embeddings']
df = df.drop(*columns_to_drop)

# Coalesce the DataFrame to a single partition
df_single_partition = df.coalesce(1)

# Write the DataFrame to a CSV file
df_single_partition.write.csv("output_file1.csv", header=True, mode="overwrite")

root
 |-- _c0: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- Impact: string (nullable = true)

Column 'Index': 0 missing values
Column 'Title': 0 missing values
Column 'description': 2601 missing values
Column 'authors': 590 missing values
Column 'publisher': 0 missing values
Column 'publishedDate': 80 missing values
Column 'categories': 0 missing values
Column 'Impact': 0 missing values
root
 |-- Index: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- Impact: string (nullable = true)

Column 'Index': 0 missing values
Column 'Title': 0

NameError: name 'array_contains' is not defined

In [None]:
import shutil

# Specify the folder path to be zipped
folder_path = 'output5'

# Specify the output zip file path and name
output_zip_path = 'output6'

# Create a zip archive of the folder
shutil.make_archive(output_zip_path, 'zip', folder_path)

In [None]:




# Start MLflow run
with mlflow.start_run(run_name="LinearRegression_PCA"):

    # Read the Parquet file into a DataFrame
    df_sample = spark.read.parquet("/content/output")
    df_read_parquet = df_sample.limit(1000)

    # Assuming you have a variable named embedding_length
    embedding_length = 384  # Replace with your actual embedding length

    # Check if 'Impact' is float and convert it to double
    df_sample = df_read_parquet.withColumn('Impact', col('Impact').cast('double'))

    # Select embedding variables and 'Impact' for modeling
    feature_columns = [f"text_embeddings{i + 1}" for i in range(embedding_length)]
    df_model = df_sample.select(feature_columns + ['Impact'])

    # Use PCA for dimensionality reduction to 50 components
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    pca = PCA(k=50, inputCol="features", outputCol="pca_features")

    # Train-test split
    train_df, test_df = df_model.randomSplit([0.7, 0.3], seed=42)

    # Create a Linear Regression model
    lr = LinearRegression(labelCol="Impact", featuresCol="pca_features")

    # Create a pipeline
    pipeline = Pipeline(stages=[assembler, pca, lr])

    # Define a parameter grid for tuning
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1, 0.01]).build()

    # Create a cross-validator
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=RegressionEvaluator(labelCol="Impact"),
                              numFolds=3)

    # Fit the model
    cv_model = crossval.fit(train_df)

    # Log parameters
    mlflow.log_params({
        "regParam": cv_model.bestModel.stages[2].getRegParam(),
        "elasticNetParam": cv_model.bestModel.stages[2].getElasticNetParam()
    })

    # Log metrics
    train_predictions = cv_model.transform(train_df)
    test_predictions = cv_model.transform(test_df)

    # Calculate MAPE for training set
    train_mape = train_predictions.withColumn('mape', F.abs((col('Impact') - col('prediction')) / col('Impact')) * 100)
    training_mape = train_mape.select(F.mean('mape')).collect()[0][0]
    mlflow.log_metric("training_mape", training_mape)

    # Calculate MAPE for test set
    test_mape = test_predictions.withColumn('mape', F.abs((col('Impact') - col('prediction')) / col('Impact')) * 100)
    test_mape = test_mape.select(F.mean('mape')).collect()[0][0]
    mlflow.log_metric("test_mape", test_mape)

    # Log the model
    mlflow.spark.log_model(cv_model.bestModel, "model")

    # Show MAPE on training set
    print(f"MAPE on training set: {training_mape:.2f}%")

    # Show MAPE on test set
    print(f"MAPE on test set: {test_mape:.2f}%")