In [0]:
class Transformer:
    """Common base for the task transformers in this notebook.

    Subclasses override :meth:`transform`; the base version is a no-op.
    """

    def __init__(self):
        """Create a transformer; the base class stores no state."""

    def transform(self, inputDFs):
        """Placeholder hook: ignores ``inputDFs`` and returns ``None``."""
        return None


In [0]:
from pyspark.sql.functions import split, explode, col
class Task1Transformer(Transformer):
    """Count anime rows per genre."""

    def transform(self, inputDF):
        """
        Distribution of animes by genre.

        Splits the comma-separated ``genre`` column so that each genre gets
        its own row, registers the result as the temporary view ``animes``
        and returns the number of rows per genre, largest count first.

        Relies on a ``spark`` session name being available in the enclosing
        scope; it is not defined in this block.
        """
        # One row per (anime, genre): split the string on ", " into a list,
        # then explode that list back into the "genre" column.
        one_genre_per_row = (
            inputDF
            .withColumn("genre_list", split(col("genre"), ", "))
            .withColumn("genre", explode(col("genre_list")))
        )

        # Expose the exploded rows to Spark SQL.
        one_genre_per_row.createOrReplaceTempView("animes")

        genre_counts_query = """ 
                          SELECT genre, COUNT(*) AS counted
                          FROM animes
                          GROUP BY genre
                          ORDER BY counted DESC
                          """
        return spark.sql(genre_counts_query)

class Task2Transformer(Transformer):
    """Average score per episode count."""

    def transform(self, inputDF):
        """
        Relationship between score and number of episodes.

        Registers ``inputDF`` as the temporary view ``animes`` and returns
        the average ``score`` for each distinct ``episodes`` value, ordered
        by ``episodes`` descending.

        Relies on a ``spark`` session name being available in the enclosing
        scope; it is not defined in this block.
        """

        # createOrReplaceTempView only registers the view and returns None,
        # so its result is deliberately not bound to a name.
        inputDF.createOrReplaceTempView("animes")

        task2 = spark.sql("""
                          SELECT episodes, AVG(score) AS avg_score
                          FROM animes
                          GROUP BY episodes
                          ORDER BY episodes DESC
                          """)
        return task2

class Task3Transformer(Transformer):
    """Average member count per airing year."""

    def transform(self, inputDF):
        """
        Popularity of animes over the years.

        Registers ``inputDF`` as the temporary view ``animes`` and returns
        the average ``members`` for each ``aired_from_year``, ordered by
        year descending.

        Relies on a ``spark`` session name being available in the enclosing
        scope; it is not defined in this block.
        """

        # createOrReplaceTempView only registers the view and returns None,
        # so its result is deliberately not bound to a name.
        inputDF.createOrReplaceTempView("animes")

        task3 = spark.sql("""
                          SELECT aired_from_year, AVG(members) AS avg_members
                          FROM animes
                          GROUP BY aired_from_year
                          ORDER BY aired_from_year DESC
                          """)
        return task3
    
class Task4Transformer(Transformer):
    """Count completed reviews per anime genre."""

    def transform(self, inputsDF):
        """
        Anime genres most commonly completed by users.

        ``inputsDF`` is a mapping that must provide the anime DataFrame under
        the key ``"Anime"`` and the review DataFrame under ``"Review"``.
        The anime ``genre`` column is split so that each genre gets its own
        row; the rows are then joined to the reviews on ``anime_id`` and the
        reviews with ``my_status = 2`` are counted per genre, largest count
        first.

        Side effect: (re)registers the temporary views ``animes`` and
        ``reviews``. Relies on a ``spark`` session name being available in
        the enclosing scope; it is not defined in this block.
        """

        anime_df = inputsDF.get("Anime")
        review_df = inputsDF.get("Review")

        # Split the genre strings into lists
        anime_df = anime_df.withColumn("genre_list", split(col("genre"), ", "))

        # Explode the lists so that every genre becomes its own row
        exploded_anime_df = anime_df.withColumn("genre", explode(col("genre_list")))

        # Register the temporary views used by the query below
        exploded_anime_df.createOrReplaceTempView("animes")
        review_df.createOrReplaceTempView("reviews")

        # The WHERE filter on r.my_status discards every row without a
        # matching review, so this was always effectively an inner join;
        # spelling it as INNER JOIN returns the same rows with honest intent.
        # NOTE(review): if genres with zero completed reviews should appear
        # with a count of 0, the filter must move into the join condition.
        # my_status = 2 presumably encodes "completed" (per the docstring) —
        # confirm against the review dataset.
        task4 = spark.sql("""
                          SELECT a.genre, COUNT(r.my_status) AS count_completed
                          FROM animes AS a
                          INNER JOIN reviews AS r
                          USING (anime_id)
                          WHERE r.my_status = 2
                          GROUP BY a.genre
                          ORDER BY count_completed DESC
                          """)
        return task4