# LO BASICO

In [None]:
!apt-get install openjdk-8-jdk
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!pip install pyspark
import pandas as pd #librería de análisis de datos con buenas funciones para manejar tablas
from tabulate import tabulate
from pyspark import SparkContext
import json
from datetime import date
import folium
import sys

In [None]:
from google.colab import drive
drive.mount('/content/drive') #da acceso al Google Drive

# EJECUTAR PRIMERO PARA VER EL PROGRAMA POR PASOS

In [None]:
sc = SparkContext()

In [None]:
! wget -N wild.mat.ucm.es/tmp/sample_10e4.json
rdd_bruto = sc.textFile("sample_10e4.json")

In [None]:
sc.stop() # Ejecutar solo si se desea parar manualmente el objeto SparkContext

# ***PASO 1:*** Conseguir RDD: {"id": .. ; "edad": .. ; "tipo": .. ; "viajes" : [(fecha1,origen1,destino1),...] }

Los ***pasos*** incluyen ***comandos*** que muestran ***pequeños fragmentos de la RDD*** con el fin de ***apreciar mejor su funcionamiento***. Estos comandos sacrifican tiempo de computación y no estarán incluidos en el comando final, así que se aconseja utilizar esta explicación con ***archivos no muy grandes*** (hasta 500mb). 

Se presenta el ***uso por pasos para archivos únicos***. El sistema para utilizar más archivos es análogo a utilizar uno y se implementará en el programa final.

In [None]:
"""
Función que dado un dato tipo "2017-04-01T01:00:00.000+0200" devuelve una 
tupla (X,Y) donde X es "ES" o "FS" dependiendo de si es entresemana o fin de 
semana, e Y es el intervalo asignado a cada hora.

(Tabla que asigna intervalos de hora en el Guión de la Práctica)
"""

def fecha_tupla(string):
  year = int(string[0:4])
  month = int(string[5:7])
  day = int(string[8:10])
  hour = int(string[11:13])
  dia_semana = date(year,month,day).weekday() #Con esto sacamos el día de la semana
  es_o_fs = "ES"
  intervalo_horario = 1
  if dia_semana > 4: # 5 y 6 corresponde a sábado y domingo respectivamente
    es_o_fs = "FS"
  if hour >= 10 and hour < 13:
    intervalo_horario = 2
  elif hour >= 13 and hour < 16:
    intervalo_horario = 3
  elif hour >= 16 and hour < 21:
    intervalo_horario = 4
  elif hour >= 21 and hour < 23:
    intervalo_horario = 5
  elif hour >= 23 or (hour >= 00 and hour < 6):
    intervalo_horario = 6
  return (es_o_fs, intervalo_horario)

"""
La función mapper_1, dada una línea de la RDD original (en bruto), transforma
sus datos en un par (K,V), donde:
K = 'user_day_code edad tipo' (en adelante user_day_code = id)
V = ( ("ES" o "FS", intervalo_horas) , estacion_origen , estacion_destino))

Este mapeo (con una consiguiente agrupación) formará los elementos de RDD_1 y 
y ayudará a agrupar los viajes que hace un mismo usuario en un día. 
"""
def mapper_1(line): #acepta lineas del diccionario original
  data = json.loads(line)
  usuario = data['user_day_code']
  edad = str(data['ageRange'])
  tipo = str(data['user_type'])
  fecha = data['unplug_hourTime']['$date']
  tupla_fecha = fecha_tupla(fecha)
  origen = data['idunplug_station']
  destino = data['idplug_station']
  clave = usuario + " " + edad + " " + tipo 
  valor = (tupla_fecha, origen, destino)
  return (clave, valor)

"Cargamos la rdd_1:"

rdd_1 = rdd_bruto.map(mapper_1).groupByKey()
#formato RDD_1: ('id edad tipo', [(("ES",1),90,163), (("FS",3),20,165), ...])
print("----------------------")
print("RDD 1 cargado con éxito")
print("Muestra RDD 1:")
rdd_1.persist()
print(rdd_1.first())
print("----------------------")

"""
La función mapper_2 toma las líneas de RDD_1 para transformarlas en diccionarios
con la información que aparece en el objetivo del Paso 1:
{"id": .. ; "edad": .. ; "tipo": .. ; "viajes" : [(fecha1,origen1,destino1),...]}

La RDD que recogerá toda esta información será rdd_2. 
"""
def mapper_2(line): # acepta lineas de pares ('id edad tipo', (ES, origen, destino))
  clave = line[0]
  lista_claves = clave.split()
  lista_valores = []
  for elementos in line[1]:
    lista_valores.append(elementos)
  dicc_final = dict(id=lista_claves[0],edad=int(lista_claves[1]),tipo=int(lista_claves[2]),viajes=lista_valores) 
  return dicc_final

rdd_2 = rdd_1.map(mapper_2)
print("RDD 2 cargado con éxito")
print("Muestra RDD 2:")
rdd_2.persist()
print(rdd_2.first())

----------------------
RDD 1 cargado con éxito
Muestra RDD 1:
('e4d55deb9ac172a8d8f5f0a32599815bd51b7c8760d67e42b11adf7c0829341b 0 1', <pyspark.resultiterable.ResultIterable object at 0x7f2d0d35e128>)
----------------------
RDD 2 cargado con éxito
Muestra RDD 2:
{'id': 'e4d55deb9ac172a8d8f5f0a32599815bd51b7c8760d67e42b11adf7c0829341b', 'edad': 0, 'tipo': 1, 'viajes': [(('FS', 6), 90, 66)]}


# ***PASO 2:*** CONSEGUIR RDD: {'id': .. , 'puntos': .. , 'uso_estaciones': ..}


El ***objetivo*** de este paso es ***conseguir una RDD*** formada por ***líneas de diccionarios*** donde ***a cada persona se le atribuya una puntuación*** en función de su ***edad***, ***tipo*** de usuario (anual u ocasional) y ***viajes*** (donde tomará importancia el día y horario de cada viaje, así como el número de ellos).

En principio, ***cada parámetro*** (edad, tipo y viajes) otorgará 
***de 0 a 5 puntos*** a una persona, consiguiendo ***desde 0 puntos*** (usuario puramente
*de ocio*) hasta un ***máximo de 15 puntos*** (usuario puramente *trabajador*).

*Aclaración:* Para nosotros, un usuario *puramente de ocio o puramente trabajador* es aquel que *reúne todos los requisitos* que *consideramos imporantes a tener en cuenta para decidir una cosa u otra*. Esos requisitos se corresponderán en mayor o menor medida con la realidad, pero *en ningún caso supondrán que consideremos a ese cliente un cliente por ocio o trabajo con una seguridad total.*

