In [71]:
# importamos las liberias necesarias para trabajar con python con spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, FloatType, DateType, BooleanType, DoubleType
from pyspark.sql.functions import *

# creamos la sesion de spark con 4 cores y 4 gigas de memoria mas nombre de la aplicacion
spark = SparkSession.builder.master("local[4]").appName("carga_de_datos").getOrCreate()

# creamos variable con la ruta del archivos
capaAlmacenamiento = "../datalake/capaAlmacenamiento/combined.csv"
capaProcesamiento = "../datalake/capaProcesamiento/peliculas.parquet"


In [72]:
# creamos la esquema de la df anterior
schemaDf = StructType([
    StructField("adult", BooleanType(), True),
    StructField("genres", StringType(), True),
    StructField("id", IntegerType(), False),
    StructField("imdb_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("runtime", DoubleType(), True),
    StructField("cast", StringType(), True),
    StructField("crew", StringType(), True),
    StructField("keywords", StringType(), True),
    StructField("poster", StringType(), True),
])

# cargamos los datos en un dataframe
df = spark.read.csv(capaAlmacenamiento, header=True, schema=schemaDf, sep=",").persist()

In [73]:
# casteamos la columna de fecha a tipo date
df = df.withColumn("release_date", to_date(df["release_date"], "yyyy-MM-dd"))

In [74]:
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- runtime: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- poster: string (nullable = true)



In [75]:
# miramos el total de registros
print("Total de registros: ", df.count())

Total de registros:  15379


In [76]:
# miramos la cantidad de registros duplicados
print("Total de registros duplicados: ", df.count() - df.dropDuplicates().count())

Total de registros duplicados:  2029


In [77]:
# eliminamos los registros duplicados
df = df.dropDuplicates()

# miramos la cantidad de registros duplicados
print("Total de registros duplicados: ", df.count() - df.dropDuplicates().count())

Total de registros duplicados:  0


In [78]:
# miramos el total de registros
print("Total de registros: ", df.count())

# hacemos un show de los datos
df.show(5)

Total de registros:  13350
+-----+--------------------+------+---------+---------+--------------------+------------+-------+--------------------+--------------------+--------------------+--------------------+
|adult|              genres|    id|  imdb_id|    title|            overview|release_date|runtime|                cast|                crew|            keywords|              poster|
+-----+--------------------+------+---------+---------+--------------------+------------+-------+--------------------+--------------------+--------------------+--------------------+
|false|[{'id': 35, 'name...|  9058|tt0110737| Only You|"A childhood inci...|        null|   null| she meets the ch...|          1994-09-17|               115.0|[{'cast_id': 1, '...|
|false|[{'id': 16, 'name...|136619|tt1693039|Pinocchio|Geppetto the carp...|  2012-10-05|   75.0|"[{'cast_id': 3, ...| 'credit_id': '52...|         'gender': 2|         'id': 69785|
|false|[{'id': 18, 'name...|382899|tt5437970|   Mother|A Sloven

In [79]:
# miramos la cantidad de registros nulos por columna
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+
|adult|genres| id|imdb_id|title|overview|release_date|runtime|cast|crew|keywords|poster|
+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+
|   20|     1| 20|      9|    4|      93|         643|    653|  28|  32|      34|    34|
+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+



In [80]:
# borramos los registros nulos
df = df.na.drop()

# miramos la cantidad de registros nulos por columna
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+
|adult|genres| id|imdb_id|title|overview|release_date|runtime|cast|crew|keywords|poster|
+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+
|    0|     0|  0|      0|    0|       0|           0|      0|   0|   0|       0|     0|
+-----+------+---+-------+-----+--------+------------+-------+----+----+--------+------+



In [81]:

# la cantidad TOTAL de registros
print("Total de registros: ", df.count())

Total de registros:  12611


In [82]:
# hacemos un show de los datos
df.show(5)

+-----+--------------------+------+---------+-----------+--------------------+------------+-------+--------------------+--------------------+--------------------+--------------------+
|adult|              genres|    id|  imdb_id|      title|            overview|release_date|runtime|                cast|                crew|            keywords|              poster|
+-----+--------------------+------+---------+-----------+--------------------+------------+-------+--------------------+--------------------+--------------------+--------------------+
|false|[{'id': 16, 'name...|136619|tt1693039|  Pinocchio|Geppetto the carp...|  2012-10-05|   75.0|"[{'cast_id': 3, ...| 'credit_id': '52...|         'gender': 2|         'id': 69785|
|false|[{'id': 18, 'name...|382899|tt5437970|     Mother|A Slovenian mothe...|  2016-01-27|   90.0|[{'cast_id': 2, '...|[{'credit_id': '5...|[{'id': 131, 'nam...|https://m.media-a...|
|false|[{'id': 28, 'name...|184315|tt1267297|   Hercules|Fourteen hundred ...|  

In [83]:
# guardamos el dataframe en formato parquet
df.write.parquet(capaProcesamiento, mode="overwrite")


In [84]:
# liberamos memoria
df.unpersist()

# cerramos la sesion de spark
spark.stop()