# Maximum Temperature by Weather Station

The data used for this exercise can be found at datasets/1800.csv. This is real weather data from 1800, and each row represents one weather measurement. The fields include the ID of the weather station that made the measurement, the date, the observation type, and the measured temperature (in tenths of a degree Celsius). The data contains other fields, but we ignore them because they are not needed for this analysis.

In this notebook we use PySpark to find the maximum temperature measured by each weather station in 1800.

First we import the libraries we need and tell Spark to run the script on our local system.

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("max_temperature_by_station")
sc = SparkContext(conf = conf)

Let's load the data and display a little sample of it.

In [2]:
raw_data = sc.textFile('datasets/1800.csv')

for row in raw_data.take(10):
    print(row)

ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
EZE00100082,18000102,TMIN,-130,,,E,


Now we define a function that splits each row into its fields and keeps only the ones that are useful to us.

In [3]:
def split_data(row):
    fields = row.split(',')
    # We only keep the following fields
    station_id = fields[0]
    obs_type = fields[2]
    # Convert the temperature from tenths of a degree to degrees Celsius
    temperature = float(fields[3]) / 10
    return (station_id, obs_type, temperature)
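
To make the conversion concrete, here is a minimal sketch that applies the same parsing logic to the first row of the sample shown earlier. It runs in plain Python without Spark; `sample_row` and `parsed` are names introduced here only for illustration.

```python
def split_data(row):
    # Same logic as the cell above, repeated so this sketch runs on its own
    fields = row.split(',')
    return (fields[0], fields[2], float(fields[3]) / 10)

# First row of the sample printed from datasets/1800.csv
sample_row = 'ITE00100554,18000101,TMAX,-75,,,E,'

parsed = split_data(sample_row)
print(parsed)  # ('ITE00100554', 'TMAX', -7.5)
```

The raw value -75 is in tenths of a degree, so dividing by 10 gives -7.5 degrees Celsius.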

In the next cell we create a new RDD that contains only the fields chosen by the split_data function.

In [4]:
data = raw_data.map(split_data)

for row in data.take(10):
    print(row)

('ITE00100554', 'TMAX', -7.5)
('ITE00100554', 'TMIN', -14.8)
('GM000010962', 'PRCP', 0.0)
('EZE00100082', 'TMAX', -8.6)
('EZE00100082', 'TMIN', -13.5)
('ITE00100554', 'TMAX', -6.0)
('ITE00100554', 'TMIN', -12.5)
('GM000010962', 'PRCP', 0.0)
('EZE00100082', 'TMAX', -4.4)
('EZE00100082', 'TMIN', -13.0)


The next block of code filters out all the rows that don't correspond to maximum temperature observations.

In [17]:
# Compare the observation type field explicitly (index 1 of each tuple)
only_max_temps = data.filter(lambda measure: measure[1] == 'TMAX')

We no longer need the observation type field, so we drop it and transform our RDD into key/value pairs, where the key is the weather station ID and the value is the temperature.

In [18]:
only_max_temps = only_max_temps.map(lambda measurement: (measurement[0], measurement[2]))
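
As a rough illustration of what the filter and map steps do, the sketch below applies the same two operations to the ten sample tuples printed earlier, using plain Python lists instead of RDDs. The names `sample`, `only_tmax` and `pairs` are introduced here for illustration and are not part of the notebook.

```python
# The ten parsed rows shown in the sample output above
sample = [
    ('ITE00100554', 'TMAX', -7.5),
    ('ITE00100554', 'TMIN', -14.8),
    ('GM000010962', 'PRCP', 0.0),
    ('EZE00100082', 'TMAX', -8.6),
    ('EZE00100082', 'TMIN', -13.5),
    ('ITE00100554', 'TMAX', -6.0),
    ('ITE00100554', 'TMIN', -12.5),
    ('GM000010962', 'PRCP', 0.0),
    ('EZE00100082', 'TMAX', -4.4),
    ('EZE00100082', 'TMIN', -13.0),
]

# Filter step: keep only the maximum temperature observations
only_tmax = [m for m in sample if m[1] == 'TMAX']

# Map step: drop the observation type, leaving (station_id, temperature) pairs
pairs = [(m[0], m[2]) for m in only_tmax]

for pair in pairs:
    print(pair)
```

Only four of the ten sample rows survive the filter, and each of them becomes a (station ID, temperature) pair, which is the shape reduceByKey expects.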

Finally, we can find the maximum temperature for each weather station.

In [19]:
max_temps = only_max_temps.reduceByKey(lambda temp_1, temp_2: max(temp_1, temp_2))
results = max_temps.collect() 
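
Conceptually, reduceByKey groups the values that share a key and folds them together two at a time with the supplied function. The following is a plain-Python sketch of that idea, not Spark's actual implementation; `reduce_by_key`, `sample_pairs` and `local_max` are hypothetical names used only here, and the input covers just the first two days of the sample rather than the whole year.

```python
def reduce_by_key(pairs, func):
    """Plain-Python stand-in for the idea behind reduceByKey (no Spark involved)."""
    reduced = {}
    for key, value in pairs:
        if key in reduced:
            # Combine the running result for this key with the new value
            reduced[key] = func(reduced[key], value)
        else:
            # First value seen for this key
            reduced[key] = value
    return reduced

# (station_id, temperature) pairs taken from the sample rows above
sample_pairs = [
    ('ITE00100554', -7.5),
    ('EZE00100082', -8.6),
    ('ITE00100554', -6.0),
    ('EZE00100082', -4.4),
]

local_max = reduce_by_key(sample_pairs, max)
print(local_max)  # {'ITE00100554': -6.0, 'EZE00100082': -4.4}
```

Because the built-in max already takes two arguments and returns the larger one, it can be passed directly as the combining function, both in this sketch and in reduceByKey.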

Let's print our results.

In [21]:
for result in results:
    print(result)

('ITE00100554', 32.3)
('EZE00100082', 32.3)
