In [1]:
import pandas as pd
from datetime import datetime
import os

## Procesamiento y Tratamiento de datos

Vamos a suponer que los datos llegan todos los días a las 3:00 AM al bucket **gd-ld-p2**, donde gd hace referencia a la unidad de negocio, ld hace referencia a la capa de landing donde se depositan los datos en crudo y p2 hace referencia al proyecto. La ruta donde se depositan los ficheros *movies.csv* y *ratings.csv* es la siguiente:

- gd-ld-p2/movielens/year={YYYY}/month={MM}/day={DD}/movies.csv
- gd-ld-p2/movielens/year={YYYY}/month={MM}/day={DD}/ratings.csv

Tras el proceso de limpieza y transformación de los mismos, los datos finales serán depositados en el bucket de staging **gd-st-p2**, bajo la ruta:

- gd-st-p2/movielens/year={YYYY}/month={MM}/day={DD}/movies.csv

- gd-st-p2/movielens/year={YYYY}/month={MM}/day={DD}/ratings.csv

Finalmente, la transformación de los datos se depositará en el bucket de bussines, denominado **gd-bu-p2**, bajo la ruta:

- gd-bu-p2/movielens/year={YYYY}/month={MM}/day={DD}/datos_procesados.csv


Se asume que landing es la capa donde se cargan los datos en crudo y staging es la capa donde se depositan los datos limpios sin missing values ni outliers y bussines en la capa donde se cargan los datos transformados para poder calcular de manera óptima la consulta pedida. 

La partición de la ruta por origen y año, mes y día permite una mejor organización de los mismos, y facilita la creación de un historial y las posibles búsquedas de logs.


Además, vamos a suponer que estamos a día 21 de noviembre de 2019 para poder seleccionar los géneros con mejores puntuaciones de la última semana, ya que solo se disponen datos hasta ese día.

También vamos a suponer que tenemos activado un sistema de notificaciones en 'movielens/movies' y 'movielens/ratings', de manera que cuando los datos en crudo se cargen en el bucket de landing en las rutas anteriores, se llame de manera automática a la función **landing_to_staging** que se encarga de tratar los valores perdidos, normalizar los valores necesarios y limpiar los outliers. De igual manera, suponemos que tenemos activado un sistema de notificaciones en el bucket de staging para que cuando se carguen los datos limpios se llame a la función **staging_to_business** que realice la transformación requerida y guarde los datos en el fichero datos_procesados.csv.

Antes de comenzar a programar nuestras funciones que serán llamadas de manera automática, vamos a analizar los datos para ver que tipo de preprocesado es necesario llevar a cabo.

## Tratamiento de valores perdidos

**Movies.csv**

In [2]:
movies_path = 'gd-ld-p2/movielens/year=2019/month=11/day=21/movies.csv'
movies = pd.read_csv(movies_path)
total_na = movies.isna().sum()
empty_strings = movies[movies == ''].notna().sum()
print('Total of Nan values in movies:\n',total_na)
print('\nTotal of empty strings in movies:\n',empty_strings)
print('\nDatatype infered by pandas:\n',movies.dtypes)
print('\nConteo de géneros:\n', movies.genres.value_counts())

Total of Nan values in movies:
 movieId    0
title      0
genres     0
dtype: int64

Total of empty strings in movies:
 movieId    0
title      0
genres     0
dtype: int64

Datatype infered by pandas:
 movieId     int64
title      object
genres     object
dtype: object

Conteo de géneros:
 Drama                                   9056
Comedy                                  5674
(no genres listed)                      5062
Documentary                             4731
Comedy|Drama                            2386
                                        ... 
Animation|Children|Comedy|Horror           1
Action|Adventure|Drama|Fantasy|IMAX        1
Action|Crime|Drama|Thriller|IMAX           1
Animation|Fantasy|Musical                  1
Comedy|Horror|Mystery|Sci-Fi|Western       1
Name: genres, Length: 1662, dtype: int64


Observamos que no existen valores nulos, pero que existe un tipo de género denominado **(no genres listed)**. Este tipo de valor es importante mantenerlo en nuestro dataset ya que nos puede ayudar a detectar películas que son muy votadas y no tengan asignado un género, y así poder añadírselo. 

Concluimos que el conjunto de datos movies.csv no contiene valores perdidos y puede ser procesado sin tratamiento.

**ratings.csv**

