In [1]:
import pandas as pd 
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('watchGo').getOrCreate()
sqlContext = SQLContext(spark)

In [14]:
def df_hdfs_spark(data, str_):
    '''
    Crea un dataframe de spark a partir de uno de pandas,
    lo escribe en un datalake de hdfs y lo vuelve a leer y guarda; 
    usando el formato parquet guarda el esquema
    Parámetros: un pandas dataframe y un str para darle al archivo parquet
    '''
    path = 'hdfs://namenode:8020/datalake' + str_ + '.parquet'
    df_spark = sqlContext.createDataFrame(data=data) #pd.dataframe --> spark
    df_spark.write.parquet(path, mode = 'overwrite') #spark --> hdfs  ojo con el parametro mode!!
    return spark.read.parquet(path) #hdfs --> spark

In [15]:
#lectura de bases de datos
justwatch = pd.read_csv('data/just-watch.csv')
#imdb = pd.read_csv('data/IMDb movies.csv')
filmaffinity = pd.read_csv('data/filmaffinity.csv')

In [16]:
jw = df_hdfs_spark(justwatch, 'jw')

In [17]:
jw.show()

+----------+--------+--------------------+
|Plataforma|    Tipo|              Titulo|
+----------+--------+--------------------+
|   netflix|pelicula|        coach carter|
|   netflix|pelicula|im thinking of en...|
|   netflix|pelicula|         i am mother|
|   netflix|pelicula|un franco 14 pesetas|
|   netflix|   serie|las escalofriante...|
|   netflix|   serie|             dracula|
|   netflix|pelicula|        ahora me ves|
|   netflix|   serie|         outer banks|
|   netflix|   serie|       dawsons creek|
|   netflix|   serie|         white lines|
|   netflix|pelicula|         soy leyenda|
|   netflix|pelicula|regreso al futuro ii|
|   netflix|pelicula|the block island ...|
|   netflix|pelicula|         plan oculto|
|   netflix|pelicula|         mollys game|
|   netflix|   serie|          sweet home|
|   netflix|pelicula|como entrenar a t...|
|   netflix|   serie|                brot|
|   netflix|pelicula|            kin 2018|
|   netflix|   serie|    ash vs evil dead|
+----------

In [18]:
jw.select(F.col('Plataforma')).distinct().show()

+------------------+
|        Plataforma|
+------------------+
|           netflix|
|               hbo|
|       disney-plus|
|amazon-prime-video|
+------------------+



In [19]:
jw.filter(F.col('Titulo') == 'stranger things').show()

+----------+-----+---------------+
|Plataforma| Tipo|         Titulo|
+----------+-----+---------------+
|   netflix|serie|stranger things|
+----------+-----+---------------+



In [20]:
cine = df_hdfs_spark(filmaffinity, 'cine')

In [21]:
cine.show()

+--------------------+--------------------+
|            Pelicula|               Cines|
+--------------------+--------------------+
|      Mortal Kombat |['Cines La Vaguad...|
|Una joven promete...|['Cines Embajador...|
|¡Upsss 2! ¿Y ahor...|['Cines La Vaguad...|
|Una veterinaria e...|['Cinesa Las Roza...|
|Detective Conan: ...|                  []|
|Crock of Gold: Be...|                  []|
| Amanece en Calcuta |['Cines La Vaguad...|
|         Otra ronda |['Cines Embajador...|
|            Cuñados |['Cinesa La Gavia...|
|               Vivo |['Cines La Vaguad...|
| Ruega por nosotros |['Cinesa  Xanadú'...|
|  Godzilla vs. Kong |['Cines La Vaguad...|
|          Nomadland |['Cines Embajador...|
|        Tom y Jerry |['Cinesa  Xanadú'...|
|     Monster Hunter | ['Cinesa La Gavia']|
|             Inmune |  ['Cinesa Loranca']|
|    Los traductores |['Cinesa Las Roza...|
|El informe Auschw...|                  []|
|     El agente topo |['Renoir Plaza Es...|
|Minari. Historia ...|['Cines Ve

In [41]:
ls_cines = cine.select(F.col('Cines')).filter(F.col('Pelicula') == 'Mortal Kombat ')

In [26]:
ls_cines.show()

+--------------------+
|               Cines|
+--------------------+
|['Cines La Vaguad...|
+--------------------+



In [42]:
ls_cines = ls_cines.select(F.split(F.col('Cines'), ',').alias('listado')).drop('Cines')

In [43]:
ls_cines.show()

+--------------------+
|             listado|
+--------------------+
|[['Cines La Vagua...|
+--------------------+



In [44]:
ls_cines.printSchema()

root
 |-- listado: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [46]:
ls_cines.show()

+--------------------+
|             listado|
+--------------------+
|[['Cines La Vagua...|
+--------------------+



In [14]:
spark.stop()