# PySpark in Machine Learning: practical example

### **Introduction**

In the current analysis, we engage with the task of refining a predictive model by leveraging PySpark capabilities, focusing on a dataset characterized by wind metrics. Our primary goal is to enhance the accuracy of predictions through effective feature selection, maintaining model performance with a reduced set of variables (we will use the dataset of the wind metrics in order to predict the necessary amount of energy).

The research is structured into four critical stages:

1. Data Division: We will commence by segregating the original dataset into distinct training and testing sets, an essential step for an impartial evaluation of the predictive models.

2. Model Formulation and Assessment: Our approach includes the construction and evaluation of three pipelines, each integrating diverse feature selection techniques:

3. Employing the UnivariateFeatureSelector with an FPR (False Positive Rate) strategy for its broad inclusiveness in feature selection.
    - Utilizing the same selection methodology but with an FWE (Family-Wise Error) strategy, noted for its stringent criteria to minimize the likelihood of Type I errors.
    - Incorporating PCA (Principal Component Analysis) to condense the dataset to its three most informative components, aiming to simplify the model.
    - Conjoint Feature Selection: We plan to develop an additional pipeline that combines the merits of features selected via the FPR strategy with the dimensionality reduction achieved through PCA.

4. Feature Selection Quantification: The final phase involves determining the number of features selected by the FPR and FWE strategies, offering insights into the selectivity of each method.


Data preparation steps such as imputation of missing values, normalization of features, and vector assembly will be methodically executed to ready the data for modeling. The performance of the models will be quantified using Mean Absolute Error (MAE) and Root Mean Squared Error (RMSE).

The investigation is directed towards striking a balance between the number of features and the precision of the model. It also seeks to identify the most appropriate feature selection approaches in line with the specific nature of the modeling task and data characteristics.


### 1.**Splitting the data.**

First, we create a spark session

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .appName("My PySpark code for homework2") \
        .config("spark.driver.memory", "4g") \
        .getOrCreate()


We upload our data file (csv) and convert it into a dataframe. 

In [3]:
df = spark.read.csv('wind_available_second.csv', header=True, inferSchema=True)

Now we can see the visual representation of the df (first 5 rows):

In [4]:
df.show(5)

+-------+----+-----+---+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+--

Upon inspecting the initial rows of the DataFrame, it is evident that certain entries are missing, as indicated by null values across various rows and columns. Addressing these missing values is imperative for the subsequent stages, since regression models necessitate a dataset devoid of null entries to function optimally.


Additionally, an examination of the DataFrame's schema and the data types of its columns will be conducted. This step is crucial to understand the structure and nature of the data we are working with, ensuring that appropriate preprocessing techniques are applied before proceeding to the modeling phase.

In [5]:
# type of class:
print('Type:', type(df))
# printSchema of the df
print('SCHEMA:')
print(df.printSchema())
# dataframe's column names and their types
print('COLUMNS:', df.columns, sep='\n')
print('DTYPES:', df.dtypes, sep='\n')


Type: <class 'pyspark.sql.dataframe.DataFrame'>
SCHEMA:
root
 |-- energy: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- p54_162_1: double (nullable = true)
 |-- p54_162_2: double (nullable = true)
 |-- p54_162_3: double (nullable = true)
 |-- p54_162_4: double (nullable = true)
 |-- p54_162_5: double (nullable = true)
 |-- p54_162_6: double (nullable = true)
 |-- p54_162_7: double (nullable = true)
 |-- p54_162_8: double (nullable = true)
 |-- p54_162_9: double (nullable = true)
 |-- p54_162_10: double (nullable = true)
 |-- p54_162_11: double (nullable = true)
 |-- p54_162_12: double (nullable = true)
 |-- p54_162_13: double (nullable = true)
 |-- p54_162_14: double (nullable = true)
 |-- p54_162_15: double (nullable = true)
 |-- p54_162_16: double (nullable = true)
 |-- p54_162_17: double (nullable = true)
 |-- p54_162_18: double (nullable = true)
 |-- p

The dataset completely consists of numerical columns. Notably, columns representing datetime information, despite being numerical in format, essentially possess a categorical characteristic due to their inherent cyclical nature. This distinction is crucial for appropriate data handling and analysis.

In [6]:
# We can mark columns as cathegorical, by implementing a threshold for number of unique values
categorical = []
# Iterate over each column in the DataFrame
for i in df.columns:
    distinct_count = df.select(i).distinct().count()
    if distinct_count < 35: # threshold is 35 unique values
        # Append the column name to the categorical list
        categorical.append(i)
        print(f'Feature "{i}" is truly categorical')

# Display the list of categorical columns
print("Categorical columns:", categorical)

Feature "year" is truly categorical
Feature "month" is truly categorical
Feature "day" is truly categorical
Feature "hour" is truly categorical
Categorical columns: ['year', 'month', 'day', 'hour']


For the modeling process, our main point is the 'energy' column. A brief statistical analysis is conducted to grasp the distribution and central tendencies of this variable, providing essential insights for predictive modeling.

In [7]:
# statistical summary for the variable 'energy'
df.select("energy").summary().show()

+-------+-----------------+
|summary|           energy|
+-------+-----------------+
|  count|             4748|
|   mean|693.1262468407749|
| stddev|665.5316090221474|
|    min|             0.01|
|    25%|           144.17|
|    50%|           464.27|
|    75%|          1089.35|
|    max|          2792.55|
+-------+-----------------+



The dataset comprises 4748 entries, each representing a data point for analysis. Statistical exploration of the 'energy' variable reveals:

- The median value is substantially lower than the mean, indicating a potential right-skew in the data distribution.

