# Práctica spark sql (Etapa 2)

## Construcción del módulo toronto_bike

En esta etapa y con lo experimentado en la etapa anterior, vamos a crear un paquete de python, que posteriormente instalaremos en un environment para realizar ciertas pruebas. 

Como hemos visto en el portal de datos abiertos de Toronto (https://open.toronto.ca/dataset/bike-share-toronto-ridership-data/), los ficheros de datos se encunetran en el repositorio CKAN.

__Instrucciones para la creación del módulo `toronto_bike`__

El módulo `toronto_bike` debe exportar las clases `UrlToronto` y `BikeToronto` descritas a continuación.


## Clase `UrlToronto`

Vamos a crear una clase que permita recopilar todos los enlaces relacionados con el uso de bicicletas que hay en el portal de datos abiertos de Toronto (https://open.toronto.ca/dataset/bike-share-toronto-ridership-data/). Los enlaces que cumplen esta restricción son los que denominaremos a partir de ahora _enlaces válidos_. Estos enlaces contienen la cadena `bikeshare-ridership-YYYY.zip` donde `YYYY` representa el año.


__Requisitos:__

* Los datos se encuentran en CKAN, por lo que usaremos su API para la descarga. La clase ha de contener una constante de clase:
```
BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca"
```
* Los objetos de la clase tienen __dos atributos__:
    * _temporal_path_: ubicación temporal donde se descargarán los datos  de los ficheros csv que necesitemos para crear nuestros dataframes.
    * _valid_urls_: lista de nombres de ficheros ZIP que se corresponden con la información de uso de bicicletas (uno por año).


* Ha de contener al menos los siguientes métodos:

    * `__init__`: método constructor con un argumento (ubicación temporal).
    *  `select_valid_urls`: método estático que se encarga de actualizar el atributo  _valid_urls_ de los objetos de la clase. Devuelve un conjunto de enlaces válidos. Si la petición al servidor devuelve un código de retorno distinto de 200, la función lanza una excepción de tipo `ConnectionError`. A continuación se muestra un fragmento de código que permite extraer información de dichos enlaces(consultar apartado __FOR DEVELOPERS__ de la url https://open.toronto.ca/dataset/bike-share-toronto-ridership-data/), y que puede servir como base para desarrrollar el método  `select_valid_urls`.


In [None]:
import requests

# Toronto Open Data is stored in a CKAN instance. It's APIs are documented here:
# https://docs.ckan.org/en/latest/api/

# To hit our API, you'll be making requests to:
base_url = "https://ckan0.cf.opendata.inter.prod-toronto.ca"

# Datasets are called "packages". Each package can contain many "resources"
# To retrieve the metadata for this package and its resources, use the package name in this page's URL:
url = base_url + "/api/3/action/package_show"
params = { "id": "bike-share-toronto-ridership-data"}
package = requests.get(url, params = params).json()

# To get resource data:
for _, resource in enumerate(package["result"]["resources"]):

       # To get metadata for non datastore_active resources:
       if not resource["datastore_active"]:
           url = base_url + "/api/3/action/resource_show?id=" + resource["id"]
           resource_metadata = requests.get(url).json()
           print(resource_metadata)
           # From here, you can use the "url" attribute to download this file


* `download_csv`:  método de instancia que acepta los argumentos de tipo entero `month` y `year` y devuelve el string de la ruta del fichero  CSV correspondiente al mes `month` y año `year`. Este fichero se extrae del ZIP correspondiente que ha de estar en _valid_urls_. Se lanzará una excepción de tipo `ValueError` en caso de que no exista. Se deberá comprobar que el mes y año se corresponden con valores válidos (`month` entre 1 y 12, `year` entre 20 y 23). Si la petición al servidor devuelve un código de retorno distinto de 200, la función lanza una excepción de tipo `ConnectionError`. A continuación dejo un posible esqueleto (casi completo):

In [None]:
    def download_csv(self, year:str, month:str)-> str:
        """
        TODO completar el docstring. No olvidéis las excepciones
        :param year:
        :param month:
        :return:
        """
        # temporal_path: ruta elegida donde se guardará el fichero CSV descomprimido
        temporal_path =  ...    # ruta donde se guardará el fichero CSV descomprimido
        # url: será la url donde se encuentran los datos (para year = 2023, esta url se corresponde con un ZIP con 12 ficheros, uno para cada mes )
        url =   ...   
        # name: nombre del fichero que nos intersa ( puede ser parte del nombre, algo de tipo '2023-05.csv')
        name = ...    # nombre del fichero que nos intersa ( algo de tipo '2023-05.csv') 
        response = requests.get(url)     # petición
        if response.status_code == 200:
            content = io.BytesIO(response.content)     
            zfile = zipfile.ZipFile(content, 'r')
            files = [f for f in zfile.filelist if f.filename.find(name) > 0]      # devuelve una lista unitaria con el fichero que nos interesa
            # nombre del fichero a extraer:
            print(files)
            
            # Extraemos el contenido en una carpeta temporal
            os.makedirs(temporal_path, exist_ok=True)                               # Creamos la carpeta temporal
            zfile.extractall(temporal_path, members=files)                          # extraemos del ZIP solo el fichero que nos interesa

            # nombre del fichero csv :
            file_path = os.path.join(temporal_path, files[0].filename)              # me guardo la ruta a dicho fichero
            return file_path
        else:
            raise ...


__Cuidado__: mirando el año 2022, nos encontramos con que no existe fichero ' ... 2022-11.csv'. Sin embargo, sí existe el fichero ' ... 2022-11.zip'. En estos caso, tendremos que extraer el csv que hay dentro, y por tanto modificar el código de forma apropiada.

 ## Clase `BikeToronto`

Vamos a crear una clase que permita crear un dataframe de pySpark con los datos de uso de las bicicletas en un mes y año concreto. Además, esta clase proporcionará ciertas operaciones para realizar limpieza y cierto análisis básico de los datos.

__Requisitos:__

* Los objetos de la clase tienen dos atributos (spark, parametros) que representan:

    * spark: la instancia de spark (recupera la SparkSession, si es que ya existe, o crea una nueva)
    * parametros: diccionario creado a partir de los datos del fichero json que se proporciona (_parametros.json_)

* Ha de contener al menos los siguientes métodos:

    * `__init__`: método constructor. Acepta un argumento de tipo `str` que representa la ruta al fichero json que contiene los parámetros de la clase.
    * `crear_df`: método de instancia que acepta los argumentos `month` y `year` (ambos de tipo `str`) y devuelve un objeto de tipo DataFrame de Spark con los datos de uso correspondientes al mes `month` y año  `year`. 
    * `procesar_df`_: método que acepta como argumento de entrada un dataframe de Spark y devuelve dos dataframes. Este método debe:        
        
        *  Eliminar filas duplicadas del dataframe de entrada
        *  Los tipos de cada una de las columnas han de ser los siguientes:  Start Time y End Time: fecha, User Type, Start Station Name y End Station Name: string.  El resto son de tipo entero.
        *  Las columnas que nos interesan son: Trip Duration, 	Start Station Id, 	Start Time, 	Start Station Name, 	End Station Id, 	End Time, 	End Station Name, y 	Bike Id.
        * Trip Duration viene expresado en segundos. Añadir una nueva columna con la duración en minutos.
        *  Se devolverán dos dataframes, uno con los datos de los usuario de tipo Casual Member y otro con el resto de usuarios.


## Instalamos el módulo  

```
pip install --force-reinstall  .\dist\toronto_bike-0.0.1-py3-none-any.whl
```

### Probamos el módulo


In [9]:
%pip install --force-reinstall  .\dist\toronto_bike-0.0.1-py3-none-any.whl

Processing c:\users\mario\downloads\ej2\dist\toronto_bike-0.0.1-py3-none-any.whl
Collecting requests (from toronto-bike==0.0.1)
  Using cached requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting findspark (from toronto-bike==0.0.1)
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting pyspark (from toronto-bike==0.0.1)
  Using cached pyspark-3.5.4-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark->toronto-bike==0.0.1)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting charset-normalizer<4,>=2 (from requests->toronto-bike==0.0.1)
  Using cached charset_normalizer-3.4.1-cp313-cp313-win_amd64.whl.metadata (36 kB)
Collecting idna<4,>=2.5 (from requests->toronto-bike==0.0.1)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting urllib3<3,>=1.21.1 (from requests->toronto-bike==0.0.1)
  Using cached urllib3-2.3.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests->toront

ERROR: Could not install packages due to an OSError: [WinError 32] El proceso no tiene acceso al archivo porque está siendo utilizado por otro proceso: 'c:\\users\\mario\\appdata\\local\\programs\\python\\python313\\lib\\site-packages\\pyspark\\jars\\activation-1.1.1.jar'
Consider using the `--user` option or check the permissions.



In [10]:
from  toronto_bike.UrlToronto import UrlToronto
print(UrlToronto)
temporal ='carpeta_temporal'
downloader = UrlToronto(temporal)
path_csv = downloader.download_csv('2023', '01')

<class 'toronto_bike.UrlToronto.UrlToronto'>


In [11]:
# Si lo habéis hecho bien, esta celda no debe producir ninguna salida
assert(path_csv == 'carpeta_temporal\\bikeshare-ridership-2023/Bike share ridership 2023-01.csv')

In [12]:
from  toronto_bike.BikeToronto import BikeToronto

params = 'parametros.json'
year = '2023'
month = '05'

# implementar las llamadas para la creación del dataframe
bike_toronto = BikeToronto(params)

df = bike_toronto.crear_df(year, month)
print(df.columns)

['Trip Id', 'Trip  Duration', 'Start Station Id', 'Start Time', 'Start Station Name', 'End Station Id', 'End Time', 'End Station Name', 'Bike Id', 'User Type']


In [13]:
# Si lo habéis hecho bien, esta celda no debe producir ninguna salida
assert(len(df.columns) == 10)
assert(df.count() == 589217)
dtypes = dict(df.dtypes)
assert(dtypes['Bike Id'] == "int")
assert(dtypes["Start Station Name"] == 'string')

In [14]:
df_casual, df_anual = bike_toronto.procesar_df(df)
assert(df_casual.count() + df_anual.count() == df.count())
assert(len(df.columns) == 10)

root
 |-- Trip Id: integer (nullable = true)
 |-- Trip  Duration: integer (nullable = true)
 |-- Start Station Id: integer (nullable = true)
 |-- Start Time: timestamp (nullable = true)
 |-- Start Station Name: string (nullable = true)
 |-- End Station Id: integer (nullable = true)
 |-- End Time: timestamp (nullable = true)
 |-- End Station Name: string (nullable = true)
 |-- Bike Id: integer (nullable = true)
 |-- User Type: string (nullable = true)
 |-- Trip Duration Minutos: double (nullable = true)



## Tests

Define los tests que creas necesarios para comprobar que las clases funcionan correctamente. Los tests deberán estar en un directorio independiente, dentro del proyecto, pero no en el paquete que se distibuye.

## Docstrings  y anotaciones de tipos

Completar los docstring de todas las funciones. Anotar los tipos de las funciones.


## Entrega en el CV
La entrega consisitirá en un fichero comprimido `zip` con lo siguiente:
- Fichero `whl` con el instalable del paquete generado
- Los ficheros del paquete desarrollado, incluyendo los tests.
- Este cuaderno de jupyter con las pruebas necesarias usando la clase `BikeToronto`

