# Filtering meteorological data

We will use meteorological data from Meteogalicia containing the measurements of a weather station in Santiago during June 2017.

## Load the data

Load the data in `datasets/meteogalicia.txt` into an RDD:

In [26]:
temperaturas = sc.textFile('datasets/meteogalicia.txt')

In [27]:
type(temperaturas)

pyspark.rdd.RDD

In [28]:
temperaturas.toDebugString()

'(2) datasets/meteogalicia.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0 []\n |  datasets/meteogalicia.txt HadoopRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []'

In [29]:
temperaturas.takeSample(num=5, withReplacement=True)

[u'      1          2017-06-19 08:00:00    Velocidade do Vento (km/h)                 2,77',
 u'      1          2017-06-22 07:00:00    Velocidade do Vento (km/h)                 4,25',
 u'      1          2017-06-17 16:00:00    Visibilidade (m)                          20098',
 u'      1          2017-06-19 15:20:00    Temperatura media (\ufffdC)                    31,09',
 u'      1          2017-06-15 12:50:00    Velocidade do Vento (km/h)                 11,16']

## Filter temperature data

Filter the RDD, keeping only the "Temperatura media" lines:

In [32]:
temperaturas = temperaturas.filter(lambda line: 'Temperatura media' in line)

In [33]:
temperaturas.take(5)

[u'      1          2017-06-01 00:10:00    Temperatura media (\ufffdC)                    13,82',
 u'      1          2017-06-01 00:20:00    Temperatura media (\ufffdC)                    13,71',
 u'      1          2017-06-01 00:30:00    Temperatura media (\ufffdC)                    13,61',
 u'      1          2017-06-01 00:40:00    Temperatura media (\ufffdC)                    13,52',
 u'      1          2017-06-01 00:50:00    Temperatura media (\ufffdC)                    13,33']

In [34]:
type(temperaturas)

pyspark.rdd.PipelinedRDD
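
The predicate used in the filter step can be tried without Spark. The following is a minimal plain-Python sketch, assuming a few sample lines copied from the output above; the names `sample_lines` and `is_temperature_line` are illustrative and not part of the notebook.

```python
# Sample lines copied from the takeSample() output shown earlier.
sample_lines = [
    '      1          2017-06-19 08:00:00    Velocidade do Vento (km/h)                 2,77',
    '      1          2017-06-17 16:00:00    Visibilidade (m)                          20098',
    '      1          2017-06-19 15:20:00    Temperatura media (\ufffdC)                    31,09',
]


def is_temperature_line(line):
    """Return True for lines that report the mean temperature."""
    return 'Temperatura media' in line


# Python's built-in filter() applies the same predicate the RDD filter uses.
temperature_lines = list(filter(is_temperature_line, sample_lines))
print(len(temperature_lines))
```

A named function like this can be passed to the RDD's `filter` in place of the lambda, which makes the predicate easier to test on its own.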

## Count the number of points

In [35]:
temperaturas.count()

4176

## Find the maximum temperature of the month

Extract the column with the temperature strings:

In [36]:
temperaturas_strings = temperaturas.map(lambda line: line.split()[6])

In [37]:
temperaturas_strings.take(5)

[u'13,82', u'13,71', u'13,61', u'13,52', u'13,33']

In [43]:
type(temperaturas_strings)

pyspark.rdd.PipelinedRDD

The `temperaturas_strings` RDD contains strings of the form "21,55". To convert them to floats, we first have to replace the "," with a ".":

In [44]:
values = temperaturas_strings.map(lambda x: x.replace(',', '.'))

In [45]:
values.take(7)

[u'13.82', u'13.71', u'13.61', u'13.52', u'13.33', u'13.06', u'12.94']

And now we can convert them to floats:

In [47]:
temperatures = values.map(lambda x: float(x))

In [49]:
temperatures.take(3)

[13.82, 13.71, 13.61]
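
The three transformations above (split, replace, convert) can also be written as a single function. This is a minimal sketch in plain Python, assuming the temperature is always the last whitespace-separated field of the line; `parse_temperature` is a hypothetical helper, not something defined in the notebook.

```python
def parse_temperature(line):
    """Convert one 'Temperatura media' line into a float."""
    # Assumption: the value is the last field, e.g. '13,82'.
    raw = line.split()[-1]
    # Replace the decimal comma with a dot so float() can parse it.
    return float(raw.replace(',', '.'))


# One line copied from the filtered output shown earlier.
line = '      1          2017-06-01 00:10:00    Temperatura media (\ufffdC)                    13,82'
value = parse_temperature(line)
print(value)
```

With a helper like this, the chain of `map` calls could be collapsed into a single `temperaturas.map(parse_temperature)`.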

Finally we can calculate the maximum temperature:

In [52]:
temperatures.reduce(lambda x,y: x if x > y else y)

34.4

Sometimes it is useful to explore the API to find more direct ways to do what we want.

In this case the RDD object provides a built-in **max()** method that does exactly this, so we can also write:

In [53]:
temperatures.max()

34.4

## Find the minimum temperature of the month

In [54]:
temperatures.reduce(lambda x,y: x if x < y else y)

-9999.0

Reading the header of the dataset file we can see that -9999 is used as a code to indicate N/A values.

So we have to filter out -9999 and repeat:

In [55]:
temperatures.filter(lambda x: x != -9999.0).reduce(lambda x,y: x if x < y else y)

9.09
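
The effect of the N/A code can be reproduced on a small local list. The sketch below uses plain Python and `functools.reduce` with the same lambdas as above; the `readings` list is a made-up sample built from values that appear in this notebook, and `MISSING` is an illustrative name.

```python
from functools import reduce

MISSING = -9999.0  # N/A code described in the dataset header

# Hypothetical sample: a few real-looking readings mixed with the N/A code.
readings = [13.82, MISSING, 9.09, 34.4, MISSING, 12.94]

# Without filtering, the N/A code wins the minimum.
naive_minimum = reduce(lambda x, y: x if x < y else y, readings)

# Drop the N/A code first, then reduce as before.
valid = [x for x in readings if x != MISSING]
minimum = reduce(lambda x, y: x if x < y else y, valid)
maximum = reduce(lambda x, y: x if x > y else y, valid)

print(naive_minimum, minimum, maximum)
```

The maximum is unaffected by the N/A code here only because -9999 is far below any real reading; filtering before any aggregation is the safer habit.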

In [13]:
# rdd.saveAsTextFile('results_directory')