- The standard deviation is approximately equal to the mean value of energy, suggesting a high variability in energy consumption or generation figures.

- The energy values range from a minimum of 0.01 to a maximum of 2792.55, highlighting a vast spread in the dataset.

- The interquartile range reflects significant dispersion, with 25% of the values below 144.17 and 75% below 1089.35, which may point to the presence of outliers.

- The median energy value stands at 464.27, reinforcing the asymmetry in the data.


The presence of null values within the DataFrame has been noted, necessitating their identification and subsequent imputation. By employing PySpark's built-in functions to count null entries, we can pinpoint which columns contain these missing values, thus preparing the dataset for the rigorous demands of regression analysis.

In [8]:
# import necessary commands.
from pyspark.sql.functions import isnan, when, count, col
# we count number of null values in pyspark.
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+----+-----+---+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----

To address the missing data within our dataset, a systematic approach is employed to locate and quantify null instances across the columns. Utilizing PySpark's functionality, we execute a count of null values for each column. This process aids in pinpointing specific columns that require attention before advancing with data modeling.

In [9]:
# Creating list of columns with nan values
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict() # interpret data into dictionary
columns_with_missing_values = [k for k, v in missing_values.items() if v > 0] # creating list based on values of the columns; unpacking dictionary
columns_with_missing_values[:5]

['p54_162_1', 'p54_162_2', 'p54_162_3', 'p54_162_4', 'p54_162_5']

The resulting list of columns with missing values, derived from this operation, facilitates targeted data cleaning efforts. This step is critical to ensure the integrity and completeness of the dataset, thus laying a robust foundation for the accuracy of our predictive models.

For our understanding, we quantify the proportion of NaN (Not a Number) values present within each variable. This analysis is critical as it sheds light on the extent of data imputation required, which is a pivotal preprocessing step to ensure the robustness of our regression models.

In [10]:
# identify % of empty shares to estimate the severity. 
missing_vals_percentage = df.select([(count(when(isnan(c) | col(c).isNull(), c)) / df.count() * 100).alias(c) for c in df.columns])
missing_vals_percentage.show()

# setting a dictionary with share values.
missing_vals_list = missing_vals_percentage.collect()[0].asDict()
# set a list of shares with null.
missing_vals_perc_list = [p for c, p in missing_vals_list.items() if p > 0]

print('Max:', str(round(max(missing_vals_perc_list), 2)) + ' %')
print('Min:', str(round(min(missing_vals_perc_list), 2)) + ' %')

+------+----+-----+---+----+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+---------

The observed NaN values across different variables vary, ranging from approximately 5% to 20%. Addressing these missing values is a prerequisite for reliable modeling. To that end, an imputation strategy will be employed to fill these gaps, ensuring a complete dataset is presented to the model during the pipeline's assembly phase.

The dataset includes columns with date and time data, which inherently exhibit cyclical behavior, thus influencing the modeling approach. While columns like year, month, and day might be treated as quasi-categorical due to their discrete nature, the cyclical aspect of time-related data requires a distinct treatment to capture the temporal dynamics effectively. Although PySpark does not directly support gradient boosting models such as LightGBM or XGBoost, we can adapt by incorporating linear regression within our pipelines, necessitating a different strategy for temporal data.

To make temporal data more interpretable by linear models used in PySpark, we resort to trigonometric transformations, namely sine and cosine functions. This method effectively transforms time-related data into continuous features that retain their cyclical properties within a linear framework. For instance, the hours of a day are translated into a continuous cycle using these trigonometric functions, ensuring the model can recognize the transition from the end of one day to the beginning of the next, like midnight to early morning hours.


To demonstrate the effectiveness of this transformation, we will examine and contrast the outcomes of models using the original dataframe 'df' and the modified dataframe 'df_t', where datetime columns have been appropriately transformed.

Changed DF:

In [11]:
from pyspark.sql.functions import cos, sin, radians, col, min, lit

# Calculate the minimum year to normalize it
min_year = df.select(min("year")).collect()[0][0]

# Normalize year, and create cyclical features for month, day, and hour
df_t = df.withColumn("year", col("year") - lit(min_year)+1) \
       .withColumn("month_cos", cos(radians(col("month") * (360/12)))) \
       .withColumn("month_sin", sin(radians(col("month") * (360/12)))) \
       .withColumn("day_cos", cos(radians(col("day") * (360/31)))) \
       .withColumn("day_sin", sin(radians(col("day") * (360/31)))) \
       .withColumn("hour_cos", cos(radians(col("hour") * (360/24)))) \
       .withColumn("hour_sin", sin(radians(col("hour") * (360/24))))

# Drop original columns
df_t = df_t.drop(*['month', 'day', 'hour'])

# Show the transformed DataFrame
df_t.show(5, truncate=False)


+-------+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+

The next step involves dividing the data into subsets for training and testing purposes. Given the dataset's manageable size, a simple two-way split suffices. However, it is important to note the distribution of data across different years, which may have implications for the training process and the model's eventual performance.

In [12]:
df.groupBy("year").count().orderBy("count").show()

+----+-----+
|year|count|
+----+-----+
|2008|  178|
|2009|  921|
|2007| 1121|
|2005| 1256|
|2006| 1272|
+----+-----+



The dataset exhibits a notable imbalance in the number of records across the years, with a higher concentration of data points between 2005 and 2007 compared to the years 2008 and 2009. This discrepancy should be taken into account when partitioning the data to ensure that the training and testing sets are representative of the overall dataset.

For second df:

In [13]:
df_t.groupBy("year").count().orderBy("count").show()

+----+-----+
|year|count|
+----+-----+
|   4|  178|
|   5|  921|
|   3| 1121|
|   1| 1256|
|   2| 1272|
+----+-----+