Tanto el ***parámetro edad*** como el del ***tipo*** de usuario pueden ***no estar presentes***, luego en el mapeo, si un ***parámetro falla*** se juzgará a la persona ***sobre 10 puntos***, y si fallan dos, ***sobre 5***. El *parámetro viajes siempre estará presente*.

Una vez juzgada la puntuación de cada persona, teniendo en cuenta el número de parámetros que han fallado, se hará una ***media de puntuación***, que dejará a cada
persona con una puntuacion ***entre 0*** (puramente de ocio) ***y 1*** (puramente trabajo). 

***Cada persona tiene asignado un uso de estaciones***, donde vienen las estaciones
con las que la persona ***ha tenido contacto*** (*sin importar el número de veces
que la haya usado*).

Empezaremos definiendo las *funciones auxiliares*:

In [None]:
"""
Función que, dada una edad, devuelve los puntos correspondientes a esa edad
atendiendo a las tablas del guión de la Práctica:
"""
def point_age(edad):
  puntos = 0 # También vale para edad 6
  if edad == 1 or edad == 2:
    puntos = edad
  elif edad == 3 or edad == 4:
    puntos = edad + 1
  elif edad == 5:
    puntos = 3
  return puntos

"""
Función que, dado un tipo de usuario (anual u ocasional), devuelve los puntos 
correspondientes al tipo atendiendo a las tablas del guión de la Práctica:
"""

def point_tipo(tipo):
  puntos = 0
  if tipo == 2:
    puntos = 4
  return puntos

"""
A continuación definiremos funciones que relacionarán el día de la semana e
intervalos horarios introducidos con su puntuación correspondiente según el 
guión de la Práctica. 

1. Para un unico viaje:

Funcion que dada una dupla de la forma ("ES" o "FS", intervalo_horas) devuelve 
la puntuacion de la misma en base al guión de la Práctica:
"""

def point_1viaje(es_o_fs,int_horas):
  puntos = 0
  if es_o_fs == 'ES':
    if int_horas == 1:
      puntos = 3.5
    else:
      puntos = 2.5
  else:
    if int_horas == 1:
      puntos = 2.5
    else:
      puntos = 1
  return puntos

"""
2. Para dos viajes:

Función que dada una tupla tipo:
("ES" o "FS", intervalo_horario_viaje1,intervalo_horario_viaje2), devuelve 
la puntuacion de la misma en base al guión de la Práctica.

Esto es posible gracias a la implementación de matrices, correspondientes 
a las tablas del guión de la Práctica, gracias a las cuales se obtiene la 
puntuación de una persona que realiza dos viajes en función del día 
y los intervalos horarios de cada viaje. 
"""

M_ES =[[0,2,5,5,5,2.5],[2,0,2,3,4,2.5],[5,2,0,3,4,2.5],[5,3,3,0,3,3.5],[5,4,4,3,0,0],[1,2.5,2.5,3.5,0,0]] #entre semana
M_FS = [[0,1,1.5,1.5,1.5,0],[1,0,0,1.5,1.5,1.5],[1.5,0,0,1,1.5,0],[1.5,1.5,1,0,1,0],[1.5,1.5,1.5,1,0,0],[0,1.5,0,0,0,0]] #fin de semana 

def point_2viajes(es_o_fs,viaje1_horario,viaje2_horario):
  result = 0
  if es_o_fs == 'ES':
    result = M_ES[viaje1_horario-1][viaje2_horario-1]
  elif es_o_fs == 'FS':
    result = M_FS[viaje1_horario-1][viaje2_horario-1]
  return result


Asumimos que si un cliente ***realiza 3 viajes al día o más***, es complicado designar
***ante qué tipo de cliente estamos*** (incluso podría haber cogido la bicicleta por 
la mañana para trabajar y por la tarde para el ocio), así que en estos casos 
atribuiremos ***2,5 puntos en este nivel*** y ***utilizaremos los otros niveles*** para dar 
una conclusión más acertada.

*Curiosidad:* En abril de 2017, según las pruebas de programación de nuestro grupo, *una persona batió el record mensual de usos diarios de BICIMAD* con un total de *33 usos en un día*. 

A continuación definimos el mapper que nos dará el RDD objetivo de este paso:

In [None]:
"""
La función mapper_3 toma las líneas de RDD_2 para transformarlas en diccionarios
con la información que aparece en el objetivo del Paso 1:
{'id': .. , 'puntos': .. , "uso_estaciones": ..}

La RDD que recogerá toda esta información será rdd_3. 
"""
def mapper_3(line):
  edad = line['edad']
  tipo = line['tipo']
  user = line['id']
  lista_viajes = line['viajes'] #formato [(("ES",3), 2 , 127), (("ES",5), 3, 29)), ...]
  if edad == 0: #No hay información por edad
    parametro_edad = 0
    puntos_por_edad = 0 #Desactivamos el parámetro de la edad
  else:
    puntos_por_edad = point_age(edad) #Averiguamos los puntos correspondientes a la edad
    parametro_edad = 1
  if tipo == 0: #Análogo a la edad
    puntos_por_tipo = 0
    parametro_tipo = 0
  else:
    puntos_por_tipo = point_tipo(tipo)
    parametro_tipo = 1

  parametro_viajes = 1 #Siempre tendremos el parámetro viaje
  numero_viajes = len(lista_viajes) #Depende de si tenemos 1 viaje, 2 o más 

  if numero_viajes == 1:
    es_o_fs = lista_viajes[0][0][0]
    int_horario = lista_viajes[0][0][1]
    puntos_por_viajes = point_1viaje(es_o_fs,int_horario)

  elif numero_viajes == 2:
    es_o_fs = lista_viajes[0][0][0]
    est1_horario = lista_viajes[0][0][1]
    est2_horario = lista_viajes[1][0][1]
    puntos_por_viajes = point_2viajes(es_o_fs,est1_horario,est2_horario)

  else:
    puntos_por_viajes = 2.5

  puntos_id = (puntos_por_viajes + puntos_por_tipo + puntos_por_edad)/(5*(parametro_viajes+parametro_edad+parametro_tipo))
  #Recuento de puntos por cada parámetro y media teniendo en cuenta cada parámetro
  lista_estaciones_id = []
  #Preparación de la clave "uso_estaciones":
  for elem in lista_viajes:
    if (elem[1] in lista_estaciones_id) == False:
      lista_estaciones_id.append(elem[1])
    if (elem[2] in lista_estaciones_id) == False:
      lista_estaciones_id.append(elem[2])

  lista_estaciones_id.sort() #Ordenamos la lista
  #Definimos el formato del diccionario final
  dic = dict(id=user,puntos=puntos_id, uso_estaciones=lista_estaciones_id)
  return dic

rdd_3 = rdd_2.map(mapper_3)
print("RDD 3 cargado con éxito")
print("Muestra RDD 3:")
rdd_3.persist()
print(rdd_3.first())
print("----------------------")

