## General Setup of the Solution

The setup for my solution follows the instructions provided in Lab 2, using the following commands:

- `. ./setup.sh`
- `terraform apply`
- `python parse-tf-state.py`
- `ansible-playbook -i hosts install_packages.yml`
- `ansible-playbook -i hosts config_ssh.yml`
- `ansible-playbook -i hosts nfs.yml`

### Modifications and Configuration:
I modified the Terraform configuration file (`simple_deployment.tf`) to deploy 20 virtual machines. Because of the Google Cloud quota limit of 8 VMs per region, I distributed the VMs across the following zones and regions:  

- **Zones**: `["us-central1-c", "us-east1-b", "us-east5-b", "us-west1-b"]`  
- **Regions**: `["us-central1", "us-east1", "us-east5", "us-west1"]`  
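The zone layout can be sketched in Python to check it against the quota. This is a minimal sketch, not the Terraform code: it assumes the VMs fill the zones in consecutive blocks of five (consistent with the 5-VM runs all being in one region), and the `bml-<i>` names are assumed from the `MASTER_ADDR=bml-0` used in the MPI command.

```python
import math
from collections import Counter

# Zones from the list above; the block-wise mapping is an assumption,
# the real mapping is defined in simple_deployment.tf.
ZONES = ["us-central1-c", "us-east1-b", "us-east5-b", "us-west1-b"]
QUOTA_PER_REGION = 8
NUM_VMS = 20


def region_of(zone: str) -> str:
    """A GCP zone name is its region name plus a '-<letter>' suffix."""
    return zone.rsplit("-", 1)[0]


def assign_zones(num_vms: int, zones: list[str]) -> list[tuple[str, str]]:
    """Fill zones in equal consecutive blocks (20 VMs, 4 zones: VMs 0-4 -> first zone, ...)."""
    per_zone = math.ceil(num_vms / len(zones))
    return [(f"bml-{i}", zones[i // per_zone]) for i in range(num_vms)]


assignment = assign_zones(NUM_VMS, ZONES)
vms_per_region = Counter(region_of(zone) for _, zone in assignment)
# Every region must stay within the per-region quota
assert all(n <= QUOTA_PER_REGION for n in vms_per_region.values())
print(dict(vms_per_region))
```

With this layout, runs on 5, 10, 15 and 20 VMs span 1, 2, 3 and 4 regions respectively.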

I assigned static external IP addresses to the VMs, which prevents their IP addresses from changing after the machines are suspended and restarted. Additionally, I configured the VMs to use the `t2d-standard-1` machine type and set them up as Spot VMs, as requested in the assignment.  

### Package Installation:
In the `install_packages.yml` file, I included the packages specified in the assignment:  
- `numpy`, `torch`, `pandas`, `scipy`, and `mpi4py`  

I also added a step to upgrade `pip` to avoid package installation issues, as certain packages failed to install with older versions of `pip`.
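To confirm on a node that the playbook installed everything, a small check script like the one below can be used. It is a hypothetical helper, not part of the lab files; it relies on `importlib.util.find_spec`, which locates a package without importing it.

```python
import importlib.util

# Packages that install_packages.yml installs, per the list above
REQUIRED_PACKAGES = ["numpy", "torch", "pandas", "scipy", "mpi4py"]


def missing_packages(names: list[str]) -> list[str]:
    """Return the packages that cannot be found, without importing them."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED_PACKAGES)
    print("All packages found" if not missing else f"Missing: {missing}")
```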

# Training Process

### I ran the training and experiments with the following commands:
1. **Set the GCP node IP address**:
    `export GCP_IP=IP_ADDRESS_OF_ONE_OF_THE_NODES`

2. **Copy the dataset to the GCP node**:
    `scp -i .ssh/id_gcp amazon_reviews_2M.csv $GCP_userID@$GCP_IP:~`

3. **Copy the training script to the GCP node**:
    `scp -i .ssh/id_gcp train.py $GCP_userID@$GCP_IP:~`

4. **Run the training with MPI**:
    `ssh -i .ssh/id_gcp $GCP_userID@$GCP_IP " mpiexec --hostfile hostfile_mpi -x MASTER_ADDR=bml-0 -x MASTER_PORT=12340 -n 5 python3 train.py ~/amazon_reviews_2M.csv ~/output.csv "`
    
- Because the Network File System (NFS) was set up with `ansible-playbook -i hosts nfs.yml`, the dataset (`amazon_reviews_2M.csv`) and the training script (`train.py`) only need to be copied to one node. The other nodes access these files through the NFS mount.
- Following Lab 2, I used `mpiexec` instead of `mpirun`. I also tested `mpirun` once; according to the documentation, both commands should give the same results.
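The `mpiexec` command only forwards `MASTER_ADDR` and `MASTER_PORT`, so each process still needs its rank and the world size before it can join the `torch.distributed` process group. A minimal sketch of one way to do this, assuming Open MPI (which sets `OMPI_COMM_WORLD_RANK` and `OMPI_COMM_WORLD_SIZE` for every launched process) and the `gloo` backend; `train.py` may obtain these values differently, for example through `mpi4py`.

```python
import os


def mpi_rank_and_size(env=None) -> tuple[int, int]:
    """Read this process's rank and the world size from Open MPI's environment."""
    env = os.environ if env is None else env
    rank = int(env["OMPI_COMM_WORLD_RANK"])
    world_size = int(env["OMPI_COMM_WORLD_SIZE"])
    return rank, world_size


def init_distributed(backend: str = "gloo") -> tuple[int, int]:
    """Join the process group; MASTER_ADDR and MASTER_PORT come from `mpiexec -x`."""
    # Imported lazily so the helper above can be used without torch installed
    import torch.distributed as dist

    rank, world_size = mpi_rank_and_size()
    dist.init_process_group(backend=backend, init_method="env://", rank=rank, world_size=world_size)
    return rank, world_size
```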

## Breakdown of `train.py` and Timing

The `train.py` script is divided into four main parts, with timing measured for each section.  
To ensure accurate measurement, `dist.barrier()` is used at key points to synchronize all nodes, ensuring no node progresses to the next step until all nodes have completed the current one. Specific implementation details are also documented as comments within `train.py`.
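The timing pattern can be sketched as a small context manager. `timed_section` is a hypothetical name, not taken from `train.py`; in the real script the barrier would be `torch.distributed.barrier`, which is replaced by a no-op default here so the sketch runs on a single machine.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_section(name, timings, barrier=lambda: None):
    """Time a block of code across all nodes and store the result in `timings`."""
    barrier()  # all nodes start the section together
    start = time.perf_counter()
    yield
    barrier()  # the section ends only when the slowest node is done
    timings[name] = time.perf_counter() - start


if __name__ == "__main__":
    timings = {}
    with timed_section("Training", timings):
        sum(i * i for i in range(100_000))
    print(timings)
```

Placing the second barrier before reading the clock means each recorded time reflects the slowest node, which is what determines the wall-clock time of the whole step.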

### Data Sharding (Part 1):
In the first part, each node reads its own portion of the dataset, selected by its rank (`RANK`).  
The script divides the dataset among the nodes, calculating the number of rows each node should process, and reads that portion using pandas.  

I introduced an additional variable, `Fraction`, to control the portion of the dataset used during training:  
- `Fraction = 5/20`, `10/20`, `15/20` – For weak scaling experiments.  
- `Fraction = 20/20` – For strong scaling experiments.  

- This makes it possible to run experiments on a subset of the dataset or on the full dataset.   
- Note: The value of `Fraction` is adjusted manually within `train.py` rather than being passed as a command-line argument.  
- Passing `Fraction` as a parameter would have been more convenient, but the assignment specifies that only the dataset path and the output path are passed as parameters, so I kept it as a constant in the script.
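One way to implement this split is sketched below. It is a minimal sketch, not the code in `train.py`: the helper names are hypothetical, `total_rows` is assumed to be known in advance, and leftover rows go to the lowest ranks. The `fraction` argument plays the role of `Fraction`.

```python
import io

import pandas as pd


def shard_bounds(total_rows: int, rank: int, world_size: int, fraction: float = 1.0) -> tuple[int, int]:
    """Return (first_row, row_count) for `rank` when `fraction` of the rows is split across nodes."""
    used_rows = int(total_rows * fraction)
    base, extra = divmod(used_rows, world_size)
    # The first `extra` ranks take one leftover row each, so no row is dropped
    start = rank * base + min(rank, extra)
    count = base + (1 if rank < extra else 0)
    return start, count


def read_shard(source, total_rows: int, rank: int, world_size: int, fraction: float = 1.0) -> pd.DataFrame:
    """Read only this rank's rows; line 0 (the header) is always kept."""
    start, count = shard_bounds(total_rows, rank, world_size, fraction)
    return pd.read_csv(source, skiprows=range(1, start + 1), nrows=count)


if __name__ == "__main__":
    csv_text = "id,label\n" + "".join(f"{i},{i % 2}\n" for i in range(10))
    shard = read_shard(io.StringIO(csv_text), total_rows=10, rank=1, world_size=3)
    print(shard)  # rows with id 4, 5 and 6
```

With this approach `skiprows` still has to scan through every skipped line of the CSV, so higher ranks read more of the file than the rows they keep.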

---

### Training (Word Count by Class) (Part 2):
In this stage, each node counts the occurrences of words for each class using the portion of data assigned to it.  
This process is performed independently on each node.
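For a naive Bayes model this step amounts to two counters per shard: documents per class and word occurrences per class. A sketch, assuming lowercase whitespace tokenization and that the shard's text and label columns are passed in as sequences (for example `shard["text"]` and `shard["label"]`, hypothetical column names); the tokenization in `train.py` may differ.

```python
from collections import Counter, defaultdict


def count_words_by_class(texts, labels):
    """Count documents per class and word occurrences per class for one shard."""
    class_counts = Counter()
    word_counts = defaultdict(Counter)
    for text, label in zip(texts, labels):
        class_counts[label] += 1
        # Simple lowercase whitespace tokenization (an assumption)
        word_counts[label].update(str(text).lower().split())
    return class_counts, dict(word_counts)


if __name__ == "__main__":
    classes, words = count_words_by_class(["Good good movie", "bad movie"], [1, 0])
    print(classes, words)
```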

---

### Synchronization (Part 3):
Once the word counts are complete, the data from all nodes is synchronized to aggregate the results.  
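A sketch of the aggregation, assuming each node's counts have already been gathered into a list with one entry per rank. `torch.distributed.all_gather_object` is one real way to collect picklable objects such as dictionaries (shown in the docstring); whether `train.py` uses it or another collective is an assumption.

```python
from collections import Counter


def merge_counts(gathered):
    """Sum per-node (class_counts, word_counts) pairs into global counts.

    In train.py the list could be collected with, for example:
        gathered = [None] * dist.get_world_size()
        dist.all_gather_object(gathered, (class_counts, word_counts))
    """
    total_classes = Counter()
    total_words = {}
    for class_counts, word_counts in gathered:
        total_classes.update(class_counts)
        for label, counts in word_counts.items():
            total_words.setdefault(label, Counter()).update(counts)
    return total_classes, total_words
```

Only these dictionaries travel over the network in this step, not the raw rows.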

---

### Probability Calculation and Saving (Part 4):
The final step involves calculating the class and word probabilities from the synchronized data.  
The results are then saved to output files on each node.  
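The probability step can be sketched as follows, assuming plain relative frequencies for the class priors $P(c)$ and word probabilities $P(w \mid c)$ (smoothing is applied later, during classification). The CSV layout is hypothetical; the actual format of `output.csv` is defined in `train.py`.

```python
import csv
import io


def compute_probabilities(class_counts, word_counts):
    """Class priors P(c) and per-class word probabilities P(w | c) from raw counts."""
    total_docs = sum(class_counts.values())
    priors = {c: n / total_docs for c, n in class_counts.items()}
    likelihoods = {}
    for c, counts in word_counts.items():
        total_words = sum(counts.values())
        likelihoods[c] = {w: n / total_words for w, n in counts.items()}
    return priors, likelihoods


def write_probabilities(priors, likelihoods, out):
    """Write all probabilities to a CSV stream (hypothetical column layout)."""
    writer = csv.writer(out)
    writer.writerow(["kind", "class", "word", "probability"])
    for c, p in priors.items():
        writer.writerow(["class", c, "", p])
    for c, probs in likelihoods.items():
        for w, p in probs.items():
            writer.writerow(["word", c, w, p])


if __name__ == "__main__":
    priors, likelihoods = compute_probabilities({1: 3, 0: 1}, {1: {"good": 3, "movie": 1}, 0: {"bad": 1}})
    buffer = io.StringIO()
    write_probabilities(priors, likelihoods, buffer)
    print(buffer.getvalue())
```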


# Experiment Times

### **Experiment Times (5/20 of Dataset)**

| VMs  | Data Sharding (s) | Training (s) | Synchronization (s) | Probability Calculation & Saving (s) |
|------|-------------------|--------------|---------------------|--------------------------------------|
| 5    | 2.61              | 7.16         | 0.32                | 5.37                                 |
| 10   | 166.71            | 3.29         | 1.88                | 6.30                                 |
| 15   | 156.77            | 2.27         | 2.38                | 6.11                                 |
| 20   | 507.59            | 2.03         | 10.85               | 9.47                                 |

---

### **Experiment Times (10/20 of Dataset)**

| VMs  | Data Sharding (s) | Training (s) | Synchronization (s) | Probability Calculation & Saving (s) |
|------|-------------------|--------------|---------------------|--------------------------------------|
| 5    | 4.95              | 13.60        | 0.41                | 7.53                                 |
| 10   | 170.22            | 7.16         | 2.13                | 9.31                                 |
| 15   | 169.40            | 4.32         | 2.89                | 8.21                                 |
| 20   | 288.45            | 4.27         | 4.93                | 10.91                                |

---

### **Experiment Times (15/20 of Dataset)**

| VMs  | Data Sharding (s) | Training (s) | Synchronization (s) | Probability Calculation & Saving (s) |
|------|-------------------|--------------|---------------------|--------------------------------------|
| 5    | 5.10              | 20.55        | 0.49                | 9.18                                 |
| 10   | 355.77            | 10.13        | 4.40                | 10.17                                |
| 15   | 284.07            | 6.79         | 4.46                | 12.09                                |
| 20   | 513.20            | 5.19         | 12.05               | 11.66                                |

---

### **Experiment Times (20/20 of Dataset - Strong Scaling)**

| VMs  | Data Sharding (s) | Training (s) | Synchronization (s) | Probability Calculation & Saving (s) |
|------|-------------------|--------------|---------------------|--------------------------------------|
| 5    | 4.94              | 29.74        | 0.60                | 10.07                                |
| 10   | 174.40            | 14.08        | 3.98                | 11.66                                |
| 15   | 185.14            | 9.14         | 2.93                | 11.71                                |
| 20   | 293.05            | 7.14         | 6.44                | 19.35                                |


## **Analysis of Results**

### 1. Data Sharding Time:  

One key observation is the significant delay in data distribution when VMs are located in different regions. This is evident in the results: with 5 VMs (all in the same region), data sharding takes about 5 s, but as the number of VMs increases, and with it the number of regions, the time grows substantially, to between about 157 s and 513 s.  

To investigate this further, I ran a small program (`reduce.py` from Lab 2 and Lab 3) that sends and sums `RANK` values across nodes. To my surprise, even this program took noticeably longer when it used nodes located in different regions.

Interestingly, the results do not show a clear upward trend with the number of regions or the amount of data that needs to be sent. For example:  
- In the 15/20 dataset experiment, sharding time for 15 VMs (284s) is lower than for 10 VMs (355s).   
- For 20 VMs, the sharding time for 15/20 of the dataset (513s) is higher than for 20/20 of the dataset (293s).  

The assignment specifies that data sharding should occur before training, so I am unsure how critical this observation is. However, I thought it was important to highlight the inconsistencies. My best guess is that these variations are largely related to Google Cloud Platform environment factors, such as node performance or temporary regional delays.

---

### 2. Training Time:  
Training time behaves as expected – it decreases consistently as the number of VMs increases in each experiment.
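To quantify this, the speedup and parallel efficiency relative to the 5-VM run can be computed from the 20/20 table. Speedup is $T(5)/T(n)$ and efficiency divides it by $n/5$, so 1.0 means perfectly linear scaling. The sketch uses only the measured numbers above.

```python
# Training times (s) from the 20/20 (strong scaling) table above
TRAINING_TIMES_20_20 = {5: 29.74, 10: 14.08, 15: 9.14, 20: 7.14}


def scaling_table(times: dict[int, float], baseline_vms: int) -> dict[int, tuple[float, float]]:
    """Map VM count -> (speedup, efficiency) relative to the baseline run."""
    base_time = times[baseline_vms]
    table = {}
    for vms, t in times.items():
        speedup = base_time / t
        # 1.0 means the speedup matches the growth in VM count exactly
        efficiency = speedup / (vms / baseline_vms)
        table[vms] = (speedup, efficiency)
    return table


if __name__ == "__main__":
    for vms, (speedup, efficiency) in scaling_table(TRAINING_TIMES_20_20, 5).items():
        print(f"{vms:>2} VMs: speedup {speedup:.2f}x, efficiency {efficiency:.2f}")
```

For the 20/20 runs all efficiencies come out slightly above 1.0 (about 1.04 to 1.08), so the training step scales close to linearly.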

---

### 3. Synchronization Time:  
The pattern for synchronization time mirrors the behavior observed in data sharding. However, since synchronization involves transmitting Python dictionaries (rather than the entire dataset), the overall times are much smaller.  

Despite this, there is again no clear trend. For example:  
- In the 20/20 experiment, synchronization for 10 VMs (3.98s) takes longer than for 15 VMs (2.93s).  

---

### 4. Probability Calculation & Saving:  
Probability calculation times generally increase with the number of VMs. This is probably because my implementation iterates over the synchronized dictionaries collected during the previous step, so more nodes mean more dictionaries to process.  


# **Prediction for 50 and 100 VMs:**

The method for predicting training time is based on a straightforward observation:  
- Dividing the size of the dataset by the number of VMs, and then dividing the result by the measured training time, consistently gives approximately 15,000 rows per second per VM.  

Thus, for the full (20/20) dataset, the predicted training time is:  

- For 50 VMs:  
$$
\frac{2\,000\,000}{50} \div 15\,000 \approx 2.67\ \text{s}
$$

- For 100 VMs:  
$$
\frac{2\,000\,000}{100} \div 15\,000 \approx 1.33\ \text{s}
$$
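The same arithmetic in code: it first recomputes the per-VM throughput from the 20/20 measurements and then applies the rounded rate of 15,000 rows/s to 50 and 100 VMs. Like the estimate above, this sketch assumes training time equals rows per VM divided by a constant per-VM throughput.

```python
DATASET_ROWS = 2_000_000
MEASURED_20_20 = {5: 29.74, 10: 14.08, 15: 9.14, 20: 7.14}  # training times (s)
RATE = 15_000  # rows per second per VM, the rounded figure used above


def rows_per_second_per_vm(rows: int, vms: int, seconds: float) -> float:
    return rows / vms / seconds


def predicted_training_time(rows: int, vms: int, rate: float = RATE) -> float:
    return rows / vms / rate


if __name__ == "__main__":
    for vms, t in MEASURED_20_20.items():
        print(f"{vms:>3} VMs measured: {rows_per_second_per_vm(DATASET_ROWS, vms, t):,.0f} rows/s per VM")
    for vms in (50, 100):
        print(f"{vms:>3} VMs predicted: {predicted_training_time(DATASET_ROWS, vms):.2f} s")
```

For the 20/20 runs the recomputed throughput lies between roughly 13,400 and 14,600 rows/s per VM.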

---

For the remaining steps, the results look more logarithmic than linear, so I use a simple logarithmic regression, $t \approx a \ln(\text{VMs}) + b$. However, given the observations above, I suspect that only the prediction for the fourth part (Probability Calculation & Saving) will be reasonably accurate. The results are generated by the code below:


```python
import numpy as np

# Measured times (s) for the 20/20 dataset (strong scaling table)
vms = np.array([5, 10, 15, 20])
times = {
    "Data Sharding": np.array([4.94, 174.40, 185.14, 293.05]),
    "Synchronization": np.array([0.60, 3.98, 2.93, 6.44]),
    "Probability Calculation & Saving": np.array([10.07, 11.66, 11.71, 19.35]),
}

# Fit time = a * ln(VMs) + b for each step, then extrapolate to 50 and 100 VMs
log_vms = np.log(vms)

print("Times for 50 and 100 VMs:\n")
for step, measured in times.items():
    a, b = np.polyfit(log_vms, measured, deg=1)
    print(f"{step}: {(a * np.log(50) + b):.2f} and {(a * np.log(100) + b):.2f}")
```

Output:

```
Times for 50 and 100 VMs:

Data Sharding: 453.86 and 586.92
Synchronization: 8.80 and 11.25
Probability Calculation & Saving: 21.31 and 25.03
```


# Classification
For the classification part, I use two formulas from the first page of the assignment. Instead of multiplying probabilities directly, I apply a log-sum approach to avoid numerical underflow. Additionally, I include a small smoothing factor to prevent errors when taking the logarithm of 0.

More detailed comments on my solution can be found in `classify.py`. I run it with the following command:
```bash
python classify.py output.csv test.txt predictions.txt
```
where `output.csv` is the file generated by `train.py`, `test.txt` contains the documents to be labeled, and `predictions.txt` is the file the predicted labels are written to.
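The log-space decision rule can be sketched as below, assuming the two formulas are the standard naive Bayes prior $P(c)$ and per-word likelihood $P(w \mid c)$, so the predicted class is $\arg\max_c \left[\log P(c) + \sum_{w} \log\left(P(w \mid c) + \varepsilon\right)\right]$. The smoothing constant `EPS = 1e-10`, the lowercase whitespace tokenization and the dictionary inputs are assumptions, not values taken from `classify.py`.

```python
import math

EPS = 1e-10  # hypothetical smoothing constant that keeps log() away from 0


def classify(text, priors, likelihoods, eps=EPS):
    """Return the class with the highest log-posterior score for one document."""
    best_class, best_score = None, -math.inf
    for c, prior in priors.items():
        word_probs = likelihoods.get(c, {})
        # Summing logs instead of multiplying probabilities avoids underflow
        score = math.log(prior)
        for word in str(text).lower().split():
            score += math.log(word_probs.get(word, 0.0) + eps)
        if score > best_score:
            best_class, best_score = c, score
    return best_class


if __name__ == "__main__":
    priors = {1: 0.5, 0: 0.5}
    likelihoods = {1: {"good": 0.75, "movie": 0.25}, 0: {"bad": 0.5, "movie": 0.5}}
    print(classify("Good movie", priors, likelihoods))  # 1
```

A word never seen with a class contributes $\log \varepsilon$ instead of raising an error, which strongly penalizes that class without ruling it out entirely.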