Acknowledging the significance of maintaining chronological integrity in the dataset, a stratified approach to dividing the data is employed:

- For the original dataset, we allocate records from 2005 to 2007 to the training set and those from 2008 to 2009 to the testing set.

- Similarly, for the transformed dataset df_t, we categorize years 1 to 3 for training purposes, while years 4 to 5 are reserved for testing. This strategy ensures the separation of training and testing data by distinct time periods, adhering to the structure established in the preceding task.

In [14]:
# Splitting data
train_set = df.filter(col("year") < 2008)
test_set = df.filter(col("year") > 2007)

train_set.show(5)
test_set.show(5)

+-------+----+-----+---+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+--

In [15]:
# Splitting transformed data
train_set_t = df_t.filter(df_t['year'] <= 3)
test_set_t = df_t.filter(df_t['year'] > 3)

train_set_t.show(5)
test_set_t.show(5)

+-------+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+

### Imputer

The following phase involves utilizing the Imputer from PySpark's ml.feature library to address missing values within our dataset. This tool provides us with three distinct statistical strategies for imputation:

- Mean: Replaces missing values with the mean of the available data.

- Median: Utilizes the median of the data, offering robustness against outliers.

- Mode: Employs the most frequently occurring value in the dataset.

The imputation process will be conducted separately on both the training and testing datasets. The choice of strategy is paramount, as it can significantly influence the model's performance. Therefore, we will apply all three methods and assess their impact on the model's accuracy. The optimal strategy, which yields the most accurate predictions, will then be selected for subsequent stages of the modeling process.

Original:

In [16]:
from pyspark.ml.feature import Imputer

# Creating Imputer object
imputer = Imputer(
    inputCols=columns_with_missing_values,
    outputCols=["{}_imputed".format(c) for c in columns_with_missing_values]
).setStrategy("mean")  # or 'median' or 'mode'

# Fit the Imputer on the training set and transform the training set
train_set = imputer.fit(train_set).transform(train_set)

# Transform the test set using the same imputer model
test_set = imputer.fit(train_set).transform(test_set)

# Selecting columns for the training set
columns_to_select_train = [col(c) for c in train_set.columns if c not in columns_with_missing_values]
columns_to_select_train += [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values]
train_set = train_set.select(columns_to_select_train)

# Selecting columns for the testing set
columns_to_select_test = [col(c) for c in test_set.columns if c not in columns_with_missing_values]
columns_to_select_test += [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values]
test_set = test_set.select(columns_to_select_test)

# Show the results
train_set.show(5)
test_set.show(5)


+-------+----+-----+---+----+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+--------------

Second set:

In [17]:
from pyspark.ml.feature import Imputer

# Creating Imputer object
imputer2 = Imputer(
    inputCols=columns_with_missing_values,
    outputCols=["{}_imputed".format(c) for c in columns_with_missing_values]
).setStrategy("mean")

# Fit the Imputer on the training set
imputer_model = imputer2.fit(train_set_t)

# Transform the training and test sets using the fitted imputer model
train_set_t = imputer_model.transform(train_set_t)
test_set_t = imputer_model.transform(test_set_t)

# Select necessary columns, as we are renaming the imputed columns
columns_to_select_train_t = [col(c) for c in train_set_t.columns if c not in columns_with_missing_values] + [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values]
train_set_t = train_set_t.select(columns_to_select_train_t)

columns_to_select_test_t = [col(c) for c in test_set_t.columns if c not in columns_with_missing_values] + [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values]
test_set_t = test_set_t.select(columns_to_select_test_t)

# Show the results
train_set_t.show(5)
test_set_t.show(5)


+-------+----+------------------+-------------------+------------------+-------------------+--------------------+--------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---

### Assembler

Moving forward in the data preparation pipeline, the implementation of the VectorAssembler from PySpark's ml.feature module is the subsequent step. This tool is critical in the pre-modeling phase as it aggregates the chosen features into a unified vector column. Such consolidation is vital for the subsequent modeling process, particularly for PySpark's machine learning algorithms.

The VectorAssembler constructs a single dense vector from the selected predictor columns, designated as 'features', and pairs them with the response variable, labeled as 'label'. Here 'energy' serves as the target variable. This transformation into a dense vector format is not only a requirement for PySpark's algorithms but also enhances computational efficiency.

First:

In [18]:
from pyspark.ml.feature import VectorAssembler

# Features and label for the training set
train_set = train_set.withColumnRenamed("energy", "label")
input_cols_train = [c for c in train_set.columns if c != 'label']
assembler_train = VectorAssembler(inputCols=input_cols_train, outputCol="features")
train_set = assembler_train.transform(train_set).select("label", "features")

# Features and label for the testing set
test_set = test_set.withColumnRenamed("energy", "label")
input_cols_test = [c for c in test_set.columns if c != 'label']
assembler_test = VectorAssembler(inputCols=input_cols_test, outputCol="features")
test_set = assembler_test.transform(test_set).select("label", "features")

# Display the transformed datasets
print('After assembler (Training Set):')
train_set.show(5, truncate=False)

print('After assembler (Testing Set):')
test_set.show(5, truncate=False)


After assembler (Training Set):
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Second assembler:

In [19]:
from pyspark.ml.feature import VectorAssembler

# Features and label for the training set
train_set_t = train_set_t.withColumnRenamed("energy", "label")
input_cols_train_t = [c for c in train_set_t.columns if c != 'label']
assembler_train_t = VectorAssembler(inputCols=input_cols_train_t, outputCol="features")
train_set_t = assembler_train_t.transform(train_set_t).select("label", "features")