RDD 3 cargado con éxito
Muestra RDD 3:
{'id': 'e4d55deb9ac172a8d8f5f0a32599815bd51b7c8760d67e42b11adf7c0829341b', 'puntos': 0.1, 'uso_estaciones': [66, 90]}
----------------------


# ***PASO 3:*** CONSEGUIR LISTA FINAL DE DICCIONARIOS DE LA FORMA: {'estacion': ... , 'media': ..., 'usos': ... , 'usos_ocio': ... , 'usos_trabajo': ...}

Este es el paso ***más complicado*** de realizar y el que más recursos consume. En principio tardaba 6 minutos en ejecutarse en un archivo de 500mb (en el programa completo, por pasos hubiera tardado más). Ahora hemos conseguido que tarde 38 segundos. 

 Cada persona tiene una ***puntuación*** y una ***lista de estaciones*** con las que ha tenido contacto. 

El ***objetivo*** de este paso es ***coger cada estación de esa lista*** y ***atribuirle la puntuación de las personas que la han utilizado***, así como otros parámetros.

Si ***dos personas*** con distintas puntuaciones han utilizado la ***misma estación***, el objetivo es atribuirle a esa estacion la ***suma*** de las respectivas puntuaciones de cada persona y ***dividirla*** entre dos. Es lógico pensar que esto puede ***generalizarse*** a un número finito de personas y conseguir ***para cada estación su media de puntos***, así como su ***uso*** (*recuento del número de personas que han tenido contacto con ella*), su ***uso de ocio*** (*recuento del número de personas con una puntuación menor que 0.5 que han tenido contacto con ella*) y su ***uso de trabajo*** (*análogo al uso de ocio pero con personas con puntuación mayor de 0.5*)

Hemos eliminado de 'usos_ocio' y 'usos_trabajo' a las ***personas con puntuación 0.5***. Estas personas sí que aparecen en 'usos', pero con 0.5 puntos asumimos que no podemos constatar con rigor un uso recreacional o laboral. 

***`Procedimiento del Paso 3:`***

***`1)`*** **Cambiar el formato** *:{'id': .., 'puntos': .., 'uso_estaciones': [66, 90]}*

**al formato** *:[('estacion23', {'puntos': ..., 'numero_personas': 1}),...]*

Esta es una manera de conseguir que ***cada estación sea una clave***, pero como cada linea de la RDD anterior contiene una ***lista de estaciones***, cada linea de la nueva RDD con este formato será una ***lista***.

*'numero_personas'* es siempre 1 en este formato, ya que la puntuación de la estación sólo ha dependido de la puntuación que le ha otorgado la persona. 

***`2)`*** ***Transformar una RDD de líneas en formato:***

*línea 1 -> [('estacion23', {'puntos': ..., 'numero_personas': 1}),('estacion47', {'puntos': ..., 'numero_personas': 1}), ...]*

***en una RDD de líneas en formato:***

*línea 1 -> ('estacion23', {'puntos': ..., 'numero_personas': 1})*

*línea 2 -> ('estacion47', {'puntos': ..., 'numero_personas': 1})*

*línea 3 -> ...*

Esto significa ***transformar una RDD de "un tamaño" en otra RDD de "tamaño más grande"***. Para este procedimiento seguiremos los siguientes ***pasos***: 

- Conseguir el ***número de elementos que tiene la lista más grande de todas las líneas de la RDD original***. Para esto aplicamos una pequeña ***reducción***. Este número será *n*. 

- ***Crear una lista de n RDD's*** donde ***cada línea de estas RDD's esté formada por el n-ésimo componente de la lista correspondiente a cada línea de la RDD original.*** Las RDD llegarán en algún momento a incluir lineas de tuplas vacías, ya que muchas listas de la RDD original tienen un tamaño pequeño. 

- ***Unir todas estas RDD's*** en una gran RDD formada, ahora sí, por los líneas de formato ('estacion23', {'puntos': ..., 'numero_personas': 1}) y tuplas en blanco. ***Filtrar*** después todas estas tuplas en blanco para expulsarlas. 

***`3)`*** ***Agrupar en la RDD de líneas en formato***:

*('estacion23', {'puntos': ..., 'numero_personas': 1})*

todos los elementos con ***estaciones iguales*** y ***devolver una RDD de líneas en formato***: 

