# The goal of these experiments is to determine the best approach to missing-value treatment

In [1]:
import mlflow
import time
import requests
import logging
import concurrent.futures
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF, IDF
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F , SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import *

In [2]:
def calculate_mape(predictions, label_col="Impact", prediction_col="prediction"):
    """Calculate the Mean Absolute Percentage Error (MAPE) of a predictions DataFrame.

    Args:
        predictions: Spark DataFrame containing the label and prediction columns.
        label_col: Name of the column holding the true values.
        prediction_col: Name of the column holding the predicted values.

    Returns:
        The MAPE expressed as a percentage, or NaN when the aggregate is null
        (for example an empty DataFrame, or no row with a usable label).
    """
    # Use the namespaced Spark functions: a bare `abs` only resolves to the Spark
    # column function because of the star import and would silently become the
    # Python builtin if that import were ever removed.
    relative_error = F.abs(
        (F.col(label_col) - F.col(prediction_col)) / F.col(label_col)
    )
    mape = predictions.select(F.avg(relative_error).alias("mape")).collect()[0]["mape"]

    # The average is null when no row contributes; multiplying None would raise.
    if mape is None:
        return float("nan")
    return mape * 100  # Convert to percentage

In [3]:
# Configure logging: set the root logger to INFO and create the module-level
# logger that the experiment class below writes its progress messages to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MissingValueExperiment:
    def __init__(self, df, experiment_name="Missing Value Treatment Experiments"):
        """Keep the input DataFrame and select the MLflow experiment runs are logged under.

        Args:
            df: Spark DataFrame the missing-value strategies are applied to.
            experiment_name: Name passed to ``mlflow.set_experiment``.
        """
        self.df, self.experiment_name = df, experiment_name
        mlflow.set_experiment(self.experiment_name)
        logger.info("Initialized MissingValueExperiment class.")

    def apply_missing_value_strategy(self, strategy):
        """Return the DataFrame produced by the requested missing-value treatment.

        Args:
            strategy: One of "drop" (remove rows containing any null),
                "fill_empty" (replace nulls in Title, description and
                publishedDate with empty strings) or "fetch_api" (fill gaps
                through ``fetch_missing_data_from_api``).

        Raises:
            ValueError: If ``strategy`` is not one of the values above.
        """
        logger.info(f"Applying missing value strategy: {strategy}")

        # Reject unknown names up front so the branches below only see valid input.
        if strategy not in ("drop", "fill_empty", "fetch_api"):
            raise ValueError(f"Unknown strategy: {strategy}")

        if strategy == "fetch_api":
            return self.fetch_missing_data_from_api()

        if strategy == "fill_empty":
            empty_defaults = {"Title": "", "description": "", "publishedDate": ""}
            return self.df.fillna(empty_defaults)

        return self.df.dropna(how="any")

    def fetch_book_details_google_books(self, title, authors=None, max_retries=3, backoff_factor=2):
        """Fallback API: fetch book details from the Google Books API with retry and backoff.

        The API key is read from the ``GOOGLE_BOOKS_API_KEY`` environment variable
        and is left out of the request when the variable is not set.

        Args:
            title: Book title to search for (``intitle:`` query term).
            authors: Optional author text added as an ``inauthor:`` query term.
            max_retries: Maximum number of request attempts.
            backoff_factor: Base of the exponential delay (``backoff_factor ** attempt``
                seconds) slept after a failed attempt.

        Returns:
            A dict with the string fields "title", "authors", "published_date",
            "description" and "publisher", or None when nothing was found or
            every attempt failed.
        """
        import os  # local import: `os` is not among the file's top-level imports

        # Never commit API keys to source; anyone who can read the file can use them.
        # NOTE(review): unauthenticated requests are expected to work with a lower
        # quota -- confirm against the Google Books API documentation.
        api_key = os.environ.get("GOOGLE_BOOKS_API_KEY")

        query = f'intitle:"{title}"'
        if authors:
            query += f" inauthor:{authors}"

        # Let requests build the query string so titles containing '&', '#',
        # quotes or spaces are URL-encoded correctly.
        params = {"q": query, "maxResults": 1}
        if api_key:
            params["key"] = api_key
        api_url = "https://www.googleapis.com/books/v1/volumes"

        for attempt in range(max_retries):
            try:
                # A timeout keeps a stalled connection from blocking a worker thread forever.
                response = requests.get(api_url, params=params, timeout=10)
                data = response.json() if response.status_code == 200 else None
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a 200 response whose body is not valid JSON.
                logger.error(f"Error fetching book details from Google Books for '{title}': {e}")
                time.sleep(backoff_factor ** attempt)
                continue

            if data is None:
                # Back off on HTTP errors too instead of retrying immediately.
                logger.warning(
                    f"Google Books returned status {response.status_code} for '{title}'."
                )
                time.sleep(backoff_factor ** attempt)
                continue

            items = data.get("items")
            if not items:
                # A successful response without matches will not change on retry.
                logger.info(f"No Google Books results for '{title}'.")
                return None

            book_info = items[0].get("volumeInfo", {})
            logger.info(f"Book details found for '{title}' using Google Books.")

            # Delay after a successful API call
            time.sleep(5)

            # Format authors as a string in '[]' format
            formatted_authors = f"[{', '.join(map(str, book_info.get('authors', [])))}]"

            return {
                "title": str(title),
                "authors": str(formatted_authors),
                "published_date": str(book_info.get("publishedDate", "")),
                "description": str(book_info.get("description", "")),
                "publisher": str(book_info.get("publisher", "")),
            }

        logger.warning(f"Google Books max retries exceeded for '{title}'.")
        return None  # Return None if all attempts are exhausted

    def fetch_book_details(self, title, authors=None):
        """use Google Books API to retrieve missing information"""
        details = self.fetch_book_details_open_library(title)
        # logger.info(f"Switching to Google Books API for '{title}' after Open Library retries exhausted.")
        # details = self.fetch_book_details_google_books(title, authors)
        
        return details or {"title": "", "authors": "", "published_date": "", "description": ""}

    def fetch_details_for_part(self, books_part):
        """Fetch details for a subset of books."""
        results = []
        for book in books_part:
            data = self.fetch_book_details(book["Title"], book.get("authors"))
            if data:
                data["original_title"] = book["Title"]
            results.append(data)
        return results

    def update_row_with_fetched_data(self, row, api_data):
        """Update only missing, null, or inconsistent values in a row based on API data."""
        updated_row = row.asDict()
        
        # Replace only if the original value is missing, null, or inconsistent
        if not updated_row["Title"]:
            updated_row["Title"] = api_data.get("title", updated_row["Title"])
        if not updated_row["description"]:
            updated_row["description"] = api_data.get("description", updated_row["description"])
        if not updated_row["authors"] or len(updated_row["authors"].strip()) == 0:
            updated_row["authors"] = api_data.get("authors", updated_row["authors"])
        if not updated_row["publisher"]:
            updated_row["publisher"] = api_data.get("publisher", updated_row["publisher"])
        
        # Check for special characters or formats in `publishedDate`
        if not updated_row["publishedDate"] or "?" in updated_row["publishedDate"] or "*" in updated_row["publishedDate"] or len(updated_row["publishedDate"]) <= 3:
            updated_row["publishedDate"] = api_data.get("published_date", updated_row["publishedDate"])

        return updated_row


    def fetch_book_details_open_library(self, title, authors=None, max_retries=3, backoff_factor=2):
        """Primary API: fetch book details from Open Library (mode=everything) with retry and backoff.

        Searches are attempted in this order, stopping at the first hit:
          1. the original title;
          2. the simplified title (text before the first "(") plus ``authors``,
             when ``authors`` is given;
          3. the simplified title alone, when the title contains "(".

        Args:
            title: Book title to search for.
            authors: Optional author text for the title-plus-author search.
            max_retries: Maximum attempts per HTTP request (at least one is made).
            backoff_factor: Base of the exponential delay (``backoff_factor ** attempt``
                seconds) slept between attempts of the same request.

        Returns:
            A dict with "title", "authors" (a "[a, b]" style string, or ""),
            "published_date" and "description", or None when every search failed.
        """
        if not title:
            # A null title would otherwise be searched as the literal text "None".
            logger.warning("Open Library lookup skipped: no title to search for.")
            return None

        base_url = 'https://openlibrary.org'
        search_endpoint = f'{base_url}/search.json'
        attempts = max(1, max_retries)

        def make_request(url, params=None):
            """Helper function to handle requests with retries and backoff."""
            for attempt in range(attempts):
                try:
                    # requests encodes `params`, so titles with '&', '#' or spaces
                    # stay intact; the timeout stops a stalled call blocking a worker.
                    response = requests.get(url, params=params, timeout=10)
                    if response.status_code == 200:
                        return response.json()
                    logger.warning(f"Received status code {response.status_code} for URL '{response.url}'")
                    # Client errors other than 429 will not succeed on a retry.
                    if response.status_code < 500 and response.status_code != 429:
                        return None
                except (requests.exceptions.RequestException, ValueError) as e:
                    # ValueError covers a 200 response whose body is not valid JSON.
                    logger.error(f"Error fetching book details: {e}")
                if attempt < attempts - 1:
                    time.sleep(backoff_factor ** attempt)
            return None

        def search(query):
            """Run one Open Library search for `query`, returning the parsed JSON or None."""
            return make_request(search_endpoint, {"q": query, "limit": 1, "mode": "everything"})

        def process_data(data, result_title):
            """Process Open Library data and return structured details."""
            docs = data.get("docs") if data else None
            if not docs:
                return None

            book_info = docs[0]
            key = book_info.get("key", "")
            first_publish_year = str(book_info.get("first_publish_year", ""))

            # Return authors as a "[a, b]" string, the same shape the Google Books
            # helper produces, rather than a raw list.
            author_names = book_info.get("author_name") or []
            if isinstance(author_names, str):
                author_names = [author_names]
            formatted_authors = f"[{', '.join(map(str, author_names))}]" if author_names else ""

            # Proceed with the second API call if 'key' exists
            description = ''
            first_publish_date = ''
            if key:
                time.sleep(2)
                details_data = make_request(f"{base_url}{key}.json")
                if details_data:
                    # The description is either a plain string or a dict with a "value" entry.
                    raw_description = details_data.get("description", "")
                    if isinstance(raw_description, dict):
                        raw_description = raw_description.get("value", "")
                    description = raw_description
                    first_publish_date = str(details_data.get("first_publish_date", ""))

            return {
                "title": str(result_title),
                "authors": formatted_authors,
                "published_date": first_publish_year if first_publish_year else first_publish_date,
                "description": description if description else ''
            }

        # First attempt: search by the original title
        time.sleep(3)
        result = process_data(search(title), title)
        if result:
            return result

        simple_title = title.split("(")[0].strip()

        # Second attempt: search with both the simplified title and the author
        if authors:
            author_search_title = f"{simple_title} {authors}"
            logger.info(f"No results for '{title}'. Retrying with title and author '{author_search_title}'.")
            time.sleep(2)
            # Report the simplified title, not the query string, as the book title.
            result = process_data(search(author_search_title), simple_title)
            if result:
                return result

        # Final attempt: search with the simplified title alone
        if "(" in title:
            logger.info(f"No results for '{title}'. Retrying with simplified title '{simple_title}'.")
            time.sleep(3)
            result = process_data(search(simple_title), simple_title)
            if result:
                return result

        logger.warning(f"Open Library max retries exceeded for '{title}' and all fallback approaches.")
        return None  # Return None if all attempts are exhausted

        
    def fetch_missing_data_from_api(self):
        """Fetch missing data from APIs in parallel, updating only specific fields.

        Rows count as incomplete when Title, description, authors or publishedDate
        is null, or when publishedDate contains '?' or '*' or has three characters
        or fewer. Details for those rows are fetched on up to ten worker threads
        and merged back with ``update_row_with_fetched_data``.

        Side effects:
            Reassigns ``self.df`` to the merged DataFrame and writes it with
            Spark's CSV writer to "cleaned_data.csv", replacing earlier output.

        Returns:
            The merged DataFrame, or ``self.df`` unchanged when no row is incomplete.
        """
        logger.info("Starting to fetch missing data from APIs with multithreading.")
        # Filter for rows with missing or inconsistent data
        missing_df = self.df.filter(
            (F.col("Title").isNull()) | (F.col("description").isNull()) | (F.col("authors").isNull()) |
            (F.col("publishedDate").rlike("[?*]")) | (F.col("publishedDate").isNull()) |
            (F.length(F.col("publishedDate")) <= 3))

        # Collect once and reuse the rows for both the lookup and the update pass.
        missing_rows = missing_df.collect()
        if not missing_rows:
            logger.info("No rows with missing data; nothing to fetch.")
            return self.df

        books_with_missing_data = [
            {"Title": row["Title"], "authors": row["authors"],
             "publishedDate": row["publishedDate"], "description": row["description"]}
            for row in missing_rows
        ]

        # Split into at most `num_parts` chunks. Ceiling division keeps the chunk
        # size at 1 or more, so fewer than ten books no longer yields a zero step.
        num_parts = 10
        total_books = len(books_with_missing_data)
        part_size = max(1, -(-total_books // num_parts))
        books_parts = [
            books_with_missing_data[start:start + part_size]
            for start in range(0, total_books, part_size)
        ]

        # Process each part in a separate thread and collect results
        all_results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_parts) as executor:
            futures = [executor.submit(self.fetch_details_for_part, part) for part in books_parts]
            for future in concurrent.futures.as_completed(futures):
                part_results = future.result()
                all_results.extend(part_results)
                logger.info(f"Completed processing for a subset of books, collected {len(part_results)} results.")

        # Index results by the searched title for O(1) matching; the first result
        # seen for a title wins, and empty results are skipped.
        results_by_title = {}
        for res in all_results:
            if res and "original_title" in res:
                results_by_title.setdefault(res["original_title"], res)

        # Update rows selectively based on fetched data
        updated_rows = []
        for row in missing_rows:
            api_data = results_by_title.get(row["Title"])
            if not api_data:
                continue
            updated_row = self.update_row_with_fetched_data(row, api_data)
            # The DataFrame columns are strings; flatten list values an API helper
            # may hand back (e.g. a list of author names) into "[a, b]" form.
            for column, value in updated_row.items():
                if isinstance(value, (list, tuple)):
                    updated_row[column] = f"[{', '.join(map(str, value))}]"
            updated_rows.append(updated_row)

        if updated_rows:
            # Use the running session rather than relying on a global `spark` name.
            session = SparkSession.builder.getOrCreate()
            updated_df = session.createDataFrame(updated_rows, schema=self.df.schema)
            # unionByName keeps columns aligned even if the join reorders them.
            self.df = self.df.join(updated_df, on="Title", how="left_anti").unionByName(updated_df)

        # DataFrameWriter.csv has no `index` argument (that is the pandas API).
        self.df.write.csv("cleaned_data.csv", header=True, mode="overwrite")
        return self.df
    
    def feature_engineering(self, df_cleaned):
        """Build the model's feature vector from the text and category columns.

        Title, description and authors each go through Tokenizer -> HashingTF
        (1000 features) -> IDF; categories is string-indexed and one-hot encoded.
        The four results are assembled into a single "features" column.

        Args:
            df_cleaned: Spark DataFrame with Title, description, authors and
                categories columns.

        Returns:
            The DataFrame with the intermediate columns and "features" added.
        """
        logger.info("Starting feature engineering.")

        # Handle null values in columns that will be tokenized
        for text_column in ("Title", "description", "authors"):
            df_cleaned = df_cleaned.withColumn(
                text_column, F.coalesce(F.col(text_column), F.lit(""))
            )

        # (source column, prefix of the generated columns, progress message)
        tfidf_specs = (
            ("Title", "title", "Applying TF-IDF to 'Title'."),
            ("description", "desc", "Applying TF-IDF to 'Description'."),
            ("authors", "authors", "Applying TF-IDF to 'authors'."),
        )
        for source_column, prefix, message in tfidf_specs:
            logger.info(message)
            tokens_column = f"{prefix}_tokens"
            tf_column = f"{prefix}_tf"
            tfidf_column = f"{prefix}_tfidf"

            df_cleaned = Tokenizer(inputCol=source_column, outputCol=tokens_column).transform(df_cleaned)
            df_cleaned = HashingTF(
                inputCol=tokens_column, outputCol=tf_column, numFeatures=1000
            ).transform(df_cleaned)
            idf_model = IDF(inputCol=tf_column, outputCol=tfidf_column).fit(df_cleaned)
            df_cleaned = idf_model.transform(df_cleaned)

        # One-Hot Encoding for 'categories'
        logger.info("Applying One-Hot Encoding to 'categories'.")
        index_model = StringIndexer(inputCol="categories", outputCol="category_index").fit(df_cleaned)
        df_cleaned = index_model.transform(df_cleaned)
        onehot_model = OneHotEncoder(inputCol="category_index", outputCol="category_onehot").fit(df_cleaned)
        df_cleaned = onehot_model.transform(df_cleaned)

        # Assemble all features into a single vector; publishedDate is not used
        feature_columns = ["title_tfidf", "desc_tfidf", "authors_tfidf", "category_onehot"]
        df_with_features = VectorAssembler(
            inputCols=feature_columns, outputCol="features"
        ).transform(df_cleaned)

        logger.info("Feature engineering completed.")
        return df_with_features

    def cross_val_train_and_evaluate(self, df_cleaned):
        """Cross-validate a linear regression on "Impact" and report RMSE and MAPE.

        A 3-fold CrossValidator tunes ``regParam`` over 0.01, 0.1 and 0.5 using
        RMSE. The metrics are computed on predictions for the same DataFrame the
        model was fitted on, so they are in-sample figures.

        Args:
            df_cleaned: Spark DataFrame after missing-value treatment.

        Returns:
            Tuple ``(rmse, mape, total_training_time)``; the time is the
            wall-clock duration of the cross-validated fit in seconds.
        """
        logger.info("Starting cross-validation and model training.")
        df_with_features = self.feature_engineering(df_cleaned)

        lr = LinearRegression(featuresCol="features", labelCol="Impact")
        param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 0.5]).build()
        selection_evaluator = RegressionEvaluator(labelCol="Impact", metricName="rmse")
        crossval = CrossValidator(
            estimator=lr,
            estimatorParamMaps=param_grid,
            evaluator=selection_evaluator,
            numFolds=3,
        )

        # Time only the fit itself.
        fit_started = time.time()
        cv_model = crossval.fit(df_with_features)
        total_training_time = time.time() - fit_started
        logger.info(f"Model training completed in {total_training_time:.2f} seconds.")

        evaluator_rmse = RegressionEvaluator(labelCol="Impact", metricName="rmse")
        predictions = cv_model.transform(df_with_features)
        rmse = evaluator_rmse.evaluate(predictions)
        mape = calculate_mape(predictions, label_col="Impact", prediction_col="prediction")

        logger.info(f"Cross-validation completed. RMSE: {rmse}, MAPE: {mape}")
        return rmse, mape, total_training_time

    def run_experiment(self, strategy):
        """Run one experiment for a missing-value strategy and log it to MLflow.

        Inside a single MLflow run this applies the strategy, logs the row counts
        and the strategy name, trains and evaluates the model, logs RMSE, MAPE and
        training time, and attaches the cleaned data as a CSV artifact.

        Args:
            strategy: Strategy name understood by ``apply_missing_value_strategy``.
        """
        logger.info(f"Starting experiment with missing value strategy: {strategy}")
        run_label = f"Linear Regression with {strategy}"

        with mlflow.start_run(run_name=run_label):
            df_cleaned = self.apply_missing_value_strategy(strategy)

            # Both counts are taken before anything is logged.
            row_counts = (
                ("original_row_count", self.df.count()),
                ("cleaned_row_count", df_cleaned.count()),
            )
            for metric_name, metric_value in row_counts:
                mlflow.log_metric(metric_name, metric_value)

            mlflow.log_param("missing_value_strategy", strategy)
            rmse, mape, total_training_time = self.cross_val_train_and_evaluate(df_cleaned)

            model_metrics = (
                ("rmse", rmse),
                ("mape", mape),
                ("total_training_time", total_training_time),
            )
            for metric_name, metric_value in model_metrics:
                mlflow.log_metric(metric_name, metric_value)

            # Export the cleaned data through pandas and attach it to the run.
            cleaned_csv_path = f"/tmp/cleaned_data_{strategy}.csv"
            cleaned_pandas = df_cleaned.toPandas()
            cleaned_pandas.to_csv(cleaned_csv_path, index=False)
            mlflow.log_artifact(cleaned_csv_path)

            logger.info(f"Experiment completed for strategy: {strategy}. RMSE: {rmse}, MAPE: {mape}")