# Features and label for the testing set
test_set_t = test_set_t.withColumnRenamed("energy", "label")
input_cols_test_t = [c for c in test_set_t.columns if c != 'label']
assembler_test_t = VectorAssembler(inputCols=input_cols_test_t, outputCol="features")
test_set_t = assembler_test_t.transform(test_set_t).select("label", "features")

# Display the transformed datasets
print('After assembler (Training Set):')
train_set_t.show(5, truncate=False)

print('After assembler (Testing Set):')
test_set_t.show(5, truncate=False)

After assembler (Training Set):
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

We are now at the stage where we can begin constructing pipelines.

### 2. **Formulate three pipelines, train and evaluate them:**

### Pipelines ###

As part of our model training process, we'll implement feature standardization using PySpark's StandardScaler. This step is crucial because it ensures that all features contribute equally to the model's predictions. Without standardization, features with larger scales could dominate the model's learning process, potentially leading to suboptimal performance. By standardizing the features, we aim to improve the model's sensitivity to all variables, thus enhancing its predictive accuracy. This step is particularly important given the diverse range of features in our dataset, from wind speeds to temperature readings, each with its unique scale and distribution.
(this part will be present in the pipeline stage)

### a) UnivariateFeatureSelector and the fpr strategy

The current strategy is a feature selection method used to reduce the number of features to the amount of the most impactfull on the model. to those most likely to have a real impact on the predictive model. This method tests each feature individually to see if it has a statistically significant relationship with the outcome variable.

The fpr strategy uses a criterion based on the false positive rate to select features - threshold is defined by the statistical tests, such as: p-value, ANOVA F-test.
For model assessment we will use MAE and RMSE.

* Mean Absolute Error (MAE)

The MAE is calculated as the average of the absolute differences between the predicted and actual values. It provides a straightforward measure of the average magnitude of errors in predictions.

Formula:
$$ \text{MAE} = \frac{1}{n} \sum_{i=1}^{n} \left| y_i - \hat{y}_i \right| $$
where $y_i$ is the actual value, $\hat{y}_i$ is the predicted value, and $n$ is the number of observations.

* Root Mean Squared Error (RMSE)

The RMSE emphasizes larger errors by squaring the differences before averaging, and then taking the square root. It is sensitive to the variance in prediction errors, making it useful when large errors are particularly undesirable.

Formula:
$$ \text{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} \left( y_i - \hat{y}_i \right)^2} $$
where $y_i$ is the actual value, $\hat{y}_i$ is the predicted value, and $n$ is the number of observations.



In [20]:
# Upload necessary libraries
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import UnivariateFeatureSelector, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

### Original dataset:

In [21]:
#Configure the feature selector fpr

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05) # we set threshold to the 0.05, according to standard practice


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set)

# Make predictions on the test set
predictions = model.transform(test_set)

# Show some predictions - first 10 iterations
predictions.select("prediction", "label").show(10)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

# Print the evaluation result
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

+------------------+-------+
|        prediction|  label|
+------------------+-------+
| 534.9765694018024|   46.0|
| 892.0743692610231|  948.0|
|432.89435903657795| 552.74|
| 835.4185905655249|1686.08|
|1318.4696927442237|1186.88|
| 912.9932127017528| 809.92|
|1911.8743192886723|1528.01|
|1250.2857487930887| 803.05|
| 532.5726873406929|1715.42|
|  903.394381426253|1310.24|
+------------------+-------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 411.7940564109689
Root Mean Squared Error (RMSE) on test data: 537.2946505121483


Utilizing the mode strategy for imputation, we observed:

- **MAE of 427.10**: This metric reflects the average absolute discrepancy between our model's predictions and the actual values, with a lower MAE indicating improved model accuracy.
- **RMSE of 550.77**: This value provides insight into the average size of the model's prediction errors. Since the RMSE squares the errors, it gives more weight to larger errors, making it a critical measure of prediction precision.

When comparing with alternative imputation strategies, the following was noted:

With the mean strategy for imputation:

- **MAE on test data: 411.79**, suggesting a marginally improved accuracy in average error magnitude.
- **RMSE on test data: 537.29**, indicating a modest enhancement in predictive accuracy.

With the median strategy for imputation:

- **MAE on test data: 416.89**, showing performance that lies between the mode and mean strategies.
- **RMSE on test data: 544.83**, which is slightly better than the mode strategy's performance.

The inference drawn is that imputing with the mean value leads to more precise model predictions compared to the mode and median strategies. For the subsequent stages of this assignment, while the mean method will be the primary imputation technique, all three methods will still be evaluated to ensure thorough verification.


### Version for the transformed datetime:

In [22]:
#Configure the feature selector fpr

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set_t)

# Make predictions on the test set
predictions = model.transform(test_set_t)

# Show some predictions - first 10 iterations
predictions.select("prediction", "label").show(10)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

# Print the evaluation result
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

+------------------+-------+
|        prediction|  label|
+------------------+-------+
| 581.0740969466733|   46.0|
| 866.7566634376471|  948.0|
|422.67633963195294| 552.74|
| 876.0000989199634|1686.08|
| 1340.238058242221|1186.88|
| 899.5497018922779| 809.92|
|1910.8644561562414|1528.01|
|1324.1652236011923| 803.05|
| 553.0063202099973|1715.42|
|  917.554174493127|1310.24|
+------------------+-------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 411.0472778403553
Root Mean Squared Error (RMSE) on test data: 536.0683352254026


Evaluating the efficacy of different imputation strategies produced the following results:

Imputer: median strategy.
- **Mean Absolute Error (MAE) on test data**: 414.14, denoting the average distance between the forecasted and actual figures.
- **Root Mean Squared Error (RMSE) on test data**: 541.72, reflecting the standard deviation of the prediction errors.

