```python
from pyspark import SparkConf, SparkContext

# Setting up Spark configuration and context
conf = SparkConf().setMaster("local").setAppName("MinWeatherTemperature")
sc = SparkContext(conf=conf)

# Function to parse each line of the dataset
def parseLine(line):
    fields = line.split(",")  # Assuming the file is CSV
    station_id = fields[0]
    entry_type = fields[2]
    temperature = float(fields[3]) * 0.1 * (9 / 5) + 32  # Convert to Fahrenheit
    return (station_id, entry_type, temperature)

# Reading the file
lines = sc.textFile("file:///spark/weather1800.csv")

# Parsing lines into structured tuples
parsed_lines = lines.map(parseLine)

# Filtering for minimum temperature entries (TMIN)
tmin_records = parsed_lines.filter(lambda x: x[1] == "TMIN")

# Mapping station ID to temperature
station_temps = tmin_records.map(lambda x: (x[0], x[2]))

# Reducing to find the minimum temperature for each station
min_temps = station_temps.reduceByKey(lambda x, y: min(x, y))

# Collecting and printing the results
results = min_temps.collect()

for result in results:
    print(f'{result[0]} -> {result[1]:.2f} Fahrenheit')
```

---

### Sample Dataset (`weather1800.csv`)
```
ITE00100554,18000101,TMAX,-75
ITE00100554,18000101,TMIN,-148
ITE00100554,18000102,TMAX,-70
ITE00100554,18000102,TMIN,-125
ITE00100554,18000103,TMIN,-160
EZE00100082,18000101,TMAX,-44
EZE00100082,18000101,TMIN,-78
EZE00100082,18000102,TMAX,-33
EZE00100082,18000102,TMIN,-64
EZE00100082,18000103,TMIN,-90
```

---

### Step-by-Step Explanation

#### **Step 1: Parsing the Data**

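The `parseLine` function converts each line into a `(station_id, entry_type, temperature)` tuple. The `* 0.1` factor implies that raw readings are stored in tenths of a degree Celsius; the function converts them to Fahrenheit: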
```python
def parseLine(line):
    fields = line.split(",")
    station_id = fields[0]
    entry_type = fields[2]
    temperature = float(fields[3]) * 0.1 * (9 / 5) + 32  # Celsius to Fahrenheit
    return (station_id, entry_type, temperature)
```

The parsed data looks like this (temperatures rounded to two decimals):
```
[
 ('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.36),
 ('ITE00100554', 'TMAX', 19.4),
 ('ITE00100554', 'TMIN', 9.5),
 ('ITE00100554', 'TMIN', 3.2),
 ('EZE00100082', 'TMAX', 24.08),
 ('EZE00100082', 'TMIN', 17.96),
 ('EZE00100082', 'TMAX', 26.06),
 ('EZE00100082', 'TMIN', 20.48),
 ('EZE00100082', 'TMIN', 15.8)
]
```
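
The arithmetic behind the conversion can be checked without Spark. The following is a minimal sketch, assuming (as the `* 0.1` factor in `parseLine` implies) that raw readings are tenths of a degree Celsius. `to_fahrenheit` is a hypothetical helper name used only for illustration; it is not part of the original job.

```python
def to_fahrenheit(raw_tenths_celsius):
    """Apply the same formula as parseLine: tenths of a degree Celsius -> Fahrenheit."""
    celsius = float(raw_tenths_celsius) * 0.1
    return celsius * (9 / 5) + 32

# Raw values taken from the sample dataset
for raw in ["-75", "-148", "-160", "-90"]:
    print(f"{raw} -> {to_fahrenheit(raw):.2f}")
```

For example, `-148` is -14.8 °C, and -14.8 × 1.8 + 32 = 5.36 °F.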

---

#### **Step 2: Filtering for `TMIN` Records**

The `filter` operation keeps only records where the `entry_type` is `"TMIN"`:
```python
tmin_records = parsed_lines.filter(lambda x: x[1] == "TMIN")
```

Filtered data:
```
[
 ('ITE00100554', 'TMIN', 5.36),
 ('ITE00100554', 'TMIN', 9.5),
 ('ITE00100554', 'TMIN', 3.2),
 ('EZE00100082', 'TMIN', 17.96),
 ('EZE00100082', 'TMIN', 20.48),
 ('EZE00100082', 'TMIN', 15.8)
]
```

---

#### **Step 3: Mapping Station ID to Temperature**

The `map` operation transforms the data into key-value pairs of `(station_id, temperature)`:
```python
station_temps = tmin_records.map(lambda x: (x[0], x[2]))
```

Mapped data:
```
[
 ('ITE00100554', 5.36),
 ('ITE00100554', 9.5),
 ('ITE00100554', 3.2),
 ('EZE00100082', 17.96),
 ('EZE00100082', 20.48),
 ('EZE00100082', 15.8)
]
```

---

#### **Step 4: Finding the Minimum Temperature**

The `reduceByKey` operation merges all values that share the same `station_id` by repeatedly applying a two-argument function, here `min`:
```python
min_temps = station_temps.reduceByKey(lambda x, y: min(x, y))
```

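**How `reduceByKey` works (conceptually):**

1. **Grouping by Key**:
   - Values are associated with their `station_id`:
     ```
     ITE00100554: [5.36, 9.5, 3.2]
     EZE00100082: [17.96, 20.48, 15.8]
     ```

2. **Applying the `min` Function**:
   - For `ITE00100554`:
     ```
     min(5.36, 9.5) = 5.36
     min(5.36, 3.2) = 3.2
     Final: 3.2
     ```
   - For `EZE00100082`:
     ```
     min(17.96, 20.48) = 17.96
     min(17.96, 15.8) = 15.8
     Final: 15.8
     ```

In practice, Spark first combines values within each partition and then merges the partial results, so the order in which values are paired can differ from the one shown. Because `min` is associative and commutative, the result is the same.

Reduced data:
```
[
 ('ITE00100554', 3.2),
 ('EZE00100082', 15.8)
]
```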
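
The merge behaviour can be imitated in plain Python to show why the pairing order does not matter for `min`. This is only an illustrative emulation, not Spark's implementation: `reduce_by_key` is a hypothetical helper, and the split of records into `partition_a` and `partition_b` is an assumption made for the example. The temperatures are the Fahrenheit values computed from the sample dataset.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with a two-argument function."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Hypothetical split of the TMIN records across two partitions
partition_a = [("ITE00100554", 5.36), ("EZE00100082", 17.96), ("ITE00100554", 9.5)]
partition_b = [("EZE00100082", 20.48), ("ITE00100554", 3.2), ("EZE00100082", 15.8)]

# Step 1: combine within each partition
partials = [reduce_by_key(p, min) for p in (partition_a, partition_b)]

# Step 2: merge the per-partition results with the same function
combined = reduce_by_key(
    [pair for partial in partials for pair in partial.items()], min
)
print(combined)  # {'ITE00100554': 3.2, 'EZE00100082': 15.8}
```

A function that is not associative and commutative (subtraction, for instance) could give different answers depending on how the records are partitioned, which is why `reduceByKey` expects a function like `min`, `max`, or addition.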

---

#### **Step 5: Collecting and Printing Results**

The `collect` method retrieves the final results:
```python
results = min_temps.collect()
```

Results (the order of the stations may differ):
```
[
 ('ITE00100554', 3.2),
 ('EZE00100082', 15.8)
]
```

Each result is printed:
```
ITE00100554 -> 3.20 Fahrenheit
EZE00100082 -> 15.80 Fahrenheit
```
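
For a quick sanity check of the printed output without a Spark installation, the same steps can be mirrored in plain Python over the sample dataset. This is an illustrative stand-in for the Spark job, not a replacement for it; `SAMPLE` and `min_by_station` are hypothetical names introduced for the example.

```python
SAMPLE = """\
ITE00100554,18000101,TMAX,-75
ITE00100554,18000101,TMIN,-148
ITE00100554,18000102,TMAX,-70
ITE00100554,18000102,TMIN,-125
ITE00100554,18000103,TMIN,-160
EZE00100082,18000101,TMAX,-44
EZE00100082,18000101,TMIN,-78
EZE00100082,18000102,TMAX,-33
EZE00100082,18000102,TMIN,-64
EZE00100082,18000103,TMIN,-90
"""

min_by_station = {}
for line in SAMPLE.splitlines():
    station_id, _date, entry_type, raw = line.split(",")
    if entry_type != "TMIN":  # same condition as the filter step
        continue
    temperature = float(raw) * 0.1 * (9 / 5) + 32  # same formula as parseLine
    current = min_by_station.get(station_id)
    min_by_station[station_id] = (
        temperature if current is None else min(current, temperature)
    )

for station_id, temperature in min_by_station.items():
    print(f"{station_id} -> {temperature:.2f} Fahrenheit")
```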

---

### Summary of Transformations

| Step                | Operation           | Input Data                          | Output Data                                 |
|---------------------|---------------------|-------------------------------------|--------------------------------------------|
| Parse Lines         | `map(parseLine)`   | Raw CSV lines                       | Structured tuples                          |
| Filter `TMIN`       | `filter`           | All temperature records             | Only `TMIN` records                        |
| Map Station to Temp | `map`              | `TMIN` tuples                       | Key-value pairs `(station_id, temperature)`|
| Reduce by Key       | `reduceByKey(min)` | Key-value pairs                     | Minimum temperature per station            |
| Collect Results     | `collect`          | Reduced data                        | Final results in driver program            |

---

### Key Concepts Explained

1. **`map`**:
   - Transforms each element of the RDD. For example, parsing a line into a structured tuple.

2. **`filter`**:
   - Retains only elements that satisfy a condition. For example, filtering for `TMIN` records.

3. **`reduceByKey`**:
   - Merges the values for each key using a specified associative and commutative function. For example, finding the minimum temperature for each station.

4. **`collect`**:
   - Retrieves the final results to the driver program.

---

### What Happens If We Don't Call `min_temps.collect()`?

If we do not call `min_temps.collect()`, `min_temps` stays an **RDD** (Resilient Distributed Dataset): a lazy, distributed description of the computation. Nothing is retrieved to the driver program for further use or display. Here's what would happen:

---

### 1. **Data Stays in the Cluster**
   - Without `collect()` or another action, no data is transferred from the Spark executors to the driver program.
   - The RDD `min_temps` is only a handle to a distributed dataset, not a local Python list.

---

### 2. **No Output on the Driver Console**
   - If the line `results = min_temps.collect()` is removed, the `for` loop over `results` raises a `NameError`, because `results` is never defined.
   - Nothing is printed, since the driver program has no local copy of the data.

---

### 3. **How Spark Works**
   - Spark transformations like `reduceByKey` are **lazy**, meaning they don't execute until an **action** is called.
   - `collect()` is an action that triggers the execution of all preceding transformations and brings the data back to the driver program.
   - Without `collect()` or another action, the transformations are never executed, so there are no results to print.
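
Lazy evaluation can be illustrated with an analogy in plain Python, where the built-in `map` and `filter` also return lazy iterators. This is only an analogy under that assumption, not Spark code: `parse`, `calls`, and the sample values are hypothetical, and `list(...)` plays the role of an action such as `collect()`.

```python
calls = []

def parse(value):
    """Record every invocation so we can see when work actually happens."""
    calls.append(value)
    return int(value)

# "Transformations": these only build lazy iterators, nothing is parsed yet
parsed = map(parse, ["-148", "-125", "-160"])
below_minus_130 = filter(lambda v: v < -130, parsed)
calls_before_action = len(calls)

# "Action": consuming the iterator triggers the whole pipeline
result = list(below_minus_130)

print(calls_before_action)  # 0
print(result)               # [-148, -160]
print(len(calls))           # 3
```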

---

### Example Without `collect()`

```python
min_temps = station_temps.reduceByKey(lambda x, y: min(x, y))
for result in min_temps:  # Fails: an RDD is not iterable
    print(result)
```

#### Error Message:
```
TypeError: 'PipelinedRDD' object is not iterable
```

This happens because `min_temps` is an RDD, and RDDs cannot be directly iterated over in the driver program.

---

### Correct Way: Use `collect()`

To bring the RDD data to the driver program, use:
```python
results = min_temps.collect()
for result in results:
    print(result)
```

---

### Alternative Actions to `collect()`
If you don't want to use `collect()`, you can use other Spark actions like:
1. **`take(n)`**:
   - Retrieves the first `n` elements from the RDD, which avoids pulling a large dataset into the driver.
   - Example:
     ```python
     results = min_temps.take(5)
     print(results)
     ```

2. **`saveAsTextFile(path)`**:
   - Writes the RDD to a directory of text part files at `path` instead of bringing the data to the driver.
   - Example:
     ```python
     min_temps.saveAsTextFile("output/min_temp_results")
     ```

---

### Key Takeaway
Without `collect()` or another action, the transformations are never executed and no data reaches the driver. To use the data locally (e.g., for printing), you must trigger an action like `collect()`.