In [0]:
# Global data variables
SANDBOX_NAME = "<sandbox_name>"  # Replace with your sandbox name
DATA_PATH = "/data/sandboxes/" + SANDBOX_NAME + "/data/data/"




# Combining DataFrames

In _pyspark_ there are two ways to combine the data of DataFrames:
* by columns: `join`
* by rows: `union`
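The difference in shape can be sketched in plain Python, outside Spark. Here a table is modeled as a list of dicts: `union` simply concatenates rows, while `join_on` merges each pair of rows whose key matches into one wider row. The helper names and the sample rows (abridged from the `ratings` and `movies` samples in this notebook) are illustrative assumptions, not PySpark APIs.

```python
def union(top, bottom):
    """Row-wise: same columns, more rows (the idea behind DataFrame.union)."""
    return top + bottom


def join_on(left, right, key):
    """Column-wise: each pair of rows with an equal `key` becomes one wider row.

    Deliberately naive nested-loop inner join, for illustration only.
    """
    return [
        {**lrow, **rrow}
        for lrow in left
        for rrow in right
        if lrow[key] == rrow[key]
    ]


ratings = [
    {"userId": 1, "movieId": 110, "rating": 1.0},
    {"userId": 1, "movieId": 147, "rating": 4.5},
]
movies = [{"movieId": 110, "title": "Braveheart (1995)"}]

stacked = union(ratings[:1], ratings[1:])     # 2 rows, still 3 columns
joined = join_on(ratings, movies, "movieId")  # 1 row, now 4 columns
```

Note that the rating for `movieId` 147 disappears from `joined` because it has no matching movie: that is the behaviour of an inner join.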

In [0]:
movies_df = spark.read.csv(DATA_PATH + 'movie-ratings/movies.csv', sep=',', header=True, inferSchema=True)
ratings_df = spark.read.csv(DATA_PATH + 'movie-ratings/ratings.csv', sep=',', header=True, inferSchema=True)

In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
+------+-------+------+----------+
only showing top 5 rows





## join

Let's add the movie's title and genres to each rating.

In [0]:
ratings_movies_df = ratings_df.join(movies_df, on='movieId', how='inner')

In [0]:
ratings_movies_df.show(5)

+-------+------+------+----------+--------------------+----------------+
|movieId|userId|rating| timestamp|               title|          genres|
+-------+------+------+----------+--------------------+----------------+
|    110|     1|   1.0|1425941529|   Braveheart (1995)|Action|Drama|War|
|    147|     1|   4.5|1425942435|Basketball Diarie...|           Drama|
|    858|     1|   5.0|1425941523|Godfather, The (1...|     Crime|Drama|
|   1221|     1|   5.0|1425941546|Godfather: Part I...|     Crime|Drama|
|   1246|     1|   5.0|1425941556|Dead Poets Societ...|           Drama|
+-------+------+------+----------+--------------------+----------------+
only showing top 5 rows



 

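If the join column had a different name in each DataFrame, we would pass an explicit join condition instead of a column name. To illustrate, we first rename `movieId` in `movies_df`. The join below also uses `how='outer'`, so rows with no match on the other side are kept, with `null` in the missing columns (the first rows shown are movies that nobody has rated).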

In [0]:
movies2_df = movies_df.withColumnRenamed('movieId', 'id_movie')
movies2_df.show(2)

+--------+----------------+--------------------+
|id_movie|           title|              genres|
+--------+----------------+--------------------+
|       1|Toy Story (1995)|Adventure|Animati...|
|       2|  Jumanji (1995)|Adventure|Childre...|
+--------+----------------+--------------------+
only showing top 2 rows



In [0]:
ratings_movies2_df = ratings_df.join(movies2_df, 
                                     on=[ratings_df['movieId'] == movies2_df['id_movie']], how='outer')

In [0]:
ratings_movies2_df.show(5)

+------+-------+------+---------+--------+--------------------+--------------------+
|userId|movieId|rating|timestamp|id_movie|               title|              genres|
+------+-------+------+---------+--------+--------------------+--------------------+
|  null|   null|  null|     null|     148|Awfully Big Adven...|               Drama|
|  null|   null|  null|     null|     463|Guilty as Sin (1993)|Crime|Drama|Thriller|
|  null|   null|  null|     null|     471|Hudsucker Proxy, ...|              Comedy|
|  null|   null|  null|     null|     496|What Happened Was...|Comedy|Drama|Roma...|
|  null|   null|  null|     null|     833|High School High ...|              Comedy|
+------+-------+------+---------+--------+--------------------+--------------------+
only showing top 5 rows
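To see where the `null` rows come from, here is a toy, plain-Python model of an equi-join whose key has a different name on each side. It is a sketch under simplifying assumptions (rows are dicts, the two sides share no column names, `how` is only `'inner'` or `'outer'`); `join_rows` is a hypothetical helper and says nothing about how Spark actually executes joins.

```python
def join_rows(left, right, left_key, right_key, how="inner"):
    """Toy equi-join on left[left_key] == right[right_key].

    how="inner" keeps only matching pairs; how="outer" also keeps unmatched
    rows from either side, filling the other side's columns with None.
    """
    if how not in ("inner", "outer"):
        raise ValueError("how must be 'inner' or 'outer'")
    left_cols = list(left[0]) if left else []
    right_cols = list(right[0]) if right else []

    # Index the right side by its key: a simple hash join.
    index = {}
    for i, row in enumerate(right):
        index.setdefault(row[right_key], []).append((i, row))

    result = []
    matched_right = set()
    for lrow in left:
        matches = index.get(lrow[left_key], [])
        for i, rrow in matches:
            result.append({**lrow, **rrow})
            matched_right.add(i)
        if not matches and how == "outer":
            # Left row with no partner: right-hand columns become None (null).
            result.append({**lrow, **dict.fromkeys(right_cols)})

    if how == "outer":
        # Right rows that never matched: left-hand columns become None (null).
        for i, rrow in enumerate(right):
            if i not in matched_right:
                result.append({**dict.fromkeys(left_cols), **rrow})
    return result


ratings = [
    {"userId": 1, "movieId": 110, "rating": 1.0},
    {"userId": 1, "movieId": 147, "rating": 4.5},
]
movies2 = [
    {"id_movie": 110, "title": "Braveheart (1995)"},
    {"id_movie": 2, "title": "Jumanji (1995)"},
]

inner = join_rows(ratings, movies2, "movieId", "id_movie")
outer = join_rows(ratings, movies2, "movieId", "id_movie", how="outer")
```

The row order produced here is just the order of the loops; Spark makes no ordering guarantee for join output unless you sort it explicitly.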





## union

Imagine we had one DataFrame with horror movies and another with comedies, and we wanted to combine them into one. (The filters below keep only movies whose `genres` value is exactly `'Horror'` or exactly `'Comedy'`.)

In [0]:
from pyspark.sql import functions as F

In [0]:
horror_df = movies_df.filter(F.col('genres') == 'Horror')
comedy_df = movies_df.filter(F.col('genres') == 'Comedy')

In [0]:
horror_df.select('genres').distinct().show()

+------+
|genres|
+------+
|Horror|
+------+



In [0]:
comedy_df.select('genres').distinct().show()

+------+
|genres|
+------+
|Comedy|
+------+



In [0]:
horror_comedy_df = horror_df.union(comedy_df)

In [0]:
horror_comedy_df.select('genres').distinct().show()

+------+
|genres|
+------+
|Horror|
|Comedy|
+------+
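One detail worth knowing: `DataFrame.union` matches columns by position, not by name, so both DataFrames must list their columns in the same order. That holds here because `horror_df` and `comedy_df` both come from `movies_df`; when the order may differ, PySpark also provides `unionByName`. The plain-Python sketch below models the two behaviours; the helper names and the sample `movieId` values are hypothetical.

```python
def union_by_position(left, right):
    """Like DataFrame.union: append right's rows as-is; column names are ignored."""
    left_cols, left_rows = left
    _, right_rows = right
    return left_cols, left_rows + right_rows


def union_by_name(left, right):
    """Like DataFrame.unionByName: reorder right's values to match left's columns."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    if set(left_cols) != set(right_cols):
        raise ValueError("both sides must have the same column names")
    # Position in `right` of each column, in `left`'s order.
    positions = [right_cols.index(col) for col in left_cols]
    reordered = [tuple(row[p] for p in positions) for row in right_rows]
    return left_cols, left_rows + reordered


# A table is (column names, list of row tuples); note the swapped column order.
horror = (["movieId", "genres"], [(10, "Horror")])
comedy = (["genres", "movieId"], [("Comedy", 20)])

_, by_position = union_by_position(horror, comedy)  # second row is misaligned
_, by_name = union_by_name(horror, comedy)          # second row is realigned
```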





# Persisting DataFrames

Because of Spark's *lazy evaluation*, every action we run on the `ratings_movies_df` DataFrame executes the _join_ again. Joins are computationally expensive, so it is advisable to call `cache` or `persist` on the DataFrame to avoid running the join multiple times.

Persisting a DataFrame temporarily stores the result of the DAG up to the point where the DataFrame is cached, so later actions do not recompute that part of the plan.
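The effect of lazy evaluation and persistence can be modeled with a toy class in plain Python. `LazyPlan` is a hypothetical stand-in for a DataFrame, not Spark's implementation: each action re-runs the stored computation unless the plan has been persisted, and, as in Spark, `persist()` itself does no work, so the first action after it pays the cost of computing and storing the result.

```python
class LazyPlan:
    """Toy model of a lazily evaluated DataFrame (not Spark's implementation)."""

    def __init__(self, compute):
        self._compute = compute  # the expensive work, e.g. a join
        self._persisted = False
        self._stored = None      # rows kept after the first action once persisted
        self.executions = 0      # how many times the expensive work has run

    def persist(self):
        # Only marks the plan; nothing is computed until the next action.
        self._persisted = True
        return self

    def unpersist(self):
        # Drops the stored rows; later actions recompute from scratch.
        self._persisted = False
        self._stored = None
        return self

    def _rows(self):
        if self._stored is not None:
            return self._stored
        self.executions += 1
        rows = self._compute()
        if self._persisted:
            self._stored = rows
        return rows

    def count_where(self, predicate):
        # An "action": forces evaluation, like filter(...).count() in Spark.
        return sum(1 for row in self._rows() if predicate(row))


def expensive_join():
    # Stands in for the costly join; here it just returns some ratings.
    return [5.0, 1.0, 5.0, 4.5]


plan = LazyPlan(expensive_join)
plan.count_where(lambda r: r == 5.0)
plan.count_where(lambda r: r == 1.0)
runs_without_cache = plan.executions  # 2: the work ran once per action

plan.persist()
plan.count_where(lambda r: r == 5.0)  # computes and stores the rows
plan.count_where(lambda r: r == 1.0)  # served from the stored rows
runs_with_cache = plan.executions - runs_without_cache  # 1
```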




For example, suppose we want to count the number of unique titles with a rating of 5 and also the number with a rating of 1:

__inefficient:__

In [0]:
%%time

ratings_movies_df.filter(F.col('rating') == 5).select('title').distinct().count()

CPU times: user 6.36 ms, sys: 2.9 ms, total: 9.27 ms
Wall time: 30.2 s


21384

In [0]:
%%time

ratings_movies_df.filter(F.col('rating') == 1).select('title').distinct().count()

CPU times: user 9.32 ms, sys: 1.66 ms, total: 11 ms
Wall time: 28.9 s


18413



__efficient:__

Notice that the first action may take even longer than before, because the result is also being stored. The second action, however, is much faster because it does not need to re-execute the join.

In [0]:
ratings_movies_df.persist()

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [0]:
%%time

ratings_movies_df.filter(F.col('rating') == 5).select('title').distinct().count()

CPU times: user 21.5 ms, sys: 8.31 ms, total: 29.8 ms
Wall time: 1min 53s


21384

In [0]:
%%time

ratings_movies_df.filter(F.col('rating') == 1).select('title').distinct().count()

CPU times: user 5.58 ms, sys: 1.83 ms, total: 7.41 ms
Wall time: 7.01 s


18413



It is important to unpersist cached DataFrames once they are no longer needed, so that the memory or disk space they occupy is released.

In [0]:
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]



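You can choose whether the temporary storage uses memory, disk, or both. To switch an already persisted DataFrame to a different storage level, unpersist it first: calling `persist` again on data that is already cached does not change its level.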

In [0]:
from pyspark import StorageLevel

In [0]:
ratings_movies_df.persist(storageLevel=StorageLevel.DISK_ONLY)

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [0]:
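# Unpersist before persisting again with a different storage level
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]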

In [0]:
ratings_movies_df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [0]:
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [0]:
ratings_movies_df.persist(storageLevel=StorageLevel.MEMORY_ONLY)

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [0]:
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]