Imputer: mean strategy.
- **Mean Absolute Error (MAE) on test data**: 411.04, a slight improvement indicating closer predictions to the true values.
- **Root Mean Squared Error (RMSE) on test data**: 536.07, suggesting better predictive performance with reduced error variance.

Imputer: mode strategy.
- **Mean Absolute Error (MAE)**: 424.10, which is the highest among the strategies, suggesting less accurate predictions.
- **Root Mean Squared Error (RMSE)**: 547.87, also the highest, indicating larger errors in prediction.

The introduction of trigonometric transformations for temporal features in the second model appears to have slightly enhanced the predictive accuracy, as evidenced by reduced MAE and RMSE. While the MAE provides an average error magnitude without direction, the lower RMSE indicates improved model performance due to its sensitivity to larger errors by squaring them.

The imputation strategy utilizing the mean proved to be the most effective, leading to the most accurate predictions.

In summary, the integration of trigonometrically transformed time features has yielded incremental improvements in the model's performance. These results suggest that while the inclusion of such features is beneficial, they are not the sole factors influencing model outcomes, pointing to the potential for additional data manipulation techniques to be explored.


## Feature Engineering with Lag Inclusion

To refine our predictive model, we introduce lag features into our dataset's enhanced version. The integration of lag variables is intended to bolster prediction accuracy by enabling the model to account for historical data, which is often predictive of future trends, particularly in energy forecasting contexts.

Incorporating lag into our dataset is a crucial step for recognizing and leveraging patterns and cyclical behaviors inherent in the data, thus bolstering our forecasting prowess.

### Implementing Lag Features

To implement lag features, we:

- **Data Handling**: Incorporate historical data points as features.
- **Pre-Modeling Processing**: Prepare the dataset with necessary transformations.
- **Pipeline Development**: Set up the data flow for model training.

Utilizing PySpark's `Window` and `lag` functions from the `pyspark.sql.window` and `pyspark.sql.functions` libraries, we apply lag to the time-transformed columns. This process generates a new 'lag' feature, augmenting our dataset with past observations to create a more informed model.

In [23]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Creating specifics of lag, which will define how the groups are formed for the current lag.
# Sort data in order
windowSpec = Window.orderBy('year', 'month_cos', 'month_sin', 'day_cos', 'day_sin', 'hour_cos', 'hour_sin')

# Adding lag for 'energy' (= 1 step)
df_l = df_t.withColumn('energy_lag', lag('energy', 1).over(windowSpec))

df_l.show()


+-------+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---

Further, we process the columns with the missing values.

In [24]:
missing_values_l = df_l.select([count(when(col(c).isNull(), c)).alias(c) for c in df_l.columns]).collect()[0].asDict() # interpret data into dictionary
columns_with_missing_values_l = [k for k, v in missing_values_l.items() if v > 0]

In [25]:
columns_with_missing_values_l

['p54_162_1',
 'p54_162_2',
 'p54_162_3',
 'p54_162_4',
 'p54_162_5',
 'p54_162_6',
 'p54_162_7',
 'p54_162_8',
 'p54_162_9',
 'p54_162_10',
 'p54_162_11',
 'p54_162_12',
 'p54_162_13',
 'p54_162_14',
 'p54_162_15',
 'p54_162_16',
 'p54_162_17',
 'p54_162_18',
 'p54_162_19',
 'p54_162_20',
 'p54_162_21',
 'p54_162_22',
 'p54_162_23',
 'p54_162_24',
 'p54_162_25',
 'p55_162_1',
 'p55_162_2',
 'p55_162_3',
 'p55_162_4',
 'p55_162_5',
 'p55_162_6',
 'p55_162_7',
 'p55_162_8',
 'p55_162_9',
 'p55_162_10',
 'p55_162_11',
 'p55_162_12',
 'p55_162_13',
 'p55_162_14',
 'p55_162_15',
 'p55_162_16',
 'p55_162_17',
 'p55_162_18',
 'p55_162_19',
 'p55_162_20',
 'p55_162_21',
 'p55_162_22',
 'p55_162_23',
 'p55_162_24',
 'p55_162_25',
 'cape_1',
 'cape_2',
 'cape_3',
 'cape_4',
 'cape_5',
 'cape_6',
 'cape_7',
 'cape_8',
 'cape_9',
 'cape_10',
 'cape_11',
 'cape_12',
 'cape_13',
 'cape_14',
 'cape_15',
 'cape_16',
 'cape_17',
 'cape_18',
 'cape_19',
 'cape_20',
 'cape_21',
 'cape_22',
 'cape_23',
 

The subsequent phases will mirror the previous procedures, encompassing:

- Data division into subsets

- Imputation of missing values

- Assembly of features

In [26]:
# Splitting transformed data
train_set_l = df_l.filter(df_l['year'] <= 3)
test_set_l = df_l.filter(df_l['year'] > 3)

train_set_l.show(5)
test_set_l.show(5)

+-------+----+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------

Imputation with Lag Features

In [27]:
from pyspark.ml.feature import Imputer

# Creating Imputer object
imputer3 = Imputer(
    inputCols=columns_with_missing_values_l,
    outputCols=["{}_imputed".format(c) for c in columns_with_missing_values_l]
).setStrategy("mean")

# Fit the Imputer on the training set
imputer_model = imputer3.fit(train_set_l)

# Transform the training and test sets using the fitted imputer model
train_set_l = imputer_model.transform(train_set_l)
test_set_l = imputer_model.transform(test_set_l)

# Select necessary columns, as we are renaming the imputed columns
columns_to_select_train_l = [col(c) for c in train_set_l.columns if c not in columns_with_missing_values_l] + [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values_l]
train_set_l = train_set_l.select(columns_to_select_train_l)

columns_to_select_test_l = [col(c) for c in test_set_l.columns if c not in columns_with_missing_values_l] + [col(f"{c}_imputed").alias(c) for c in columns_with_missing_values_l]
test_set_l = test_set_l.select(columns_to_select_test_l)

# Show the results
train_set_l.show(5)
test_set_l.show(5)


+-------+----+---------+--------------------+-------------------+--------------------+--------------------+--------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------

Assembler with Lag Features

In [28]:
from pyspark.ml.feature import VectorAssembler

# Features and label for the training set
train_set_l = train_set_l.withColumnRenamed("energy", "label")
input_cols_train_l = [c for c in train_set_l.columns if c != 'label']
assembler_train_l = VectorAssembler(inputCols=input_cols_train_l, outputCol="features")
train_set_l = assembler_train_l.transform(train_set_l).select("label", "features")

# Features and label for the testing set
test_set_l = test_set_l.withColumnRenamed("energy", "label")
input_cols_test_l = [c for c in test_set_l.columns if c != 'label']
assembler_test_l = VectorAssembler(inputCols=input_cols_test_l, outputCol="features")
test_set_l = assembler_test_l.transform(test_set_l).select("label", "features")

# Display the transformed datasets
print('After assembler (Training Set):')
train_set_l.show(5, truncate=False)

print('After assembler (Testing Set):')
test_set_l.show(5, truncate=False)

After assembler (Training Set):
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Pipeline with Lag Features

In [29]:
#Configure the feature selector fpr

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set_l)