*('estacion23', [{'puntos': 0.1 , 'numero_personas': 1}, {'puntos': 0.7 , 'numero_personas': 1}, ... ]*

***`4)`*** ***Transformar la RDD anterior en una lista de diccionarios de la forma:***

*{'estacion': ... , 'media': ..., 'usos': ... , 'usos_ocio': ... , 'usos_trabajo': ...}*

***donde:***

*'media'* = puntos en media de la estación (entre 0 y 1)

*'usos'* = número de personas que han tenido contacto con la estación 

*'usos_ocio'* = número de personas con una puntuación menor de 0.5 que han tenido contacto con la estación 

*'usos_trabajo'* = número de personas con una puntuación mayor de 0.5 que han tenido contacto con la estación

In [None]:
#Apartado 1)

def mapper_4(line): #parte de rdd_3, de formato {'id': .., 'puntos': .., 'uso_estaciones': [66, 90]}
  puntos = line['puntos']
  lista_estaciones = line['uso_estaciones']
  lista_tupla_salida = []
  for elem in lista_estaciones:
    clave = 'estacion'+str(elem)
    valor_dic = {'puntos': puntos, 'numero_personas':1}
    lista_tupla_salida.append((clave,valor_dic))
  return lista_tupla_salida #formato [('estacion23', {'puntos': ..., 'numero_personas': 1}),...]


rdd_4 = rdd_3.map(mapper_4)
print("RDD 4 cargado con éxito")
print("Muestra RDD 4:") 
rdd_4.persist()
print(rdd_4.first())
print("----------------------")

#Apartado 2)

def reducer_1(list_dic_1,list_dic_2):
  if len(list_dic_1) > len(list_dic_2):
    return list_dic_1
  else:
    return list_dic_2 #devuelve la lista de diccionarios más larga

print("Iniciando REDUCCIÓN de apoyo...")
lista_dic_grande = rdd_4.reduce(reducer_1)
"""
La longitud de este diccionario es el máximo uso que le ha dado una persona al 
bicimad en un día, dentro de este archivo.
"""
print("Reducción de apoyo completada")
print("----------------------")
print("Iniciando separación de RDD's...")
print("Uniendo las RDD's por estación...")

#Ahora crearemos len(lista_dic_grande) RDD's distintas y las uniremos:

def mapper_5(n, line):                      
  longitud = len(line)
  if n < len(line):
    return line[n]
  else:
    tupla_vacia = ()
    return tupla_vacia
"""
La función anterior coge elementos de la rdd_4 y se queda con su elemento 
número n-ésimo, si es que lo tiene. Si no lo tiene, devuelve una tupla vacía,
"""

rdd_5 = rdd_4.map(lambda x: mapper_5(0, x)) #mapeamos los primeros elementos
i = 1
while i < len(lista_dic_grande):
  rdd_5 = rdd_5.union(rdd_4.map(lambda x: mapper_5(i, x))) #unimos todos los demás
  i = i + 1

#Apartado 3)

rdd_6 = rdd_5.filter(lambda x: x != ()).groupByKey() 
#Con rdd_6 filtramos tuplas vacías y agrupamos todas las estaciones iguales. 
print("Unión completada con éxito en RDD 6")
print("Mostrando RDD 6...")
rdd_6.persist()
muestra = rdd_6.first()
print("Muestra de RDD 6:")
print(muestra)
print("----------------------")

#Apartado 4)

def mapper_6(line): #Cada linea es tipo ('estacion25', [{'puntos': ..., 'numero_personas: 1}, ... , ...])
  clave = line[0]
  lista_diccionarios = line[1]
  dic_final = {'estacion': int(clave[8:]), 'media': 0, 'usos': 0, 'usos_ocio': 0, 'usos_trabajo': 0}
  for elem in lista_diccionarios:
    dic_final['media'] = dic_final['media'] + elem['puntos'] #primero calculamos los puntos totales
    dic_final['usos'] = dic_final['usos'] + elem['numero_personas']
    if elem['puntos'] < 0.5:
      dic_final['usos_ocio'] = dic_final['usos_ocio'] + elem['numero_personas']
    elif elem['puntos'] > 0.5:
      dic_final['usos_trabajo'] = dic_final['usos_trabajo'] + elem['numero_personas']
  #ahora calculamos la media de los puntos
  dic_final['media'] = dic_final['media']/dic_final['usos']
  return dic_final #devuelve {'estacion': ... , 'media': ... , 'usos': ... , 'usos_ocio': ... , 'usos_trabajo': ...}

rdd_7 = rdd_6.map(mapper_6)
print("RDD 7 cargada con éxito")
print("Muestra RDD 7:")
rdd_7.persist()
print(rdd_7.first())
print("----------------------")
"""
Los elementos de rdd_7 son los diccionarios finales correspondientes a cada 
estación, y nos dan toda la información necesaria. 
Haciendo un collect de todos los elementos acabamos el paso 3.
"""
print("Cargando lista final...")
lista_final = rdd_7.collect()
print("¡Lista final correctamente cargada!")
print("Mostrando los 10 primeros elementos de la lista final:")
for elem in lista_final[0:9]:
  print(elem)

*Curiosidad:* Adjuntamos en el siguiente código *cómo fué el inicial paso 3 que programamos por primera vez*. En este paso *no sabíamos como crear RDD's "más grandes"* y decidimos hacer una *gran función de reducción* que *aceptara dos listas de diccionarios* y *buscara en cada diccionario de la primera lista diccionarios de la segunda lista* y *si las estaciones correspondientes fueran iguales entonces combinara esos diccionarios y los añadiera a una gran lista final.* Si no había estaciones iguales entre listas, simplemente se añadían los diccionarios como estaban en la lista final. 

Si bien la solución funcionaba, esta era mucho más lenta (6 minutos aprox. para 500mb). Aquí se puede ejecutar para hacerse una idea de cómo funcionaba:

In [None]:
def mapper_4(line): #parte de rdd_3, de formato {'id': .., 'puntos': .., 'uso_estaciones': [66, 90]}
  puntos = line['puntos']
  lista_estaciones = line['uso_estaciones']
  lista_dic = []
  for elem in lista_estaciones:
    dic = {'estacion':elem , 'puntos': puntos, 'numero_personas':1}
    lista_dic.append(dic)
  return lista_dic


print("----------------------")
rdd_4 = rdd_3.map(mapper_4)
print("RDD de apoyo cargado con éxito")
print("Muestra RDD de apoyo:")
rdd_4.persist()
print(rdd_4.first())
print("----------------------")

def reducer_1(list_dic_1,list_dic_2): #parte de dos elementos de rdd_4, de formato [{"estacion": .. , "puntos": .. , "numero_personas": 1}, ...]
  list_dic_total = list_dic_1 + list_dic_2
  list_dic_final = []
  while list_dic_total != []:
    misma_estacion_que = 0
    i = 1
    while i<len(list_dic_total):
      if list_dic_total[0]['estacion'] == list_dic_total[i]['estacion']:
        misma_estacion_que = i
      i = i + 1
    if misma_estacion_que == 0:
      list_dic_final.append(list_dic_total[0])
      list_dic_total.pop(0)
    else:
      dic = {'estacion': list_dic_total[0]['estacion'], 'puntos': list_dic_total[0]['puntos']+list_dic_total[misma_estacion_que]['puntos'], 'numero_personas': list_dic_total[0]['numero_personas']+list_dic_total[misma_estacion_que]['numero_personas']}
      list_dic_final.append(dic)
      list_dic_total.pop(0)
      list_dic_total.pop(misma_estacion_que-1)
  return list_dic_final

lista_dic_totales = rdd_4.reduce(reducer_1)
print("Lista Diccionarios Finales DE APOYO cargados con éxito")
print("Muestra Lista Diccionales Finales DE APOYO")
print(lista_dic_totales[1])
print("----------------------")

def dic_total_a_final(lista_dic_totales): #hace las medias
  lista_dic_final = []
  for elem in lista_dic_totales:
    dic = {'estacion': elem['estacion'], 'media': elem['puntos']/elem['numero_personas']}
    lista_dic_final.append(dic)
  return lista_dic_final

lista_dic_finales = dic_total_a_final(lista_dic_totales)

print("Lista de Diccionarios FINALES cargados con éxito")
print("Muestra Lista Diccionarios FINALES")
print(lista_dic_finales[1])
print("----------------------")

----------------------
RDD de apoyo cargado con éxito
Muestra RDD de apoyo:
[{'estacion': 66, 'puntos': 0.1, 'numero_personas': 1}, {'estacion': 90, 'puntos': 0.1, 'numero_personas': 1}]
----------------------
Lista Diccionarios Finales DE APOYO cargados con éxito
Muestra Lista Diccionales Finales DE APOYO
{'estacion': 90, 'puntos': 70.08333333333331, 'numero_personas': 266}
----------------------
Lista de Diccionarios FINALES cargados con éxito
Muestra Lista Diccionarios FINALES
{'estacion': 90, 'media': 0.2634711779448621}
----------------------


In [None]:
sc.stop() #parada manual del SparkContext

# ***`PROGRAMA ENTERO CON TODOS LOS PASOS JUNTOS`*** (ejecutar sólo lo básico antes, con SparkContext parado)

Presentamos los algoritmos explicados con anterioridad de forma conjunta. Al ejecutar este programa podemos introducir los datos necesarios de forma interactiva y cargar los archivos que queramos. Estos archivos se unirán en una única RDD global que se procesará utilizando los pasos anteriores.

Hemos quitado los comentarios en esta sección para no repetirnos y poder visualizar el código de forma conjunta. 

In [3]:
#Funciones auxiliares:

def fecha_tupla(string):
  year = int(string[0:4])
  month = int(string[5:7])
  day = int(string[8:10])
  hour = int(string[11:13])
  dia_semana = date(year,month,day).weekday() #Con esto sacamos el día de la semana
  es_o_fs = "ES"
  intervalo_horario = 1
  if dia_semana > 4: # 5 y 6 corresponde a sábado y domingo respectivamente
    es_o_fs = "FS"
  if hour >= 10 and hour < 13:
    intervalo_horario = 2
  elif hour >= 13 and hour < 16:
    intervalo_horario = 3
  elif hour >= 16 and hour < 21:
    intervalo_horario = 4
  elif hour >= 21 and hour < 23:
    intervalo_horario = 5
  elif hour >= 23 or (hour >= 00 and hour < 6):
    intervalo_horario = 6
  return (es_o_fs, intervalo_horario)

def point_age(edad):
  puntos = 0 # También vale para edad 6
  if edad == 1 or edad == 2:
    puntos = edad
  elif edad == 3 or edad == 4:
    puntos = edad + 1
  elif edad == 5:
    puntos = 3
  return puntos

def point_tipo(tipo):
  puntos = 0
  if tipo == 2:
    puntos = 4
  return puntos  

def point_1viaje(es_o_fs,int_horas):
  puntos = 0
  if es_o_fs == 'ES':
    if int_horas == 1:
      puntos = 3.5
    else:
      puntos = 2.5
  else:
    if int_horas == 1:
      puntos = 2.5
    else:
      puntos = 1
  return puntos

M_ES =[[0,2,5,5,5,2.5],[2,0,2,3,4,2.5],[5,2,0,3,4,2.5],[5,3,3,0,3,3.5],[5,4,4,3,0,0],[1,2.5,2.5,3.5,0,0]] #entre semana
M_FS = [[0,1,1.5,1.5,1.5,0],[1,0,0,1.5,1.5,1.5],[1.5,0,0,1,1.5,0],[1.5,1.5,1,0,1,0],[1.5,1.5,1.5,1,0,0],[0,1.5,0,0,0,0]] #fin de semana 

def point_2viajes(es_o_fs,viaje1_horario,viaje2_horario):
  result = 0
  if es_o_fs == 'ES':
    result = M_ES[viaje1_horario-1][viaje2_horario-1]
  elif es_o_fs == 'FS':
    result = M_FS[viaje1_horario-1][viaje2_horario-1]
  return result

# Mappers y Reducers:

def mapper_1(line): #acepta lineas del diccionario original
  data = json.loads(line)
  usuario = data['user_day_code']
  edad = str(data['ageRange'])
  tipo = str(data['user_type'])
  fecha = data['unplug_hourTime']['$date']
  tupla_fecha = fecha_tupla(fecha)
  origen = data['idunplug_station']
  destino = data['idplug_station']
  clave = usuario + " " + edad + " " + tipo 
  valor = (tupla_fecha, origen, destino)
  return (clave, valor)

def mapper_2(line): # acepta lineas de pares ('id edad tipo', (ES, origen, destino))
  clave = line[0]
  lista_claves = clave.split()
  lista_valores = []
  for elementos in line[1]:
    lista_valores.append(elementos)
  dicc_final = dict(id=lista_claves[0],edad=int(lista_claves[1]),tipo=int(lista_claves[2]),viajes=lista_valores) 
  return dicc_final

def mapper_3(line):
  edad = line['edad']
  tipo = line['tipo']
  user = line['id']
  lista_viajes = line['viajes'] #formato [(("ES",3), 2 , 127), (("ES",5), 3, 29)), ...]
  if edad == 0: #No hay información por edad
    parametro_edad = 0
    puntos_por_edad = 0 #Desactivamos el parámetro de la edad
  else:
    puntos_por_edad = point_age(edad) #Averiguamos los puntos correspondientes a la edad
    parametro_edad = 1
  if tipo == 0: #Análogo a la edad
    puntos_por_tipo = 0
    parametro_tipo = 0
  else:
    puntos_por_tipo = point_tipo(tipo)
    parametro_tipo = 1

  parametro_viajes = 1 #Siempre tendremos el parámetro viaje
  numero_viajes = len(lista_viajes) #Depende de si tenemos 1 viaje, 2 o más 

  if numero_viajes == 1:
    es_o_fs = lista_viajes[0][0][0]
    int_horario = lista_viajes[0][0][1]
    puntos_por_viajes = point_1viaje(es_o_fs,int_horario)

  elif numero_viajes == 2:
    es_o_fs = lista_viajes[0][0][0]
    est1_horario = lista_viajes[0][0][1]
    est2_horario = lista_viajes[1][0][1]
    puntos_por_viajes = point_2viajes(es_o_fs,est1_horario,est2_horario)

  else:
    puntos_por_viajes = 2.5

  puntos_id = (puntos_por_viajes + puntos_por_tipo + puntos_por_edad)/(5*(parametro_viajes+parametro_edad+parametro_tipo))
  #Recuento de puntos por cada parámetro y media teniendo en cuenta cada parámetro
  lista_estaciones_id = []
  #Preparación de la clave "uso_estaciones":
  for elem in lista_viajes:
    if (elem[1] in lista_estaciones_id) == False:
      lista_estaciones_id.append(elem[1])
    if (elem[2] in lista_estaciones_id) == False:
      lista_estaciones_id.append(elem[2])

  lista_estaciones_id.sort() #Ordenamos la lista
  #Definimos el formato del diccionario final
  dic = dict(id=user,puntos=puntos_id, uso_estaciones=lista_estaciones_id)
  return dic

def mapper_4(line): #parte de rdd_3, de formato {'id': .., 'puntos': .., 'uso_estaciones': [66, 90]}
  puntos = line['puntos']
  lista_estaciones = line['uso_estaciones']
  lista_tupla_salida = []
  for elem in lista_estaciones:
    clave = 'estacion'+str(elem)
    valor_dic = {'puntos': puntos, 'numero_personas':1}
    lista_tupla_salida.append((clave,valor_dic))
  return lista_tupla_salida #formato [('estacion23', {'puntos': ..., 'numero_personas': 1}),...]

def mapper_5(n, line):                      
  longitud = len(line)
  if n < len(line):
    return line[n]
  else:
    tupla_vacia = ()
    return tupla_vacia

def mapper_6(line): #Cada linea es tipo ('estacion25', [{'puntos': ..., 'numero_personas: 1}, ... , ...])
  clave = line[0]
  lista_diccionarios = line[1]
  dic_final = {'estacion': int(clave[8:]), 'media': 0, 'usos': 0, 'usos_ocio': 0, 'usos_trabajo': 0}
  for elem in lista_diccionarios:
    dic_final['media'] = dic_final['media'] + elem['puntos'] #primero calculamos los puntos totales
    dic_final['usos'] = dic_final['usos'] + elem['numero_personas']
    if elem['puntos'] < 0.5:
      dic_final['usos_ocio'] = dic_final['usos_ocio'] + elem['numero_personas']
    elif elem['puntos'] > 0.5:
      dic_final['usos_trabajo'] = dic_final['usos_trabajo'] + elem['numero_personas']
  #ahora calculamos la media de los puntos
  dic_final['media'] = dic_final['media']/dic_final['usos']
  return dic_final #devuelve {'estacion': ... , 'media': ... , 'usos': ... , 'usos_ocio': ... , 'usos_trabajo': ...}

def reducer_1(list_dic_1,list_dic_2):
  if len(list_dic_1) > len(list_dic_2):
    return list_dic_1
  else:
    return list_dic_2 #devuelve la lista de diccionarios más larga

In [None]:
def main(sc, lista_archivos):
  rdd_bruto = sc.textFile(lista_archivos[0])
  i = 1
  if i < len(lista_archivos):
    rdd_bruto = rdd_bruto.union(sc.textFile(lista_archivos[i]))
    i = i + 1
  rdd_1 = rdd_bruto.map(mapper_1).groupByKey() 
  #formato ('id edad tipo', [(("ES",2),90,163)), (("FS",1),20,165)), ...])
  print("----------------------")
  print("RDD 1 cargado con éxito")

  rdd_2 = rdd_1.map(mapper_2) 
  #formato {'id': ..., 'edad': 4, 'tipo': 1, 'viajes': [("ES",2),90,163),(("FS",1), 20,165)), ... ]}
  print("RDD 2 cargado con éxito")

  rdd_3 = rdd_2.map(mapper_3)
  #formato {'id': ..., 'puntos': 0.4, 'uso_estaciones': [23, 129]}
  print("RDD 3 cargado con éxito")

  rdd_4 = rdd_3.map(mapper_4)
  print("RDD 4 cargado con éxito")

  print("Iniciando REDUCCIÓN de apoyo...")
  lista_dic_grande = rdd_4.reduce(reducer_1)
  print("Reducción de apoyo completada")
  print("Iniciando separación de RDD's...")
  print("Uniendo las RDD's por estación...")
  rdd_5 = rdd_4.map(lambda x: mapper_5(0, x))
  i = 1
  while i < len(lista_dic_grande):
    rdd_5 = rdd_5.union(rdd_4.map(lambda x: mapper_5(i, x)))
    #unimos todos los demás
    i = i + 1
  print("RDD 5 cargado con éxito")

  rdd_6 = rdd_5.filter(lambda x: x != ()).groupByKey()
  print("Unión completada con éxito en RDD 6")

  rdd_7 = rdd_6.map(mapper_6).filter(lambda x: x['estacion']<500)
  #Para evitar errores en los archivos en bruto hemos filtrado la ultima RDD.
  print("RDD 7 cargado con éxito")
  print("Guardando RDD como lista_trabajo_ocio.json...")
  
  rdd_7 = rdd_7.coalesce(1, shuffle=True) #unimos el RDD en una sola partición

  # Hacemos un map de todos los elementos del RDD a string
  rdd_7 = rdd_7.map(json.dumps)

  # Hacemos la reducción a una gran string separada por líneas con todos los datos
  json_string = rdd_7.reduce(lambda x, y: x + "\n" + y)

  # Escribimos esta gran cadeana en un archivo
  with open("lista_trabajo_ocio.json", "w") as f:
    f.write(json_string)

  print("Archivo guardado satisfactoriamente.")
  print("Resultados publicados en el archivo lista_trabajo_ocio.json")


if __name__ == "__main__":
  print("Bienvenido a Spark BICIMAD TRABAJO/OCIO.")
  print("Programado por el Grupo 12 de la asignatura de Programación Paralela.")
  print("Este programa está diseñado para analizar los archivos de datos de ")
  print("BICIMAD y dar resultados sobre su uso recreativo o laboral.")
  print("Para más información consulte el guión de la práctica.")
  print("--------------------------")
  print("Introduce la ruta del archivos que quieras analizar uno por uno. (no hacen falta las comillas)")
  print("Cuando hayas acabado de introducir rutas escribe EJECUTAR y se analiarán los archivos.")
  i = 0
  j = 0
  lista_archivos = []
  archivo = input()
  while j != 1:
    if archivo == "ejecutar" or archivo == "EJECUTAR" or archivo[-5:] != ".json":
      print("No has introducido ningún archivo o tu archivo no es de la extension .json.")
      print("Por favor, introduce al menos un archivo con estas características.")
      archivo = input()
    else:
      j = 1
      lista_archivos.append(archivo)
  archivo = input()
  while i != 1:
    if archivo != "ejecutar" and archivo != "EJECUTAR":
      if archivo[-5:] != ".json":
        print("Por favor, introduce archivos con extensión .json")
        archivo = input()
      else:
        lista_archivos.append(archivo)
        archivo = input()
    else:
      i = 1
  sc = SparkContext()
  main(sc, lista_archivos)
  sc.stop()

# OPCIONES DE VISUALIZACIÓN

Dada una lista de ***diccionarios finales*** guardados en un archivo ***lista_trabajo_ocio.json***, devuelto por el anterior programa, las opciones de visualización que hemos implementado siguen un ***árbol de decisiones***:

***1) Parámetro central del orden***: 

