# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Mini-Project: Implementation of Multiple Linear Regression using MPI and OpenMP

## Learning Objectives

At the end of the mini-project, you will be able to :

* understand the collective communication operations like scatter, gather, broadcast
* understand the blocking and non-blocking communication
* implement multiple linear regression and run it using MPI
* implement the multiple linear regression based predictions using OpenMP

### Dataset

The dataset chosen for this mini-project is [Combined Cycle Power Plant](https://archive.ics.uci.edu/ml/datasets/combined+cycle+power+plant). The dataset is made up of 9568 records and 5 columns. Each record contains the values for Ambient Temperature, Exhaust Vaccum, Ambient Pressure, Relative Humidity and Energy Output.

Predicting full load electrical power output of a base load power plant is important in order to maximize the profit from the available megawatt hours.  The base load operation of a power plant is influenced by four main parameters, which are used as input variables in the dataset, such as ambient temperature, atmospheric pressure, relative humidity, and exhaust steam pressure. These parameters affect electrical power output, which is considered as the target variable.

**Note:** The data was collected over a six year period (2006-11).

## Information

#### MPI in a Nutshell

MPI stands for "Message Passing Interface". It is a library of functions (in C / Python) or subroutines (in Fortran) that you insert into source code to perform data communication between processes. MPI was developed over two years of discussions led by the MPI Forum, a group of roughly sixty people representing some forty organizations.

To know more about MPI click [here](https://hpc-tutorials.llnl.gov/mpi/)


#### Multiple Linear Regression

Multiple regression is an extension of simple linear regression. It is used when we want to predict the value of a variable based on the value of two or more other variables. The variable we want to predict is called the dependent variable (or sometimes, the outcome, target or criterion variable). The variables we are using to predict the value of the dependent variable are called the independent variables (or sometimes, the predictor, explanatory or regressor variables).

**Note:** We will be using the mpi4py Python package for MPI based code implementation

## Grading = 20 Points

**Run the below code to install mpi4py package**

In [None]:
!pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.5.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m21.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.5-cp310-cp310-linux_x86_64.whl size=2746494 sha256=c345d8def7b50eef80d26a7cf30479196a4abb2f1e14abbd772beeb0d9621e2d
  Stored in directory: /root/.cache/pip/wheels/18/2b/7f/c852523089e9182b45fca50ff56f49a51eeb6284fd25a66713
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.5


#### Importing Necessary Packages

In [None]:
# Importing pandas
import pandas as pd
# Importing Numpy
import numpy as np
# Importing MPI from mpi4py package
from mpi4py import MPI
# Importing sqrt function from the Math
from math import sqrt
# Importing Decimal, ROUND_HALF_UP functions from the decimal package
from decimal import Decimal, ROUND_HALF_UP
import time

#### Downloading the data

In [None]:
#@title Download the data
!wget -qq https://cdn.iisc.talentsprint.com/CDS/Datasets/PowerPlantData.csv

### Overview

* Load the data and perform data pre-processing
* Identify the features, target and split the data into train and test
* Implement multiple Linear Regression by estimating the coefficients on the given data
* Use MPI package to distribute the data and implement `communicator`
* Define functions for each objective and make a script (.py) file to execute using MPI command
* Use OpenMP component to predict the data and calculate the error on the predicted data
* Implement the Linear Regression from `sklearn` and compare the results

#### Exercise 1: Load data (1 point)

Write a function that takes the filename as input and loads the data in a pandas dataframe with the column names as Ambient Temperature, Exhaust Vaccum, Ambient Pressure, Relative Humidity and Energy Output respectively.

**Hint:** read_csv()


In [None]:
FILENAME = "/content/PowerPlantData.csv" # File path

def read_csv(FILENAME):
  df=pd.read_csv(FILENAME)
  return df
df_ppd=read_csv(FILENAME)
df_ppd.head()
# YOUR CODE HERE to Define a function to load the data

Unnamed: 0,AT,V,AP,RH,PE
0,8.34,40.77,1010.84,90.01,480.48
1,23.64,58.49,1011.4,74.2,445.75
2,29.74,56.9,1007.15,41.91,438.76
3,19.07,49.69,1007.22,76.79,453.09
4,11.8,40.66,1017.13,97.2,464.43


#### Exercise 2: Explore data (1 point)

Write a function that takes the data loaded using the above defined function as input and explore it.

**Hint:** You can define and check for following things in the dataset inside a function

- checking for the number of rows and columns
- summary of the dataset
- check for the null values
- check for the duplicate values

In [None]:
from numpy.lib.arraysetops import unique
# YOUR CODE HERE
df_ppd=read_csv(FILENAME)
def data_eda(df):
  num_row_col=df.shape
  summ=df.describe()
  null_col=df.isnull().values.any()
  # Find and filter the rows with duplicate values
  duplicate_rows = df[df.duplicated(keep='first')]
  print("The number of rows and columns is ",num_row_col[0],",",num_row_col[1],'\n')
  print("The summary of the dataset is ",'\n',summ,'\n')
  print("The number of  the null values is ",len(unique(null_col))-1,'\n')
  print("The number of  duplicate values is ",len(duplicate_rows),'\n')

  return(num_row_col,summ,null_col,duplicate_rows)

num_row_col,summ,null_col,duplicate_rows=data_eda(df_ppd)

The number of rows and columns is  9568 , 5 

The summary of the dataset is  
                 AT            V           AP           RH           PE
count  9568.000000  9568.000000  9568.000000  9568.000000  9568.000000
mean     19.651231    54.305804  1013.259078    73.308978   454.365009
std       7.452473    12.707893     5.938784    14.600269    17.066995
min       1.810000    25.360000   992.890000    25.560000   420.260000
25%      13.510000    41.740000  1009.100000    63.327500   439.750000
50%      20.345000    52.080000  1012.940000    74.975000   451.550000
75%      25.720000    66.540000  1017.260000    84.830000   468.430000
max      37.110000    81.560000  1033.300000   100.160000   495.760000 

The number of  the null values is  0 

The number of  duplicate values is  41 



#### Exercise 3: Handle missing data (1 point)

After exploring the dataset if there are any null values present in the dataset then define a function that takes data loaded using the above defined function as input and handle the null values accordingly.

**Hint:**

- Drop the records containing the null values - dropna()
- Replace the null values with the mean/median/mode - fillna()

In [None]:
# Function to handle missing data
def missing_value(df):
  for column in df.columns:
    mean_value = df[column].mean()
    df[column].fillna(mean_value, inplace=True)
    return(df)



# YOUR CODE HERE
df_not_null=missing_value(df_ppd)
df_not_null.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9568 entries, 0 to 9567
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   AT      9568 non-null   float64
 1   V       9568 non-null   float64
 2   AP      9568 non-null   float64
 3   RH      9568 non-null   float64
 4   PE      9568 non-null   float64
dtypes: float64(5)
memory usage: 373.9 KB


#### Exercise 4: Scale the data (1 point)

Write a function that takes the data after handling the missing data as input and returns the standardized data.

**Hint:**

- standardization of the data  can be performed using the below formula

$ (x - mean(x)) / std(x) $

In [None]:

def standardize_dataframe(df):
    """
    Standardize each column of a DataFrame.

    Parameters:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: A new DataFrame with standardized values.
    """
    standardized_df = (df - df.mean()) / df.std()
    return standardized_df



# Standardize the DataFrame
scaled_renamed_data = standardize_dataframe(df_not_null)
scaled_renamed_data.rename(columns={'AT': 'AmbientTemperature', 'V': 'ExhaustVaccum', 'AP': 'AmbientPressure', 'RH': 'RelativeHumidity', 'PE': 'EnergyOutput'},inplace=True)

print(scaled_renamed_data)
scaled_renamed_data.to_csv("scaled_renamed_data")


      AmbientTemperature  ExhaustVaccum  AmbientPressure  RelativeHumidity  \
0              -1.517782      -1.065149        -0.407336          1.143885   
1               0.535228       0.329260        -0.313040          0.061028   
2               1.353748       0.204141        -1.028675         -2.150575   
3              -0.077992      -0.363223        -1.016888          0.238422   
4              -1.053507      -1.073805         0.651804          1.636341   
...                  ...            ...              ...               ...   
9563           -0.608017      -0.423816        -0.245686         -0.025957   
9564            1.846202       1.860591        -0.498263         -0.930735   
9565           -0.491277      -0.862913         0.158437          0.366502   
9566           -0.268532       0.437854         0.895962          1.461687   
9567            0.540595      -0.236530        -0.235583         -0.141708   

      EnergyOutput  
0         1.530146  
1        -0.504776  


#### Exercise 5: Feature selection (1 point)

Write a function that takes scaled data as input and returns the features and target variable values

**Hint:**

- Features: AmbientTemperature, ExhaustVaccum, AmbientPressure, RelativeHumidity
- Target Variable: EnergyOutput

In [None]:
# Define a function
def features_target(df):
  Features=df[['AT','V','AP','RH']]
  Target_Variable=df[['PE']]
  return(Features,Target_Variable)
# YOUR CODE HERE
Features,Target_Variable=features_target(standardized_data)
print(Features.head(),'\n',Target_Variable.head())

         AT         V        AP        RH
0 -1.517782 -1.065149 -0.407336  1.143885
1  0.535228  0.329260 -0.313040  0.061028
2  1.353748  0.204141 -1.028675 -2.150575
3 -0.077992 -0.363223 -1.016888  0.238422
4 -1.053507 -1.073805  0.651804  1.636341 
          PE
0  1.530146
1 -0.504776
2 -0.914338
3 -0.074706
4  0.589734


#### Exercise 6: Correlation (1 point)

Calculate correlation between the variables

In [None]:
# YOUR CODE HERE

def calculate_correlation(data, target_column):
    """
    Calculate the correlation between input features and an output variable.

    Parameters:
        data (pd.DataFrame): The input DataFrame containing features and the output variable.
        target_column (str): The name of the output variable column.

    Returns:
        pd.Series: A Series containing correlation values for each feature with the output variable.
    """
    correlations = data.corr()[target_column]
    return correlations



# Calculate the correlation between features and the output variable
correlations = calculate_correlation(df_not_null,'PE')

print(correlations)


AT   -0.948128
V    -0.869780
AP    0.518429
RH    0.389794
PE    1.000000
Name: PE, dtype: float64


#### Exercise 7: Estimate the coefficients (2 points)

Write a function that takes features and target as input and returns the estimated coefficient values

**Hint:**

- Calculate the estimated coefficients using the below formula

$ β = (X^T X)^{-1} X^T y $

- transpose(), np.linalg.inv()

In [None]:
# Calculating the coeffients

# YOUR CODE HERE


def estimate_coefficients(features, target):
    """
    Calculate the estimated coefficients for a linear regression model.

    Parameters:
        features (np.array): Input features (X).
        target (np.array): Target variable (y).

    Returns:
        np.array: Estimated coefficients (β).
    """
      # Add a column of ones to X for the intercept term
    #print("np.ones(X.shape[0])",np.ones(features.shape[0]))
    X = np.c_[np.ones(features.shape[0]), features]
    #print("np.c_[np.ones(X.shape[0]), X]",features)
    # Calculate the coefficients using the formula
    X_transpose = features.T
    X_transpose_X = np.dot(X_transpose, features)
    #print("X_transpose_X",X_transpose_X)
    X_transpose_X_inv = np.linalg.inv(X_transpose_X)
    X_transpose_y = np.dot(X_transpose, target)
    coefficients = np.dot(X_transpose_X_inv, X_transpose_y)

    print("Coefficients:", coefficients)
    return coefficients

    # features = np.array(Features)
    # target = np.array(Target_Variable)
    # X=features
    # # Calculate the estimated coefficients
    # XtX_inv = np.linalg.inv(X.T @ X)
    # coefficients = XtX_inv @ X.T @ target

    # return coefficients
# Calculate the estimated coefficients
coefficients = estimate_coefficients(Features,Target_Variable)

print("Estimated coefficients:")
print(coefficients)

Coefficients: [[-0.86350078]
 [-0.17417154]
 [ 0.02160293]
 [-0.13521023]]
Estimated coefficients:
[[-0.86350078]
 [-0.17417154]
 [ 0.02160293]
 [-0.13521023]]


#### Exercise 8: Fit the data to estimate the coefficients (2 points)

Write a function named fit which takes features and targets as input and returns the intercept and coefficient values.

**Hint:**

- create a dummy column in the features dataframe which is made up of all ones
- convert the features dataframe into numpy array
- call the estimated coefficients function which is defined above
- np.ones(), np.concatenate()

In [None]:
# Calculate the estimated coefficients
def fit(x,y):
  # Create a DataFrame with a dummy column of ones
  ones_column = pd.DataFrame(np.ones(len(x)), columns=['intercept'])
  # Concatenate the ones column with the features DataFrame
  features = pd.concat([ones_column, x], axis=1)
  features = np.array(Features)
  target = np.array(y)
  coefficients = estimate_coefficients(features, target)
  return coefficients


coefficients=fit(Features,Target_Variable)
print("Estimated coefficients:")
print(coefficients)

Estimated coefficients:
[[-0.86350078]
 [-0.17417154]
 [ 0.02160293]
 [-0.13521023]]


#### Exercise 9: Predict the data on estimated coefficients (1 point)

Write a function named predict which takes features, intercept and coefficient values as input and returns the predicted values.

**Hint:**

- Fit the intercept, coefficients values in the below equation

  $y = b_0 + b_1*x + ... + b_i*x_i$

In [None]:
 # fucntion to predict the values
# def predict(x, intercept, coefficients):
#     '''
#     y = b_0 + b_1*x + ... + b_i*x_i
#     '''
#     #YOUR CODE HERE

#     return predictions


def predict(features, intercept, coefficients):
    """
    Make predictions using the estimated coefficients.

    Parameters:
        features (np.array): Input features.
        intercept (float): Intercept value (b0).
        coefficients (np.array): Coefficients (b1, b2, ...).

    Returns:
        np.array: Predicted values.
    """
    features = np.array(Features)
    # Calculate the predictions using the linear equation
    predictions = intercept + np.dot(features, coefficients)

    return predictions
intercept=0
y=predict(Features,intercept,coefficients)
print(y)

[[ 1.33266027]
 [-0.53453122]
 [-0.93596031]
 ...
 [ 0.52838115]
 [-0.02266325]
 [-0.41153611]]


#### Exercise 10: Root mean squared error (1 point)

Write a function to calculate the RMSE error.

**Hint:**

- [How to calculate the RSME error](https://towardsdatascience.com/what-does-rmse-really-mean-806b65f2e48e)

In [None]:
# Define a function to calculate the error
import numpy as np

def calculate_rmse(actual, predicted):
    """
    Calculate the Root Mean Square Error (RMSE) between actual and predicted values.

    Parameters:
        actual (np.array or list): Actual target values.
        predicted (np.array or list): Predicted target values.

    Returns:
        float: RMSE value.
    """
    actual = np.array(actual)
    predicted = np.array(predicted)
    error = actual - predicted
    mse = (error ** 2).mean()
    rmse = np.sqrt(mse)
    return rmse

# Calculate RMSE
rmse = calculate_rmse(Target_Variable, y)

print("RMSE:", rmse)


# YOUR CODE HERE

RMSE: 0.26701396565722163


#### Exercise 11: Split the data into train and test (1 point)

Write a function named train_test_split which takes features and targets as input and returns the train and test sets respectively.

**Hint:**

- Shuffle the data
- Consider 70 % of data as a train set and the rest of the data as a test set

In [None]:
# YOUR CODE HERE
import numpy as np

def train_test_split(features, targets, test_size=0.3, random_state=None):
    """
    Split the features and targets into training and testing sets.

    Parameters:
        features (np.array): Input features.
        targets (np.array): Target variable.
        test_size (float, optional): Proportion of data to include in the test set (default is 0.2).
        random_state (int, optional): Seed for random number generation (default is None).

    Returns:
        tuple: A tuple containing training and testing sets for features and targets.
    """
    features = np.array(features)
    targets = np.array(targets)
    if random_state is not None:
        np.random.seed(random_state)

    num_samples = features.shape[0]
    num_test_samples = int(test_size * num_samples)
    indices = np.arange(num_samples)
    np.random.shuffle(indices)
    test_indices = indices[:num_test_samples]
    train_indices = indices[num_test_samples:]
    X_train, X_test = features[train_indices], features[test_indices]
    y_train, y_test = targets[train_indices], targets[test_indices]

    return X_train, X_test, y_train, y_test

# Perform train-test split
X_train, X_test, y_train, y_test = train_test_split(Features,Target_Variable, test_size=0.3, random_state=42)

print("Training Features:\n", X_train)
print("Testing Features:\n", X_test)
print("Training Targets:\n", y_train)
print("Testing Targets:\n", y_test)


Training Features:
 [[ 1.1605233   0.78881655 -0.34166561 -0.30334906]
 [-1.72174133 -1.04468961  1.4920432   0.98087387]
 [-0.81600175 -0.89045475 -0.9680565   1.07128317]
 ...
 [-0.22156821 -0.83458396  0.36049837 -0.82388743]
 [ 0.94985498  1.14371409 -0.42249024 -0.44375744]
 [-1.77809846 -1.19026842  1.90963712  0.92608036]]
Testing Features:
 [[ 1.3483804   0.23955161 -1.28461964 -1.09306055]
 [ 0.81298767  1.36404959 -0.74242107  0.27403757]
 [-0.24437943 -0.73858064  1.98372637 -0.18691285]
 ...
 [-0.09811926  1.05479298 -0.85355494 -0.60128878]
 [ 0.23063066  0.27181503  0.93300617 -0.21841912]
 [-1.8089607  -1.42083379  2.59664648  1.18360987]]
Training Targets:
 [[-0.75320872]
 [ 2.06685422]
 [ 1.03562406]
 ...
 [ 0.57977345]
 [-0.8153169 ]
 [ 1.76217258]]
Testing Targets:
 [[-1.23601193]
 [-0.949494  ]
 [ 0.23759253]
 ...
 [-0.44501152]
 [ 0.01728427]
 [ 1.72291552]]


#### Exercise 12: Create a communicator (1 point)

Create a comunicator and define the rank and size

In [None]:
# YOUR CODE HERE
from mpi4py import MPI # Importing mpi4py package from MPI module
# Define a function
def main():
    # creating the communicator
    comm = MPI.COMM_WORLD
    # number of the process running the code i.e rank
    rank = comm.Get_rank()
    # total number of processes running i.e size
    size = comm.Get_size()
    # Displaying the rank and size of a communicator
    print("rank is {} and size is {}".format(rank,size))

# invoke the function
main()

rank is 0 and size is 1


In [None]:
!mpirun  --allow-run-as-root --oversubscribe -np 4 python3 /content/rank.py

In [None]:
# YOUR CODE HERE

%%writefile rank.py
from mpi4py import MPI # Importing mpi4py package from MPI module
# Define a function
def main():
  # creating the communicator
  comm = MPI.COMM_WORLD
  # number of the process running the code i.e rank
  rank = comm.Get_rank()
  print(rank)
  # total number of processes running i.e size
  size = comm.Get_size()
  # Displaying the rank and size of a communicator
  print("rank is {} and size is {}".format(rank,size))


# invoke the function
main()

Overwriting rank.py


#### Exercise 13: Divide the data into slices (1 point)

Write a function named dividing_data which takes train features set, train target set, and size of workers as inputs and returns the sliced data for each worker.

![img](https://cdn.iisc.talentsprint.com/CDS/Images/MiniProject_MPI_DataSlice.JPG)

For Example, if there are 4 processes, slice the data into 4 equal parts with 25% ratio

**Hint:**

- Divide the Data equally among the workers
  - Create an empty list
  - Iterate over the size of workers
  - Append each slice of data to the list

# @Mentor: do we have to divide both x_train and y_Train

In [None]:
def dividing_data(x_train, y_train, size_of_workers):
    # Size of the slice
    slice_for_each_worker = int(Decimal(x_train.shape[0]/size_of_workers).quantize(Decimal('1.'), rounding = ROUND_HALF_UP))
    print('Slice of data for each worker: {}'.format(slice_for_each_worker))
    # YOUR CODE HERE
    # print('x_train.size',x_train.shape)
    sliced_x_train = np.zeros((size_of_workers,slice_for_each_worker,x_train.shape[1]), dtype='d')
    sliced_y_train = np.zeros((size_of_workers,slice_for_each_worker,y_train.shape[1]), dtype='d')
    # print('sliced_x_train.shape',sliced_x_train.shape)
    # print(np.shape(sliced_x_train))
    for workerNo in range(size_of_workers):
      sliced_x_train[workerNo]=x_train[(workerNo*slice_for_each_worker):(workerNo*slice_for_each_worker+slice_for_each_worker)]
      sliced_y_train[workerNo]=y_train[(workerNo*slice_for_each_worker):(workerNo*slice_for_each_worker+slice_for_each_worker)]

    return (sliced_x_train,sliced_y_train)






In [None]:
(sliced_x_train,sliced_y_train)=dividing_data(X_train,y_train,4)

Slice of data for each worker: 1675


In [None]:
sliced_x_train

[array([[ 1.1605233 ,  0.78881655, -0.34166561, -0.30334906],
        [-1.72174133, -1.04468961,  1.4920432 ,  0.98087387],
        [-0.81600175, -0.89045475, -0.9680565 ,  1.07128317],
        ...,
        [-0.76232829, -1.10685569,  0.85218154,  1.05553003],
        [-1.75528724, -1.17531708,  1.16874467,  0.27814708],
        [-1.71100664, -0.7551058 ,  1.08455235, -0.14376296]]),
 array([[-0.14105803,  0.34657172, -0.58245566,  0.89799869],
        [-0.84820582, -1.07537919,  1.28998162,  1.24388273],
        [-0.14776721, -1.03918122, -1.89248822, -0.55813889],
        ...,
        [-0.17192027, -0.19246335,  0.93300617,  0.06787698],
        [-1.20513431, -0.96678527, -2.81355224,  1.82400904],
        [ 0.63586526,  0.51418408,  0.88249077, -0.79306608]]),
 array([[-0.87235888, -0.93373494,  0.21063603,  1.79592736],
        [ 0.81969685,  1.51041532, -0.30630484,  0.55142972],
        [-0.54092528, -0.98724499,  1.7782971 , -0.48896208],
        ...,
        [ 0.78749277,  0.75

#### Exercise 14: Prepare the data in root worker to assign data for all the workers (1 point)

- When it is the root worker, perform the below operation:
    - Store the features and target values in separate variables
    - Split the data into train and test sets using the train_test_split function defined above
    - Divide the data among the workers using the dividing_data function above

In [None]:
Features,Target_Variable=features_target(standardized_data)
X_train, X_test, y_train, y_test = train_test_split(Features,Target_Variable, test_size=0.3, random_state=42)
(sliced_x_train,sliced_y_train) = dividing_data(X_train,y_train, 4)


Slice of data for each worker: 1675


In [None]:
# YOUR CODE HERE
# Defining a function
def main():
    # communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()   # number of the process running the code
    size = comm.Get_size()   # total number of processes running
    data = None # Starting with an empty  data
    if rank == 0:
      # Creating a Numpy array.
      Features,Target_Variable=features_target(standardized_data)
      X_train, X_test, y_train, y_test = train_test_split(Features,Target_Variable, test_size=0.3, random_state=42)
      (sliced_x_train,sliced_y_train) = dividing_data(X_train,y_train, size)
      return(sliced_x_train,sliced_y_train)

Slice of data for each worker: 6698
[array([[ 1.1605233 ,  0.78881655, -0.34166561, -0.30334906],
       [-1.72174133, -1.04468961,  1.4920432 ,  0.98087387],
       [-0.81600175, -0.89045475, -0.9680565 ,  1.07128317],
       ...,
       [-0.22156821, -0.83458396,  0.36049837, -0.82388743],
       [ 0.94985498,  1.14371409, -0.42249024, -0.44375744],
       [-1.77809846, -1.19026842,  1.90963712,  0.92608036]])]
Rank:  0 , recvbuf:  [[ 1.1605233   0.78881655 -0.34166561 -0.30334906]
 [-1.72174133 -1.04468961  1.4920432   0.98087387]
 [-0.81600175 -0.89045475 -0.9680565   1.07128317]
 ...
 [-0.22156821 -0.83458396  0.36049837 -0.82388743]
 [ 0.94985498  1.14371409 -0.42249024 -0.44375744]
 [-1.77809846 -1.19026842  1.90963712  0.92608036]]
Rank:  0 , recvbuf:  [[-0.75320872]
 [ 2.06685422]
 [ 1.03562406]
 ...
 [ 0.57977345]
 [-0.8153169 ]
 [ 1.76217258]]
6698 1


#### Exercise 15: Scatter and gather the data (1 point)

Perform the below operations:

- Send slices of the training set(the features data X and the expected target data Y) to every worker including the root worker
    - **Hint:** scatter()
    - use `barrier()` to block workers until all workers in the group reach a Barrier, to scatter from root worker.
- Every worker should get the predicted target Y(yhat) for each slice
- Get the new coefficient of each instance in a slice
    - **Hint:** fit function defined above
- Gather the new coefficient from each worker
    - **Hint:** gather()
- Calculate the root mean square error for the test set

To know more about `scatter`, `gather` and `barrier` click [here](https://nyu-cds.github.io/python-mpi/05-collectives/)

In [None]:
%%writefile ScatteringNumpyArray.py
from mpi4py import MPI # Importing mpi4py package from MPI module
import numpy as np # Importing numpy package under a name np
import pandas as pd
# Defining a function
def main():
    # communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()   # number of the process running the code
    size = comm.Get_size()   # total number of processes running
    numDataPerRank = None   # Number of elements in a array for each rank
    data = None # Starting with an empty  data

    dataFrame=pd.read_csv("/content/scaled_renamed_data")
    # Creating a Numpy array.
    numDataPerRank = int(np.ceil(dataFrame.shape[0]/size))

    if rank == 0:
      data=dataFrame.values

    recvbuf = np.empty(shape=(numDataPerRank,dataFrame.shape[1]), dtype='d') # allocate space for recvbuf
    barrier = comm.Barrier()
    # scatter operation
    comm.Scatter(data, recvbuf, root=0)
    # Displaying the result
    print('Rank: ',rank, ', recvbuf received: ',recvbuf)
    filtered_data = recvbuf[~np.all(recvbuf == 0, axis=1)]
    print("length of recv buffer",len(filtered_data))
    MPI.Finalize()


# Calling the main function
main()

Writing ScatteringNumpyArray.py


In [None]:
!mpirun  --allow-run-as-root --oversubscribe -np 5 python /content/ScatteringNumpyArray.py

In [None]:
!mpirun -np 5 python /content/ScatteringNumpyArray.py

--------------------------------------------------------------------------
mpirun has detected an attempt to run as root.

Running as root is *strongly* discouraged as any mistake (e.g., in
defining TMPDIR) or bug can result in catastrophic damage to the OS
file system, leaving your system in an unusable state.

We strongly suggest that you run mpirun as a non-root user.

You can override this protection by adding the --allow-run-as-root option
to the cmd line or by setting two environment variables in the following way:
the variable OMPI_ALLOW_RUN_AS_ROOT=1 to indicate the desire to override this
protection, and OMPI_ALLOW_RUN_AS_ROOT_CONFIRM=1 to confirm the choice and
add one more layer of certainty that you want to do so.
We reiterate our advice against doing so - please proceed at your own risk.
--------------------------------------------------------------------------


In [None]:
# %%writefile ScatteringAndFittingAndGathering.py
from mpi4py import MPI # Importing mpi4py package from MPI module
import numpy as np # Importing numpy package under a name np
import pandas as pd
# Defining a function
from math import sqrt

feature_columns=['AmbientTemperature',	'ExhaustVaccum',	'AmbientPressure',	'RelativeHumidity']
target_columns=['EnergyOutput']

def get_coeficients(X,y):
  # Add a column of ones to X for the intercept term
  #print("np.ones(X.shape[0])",np.ones(X.shape[0]))
  X = np.c_[np.ones(X.shape[0]), X]
  #print("np.c_[np.ones(X.shape[0]), X]",X)
  # Calculate the coefficients using the formula
  X_transpose = X.T
  X_transpose_X = np.dot(X_transpose, X)
  #print("X_transpose_X",X_transpose_X)
  X_transpose_X_inv = np.linalg.inv(X_transpose_X)
  X_transpose_y = np.dot(X_transpose, y)
  coefficients = np.dot(X_transpose_X_inv, X_transpose_y)

  #print("Coefficients:", coefficients)
  return coefficients

def fit(X, y):
    coeff=get_coeficients(X,y)
    return (coeff[0],coeff[1:])

def rmse(y_predicted,y_actual):
  return sqrt(np.power((y_predicted-y_actual),2).sum())/len(y_predicted)


def predict(x, intercept, coefficients):
  '''
  y = b_0 + b_1*x + ... + b_i*x_i
  '''
  y=[intercept]*len(x)
  for (record_index,record) in enumerate(x):
    for coeff_index,feature_val in enumerate(record):
      y[record_index]=y[record_index]+coefficients[coeff_index]*feature_val


  return y


def main():
    # communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()   # number of the process running the code
    size = comm.Get_size()   # total number of processes running
    numDataPerRank = None   # Number of elements in a array for each rank
    data = None # Starting with an empty  data

    dataframe_features_target=pd.read_csv("/content/scaled_renamed_data")
    # Creating a Numpy array.
    numDataPerRank = int(np.ceil(dataframe_features_target.shape[0]/size))

    if rank == 0:
      data=dataframe_features_target.values

    recvbuf = np.empty(shape=(numDataPerRank,dataframe_features_target.shape[1]), dtype='d') # allocate space for recvbuf
    barrier = comm.Barrier()
    # scatter operation
    comm.Scatter(data, recvbuf, root=0)
    # Displaying the result
    #print('Rank: ',rank, ', recvbuf received: ',recvbuf)
    #filtered_data = recvbuf[~np.all(recvbuf == 0, axis=1)]
    print('Rank: ',rank, "length of recv buffer",len(recvbuf))
    X=dataframe_features_target[feature_columns].to_numpy()
    y=dataframe_features_target[target_columns].to_numpy()
    y=y.reshape(dataframe_features_target.shape[0],)

    (bias, coeff)=fit(X,y)
    #print('Rank: ',rank, "bias",bias,'coeff',coeff)

    recvbuf = None
    # Gathering the Information
    recvbuf = comm.gather((bias, coeff), root = 0)
    # Display the result
    if rank == 0:
        print('Rank: ',rank, ', recvbuf received: ', recvbuf)
        for (bias, coeff) in recvbuf:
          y_predicted=predict(X,bias,coeff)
          print(rmse(y_predicted,y))

    MPI.Finalize()

# Calling the main function
main()

Rank:  0 length of recv buffer 9568
Rank:  0 , recvbuf received:  [(-1.5891015564630014e-15, array([-0.86350078, -0.17417154,  0.02160293, -0.13521023]))]
0.002729753261663917


In [None]:
!mpirun  --allow-run-as-root --oversubscribe -np 5 python /content/ScatteringAndFittingAndGathering.py

#### Exercise 16: Make a script and execute everything in one place (1 point)

Write a script(.py) file which contains the code of all the above exercises in it so that you can run the code on multiple processes using MPI.

**Hint:**

- magic commands
- put MPI related code under main function
- !mpirun --allow-run-as-root -np 4 python filename.py

In [None]:
# YOUR CODE HERE for scipt(.py)
# YOUR CODE HERE for scipt(.py)
# %%writefile RunAll.py
from mpi4py import MPI # Importing mpi4py package from MPI module
import numpy as np # Importing numpy package under a name np
# Defining a function
# Importing pandas
import pandas as pd
# Importing sqrt function from the Math
from math import sqrt
# Importing Decimal, ROUND_HALF_UP functions from the decimal package
from decimal import Decimal, ROUND_HALF_UP
import time

def read_content(filename):
  return pd.read_csv(filename)

def analyse_data(input_df):
  df=input_df.copy(deep=True)
  #print('rows:',df.shape[0],'columns:',df.shape[1])
  # 1. info() - DataFrame structure summary
  #print("DataFrame Info:")
  #df.info()

  # 2. head() - Display the first few rows
  #print("\nFirst few rows:")
  #print(df.head())

  # 3. describe() - Summary statistics for numerical columns
  #print("\nSummary statistics:")
  print(df.describe())

  # 4. shape - Number of rows and columns
  #print("\nNumber of rows and columns:")
  print(df.shape)

  # 5. dtypes - Data types of columns
  #print("\nData types of columns:")
  print(df.dtypes)

  # 6. count() - Number of non-null values for each column
  #print("\nNumber of non-null values for each column:")
  print(df.count())

  # 7. value_counts() - Count of unique values in a column
  for col in df.columns:
    print("\nCount of unique values in the '"+col+"' column:")
    print(df[col].value_counts())

  # 8. nunique() - Number of unique values for each column
  print("\nNumber of unique values for each column:")
  print(df.nunique())

  # 9. isnull() and notnull() - Boolean values for missing or non-missing data
  print("\nCount of null values:")
  print(df.isnull().sum())

  print('Count of duplicate rows:')
  duplicates = df.duplicated()
  print(duplicates.sum())

  df_no_duplicates = df.drop_duplicates()
  print("Count without duplicates:")
  print(df_no_duplicates)
  return df

def handle_null(input_df,threshold_percent):
  # print(input_df.isnull().sum())
  df=input_df.copy(deep=True)
  rows_count=input_df.shape[0]
  # print("type(df.isnull().sum().items())",type(df.isnull().sum().items()))
  for column_name, count in df.isnull().sum().items():
    print(df[column_name].dtype)
    if(count==0):
      print('no null values found for %s'%column_name)
    elif((count/rows_count) < threshold_percent):
      print('dropping na values for %s'%column_name)
      df = df.dropna(subset=[column_name])
    else:
      print('replacing %s with mean'%column_name)
      if('float64'==df[column_name].dtype):
        df = df.fillna(df[column_name.mean()])

    return df

def standardise(input_df):
  df=input_df.copy(deep=True)
  for col in df.columns:
    df[col]=(df[col] - df[col].mean())/df[col].std()

  return df

def getFeatureAndTarget(input_df):
  df_features_targets=input_df.copy(deep=True)
  df_features_targets = df_features_targets.rename(columns={'AT': 'AmbientTemperature', 'V': 'ExhaustVaccum', 'AP': 'AmbientPressure', 'RH': 'RelativeHumidity', 'PE': 'EnergyOutput'})
  return (df_features_targets,['AmbientTemperature', 'ExhaustVaccum', 'AmbientPressure', 'RelativeHumidity'],['EnergyOutput'])

def random_split(df):
  # Define the proportion of data for training and testing
  train_ratio = 0.7
  test_ratio = 0.3

  # Calculate the number of samples for training and testing
  total_samples = df.shape[0]
  train_samples = int(total_samples * train_ratio)
  test_samples = total_samples - train_samples

  # Shuffle the data to ensure randomness
  shuffled_df = df.sample(frac=1, random_state=42)

  # Split the data into training and testing sets
  training_set = shuffled_df[:train_samples]
  testing_set = shuffled_df[train_samples:]
  return (training_set,testing_set)

def dividing_data(x_train, y_train, size_of_workers):
    # Size of the slice
    slice_for_each_worker = int(Decimal(x_train.shape[0]/size_of_workers).quantize(Decimal('1.'), rounding = ROUND_HALF_UP))
    print('Slice of data for each worker: {}'.format(slice_for_each_worker))
    # YOUR CODE HERE
    # print('x_train.size',x_train.shape)
    sliced_x_train = np.zeros((size_of_workers,slice_for_each_worker,x_train.shape[1]), dtype='d')
    sliced_y_train = np.zeros((size_of_workers,slice_for_each_worker,y_train.shape[1]), dtype='d')
    # print('sliced_x_train.shape',sliced_x_train.shape)
    # print(np.shape(sliced_x_train))
    for workerNo in range(size_of_workers):
      sliced_x_train[workerNo]=x_train[(workerNo*slice_for_each_worker):(workerNo*slice_for_each_worker+slice_for_each_worker)]
      sliced_y_train[workerNo]=y_train[(workerNo*slice_for_each_worker):(workerNo*slice_for_each_worker+slice_for_each_worker)]

    return (sliced_x_train,sliced_y_train)


def get_coeficients(X,y):
  # Add a column of ones to X for the intercept term
  #print("np.ones(X.shape[0])",np.ones(X.shape[0]))
  X = np.c_[np.ones(X.shape[0]), X]
  #print("np.c_[np.ones(X.shape[0]), X]",X)
  # Calculate the coefficients using the formula
  X_transpose = X.T
  X_transpose_X = np.dot(X_transpose, X)
  #print("X_transpose_X",X_transpose_X)
  X_transpose_X_inv = np.linalg.inv(X_transpose_X)
  X_transpose_y = np.dot(X_transpose, y)
  coefficients = np.dot(X_transpose_X_inv, X_transpose_y)

  #print("Coefficients:", coefficients)
  return coefficients

def fit(X, y):
    coeff=get_coeficients(X,y)
    return (coeff[0],coeff[1:])

def predict(x, intercept, coefficients):
  '''
  y = b_0 + b_1*x + ... + b_i*x_i
  '''
  y=[intercept]*len(x)
  for (record_index,record) in enumerate(x):
    for coeff_index,feature_val in enumerate(record):
      y[record_index]=y[record_index]+coefficients[coeff_index]*feature_val


  return y

def rmse(y_predicted,y_actual):
  return sqrt(np.power((y_predicted-y_actual),2).sum())/len(y_predicted)

def main():
  FILENAME = "/content/PowerPlantData.csv"

  df=read_content(FILENAME)
  cleaned_df=analyse_data(df)
  df_nullhandled=handle_null(cleaned_df,5)
  scaled_df=standardise(df_nullhandled)
  (dataframe_features_target,feature_columns,target_columns)=getFeatureAndTarget(scaled_df)

    # communicator
  comm = MPI.COMM_WORLD
  rank = comm.Get_rank()   # number of the process running the code
  size = comm.Get_size()   # total number of processes running
  numDataPerRank = None   # Number of elements in a array for each rank
  data = None # Starting with an empty  data

  #dataframe_features_target=pd.read_csv("/content/scaled_renamed_data")
  # Creating a Numpy array.
  numDataPerRank = int(np.ceil(dataframe_features_target.shape[0]/size))

  if rank == 0:
    data=dataframe_features_target.values

  recvbuf = np.empty(shape=(numDataPerRank,dataframe_features_target.shape[1]), dtype='d') # allocate space for recvbuf
  barrier = comm.Barrier()
  # scatter operation
  comm.Scatter(data, recvbuf, root=0)
  # Displaying the result
  #print('Rank: ',rank, ', recvbuf received: ',recvbuf)
  #filtered_data = recvbuf[~np.all(recvbuf == 0, axis=1)]
  print('Rank: ',rank, "length of recv buffer",len(recvbuf))
  X=dataframe_features_target[feature_columns].to_numpy()
  y=dataframe_features_target[target_columns].to_numpy()
  y=y.reshape(dataframe_features_target.shape[0],)

  (bias, coeff)=fit(X,y)

  recvbuf = None
  # Gathering the Information
  recvbuf = comm.gather((bias, coeff), root = 0)
  # Display the result
  if rank == 0:
      print('Rank: ',rank, ', recvbuf received: ', recvbuf)
      for (bias, coeff) in recvbuf:
        y_predicted=predict(X,bias,coeff)
        print(rmse(y_predicted,y))

  print('Rank: ',rank, "bias",bias,'coeff',coeff)
  MPI.Finalize()

main()

                AT            V           AP           RH           PE
count  9568.000000  9568.000000  9568.000000  9568.000000  9568.000000
mean     19.651231    54.305804  1013.259078    73.308978   454.365009
std       7.452473    12.707893     5.938784    14.600269    17.066995
min       1.810000    25.360000   992.890000    25.560000   420.260000
25%      13.510000    41.740000  1009.100000    63.327500   439.750000
50%      20.345000    52.080000  1012.940000    74.975000   451.550000
75%      25.720000    66.540000  1017.260000    84.830000   468.430000
max      37.110000    81.560000  1033.300000   100.160000   495.760000
(9568, 5)
AT    float64
V     float64
AP    float64
RH    float64
PE    float64
dtype: object
AT    9568
V     9568
AP    9568
RH    9568
PE    9568
dtype: int64

Count of unique values in the 'AT' column:
25.21    14
13.78    12
24.43    11
16.00    10
10.59    10
         ..
11.66     1
27.85     1
17.26     1
19.85     1
33.41     1
Name: AT, Length: 2773,

In [None]:
# YOUR CODE HERE for MPI command
!mpirun --allow-run-as-root --oversubscribe -np 4 python RunAll.py

#### Exercise 17: Implement predict using OpenMP (1 point)

Get the predictions for test data and calculate the test error(RMSE) by implementing the OpenMP (pymp)

**Hints:**

* Using the pymp.Parallel implement the predict function (use from above)

* Call the predict function by passing test data as an argument

* calculate the error (RMSE) by comparing the Actual test data and predicted test data

In [None]:
!pip install pymp-pypi

Collecting pymp-pypi
  Downloading pymp-pypi-0.5.0.tar.gz (12 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pymp-pypi
  Building wheel for pymp-pypi (setup.py) ... [?25l[?25hdone
  Created wheel for pymp-pypi: filename=pymp_pypi-0.5.0-py3-none-any.whl size=10319 sha256=7f42268505b5cc04a12f7431098dcb0a63349ed56d07dab0bfa61d5237201a5b
  Stored in directory: /root/.cache/pip/wheels/5e/db/4b/4c02f5b91b1abcde14433d1b336ac00a09761383e7bb1013cf
Successfully built pymp-pypi
Installing collected packages: pymp-pypi
Successfully installed pymp-pypi-0.5.0


In [None]:
import pymp

In [None]:
dataframe_features_target=pd.read_csv("/content/scaled_renamed_data")

In [None]:
feature_columns=['AmbientTemperature', 'ExhaustVaccum', 'AmbientPressure', 'RelativeHumidity']
target_columns=['EnergyOutput']

In [None]:
def random_split(df):
  # Define the proportion of data for training and testing
  train_ratio = 0.7
  test_ratio = 0.3

  # Calculate the number of samples for training and testing
  total_samples = df.shape[0]
  train_samples = int(total_samples * train_ratio)
  test_samples = total_samples - train_samples

  # Shuffle the data to ensure randomness
  shuffled_df = df.sample(frac=1, random_state=42)

  # Split the data into training and testing sets
  training_set = shuffled_df[:train_samples]
  testing_set = shuffled_df[train_samples:]
  return (training_set,testing_set)

In [None]:
def fit(x, y):
    coeff=get_coeficients(X,y)
    return (coeff[0],coeff[1:])

X=dataframe_features_target[feature_columns].to_numpy()
y=dataframe_features_target[target_columns].to_numpy()
y=y.reshape(dataframe_features_target.shape[0],)

(bias, coeff)=fit(X,y)

In [None]:
(training_set,testing_set)=random_split(dataframe_features_target)

In [None]:
bias,coeff

(-1.5891015564630014e-15,
 array([-0.86350078, -0.17417154,  0.02160293, -0.13521023]))

In [None]:
import pymp
# YOUR CODE HERE
def predict2(x, intercept, coefficients):
  '''
  y = b_0 + b_1*x + ... + b_i*x_i
  '''
  # y=[intercept]*len(x)
  print("len(x)",len(x))
  y = pymp.shared.array((len(x),), dtype='float')
  with pymp.Parallel(4) as p:
    for record_index in p.range(0, len(x)):
        y[record_index] = intercept
        record=x[record_index]
        for coeff_index,feature_val in enumerate(record):
          y[record_index]=y[record_index]+coefficients[coeff_index]*feature_val
        # The parallel print function takes care of asynchronous output.
        p.print('Yay! {} done!'.format(record_index))


  return y

y_predicted2=predict2(X,bias,coeff)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Yay! 3220 done!
Yay! 3221 done!
Yay! 3222 done!
Yay! 3223 done!
Yay! 5565 done!
Yay! 5566 done!
Yay! 5567 done!
Yay! 5568 done!
Yay! 5569 done!
Yay! 7744 done!
Yay! 7745 done!
Yay! 7746 done!
Yay! 7747 done!
Yay! 7748 done!
Yay! 7749 done!
Yay! 7750 done!
Yay! 3224 done!
Yay! 3225 done!
Yay! 3226 done!
Yay! 3227 done!
Yay! 3228 done!
Yay! 5570 done!
Yay! 5571 done!
Yay! 5572 done!
Yay! 7751 done!
Yay! 7752 done!
Yay! 7753 done!
Yay! 7754 done!
Yay! 7755 done!
Yay! 7756 done!
Yay! 7757 done!
Yay! 7758 done!
Yay! 7759 done!
Yay! 7760 done!
Yay! 7761 done!
Yay! 7762 done!
Yay! 7763 done!
Yay! 7764 done!
Yay! 7765 done!
Yay! 7766 done!
Yay! 7767 done!
Yay! 7768 done!
Yay! 7769 done!
Yay! 7770 done!
Yay! 7771 done!
Yay! 7772 done!
Yay! 7773 done!
Yay! 7774 done!
Yay! 7775 done!
Yay! 7776 done!
Yay! 7777 done!
Yay! 7778 done!
Yay! 7779 done!
Yay! 7780 done!
Yay! 7781 done!
Yay! 7782 done!
Yay! 7783 done!
Yay! 7784 done!
Yay! 77

#### Exercise 18: Use Sklearn to compare (1 point)

Apply the Linear regression on the given data using sklearn package and compare with the above results

**Hint:**
* Split the data into train and test
* Fit the train data and predict the test data using `sklearn Linear Regression`
* Compare the coefficients and intercept with above estimated coefficients
* calculate loss (RMSE) on test data and predictions and compare

In [None]:
# YOUR CODE HERE
import numpy as np
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

# Create a linear regression model
model = LinearRegression()

# Fit the model to the data
model.fit(X, y)

# Make predictions
X_new = np.array([6]).reshape(-1, 1)
y_pred = model.predict(X)

In [None]:
# Calculate the mean squared error (MSE)
mse = mean_squared_error(y, y_pred)

# Calculate the root mean squared error (RMSE)
rmse_scikit = np.sqrt(mse)

print("Mean Squared Error (MSE):", mse)
print("Root Mean Squared Error (RMSE):", rmse_scikit)

Mean Squared Error (MSE): 0.07129645785599593
Root Mean Squared Error (RMSE): 0.26701396565722163


In [None]:
model.coef_

array([-0.86350078, -0.17417154,  0.02160293, -0.13521023])

In [None]:
model.intercept_

-1.5852676439507103e-15