In [4]:
# Create (or reuse) the Spark session, applying each tuning option in turn.
spark_settings = {
    "spark.sql.shuffle.partitions": "100",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "8",
    "spark.executor.memoryOverhead": "512m",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=100",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}
session_builder = SparkSession.builder.appName("OptimizedSparkJob")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Read the raw books file; every column is loaded as a string (inferSchema off).
# NOTE(review): rootTag / rowTag / attributePrefix look like XML-reader options
# and are presumably ignored for CSV -- confirm before removing them.
books_csv_options = dict(
    header=True,
    delimiter=",",
    quote='"',
    escape='"',
    inferSchema='false',
    encoding='UTF8',
    multiline=True,
    rootTag='',
    rowTag='',
    attributePrefix='',
)
df = spark.read.format('CSV').options(**books_csv_options).load("books_task.csv")

# The regression label has to be numeric.
df = df.withColumn("Impact", df["Impact"].cast(FloatType()))

# Keep only the first four-digit run found in publishedDate; rows without one become null.
year_pattern = r"(\d{4})"
published_date = col("publishedDate")
year_or_null = when(
    published_date.rlike(year_pattern),
    regexp_extract(published_date, year_pattern, 1),
).otherwise(None)
df = df.withColumn("publishedDate", year_or_null)

# Drop the index column carried over in the CSV, then show the resulting schema.
df = df.drop('_c0')
df.printSchema()

24/11/04 11:43:12 WARN Utils: Your hostname, Aditya resolves to a loopback address: 127.0.1.1; using 192.168.29.214 instead (on interface wlp4s0)
24/11/04 11:43:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/04 11:43:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- Impact: float (nullable = true)



In [5]:
# Count, per column, the entries that are NaN or null and display them as one row.
missing_count_exprs = [
    F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
    for c in df.columns
]
null_counts = df.select(missing_count_exprs)
null_counts.show()

                                                                                

+-----+-----------+-------+---------+-------------+----------+------+
|Title|description|authors|publisher|publishedDate|categories|Impact|
+-----+-----------+-------+---------+-------------+----------+------+
|    0|      12749|   2723|        0|          377|         0|     0|
+-----+-----------+-------+---------+-------------+----------+------+



In [6]:
# Load "cleaned2.csv" with the same reader settings as the raw file; every
# column is read as a string (inferSchema off).
cleaned_csv_options = dict(
    header=True,
    delimiter=",",
    quote='"',
    escape='"',
    inferSchema='false',
    encoding='UTF8',
    multiline=True,
    rootTag='',
    rowTag='',
    attributePrefix='',
)
df1 = spark.read.format('CSV').options(**cleaned_csv_options).load("cleaned2.csv")

# Cast the label to float and drop the carried-over index column.
df1 = df1.withColumn("Impact", df1["Impact"].cast(FloatType()))
df1 = df1.drop('_c0')
df1.printSchema()

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- Impact: float (nullable = true)



In [7]:
# Build the experiment runner on the raw data and run every strategy in turn.
experiment = MissingValueExperiment(df)
# To run a single strategy, shrink the tuple below, e.g. ("fetch_api",).
strategies_to_run = ("drop", "fill_empty", "fetch_api")
for strategy in strategies_to_run:
    experiment.run_experiment(strategy)

2024/11/04 11:43:22 INFO mlflow.tracking.fluent: Experiment with name 'Missing Value Treatment Experiments' does not exist. Creating a new experiment.
INFO:__main__:Initialized MissingValueExperiment class.
INFO:__main__:Starting experiment with missing value strategy: drop
INFO:__main__:Applying missing value strategy: drop
INFO:__main__:Starting cross-validation and model training.                     
INFO:__main__:Starting feature engineering.
INFO:__main__:Applying TF-IDF to 'Title'.
INFO:__main__:Applying TF-IDF to 'Description'.                                 
INFO:__main__:Applying TF-IDF to 'authors'.                                     
INFO:__main__:Applying One-Hot Encoding to 'categories'.                        
INFO:__main__:Feature engineering completed.                                    
INFO:py4j.clientserver:Closing down clientserver connection                     
INFO:py4j.clientserver:Closing down clientserver connection
INFO:__main__:Model training completed in