La visualización resultante puede *variar en función del parámetro que tome el protagonismo en el orden*: Número de estación, puntación media, número de usos totales, número de clientes por trabajo y número de clientes por ocio. 

***2) Orden del parámetro***:

Cada parámetro elegido puede ser mostrado en *orden creciente o decreciente*.

***3) Número de elementos:***

Es posible que *solo se necesiten ciertos elementos en la tabla*. Se podrán elegir mostrar *toda la tabla* o sólo los *n primeros elementos.* 

- Una vez tomada una decisión se cargarán tres ***tipos de visualización diferentes:***

***a)*** Se cargará una ***tabla*** en el *orden de la decisión tomada* donde se mostrarán de manera atractiva todos los datos elegidos. 

***b)*** Se cargará un ***mapa*** donde se ilustrarán *los primeros 10 elementos de dicha tabla*, con la *posicion*, *nombres* y *direcciones* de las estaciones. Esto es posible con la ayuda del *documento excel* que facilita la web de BICIMAD sobre coordenadas de estaciones. 

Si no se ha elegido una tabla con más de 10 elementos o no se pueden mostrar las coordenadas de todas las estaciones *el programa lo avisará* y *sólo mostrará los disponibles.* 

***c)*** Se cargará un ***gráfico de barras múltiples*** con información sobre el *uso*, *uso por trabajo* y *uso por ocio* de cada estación, en el orden fijado por la tabla. 