In [None]:
ratings_path = 'gd-ld-p2/movielens/year=2019/month=11/day=21/ratings.csv'
ratings = pd.read_csv(ratings_path)
total_na = ratings.isna().sum()
empty_strings = ratings[ratings == ''].notna().sum()
print('\nTotal of Nan values in ratings:\n',total_na)
print('\nTotal of empty strings in ratings:\n',empty_strings)
print('\nDatatype infered by pandas:\n',movies.dtypes)

En este caso encontramos un total de 495115 Nan values en las votaciones. Vamos a comprobar cuántas votaciones del total son Nan values y si el resto tienen valores que pertenezcan al conjunto de valores posibles: float (0.5 a 5)

In [None]:
total_na.rating/ratings.shape[0]*100

Es decir, existen un total de 1.98% de votaciones perdidas.

Finalmente eliminamos del conjunto de datos las filas que contengan valores perdidos.

In [None]:
ratings.dropna(axis="index", subset=["rating"], inplace=True)

## Normalización

Respecto al proceso de normalización, para este proyecto no se ve necesario realizar ningún tipo de normalización ya que el ojetivo es realizar un promedio de las puntuaciones, y para dicho fin no es necesario normalizar los datos. 

## Limpieza de outliers

Para la limpieza de outliers, dado que el único dato numérico con el que trabajamos son las puntuaciones 'ratings', vamos a comprobar que los valores que toma esa columna están en el rango establecido $[0.5,5]$ con incrementos de $0.5$. En primer lugar, echaremos un vistazo a los valores que contiene el dataset.

In [None]:
ratings.rating.value_counts()

Vemos que existen puntuaciones negativas y puntuaciones superiores a 5, lo cual viola las restricciones anteriores. Por tanto, el siguiente paso será eliminar del conjunto de datos, las filas que contengan puntuaciones que estén fuera de los límites establecidos.

In [None]:
ratings = ratings[(0.5 <= ratings.rating) & (ratings.rating <= 5)]
ratings.rating.value_counts()

Ahora podemos observar con mayor claridad la existencia de puntuaciones cuyos decimales pertenecen al rango $(0.5,0)$. Para este tipo de puntuaciones, en lugar de eliminarlas, se realizará un redondeo al múltiplo de 5 más cercano.

In [None]:
ratings.rating = (ratings.rating*2).round()/2
ratings.rating.value_counts()

### Automatizándolo

A continuación, se definen las funciones encargadas de preprocesar los datos limpiando los valores perdidos para ambos ficheros y tratando los outliers del campo ratings, realizando las mismas operaciones que arriba y guardando los datos resultante en la capa de staging bajo las ruta:

- gd-st-p2/movielens/year={YYYY}/month={MM}/day={DD}/movies.csv
- gd-st-p2/movielens/year={YYYY}/month={MM}/day={DD}/ratings.csv

In [None]:
def preprocess(df, is_rating=False):
    # clean missing values
    columns = df.columns[df.isna().any()].tolist()
    df.dropna(axis="index", subset=columns, inplace=True)
    
    # clean rating outliers
    if is_rating:
        df = df[(0.5 <= df.rating) & (df.rating <= 5)]
        df.rating = (df.rating*2).round()/2
    
    return df

def landing_to_staging(event):
    ld_bucket = event['bucket']
    movies_key = event['movies']
    ratings_key = event['ratings']
    
    ld_movies_path = ld_bucket +'/'+ movies_key
    ld_ratings_path = ld_bucket +'/'+ ratings_key
    
    print('Reading movies data from landing...')
    df_movies = pd.read_csv(ld_movies_path)
    print('Reading ratings data from landing...')
    df_ratings = pd.read_csv(ld_ratings_path)
    
    print('Preprocessing movies data...')
    df_movies = preprocess(df_movies)
    print('Preprocessing ratings data...')
    df_ratings = preprocess(df_ratings, is_rating=True)
     
    bucket_split = ld_bucket.split('-')
    st_bucket = f'{bucket_split[0]}-st-{bucket_split[-1]}'
    st_movies_path = f'{st_bucket}/{movies_key}'
    st_ratings_path = f'{st_bucket}/{ratings_key}'
    
    folder = '/'.join(st_ratings_path.split('/')[:-1])
    if not os.path.exists(folder):
        os.makedirs(folder)
    print('Writing movies data in staging...')
    df_movies.to_csv(st_movies_path)
    print('Writing ratings data in staging...')
    df_ratings.to_csv(st_ratings_path)