# Make predictions on the test set
predictions = model.transform(test_set_l)

# Show some predictions - first 10 iterations
predictions.select("prediction", "label").show(10)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

# Print the evaluation result
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

+------------------+------+
|        prediction| label|
+------------------+------+
| 508.3249706656825|152.06|
| 689.1131575573127|  98.8|
|487.46165742710764|226.39|
|459.48756022680936|393.36|
| 299.0308519303344|  6.32|
| 227.8692685609294| 75.66|
|144.89392732489432| 77.74|
| 413.6700926255362|143.11|
| 441.0892212569179| 81.61|
| 591.2688343410064|256.45|
+------------------+------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 382.82375829463126
Root Mean Squared Error (RMSE) on test data: 499.93105114185994



- **Mode Strategy:**
  - Mean Absolute Error (MAE) on test data: 392.40
  - Root Mean Squared Error (RMSE) on test data: 505.54

- **Mean Strategy:**
  - Mean Absolute Error (MAE) on test data: 382.83
  - Root Mean Squared Error (RMSE) on test data: 499.93

- **Median Strategy:**
  - Mean Absolute Error (MAE) on test data: 385.27
  - Root Mean Squared Error (RMSE) on test data: 503.52

Incorporating lag features significantly enhances the model's predictive performance over prior iterations. Notably, there is a marked decrease in both MAE and RMSE across all strategies, indicating that lag features provide a more nuanced understanding of temporal patterns previously unaccounted for.

The improvements observed with the lag-enhanced model highlight the critical role of historical data in forecasting energy trends, suggesting that factoring in past values leads to more accurate future predictions.

Consequently, we opted for the dataset processed with lag features, selecting the mean imputation strategy for handling missing values.


Following the results, the decision was made to proceed with the dataset that has undergone lag processing, applying the mean strategy for imputation in cases of missing values.


### b) UnivariateFeatureSelector with the fwe strategy (most conservative).

The FWE (Family-Wise Error rate) strategy adjusts the significance level when multiple hypothesis tests are conducted simultaneously, which is crucial when dealing with numerous features.

The objective of using the FWE strategy is to control the total rate of Type I errors, which occur when a true null hypothesis is incorrectly rejected. In the context of feature selection, this means we are very careful not to attribute predictive power to a feature that does not truly have it. By applying this strategy in our pipeline, we aim to identify a subset of features that have a robust and reliable relationship with our target variable, ensuring that our model's performance is not compromised by spurious correlations.

The code is the same, but with changed strategy in the selector (from 'fpr' to 'fwe').

In [30]:
#Configure the feature selector fwe

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fwe")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set_l)

# Make predictions on the test set
predictions = model.transform(test_set_l)

# Show some predictions - first 6 iterations
predictions.select("prediction", "label").show(10)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

# Print the evaluation result
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")


+------------------+------+
|        prediction| label|
+------------------+------+
| 524.2468051259548|152.06|
| 647.5079514601912|  98.8|
| 540.0592893063476|226.39|
| 437.3567224537419|393.36|
| 312.3330580542133|  6.32|
|239.22955360529704| 75.66|
|156.87268153443074| 77.74|
| 416.9707337561904|143.11|
|  411.801597675214| 81.61|
| 632.5188286998164|256.45|
+------------------+------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 384.56007614540596
Root Mean Squared Error (RMSE) on test data: 502.03782454273653


- MAE on test data: 384.56007614540596

- RMSE on test data: 502.03782454273653


The application of the FWE (Family-Wise Error rate) strategy in feature selection has yielded interesting results. Unlike the FPR  strategy, which is less conservative and selects features based on individual p-values, the FWE approach adjusts for multiple testing and is more stringent, reducing the chances of type I errors (false positives).

Comparing the results of the FWE strategy with those from the FPR strategy, we observe a marginal increase in the MAE from 382.82 to 384.56 and a slight increase in the RMSE from 499.93 to 502.04. This suggests that while the FWE strategy is more conservative, it does not necessarily translate to a better predictive performance in this case. It's possible that by being too conservative, the FWE strategy might be excluding features that, despite their individual p-values, could collectively provide valuable predictive power.