El ***proceso*** de este programa se detallará ***entre las líneas del código.***

In [None]:
#Funciones auxiliares:

"""
Esta función lee el excel de las coordenadas BICIMAD importado desde la web
con un comando anterior y transforma los datos en una lista de diccionarios 
de formato {'id_est': ... , 'nombre_est': ..., 'dir_est': ... , 'lat_est': ... , 'lon_est': ...}
donde cada elemento es:

id_est = Identificador de la estación (id)
nombre_est = Nombre de la estación (name)
dir_est = Dirección de la estación (address)
lat_est = Coordenada de la latitud de la estación (latitude)
lon_est = Coordenada de la longitud de la estación (longitude)
"""
def import_coord_catalog(): 

    coords = pd.read_excel('2018_Julio_Bases_Bicimad_EMT.xlsx')
    #Quitamos las columnas que no vamos a usar
    coords = coords.drop(columns=['total_bases', 'number'])

    #Preparamos la lista final de diccionarios
    index = 0
    lista_dic_coords = []
    while index < len(coords.index):
      id = coords.loc[index]['id']
      nombre = coords.loc[index]['name']
      dir = coords.loc[index]['address']
      lat = coords.loc[index]['latitude']
      lon = coords.loc[index]['longitude']
      dic = {'id_est':id, 'nombre_est': nombre, 'dir_est': dir, 'lat_est': lat, 'lon_est': lon}
      lista_dic_coords.append(dic)
      index = index + 1

    #Depuramos la lista de diccionarios: (no nos interesan coordenadas incompletas, id's repetidas ni direcciones erróneas)
    lista_dic_coords_final = []
    lista_id_ya_usada = [] 
    for elem in lista_dic_coords: 
      if elem['id_est'] not in lista_id_ya_usada: #Cualquier id repetida saldrá fuera de este condicional
        #Quitamos elementos con latitudes y longitudes incoherentes
        if (elem['lat_est'] < 41 and elem['lat_est'] > 40) and (elem['lon_est'] < -3 and elem['lon_est'] > -4): 
          #Todas las direcciones empiezan por nombres tipo Calle, Glorieta, etc. 
          #Las que no valen empiezan por comillas, etc.
          if elem['dir_est'][0] in ["A", "C", "P", "R", "G"]: 
            lista_dic_coords_final.append(elem)
            lista_id_ya_usada.append(elem['id_est'])
          else:
            #Si la dirección no es coherente la clasificamos como desconocida
            elem['dir_est'] = "Dirección desconocida"
            lista_dic_coords_final.append(elem)    
            lista_id_ya_usada.append(elem['id_est'])

    #Ordenamos la lista final por id para un resultado más limpio
    lista_dic_coords_final.sort(key=(lambda x: x['id_est']))
    return lista_dic_coords_final


