In [31]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, BooleanType, IntegerType

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# getOrCreate() hands back an already-running session when there is one; in that
# case only runtime SQL configurations from this builder take effect.
spark = (
    SparkSession.builder
    .appName("Jupyter")
    .getOrCreate()
)

25/08/24 19:16:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [24]:
# Column names for actor_films.csv. toDF() applies them positionally, so this
# list is assumed to match the CSV's column order. TODO confirm against the file.
columns = [
    "actor",
    "actorid",
    "film",
    "year",
    "votes",
    "rating",
    "filmid",
]

In [25]:
ACTOR_FILMS_CSV = "/home/iceberg/data/actor_films.csv"

# Load the raw film credits without a schema (every column arrives as a string),
# name the columns, and expose them to SQL as `actor_films`.
# NOTE(review): header is "false", so a header row in the file (if any) would be
# loaded as a data row. Confirm the CSV has no header line.
actor_film = (
    spark.read
    .option("header", "false")
    .csv(ACTOR_FILMS_CSV)
    .toDF(*columns)
)
actor_film.createOrReplaceTempView('actor_films')

In [26]:
# Display the DataFrame's repr: its column names and types (all strings here).
actor_film

DataFrame[actor: string, actorid: string, film: string, year: string, votes: string, rating: string, filmid: string]

In [27]:
# Print the schema tree. Every column is a nullable string because the CSV was
# read without an explicit schema.
actor_film.printSchema()

root
 |-- actor: string (nullable = true)
 |-- actorid: string (nullable = true)
 |-- film: string (nullable = true)
 |-- year: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- filmid: string (nullable = true)



In [32]:
# Element type of the `films` array: one entry per film credited to an actor.
film_entry_type = StructType([
    StructField("film", StringType(), nullable=True),
    StructField("votes", IntegerType(), nullable=True),
    StructField("rating", FloatType(), nullable=True),
    StructField("filmid", StringType(), nullable=True),
])

# Schema of the cumulative `actors` table: one snapshot row per actor per
# `current_year`, carrying every film accumulated so far.
schema = StructType([
    StructField("actorid", StringType(), nullable=True),
    StructField("actor", StringType(), nullable=True),
    StructField("films", ArrayType(film_entry_type), nullable=True),
    StructField("quality_class", StringType(), nullable=True),
    StructField("is_active", BooleanType(), nullable=True),
    StructField("current_year", IntegerType(), nullable=True),
])

In [33]:
# Zero-row DataFrame with the `actors` schema. It acts as the (empty) previous
# snapshot before any year has been loaded.
empty_actors_df = spark.createDataFrame([], schema=schema)

In [34]:
# Register the empty frame as `actors`, so the query's `yesterday` CTE reads a
# previous-year snapshot that has no rows yet.
empty_actors_df.createOrReplaceTempView("actors")

In [35]:
# One incremental step of the cumulative `actors` table:
# PREVIOUS_YEAR is the snapshot already stored in `actors`, and CURRENT_YEAR is
# the year of `actor_films` rows folded in on this run.
PREVIOUS_YEAR = 1973
CURRENT_YEAR = PREVIOUS_YEAR + 1

# `actor_films` is read from CSV without a schema, so every column is a string.
# The `typed_films` CTE casts year/votes/rating up front, so that `films`
# (votes INT, rating FLOAT) and `current_year` (INT) match the `actors` schema.
# Without the casts, CONCAT/COALESCE would rely on implicit string/number
# coercion when mixing yesterday's typed rows with today's string rows.
query = f"""
WITH yesterday AS (
    SELECT *
    FROM actors
    WHERE current_year = {PREVIOUS_YEAR}
),
typed_films AS (
    SELECT
        actorid,
        actor,
        film,
        CAST(year AS INT) AS year,
        CAST(votes AS INT) AS votes,
        CAST(rating AS FLOAT) AS rating,
        filmid
    FROM actor_films
),
today AS (
    SELECT
        actorid,
        actor,
        year,
        COLLECT_LIST(
            STRUCT(
                film,
                votes,
                rating,
                filmid
            )
        ) AS films,
        AVG(rating) AS avg_rating
    FROM typed_films
    WHERE year = {CURRENT_YEAR}
    GROUP BY actorid, actor, year
),
final_data AS (
    SELECT
        COALESCE(t.actorid, y.actorid) AS actorid,
        COALESCE(t.actor, y.actor) AS actor,

        -- merge yesterday's films with today's (if any)
        CONCAT(
            COALESCE(y.films, ARRAY()),
            COALESCE(t.films, ARRAY())
        ) AS films,

        -- quality_class comes from *this year's avg rating* if active,
        -- otherwise keep last year's value
        CASE
            WHEN t.year IS NOT NULL THEN
                CASE
                    WHEN t.avg_rating > 8 THEN 'star'
                    WHEN t.avg_rating > 7 THEN 'good'
                    WHEN t.avg_rating > 6 THEN 'average'
                    ELSE 'bad'
                END
            ELSE y.quality_class
        END AS quality_class,

        -- active if in today's data
        (t.year IS NOT NULL) AS is_active,

        -- carry forward year correctly (INT on both branches)
        COALESCE(t.year, y.current_year + 1) AS current_year
    FROM today t
    FULL OUTER JOIN yesterday y
        ON t.actorid = y.actorid
)

SELECT
    actorid,
    actor,
    films,
    quality_class,
    is_active,
    current_year
FROM final_data
"""

In [36]:
# Run the cumulative query against the `actors` and `actor_films` temp views.
result_df = spark.sql(query)

In [39]:
# Number of rows in the new snapshot. Each row is either a (actorid, actor, year)
# group from `today` or a row carried forward from `yesterday`.
result_df.count()

                                                                                

537