# Ejemplo 2
Es este ejemplo se visualiza la carga de un CSV mediante un paquete externo como lo es Pandas y el trabajo en paralelo de Metaflow mediante el uso de <b>foreach</b>

## Estadisticas
En este ejemplo se cargara un archivo CSV de metadatos de peliculas
mediante Pandas y se procede a calcular algunas estadisticas especificas del genero de pelicula. 

   ### Ejecucion

In [2]:
from metaflow import FlowSpec, step, IncludeFile

Se procede a importar los modulos a utilizar desde <b>Metaflow</b>

In [3]:
def script_path(filename):
    import os

    current_folder = globals()['_dh'][0]
    filepath = os.path.join(current_folder,'movies.csv')
    return os.path.join(filepath, filename)

Se implementa la funcion `os.path.join`, conveniente para obtener la ruta del directorio de una manera mas facil. Esto permite utilizar el tutorial desde cualquier directorio 

In [4]:
class MovieStatsFlow(FlowSpec):

    movie_data = IncludeFile("movie_data",
                             help="The path to a movie metadata file.",
                             default=script_path('movies.csv'))


De manera similar a los <b>Parametros</b>, se puede definir un archivo de datos para incluirlo como entrada. Metaflow versiona el archivo y lo hara accesible a todos los pasos directamente a traves del objeto `self`

In [5]:
    @step
    def start(self):

        import pandas
        from io import StringIO

        # Cargar la data mediante pandas
        self.dataframe = pandas.read_csv(StringIO(self.movie_data)) #StringIo permite leer cadenas como archivos

        # La columna 'genres' tiene una lista de generos para cada pelicula
        self.genres = {genre for genres \
                       in self.dataframe['genres'] \
                       for genre in genres.split('|')} # Compresion de diccionarios
        self.genres = list(self.genres) #Convierte el diccionario en lista

        # Se quiere calcular algunas estadiscitas para cada genero.
        # El argumento 'foreach' permite hacer los calculos en paralelo
        self.next(self.compute_statistics, foreach='genres')


En el step <b>start</b> se comienza por analizar el CSV y a este extraer la informacion necesaria y almacenarla en un diccionario de Python. La lectura del CSV se realiza mediante la herramienta Pandas

En el metodo `self.next` se ejecutan pasos paralelos a travez de ramas. La palabra clave <b>foreach</b>

In [6]:
    @step
    def compute_statistics(self):
        # Encontrar todas las 
        self.genre = self.input
        print("Computing statistics for %s" % self.genre)

        # Seleccionar todas las peliculas que pertenezcan al genero especificado.
        # Crear un marcoo de datos con estas peliculas, y solo con las columnas necesarias
        selector = self.dataframe['genres'].\
                   apply(lambda row: self.genre in row) # Funcion anonima para recorrer cada elemento de la lista genres
        self.dataframe = self.dataframe[selector]

        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]

        # Obtener estadisticas sobre la taquilla de estos titulos (gross)
        points = [.25, .5, .75]
        self.quartiles = self.dataframe['gross'].quantile(points).values

        # Unir a los resultados de otros generos
        self.next(self.join)

El step <b>compute_statics</b> se ejecuta de manera paralela en funcion de <b>foreach</b>. Obtiene las peliculas que sean del genero especificado.
<br>
Luego se obtienen las estadisticas de la taquilla de los titulos que estan en el dataframe.

En el metodo `self.next` se unen los resultados de los otros generos despues de que la ejecucion en paralelo del <b>compute_statics</b> termine


In [7]:
    @step
    def join(self, inputs):
        # Merge results from the genre specific computations.
        self.genre_stats = {inp.genre.lower(): \
                            {'quartiles': inp.quartiles,
                             'dataframe': inp.dataframe} \
                            for inp in inputs}

        self.next(self.end)


El step <b>join</b> une los resultados de las ramas paralelas combinandolos en un diccionario. Estos datos seran utilizados en el siguiente ejemplo como informacion base para generar una lista mejorada de peliculas recomendadas.

Al ultimo apunta al paso final del programa

In [8]:
    @step
    def end(self):
        """
        End the flow.

        """
        pass

En el step <b>end</b> en general no realiza ninguna accion, pues el presente codigo es previo a la ejecucion del siguiente ejemplo, dejando listos los datos a utilizarse

`if __name__ == '__main__'`:<br>
    `    MovieStatsFlow()`

Sentencia para empezar la ejecucion del programa

### Si todos los pasos se ejecutaron correctamente, tendriamos una salida similar a la siguiente:

<img src="resultado2.png">

En la imagen anterior se puede visualizar la correcta ejecucion del programa, como tambien el uso de los cuatro nucleos del computador empleado, como evidencia del trabajo en paralelo de Metaflow



[Descargar CSV](https://drive.google.com/uc?id=19-sUqw-qAtF82Ki7eDYmN5YUyhirb0Sz&export=download&authuser=0)
<br/>
[Descargar archivo .py](https://drive.google.com/uc?id=1eotKzZsH8Yv9oaS399461YkWj9VbvrG2&export=download&authuser=0)