"""
v_table toma una lista formada por elementos cargados del último archivo 
sacado por el programa (lista_trabajo_ocio.json), una de las 5 opciones de
parámetro, 2 de orden y n elementos sacados del programa principal detallados
en la presentación del programa e imprime una tabla de estaciones con sus 
características, un gráfico de barras múltiples y el mapa correspondiente 
a la explicación de dicha presentación. 
"""

def v_table(lista_tabla, opcion, orden, n): 

    #Ordenamos la lista de diccionarios según la opción deseada
    if opcion == 1:
      lista_tabla.sort(key=(lambda x: x['estacion']))
    elif opcion == 2:
      lista_tabla.sort(key=(lambda x: x['media']))
    elif opcion == 3:
      lista_tabla.sort(key=(lambda x: x['usos']))
    elif opcion == 4:
      lista_tabla.sort(key=(lambda x: x['usos_ocio']))
    elif opcion == 5:
      lista_tabla.sort(key=(lambda x: x['usos_trabajo']))

    #Dependiendo del orden, tomamos n elementos de la lista ordenada o de la 
    #lista al revés y creamos un DataFrame para representarlo como tabla
    if orden == 1:
      lista_tabla = lista_tabla[0:n]
      df = pd.DataFrame(lista_tabla)
    elif orden == 2:
      lista_tabla.reverse()
      lista_tabla = lista_tabla[0:n]
      df = pd.DataFrame(lista_tabla)

    #Hacemos un recuento de las estaciones utilizadas, para la gráfica
    #y el mapa
    lista_estaciones = []
    for elem in lista_tabla:
      lista_estaciones.append(elem['estacion'])
      elem.pop('estacion') #las pondremos en el eje X al representar la gráfica
      elem.pop('media') #para la gráfica la media es demasiado pequeña en comparación con los demás valores

    print(tabulate(df, headers='keys', showindex=False, tablefmt='psql'))

    #Ahora sacamos el mapa correspondiente a las 10 estaciones superiores de la tabla 
    #Si no hay datos suficientes como para mostrar las 10 estaciones, mostramos las que se puedan
    print("Mostrando mapa de las estaciones seleccionadas...")
    print(" ")
    #Cargamos la lista de diccionarios de coordenadas utilizando la función auxiliar anterior
    lista_coord = import_coord_catalog()
    #Preparamos el diccionario final con el que saldrán las estaciones que se van a representar
    dic_final_estaciones_para_mapa = []
    #Preparamos una lista de estaciones sin coordenada según nos las pida la lista de estaciones
    estaciones_sin_coordenadas = []
    i = 0
    #Nos interesa que la longitud maxima de estaciones en el mapa sea 10, y que no se sobrepase
    #la longitud de la lista de estaciones que hemos conseguido antes
    while (len(dic_final_estaciones_para_mapa) < 10) and (i<len(lista_estaciones)):
      #Cuando no exista el elemento deseado por la lista de estaciones este valor valdrá 1
      hay_elem = 0
      for dic_coord in lista_coord:
        #Buscamos elementos del diccionario de coordenadas para cada índice de la lista de estaciones
        if lista_estaciones[i] == dic_coord['id_est']:
          dic_final_estaciones_para_mapa.append(dic_coord)
          hay_elem = 1
      if hay_elem == 0:
        estaciones_sin_coordenadas.append(lista_estaciones[i])    
      i = i + 1

    if i < 9:
      print("No hay suficientes datos en la tabla para representar en el mapa el top 10 de estaciones de la tabla.")

    #Preparamos una advertencia para las estaciones que no han podido ser identificadas
    if estaciones_sin_coordenadas != []:
      string_estaciones_no = ""
      for elem in estaciones_sin_coordenadas:
        if elem == estaciones_sin_coordenadas[0]:
          string_estaciones_no = string_estaciones_no + str(elem)
        elif elem == estaciones_sin_coordenadas[-1]:
          string_estaciones_no = string_estaciones_no + " y " + str(elem)
        else:
          string_estaciones_no = string_estaciones_no + ", " + str(elem)
      print("No se han podido representar las estaciones " + string_estaciones_no + ".")
   
    print("Representando "+ str(len(dic_final_estaciones_para_mapa)) + " elementos:")
    
    #Iniciamos el mapa y le añadimos marcadores para cada longitud de la lista definitiva de coordenadas
    m = folium.Map(width=800,height=800,location=[40.428896, -3.702426], zoom_start=12.5)
   
    for elem in dic_final_estaciones_para_mapa:
      folium.Marker([elem['lat_est'], elem['lon_est']], popup=elem['nombre_est'], tooltip="Estacion " + str(elem['id_est']) + ": " + elem['dir_est']).add_to(m)
    display(m)

    #Ahora sacamos el gráfico
    print(" ")
    print("El gráfico correspondiente a esta tabla es:")
    print(" ")
    df2 = pd.DataFrame(lista_tabla, index=lista_estaciones)
    df2.plot.bar()

