#  Practica 3: RDDs

In [1]:
# Curso: TDM - 2023/2024
# Nombre: Daniel Mihai
# Apellidos: Rece
# Fecha: 24/10/2023

- Se valorará la claridad del código y evitar redundancias o código poco eficiente; en particular se valorará utilizar las funciones para RDDs de Spark minimizando el uso de Python. 
- Además de las funciones que se piden se pueden añadir otras auxiliares si se necesitan, y también otros imports.
- El código debe funcionar correctamente no solo con las pruebas que vienen de ejemplo sino con cualquier otra prueba.


## Preparando el entorno

In [2]:
import findspark
findspark.init()

import pyspark                         # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext                        # para trabajar con RDD's

df = spark.sql('''select 'spark' as hola ''')
df.show()

23/10/24 19:05:47 WARN Utils: Your hostname, danrec-HP-Pavilion-Gaming-Laptop-15-ec0xxx resolves to a loopback address: 127.0.1.1; using 10.8.93.156 instead (on interface wlo1)
23/10/24 19:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/10/24 19:05:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+-----+
| hola|
+-----+
|spark|
+-----+



## Descripción del dataset

El dataset con el que vamos a trabajar está extraído del portal de datos abiertos de la ciudad de Madrid. Se trata de un fichero con datos metereológicos de la ciudad.

La URL es:
```
https://datos.madrid.es/egob/catalogo/300351-6-meteorologicos-diarios.csv
```

Intérprete de los datos del fichero:

* Se trata de datos diarios del año 2021
* Cada registro tiene la siguiente estructura:

|PROVINCIA|MUNICIPIO|ESTACION|MAGNITUD|PUNTO_MUESTREO|ANO| MES |D01|V01|D02|V02|
|:--------|:--------|:-------|:-------|:-------------|:--|:---|:---|:---|:---|:---|
|28 |79| 4| 1| 28079004_82_98| 2019| 1| 18| V |20| V|

* El campo `PUNTO_MUESTREO` incluye el código de la estación completo
(provincia, municipio y estación) más el parámetro y la técnica de
muestreo.
* `D01` corresponde al dato del primer día del mes, `D02` al del segundo día
y así sucesivamente.
* UNICAMENTE SON VÁLIDOS LOS DATOS QUE LLEVAN EL CÓDIGO DE VALIDACIÓN “V"


El fichero se encuentra también aquí: [./data/meteo21.csv](./data/meteo21.csv)

## Ejercicios

### Crear un RDD con los datos 

__a)__ Crear un RDD llamado `rdd1` a partir del fichero csv proporcionado. Usa el método `textFile`.<br>
__b)__ ¿Cuántas filas tiene el fichero? Usa la operación  `count`.<br> 
__c)__ ¿Cuántas particiones se han creado? Usa la operación `getNumPartitions`.<br>
__d)__ Mostrar las 5 primeras filas.


In [11]:
# Sol: Rows:  1057, Particiones:  
path = "./data/meteo21.csv"
rdd1 = sc.textFile(path)
rdd1.count()
rdd1.getNumPartitions()
rdd1.take(5)

['PROVINCIA;MUNICIPIO;ESTACION;MAGNITUD;PUNTO_MUESTREO;ANO;MES;D01;V01;D02;V02;D03;V03;D04;V04;D05;V05;D06;V06;D07;V07;D08;V08;D09;V09;D10;V10;D11;V11;D12;V12;D13;V13;D14;V14;D15;V15;D16;V16;D17;V17;D18;V18;D19;V19;D20;V20;D21;V21;D22;V22;D23;V23;D24;V24;D25;V25;D26;V26;D27;V27;D28;V28;D29;V29;D30;V30;D31;V31',
 '',
 '28;079;102;81;28079102_81_98;2021;01;02.21;V;01.48;V;01.75;V;01.73;V;00.75;V;01.43;V;01.62;V;02.27;V;00.33;V;02.14;V;00.80;V;00.70;V;00.57;V;00.52;V;00.56;V;00.48;V;00.83;V;00.77;V;00.88;V;01.70;V;03.42;V;04.38;V;03.74;V;03.13;V;03.62;V;01.50;V;01.04;V;00.63;V;00.94;V;04.20;V;03.81;V',
 '',
 '28;079;102;81;28079102_81_98;2021;02;03.09;V;02.32;V;01.66;V;01.40;V;02.58;V;01.45;V;02.58;V;03.28;V;03.52;V;03.42;V;01.28;V;02.60;V;01.28;V;01.28;V;01.92;V;01.36;V;00.98;V;01.46;V;01.34;V;02.23;V;03.26;V;02.07;V;01.31;V;02.49;V;02.32;V;01.51;V;01.48;V;03.11;V;00000;N;00000;N;00000;N']

