In [None]:
import timeit
import pandas as pd
import numpy as np
from scipy.stats import entropy
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.base import BaseEstimator, TransformerMixin
import os
from sklearn.pipeline import Pipeline
from transformers import AutoTokenizer, AutoModel, TFAutoModel
from sklearn.linear_model import Ridge, Lasso
from gensim.models import KeyedVectors
import scipy.stats as stats



# Clean and tokenize text
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string
from sklearn.model_selection import cross_val_score, KFold

nltk.download('stopwords') # download stopwords corpus
nltk.download('punkt') # download punkt tokenizer

# For linear regression
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

from sklearn.feature_extraction.text import TfidfVectorizer


## Linear Regression

In [None]:
# Transformer for reading CSV file into DataFrame
class CSVReader(BaseEstimator, TransformerMixin):
    def __init__(self, file_path):
        self.file_path = file_path
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        # Read the CSV file into a DataFrame
        df = pd.read_csv(self.file_path)
        
        # Return the DataFrame
        return df

# Transformer for Gensim Dutch Word2Vec embeddings
class GensimEmbeddings(BaseEstimator, TransformerMixin):
    def __init__(self, model_path, embedding_size):
        self.model_path = model_path
        self.embedding_size = embedding_size
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        # Load the Gensim Word2Vec model
        model = KeyedVectors.load_word2vec_format(self.model_path)
        
        # Initialize an empty numpy array to store the embeddings
        embeddings = np.zeros((len(X), self.embedding_size))
        
        # Iterate over the words in the text and calculate the embeddings
        for i, word in enumerate(X['prep_content']):
            if word in model:
                embeddings[i, :] = model[word]
            else:
                embeddings[i, :] = np.random.randn(self.embedding_size)
        
        # Add the embeddings to the DataFrame
        for j in range(embeddings.shape[1]):
            X[f'embedding_{j}'] = embeddings[:, j]
        
        return X