In conclusion, while both feature selection strategies aim to improve model performance by eliminating irrelevant features, they do so with different levels of strictness regarding statistical significance. The FPR strategy may be more suited for scenarios where we want to explore the potential of each feature, while the FWE strategy is better when we want to be highly confident about the relevance of each selected feature. 



### c) PCA  with 3 components.

For this part of the assignment, we will focus on the first three principal components. This approach allows us to capture the most significant underlying patterns of the data while reducing its complexity. By doing so, we aim to build a model that is simpler, faster, and potentially more robust to overfitting. The use of three components is a decision based on the need to balance between simplifying the model and retaining enough information for effective prediction.

Let's proceed to incorporate PCA into our pipeline and evaluate how this dimensionality reduction technique impacts the performance of our linear regression model.

We implement the same scaler, but substitute UnivariateFeatureSelector for the PCA with k=3:

In [31]:
from pyspark.ml.feature import PCA


#standardize feature data by implementing scaler:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Setting PCA with 3 components
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")

# Initialize the linear regression model
 
lr = LinearRegression(featuresCol="pcaFeatures", labelCol="label")

# Assembley of the pipeline
pipeline = Pipeline(stages=[scaler, pca, lr])

# Training the pipeline
model = pipeline.fit(train_set_l)

# Applying model to the testing data
predictions = model.transform(test_set_l)

# Show first 10 predictions
predictions.select("prediction", "label").show(10)

# Evaluation of the model with  MAE & RMSE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

# Printing te values of MAE & RMSE
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

+-----------------+------+
|       prediction| label|
+-----------------+------+
|639.5901889925108|152.06|
|688.4904775433424|  98.8|
|735.4264660528106|226.39|
|718.1764868306163|393.36|
|697.5974020571948|  6.32|
|788.6384025630641| 75.66|
|780.2298912409733| 77.74|
|748.4887173664674|143.11|
|739.2227163882935| 81.61|
|782.4526126211776|256.45|
+-----------------+------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 504.78951986235757
Root Mean Squared Error (RMSE) on test data: 617.2929557470093


- MAE on test data: 504.78951986235757
- RMSE on test data: 617.2929557470093

The rise in MAE to 504.79 signals that the model's predictions are now more deviated from the actual values on average. This increase may imply that the application of PCA, despite reducing feature space dimensionality, might have also eliminated valuable predictive information.

Concurrently, the escalation of RMSE to 617.29 suggests an increase in the magnitude of errors in model predictions compared to earlier models. Given RMSE's heightened sensitivity to larger errors, this could indicate the model's diminished capability to encapsulate data variance, thereby compromising prediction accuracy, particularly for outliers.

In essence, PCA's role in simplifying complexity must be carefully considered against any inadvertent diminishment in predictive accuracy. This scenario underscores that an increase in model features does not unconditionally translate to enhanced performance, and underscores the necessity of prudent feature selection.


### 3. **Formulate, train and evaluate another pipeline that uses features obtained from two sources:**
- **Feature selection with the fpr**
- **The 3 components from PCA**

Essentially, this method follows the previous approach, with the modification that two feature selectors will be employed concurrently. Subsequently, the selected features will be merged into a unified vector for fitting the model.


In [32]:
# Apply scaler 
scaler = StandardScaler(inputCol='features', outputCol="scaledFeatures")

# First selector: UnivariateFeatureSelector
un_sel = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fpr")
un_sel.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)

# Second selector: PCA
pca_sel = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")

# Assembley new vector
new_assembler = VectorAssembler(inputCols = ["selectedFeatures", "pcaFeatures"], outputCol = 'combined_features')

# Initialize the linear regression model
lr = LinearRegression(featuresCol="combined_features", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, un_sel, pca_sel, new_assembler, lr])

# New model:
model = pipeline.fit(train_set_l)

# Predictors:
predictions = model.transform(test_set_l)
# show first 10 predictions:
predictions.select('prediction', 'label').show(10)

# Evaluation of the model with  MAE & RMSE
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='mae')
mae = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})

# Printing te values of MAE & RMSE
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

+------------------+------+
|        prediction| label|
+------------------+------+
| 502.9716083291605|152.06|
| 697.2309128880497|  98.8|
| 485.2241152716415|226.39|
|  461.064429610331|393.36|
|294.26479686083076|  6.32|
| 236.1256644149289| 75.66|
|147.97451177210132| 77.74|
| 414.6962533368569|143.11|
|  442.649391505538| 81.61|
| 594.2029277338879|256.45|
+------------------+------+
only showing top 10 rows

Mean Absolute Error (MAE) on test data: 383.86132303546555
Root Mean Squared Error (RMSE) on test data: 501.2607001502467


- **MAE on test data**: 383.86
- **RMSE on test data**: 501.26

The integration of the UnivariateFeatureSelector with FPR strategy and PCA comprising three components has yielded a refined model with an improved Mean Absolute Error of 383.86, offering a slightly enhanced average prediction capability over the individual FPR and PCA models. Additionally, a marginal enhancement is observed in the Root Mean Squared Error, now at 501.26, suggesting that the model's predictions have become more aligned with actual values.

The subtle yet positive shift in predictive performance can be ascribed to the synergistic impact of feature selection coupled with dimensionality reduction efforts. By adeptly selecting pertinent features and distilling the data into principal components, the model strives to encapsulate a fuller spectrum of the data's characteristics. However, the modest scale of improvement signals the inherent complexity of the dataset and underscores the intricacies involved in harnessing its complete predictive potential through linear methodologies. This could also indicate that the model is nearing a plateau within the existing set of constraints.