if __name__ == "__main__":
  print(" ")
  print("Descargando Excel COORDENADAS de la Base de Datos de BICIMAD...")
  print(" ")
  ! wget -N https://datos.madrid.es/FWProjects/egob/Catalogo/Transporte/Bici/ficheros/2018_Julio_Bases_Bicimad_EMT.xlsx
  print("Excel descargado. Nombrado como: 2018_Julio_Bases_Bicimad_EMT.xlsx")
  print(" ")


In [None]:
#Funcion principal:


"""
Esta función principal se encarga de cargar el último archivo sacado por el programa
anterior (lista_trabajo_ocio.json) y de pedir las opciones necesarias para ordenar los 
elementos de la tabla de final según la presentación de este programa. 

Tras conseguir todos los datos necesarios hace una llamada a la función auxiliar v_table
"""
def main(sc):
  #Busca el archivo lista_trabajo_ocio.json
  print("Cargando lista_trabajo_ocio.json...")

  if os.path.isfile('/content/lista_trabajo_ocio.json'):
      print('¡Archivo lista_trabajo_ocio.json encontrado!')
  else:
      print('ADVERTENCIA: El archivo lista_trabajo_ocio.json no se ha encontrado.')
      print('Por favor, asegúrate de haber ejecutado el programa anterior.')
      sc.stop()
      sys.exit("Archivo lista_trabajo_ocio.json no encontrado")
  
  """
  Carga la RDD correspondiente y calcula el número de elementos que tiene 
  utilizando una reducción. También carga los elementos y forma una lista
  de diccionarios con las líneas de la RDD
  """
  rdd_data = sc.textFile("/content/lista_trabajo_ocio.json")
  rdd_data = rdd_data.map(json.loads)
  numero_filas = rdd_data.map(lambda x: 1).reduce(lambda x,y: x + y)
  lista_tabla = rdd_data.collect()

  print("RDD cargado con éxito.")
  print("--------------------")
  print("Selecciona con respecto a qué valor deseas ordenar la tabla:")
  print(" ")
  print("1 -> Según el número de la estación")
  print("2 -> Según la media de puntuación ocio/trabajo de la estación (0 = ocio, 1 = trabajo)")
  print("3 -> Según el número de usos por personas distintas de la estación")
  print("4 -> Según el número de usos por personas con puntuación menor que 0.5 (usos por ocio) de la estación")
  print("5 -> Según el número de usos por personas con puntuación mayor que 0.5 (usos por trabajo) de la estación")
  print(" ")

  opcion = input()

  #Se asegura de mostrar fallos de lectura si los carácteres introducidos no 
  #corresponden a ninguna opción
  if opcion not in ["1","2","3","4","5"]:
    i = 0
    while i != 1:
      if opcion not in ["1","2","3","4","5"]:
        print("No se reconoce el dígito marcado. Por favor, introduzca una opción de las anteriores:")
        print(" ")
        opcion = input()
      else: 
        i = 1

  opcion = int(opcion)

  print(" ")
  print("Selecciona el orden según el cuál deseas ordenar la tabla:")
  print(" ")
  print("1 -> Orden creciente")
  print("2 -> Orden decreciente")
  print(" ")

  orden = input()

  if orden not in ["1","2"]:
    i = 0
    while i != 1:
      if orden not in ["1","2"]:
        print("No se reconoce el dígito marcado. Por favor, introduzca una opción de las anteriores:")
        print(" ")
        orden = input()
      else: 
        i = 1
  orden = int(orden)

  print(" ")
  print("Selecciona el número de filas que quieres mostrar:")
  print("T -> Todas las filas")
  print("(Número de filas que desees mostrar) -> N primeras filas (máximo " + str(numero_filas) + ")")
  print(" ")

  numero_elementos = input()

  j = 1
  elementos_validos = ["t", "T"]
  while j <= numero_filas:
    elementos_validos.append(str(j))
    j = j + 1

  if numero_elementos not in elementos_validos:
    i = 0
    while i != 1:
      if numero_elementos not in elementos_validos:
        print("No se reconoce el comando marcado. Por favor, introduzca una opción de las anteriores:")
        print(" ")
        numero_elementos = input()
      else: 
        i = 1

  #Para cargar las diferentes impresiones según las opciones elegidas se crean diferentes strings
  if numero_elementos in ["t", "T"]: 
    numero_elementos = numero_filas
    p_numero = "todos los"

  else:
    if int(numero_elementos) == numero_filas:
      p_numero = "todos los"
      numero_elementos = int(numero_elementos)
    else:
      p_numero = numero_elementos
      numero_elementos = int(numero_elementos)
    
  p_1 = "Tabla ordenada en función "

  if opcion == 1:
    p_opcion = "del número de estaciones"
  elif opcion == 2:
    p_opcion = "de la media de las estaciones"
  elif opcion == 3:
    p_opcion = "del número de usos"
  elif opcion == 4:
    p_opcion = "del número de usos por ocio"
  elif opcion == 5:
    p_opcion = "del número de usos por trabajo"

  if orden == 1:
    p_orden = "creciente"
  elif orden == 2:
    p_orden = "decreciente"

  print(" ")
  print(p_1 + p_opcion + " en orden " + p_orden + " mostrando " + p_numero + " elementos:")
  print(" ")

  #Finalmente se trasnfieren los datos recibidos a la función auxiliar
  v_table(lista_tabla, opcion, orden, numero_elementos)


if __name__ == "__main__":
  print("Bienvenido a las opciones de visualización de Spark BICIMAD TRBAJO/OCIO")
  print("Asegúrate de haber ejecutado el programa anterior y de haber recibido el archivo lista_trabajo_ocio.json")
  print(" ")
  sc = SparkContext()
  main(sc)
  sc.stop()