# Primera apoximación para resolver la pregunta 1

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .remote("sc://localhost:15002").getOrCreate()

## Versión DF

In [3]:
# Cargamos el dataset original sin modificar
data_set_original = spark.read.option("inferSchema", "true").option("header", "true").csv("/user/hadoop2/datasets_csv/BR.csv")

# Como lee raro la descrición, por que tiene saltos de lineas lo que haremos sera quitar todos aquellas filas que tenga null todo lo que este despues de video_id
data_set_original = data_set_original.dropna(subset=['title'])
data_set_original.show()

+-----------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|   video_id|               title|         publishedAt|           channelId|        channelTitle|categoryId|       trending_date|                tags|view_count| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|         description|
+-----------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|s9FH4rDMvds|LEVEI UM FORA? FI...|2020-08-11T22:21:49Z|UCGfBwrCoi9ZJjKiU...|       Pietro Guedes|        22|2020-08-12T00:00:00Z|pietro|guedes|ing...|    263835| 85095|     487|         4500|ht

In [4]:
# Calculamos el las metricas de cada video
trending_metrics = data_set_original.select("video_id", "title", "trending_date", "likes", "dislikes", "comment_count", "view_count")

# Crea las nuevas columnas calculadas
trending_metrics = trending_metrics \
    .withColumn("likesToViewsRatio", col("likes") / col("view_count")) \
    .withColumn("dislikesToViewsRatio", col("dislikes") / col("view_count")) \
    .withColumn("commentsToViewsRatio", col("comment_count") / col("view_count")) \
    .withColumn("likesToDislikesRatio", col("likes") / col("dislikes")).select(
        "video_id",
        "title",
        "trending_date",
        "likesToViewsRatio",
        "dislikesToViewsRatio",
        "commentsToViewsRatio",
        "likesToDislikesRatio"
    )

# Muestra el nuevo DataFrame
trending_metrics.show()

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   video_id|               title|       trending_date|   likesToViewsRatio|dislikesToViewsRatio|commentsToViewsRatio|likesToDislikesRatio|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|s9FH4rDMvds|LEVEI UM FORA? FI...|2020-08-12T00:00:00Z| 0.32253112740917617|0.001845850626338...|0.017056114617090227|  174.73305954825463|
|jbGRowa5tIk|ITZY “Not Shy” M/...|2020-08-12T00:00:00Z| 0.11905027774675962|0.002529303824788711|0.005173272978481...|   47.06839746968898|
|3EfkCrXKZNs|Oh Juliana PARÓDI...|2020-08-12T00:00:00Z|0.017311868781424867|0.002387723859996...|                 0.0|    7.25036469730124|
|gBjox7vn3-g|Contos de Runeter...|2020-08-12T00:00:00Z| 0.15381185318292237|8.052976606435726E-4|0.009144454427473296|               191.0|
|npoUGx7UW7o|Entrevi

In [5]:
# Cargamos el dataset de videos rankeados 
columnas = data_set_original.columns[:-1] + ["ranking"]
ranked_videos = spark.read.option("inferSchema", "true").option("delimiter", "\t").csv("/user/hadoop2/BR_ranked/BR").toDF(*columnas)
ranked_videos.show()

+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+-------+
|   video_id|               title|        publishedAt|           channelId|        channelTitle|categoryId|      trending_date|                tags|view_count| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|ranking|
+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+-------+
|s9FH4rDMvds|LEVEI UM FORA? FI...|2020-08-11 18:21:49|UCGfBwrCoi9ZJjKiU...|       Pietro Guedes|        22|2020-08-11 20:00:00|pietro|guedes|ing...|    263835| 85095|     487|         4500|https://i.ytimg.c...|            false|          

