# Prep

In [1]:
!pip install --upgrade pip
!pip install --upgrade pyspark

In [2]:
import pandas as pd
import numpy as np

In [3]:
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [4]:
cd '/content/drive/MyDrive/Colab Notebooks/UC3M/Big Data Intelligence/Assignment 2 - Spark'

/content/drive/MyDrive/Colab Notebooks/UC3M/Big Data Intelligence/Assignment 2 - Spark


In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Pyspark Assignment") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
sc = spark.sparkContext

# Print the Spark session and its underlying Spark context
print(spark)
print(sc)

<pyspark.sql.session.SparkSession object at 0x798e8b91db40>
<SparkContext master=local[*] appName=Pyspark Assignment>


In [6]:
df = pd.read_csv('wind_available_second.csv.gz', compression='gzip')
df.head(2)

Unnamed: 0,energy,year,month,day,hour,p54_162_1,p54_162_2,p54_162_3,p54_162_4,p54_162_5,...,v100_16,v100_17,v100_18,v100_19,v100_20,v100_21,v100_22,v100_23,v100_24,v100_25
0,402.71,2005,1,2,18,2534970.0,2526864.0,2518754.0,2510648.0,2502537.0,...,-4.683596,,-4.407196,,-4.131295,-4.669626,-4.528932,-4.388736,-4.24854,-4.107846
1,696.8,2005,1,3,0,,,2521184.0,2513088.0,,...,-3.397886,-3.257192,-3.115998,-2.975304,-2.834609,-3.39639,-3.254198,-3.112506,-2.970314,


In [7]:
from pyspark.ml import Pipeline
import pyspark.ml.pipeline as pl
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import (
    ChiSqSelector,
    Imputer,
    PCA,
    StandardScaler,
    UnivariateFeatureSelector,
    VectorAssembler,
)
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression

from sklearn.feature_selection import SelectKBest, SelectPercentile, f_regression
from sklearn.impute import SimpleImputer


In [8]:
rand_state = 100510993

test = df[df['year'] == 2009]
train = df[df['year'] != 2009]

train_sdf = spark.createDataFrame(train)
test_sdf = spark.createDataFrame(test)

# Rename the target column 'energy' to 'label', the label column name used in the stages below
train_sdf = train_sdf.withColumnRenamed("energy", "label")
test_sdf = test_sdf.withColumnRenamed("energy", "label")

## Pipeline

Base pipeline consisting of an imputer that replaces missing values with the mean of each column, a vector assembler that combines the features into a single vector, a scaler that standardizes the features, and a linear regression model. We use this pipeline as the base for the feature selection methods applied later.
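
To make the first and third stages concrete, here is a minimal pure-Python sketch of mean imputation followed by standardization on a single column. The helper names `impute_mean` and `standardize` and the toy values are hypothetical, and the use of the sample standard deviation (dividing by n - 1) is an assumption about what the scaler does, not something stated in this notebook.

```python
import math

def impute_mean(column):
    """Replace None entries with the mean of the observed values."""
    observed = [x for x in column if x is not None]
    mean = sum(observed) / len(observed)
    return [mean if x is None else x for x in column]

def standardize(column):
    """Center to zero mean and scale by the sample standard deviation."""
    n = len(column)
    mean = sum(column) / n
    variance = sum((x - mean) ** 2 for x in column) / (n - 1)
    std = math.sqrt(variance)
    return [(x - mean) / std for x in column]

# Hypothetical toy column with one missing value
toy_column = [1.0, None, 3.0, 5.0]

imputed = impute_mean(toy_column)   # the missing entry becomes the mean of 1, 3 and 5
scaled = standardize(imputed)       # zero mean, unit sample variance

print(imputed)
print(scaled)
```

Imputing with the mean leaves the column mean unchanged, so the imputed entry maps to exactly zero after standardization.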

In [9]:
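# Imputer: replace missing values with the column mean, overwriting each column in place
# (the column list includes 'label', so missing targets are imputed too)
imputer = Imputer(
    inputCols=train_sdf.columns,
    outputCols=train_sdf.columns,
    strategy="mean"
)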

# Vector assembler
ignore = ['label']
assembler = VectorAssembler(
    inputCols=[x for x in train_sdf.columns if x not in ignore],
    outputCol='features')

# Scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Linear Regression
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='label')


pipeline = Pipeline(stages=[imputer, assembler, scaler, lr])

model = pipeline.fit(train_sdf)

In [10]:
predictions = model.transform(test_sdf)

# PART 1

We use UnivariateFeatureSelector for feature selection, with a selection threshold of 0.05 and two selection modes:
 - FPR (False Positive Rate)
 - FWE (Family-wise Error Rate), which is more conservative, so we expect it to select fewer features
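
The difference between the two modes can be sketched on a list of per-feature p-values. This is a simplified illustration, assuming that FPR keeps every feature whose p-value is below the threshold and that FWE applies the same rule after dividing the threshold by the number of features. The function names and p-values are hypothetical.

```python
def select_fpr(p_values, alpha=0.05):
    """Keep the indices of features whose p-value is below alpha."""
    return [i for i, p in enumerate(p_values) if p < alpha]

def select_fwe(p_values, alpha=0.05):
    """Keep the indices of features whose p-value is below alpha / number of features."""
    threshold = alpha / len(p_values)
    return [i for i, p in enumerate(p_values) if p < threshold]

# Hypothetical p-values for five features
p_values = [0.0001, 0.004, 0.02, 0.04, 0.30]

fpr_idx = select_fpr(p_values)
fwe_idx = select_fwe(p_values)

print("FPR keeps:", fpr_idx)
print("FWE keeps:", fwe_idx)
```

Because the FWE threshold is never larger than the FPR threshold, the FWE selection is always a subset of the FPR selection under these assumptions.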

## Feature Selection with UnivariateFeatureSelector (FPR Strategy)

In [11]:
# FPR
selector_fpr = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeaturesFpr",
                                         labelCol="label", selectionMode="fpr")

selector_fpr.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)
lr_fpr = LinearRegression(featuresCol='selectedFeaturesFpr', labelCol='label')

pipeline_fpr = Pipeline(stages=[imputer, assembler, scaler, selector_fpr, lr_fpr])

model_fpr = pipeline_fpr.fit(train_sdf)
predictions_fpr = model_fpr.transform(test_sdf)

## Feature Selection with UnivariateFeatureSelector (FWE Strategy)

In [12]:
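# FWE
# Note: this selector reads the unscaled "features" column, whereas the FPR selector reads "scaledFeatures"
selector_fwe = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeaturesFWE",
                                         labelCol="label", selectionMode="fwe")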
selector_fwe.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)

lr_fwe = LinearRegression(featuresCol='selectedFeaturesFWE', labelCol='label')

pipeline_fwe = Pipeline(stages=[imputer, assembler, scaler, selector_fwe, lr_fwe])

model_fwe = pipeline_fwe.fit(train_sdf)
predictions_fwe = model_fwe.transform(test_sdf)

## PCA with 3 Components

In [13]:
# PCA
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")

lr_pca = LinearRegression(featuresCol='pcaFeatures', labelCol='label')

pipeline_pca = Pipeline(stages=[imputer, assembler, scaler, pca, lr_pca])

model_pca = pipeline_pca.fit(train_sdf)
predictions_pca = model_pca.transform(test_sdf)

# PART 2

Combining feature selection based on FPR with PCA.
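
Conceptually, the combination step concatenates, for every row, the vector of FPR-selected features with the vector of PCA components. The sketch below shows that concatenation in plain Python. The helper `assemble` and the numbers are hypothetical and only illustrate the shape of the result.

```python
def assemble(*vectors):
    """Concatenate several feature vectors into one, in the order given."""
    combined = []
    for vector in vectors:
        combined.extend(vector)
    return combined

# Hypothetical row: four selected features and k=3 PCA components
selected_features = [0.7, -1.2, 0.3, 2.1]
pca_components = [1.5, -0.4, 0.9]

combined_features = assemble(selected_features, pca_components)

print(len(combined_features), combined_features)
```

The combined vector is longer than the selected-feature vector alone, so this strategy adds the PCA components on top of the selected features and does not reduce the dimensionality further.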

In [14]:
# Same base stages as before, redefined for the combined pipeline
imputer = Imputer(
    inputCols=train_sdf.columns,
    outputCols=train_sdf.columns,
    strategy="mean"
)

ignore = ['label']
assembler = VectorAssembler(
    inputCols=[x for x in train_sdf.columns if x not in ignore],
    outputCol='features')

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

lr_combined = LinearRegression(featuresCol="combinedFeatures", labelCol="label")

# fpr
selector_fpr = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeaturesFpr",
                                         labelCol="label", selectionMode="fpr")
selector_fpr.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)

# PCA
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")

# Combine FPR features and PCA components
assembler_combined = VectorAssembler(inputCols=["selectedFeaturesFpr", "pcaFeatures"], outputCol="combinedFeatures")


pipeline_combined = Pipeline(stages=[imputer, assembler, scaler, selector_fpr, pca, assembler_combined, lr_combined])
model_combined = pipeline_combined.fit(train_sdf)
predictions_combined = model_combined.transform(test_sdf)

## Model Evaluation

In [15]:
# Evaluators for MAE and RMSE
evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# MAE
mae_base = evaluator_mae.evaluate(predictions)
mae_fpr = evaluator_mae.evaluate(predictions_fpr)
mae_fwe = evaluator_mae.evaluate(predictions_fwe)
mae_pca = evaluator_mae.evaluate(predictions_pca)
mae_combined = evaluator_mae.evaluate(predictions_combined)

print("MAE")
print(f"Base Pipeline: {mae_base}")
print(f"FPR: {mae_fpr}")
print(f"FWE: {mae_fwe}")
print(f"PCA: {mae_pca}")
print(f"Combined: {mae_combined}")


# RMSE
rmse_base = evaluator_rmse.evaluate(predictions)
rmse_fpr = evaluator_rmse.evaluate(predictions_fpr)
rmse_fwe = evaluator_rmse.evaluate(predictions_fwe)
rmse_pca = evaluator_rmse.evaluate(predictions_pca)
rmse_combined = evaluator_rmse.evaluate(predictions_combined)

print("\nRMSE")
print(f"Base Pipeline: {rmse_base}")
print(f"FPR: {rmse_fpr}")
print(f"FWE: {rmse_fwe}")
print(f"PCA: {rmse_pca}")
print(f"Combined: {rmse_combined}")


MAE
Base Pipeline: 403.0693682068891
FPR: 403.91288914768865
FWE: 404.22277479980056
PCA: 500.5333706603713
Combined: 400.69015238439994

RMSE
Base Pipeline: 532.1118163982943
FPR: 529.0331249949043
FWE: 530.5544449517731
PCA: 619.8620192323816
Combined: 530.4409593542097
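
For reference, the two metrics can be written out by hand. The following is a minimal sketch using the standard definitions of mean absolute error and root mean squared error. The function names and the toy values are hypothetical and unrelated to the wind dataset.

```python
import math

def mean_absolute_error(y_true, y_pred):
    """Average of the absolute differences between targets and predictions."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def root_mean_squared_error(y_true, y_pred):
    """Square root of the average squared difference."""
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return math.sqrt(mse)

# Hypothetical targets and predictions; the last prediction is far off
y_true = [100.0, 200.0, 300.0, 400.0]
y_pred = [110.0, 190.0, 300.0, 440.0]

toy_mae = mean_absolute_error(y_true, y_pred)
toy_rmse = root_mean_squared_error(y_true, y_pred)

print(f"MAE:  {toy_mae:.2f}")
print(f"RMSE: {toy_rmse:.2f}")
```

RMSE is never smaller than MAE and grows faster when a few predictions are far off, which is why the two metrics can rank models differently.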


The results indicate that **feature selection can improve or at least maintain model performance with fewer features.**

**FWE** feature selection maintains an error comparable to that of the base model with all the features (MAE 404.2 vs. 403.1, RMSE 530.6 vs. 532.1) while using fewer of them, which we see as an improvement.

**The Combined strategy was the most effective at maintaining model performance with a reduced set of original features, reaching the lowest MAE of approximately 400.7.**

Also worth noting is the poor performance of PCA on its own. From our research, we think the main reasons are:

   - **Redundant or Irrelevant Features**: features selected by one method may not complement the features selected by another. For example, because PCA is unsupervised and builds its components from variance alone, the components do not necessarily have much predictive power. If the directions of highest variance do not align well with the target variable, the PCA components might not be useful, which is consistent with the PCA RMSE score.
   - **Feature Interactions**: features selected through FPR are assessed individually, but their combination with the PCA components might be what leads to the good results of the combined model.
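
To put the differences in perspective, the sketch below expresses each model's error as a percentage change relative to the base pipeline. The numbers are the MAE and RMSE values from the evaluation output above, rounded to two decimals. The names `mae_scores`, `rmse_scores` and `relative_change` are hypothetical.

```python
# Metrics copied from the evaluation output, rounded to two decimals
mae_scores = {"Base": 403.07, "FPR": 403.91, "FWE": 404.22, "PCA": 500.53, "Combined": 400.69}
rmse_scores = {"Base": 532.11, "FPR": 529.03, "FWE": 530.55, "PCA": 619.86, "Combined": 530.44}

def relative_change(scores, baseline="Base"):
    """Percentage change of each score with respect to the baseline score."""
    base = scores[baseline]
    return {name: 100.0 * (value - base) / base for name, value in scores.items()}

mae_change = relative_change(mae_scores)
rmse_change = relative_change(rmse_scores)

for name in mae_scores:
    print(f"{name:>8}: MAE {mae_change[name]:+.2f}%  RMSE {rmse_change[name]:+.2f}%")

best_mae = min(mae_scores, key=mae_scores.get)
best_rmse = min(rmse_scores, key=rmse_scores.get)
print("Lowest MAE:", best_mae, "| Lowest RMSE:", best_rmse)
```

Seen this way, FPR, FWE and the combined model all stay within about one percent of the base pipeline on both metrics, while PCA alone is roughly 24% worse in MAE and 16% worse in RMSE.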

# PART 3

Determining Number of Features Selected

In [18]:
selector_model = model_fpr.stages[3]
selected_features = selector_model.selectedFeatures

print("Number of Selected Features FPR:", len(selected_features))

Number of Selected Features FPR: 522


In [19]:
selector_model = model_fwe.stages[3]
selected_features = selector_model.selectedFeatures

print("Number of Selected Features FWE:", len(selected_features))

Number of Selected Features FWE: 495


Using the FPR strategy, 522 of the original 550 features were kept and 28 were discarded. Using FWE, only 495 features were kept and 55 were discarded.
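
The counts can be checked with a few lines of arithmetic. The sketch below takes the 550 original features and the selected counts from the text. The effective FWE threshold it prints assumes that FWE divides the 0.05 threshold by the number of features, which is an assumption about the selector and not something printed by the notebook.

```python
total_features = 550                  # number of original features, as stated in the text
kept = {"FPR": 522, "FWE": 495}       # counts printed by the two cells above

summary = {}
for name, n_kept in kept.items():
    discarded = total_features - n_kept
    share_kept = 100.0 * n_kept / total_features
    summary[name] = (discarded, share_kept)
    print(f"{name}: kept {n_kept}, discarded {discarded} ({share_kept:.1f}% kept)")

# Assumed FWE rule: per-feature threshold = 0.05 / number of features
fwe_threshold = 0.05 / total_features
print(f"Effective FWE threshold under this assumption: {fwe_threshold:.2e}")
```

Even under the much stricter FWE threshold, 90% of the features are retained, which suggests that most features have a very strong univariate relationship with the target.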

# Conclusions

As we know from our previous assignment, the dataset is a time series with relevant seasonal patterns. Last time we did feature selection by hand, focusing on a limited number of features. This time we focus on the feature selection procedure itself.

**In conclusion**, our analysis suggests that **feature selection techniques such as FPR and FWE can match or slightly improve on the base model (lower RMSE, comparable MAE); however, the relevance of features in a time series can fluctuate over time**.

**PCA, as an unsupervised method for reducing dimensionality,** does not account for new information as it emerges, particularly in a dataset like ours, which presents seasonal patterns and temporal shifts.

On the other hand, **FWE's conservative criterion looks more suitable** for time series, as it selects features with robust statistical significance, potentially offering a more stable signal.

**The features selected by FPR work well when combined with the PCA components**, hence the performance of the combined model.

Overall, the FWE and combined FPR + PCA strategies showed improved or stable performance relative to the base model.