To encapsulate, the amalgamated approach to feature selection bears promise, yet the strides made are incremental. This underscores the necessity for ongoing exploration in feature engineering, model intricacy, and potentially the exploration of alternative algorithmic approaches.


### 4. **How to determine the number of features which are selected by fpr and fwe.**

In the fourth part of our analysis, we aim to ascertain the number of features selected by the fpr (false positive rate) and fwe (family-wise error rate) strategies. To achieve this, we'll delve into the attributes of the trained models to extract the exact count of features retained after the selection process.

For the fpr strategy, we will revisit the pipeline configuration that includes the StandardScaler and the UnivariateFeatureSelector with the fpr and fwe selection modes. After training these pipeline on our dataset, we will examine the stages of the resultant model, specifically looking at the selector stage to identify the number of features it has chosen based on our specified threshold.

The subsequent phase of our analysis concentrates on quantifying the number of features culled through the fpr and fwe methods. This step is pivotal for understanding the feature selection dynamics and their impact on model performance.

We are going to re-examine our pipeline which incorporates the StandardScaler and UnivariateFeatureSelector, applying fpr and fwe methods for feature picking. Post-pipeline training on the dataset, a detailed review of the model's selection stage will be conducted to ascertain the volume of features retained as per our established threshold criteria.

FPR:

In [33]:
#Configure the feature selector fpr

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set_l)



In [34]:
model.stages[0:2] # look into attributes of the model


[StandardScalerModel: uid=StandardScaler_ef40adb86796, numFeatures=1109, withMean=false, withStd=true,
 UnivariateFeatureSelectorModel: uid=UnivariateFeatureSelector_1c7fd8b76dea, numSelectedFeatures=1046]

After applying the feature selection pipeline, we have successfully narrowed down from 1109 to 1046 features, retaining those most relevant for our model.

FWE:

In [35]:
#Configure the feature selector fwe

#standardize feature data by implementing scaler:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Initialize the feature selector 
selector = UnivariateFeatureSelector(featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label", selectionMode="fwe")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05)


# Initialize the linear regression model
lr = LinearRegression(featuresCol="selectedFeatures", labelCol="label")

# Assemble the pipeline with the stages
pipeline = Pipeline(stages=[scaler, selector, lr])

# Train the pipeline on the training set
model = pipeline.fit(train_set_l)



In [36]:
model.stages[0:2]

[StandardScalerModel: uid=StandardScaler_913de7bd150c, numFeatures=1109, withMean=false, withStd=true,
 UnivariateFeatureSelectorModel: uid=UnivariateFeatureSelector_651f3832a298, numSelectedFeatures=994]

For the fwe, we downsized the number of features from 1109 to 994. 

Using the fpr strategy, a relatively liberal approach, we retained a substantial portion of the features — 1046 out of 1109. That is expected,as  this strategy's propensity to keep more features could be advantageous in scenarios where the cost of excluding potentially useful predictors is high. However, it also runs the risk of retaining noise alongside valuable information, which could lead to overfitting and reduced model generalizability.

On the other hand, the fwe strategy, known for its conservative nature, selected 994 features, as method's strictness helps mitigate the risk of overfitting by ensuring only the most statistically significant features are used for model training. 

In summary, the fpr strategy provides a model that captures a broader range of information, which may be beneficial when the relationship between features and the response variable is complex and not well-understood. In contrast, the fwe strategy offers a more refined model that focuses on the most critical features, which could be preferable in situations demanding high precision and interpretability. The choice between these strategies should be guided by the specific context and objectives of the modeling task at hand.

### **Conclusion** ###

In this PySpark project, we undertook a detailed investigation into data handling, feature engineering, and model training processes. The endeavor was structured into four critical stages:

1. **Data Preparation**: Initiating with the division of data into train and test sets, this foundational step guaranteed a solid evaluation framework for our models. With a dataset focused on wind energy, we navigated through initial cleaning tasks, ensuring null values were addressed and temporal aspects were suitably transformed to reflect their cyclical nature.

2. **Model Development and Evaluation**: We crafted three distinct modeling pipelines to explore varying feature selection and transformation strategies:

   - **FPR Strategy**: Cast a wide net in feature selection, potentially including noise alongside valuable information.
   - **FWE Strategy**: Opted for a stringent selection process, prioritizing the reduction of overfitting and focusing on the model's precision.
   - **PCA with Three Components**: Employed Principal Component Analysis to distill the feature set to three core components, simplifying the model at the potential cost of losing useful data.

3. **Combined Feature Selection**: Merged selected features from the fpr method with principal components derived from PCA to achieve a model that balanced detail against simplicity.

4. **Feature Selection Strategy Assessment**: Evaluated the quantity of features retained by both the fpr and fwe strategies post-selection. While fpr maintained a broader feature base, it raised concerns about overfitting. Conversely, fwe's conservative approach led to a more streamlined yet potentially less comprehensive feature set.

Model performance was primarily gauged using Mean Absolute Error (MAE) and Root Mean Squared Error (RMSE) as metrics. Incorporation of lag features into the model significantly boosted accuracy, emphasizing the predictive value of historical data. Despite the dimensionality reduction achieved by PCA, it didn't consistently enhance prediction quality.

To encapsulate, the project underscored the delicate balance between feature richness and model tractability. Incremental gains were observed with the hybrid feature selection approach, which underscores the necessity for continuous exploration of feature engineering techniques and model complexity. The choice between broader (fpr) and narrower (fwe) selection strategies should be informed by the specific objectives of the modeling effort and the dataset's intrinsic properties.