### Gestión de filas vacías

Comprobar que no hay filas vacías ('') en el conjunto de datos. Puedes usar la operación  `filter` para seleccionar las filas cuyo valor es esactamente ''. Luego puedes contar las filas que hay en el resultado.

In [13]:
# Sol: 4
rdd1.filter(lambda x:x=="").count()

4

### Eliminar filas vacías

Elimina las filas vacías y vuelve a comprobar que no hay filas vacías ('') en el conjunto de datos. Puedes usar la operación  `filter` para seleccionar las filas cuyo valor es esactamente ''. El RDD resultante es justo lo que buscamos. Lo llamaremos `rdd1`. No hay operación `delete`  como tal.

In [14]:
# Sol:
rdd1 = rdd1.filter(lambda x:x!="")

### Lista de filas

El `rdd1` creado en el apartado anterior es un RDD de strings sin filas vacías. <br>
__a)__ Transformarlo de forma que cada string en `rdd1`, se convierta en una lista de strings, es decir separar sus componentes (recordar que la función `split` divide un string según una cadena de separación). Esto nos permitirá trabajar con los datos de forma separada. Llamar al resultado `rdd2`.<br>
__b)__ Mostrar las 5 primeras filas. Observar la primera fila.

In [17]:
# Sol:
rdd2 = rdd1.map(lambda x : x.split(";"))
rdd2.take(5)

[['PROVINCIA',
  'MUNICIPIO',
  'ESTACION',
  'MAGNITUD',
  'PUNTO_MUESTREO',
  'ANO',
  'MES',
  'D01',
  'V01',
  'D02',
  'V02',
  'D03',
  'V03',
  'D04',
  'V04',
  'D05',
  'V05',
  'D06',
  'V06',
  'D07',
  'V07',
  'D08',
  'V08',
  'D09',
  'V09',
  'D10',
  'V10',
  'D11',
  'V11',
  'D12',
  'V12',
  'D13',
  'V13',
  'D14',
  'V14',
  'D15',
  'V15',
  'D16',
  'V16',
  'D17',
  'V17',
  'D18',
  'V18',
  'D19',
  'V19',
  'D20',
  'V20',
  'D21',
  'V21',
  'D22',
  'V22',
  'D23',
  'V23',
  'D24',
  'V24',
  'D25',
  'V25',
  'D26',
  'V26',
  'D27',
  'V27',
  'D28',
  'V28',
  'D29',
  'V29',
  'D30',
  'V30',
  'D31',
  'V31'],
 ['28',
  '079',
  '102',
  '81',
  '28079102_81_98',
  '2021',
  '01',
  '02.21',
  'V',
  '01.48',
  'V',
  '01.75',
  'V',
  '01.73',
  'V',
  '00.75',
  'V',
  '01.43',
  'V',
  '01.62',
  'V',
  '02.27',
  'V',
  '00.33',
  'V',
  '02.14',
  'V',
  '00.80',
  'V',
  '00.70',
  'V',
  '00.57',
  'V',
  '00.52',
  'V',
  '00.56',
  'V',
  '

### Eliminar la fila de títulos

__a)__ Crear un nuevo RDD que no contenga la fila de títulos. Llámalo `rdd3`. (El rdd recién creado tiene 1052 filas).<br>
__b)__ Crear una variable de tipo `list` llamada `columns` que sea una lista de las columnas del fichero. (La lista recién creada tiene 69 elementos).

In [58]:
# Sol a:
primer_elemento = rdd2.first()
rdd3 = rdd2.filter(lambda x:x!=primer_elemento)
rdd3.count()

1052

In [57]:
# Sol b
columns = []
for i in range(len(rdd3.first())):
    column = (rdd3.map(lambda x: x[i])).collect()
    columns.append(column)