# Transformer for TFIDF embeddings
class TFIDFEmbeddings(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.tfidf_vectorizer = TfidfVectorizer()
    
    def fit(self, X, y=None):
        self.tfidf_vectorizer.fit(X['prep_content'])
        return self
    
    def transform(self, X):
        # Fit the TF-IDF vectorizer if it's not fitted
        if not hasattr(self.tfidf_vectorizer, 'idf_'):
            self.fit(X)
        
        # Calculate the TF-IDF embeddings
        tfidf_embeddings = self.tfidf_vectorizer.transform(X['prep_content'])

        # Add the embeddings to the DataFrame
        for i in range(tfidf_embeddings.shape[1]):
            X[f'tfidf_embedding_{i}'] = tfidf_embeddings[:, i].toarray().flatten()
        
        n_features = len(self.tfidf_vectorizer.vocabulary_)

        return X, n_features
    
class BERTEmbeddings(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("GroNLP/bert-base-dutch-cased")
        self.model = TFAutoModel.from_pretrained("GroNLP/bert-base-dutch-cased")  # Tensorflow
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        embeddings = np.zeros((len(X), 768))
        for i, text in enumerate(X['content']):
            input_ids = self.tokenizer.encode(str(text), add_special_tokens=True, return_tensors="tf")
            output = self.model(input_ids)
            embeddings[i] = np.max(output.last_hidden_state.numpy(), axis=1)
        
        for j in range(embeddings.shape[1]):
            X[f'embedding_{j}'] = embeddings[:, j]
        
        return X    
 # Transformer for baseline model   
class BaselineAnalysis(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        avg_entropy_per_source = X.groupby('name')['entropy'].mean()
        X['avg_entropy'] = X['name'].map(avg_entropy_per_source)

        mse = mean_squared_error(X['entropy'], X['avg_entropy'])
        print("Baseline: Mean Squared Error:", mse)

        errors = X['entropy'] - X['avg_entropy']
        squared_errors = errors ** 2        
        variance = np.var(squared_errors)
        print("Baseline: Variance of Squared Errors:", variance)

        std = np.std(squared_errors)
        print("Baseline: STD of Squared Errors:", std)
        
        # Bootstrapping to calculate confidence intervals around MSE
        n_iterations = 1000  # Number of bootstrap iterations
        mse_bootstrapped = []
        for _ in range(n_iterations):
            # Resample the squared errors
            resampled_errors = np.random.choice(squared_errors, size=len(squared_errors), replace=True)
            # Calculate MSE from resampled errors
            mse_bootstrapped.append(np.mean(resampled_errors))

        # Calculate the lower and upper percentiles for confidence interval
        lower_percentile = 10  # 2.5th percentile
        upper_percentile = 90  # 97.5th percentile
        ci_lower = np.percentile(mse_bootstrapped, lower_percentile)
        ci_upper = np.percentile(mse_bootstrapped, upper_percentile)
        print(f"Baseline: MSE Confidence Interval ({lower_percentile}%, {upper_percentile}%): ({ci_lower}, {ci_upper})")

        plt.figure(figsize=(8, 6))
        plt.hist(errors, bins=20, edgecolor='black', alpha=0.75)
        plt.xlabel('Error')
        plt.ylabel('Frequency')
        plt.title('Baseline: Distribution of Errors in Test Set')
        plt.show()

        return X 
      
 # Transformer for Linear Regression    
class LinearRegressionAnalysis(BaseEstimator, TransformerMixin):
    def __init__(self, embedding_option, regularization=None, alpha=1.0):
        self.embedding_option = embedding_option
        self.embedding_step = None
        self.regularization = regularization
        self.alpha = alpha
        self.model = None
    
    def fit(self, X, y=None):
        

        # Prepare the input features based on the chosen embedding option
        if self.embedding_option == 'gensim':
            self.embedding_step = GensimEmbeddings(model_path='INPUT EMBEDDING PATH', embedding_size=160)
            X = self.embedding_step.transform(X)
            # Prepare the input features
            X_features = X.iloc[:, -160:]
        elif self.embedding_option == 'tfidf':
            self.embedding_step = TFIDFEmbeddings()
            X, n_features = self.embedding_step.transform(X)
            X_features = X.iloc[:, -n_features:]
        elif self.embedding_option == 'bert':
            self.embedding_step = BERTEmbeddings()
            X = self.embedding_step.transform(X)
            # Prepare the input features
            X_features = X.iloc[:, -768:]
        
        # Define and train the linear regression model
        if self.regularization == 'ridge':
            self.model = Ridge(alpha=self.alpha)
        elif self.regularization == 'lasso':
            self.model = Lasso(alpha=self.alpha)
        else:
            self.model = LinearRegression()
            
        # Prepare the target variable
        y_target = X['entropy']
        
        # Split the data into training and test sets
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X_features, y_target, test_size=0.2, random_state=42)
        
        # Perform cross-validation
        cv = KFold(n_splits=5, shuffle=True, random_state=42)
        scores = cross_val_score(self.model, X_features, y_target, scoring='neg_mean_squared_error', cv=cv)

        # Print the cross-validation scores
        print('Cross-Validation Scores:', scores)
        print('Average Cross-Validation MSE:', -np.mean(scores))
        print('Variance Cross-Validation MSE:', np.var(scores))
        
        self.model.fit(self.X_train, self.y_train)
        
        return self
        
    def transform(self, X):
        if self.embedding_step is not None:
            X = self.embedding_step.transform(X)
    
        # Make predictions on the test set
        y_pred = self.model.predict(self.X_test)

        # Calculate the errors
        errors = self.y_test - y_pred
        
        # Plot the distribution of errors
        plt.figure(figsize=(8, 6))
        plt.hist(errors, bins=20, edgecolor='black', color='red', alpha=0.75)
        plt.xlabel('Error')
        plt.ylabel('Frequency')
        plt.title('Linear Regression: Distribution of Errors in Test Set')
        plt.show()
        
        # Plot the distribution of entropy scores in the Train set
        plt.figure(figsize=(8, 6))
        plt.hist(self.y_train, bins=20, edgecolor='black', color='red', alpha=0.75)
        plt.xlabel('Entropy')
        plt.ylabel('Frequency')
        plt.title('Linear Regression: Distribution of Entropy Scores in Train Set')
        plt.show()
        
        # Plot the distribution of entropy scores in the Test set
        plt.figure(figsize=(8, 6))
        plt.hist(self.y_test, bins=20, edgecolor='black', color='red', alpha=0.75)
        plt.xlabel('Entropy')
        plt.ylabel('Frequency')
        plt.title('Linear Regression: Distribution of Entropy Scores in Test Set')
        plt.show()
        
        # Evaluate model performance
        mse = mean_squared_error(self.y_test, y_pred)
        r2 = r2_score(self.y_test, y_pred)
        var = np.var(errors)
        std = np.std(errors)
        
        # Print the evaluation metrics
        print(f'Linear Regression: Mean Squared Error: {mse:.4f}')
        print(f'Linear Regression: R-squared: {r2:.2f}')
        print(f'Linear Regression: Variance of Errors: {var:.4f}')
        print(f'Linear Regression: Standard Deviation of Errors: {std:.4f}')
        
        # Bootstrapping to calculate confidence intervals around MSE
        # Calculate the squared errors
        squared_errors = errors**2
        n_iterations = 1000  # Number of bootstrap iterations
        mse_bootstrapped = []
        for _ in range(n_iterations):
            # Resample the squared errors
            resampled_errors = np.random.choice(squared_errors, size=len(squared_errors), replace=True)
            # Calculate MSE from resampled errors
            mse_bootstrapped.append(np.mean(resampled_errors))

        # Calculate the lower and upper percentiles for confidence interval
        lower_percentile = 10  # 2.5th percentile
        upper_percentile = 90  # 97.5th percentile
        ci_lower = np.percentile(mse_bootstrapped, lower_percentile)
        ci_upper = np.percentile(mse_bootstrapped, upper_percentile)
        print(f"Linear Regression: MSE Confidence Interval ({lower_percentile}%, {upper_percentile}%): ({ci_lower}, {ci_upper})")
        
        return X

chosen_embedding = 'bert'  # INPUT CHOSEN EMBEDDING HERE
chosen_regularization = 'lasso'  # INPUT CHOSEN REGULARIZATION ('ridge' or 'lasso')
alpha_value = 1.0  # INPUT ALPHA VALUE HERE

pipeline = Pipeline([
    ('csv_reader', CSVReader('INPUT FILE PATH')), # INPUT FILE
    ('baseline_analysis', BaselineAnalysis()),
    ('linear_regression', LinearRegressionAnalysis(embedding_option=chosen_embedding, regularization=chosen_regularization, alpha=alpha_value)),
])

embedding_options = {
    'gensim': GensimEmbeddings(model_path='INPUT EMBEDDING PATH', embedding_size=160), #INPUT WORD2VEC MODEL PATH
    'tfidf': TFIDFEmbeddings(),
    'bert': BERTEmbeddings(),
}

pipeline.set_params(linear_regression__embedding_option=chosen_embedding)
df_transformed = pipeline.fit_transform(None)
