<a href="https://colab.research.google.com/github/ParalelaUCM/biciMAD/blob/master/BicimadSpark_Reorganizacion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Setup**

Instalamos y configuramos las herramientas necesarias para empezar a trabajar

In [0]:
!apt-get install openjdk-8-jdk
!apt install unzip
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!pip install pyspark

In [0]:
!apt install unzip

In [0]:
import json
import os
from pyspark import SparkContext

In [0]:
sc = SparkContext()

# **Descargamos los datasets de bicimad**
Descargaremos cada dataset de la web oficial de biciMAD. Los renombraremos a '.zip' para poder descomprimirlos y una vez descomprimidos los movemos a la carpeta 'datasets'. Por último eliminamos los ficheros comprimidos.

In [0]:
#Estructura de carpetas para el dataset
!mkdir dataset
!cd dataset/
!mkdir dataset/usages
!mkdir dataset/stations

## Datasets de uso
Aquí nos descargamos los datasets de uso por usuario y los movemos dentro de la carpeta _usages/_ dentro de _datasets/_

In [0]:
#Enero de 2019
!wget -N "https://opendata.emtmadrid.es/getattachment/2ebcc70a-4914-43c6-9ada-a0f0520032a4/201901_Usage_Bicimad.aspx"
!mv 201901_Usage_Bicimad.aspx 201901_Usage_Bicimad.zip
!unzip 201901_Usage_Bicimad.zip
!mv 201901_Usage_Bicimad.json dataset/usages
!rm 201901_Usage_Bicimad.zip

In [0]:
#Febrero de 2019
!wget -N "https://opendata.emtmadrid.es/getattachment/aa8c34d2-ddba-46d9-b6db-882c0b4a12f0/201902_Usage_Bicimad.aspx"
!mv 201902_Usage_Bicimad.aspx 201902_Usage_Bicimad.zip 
!unzip 201902_Usage_Bicimad.zip
!mv 201902_Usage_Bicimad.json dataset/usages
!rm 201902_Usage_Bicimad.zip


In [0]:
#Junio de 2019
!wget -N "https://opendata.emtmadrid.es/getattachment/7517a650-ccdf-4ab1-b1b0-a1d13694472e/201906_Usage_Bicimad.aspx"
!mv 201906_Usage_Bicimad.aspx 201906_Usage_Bicimad.zip
!unzip 201906_Usage_Bicimad.zip
!mv 201906_Usage_Bicimad.json dataset/usages
!rm 201906_Usage_Bicimad.zip

## Datasets de estaciones
Aquí nos descargamos los datasets de ocupacion por estación y los movemos dentro de la carpeta _stations/_ dentro de _datasets/_

# **Creamos los RDD**
Una vez tenemos las bases de datos descargadas vamos a codificarlas de forma cómoda.

-----
Para tener los datos almacenados de una forma cómoda primero creamos un diccionario cuya clave va a ser un string con el mes y el año del dataset y como valor va a tener el rdd asociado al uso por usuario de ese mes.

La funcion ```mapper_usages``` nos servirá para crear la rdd mas legible. Nos quedaremos con los datos necesarios y cada linea la codificaremos como un diccionario.

In [0]:
rdd_usages = {} 

In [0]:
def mapper_usages(line):
  data = json.loads(line)
  user = data['user_type']
  user_day = data['user_day_code']
  start = data['idunplug_station']
  end = data['idplug_station']
  date = data['unplug_hourTime']['$date'][0:10]
  hora = data['unplug_hourTime']['$date'][11:19]
  time = data['travel_time']
  age = data['ageRange']
  return {"user_type": user,
          "user_day_code": user_day, 
          "start": start, "end": end , 
          "travel_time": time, 
          "date": date, 
          "hour": hora, 
          "age": age}

In [39]:
directory = 'dataset/usages'
for filename in os.listdir(directory):
    if filename.endswith(".json"):
      #Nos quedamos con la fecha del dataset en formato YYYYMM
      name = filename.split("_")[0]
      rdd_usages[name] = \
      sc.textFile(os.path.join(directory, filename)).map(mapper_usages)
      #DEBUG starts
      print(name)
      #DEBUG ends
    else:
        continue

201901


----
Por otro lado un diccionario cuya clave va a ser tambien un string con el mes y el año del dataset y como valor va a tener el rdd asociado a la ocupacion de las estaciones en ese mes.

Aquí usaremos la funcion ```mapper_stations``` para acomodar los datos.

In [0]:
rdd_stations = {} 

In [0]:
def mapper_stations(line):
  data = json.loads(line)
  day = data['_id'][0:10]
  hour = data['_id'][11:27]
  station = data['stations']
  return {"day": day, "hour": hour, "station": station}

In [0]:
directory = 'dataset/stations'
for filename in os.listdir(directory):
    if filename.endswith(".json"):
      #Nos quedamos con la fecha del dataset en formato YYYYMM
      name = filename.split("_")[0]
      rdd_stations[name] = \
      sc.textFile(os.path.join(directory, filename)).map(mapper_stations)
      #DEBUG starts
      print(name)
      #DEBUG ends
    else:
        continue

# DEMO
Como usar la nueva forma de organizar los rdd

In [0]:
rdd_usages["201901"].take(5)