len(columns)

69

###  Comprobación de tipos

Las columnas D01, D02,...,D31 contienen el dato recogido por la estación (sensor) para una cierta magnitud. Nos interesa comprobar que el dato es un número real en todas las medicciones.

__a)__ Escribir código Python para crear la lista de posiciones que ocupan las variables D01, D02,...,D31. Llamar a la variable `posD`. Observa que dichas variables aparecerán después de la columna 'MES'.<br>
__b)__ La siguiente función devuelve True si el dato es un número (o se puede transformar a número). En caso contrario devuelve False.<br>

```Python
def is_number(n):
    try:
        float(n)   # Type-casting the string to `float`.
                   # If string is not a valid `float`, 
                   # it'll raise `ValueError` exception
    except ValueError:
        return False
    return True
```
Escribir una función llamada `contar_No_numero` que recibe como parámetros de entrada una fila del RDD y una lista de  posiciones de columnas $(n_1, n_2, ... n_m)$, y devuelve la lista de pares del tipo (clave, valor), donde la clave es el número de columna y el valor es False si el dato en la columna es numérico y True en caso contrario. Por ejemplo:
$[(n_1, True), (n_2,False), ... (n_m,True)]$ indicando que la columna $n_1$ es numérica, mientras que la columna $n_2$ no lo es.

__c)__ Aplicar la función anterior al rdd3 completo para construir un nuevo rdd4 con el siguiente aspecto:
```Python
[(7, 0),
 (9, 0),
 (11, 0),
 (13, 0), ...
```
La clave representa una columna del rdd3, mientras que el valor representa el número de valores en la columna que no son numéricos.
Puedes usar operaciones como `flatMap` y `reduceByKey` para resolver el ejercicio.


In [68]:
def is_number(n):
    try:
        float(n)   # Type-casting the string to `float`.
                   # If string is not a valid `float`, 
                   # it'll raise `ValueError` exception
    except ValueError:
        return False
    return True


posD=[]
for i in range(len(primer_elemento)):
    if primer_elemento[i][0]== "D":
        posD.append(i)

def contar_No_numero(fila, posiciones):
    devolucion = []
    for elem in posiciones:
        devolucion.append((elem, is_number(fila[elem])))
    return devolucion

rdd4 = rdd3.map(lambda x: contar_No_numero(x,posD))

In [69]:
rdd4.take(1)

[[(7, True),
  (9, True),
  (11, True),
  (13, True),
  (15, True),
  (17, True),
  (19, True),
  (21, True),
  (23, True),
  (25, True),
  (27, True),
  (29, True),
  (31, True),
  (33, True),
  (35, True),
  (37, True),
  (39, True),
  (41, True),
  (43, True),
  (45, True),
  (47, True),
  (49, True),
  (51, True),
  (53, True),
  (55, True),
  (57, True),
  (59, True),
  (61, True),
  (63, True),
  (65, True),
  (67, True)]]

In [70]:
# Sol 
import operator as op

### Conversión de tipos

Dado que todos los valores en las columnas de `posD` son números reales válidos, generar código para convertir los strings de las posiciones en `posD` en float (con la función predefinida float(x)). Llamar `raw4` al RDD resultante.
Se valorará hacerlo reduciendo al mínimo el número de recorridos de `rdd3`. Usa la operación `map`.

In [129]:
# Sol:
def number(n):
    try:
        float(n)   # Type-casting the string to `float`.
                   # If string is not a valid `float`, 
                   # it'll raise `ValueError` exception
    except ValueError:
        return n
    return float(n)

def other_is_number(n):
    try:
        float(n)   # Type-casting the string to `float`.
                   # If string is not a valid `float`, 
                   # it'll raise `ValueError` exception
    except ValueError:
        return "N"
    return "S"

raw4 = rdd3.map(lambda x: [number(x[i]) if i in posD else x[i] for i in range(len(x))])

### Aplanar el RDD por días

Queremos cambiar el RDD `raw4`  para que cada día sea un elemento nuevo del RDD. Por ejemplo el elemento
```Python
[['28','079','102','81','28079102_81_98','2021','01',2.21,'V',1.48, 'V', 1.75,'V',  1.73, ...]
 ```
