# <center><font color='#DE2138'>Carlos Vega González: NetflixAnalysis.ipynb</font></center>
---

## Índice de contenidos

### [1. Fase de carga de datos](#fase-carga)  
[1.1 Importación de librerías](#librerias)  
[1.2 Carga de datos](#datos)  

---
### [2. Fase de limpieza de datos](#fase-limpieza)  
[2.1 Renombrado de columnas](#columnas)  
[2.2 Conteo de nulos](#nulos)  
[2.3 Limpieza de categoría e ID](#categoria-id)  
[2.4 Eliminación de valores inválidos](#valores-invalidos)  
[2.5 Estandarización de duración](#duracion)  
[2.6 Validación de fechas en Netflix](#fechas-netflix)  
[2.7 Validación del año de estreno](#ano-estreno)  
[2.8 Normalización de subcategoría](#subcategoria)  
[2.9 Limpieza de género](#genero)  
[2.10 Conversión de tipos de datos](#conversion)  

---
### [3. Fase de exploración de datos](#fase-exploracion)  
[3.1 Duración promedio por país](#duracion-pais)  
[3.2 Actor más frecuente en musicales](#actor-musicales)  
[3.3 Diferencia semanal entre películas ](#diferencia)  
[3.4 Transformación de género a array](#genero-array)  
[3.5 Conteo por número de países](#conteo-paises)  
[3.6 Exportación a Parquet](#parquet)  

---



---


## **FASE DE CARGA DE DATOS**

---



## <h2 id="carga"><center><font color='#1E90FF'>Limpieza y Análisis de Datos de Netflix</font></center></h2>

En este documento se detalla el proceso completo de limpieza y análisis de un dataset de títulos de Netflix. Para ello se describen paso a paso las transformaciones realizadas, con justificaciones técnicas de las decisiones tomadas. Cada sección incluye una explicación en Markdown previa al código que se entiende como suficiente para este cuaderno. Por esta misma razón en el código no hay casi comentarios, porque la explicación se cubre en las celdas previas

##### Justificación del Uso de CamelCase en Variables y Funciones

A lo largo del desarrollo de este notebook, se ha utilizado la convención **CamelCase** para los nombres de variables y funciones, en lugar de la convención tradicional de Python `snake_case`. Este enfoque no ha sido casual, sino que responde a razones de práctica. Aunque en algunos puntos puede haberse colado el uso de `snake_case`, el objetivo ha sido mantener una coherencia con el estilo de Apache Spark y su ecosistema para practicar todo tipo de sintaxis.

---

## <h2 id="carga"><center><font color='#1E90FF'>Importación de Librerías</font></center></h2><a id="librerias"></a>

Antes de comenzar con el análisis, preparé el entorno importando todas las librerías necesarias y asegurando una correcta configuración. Debido a que utilizaremos PySpark para el procesamiento avanzado de datos, primero hago la comprobación de que esté correctamente instalado, procediendo con su instalación automática en caso de no encontrarse disponible.

Además, importé de manera explícita funciones específicas desde pyspark.sql.functions, evitando la práctica poco recomendada del import *. Para facilitar futuras tareas de extracción automática de información desde páginas web (fechas de estreno desde IMDb), añadí las librerías de scraping requests y BeautifulSoup desde esta etapa inicial del proyecto.

In [1]:
try:
    import pyspark
except ImportError:
    !pip install pyspark

#Todas las librerías para evitar F.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, regexp_extract, trim, split, when, to_date, length, sum, avg, round, floor,
    expr, lower, explode, max, min, datediff, count, udf
)
from pyspark.sql.types import IntegerType

#Para scraping
import requests
from bs4 import BeautifulSoup

## <h2 id="carga"><center><font color='#1E90FF'>Carga de datos</font></center></h2><a id="datos"></a>

En este paso, inicié una sesión de Spark y cargué los datos del conjunto de Netflix. Debido a que los datos originales estaban divididos en múltiples archivos CSV comprimidos (siete archivos numerados), utilicé un patrón con wildcards (*) para facilitar la lectura simultánea de todos los archivos en una única instrucción con Spark.

Específicamente, utilicé la ruta /tmp/netflix_titles_dirty_*.csv para combinar todos los archivos coincidentes en un único DataFrame. Además, configuré la carga especificando el separador como tabulación (sep="\t") y permitiendo reconocer el esquema automáticamente.

Tras la carga inicial en el DataFrame df, realicé una inspección rápida del esquema y los datos cargados. Este chequeo inicial reveló columnas con nombres genéricos (por ejemplo, _c0, _c1), señalando la necesidad de asignar nombres más significativos para optimizar futuras operaciones.

### ¿Cómo lo resolví?
Durante esta fase del proceso, realicé las siguientes acciones clave para preparar los datos:

1. Creé una sesión de Spark mediante la clase SparkSession.
2. Leí múltiples archivos simultáneamente aprovechando rutas con wildcard (*.csv).
3. Especificé explícitamente el separador de columnas para asegurar una correcta interpretación de los datos.
4. Activé la inspección inicial del DataFrame para identificar columnas con nombres poco descriptivos.

### Características del código
- **Uso eficiente de rutas wildcard:** Aproveché patrones para combinar múltiples archivos fácilmente.
- **Verificación preliminar del esquema`** Utilicé inspecciones rápidas para identificar mejoras inmediatas en la estructura.


In [2]:
# Intenté hacerlo a varias líneas pero había que hacer el bucle aplanado
# !for i in {01..07}; do
#     wget -O "/tmp/netflix_titles_dirty_$i.csv.gz" "https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_$i.csv.gz?raw=True"
# done


In [3]:
!for i in {01..07}; do wget -O "/tmp/netflix_titles_dirty_$i.csv.gz" "https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_$i.csv.gz?raw=True"; done

!for file in /tmp/netflix_titles_dirty_*.csv.gz; do gunzip -c "$file" > "${file%.gz}"; done

!ls -lh /tmp/netflix_titles_dirty_* # Added ! to execute as shell command

--2025-03-16 17:25:18--  https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_01.csv.gz?raw=True
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... No data received.
Retrying.

--2025-03-16 17:25:20--  (try: 2)  https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_01.csv.gz?raw=True
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/datacamp/data-cleaning-with-pyspark-live-training/raw/refs/heads/master/data/netflix_titles_dirty_01.csv.gz [following]
--2025-03-16 17:25:20--  https://github.com/datacamp/data-cleaning-with-pyspark-live-training/raw/refs/heads/master/data/netflix_titles_dirty_01.csv.gz
Reusing existing connection to github.com:443.
HTTP request sent, awaiting res

In [4]:
spark = SparkSession.builder.appName("netflixAnalysis").getOrCreate()

file_path = "/tmp/netflix_titles_dirty_*.csv"
df = spark.read.csv(file_path, sep="\t", inferSchema=True)

print("### DataFrame inicial ###")
print(f"Filas totales (antes de cualquier limpieza): {df.count()}")
df.show(3)
df.printSchema()

### DataFrame inicial ###
Filas totales (antes de cualquier limpieza): 6238
+--------+-----+--------------------+-----------+-------------+--------------+-------------+----+-----+------+---------------+--------------------+
|     _c0|  _c1|                 _c2|        _c3|          _c4|           _c5|          _c6| _c7|  _c8|   _c9|           _c10|                _c11|
+--------+-----+--------------------+-----------+-------------+--------------+-------------+----+-----+------+---------------+--------------------+
|80044126|Movie| D.L. Hughley: Clear|Jay Chapman| D.L. Hughley| United States|July 13, 2017|2014|TV-MA|59 min|Stand-Up Comedy|In this 2014 stan...|
|80148179|Movie|My Scientology Movie| John Dower|Louis Theroux|United Kingdom|July 13, 2017|2015|TV-MA|99 min|  Documentaries|After speaking wi...|
|70301023|Movie|Tom Segura: Compl...|Jay Chapman|   Tom Segura| United States|July 13, 2017|2014|TV-MA|74 min|Stand-Up Comedy|Levelheaded stand...|
+--------+-----+--------------------



---


## **FASE DE LIMPIEZA DE DATOS**

---



## <center><font color='#1E90FF'>Renombrado de columnas  <a id="columnas"></a></font></center>

El siguiente paso consistió en **renombrar las columnas del DataFrame**, que originalmente presentaban nombres genéricos (ej. `_c0`, `_c1`, `_c2`), por nombres más descriptivos que facilitan enormemente la comprensión del análisis posterior.

Para ello, creé un diccionario donde almacené los pares clave-valor con el formato original y el nuevo nombre deseado:

- `_c0` → **id**: Identificador único del título.
- `_c1` → **categoria**: Tipo de contenido ("Movie" o "TV Show").
- `_c2` → **titulo**: Título del contenido.
- `_c3` → **director**: Nombre del director o elenco de directores/as.
- `_c4` → **reparto**: Actores principales del contenido.
- `_c5` → **pais**: País de origen del contenido.
- `_c6` → **estrenoNetflix**: Fecha de estreno en Netflix.
- `_c7` → **anioEstrenoReal**: Año original de lanzamiento del contenido.
- `_c8` → **subcategoria**: Clasificación de edad o subcategoría adicional.
- `_c9` → **duracion**: Duración en minutos o número de temporadas.
- `_c10` → **genero**: Género del contenido.
- `_c11` → **descripcion**: Descripción o sinopsis del contenido.

### ¿Cómo lo resolví?

Me enfoqué en estos puntos clave para clarificar el contenido del DataFrame:

- **Definí claramente un diccionario** con los nuevos nombres de columnas.
- Utilicé la función `withColumnRenamed()` de Spark para actualizar los nombres fácilmente.
- Realicé una comprobación rápida con `.show()` para asegurar que los cambios fueran efectivos y las columnas reflejaran correctamente la información.

In [5]:
df = df.withColumnRenamed("_c0", "id")\
       .withColumnRenamed("_c1", "categoria")\
       .withColumnRenamed("_c2", "titulo")\
       .withColumnRenamed("_c3", "director")\
       .withColumnRenamed("_c4", "reparto")\
       .withColumnRenamed("_c5", "pais")\
       .withColumnRenamed("_c6", "estrenoNetflix")\
       .withColumnRenamed("_c7", "anioEstrenoReal")\
       .withColumnRenamed("_c8", "subcategoria")\
       .withColumnRenamed("_c9", "duracion")\
       .withColumnRenamed("_c10", "genero")\
       .withColumnRenamed("_c11", "descripcion")

df.show(3)

+--------+---------+--------------------+-----------+-------------+--------------+--------------+---------------+------------+--------+---------------+--------------------+
|      id|categoria|              titulo|   director|      reparto|          pais|estrenoNetflix|anioEstrenoReal|subcategoria|duracion|         genero|         descripcion|
+--------+---------+--------------------+-----------+-------------+--------------+--------------+---------------+------------+--------+---------------+--------------------+
|80044126|    Movie| D.L. Hughley: Clear|Jay Chapman| D.L. Hughley| United States| July 13, 2017|           2014|       TV-MA|  59 min|Stand-Up Comedy|In this 2014 stan...|
|80148179|    Movie|My Scientology Movie| John Dower|Louis Theroux|United Kingdom| July 13, 2017|           2015|       TV-MA|  99 min|  Documentaries|After speaking wi...|
|70301023|    Movie|Tom Segura: Compl...|Jay Chapman|   Tom Segura| United States| July 13, 2017|           2014|       TV-MA|  74 min|

Tras la primera revisión manual de los datos, lo primero que se asume es que puede haber registros en los que director y reparto hayan sido intercambiados accidentalmente, pero es imposible diferenciarlos de manera automática debido a la falta de un patrón claro para detectar estos errores (en ambos hay nomnbres de una o varias personas)

## <center><font color='#1E90FF'>Conteo de nulos <a id="nulos"></a></font></center>

A continuación, realicé una evaluación preliminar del conjunto de datos original, enfocándome especialmente en identificar la cantidad de valores nulos presentes en cada columna. Este análisis inicial resulta crucial para determinar la magnitud del problema de datos incompletos.

Para llevar a cabo esta tarea, implementé la función personalizada `contar_nulos()`, basada en funciones nativas de PySpark. Esta función permitió obtener rápidamente un resumen numérico sobre los datos faltantes, facilitando decisiones posteriores en la limpieza.

### ¿Cómo lo resolví?

En esta fase, mi enfoque fue obtener un panorama claro sobre la calidad inicial de los datos, especialmente respecto a valores nulos. Los pasos seguidos fueron:

- Creé la función `contar_nulos()` para identificar rápidamente columnas problemáticas.
- Ejecuté esta función sobre el DataFrame original antes de cualquier modificación.
- Analicé la salida para identificar prioridades en los siguientes pasos del proceso de limpieza.


In [6]:
def contar_nulos(df):
    nulos_df = df.select([(sum(col(c).isNull().cast("int")).alias(c)) for c in df.columns])

    resultado = nulos_df.toPandas().transpose().reset_index().rename(columns={'index': 'columna', 0: 'nulos'})
    return resultado

resultado = contar_nulos(df)
print(resultado)


            columna  nulos
0                id      2
1         categoria     59
2            titulo     64
3          director   2012
4           reparto    629
5              pais    537
6    estrenoNetflix     80
7   anioEstrenoReal     74
8      subcategoria     84
9          duracion     76
10           genero     83
11      descripcion     86


## <center><font color='#1E90FF'>Limpieza de categoría e ID <a id="categoria-id"></a></font></center>

El siguiente paso consistió en limpiar y estandarizar las columnas `categoria` e `id`, cruciales para identificar los registros de forma única y clasificar el contenido adecuadamente. Para lograrlo, ejecuté estos pasos:

- **Filtrado de categorías:** Conservé únicamente las filas con categorías válidas (`Movie`, `TV Show`), eliminando cualquier categoría inesperada.
- **Conversión de ID:** Transformé los identificadores (`id`) al tipo numérico entero (`IntegerType`). Esto marcó como nulos los IDs no numéricos.
- **Eliminación de IDs inválidos:** Descarté registros cuyo ID convertido fuese nulo o no cumpliera exactamente con la longitud requerida de 8 dígitos (asumiendo que los IDs de Netflix siguen este estándar).
- **Eliminación de duplicados:** Finalmente, removí registros duplicados basados en la columna `id` para asegurar la unicidad.

Encapsulé todo el procedimiento en la función `limpiarCategoriaId()`, que posteriormente apliqué sobre el DataFrame original `df`, obteniendo así un nuevo DataFrame limpio denominado `dfLimpio`. Este nuevo conjunto de datos contiene exclusivamente películas y series correctamente identificadas por IDs únicos y válidos.

---


In [7]:
def limpiarCategoriaId(df):
    categoriasValidas = ["Movie", "TV Show"]
    dfLimpio = df.filter(df.categoria.isin(categoriasValidas))
    dfLimpio = dfLimpio.withColumn("id", dfLimpio.id.cast(IntegerType()))
    dfLimpio = dfLimpio.filter(dfLimpio.id.isNotNull())
    dfLimpio = dfLimpio.filter(length(dfLimpio.id.cast("string")) == 8)
    dfLimpio = dfLimpio.dropDuplicates(["id"])
    return dfLimpio

dfLimpio = limpiarCategoriaId(df)

## <center><font color='#1E90FF'>Eliminación de valores inválidos <a id="valores-invalidos"></a></font></center>

El siguiente paso consistió en identificar y eliminar valores inválidos en columnas de texto esenciales (**titulo**, **director**, **reparto**, **descripcion** y **pais**), ya que se observó que contenían información incorrecta como URLs, correos electrónicos, números aislados o fechas. Para lograr esto, definí patrones mediante expresiones regulares (regex) que detectan específicamente:

- **URLs**: cadenas que comienzan con `http://` o `https://`.
- **Correos electrónicos**: formatos habituales que contienen un símbolo `@` seguido de un dominio.
- **Números puros**: cadenas compuestas exclusivamente por dígitos, sin caracteres alfabéticos.
- **Fechas textuales**: fechas escritas en inglés como `"January 31, 2019"`.

La solución consistió en construir filtros booleanos basados en estas expresiones regulares para detectar filas inválidas, y posteriormente eliminar dichas filas del DataFrame. Repetí este proceso para cada columna relevante, asegurando así que todas las filas conservadas contuvieran información coherente y válida para el análisis.

In [8]:
columnas = ["director", "reparto", "descripcion", "pais"]
patrones = [
    r"(?i)^https?://",
    r"(?i)^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$",
    r"^[0-9]+$",
    r"^(January|February|March|April|May|June|July|August|September|October|November|December)\\s+\\d{1,2},\\s+\\d{4}$"
]
condiciones = None
for columna in columnas:
    for patron in patrones:
        nueva_condicion = col(columna).rlike(patron)
        if condiciones is None:
            condiciones = nueva_condicion
        else:
            condiciones = condiciones | nueva_condicion

dfLimpio = dfLimpio.filter(~condiciones)


El campo titulo se analiza de manera diferente porque, a diferencia de los otros, puede contener números de forma válida. En algunas producciones, los títulos incluyen caracteres numéricos como parte del nombre oficial, por ejemplo, "300",

In [9]:
columnas = ["titulo"]
patrones = [
    r"(?i)^https?://",  # URL
    r"(?i)^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$",  # correo
    r"^(January|February|March|April|May|June|July|August|September|October|November|December)\\s+\\d{1,2},\\s+\\d{4}$" # Fechas en inglés
]
condiciones = None
for columna in columnas:
    for patron in patrones:
        nueva_condicion = col(columna).rlike(patron)
        if condiciones is None:
            condiciones = nueva_condicion
        else:
            condiciones = condiciones | nueva_condicion

dfLimpio = dfLimpio.filter(~condiciones)

## <center><font color='#1E90FF'>Estandarización de duración <a id="duracion"></a></font></center>

En este punto, procedí a estandarizar la columna `duracion`, separando claramente las duraciones de películas (en minutos) y el número de temporadas de las series en columnas independientes, facilitando así futuras operaciones numéricas:

- **`duracionMin`**: Almacena duración en minutos (tipo entero) para películas.
- **`numTemporadas`**: Almacena número de temporadas (tipo entero) para series.

Para lograr esto, definí expresiones regulares que extraen exclusivamente números seguidos por «min» o «Season(s)», asignando estos valores en las nuevas columnas mediante las funciones `when()`, `rlike()` y `regexp_extract()`. Finalmente, eliminé filas con valores inesperados en la columna original.


In [10]:
regexMin = r"(\d+)\s+min$"
regexSeasons = r"(\d+)\s+Seasons?$"

dfLimpio = dfLimpio.withColumn(
    "duracionMin",
    when(dfLimpio.duracion.rlike(regexMin), regexp_extract(dfLimpio.duracion, regexMin, 1).cast("int"))
).withColumn(
    "numTemporadas",
    when(dfLimpio.duracion.rlike(regexSeasons), regexp_extract(dfLimpio.duracion, regexSeasons, 1).cast("int"))
)

# Ahora filtrar
dfLimpio = dfLimpio.filter((dfLimpio.duracionMin.isNotNull()) | (dfLimpio.numTemporadas.isNotNull()))

## <center><font color='#1E90FF'>Validación de fechas en Netflix <a id="fechas-netflix"></a></font></center>

En esta fase, me centré en limpiar y validar la columna `estrenoNetflix`, la cual representa las fechas en que los títulos fueron estrenados en Netflix. Debido a que los datos originales podían contener formatos variados o inconsistentes, realicé lo siguiente:

- **Estandarización de formato:** Utilicé expresiones como `"January 1, 2020"` para convertir textos a fechas reconocibles por Spark, eliminando espacios innecesarios y errores comunes.
- **Conversión a tipo fecha:** Transformé los textos válidos al formato de fecha reconocido formalmente por Spark mediante `to_date()` con el formato `"MMMM d, yyyy"`.
- **Filtrado:** Conservé únicamente aquellas filas cuya conversión resultó exitosa, descartando registros con fechas inválidas o inconsistentes.

Con esto garantizo que todas las fechas almacenadas en `estrenoNetflix` sean válidas y uniformes, facilitando significativamente las operaciones analíticas posteriores.


---

### Características del código

- **Uso de `trim()` y `to_date()`:** Garantizan uniformidad y reconocimiento efectivo de las fechas.
- **Filtro basado en conversión:** Permite mantener únicamente fechas válidas y consistentes para análisis temporales.


In [None]:
dfLimpio = dfLimpio.withColumn("estrenoNetflix", trim(dfLimpio.estrenoNetflix))
dfLimpio = dfLimpio.filter(to_date(dfLimpio.estrenoNetflix, "MMMM d, yyyy").isNotNull())

## <center><font color='#1E90FF'>Validación del año de estreno <a id="ano-estreno"></a></font></center>

El siguiente paso fue validar la columna **anioEstrenoReal**, que registra el año original de estreno (cine o TV). Detecté valores incorrectos como texto, años imposibles o nulos. Para garantizar coherencia y calidad, realicé lo siguiente:

- **Conversión a numérico**: Transformé la columna a tipo entero, marcando automáticamente como nulos los valores no numéricos o inconsistentes.
- **Filtrado de nulos**: Eliminé filas cuyo año resultó nulo tras la conversión (años inválidos o texto).

Este proceso eliminó errores típicos como textos en campos numéricos o años extremos improbables, dejando un conjunto de datos con años válidos, numéricos y consistentes.

In [12]:
dfLimpio = dfLimpio.withColumn("anioEstrenoReal", dfLimpio.anioEstrenoReal.cast("int"))
dfLimpio = dfLimpio.filter(dfLimpio.anioEstrenoReal.isNotNull())

## <center><font color='#1E90FF'>Normalización de subcategoría <a id="subcategoria"></a></font></center>

El siguiente paso fue limpiar y normalizar la columna `subcategoria`, ya que contenía datos mixtos: clasificaciones por edad (por ejemplo, `"PG-13"`), o valores redundantes de duración en minutos o temporadas. Para solucionar esto:

- Definí una lista con clasificaciones de audiencia válidas (`"PG"`, `"TV-MA"`, `"R"`, etc.).
- Apliqué la función `trim()` para eliminar espacios extra.
- Usé sentencias `when()` encadenadas para clasificar los valores:
  - Si el valor era una clasificación válida, lo mantuve tal cual.
  - Si coincidía con el patrón de minutos (`regexMin`), extraje el valor numérico.
  - Si coincidía con temporadas (`regexSeasons`), extraje igualmente el número.
  - Si no cumplía ninguno de estos casos, asigné valor nulo.
- Finalmente, filtré para conservar únicamente filas con valores válidos tras esta normalización.

**Con esta estrategia, ahora la columna `subcategoria` contiene solo clasificaciones claras o valores numéricos, eliminando casos confusos o irrelevantes.**

---


In [None]:
categorias_validas = ["G", "PG", "PG-13", "R", "NC-17", "NR", "UR","TV-G", "TV-PG", "TV-Y", "TV-Y7", "TV-Y7-FV", "TV-14", "TV-MA"]
dfLimpio = dfLimpio.withColumn("subcategoria", trim(col("subcategoria")))

dfLimpio = dfLimpio.withColumn(
    "subcategoria",
    when(col("subcategoria").isin(categorias_validas), col("subcategoria"))
    .when(col("subcategoria").rlike(regexMin), regexp_extract(col("subcategoria"), regexMin, 1).cast("int"))
    .when(col("subcategoria").rlike(regexSeasons), regexp_extract(col("subcategoria"), regexSeasons, 1).cast("int"))
    .otherwise(None)
)

dfLimpio = dfLimpio.filter(col("subcategoria").isNotNull())

## <center><font color='#1E90FF'>Limpieza de género <a id="genero"></a></font></center>

La siguiente etapa consistió en limpiar la columna `género`, que incluye listas de géneros separadas por comas (ej.: `"Comedy, Drama"`). Para mantener la calidad del análisis y asegurar la consistencia, realicé los siguientes pasos:

- **Eliminación de valores vacíos o nulos:** Removí los registros sin información en la columna género, asumiendo que cada título debe contar con al menos un género válido.

- **Normalización del formato:** Aunque idealmente validaría los géneros frente a una lista predefinida (ej.: `"Comedy"`, `"Drama"`, `"Action"`), aquí asumí que los géneros restantes eran válidos y me enfoqué en asegurar su formato correcto.

Tras este proceso, la columna `género` quedó limpia, completa y preparada para transformaciones adicionales en fases posteriores.

---


In [None]:
dfLimpio = dfLimpio.filter(dfLimpio.genero.isNotNull())

### Resumen de transformaciones

Finalmente, se muestra un ejemplo de cómo comparar el DataFrame original versus el DataFrame limpio, revisando la cantidad de filas y el schema para verificar el resultado global de la limpieza.


In [15]:
resultado = contar_nulos(dfLimpio)
print(resultado)

print("----- ANTES DE LA LIMPIEZA -----")
print(f"Filas totales: {df.count()}")
print("\n----- DESPUÉS DE LA LIMPIEZA -----")
print(f"Filas totales: {dfLimpio.count()}")
dfLimpio.show(5)


            columna  nulos
0                id      0
1         categoria      0
2            titulo      0
3          director      0
4           reparto      0
5              pais      0
6    estrenoNetflix      0
7   anioEstrenoReal      0
8      subcategoria      0
9          duracion      0
10           genero      0
11      descripcion      0
12      duracionMin     95
13    numTemporadas   3532
----- ANTES DE LA LIMPIEZA -----
Filas totales: 6238

----- DESPUÉS DE LA LIMPIEZA -----
Filas totales: 3627
+--------+---------+--------------------+----------------+--------------------+--------------+-----------------+---------------+------------+--------+--------------------+--------------------+-----------+-------------+
|      id|categoria|              titulo|        director|             reparto|          pais|   estrenoNetflix|anioEstrenoReal|subcategoria|duracion|              genero|         descripcion|duracionMin|numTemporadas|
+--------+---------+--------------------+-------

## <center><font color='#1E90FF'>Conversión de tipos de datos <a id="conversion"></a></font></center>

El siguiente paso fue realizar una última conversión de tipos en el DataFrame (`dfLimpio`), garantizando que todas las columnas tuviesen el formato correcto antes de iniciar el análisis exploratorio. Para ello:

- **`duracion`**: Convertí esta columna a tipo entero, extrayendo únicamente el valor numérico inicial (minutos o temporadas), aunque ya disponíamos de columnas específicas (`duracionMin`, `numTemporadas`).

- **`anioEstrenoReal`**: Confirmé que se mantuviera como tipo entero tras la limpieza previa.

- **`estrenoNetflix`**: Transformé esta columna a formato fecha (`DateType`) utilizando el patrón `"MMMM d, yyyy"`, esencial para futuros análisis temporales.

Finalmente, comprobé que todas las conversiones se aplicaron correctamente revisando el esquema actualizado del DataFrame.


In [16]:
dfLimpio = dfLimpio.withColumn("duracion", split(dfLimpio.duracion, " ")[0].cast("int")) \
       .withColumn("anioEstrenoReal", dfLimpio.anioEstrenoReal.cast("int")) \
       .withColumn("estrenoNetflix", to_date(dfLimpio.estrenoNetflix, "MMMM d, yyyy"))

for columna in dfLimpio.schema.fields:
    print(f"{columna.name}: {columna.dataType}")

id: IntegerType()
categoria: StringType()
titulo: StringType()
director: StringType()
reparto: StringType()
pais: StringType()
estrenoNetflix: DateType()
anioEstrenoReal: IntegerType()
subcategoria: StringType()
duracion: IntegerType()
genero: StringType()
descripcion: StringType()
duracionMin: IntegerType()
numTemporadas: IntegerType()




---


## **FASE DE EXPLORACIÓN DE DATOS**

---



## <center><font color='#1E90FF'>Duración promedio por país <a id="duracion-pais"></a></font></center>

Una consulta clave del análisis fue determinar la duración promedio de las películas según el país productor. Sin embargo, el campo `pais` incluye casos de coproducciones (varios países separados por coma), lo que nos lleva a dos enfoques posibles:

**1. Promedio por país combinado:**
- Cada combinación de países cuenta como un único grupo (ej.: `"Estados Unidos, India"` sería un grupo independiente).

**2. Promedio por país individual:**
- Cada país involucrado se considera por separado, distribuyendo la duración a cada país participante.

Para abordar ambos casos realicé lo siguiente:

### **Cálculo por país combinado**
- Utilicé directamente la columna `pais` original, promediando la duración en minutos (`duracionMin`) solo para películas (descartando series, que no tienen minutos asociados).

### **Cálculo por país individual**
- Dividí la columna de países mediante `split()` generando múltiples filas con `explode()`, asignando así una fila por país individual.
- Sobre este DataFrame ampliado, recalculé la duración media en minutos.
- Presenté estos resultados en minutos y segundos, mejorando su claridad visual.

Finalmente, comparé ambos enfoques destacando las diferencias entre medir por país combinado o individual, demostrando cómo influyen las coproducciones en métricas clave como la duración promedio.

---

In [17]:
mediaDuracion = dfLimpio.groupBy("pais").agg(round(avg("duracionMin"),2).alias("mediaDuracion"))
mediaDuracion.show(10, truncate = False)

+--------------------------------------+-------------+
|pais                                  |mediaDuracion|
+--------------------------------------+-------------+
|South Africa, United States, Germany  |106.0        |
|Chile, United States, France          |96.0         |
|Hong Kong, China, Singapore           |113.0        |
|Denmark, France, United States, Sweden|90.0         |
|Brazil, France, Germany               |130.0        |
|United States, Ireland                |100.0        |
|Australia, United Arab Emirates       |95.67        |
|United States, Germany, Australia     |97.0         |
|France, Iran, United States           |122.0        |
|United Kingdom, India, United States  |121.0        |
+--------------------------------------+-------------+
only showing top 10 rows



In [18]:
mediaDuracion = mediaDuracion.withColumn("minutos", floor(col("mediaDuracion"))) \
                             .withColumn("segundos", round((col("mediaDuracion") - col("minutos")) * 60, 0).cast("int")) \
                             .withColumn("duracionFormateada", expr("concat(minutos, ' min ', segundos, ' sec')")) \
                             .drop("mediaDuracion", "minutos", "segundos")

mediaDuracion.show(10, truncate=False)


+--------------------------------------+------------------+
|pais                                  |duracionFormateada|
+--------------------------------------+------------------+
|South Africa, United States, Germany  |106 min 0 sec     |
|Chile, United States, France          |96 min 0 sec      |
|Hong Kong, China, Singapore           |113 min 0 sec     |
|Denmark, France, United States, Sweden|90 min 0 sec      |
|Brazil, France, Germany               |130 min 0 sec     |
|United States, Ireland                |100 min 0 sec     |
|Australia, United Arab Emirates       |95 min 40 sec     |
|United States, Germany, Australia     |97 min 0 sec      |
|France, Iran, United States           |122 min 0 sec     |
|United Kingdom, India, United States  |121 min 0 sec     |
+--------------------------------------+------------------+
only showing top 10 rows



In [19]:
df_paisIndividual = dfLimpio.withColumn("paisIndividual", explode(split(dfLimpio.pais, ", ")))
df_paisIndividual.select("paisIndividual").show(10)

+--------------+
|paisIndividual|
+--------------+
|     Hong Kong|
| United States|
|United Kingdom|
|         India|
|         India|
|         India|
| United States|
| United States|
|         India|
|         India|
+--------------+
only showing top 10 rows



In [20]:
mediaDuracionSiParticipa = df_paisIndividual.groupBy("paisIndividual").agg(round(avg("duracion"),2).alias("mediaDuracionSiParticipa"))
mediaDuracionSiParticipa.orderBy("paisIndividual").show(3)

+--------------+------------------------+
|paisIndividual|mediaDuracionSiParticipa|
+--------------+------------------------+
|   Afghanistan|                    84.0|
|       Albania|                   105.0|
|     Argentina|                    91.0|
+--------------+------------------------+
only showing top 3 rows



In [21]:
mediaDuracionSiParticipa = mediaDuracionSiParticipa.withColumn("minutos", floor(col("mediaDuracionSiParticipa"))) \
                             .withColumn("segundos", round((col("mediaDuracionSiParticipa") - col("minutos")) * 60, 0).cast("int")) \
                             .withColumn("duracionFormateada", expr("concat(minutos, ' min ', segundos, ' sec')")) \
                             .drop("mediaDuracionSiParticipa", "minutos", "segundos")

mediaDuracionSiParticipa.show(10, truncate=False)


+--------------+------------------+
|paisIndividual|duracionFormateada|
+--------------+------------------+
|Russia        |101 min 0 sec     |
|Senegal       |106 min 0 sec     |
|Sweden        |99 min 32 sec     |
|Philippines   |115 min 47 sec    |
|Singapore     |100 min 7 sec     |
|Malaysia      |101 min 22 sec    |
|Turkey        |99 min 56 sec     |
|Malawi        |114 min 0 sec     |
|Iraq          |76 min 0 sec      |
|Germany       |102 min 42 sec    |
+--------------+------------------+
only showing top 10 rows



In [22]:
# Aquí se comprueba que había diferencia
paises = ["United States", "France", "India", "Spain"]

# Filtrar los DataFrames para incluir solo los países seleccionados
mediaDuracionFiltrada = mediaDuracion.filter(mediaDuracion.pais.isin(paises))
mediaDuracionSiParticipaFiltrada = mediaDuracionSiParticipa.filter(mediaDuracionSiParticipa.paisIndividual.isin(paises))

# Hacer un join entre ambos DataFrames en la columna de país
comparacionDuracion = mediaDuracionFiltrada.alias("general").join(
    mediaDuracionSiParticipaFiltrada.alias("participa"),
    mediaDuracionFiltrada.pais == mediaDuracionSiParticipaFiltrada.paisIndividual,
    "inner"
).select(
    mediaDuracionFiltrada.pais.alias("Pais"),
    mediaDuracionFiltrada.duracionFormateada.alias("Media General"),
    mediaDuracionSiParticipaFiltrada.duracionFormateada.alias("Media Si Participa")
)

comparacionDuracion.show(truncate=False)

+-------------+--------------+------------------+
|Pais         |Media General |Media Si Participa|
+-------------+--------------+------------------+
|France       |94 min 48 sec |99 min 16 sec     |
|United States|89 min 50 sec |91 min 8 sec      |
|India        |128 min 51 sec|126 min 52 sec    |
|Spain        |101 min 38 sec|99 min 11 sec     |
+-------------+--------------+------------------+



## <center><font color='#1E90FF'>Actor más frecuente en musicales <a id="actor-musicales"></a></font></center>

Otra consulta clave consistió en identificar al actor más frecuente en películas relacionadas con la temática *"music"* cuya duración supera los 90 minutos. El procedimiento que realicé para resolver este análisis fue:

1. **Filtrado del DataFrame**: Seleccioné únicamente películas (*categoria = "Movie"*) con la palabra *"music"* en su descripción (previamente convertida a minúsculas) y duración superior a 90 minutos.
2. **Separación del reparto**: Dividí el campo reparto usando `split()` y `explode()`, generando filas individuales por cada actor.
3. **Conteo de apariciones**: Agrupé los resultados por actor, contando cuántas veces aparecía cada uno dentro del conjunto filtrado.
4. **Ordenación y selección**: Ordené el resultado por frecuencia descendente, identificando al actor con más apariciones.

El actor más frecuente bajo estas condiciones fue **Note Chern-Yim**, quien participó en *2 películas* del subconjunto analizado, aunque cabe señalar que existieron otros actores con la misma cantidad de apariciones.

In [23]:
dfLimpio = dfLimpio.filter((dfLimpio.categoria=="Movie") & (lower(dfLimpio.descripcion).contains("music")) & (dfLimpio.duracion > 90))

masApariciones = dfLimpio.withColumn("actorIndividual", explode(split(dfLimpio.reparto, ", ")))
masApariciones = masApariciones.groupBy("actorIndividual").agg(count("actorIndividual").alias("numeroActuaciones"))
masApariciones.orderBy("numeroActuaciones", ascending=False).show(30)

actorMasApariciones = masApariciones.orderBy("numeroActuaciones", ascending=False).collect()[0][0]

print(f"Hay muchos actores con 2 actuaciones, pero en este caso nos vamos a quedar con el primero que sale que es {actorMasApariciones}")

+-------------------+-----------------+
|    actorIndividual|numeroActuaciones|
+-------------------+-----------------+
|     Note Chern-Yim|                2|
|      Divyadarshini|                2|
|            Vineeth|                2|
|        Divya Dutta|                2|
|          Kumaravel|                2|
|    Michelle Buteau|                2|
|       Ravi Prakash|                2|
|      Nedumudi Venu|                2|
|  Aparna Balamurali|                2|
|             Sumesh|                2|
| G.V. Prakash Kumar|                2|
|       Quincy Jones|                2|
|       Wayne Kramer|                1|
|    Graham Phillips|                1|
|        Maris Racal|                1|
|          Ione Skye|                1|
|         Dana Fuchs|                1|
|  Yoakyake Chernyim|                1|
|        Jett Pangan|                1|
|        Vivian Bang|                1|
|Apisit Opasaimlikit|                1|
|       Chris D'Elia|                1|


## <center><font color='#1E90FF'>Diferencia semanal entre películas <a id="diferencia"></a></font></center>

En esta fase, analicé la diferencia semanal entre películas con temática "music" de duración mayor a 90 minutos, protagonizadas por el actor más frecuente. El objetivo fue determinar el intervalo temporal entre su primera y última aparición en esta categoría específica. Para ello:

- Tomé como referencia el DataFrame filtrado (`df_music`) obtenido anteriormente, que ya cumplía los requisitos definidos.
- Filtré específicamente los títulos protagonizados por el actor más frecuente identificado.
- Dado que solo disponía de fechas de estreno en Netflix y años originales, decidí obtener fechas de estreno reales desde IMDb usando web scraping (`requests` y `BeautifulSoup`).
- Obtuve las fechas exactas para cada película consultando IMDb (por ejemplo: *"Holy Man 2"*: 24 de diciembre de 2008, *"Holy Man 3"*: 5 de abril de 2010, ilustrativamente).
- Asocié estas fechas al DataFrame filtrado mediante un `join`.
- Convertí finalmente estas fechas al tipo `DateType` de Spark para realizar cálculos de intervalo con precisión.

Con estos pasos, pude determinar con exactitud la diferencia en semanas entre las fechas extremas de aparición del actor en películas relacionadas con música.

### ¿Cómo lo resolví?
Obtuve las fechas reales de estreno mediante *scraping* con `BeautifulSoup` desde IMDb y las asocié al DataFrame, calculando luego la diferencia temporal entre estas fechas reales para cuantificar la dispersión en semanas.

### Características del código
- **Web scraping con `requests` y `BeautifulSoup`**: Permite incorporar datos externos confiables (IMDb).
- **Uso de `join` y `to_date()`**: Facilitan un cálculo preciso y claro del intervalo temporal entre producciones.


In [24]:
holy = dfLimpio.filter(dfLimpio.reparto.contains(actorMasApariciones))

In [25]:
# Diccionario con las películas y sus URLs de IMDb
urls = {
    'Holy Man 2': 'https://www.imdb.com/title/tt3051874/releaseinfo/',
    'Holy Man 3': 'https://www.imdb.com/title/tt1720154/releaseinfo/'
}

# Función para extraer la fecha de estreno desde IMDb
def obtener_fecha_estreno(url):
    response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')
        # (Basado en la estructura actual de IMDb)
        fecha_span = soup.find_all("span", class_="ipc-metadata-list-item__list-content-item")

        if fecha_span:
            release_date = fecha_span[0].text.strip()
            return release_date
        else:
            return None
    else:
        print(f"Error al acceder a la página {url}. Código: {response.status_code}")
        return None

# Crear una lista de tuplas (título, fecha_estreno)
data = []
for titulo, url in urls.items():
    fecha_estreno = obtener_fecha_estreno(url)
    if fecha_estreno:
        data.append((titulo, fecha_estreno))


dfFechasReales = spark.createDataFrame(data, ["titulo", "fechaEstrenoReal"])

# Mostrar el DataFrame
dfFechasReales.show()

+----------+----------------+
|    titulo|fechaEstrenoReal|
+----------+----------------+
|Holy Man 2| October 2, 2008|
|Holy Man 3| August 12, 2010|
+----------+----------------+



In [26]:
dfJoin = holy.join(dfFechasReales, on='titulo', how='left')
dfJoin.show()

dfJoin = dfJoin.withColumn(
    "fechaEstrenoReal",
    to_date(dfJoin.fechaEstrenoReal, "MMMM d, yyyy")
)

dfDiff = dfJoin.select(
    datediff(
        max("fechaEstrenoReal"),
        min("fechaEstrenoReal")
    ).alias("diferenciaDias")
)

dfDiff.show()

print(f"Entre la primera película de {actorMasApariciones} según el filtro aplicado, es decir entre {dfFechasReales.collect()[0][0]} y {dfFechasReales.collect()[1][0]} han pasado {dfDiff.collect()[0][0]/7} semanas")

+----------+--------+---------+--------------+--------------------+--------+--------------+---------------+------------+--------+--------------------+--------------------+-----------+-------------+----------------+
|    titulo|      id|categoria|      director|             reparto|    pais|estrenoNetflix|anioEstrenoReal|subcategoria|duracion|              genero|         descripcion|duracionMin|numTemporadas|fechaEstrenoReal|
+----------+--------+---------+--------------+--------------------+--------+--------------+---------------+------------+--------+--------------------+--------------------+-----------+-------------+----------------+
|Holy Man 2|80999990|    Movie|Note Chern-Yim|Apisit Opasaimlik...|Thailand|    2018-07-30|           2008|       TV-14|      92|Comedies, Faith &...|Former rap musici...|         92|         NULL| October 2, 2008|
|Holy Man 3|80999991|    Movie|Note Chern-Yim|Krissada Sukosol,...|Thailand|    2018-07-30|           2010|       TV-14|     102|Comedies, F

## <center><font color='#1E90FF'>Transformación de género a array <a id="genero-array"></a></font></center>

El siguiente paso fue transformar la columna `genero`, originalmente en texto plano, en una estructura tipo lista (array) que facilitase futuras consultas y filtrados específicos por género. Para ello, usé la función `split()` de Spark, especificando la coma como separador, obteniendo así una lista de géneros individuales por cada registro.

Este proceso convirtió valores como `"Comedy, Drama"` en un array del tipo `["Comedy", "Drama"]`, facilitando análisis posteriores que requieran acceder individualmente a cada género (por ejemplo, contar apariciones o filtrar por género específico).

### ¿Cómo lo resolví?
Convertí el texto plano en una estructura de array claramente manejable usando la función `split()`. Esto permitió una mejor organización y aprovechamiento de la información por género en análisis posteriores.

In [27]:
df = df.withColumn("generosArray", split(df.genero, ","))

df.select("genero", "generosArray").show(5, truncate=False)

+-----------------------------------------------+---------------------------------------------------+
|genero                                         |generosArray                                       |
+-----------------------------------------------+---------------------------------------------------+
|Stand-Up Comedy                                |[Stand-Up Comedy]                                  |
|Documentaries                                  |[Documentaries]                                    |
|Stand-Up Comedy                                |[Stand-Up Comedy]                                  |
|Comedies, Dramas, Independent Movies           |[Comedies,  Dramas,  Independent Movies]           |
|British TV Shows, Reality TV, Romantic TV Shows|[British TV Shows,  Reality TV,  Romantic TV Shows]|
+-----------------------------------------------+---------------------------------------------------+
only showing top 5 rows



## <center><font color='#1E90FF'>Conteo por número de países <a id="conteo-paises"></a></font></center>

Otra consulta relevante fue determinar cuántas producciones del catálogo corresponden a un único país, frente a cuántas fueron realizadas en colaboración entre dos o más países. Para ello, realicé lo siguiente:

### Creación de una columna auxiliar
Implementé una función UDF sencilla (`contarPaises`) que cuenta el número de países listados en la columna `pais`, basándose en el número de comas presentes más uno.

### Aplicación de la función
Añadí la columna `numeroPaises` al DataFrame, identificando fácilmente cuántos países participaron en cada producción.

### Conteo de producciones
Separé las **producciones individuales** (únicamente un país involucrado) de las **coproducciones** (dos o más países).  

**Ejemplo:**  
- Un registro con `pais = "India"` tendría `numeroPaises = 1`.  
- Un registro con `pais = "United States, Mexico"` tendría `numeroPaises = 2`.  
---


In [28]:
def contarPaises(paises):
  return paises.count(",") + 1

contarPaises = udf(contarPaises, IntegerType())

dfLimpio = dfLimpio.withColumn("numeroPaises", contarPaises(dfLimpio.pais))
producciones1pais = dfLimpio.filter(dfLimpio.numeroPaises > 1).count()
produccionesVariosPaises = dfLimpio.filter(dfLimpio.numeroPaises == 1).count()


print(f"En {producciones1pais} películas solamente había un país involucrado en su producción y en {produccionesVariosPaises} películas había más de un país involucrado")

En 4 películas solamente había un país involucrado en su producción y en 49 películas había más de un país involucrado


## <center><font color='#1E90FF'>Exportación a Parquet <a id="parquet"></a></font></center>

Finalmente, tras completar las etapas de limpieza y análisis preliminar, exporté el DataFrame resultante a un archivo en formato **Parquet**. Este formato ofrece ventajas significativas para entornos analíticos intensivos

Realicé esta exportación indicando la opción `mode="overwrite"` para asegurar que, si existía un archivo previo en la ruta especificada, fuera sustituido directamente por la nueva versión.

In [29]:
parquet = "/tmp/netflixLimpioFiltrado.parquet"
dfLimpio.write.parquet(parquet, mode="overwrite")
print(f"DataFrame limpio guardado como Parquet en {parquet}")


DataFrame limpio guardado como Parquet en /tmp/netflixLimpioFiltrado.parquet