In [None]:
event = {
    'bucket': 'gd-ld-p2',
    'movies': 'movielens/year=2019/month=11/day=21/movies.csv',
    'ratings': 'movielens/year=2019/month=11/day=21/ratings.csv'
}
landing_to_staging(event)

## Transformación de datos

A continuación, se define el proceso de transformación de los conjuntos anteriores para poder disponibilizar el conjunto de datos que contenga la información correcta y estructurada de tal forma que permita realizar la consulta (lista de géneros ordenados por promedio de puntuación que se han obtenido en la última semana) de forma eficiente.

La automatización de este proceso será llevado a cabo por la función **staging_to_business** que será ejecutada cuando se reciba una notificación de que tanto los ficheros rating como movies han sido preprocesados con éxito.

In [None]:
col_movies = ['movieId', 'genres']
movies = pd.read_csv('gd-st-p2/movielens/year=2019/month=11/day=21/movies.csv', usecols=col_movies)
col_ratings = ['movieId', 'rating', 'timestamp']
ratings = pd.read_csv('gd-st-p2/movielens/year=2019/month=11/day=21/ratings.csv', usecols=col_ratings)

A continuación, se desglosan los distintos géneros para cada película y se realiza un join con los datos de rating, para poder realizar la consulta solicitada.

In [None]:
movies = movies.assign(genres=movies.genres.str.split('|')).explode('genres')
movies.head()

In [None]:
ratings_movies = ratings.merge(movies, on="movieId", how="left")
ratings_movies.head()

Se seleccionan aquellas películas cuyas votaciones tengan menos de 7 días.

In [None]:
today = datetime.timestamp(datetime(2019, 11, 21, 0, 0))
last7 = today - 7*86400
ratings_movies = ratings_movies[(ratings_movies['timestamp'] > last7) & (ratings_movies['timestamp'] <= today)]

In [None]:
ratings_movies = ratings_movies.drop(columns=['movieId'])
ratings_movies.head()

Se realiza la consulta solicitada: géneros cinematrográficos más votados de los últimos 7 días.

In [None]:
ratings_movies.groupby(['genres']).mean().sort_values(by=['rating'], ascending=False)

### Automatizándolo

A continuación, se define la función que se encarga de transformar los datos utilizando las mismas operaciones que arriba, y de depositar los datos resultantes en la ruta

- gd-st-p2/movielens/year={YYYY}/month={MM}/day={DD}/datos_procesados.csv

In [None]:
def staging_to_business(event):
    st_bucket = event['bucket']
    movies_key = event['movies']
    ratings_key = event['ratings']
    
    st_movies_path = st_bucket +'/'+ movies_key
    st_ratings_path = st_bucket +'/'+ ratings_key
    
    print('Reading movies data from staging...')
    col_movies = ['movieId', 'genres']
    df_movies = pd.read_csv(st_movies_path, usecols=col_movies)
    
    print('Reading ratings data from staging...')
    col_ratings = ['movieId', 'rating', 'timestamp']
    df_ratings = pd.read_csv(st_ratings_path, usecols=col_ratings)
    
    print('Transform data...')
    df_movies = df_movies.assign(genres=df_movies.genres.str.split('|')).explode('genres')
    df_ratings_movies = df_ratings.merge(df_movies, on="movieId", how="left")
    today = datetime.timestamp(datetime(2019, 11, 21, 0, 0))
    last7 = today - 7*86400
    df_ratings_movies = df_ratings_movies[(df_ratings_movies['timestamp'] > last7) & (df_ratings_movies['timestamp'] <= today)]
    df_ratings_movies = df_ratings_movies.drop(columns=['movieId'])
    
    bucket_split = st_bucket.split('-')
    bu_bucket = f'{bucket_split[0]}-bu-{bucket_split[-1]}'
    ratings_movies_key = '/'.join(ratings_key.split('/')[:-1])+'/datos_procesados.csv'
    bu_ratings_movies_path = f'{bu_bucket}/{ratings_movies_key}'
    
    print('Writing preprocess data in business...')
    folder = '/'.join(bu_ratings_movies_path.split('/')[:-1])
    if not os.path.exists(folder):
        os.makedirs(folder)
    df_ratings_movies.to_csv(bu_ratings_movies_path)

In [None]:
event = {
    'bucket': 'gd-st-p2',
    'movies': 'movielens/year=2019/month=11/day=21/movies.csv',
    'ratings': 'movielens/year=2019/month=11/day=21/ratings.csv'
}
staging_to_business(event)