In [6]:
# Hacemos un join entre los dos DataFrames, es necesario? No para nada, es trabajo extra por que 
# los datos que estan en ranked_videos son exactamente los mismos que hay en el data set original
# sin el problema de las descripciones, pero lo hago para aja tener una excusa para probar los joins
joined_df = ranked_videos.join(
    trending_metrics,
    (ranked_videos['video_id'] == trending_metrics['video_id']) & 
    (ranked_videos['trending_date'] == trending_metrics['trending_date']),
    "inner"  # Tipo de join
)

# Selecciona las columnas de trending_metrics más la columna ranking de ranked_videos
result_df = joined_df.select(
    trending_metrics["video_id"],
    trending_metrics["title"],
    trending_metrics["trending_date"],
    trending_metrics["likesToViewsRatio"],
    trending_metrics["dislikesToViewsRatio"],
    trending_metrics["commentsToViewsRatio"],
    trending_metrics["likesToDislikesRatio"],
    ranked_videos["ranking"]
)

result_df = result_df.orderBy(
    col("likesToViewsRatio").desc(),
    col("dislikesToViewsRatio").desc(),
    col("commentsToViewsRatio").desc(),
    col("likesToDislikesRatio").desc()
)

# Muestra el resultado
result_df.show()

+-----------+-----------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------+
|   video_id|                  title|       trending_date|  likesToViewsRatio|dislikesToViewsRatio|commentsToViewsRatio|likesToDislikesRatio|ranking|
+-----------+-----------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------+
|CuyTC8FLICY|   Anitta - Girl Fro...|2021-04-30T00:00:00Z| 0.5985101756875228|0.006574267907390492| 0.13878193169892536|    91.0383002516075|      1|
|oVPYa7QCmRg|[MV] eAeon(이이언) _...|2021-04-30T00:00:00Z| 0.4875012936880122|6.088734228767051E-4| 0.08327049705845588|   800.6611479028697|      9|
|mrIaMSHhChQ|   CRIANDO PERSONAGE...|2020-10-07T00:00:00Z|0.48499609392950693|0.001424566885712...|0.050181517393502136|   340.4516129032258|     28|
|gruvzNW87V8|     10 FATOS SOBRE MIM|2020-12-02T00:00:00Z|0.47845379552696626|8.338544923910778E-4|0.02

## Versión SQL

In [14]:
ranked_videos.createOrReplaceTempView("ranked_videos")

query = """
SELECT 
    video_id,
    title,
    trending_date,
    (likes / view_count) AS likesToViewsRatio,
    (dislikes / view_count) AS dislikesToViewsRatio,
    (comment_count / view_count) AS commentsToViewsRatio,
    (likes / dislikes) AS likesToDislikesRatio
FROM ranked_videos
ORDER BY likesToViewsRatio DESC, dislikesToViewsRatio DESC, commentsToViewsRatio DESC, likesToDislikesRatio DESC
"""

# Ejecuta la consulta SQL
trending_metrics_sql = spark.sql(query)

# Muestra el resultado
trending_metrics_sql.show()

+-----------+-----------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|   video_id|                  title|      trending_date|  likesToViewsRatio|dislikesToViewsRatio|commentsToViewsRatio|likesToDislikesRatio|
+-----------+-----------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|CuyTC8FLICY|   Anitta - Girl Fro...|2021-04-29 20:00:00| 0.5985101756875228|0.006574267907390492| 0.13878193169892536|    91.0383002516075|
|oVPYa7QCmRg|[MV] eAeon(이이언) _...|2021-04-29 20:00:00| 0.4875012936880122|6.088734228767051E-4| 0.08327049705845588|   800.6611479028697|
|mrIaMSHhChQ|   CRIANDO PERSONAGE...|2020-10-06 20:00:00|0.48499609392950693|0.001424566885712...|0.050181517393502136|   340.4516129032258|
|gruvzNW87V8|     10 FATOS SOBRE MIM|2020-12-01 20:00:00|0.47845379552696626|8.338544923910778E-4|0.026087733404806576|   573.7857142857143|
|oxx7GpZ9rGc|   

# Segunda aproximación para resolver la pregunta 1