In [1]:
# Spark session setup.
# PYSPARK_PYTHON tells Spark which Python interpreter its worker processes use;
# it must be set before the SparkSession (and its JVM) is created. Default to
# the interpreter running this notebook so driver and workers match, instead of
# a machine-specific absolute path, and respect any value already configured
# in the environment.
import os
import sys
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()



# Combinando DataFrames

En _pyspark_ hay dos formas de combinar los datos de DataFrames.
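* por columnas: `join` (añade las columnas de otro DataFrame emparejando filas por una clave)
* por filas: `union` (añade las filas de otro DataFrame con el mismo esquema)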

In [2]:
# Folder holding the MovieLens-style CSV files; change it to point the
# notebook at another copy of the dataset.
DATA_DIR = 'Data/movie-ratings'

movies_df = spark.read.csv(DATA_DIR + '/movies.csv', sep=',', header=True, inferSchema=True)
# NOTE: despite its name, `ratings_df` is loaded from tags.csv, so each row is a
# user tag (columns userId, movieId, tag, timestamp), not a numeric rating.
ratings_df = spark.read.csv(DATA_DIR + '/tags.csv', sep=',', header=True, inferSchema=True)

In [3]:
movies_df.show(n=5)  # preview the first rows of the movies table

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
ratings_df.show(n=5)  # preview the first rows of the tags table

+------+-------+----------+----------+
|userId|movieId|       tag| timestamp|
+------+-------+----------+----------+
|     1|    318|  narrated|1425942391|
|    20|   4306|Dreamworks|1459855607|
|    20|  89302|   England|1400778834|
|    20|  89302| espionage|1400778836|
|    20|  89302|      jazz|1400778841|
+------+-------+----------+----------+
only showing top 5 rows





## join

Añadamos a cada registro de `ratings_df` (cada tag que un usuario asigna a una película) el título y el género de la película.

In [5]:
# Equi-join on the shared `movieId` column; passing the column name (rather than
# a condition) keeps a single `movieId` column in the result.
ratings_movies_df = ratings_df.join(movies_df, 'movieId', 'inner')

In [6]:
ratings_movies_df.show(n=5)  # each tag row now carries the movie's title and genres

+-------+------+----------+----------+--------------------+--------------------+
|movieId|userId|       tag| timestamp|               title|              genres|
+-------+------+----------+----------+--------------------+--------------------+
|    318|     1|  narrated|1425942391|Shawshank Redempt...|         Crime|Drama|
|   4306|    20|Dreamworks|1459855607|        Shrek (2001)|Adventure|Animati...|
|  89302|    20|   England|1400778834|   Page Eight (2011)|      Drama|Thriller|
|  89302|    20| espionage|1400778836|   Page Eight (2011)|      Drama|Thriller|
|  89302|    20|      jazz|1400778841|   Page Eight (2011)|      Drama|Thriller|
+-------+------+----------+----------+--------------------+--------------------+
only showing top 5 rows



 

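Si la columna de unión tuviera distinto nombre en ambos DataFrames, se pasa a `on` una condición de igualdad entre las dos columnas. En este ejemplo además se usa `how='outer'`, que conserva las filas sin correspondencia en cualquiera de los dos lados (rellenando con `null`); observa que en el resultado aparecen ambas columnas clave, `movieId` e `id_movie`.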

In [7]:
# Copy of movies_df whose key column has a different name than in ratings_df.
movies2_df = movies_df.withColumnRenamed(existing='movieId', new='id_movie')
movies2_df.show(n=2)

+--------+----------------+--------------------+
|id_movie|           title|              genres|
+--------+----------------+--------------------+
|       1|Toy Story (1995)|Adventure|Animati...|
|       2|  Jumanji (1995)|Adventure|Childre...|
+--------+----------------+--------------------+
only showing top 2 rows



In [8]:
# The key columns have different names, so pass an explicit equality condition.
# how='outer' keeps rows without a match on either side (filled with nulls), and
# both key columns (`movieId` and `id_movie`) remain in the result.
movie_key_matches = ratings_df['movieId'] == movies2_df['id_movie']
ratings_movies2_df = ratings_df.join(movies2_df, on=movie_key_matches, how='outer')

In [9]:
ratings_movies2_df.show(n=5)  # unmatched movies appear with null tag columns

+------+-------+--------------------+----------+--------+--------------------+--------------------+
|userId|movieId|                 tag| timestamp|id_movie|               title|              genres|
+------+-------+--------------------+----------+--------+--------------------+--------------------+
|110803|    148|              catchy|1434969127|     148|Awfully Big Adven...|               Drama|
|150781|    148|    nudity (topless)|1400281940|     148|Awfully Big Adven...|               Drama|
|236944|    148|Nudity (Topless -...|1158734777|     148|Awfully Big Adven...|               Drama|
|  null|   null|                null|      null|     463|Guilty as Sin (1993)|Crime|Drama|Thriller|
|   930|    471|       Coen Brothers|1303593059|     471|Hudsucker Proxy, ...|              Comedy|
+------+-------+--------------------+----------+--------+--------------------+--------------------+
only showing top 5 rows





## union

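Imagina que tuviéramos un DataFrame con las películas de terror y otro con las de comedia y quisiéramos unirlos en uno solo. Ten en cuenta que `union` combina las columnas por posición, no por nombre, así que ambos DataFrames deben tener las mismas columnas en el mismo orden.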

In [10]:
from pyspark.sql import functions as F

In [11]:
# Exact match on the whole `genres` string: keeps only movies listed with that
# single genre, not multi-genre entries such as 'Comedy|Romance'.
horror_df = movies_df.where(movies_df['genres'] == 'Horror')
comedy_df = movies_df.where(movies_df['genres'] == 'Comedy')

In [12]:
horror_df.select('genres').dropDuplicates().show()  # sanity check: only 'Horror'

+------+
|genres|
+------+
|Horror|
+------+



In [13]:
comedy_df.select('genres').dropDuplicates().show()  # sanity check: only 'Comedy'

+------+
|genres|
+------+
|Comedy|
+------+



In [14]:
# `union` appends the rows of comedy_df after those of horror_df, matching
# columns by position; both inputs come from movies_df, so their schemas line up.
horror_comedy_df = horror_df.union(other=comedy_df)

In [15]:
horror_comedy_df.select('genres').dropDuplicates().show()  # both genres now present

+------+
|genres|
+------+
|Horror|
|Comedy|
+------+





# Persistiendo DataFrames

Debido a la *lazy evaluation* (evaluación perezosa) de Spark, cada vez que realicemos una acción sobre el DataFrame `ratings_movies_df` se ejecutará la operación de _join_. Este tipo de operación es muy costosa computacionalmente, por lo que es recomendable realizar un `cache` o `persist` sobre el DataFrame para evitar ejecutarla múltiples veces.

Al persistir un DataFrame se guarda temporalmente el resultado del DAG hasta el punto donde se cachea el DataFrame, evitando que se ejecute esa parte repetidas veces con cada acción.



Por ejemplo, si queremos contar el número de títulos únicos con el tag `narrated` y también con el tag `England`:

__ineficiente:__

In [16]:
%%time

ratings_movies_df.filter(F.col('tag') == 'narrated').select('title').distinct().count()

CPU times: user 4.94 ms, sys: 2.46 ms, total: 7.39 ms
Wall time: 3.15 s


88

In [17]:
%%time

ratings_movies_df.filter(F.col('tag') == 'England').select('title').distinct().count()

CPU times: user 4.59 ms, sys: 1.97 ms, total: 6.56 ms
Wall time: 2.81 s


196



__eficiente:__

Observa que la primera acción puede ser incluso más lenta que la anterior, ya que `persist` es perezoso y el resultado se guarda durante esa primera acción. En cambio, la segunda acción es mucho más rápida al no necesitar volver a ejecutar el join.



__efficient:__

Notice that the first action may take even longer than before, because the result is being stored. On the other hand, the second action is much faster because it doesn't need to re-execute the join.

In [18]:
# Mark the joined DataFrame for caching with the default storage level
# (equivalent to a bare persist()). Nothing is stored until the next action runs.
ratings_movies_df.cache()

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [19]:
%%time

ratings_movies_df.filter(F.col('tag') == 'narrated').select('title').distinct().count()

CPU times: user 6.08 ms, sys: 2.89 ms, total: 8.97 ms
Wall time: 10.3 s


88

In [20]:
%%time

ratings_movies_df.filter(F.col('tag') == 'England').select('title').distinct().count()

CPU times: user 4.8 ms, sys: 2.22 ms, total: 7.03 ms
Wall time: 1.95 s


196



Es importante liberar (`unpersist`) los DataFrames cacheados cuando ya no se vayan a utilizar, para no ocupar memoria o disco innecesariamente.

In [21]:
# Drop the cached copy of the joined DataFrame now that it is no longer needed.
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]



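Es posible elegir si el guardado temporal se hace en memoria, en disco o en ambos, mediante el parámetro `storageLevel` de `persist`. En los ejemplos siguientes se libera el DataFrame con `unpersist` antes de volver a persistirlo con otro nivel.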

In [22]:
from pyspark import StorageLevel

In [23]:
# Disk-only persistence; still lazy, so nothing is written until an action runs.
ratings_movies_df.persist(StorageLevel.DISK_ONLY)

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [24]:
# Release the DISK_ONLY persistence before persisting again with another level.
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [25]:
# Keep the data in memory, using disk for whatever does not fit (lazy as well).
ratings_movies_df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [26]:
# Release the MEMORY_AND_DISK persistence before persisting again with another level.
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [27]:
# Memory-only persistence (lazy: materialized on the next action).
ratings_movies_df.persist(StorageLevel.MEMORY_ONLY)

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]

In [28]:
# Release the MEMORY_ONLY persistence; the DataFrame is no longer cached afterwards.
ratings_movies_df.unpersist()

DataFrame[movieId: int, userId: int, tag: string, timestamp: string, title: string, genres: string]