se convertira en 31 elementos, uno por día (nótese que se incluye una nueva columna con el número de día)    
```Python
    ['28','079','102','81','28079102_81_98','2021','01', 'D01', 2.21,'V']
    ['28','079','102','81','28079102_81_98','2021','01', 'D02', 1.48,'V']
    ...
    ['28','079','102','81','28079102_81_98','2021','02', 'D31', 0,'N']
```

Puedes usar la operación `flatMap` .Llamar `rdd5` al nuevo RDD

In [142]:
# Sol:
def separar_valores(fila, posiciones):
    resultado = []
    primera_pos = min(posiciones)
    for i in range(len(posiciones)-1):  # Asegurarse de no exceder el índice
        j = str(i).zfill(2)
        cabecera = fila[:primera_pos] + [f"D{j}", fila[posiciones[i]], other_is_number(fila[posiciones[i]])]
        resultado.append(cabecera)
    return resultado

rdd5 = raw4.flatMap(lambda x: separar_valores(x,posD))
rdd5.take(3)

[['28', '079', '102', '81', '28079102_81_98', '2021', '01', 'D00', 2.21, 'S'],
 ['28', '079', '102', '81', '28079102_81_98', '2021', '01', 'D01', 1.48, 'S'],
 ['28', '079', '102', '81', '28079102_81_98', '2021', '01', 'D02', 1.75, 'S']]

### Borrado de filas erróneas
Ahora queremos eliminar todas las filas del rdd5 que tengan algún valor no válido (distinto de 'V').

__a)__ Escribir una función `removeNonValid(rdd)

- `name`: removeNonValid
- `desc`: Elimina las filas con dato no válido
- `input parameters`:
    - rdd: un RDD de listas de strings
- `return`: el rdd de entrada pero quedándose solo con los elementos que tengan una 'V' en el último campo.

__b)__ Eliminar la última columna.

In [143]:
# Sol a) el rdd resultante tiene  31045 filas
def removeNonValid(rdd):
    rdd = rdd.filter(lambda x: x[len(x)-1] == "S")
removeNonValid(rdd5)

In [145]:
# Sol b)
rdd5 = rdd5.map(lambda x: x[:len(x)-1])
rdd5.take(1)

[['28', '079', '102', '81', '28079102_81_98', '2021', '01', 'D00', 2.21]]

### Estadísticas

Calcular la media del valor recogido (el dato de la última columna) por punto de muestreo (columna 5ª, pos 4) y día (columna 7ª, pos 6). Se deben usar recursos de Spark y el resultado debe salir por orden de punto de muestreo  y dentro del mismo punto de muestreo por día. La salida será de la forma:

```Python
[['28079004_83_98', '01', 0.4875],
 ['28079004_83_98', '02', 9.788],
 ['28079004_83_98', '03', 11.0],
 ['28079004_83_98', '04', 13.15],
 ['28079004_83_98', '05', 18.196774193548386],
 ['28079004_83_98', '06', 22.666666666666668],
 ['28079004_83_98', '07', 25.87741935483871],
 ['28079004_83_98', '08', 26.246153846153845],
 ['28079004_83_98', '09', 20.453333333333333],
  ...
  ```

In [146]:
from statistics import mean
# Sol:
def obtener_claves(fila):
    return fila[4], fila[6]
def obtener_valor(fila):
    return fila[len(fila)-1]

final = rdd5.groupBy(obtener_claves).mapValues(lambda filas: mean([obtener_valor(f) for f in filas]))


In [147]:
final.take(10)

[(('28079102_81_98', '01'), 1.6706666666666667),
 (('28079102_81_98', '02'), 1.9526666666666666),
 (('28079102_81_98', '04'), 2.042),
 (('28079102_81_98', '08'), 2.0383333333333336),
 (('28079102_81_98', '11'), 1.764),
 (('28079102_82_98', '01'), 122.26666666666667),
 (('28079102_82_98', '02'), 125.66666666666667),
 (('28079102_82_98', '04'), 106.33333333333333),
 (('28079102_82_98', '08'), 96.63333333333334),
 (('28079102_82_98', '11'), 65.33333333333333)]

No olvides subir el notebook a la entrega del Campus Virtual.