In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession.builder \
  .appName("DW") \
  .enableHiveSupport() \
  .config(
    "hive.metastore.uris",
    "thrift://hive-metastore:9083"
  ) \
  .getOrCreate()


In [5]:
spark.sql(
    """
    CREATE DATABASE IF NOT EXISTS golds LOCATION 'hdfs://hdfs-nn:9000/demo/golds/'
    """
)

DataFrame[]

In [3]:
from pyspark.sql.functions import col

In [7]:
#dim tables

spark.sql("DROP TABLE IF EXISTS golds.dim_title")
spark.sql("DROP TABLE IF EXISTS golds.dim_time")
spark.sql("DROP TABLE IF EXISTS golds.dim_platform")
spark.sql("DROP TABLE IF EXISTS golds.dim_rating_status")
spark.sql("DROP TABLE IF EXISTS golds.dim_person")
spark.sql("DROP TABLE IF EXISTS golds.dim_category")
spark.sql("DROP TABLE IF EXISTS golds.dim_genre")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_title (
  id_title INT,
  title STRING,
  age_rating STRING,
  runtime INT,
  studio_name STRING,
  type STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_title'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_time (
  id_time INT,
  date DATE,
  day INT,
  month INT,
  year INT,
  day_name STRING,
  month_name STRING,
  is_weekend BOOLEAN,
  season STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_time'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_platform (
  id_platform INT,
  platform_name STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_platform'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_rating_status (
  id_rating_source_status INT,
  tomatometer_status STRING,
  source STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_rating_status'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_person (
  id_person INT,
  name STRING,
  person_type STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_person'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_category (
  id_award INT,
  canonical_category STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_category'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.dim_genre (
  id_genre INT,
  genre_name STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/dim_genre'
""")

DataFrame[]

In [8]:
#bridge tables

spark.sql("DROP TABLE IF EXISTS golds.bridge_launch_genre")
spark.sql("DROP TABLE IF EXISTS golds.bridge_views_platform")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.bridge_launch_genre (
  id_genre INT,
  id_group_genre INT
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/bridge_title_genre'
""")


spark.sql("""
CREATE TABLE IF NOT EXISTS golds.bridge_views_platform (
  id_platform INT,
  id_group_platform INT
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/bridge_views_platform'
""")

DataFrame[]

In [9]:
#fact tables

spark.sql("DROP TABLE IF EXISTS golds.views")
spark.sql("DROP TABLE IF EXISTS golds.rating")
spark.sql("DROP TABLE IF EXISTS golds.participation")
spark.sql("DROP TABLE IF EXISTS golds.awards")
spark.sql("DROP TABLE IF EXISTS golds.launch")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.views (
  id_title INT,
  id_time INT,
  id_group_platform INT,
  views_price DOUBLE,
  views_date INT,
  downloads_price DOUBLE,
  first_tag_date INT,
  days_since_release INT
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/views'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.rating (
  id_title INT,
  id_rating_source_status INT,
  id_time INT,
  popularity_index STRING,
  critic_score DOUBLE,
  audience_score DOUBLE,
  critic_reviews INT,
  audience_reviews INT,
  rt_critic_audience STRING
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/rating'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.participation (
  id_person INT,
  id_title INT,
  id_time INT
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/participation'
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.awards (
  id_award INT,
  id_title INT,
  id_time INT,
  id_person INT,
  is_nomination BOOLEAN,
  is_winner BOOLEAN,
  is_oscar_nominee BOOLEAN,
  is_oscar_winner BOOLEAN
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/awards';
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS golds.launch (
  id_title INT,
  id_time INT,
  id_group_genre INT,
  flag_theaters BOOLEAN,
  flag_streaming BOOLEAN,
  gross DOUBLE,
  budget DOUBLE
)
USING PARQUET
LOCATION 'hdfs://hdfs-nn:9000/demo/golds/launch'
""")

DataFrame[]

In [10]:
actorfilms = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/actorfilms")
audience_reviews = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/audience_reviews")
boxoffice_info = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/boxoffice_info")
credits = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/credits")
critic_reviews = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/critic_reviews")
dataset_piracy = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/dataset_piracy")
oscar_fulldata = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/oscar_fulldata")
rotten_tomatoes_movies = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/rotten_tomatoes_movies")
titles = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/titles")
tv_show_links = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/tv_show_links")

In [11]:
# dim_time
from pyspark.sql import functions as F

SILVER = "hdfs://hdfs-nn:9000/demo/silver"
GOLD_DIM_TIME = "hdfs://hdfs-nn:9000/demo/golds/dim_time"

# 1) Read real date sources
piracy = spark.read.parquet(f"{SILVER}/dataset_piracy") \
    .select(
        F.to_date("posted_date").alias("posted_date"),
        F.to_date("release_date").alias("release_date")
    )

rt = spark.read.parquet(f"{SILVER}/rotten_tomatoes_movies") \
    .select(
        F.to_date("in_theaters_date").alias("in_theaters_date"),
        F.to_date("on_streaming_date").alias("on_streaming_date")
    )

boxinfo = spark.read.parquet(f"{SILVER}/boxoffice_info") \
    .select(F.to_date(F.col("released")).alias("released"))

# 2) Collect all distinct real dates
real_dates = (
    piracy.select(F.col("posted_date").alias("date"))
          .unionByName(piracy.select(F.col("release_date").alias("date")))
          .unionByName(rt.select(F.col("in_theaters_date").alias("date")))
          .unionByName(rt.select(F.col("on_streaming_date").alias("date")))
          .unionByName(boxinfo.select(F.col("released").alias("date")))
          .where(F.col("date").isNotNull())
          .distinct()
)

# 3) Daily grain (is_weekend as INT 0/1)
dim_time_daily = (
    real_dates
    .withColumn("id_time", F.date_format("date", "yyyyMMdd").cast("int"))
    .withColumn("day", F.dayofmonth("date"))
    .withColumn("month", F.month("date"))
    .withColumn("year", F.year("date"))
    .withColumn("day_name", F.date_format("date", "EEEE"))
    .withColumn("month_name", F.date_format("date", "MMMM"))
    .withColumn(
        "is_weekend",
        F.dayofweek("date").isin(1, 7)  # devolve boolean
    )
    .withColumn(
        "season",
        F.when(F.col("month").isin([12, 1, 2]), "Winter")
         .when(F.col("month").isin([3, 4, 5]), "Spring")
         .when(F.col("month").isin([6, 7, 8]), "Summer")
         .otherwise("Fall")
    )
    .select("id_time", "date", "day", "month", "year", "day_name", "month_name", "is_weekend", "season")
)


# 4) Year-only grain (1910..2024) - is_weekend NULL but INT type
years_df = spark.range(1910, 2025).select(F.col("id").cast("int").alias("year"))

dim_time_year_only = (
    years_df
    .withColumn("id_time", (F.col("year") * 10000).cast("int"))
    .withColumn("date", F.lit(None).cast("date"))
    .withColumn("day", F.lit(None).cast("int"))
    .withColumn("month", F.lit(None).cast("int"))
    .withColumn("day_name", F.lit(None).cast("string"))
    .withColumn("month_name", F.lit(None).cast("string"))
    .withColumn("is_weekend", F.lit(None).cast("boolean"))
    .withColumn("season", F.lit(None).cast("string"))
    .select("id_time", "date", "day", "month", "year", "day_name", "month_name", "is_weekend", "season")
)

# 5) Union & write
dim_time = (
    dim_time_daily
    .unionByName(dim_time_year_only)
    .dropDuplicates(["id_time"])
)

dim_time.write.mode("overwrite").parquet(GOLD_DIM_TIME)

# Quick check
dim_time.printSchema()
dim_time.orderBy(F.col("id_time").asc()).show(20, truncate=False)



root
 |-- id_time: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- season: string (nullable = true)

+--------+----------+----+-----+----+---------+----------+----------+------+
|id_time |date      |day |month|year|day_name |month_name|is_weekend|season|
+--------+----------+----+-----+----+---------+----------+----------+------+
|19100000|null      |null|null |1910|null     |null      |null      |null  |
|19110000|null      |null|null |1911|null     |null      |null      |null  |
|19120000|null      |null|null |1912|null     |null      |null      |null  |
|19130000|null      |null|null |1913|null     |null      |null      |null  |
|19140000|null      |null|null |1914|null     |null      |null      |null  |
|19140601|1914-06-01|1   |6   

In [12]:
#dim_title
from pyspark.sql import functions as F

SILVER_TITLES = "hdfs://hdfs-nn:9000/demo/silver/titles"
SILVER_RT     = "hdfs://hdfs-nn:9000/demo/silver/rotten_tomatoes_movies"

titles = spark.read.parquet(SILVER_TITLES)

rt = (
    spark.read.parquet(SILVER_RT)
      .select(
          F.col("movie_title").alias("rt_title"),
          F.col("studio_name")
      )
)

dim_title_df = (
    titles.alias("t")
    .join(
        rt.alias("r"),
        F.lower(F.col("t.title")) == F.lower(F.col("r.rt_title")),
        "left"
    )
    .select(
        F.col("t.id_title").cast("int").alias("id_title"),
        F.col("t.title").alias("title"),
        F.when(F.col("t.age_certification") == "unknown", None)
         .otherwise(F.upper(F.col("t.age_certification"))).alias("age_rating"),
        F.col("t.runtime").cast("int").alias("runtime"),
        F.col("r.studio_name").alias("studio_name"),
        F.lower(F.col("t.type")).alias("type")
    )
    .dropDuplicates(["id_title"])
)


dim_title_df.write.mode("overwrite").insertInto("golds.dim_title")



spark.table("golds.dim_title").show(5, truncate=False)
print("count:", spark.table("golds.dim_title").count())



+--------+----------------------------+----------+-------+-----------+-----+
|id_title|title                       |age_rating|runtime|studio_name|type |
+--------+----------------------------+----------+-------+-----------+-----+
|1       |#ABtalks                    |TV-PG     |68     |null       |show |
|2       |#Alive                      |null      |98     |null       |movie|
|3       |#AnneFrank. Parallel Stories|null      |92     |null       |movie|
|4       |#FriendButMarried           |null      |102    |null       |movie|
|5       |#FriendButMarried 2         |null      |104    |null       |movie|
+--------+----------------------------+----------+-------+-----------+-----+
only showing top 5 rows

count: 9999


In [37]:
#dim_rating_status
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN = "hdfs://hdfs-nn:9000"
SILVER  = f"{HDFS_NN}/demo/silver"
GOLD    = f"{HDFS_NN}/demo/golds"

RT_SILVER  = f"{SILVER}/rotten_tomatoes_movies"
DRS_PATH   = f"{GOLD}/dim_rating_status"

# -------- 1) Rotten Tomatoes: status + source ----------
rt = spark.read.parquet(RT_SILVER)

rt_status = (
    rt.select(F.col("tomatometer_status").alias("status"))
      .filter(F.col("status").isNotNull())
      .withColumn("status", F.trim(F.col("status")))
      .dropDuplicates(["status"])
      .withColumn("source", F.lit("rotten_tomatoes"))
)

# -------- 2) IMDB / TMDB linhas extra ----------
extra_status = spark.createDataFrame(
    [
        ("score", "imdb"),
        ("score", "tmdb"),
    ],
    ["status", "source"]
)

# -------- 3) União + normalização ----------
all_status = (
    rt_status
    .unionByName(extra_status)
    .withColumn("status", F.lower(F.trim("status")))
    .withColumn("source", F.lower(F.trim("source")))
    .dropDuplicates(["status", "source"])
)

# -------- 4) Surrogate key estável ----------
dim_rating_status = (
    all_status
    .withColumn(
        "id_rating_source_status",
        F.row_number().over(Window.orderBy("source", "status")).cast("bigint")
    )
    .select(
        "id_rating_source_status",
        F.col("status").alias("tomatometer_status"),
        "source"
    )
)

# -------- 5) Escrever Gold ----------
(
    dim_rating_status
    .write
    .mode("overwrite")
    .format("parquet")
    .option("path", DRS_PATH)
    .saveAsTable("golds.dim_rating_status")
)

dim_rating_status.orderBy("source", "tomatometer_status").show(100, truncate=False)


+-----------------------+------------------+---------------+
|id_rating_source_status|tomatometer_status|source         |
+-----------------------+------------------+---------------+
|1                      |score             |imdb           |
|2                      |certified fresh   |rotten_tomatoes|
|3                      |fresh             |rotten_tomatoes|
|4                      |rotten            |rotten_tomatoes|
|5                      |score             |tmdb           |
+-----------------------+------------------+---------------+



In [14]:
#dim_category  

from pyspark.sql import functions as F
from pyspark.sql.window import Window

OSCARS = spark.read.parquet("hdfs://hdfs-nn:9000/demo/silver/the_oscar_award")

dim_category = (
    OSCARS
    .select(F.trim(F.col("canon_category")).alias("canonical_category"))
    .where(F.col("canonical_category").isNotNull())
    .dropDuplicates(["canonical_category"])
    .withColumn(
        "id_award",
        F.row_number().over(Window.orderBy(F.lower(F.col("canonical_category"))))
    )
    .select("id_award", "canonical_category")
)

(dim_category.write
    .mode("overwrite")
    .format("parquet")
    .option("path", "hdfs://hdfs-nn:9000/demo/golds/dim_category")
    .saveAsTable("golds.dim_category"))

dim_category.orderBy("id_award").show(100, truncate=False)

+--------+-------------------------------------------------+
|id_award|canonical_category                               |
+--------+-------------------------------------------------+
|1       |actor in a leading role                          |
|2       |actor in a supporting role                       |
|3       |actress in a leading role                        |
|4       |actress in a supporting role                     |
|5       |animated feature film                            |
|6       |art direction                                    |
|7       |art direction (black-and-white)                  |
|8       |art direction (color)                            |
|9       |assistant director                               |
|10      |award of commendation                            |
|11      |best picture                                     |
|12      |cinematography                                   |
|13      |cinematography (black-and-white)                 |
|14      |cinematography

In [15]:
#dim_person
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

CREDITS_PATH = f"{SILVER_BASE}/credits"
RT_PATH      = f"{SILVER_BASE}/rotten_tomatoes_movies"
PIRACY_PATH  = f"{SILVER_BASE}/dataset_piracy"

OUT_PATH  = f"{GOLD_BASE}/dim_person"
OUT_TABLE = "golds.dim_person"

credits = spark.read.parquet(CREDITS_PATH)
rt      = spark.read.parquet(RT_PATH)
piracy  = spark.read.parquet(PIRACY_PATH)

def clean_person_name(col_):
    x = F.trim(col_)
    x = F.regexp_replace(x, r"\s+", " ")
    x = F.regexp_replace(x, r'^[\'"]+|[\'"]+$', "")
    x = F.when(
        (x.isNull()) | (F.length(x) == 0) |
        (F.lower(x).isin("missing", "missingunknown", "unknown", "null", "none", "n/a")),
        F.lit(None).cast("string")
    ).otherwise(x)
    return x

def split_csv(col_):
    return F.split(col_, r"\s*,\s*")

# 1) actors + directors from credits (role column)
people_credits = (
    credits
    .select(
        clean_person_name(F.col("name")).alias("name"),
        F.upper(F.trim(F.col("role"))).alias("role")
    )
    .where(F.col("name").isNotNull())
    .where(F.col("role").isin("ACTOR", "DIRECTOR"))
    .select(
        F.col("name"),
        F.when(F.col("role") == "ACTOR", F.lit("actor"))
         .otherwise(F.lit("director"))
         .alias("person_type")
    )
)

# 2) writers from RT writers
people_rt_writers = (
    rt.select(F.col("writers").alias("raw"))
      .where(F.col("raw").isNotNull())
      .withColumn("arr", split_csv(F.col("raw")))
      .withColumn("name", F.explode(F.col("arr")))      # explode first (safe) [file:4]
      .select(clean_person_name(F.col("name")).alias("name"))
      .where(F.col("name").isNotNull())
      .withColumn("person_type", F.lit("writer"))
)

# 3) directors from RT directors
people_rt_directors = (
    rt.select(F.col("directors").alias("raw"))
      .where(F.col("raw").isNotNull())
      .withColumn("arr", split_csv(F.col("raw")))
      .withColumn("name", F.explode(F.col("arr")))
      .select(clean_person_name(F.col("name")).alias("name"))
      .where(F.col("name").isNotNull())
      .withColumn("person_type", F.lit("director"))
)

# 4) writers from piracy writer
people_piracy_writers = (
    piracy.select(F.col("writer").alias("raw"))
          .where(F.col("raw").isNotNull())
          .withColumn("arr", split_csv(F.col("raw")))
          .withColumn("name", F.explode(F.col("arr")))
          .select(clean_person_name(F.col("name")).alias("name"))
          .where(F.col("name").isNotNull())
          .withColumn("person_type", F.lit("writer"))
)

# 5) directors from piracy director
people_piracy_directors = (
    piracy.select(F.col("director").alias("raw"))
          .where(F.col("raw").isNotNull())
          .withColumn("arr", split_csv(F.col("raw")))
          .withColumn("name", F.explode(F.col("arr")))
          .select(clean_person_name(F.col("name")).alias("name"))
          .where(F.col("name").isNotNull())
          .withColumn("person_type", F.lit("director"))
)

# Union all -> dimperson
dim_person_base = (
    people_credits
    .unionByName(people_rt_writers)
    .unionByName(people_rt_directors)
    .unionByName(people_piracy_writers)
    .unionByName(people_piracy_directors)
    .dropDuplicates(["name", "person_type"])  # same name can exist in multiple roles [file:4]
)

w = Window.orderBy(F.col("person_type").asc(), F.col("name").asc())
dim_person = (
    dim_person_base
    .withColumn("id_person", F.row_number().over(w).cast("int"))
    .select("id_person", "name", "person_type")
)

spark.sql(f"DROP TABLE IF EXISTS {OUT_TABLE}")

(
    dim_person.write
    .mode("overwrite")
    .format("parquet")
    .option("path", OUT_PATH)
    .saveAsTable(OUT_TABLE)
)

dim_person.orderBy("id_person").show(50, truncate=False)


+---------+------------------+-----------+
|id_person|name              |person_type|
+---------+------------------+-----------+
|1        |21 Savage         |actor      |
|2        |2Mex              |actor      |
|3        |50 Cent           |actor      |
|4        |50-Grand          |actor      |
|5        |87gongzhu         |actor      |
|6        |9m88              |actor      |
|7        |A Leslie Kies     |actor      |
|8        |A Martinez        |actor      |
|9        |A$AP Rocky        |actor      |
|10       |A. Ali Flores     |actor      |
|11       |A. Bernard Sneed  |actor      |
|12       |A. C. Murali Mohan|actor      |
|13       |A. E. Manoharan   |actor      |
|14       |A. Joseph Denucci |actor      |
|15       |A. Karunanidhi    |actor      |
|16       |A. Murat Özgen    |actor      |
|17       |A. R. Manikandan  |actor      |
|18       |A. R. Rahman      |actor      |
|19       |A. Smith Harrison |actor      |
|20       |A. V. M. Rajan    |actor      |
|21       |

In [16]:
#dim_genre
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

TITLES_PATH = f"{SILVER_BASE}/titles"
RT_PATH     = f"{SILVER_BASE}/rotten_tomatoes_movies"
OUT_PATH    = f"{GOLD_BASE}/dim_genre"
OUT_TABLE   = "golds.dim_genre"

def kt(col_):
    return F.lower(F.trim(col_))

# -------------------------
# 0) Lê fontes
# -------------------------
titles = spark.read.parquet(TITLES_PATH)
rt     = spark.read.parquet(RT_PATH)

# -------------------------
# 1) Extrai géneros normalizados de titles
#    - genres pode vir como "['a','b']" OU "a, b"
# -------------------------
titles_genres = (
    titles
    .select(F.col("genres").alias("raw"))
    .filter(F.col("raw").isNotNull())
    .withColumn(
        "arr",
        F.when(
            F.col("raw").rlike(r"^\s*\[.*\]\s*$"),
            F.from_json(F.regexp_replace(F.col("raw"), r"'", '"'), "array<string>")
        ).otherwise(
            F.split(F.col("raw"), r"\s*,\s*")
        )
    )
    .select(F.explode("arr").alias("genero"))
)

# -------------------------
# 2) Extrai géneros normalizados de Rotten (genre = "a, b")
# -------------------------
rt_genres = (
    rt
    .select(F.col("genre").alias("raw"))
    .filter(F.col("raw").isNotNull())
    .withColumn("arr", F.split(F.col("raw"), r"\s*,\s*"))
    .select(F.explode("arr").alias("genero"))
)

# -------------------------
# 3) União + limpeza básica
# -------------------------
dim_genre_raw = (
    titles_genres
    .unionByName(rt_genres)
    .select(kt(F.col("genero")).alias("genre"))
    .filter(F.col("genre").isNotNull())
    .filter(F.col("genre") != "")
)

# -------------------------
# 4) Mapeamento de sinónimos / aliases
# -------------------------
from itertools import chain

# Dicionário de sinónimos → alvo normalizado
synonyms = {
    "documentation": "documentary",
    "special interest": "documentary",
    # acrescenta aqui outros: "sci-fi" -> "science fiction", etc.
}

# Expressão de mapping: create_map('documentation','documentary', ...)
mapping_expr = F.create_map(
    [F.lit(x) for x in chain(*synonyms.items())]
)

dim_genre_norm = (
    dim_genre_raw
    .withColumn(
        "genre_norm",
        F.coalesce(mapping_expr[F.col("genre")], F.col("genre"))
    )
    .select("genre_norm")
    .distinct()
)

# -------------------------
# 5) Surrogate key e seleção final
# -------------------------
dim_genre = (
    dim_genre_norm
    .withColumn(
        "id_genre",
        F.row_number().over(Window.orderBy("genre_norm")).cast("int")
    )
    .select("id_genre", F.col("genre_norm").alias("genre"))
)

# -------------------------
# 6) Write Gold (Parquet + tabela)
# -------------------------
spark.sql(f"DROP TABLE IF EXISTS {OUT_TABLE}")

(
    dim_genre.write
    .mode("overwrite")
    .format("parquet")
    .option("path", OUT_PATH)
    .saveAsTable(OUT_TABLE)
)

dim_genre.orderBy("id_genre").show(50, truncate=False)
spark.sql(f"SELECT COUNT(*) AS n, COUNT(id_genre) AS n_id FROM {OUT_TABLE}").show()


+--------+-------------------------+
|id_genre|genre                    |
+--------+-------------------------+
|1       |action                   |
|2       |action & adventure       |
|3       |animation                |
|4       |anime & manga            |
|5       |art house & international|
|6       |classics                 |
|7       |comedy                   |
|8       |crime                    |
|9       |cult movies              |
|10      |documentary              |
|11      |drama                    |
|12      |european                 |
|13      |faith & spirituality     |
|14      |family                   |
|15      |fantasy                  |
|16      |gay & lesbian            |
|17      |history                  |
|18      |horror                   |
|19      |kids & family            |
|20      |music                    |
|21      |musical & performing arts|
|22      |mystery & suspense       |
|23      |reality                  |
|24      |romance                  |
|

In [17]:
#dim_platform
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN   = "hdfs://hdfs-nn:9000"
GOLD_BASE = f"{HDFS_NN}/demo/golds"

OUT_PATH  = f"{GOLD_BASE}/dim_platform"
OUT_TABLE = "golds.dim_platform"

# Pequena lista estática de plataformas atuais
platforms = spark.createDataFrame(
    [
        Row(platform_name="netflix"),
        Row(platform_name="amazon")
        # se no futuro tiveres mais: Row(platform_name="disney"), Row(platform_name="hbo"), ...
    ]
)

dim_platform = (
    platforms
    .withColumn("platform_name", F.trim(F.lower("platform_name")))
    .dropDuplicates(["platform_name"])
    .withColumn(
        "id_platform",
        F.row_number().over(Window.orderBy("platform_name")).cast("int")
    )
    .select("id_platform", "platform_name")
)

spark.sql(f"DROP TABLE IF EXISTS {OUT_TABLE}")

(
    dim_platform.write
    .mode("overwrite")
    .format("parquet")
    .option("path", OUT_PATH)
    .saveAsTable(OUT_TABLE)
)

dim_platform.orderBy("id_platform").show(truncate=False)


+-----------+-------------+
|id_platform|platform_name|
+-----------+-------------+
|1          |amazon       |
|2          |netflix      |
+-----------+-------------+



In [18]:
#bridge_launch_genre

from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

# INPUTS
DIM_TITLE_PATH = f"{GOLD_BASE}/dim_title"
DIM_GENRE_PATH = f"{GOLD_BASE}/dim_genre"

TITLES_PATH = f"{SILVER_BASE}/titles"
RT_PATH     = f"{SILVER_BASE}/rotten_tomatoes_movies"

# OUTPUTS
OUT_BRIDGE = f"{GOLD_BASE}/bridge_launch_genre"
OUT_MAP    = f"{GOLD_BASE}/map_title_group_genre"     # para alimentar a fact launch
# (se não quiseres o mapa, podes apagar esta linha e o write correspondente)

def kt(col_):
    return F.lower(F.trim(col_))

# -------------------------
# 0) Read dims + sources
# -------------------------
dim_title = spark.read.parquet(DIM_TITLE_PATH)   # (idtitle, title, ...)
dim_genre = spark.read.parquet(DIM_GENRE_PATH)   # (id_genre, genre)

titles = spark.read.parquet(TITLES_PATH)         # tem title + genres (raw)
rt     = spark.read.parquet(RT_PATH)             # tem movie_title + genre (raw)

# -------------------------
# 1) Build title -> genre_name pairs (normalizados)
#    - titles.genres pode vir como "['a','b']" ou "a, b"
#    - rt.genre vem como "a, b"
# -------------------------
titles_pairs = (
    titles
    .select(F.col("title").alias("title_raw"), F.col("genres").alias("raw"))
    .where(F.col("title_raw").isNotNull() & F.col("raw").isNotNull())
    .withColumn("k_title", kt(F.col("title_raw")))
    .withColumn(
        "arr",
        F.when(
            F.col("raw").rlike(r"^\s*\[.*\]\s*$"),
            F.from_json(F.regexp_replace(F.col("raw"), r"'", '"'), "array<string>")
        ).otherwise(
            F.split(F.col("raw"), r"\s*,\s*")
        )
    )
    .withColumn("genre_name", F.explode(F.col("arr")))
    .withColumn("genre_name", kt(F.col("genre_name")))
    .select("k_title", "genre_name")
    .where(F.col("k_title").isNotNull() & (F.col("genre_name") != ""))
)

rt_pairs = (
    rt
    .select(F.col("movie_title").alias("title_raw"), F.col("genre").alias("raw"))
    .where(F.col("title_raw").isNotNull() & F.col("raw").isNotNull())
    .withColumn("k_title", kt(F.col("title_raw")))
    .withColumn("arr", F.split(F.col("raw"), r"\s*,\s*"))
    .withColumn("genre_name", F.explode(F.col("arr")))
    .withColumn("genre_name", kt(F.col("genre_name")))
    .select("k_title", "genre_name")
    .where(F.col("k_title").isNotNull() & (F.col("genre_name") != ""))
)

title_genres = titles_pairs.unionByName(rt_pairs).dropDuplicates(["k_title", "genre_name"])

# -------------------------
# 2) Lookups via DIMs
# -------------------------
dim_title_lu = (
    dim_title
    .select(F.col("id_title").alias("id_title"), kt(F.col("title")).alias("k_title"))
    .dropDuplicates(["k_title"])
)

dim_genre_lu = (
    dim_genre
    .select("id_genre", kt(F.col("genre")).alias("genre_name"))
    .dropDuplicates(["genre_name"])
)

title_genres_ids = (
    title_genres
    .join(dim_title_lu, "k_title", "inner")
    .join(dim_genre_lu, "genre_name", "inner")
    .select("id_title", "id_genre")
    .dropDuplicates()
)

# -------------------------
# 3) Criar id_group_genre por "conjunto de géneros" do título
# -------------------------
sig = (
    title_genres_ids
    .groupBy("id_title")
    .agg(F.sort_array(F.collect_set("id_genre")).alias("genres_set"))
    .withColumn("group_key", F.concat_ws("-", F.col("genres_set")))
    .select("id_title", "group_key", "genres_set")
)

group_dim = (
    sig.select("group_key")
    .dropDuplicates()
    .withColumn("id_group_genre", F.dense_rank().over(Window.orderBy("group_key")).cast("int"))
)

title_to_group_genre = (
    sig.join(group_dim, "group_key", "inner")
    .select("id_title", "id_group_genre", "genres_set")
)

bridge_launch_genre = (
    title_to_group_genre
    .select("id_group_genre", F.explode("genres_set").alias("id_genre"))
    .select("id_group_genre", "id_genre")
    .dropDuplicates()
)

# -------------------------
# 4) Write Gold
# -------------------------
bridge_launch_genre.write.mode("overwrite").parquet(OUT_BRIDGE)

# mapa para preencher launch.id_group_genre (join por id_title)
title_to_group_genre.select("id_title", "id_group_genre").write.mode("overwrite").parquet(OUT_MAP)

# quick checks
spark.read.parquet(OUT_BRIDGE).groupBy("id_group_genre").count().orderBy(F.desc("count")).show(10, truncate=False)
spark.read.parquet(OUT_BRIDGE).show(10, truncate=False)

+--------------+-----+
|id_group_genre|count|
+--------------+-----+
|890           |11   |
|889           |11   |
|1014          |11   |
|170           |11   |
|1024          |11   |
|174           |11   |
|412           |10   |
|891           |10   |
|173           |10   |
|176           |10   |
+--------------+-----+
only showing top 10 rows

+--------------+--------+
|id_group_genre|id_genre|
+--------------+--------+
|1             |1       |
|2             |1       |
|2             |10      |
|3             |1       |
|3             |10      |
|3             |11      |
|3             |12      |
|3             |32      |
|4             |1       |
|4             |10      |
+--------------+--------+
only showing top 10 rows



In [19]:
 dim_genre = spark.read.parquet("hdfs://hdfs-nn:9000/demo/golds/dim_genre")

(
    spark.read.parquet("hdfs://hdfs-nn:9000/demo/golds/bridge_launch_genre")
    .join(dim_genre, "id_genre", "left")
    .where(F.col("id_group_genre") == 2)  # por exemplo
    .orderBy("id_genre")
    .show(truncate=False)
)


+--------+--------------+-----------+
|id_genre|id_group_genre|genre      |
+--------+--------------+-----------+
|1       |2             |action     |
|10      |2             |documentary|
+--------+--------------+-----------+



In [20]:
#bridge_views_platform
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

DIM_TITLE_PATH    = f"{GOLD_BASE}/dim_title"
DIM_PLATFORM_PATH = f"{GOLD_BASE}/dim_platform"
TITLES_PATH       = f"{SILVER_BASE}/titles"

OUT_BRIDGE = f"{GOLD_BASE}/bridge_views_platform"
OUT_MAP    = f"{GOLD_BASE}/map_title_group_platform"

def kt(c):
    return F.lower(F.trim(c))

dim_title    = spark.read.parquet(DIM_TITLE_PATH)       # id_title, title, ...
dim_platform = spark.read.parquet(DIM_PLATFORM_PATH)    # id_platform, platform_name
titles       = spark.read.parquet(TITLES_PATH)          # title, netflix, amazon, ...

# 1) title -> platform_name (netflix/amazon flags)
title_platforms = (
    titles
    .select(
        kt(F.col("title")).alias("k_title"),
        F.col("netflix").cast("int").alias("netflix"),
        F.col("amazon").cast("int").alias("amazon")
    )
    .where(F.col("k_title").isNotNull())
    .withColumn(
        "platform_arr",
        F.array(
            F.when(F.col("netflix") == 1, F.lit("netflix")),
            F.when(F.col("amazon")  == 1, F.lit("amazon"))
        )
    )
    # array_compact remove NULLs do array de plataformas
    .withColumn("platform_arr", F.array_compact("platform_arr"))  # Spark 3.4+ / Databricks[web:166][web:170]
    .withColumn("platform_name", F.explode("platform_arr"))
    .select("k_title", kt(F.col("platform_name")).alias("platform_name"))
    .dropDuplicates()
)

# 2) lookups
dim_title_lu = (
    dim_title
    .select(F.col("id_title").alias("idtitle"), kt(F.col("title")).alias("k_title"))
    .dropDuplicates(["k_title"])
)

dim_platform_lu = (
    dim_platform
    .select("id_platform", kt(F.col("platform_name")).alias("platform_name"))
    .dropDuplicates(["platform_name"])
)

title_platforms_ids = (
    title_platforms
    .join(dim_title_lu, "k_title", "inner")
    .join(dim_platform_lu, "platform_name", "inner")
    .select("idtitle", "id_platform")
    .dropDuplicates()
)

# 3) Agrupar por conjunto de plataformas
sig = (
    title_platforms_ids
    .groupBy("idtitle")
    .agg(F.sort_array(F.collect_set("id_platform")).alias("platforms_set"))
    .withColumn("group_key", F.concat_ws("-", F.col("platforms_set")))
    .select("idtitle", "group_key", "platforms_set")
)

group_dim = (
    sig.select("group_key").dropDuplicates()
    .withColumn(
        "id_group_platform",
        F.dense_rank().over(Window.orderBy("group_key")).cast("int")
    )
)

map_title_group_platform = (
    sig.join(group_dim, "group_key", "inner")
    .select("idtitle", "id_group_platform", "platforms_set")
)

bridge_views_platform = (
    map_title_group_platform
    .select("id_group_platform", F.explode("platforms_set").alias("id_platform"))
    .dropDuplicates(["id_group_platform", "id_platform"])
)

# 4) Write Gold
bridge_views_platform.write.mode("overwrite").parquet(OUT_BRIDGE)
map_title_group_platform.select("idtitle", "id_group_platform").write.mode("overwrite").parquet(OUT_MAP)

spark.read.parquet(OUT_BRIDGE).show(10, truncate=False)


+-----------------+-----------+
|id_group_platform|id_platform|
+-----------------+-----------+
|2                |2          |
|1                |1          |
|2                |1          |
|3                |2          |
+-----------------+-----------+



In [21]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

def kt(col_):
    return F.lower(F.trim(col_))

def has_col(df, c):
    return c in df.columns

def pick_col(df, candidates, cast=None):
    """
    devolve a primeira coluna existente em df dentro de candidates.
    se nenhuma existir, devolve lit(None).
    """
    for c in candidates:
        if has_col(df, c):
            x = F.col(c)
            return x.cast(cast) if cast else x
    return F.lit(None).cast(cast) if cast else F.lit(None)

def norm_title_expr(c):
    """
    normalização forte para joins por título
    """
    return F.trim(F.regexp_replace(F.lower(F.col(c)), r"\s*\(.*?\)\s*", ""))  # remove (2019)


def write_gold_table(df, table_name, path):
    # evita caches/stale metadata
    spark.catalog.clearCache()
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    # escreve a tabela external diretamente
    (df.write
       .mode("overwrite")
       .format("parquet")
       .option("path", path)
       .saveAsTable(table_name))

# --- usar aqui ---
write_gold_table(bridge_launch_genre, "golds.bridge_launch_genre", OUT_BRIDGE)

spark.sql("SELECT COUNT(*) AS n FROM golds.bridge_launch_genre").show()



+----+
|   n|
+----+
|6846|
+----+



In [22]:
#launch
from pyspark.sql import functions as F

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

BOX_PATH    = f"{SILVER_BASE}/boxoffice_info"
RT_PATH     = f"{SILVER_BASE}/rotten_tomatoes_movies"
TITLES_PATH = f"{SILVER_BASE}/titles"
MAP_GEN     = f"{GOLD_BASE}/map_title_group_genre"

OUT_PATH    = f"{GOLD_BASE}/launch"
DIM_TIME_PATH  = f"{GOLD_BASE}/dim_time"

def norm_title(c):
    x = F.lower(F.trim(c))
    x = F.regexp_replace(x, r"\s*[\(\[]\s*\d{4}\s*[\)\]]\s*", "")
    x = F.regexp_replace(x, r"[^a-z0-9\s]", " ")
    x = F.regexp_replace(x, r"\s+", " ")
    return F.trim(x)

def write_gold_table(df, table_name, path):
    spark.catalog.clearCache()
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    (df.write
       .mode("overwrite")
       .format("parquet")
       .option("path", path)
       .saveAsTable(table_name))
    spark.sql(f"REFRESH TABLE {table_name}")

# -------------------------------------------------
# 1) Ler dim_title SEM inventar (já está fix como LONG)
# -------------------------------------------------
spark.catalog.clearCache()
spark.sql("REFRESH TABLE golds.dim_title")
dim_title = spark.table("golds.dim_title")

# Base: id_title (LONG) + title + key k
if "id_title" not in dim_title.columns or "title" not in dim_title.columns:
    raise Exception("golds.dim_title tem de ter colunas id_title e title")

base = (dim_title
    .select(
        F.col("id_title").cast("long").alias("id_title"),
        F.col("title").cast("string").alias("title_dim")
    )
    .filter(F.col("title_dim").isNotNull() & (F.length(F.trim(F.col("title_dim"))) > 0))
    .dropDuplicates(["id_title"])
    .withColumn("k", norm_title(F.col("title_dim")))
    .cache()
)

print("base rows:", base.count())
base.select("id_title","title_dim").show(5, truncate=False)

# -------------------------------------------------
# 2) Dim_time (metastore -> fallback path)
# -------------------------------------------------
spark.catalog.clearCache()
try:
    spark.sql("REFRESH TABLE golds.dim_time")
    dim_time = spark.table("golds.dim_time")
    _ = dim_time.limit(1).count()
except:
    dim_time = spark.read.parquet(DIM_TIME_PATH)

dim_time = (dim_time
    .select(F.col("id_time").cast("int").alias("id_time"),
            F.to_date("date").alias("date"))
    .dropDuplicates(["date"])
    .cache()
)
_ = dim_time.count()

# -------------------------------------------------
# 3) TITLES: release_year + plataformas p/ flag_streaming
# -------------------------------------------------
titles = spark.read.parquet(TITLES_PATH)
title_col = "title" if "title" in titles.columns else ("name" if "name" in titles.columns else None)
if title_col is None:
    raise Exception("silver/titles não tem coluna title/name")

t2 = (titles
    .select(
        norm_title(F.col(title_col)).alias("k"),
        (F.col("release_year").cast("int") if "release_year" in titles.columns else F.lit(None).cast("int")).alias("release_year"),
        (F.coalesce(F.col("netflix").cast("int"), F.lit(0)) if "netflix" in titles.columns else F.lit(0)).alias("netflix"),
        (F.coalesce(F.col("amazon").cast("int"),  F.lit(0)) if "amazon"  in titles.columns else F.lit(0)).alias("amazon"),
    )
    .dropDuplicates(["k"])
    .withColumn(
        "launch_date_titles",
        F.when(F.col("release_year").isNotNull(),
               F.to_date(F.concat_ws("-", F.col("release_year").cast("string"), F.lit("01"), F.lit("01"))))
         .otherwise(F.lit(None).cast("date"))
    )
    .withColumn("has_streaming_platform", (F.col("netflix") == 1) | (F.col("amazon") == 1))
    .select("k","launch_date_titles","has_streaming_platform")
)

# -------------------------------------------------
# 4) BOXOFFICE
# -------------------------------------------------
box = spark.read.parquet(BOX_PATH)
box_title_col = "title" if "title" in box.columns else ("movie_title" if "movie_title" in box.columns else None)
if box_title_col is None:
    raise Exception("boxoffice_info não tem title/movie_title")

box2 = (box
    .select(
        norm_title(F.col(box_title_col)).alias("k"),
        (F.to_date(F.col("released").cast("string")) if "released" in box.columns else F.lit(None).cast("date")).alias("launch_date_box"),
        (F.regexp_replace(F.col("gross").cast("string"),  r"[^0-9]", "").cast("int") if "gross" in box.columns else F.lit(None).cast("int")).alias("gross"),
        (F.regexp_replace(F.col("budget").cast("string"), r"[^0-9]", "").cast("int") if "budget" in box.columns else F.lit(None).cast("int")).alias("budget"),
    )
    .groupBy("k")
    .agg(F.max("launch_date_box").alias("launch_date_box"),
         F.max("gross").alias("gross"),
         F.max("budget").alias("budget"))
)

# -------------------------------------------------
# 5) RT
# -------------------------------------------------
rt = spark.read.parquet(RT_PATH)
rt_title_col = "movie_title" if "movie_title" in rt.columns else ("title" if "title" in rt.columns else None)
if rt_title_col is None:
    raise Exception("rotten_tomatoes_movies não tem title/movie_title")

rt2 = (rt
    .select(
        norm_title(F.col(rt_title_col)).alias("k"),
        (F.to_date(F.col("in_theaters_date")) if "in_theaters_date" in rt.columns else F.lit(None).cast("date")).alias("in_theaters_date"),
        (F.to_date(F.col("on_streaming_date")) if "on_streaming_date" in rt.columns else F.lit(None).cast("date")).alias("on_streaming_date"),
    )
    .groupBy("k")
    .agg(F.max("in_theaters_date").alias("in_theaters_date"),
         F.max("on_streaming_date").alias("on_streaming_date"))
)

# -------------------------------------------------
# 6) JOINs + launch_date_final + id_time
# -------------------------------------------------
j = (base
    .join(t2, "k", "left")
    .join(box2, "k", "left")
    .join(rt2, "k", "left")
)

j = j.withColumn(
    "launch_date_final",
    F.coalesce("launch_date_titles", "launch_date_box", "in_theaters_date", "on_streaming_date")
)

j = j.join(dim_time, dim_time.date == F.col("launch_date_final"), "left")

# genre map (se existir)
try:
    map_genre = (spark.read.parquet(MAP_GEN)
        .select(F.col("id_title").cast("long").alias("id_title"),
                F.col("id_group_genre").cast("int").alias("id_group_genre"))
        .dropDuplicates(["id_title"])
    )
    j = j.join(map_genre, "id_title", "left")
except Exception as e:
    print("Sem MAP_GEN (map_title_group_genre):", e)
    j = j.withColumn("id_group_genre", F.lit(None).cast("int"))

# -------------------------------------------------
# 7) FLAGS 0/1 (INT) 
# -------------------------------------------------
j = (j
    .withColumn("flag_theaters",
        F.when(F.col("in_theaters_date").isNotNull(), 1).otherwise(0).cast("int")
    )
    .withColumn("flag_streaming",
        F.when(
            (F.col("on_streaming_date").isNotNull()) | (F.coalesce(F.col("has_streaming_platform"), F.lit(False))),
            1
        ).otherwise(0).cast("int")
    )
)

# -------------------------------------------------
# 8) OUTPUT
# -------------------------------------------------
launch_out = (j.select(
        F.col("id_title").cast("long").alias("id_title"),
        F.col("id_time").cast("int").alias("id_time"),
        F.col("id_group_genre").cast("int").alias("id_group_genre"),
        F.col("flag_theaters").cast("int").alias("flag_theaters"),
        F.col("flag_streaming").cast("int").alias("flag_streaming"),
        F.col("gross").cast("int").alias("gross"),
        F.col("budget").cast("int").alias("budget")
    ).dropDuplicates(["id_title"])
)

print("launch_out rows:", launch_out.count())
print("id_time null:", launch_out.filter(F.col("id_time").isNull()).count())
launch_out.show(10, truncate=False)

# -------------------------------------------------
# 9) Escrever tabela
# -------------------------------------------------
write_gold_table(launch_out, "golds.launch", OUT_PATH)
spark.sql("SELECT COUNT(*) AS n FROM golds.launch").show()


base rows: 9999
+--------+---------------------------------+
|id_title|title_dim                        |
+--------+---------------------------------+
|26      |...And Your Name Is Jonah        |
|29      |10 Days In Sun City              |
|474     |A Walk Among the Tombstones      |
|964     |An Evening with Beverly Luff Linn|
|1677    |Beyblade Burst Rise              |
+--------+---------------------------------+
only showing top 5 rows

launch_out rows: 9999
id_time null: 132
+--------+--------+--------------+-------------+--------------+---------+---------+
|id_title|id_time |id_group_genre|flag_theaters|flag_streaming|gross    |budget   |
+--------+--------+--------------+-------------+--------------+---------+---------+
|1       |20180101|null          |0            |1             |null     |null     |
|2       |20200101|67            |1            |1             |36733909 |32000000 |
|3       |20190101|719           |0            |1             |null     |null     |
|4       |

In [23]:
#awards
from pyspark.sql import functions as F

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

OSC_PATH = f"{SILVER_BASE}/the_oscar_award"
OUT_PATH = f"{GOLD_BASE}/awards"

def kt(c):
    # normalização simples (bate com a tua limpeza do silver)
    return F.lower(F.trim(c))

def write_gold_table(df, table_name, path):
    spark.catalog.clearCache()
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    (df.write
       .mode("overwrite")
       .format("parquet")
       .option("path", path)
       .saveAsTable(table_name))
    spark.sql(f"REFRESH TABLE {table_name}")

# -----------------------------
# 1) Ler Oscars (silver)
# -----------------------------
osc = spark.read.parquet(OSC_PATH)

# garantir colunas necessárias
need = ["year_ceremony", "canon_category", "name", "film", "winner"]
missing = [c for c in need if c not in osc.columns]
if missing:
    raise Exception(f"Faltam colunas no silver/the_oscar_award: {missing}. Tenho: {osc.columns}")

# -----------------------------
# 2) Ler dims (golds) — IMPORTANTE: id_title como LONG
# -----------------------------
dim_title = (spark.table("golds.dim_title")
    .select(
        F.col("id_title").cast("long").alias("id_title"),
        kt(F.col("title")).alias("k_title")
    )
    .dropDuplicates(["k_title"])
)

dim_person = (spark.table("golds.dim_person")
    .select(
        F.col("id_person").cast("int").alias("id_person"),
        kt(F.col("name")).alias("k_name")
    )
    .dropDuplicates(["k_name"])
)

dim_category = (spark.table("golds.dim_category")
    .select(
        F.col("id_award").cast("int").alias("id_award"),
        kt(F.col("canonical_category")).alias("k_cat")
    )
    .dropDuplicates(["k_cat"])
)

# -----------------------------
# 3) Preparar osc: keys + id_time
# id_time = YYYY0101 (int)  -> se quiseres depois eu faço join com dim_time real
# -----------------------------
aw = (osc
    .withColumn("k_title", kt(F.col("film")))
    .withColumn("k_name",  kt(F.col("name")))
    .withColumn("k_cat",   kt(F.col("canon_category")))
    .withColumn("id_time", (F.col("year_ceremony").cast("int") * F.lit(10000) + F.lit(101)).cast("int"))
)

# -----------------------------
# 4) winner -> 0/1 int (robusto)
# -----------------------------
winner01 = (
    F.when(F.col("winner") == True, 1)
     .when(F.lower(F.col("winner").cast("string")) == "true", 1)
     .when(F.col("winner").cast("int") == 1, 1)
     .otherwise(0)
     .cast("int")
)

# -----------------------------
# 5) Construir fact awards (tudo 0/1 em INT)
# -----------------------------
awards_fact = (
    aw.join(dim_title,    "k_title", "left")
      .join(dim_person,   "k_name",  "left")
      .join(dim_category, "k_cat",   "left")
      .withColumn("is_nomination",    F.lit(1).cast("int"))
      .withColumn("is_winner",        winner01)
      .withColumn("is_oscar_nominee", F.lit(1).cast("int"))
      .withColumn("is_oscar_winner",  winner01)
      .select(
          F.col("id_award").cast("int").alias("id_award"),
          F.col("id_title").cast("long").alias("id_title"),   # LONG ✅
          F.col("id_time").cast("int").alias("id_time"),
          F.col("id_person").cast("int").alias("id_person"),
          F.col("is_nomination").cast("int").alias("is_nomination"),
          F.col("is_winner").cast("int").alias("is_winner"),
          F.col("is_oscar_nominee").cast("int").alias("is_oscar_nominee"),
          F.col("is_oscar_winner").cast("int").alias("is_oscar_winner"),
      )
      .dropDuplicates(["id_award", "id_title", "id_time", "id_person"])
)

print("awards_fact rows:", awards_fact.count())
print("null id_title:", awards_fact.filter(F.col("id_title").isNull()).count())
print("null id_person:", awards_fact.filter(F.col("id_person").isNull()).count())
print("null id_award:", awards_fact.filter(F.col("id_award").isNull()).count())
awards_fact.show(20, truncate=False)

# -----------------------------
# 6) Escrever tabela
# -----------------------------
write_gold_table(awards_fact, "golds.awards", OUT_PATH)
spark.sql("SELECT COUNT(*) AS n FROM golds.awards").show()


awards_fact rows: 6178
null id_title: 5346
null id_person: 2506
null id_award: 0
+--------+--------+--------+---------+-------------+---------+----------------+---------------+
|id_award|id_title|id_time |id_person|is_nomination|is_winner|is_oscar_nominee|is_oscar_winner|
+--------+--------+--------+---------+-------------+---------+----------------+---------------+
|44      |null    |20220101|null     |1            |0        |1               |0              |
|33      |null    |19640101|null     |1            |0        |1               |0              |
|41      |null    |19370101|39107    |1            |1        |1               |1              |
|55      |null    |20180101|null     |1            |0        |1               |0              |
|23      |null    |19660101|null     |1            |0        |1               |0              |
|16      |null    |19540101|null     |1            |1        |1               |1              |
|2       |null    |19850101|null     |1            |1  

In [7]:
#rating
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

HDFS_NN = "hdfs://hdfs-nn:9000"
SILVER  = f"{HDFS_NN}/demo/silver"
GOLD    = f"{HDFS_NN}/demo/golds"

RT_SILVER   = f"{SILVER}/rotten_tomatoes_movies"
TITLES_PATH = f"{SILVER}/titles"
DRS_PATH    = f"{GOLD}/dim_rating_status"
RATING_PATH = f"{GOLD}/rating"

cols = [
    "id_title",
    "id_rating_source_status",
    "id_time",
    "critic_score",
    "audience_score",
    "critic_reviews",
    "audience_reviews",
    "rt_critic_audience",
    "popularity_index"
]

def ensure_cols(df: DataFrame) -> DataFrame:
    for c in cols:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None))
    return df.select(cols)

def norm_title(c):
    x = F.lower(F.trim(c))
    x = F.regexp_replace(x, r"\s*\(.*?\)\s*", " ")
    x = F.regexp_replace(x, r"[^\p{L}\p{N}\s]", " ")
    x = F.regexp_replace(x, r"\s+", " ")
    return F.trim(x)

def norm_key(c):
    x = F.lower(F.trim(c))
    x = F.regexp_replace(x, r"[^\p{L}\p{N}\s]", " ")
    x = F.regexp_replace(x, r"\s+", " ")
    return F.trim(x)

# 1) Rotten completo
rt = spark.read.parquet(RT_SILVER)

dim_title_keys = (
    spark.read.parquet(f"{GOLD}/dim_title")
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        norm_title(F.col("title")).alias("k_title")
    )
    .dropDuplicates(["k_title"])
)

dim_rs_keys = (
    spark.read.parquet(DRS_PATH)
    .select(
        F.col("id_rating_source_status").cast("bigint").alias("id_rating_source_status"),
        norm_key(F.col("source")).alias("k_source"),
        norm_key(F.col("tomatometer_status")).alias("k_status")
    )
    .dropDuplicates(["k_source", "k_status"])
)

rt_pre = (
    rt
    .withColumn("in_theaters_date", F.to_date("in_theaters_date"))
    .withColumn("on_streaming_date", F.to_date("on_streaming_date"))
    .withColumn("rating_date", F.coalesce(F.col("in_theaters_date"), F.col("on_streaming_date")))
    .withColumn(
        "id_time",
        F.when(F.col("rating_date").isNotNull(), F.date_format("rating_date", "yyyyMMdd").cast("int"))
         .otherwise(F.lit(None).cast("int"))
    )
    .withColumn("k_title", norm_title(F.col("movie_title")))
    .withColumn("k_source", norm_key(F.lit("rotten_tomatoes")))
    .withColumn("k_status", norm_key(F.col("tomatometer_status")))
)

rating_rot = (
    rt_pre
    .join(dim_title_keys, "k_title", "left")
    .join(dim_rs_keys, ["k_source", "k_status"], "left")
    .withColumn("critic_score", F.col("tomatometer_rating").cast("double"))
    .withColumn("audience_score", F.col("audience_rating").cast("double"))
    .withColumn("critic_reviews", F.col("tomatometer_count").cast("int"))
    .withColumn("audience_reviews", F.col("audience_count").cast("int"))
    .withColumn(
        "rt_critic_audience",
        (F.col("critic_score") - F.col("audience_score")).cast("double")
    )
    .withColumn(
        "popularity_index",
        (
            F.coalesce(F.col("critic_reviews").cast("double"), F.lit(0.0)) +
            F.coalesce(F.col("audience_reviews").cast("double"), F.lit(0.0))
        ).cast("double")
    )
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        F.col("id_rating_source_status").cast("bigint").alias("id_rating_source_status"),
        F.col("id_time").cast("int").alias("id_time"),
        "critic_score",
        "audience_score",
        "critic_reviews",
        "audience_reviews",
        "rt_critic_audience",
        "popularity_index"
    )
    .dropDuplicates(["id_title", "id_rating_source_status", "id_time"])
)

# 2) IMDB/TMDB completos
titles = spark.read.parquet(TITLES_PATH)
drs    = spark.read.parquet(DRS_PATH)

imdb_key = drs.filter((F.col("source") == "imdb") & (F.col("tomatometer_status") == "score")) \
              .select("id_rating_source_status").first()
tmdb_key = drs.filter((F.col("source") == "tmdb") & (F.col("tomatometer_status") == "score")) \
              .select("id_rating_source_status").first()

id_imdb = imdb_key["id_rating_source_status"]
id_tmdb = tmdb_key["id_rating_source_status"]

imdb_df = (
    titles
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        F.col("imdb_score").cast("double").alias("score"),
        F.col("imdb_votes").cast("int").alias("votes")
    )
    .filter(F.col("score").isNotNull())
    .withColumn("id_rating_source_status", F.lit(id_imdb).cast("bigint"))
    .withColumn("id_time", F.lit(None).cast("int"))
    .withColumn("critic_score", F.lit(None).cast("double"))
    .withColumn("audience_score", F.col("score"))
    .withColumn("critic_reviews", F.lit(None).cast("int"))
    .withColumn("audience_reviews", F.col("votes"))
    .withColumn("rt_critic_audience", F.lit(None).cast("double"))
    .withColumn("popularity_index", F.col("votes").cast("double"))
    .select(cols)
)

tmdb_df = (
    titles
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        F.col("tmdb_score").cast("double").alias("score"),
        F.col("tmdb_popularity").cast("double").alias("popularity")
    )
    .filter(F.col("score").isNotNull())
    .withColumn("id_rating_source_status", F.lit(id_tmdb).cast("bigint"))
    .withColumn("id_time", F.lit(None).cast("int"))
    .withColumn("critic_score", F.lit(None).cast("double"))
    .withColumn("audience_score", F.col("score"))
    .withColumn("critic_reviews", F.lit(None).cast("int"))
    .withColumn("audience_reviews", F.lit(None).cast("int"))
    .withColumn("rt_critic_audience", F.lit(None).cast("double"))
    .withColumn("popularity_index", F.col("popularity"))
    .select(cols)
)

rating_union = (
    ensure_cols(rating_rot)
    .unionByName(ensure_cols(imdb_df))
    .unionByName(ensure_cols(tmdb_df))
    .dropDuplicates(["id_title", "id_rating_source_status", "id_time"])
    .repartition(4)   # ajuda a controlar o número de ficheiros
)

(
    rating_union
    .write
    .mode("overwrite")
    .format("parquet")
    .option("path", RATING_PATH)
    .saveAsTable("golds.rating")
)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
#views

from pyspark.sql import functions as F
from pyspark.sql.window import Window

HDFS_NN     = "hdfs://hdfs-nn:9000"
SILVER_BASE = f"{HDFS_NN}/demo/silver"
GOLD_BASE   = f"{HDFS_NN}/demo/golds"

PIRACY_PATH = f"{SILVER_BASE}/dataset_piracy"
DIM_TITLE_PATH = f"{GOLD_BASE}/dim_title"
MAP_TITLE_GROUP_PLATFORM_PATH = f"{GOLD_BASE}/map_title_group_platform"

OUT_VIEWS = f"{GOLD_BASE}/views"

# ---------- helpers ----------
def kt(c):
    return F.lower(F.trim(c))

def pick_col(df, candidates):
    cols = set(df.columns)
    for c in candidates:
        if c in cols:
            return c
    return None

def to_date_flex(col):
    # tenta vários formatos comuns
    return F.coalesce(
        F.to_date(col),
        F.to_date(col, "yyyy-MM-dd"),
        F.to_date(col, "yyyy/MM/dd"),
        F.to_date(col, "dd-MM-yyyy"),
        F.to_date(col, "dd/MM/yyyy"),
        F.to_date(col, "MM/dd/yyyy"),
        F.to_date(col, "MM-dd-yyyy"),
    )

# ---------- read sources ----------
pir = spark.read.parquet(PIRACY_PATH)

dim_title = spark.read.parquet(DIM_TITLE_PATH)

# id_title pode ser id_title ou idtitle, normalizamos
id_title_col = "id_title" if "id_title" in dim_title.columns else (
    "idtitle" if "idtitle" in dim_title.columns else None
)
if id_title_col is None:
    raise Exception("Não encontrei coluna id_title nem idtitle em golds.dim_title")

dim_title_lu = (
    dim_title
    .select(
        F.col(id_title_col).cast("bigint").alias("id_title"),
        kt(F.col("title")).alias("k_title")
    )
    .dropDuplicates(["k_title"])
)

map_gp = spark.read.parquet(MAP_TITLE_GROUP_PLATFORM_PATH)

map_id_col = "id_title" if "id_title" in map_gp.columns else (
    "idtitle" if "idtitle" in map_gp.columns else None
)
if map_id_col is None:
    raise Exception("Não encontrei coluna id_title nem idtitle em golds.map_title_group_platform")

map_platform = (
    map_gp
    .select(
        F.col(map_id_col).cast("bigint").alias("id_title"),
        F.col("id_group_platform").cast("int").alias("id_group_platform")
    )
    .dropDuplicates(["id_title"])
)

# ---------- detect columns in piracy ----------
title_col   = pick_col(pir, ["title", "movie_title", "movie", "film", "name", "movie_name"])
views_col   = pick_col(pir, ["views", "view_count", "views_pirate", "views_price"])
down_col    = pick_col(pir, ["downloads", "download_count", "downloads_pirate", "downloads_price"])
posted_col  = pick_col(pir, ["posted_date", "post_date", "date_posted", "leak_date"])
release_col = pick_col(pir, ["release_date", "released_date", "date_release", "premiere_date"])

if title_col is None or posted_col is None:
    raise Exception(
        f"Não encontrei colunas essenciais no piracy: title_col={title_col}, posted_col={posted_col}"
    )

# ---------- staging ----------
pir2 = (
    pir
    .withColumn("title_raw", F.col(title_col).cast("string"))
    .withColumn("k_title", kt(F.col("title_raw")))
    .withColumn("posted_date", to_date_flex(F.col(posted_col).cast("string")))
    .withColumn(
        "release_date",
        to_date_flex(F.col(release_col).cast("string")) if release_col else F.lit(None).cast("date")
    )
    .withColumn(
        "views_pirate",
        F.col(views_col).cast("long") if views_col else F.lit(None).cast("long")
    )
    .withColumn(
        "downloads_pirate",
        F.col(down_col).cast("long") if down_col else F.lit(None).cast("long")
    )
)

# id_time = yyyyMMdd do posted_date
pir2 = pir2.withColumn(
    "id_time",
    F.when(
        F.col("posted_date").isNotNull(),
        F.date_format(F.col("posted_date"), "yyyyMMdd").cast("int")
    ).otherwise(F.lit(None).cast("int"))
)

# join para id_title
views_staging = (
    pir2
    .join(dim_title_lu, "k_title", "left")
)

# join para id_group_platform (SEM forçar 0; deixamos NULL quando não há mapeamento)
views_staging = (
    views_staging
    .join(map_platform, "id_title", "left")
    .withColumn("id_group_platform", F.col("id_group_platform").cast("int"))
)

# days_since_release = posted_date - release_date
views_staging = views_staging.withColumn(
    "days_since_release",
    F.when(
        F.col("posted_date").isNotNull() & F.col("release_date").isNotNull(),
        F.datediff(F.col("posted_date"), F.col("release_date"))
    ).otherwise(F.lit(None).cast("int"))
)

# first_leak_flag = 1 no primeiro posted_date por id_title
w = Window.partitionBy("id_title").orderBy(F.col("posted_date").asc_nulls_last())
views_staging = (
    views_staging
    .withColumn("rn_first", F.row_number().over(w))
    .withColumn(
        "first_leak_flag",
        F.when(F.col("rn_first") == 1, F.lit(True)).otherwise(F.lit(False))
    )
    .drop("rn_first")
)

# ---------- AGREGAR para evitar duplicados (mesma PK) ----------
# PK: id_title, id_time, id_group_platform
views_fact = (
    views_staging
    .groupBy("id_title", "id_time", "id_group_platform")
    .agg(
        F.max("views_pirate").alias("views_pirate"),
        F.max("downloads_pirate").alias("downloads_pirate"),
        F.max(F.col("first_leak_flag").cast("int")).cast("int").alias("first_leak_flag"),
        F.min("days_since_release").alias("days_since_release")
    )
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        F.col("id_time").cast("int").alias("id_time"),
        F.col("id_group_platform").cast("int").alias("id_group_platform"),
        F.col("views_pirate").cast("long").alias("views_pirate"),
        F.col("downloads_pirate").cast("long").alias("downloads_pirate"),
        F.col("first_leak_flag").cast("int").alias("first_leak_flag"),
        F.col("days_since_release").cast("int").alias("days_since_release")
    )
)

# ---------- write ----------
(
    views_fact.write
    .mode("overwrite")
    .format("parquet")
    .option("path", OUT_VIEWS)
    .saveAsTable("golds.views")
)

# ---------- quick checks ----------
spark.sql("SELECT COUNT(*) AS n FROM golds.views").show()

spark.sql("""
SELECT
  SUM(CASE WHEN id_title IS NULL THEN 1 ELSE 0 END) AS null_id_title,
  SUM(CASE WHEN id_time IS NULL THEN 1 ELSE 0 END) AS null_id_time,
  SUM(CASE WHEN id_group_platform IS NULL THEN 1 ELSE 0 END) AS null_id_group_platform
FROM golds.views
""").show()

spark.sql("""
SELECT id_group_platform, COUNT(*) AS n
FROM golds.views
GROUP BY id_group_platform
ORDER BY id_group_platform
""").show(truncate=False)

spark.sql("""
SELECT *
FROM golds.views
LIMIT 200
""").show(truncate=False)


In [1]:
#participation
from pyspark.sql import functions as F

HDFS_NN = "hdfs://hdfs-nn:9000"
SILVER  = f"{HDFS_NN}/demo/silver"
GOLD    = f"{HDFS_NN}/demo/golds"

ACTOR_FILMS_PATH = f"{SILVER}/actorfilms"
OUT_PATH         = f"{GOLD}/participation"

# --------- helpers ----------
def norm_name(c):
    x = F.lower(F.trim(c))
    x = F.regexp_replace(x, r"\s+", " ")
    return F.trim(x)

def norm_title(c):
    x = F.lower(F.trim(c))
    x = F.regexp_replace(x, r"\s*\(.*?\)\s*", " ")
    x = F.regexp_replace(x, r"[^\p{L}\p{N}\s]", " ")
    x = F.regexp_replace(x, r"\s+", " ")
    return F.trim(x)

# --------- ler actor_films ----------
actor_films = spark.read.parquet(ACTOR_FILMS_PATH)

# garantir colunas essenciais
need = ["Actor", "Film", "Year"]
missing = [c for c in need if c not in actor_films.columns]
if missing:
    raise Exception(f"Faltam colunas em silver/actor_films: {missing}. Tenho: {actor_films.columns}")

af = (
    actor_films
    .select(
        norm_name(F.col("Actor")).alias("k_name"),
        norm_title(F.col("Film")).alias("k_title"),
        F.col("Year").cast("int").alias("year"),
    )
    .filter(F.col("k_name").isNotNull() & F.col("k_title").isNotNull())
    .dropDuplicates()
)

# --------- dims ----------
dim_person = (
    spark.table("golds.dim_person")
    .select(
        F.col("id_person").cast("int").alias("id_person"),
        norm_name(F.col("name")).alias("k_name"),
        F.col("person_type")
    )
    .dropDuplicates(["k_name", "person_type"])
)

dim_title = (
    spark.table("golds.dim_title")
    .select(
        F.col("id_title").cast("bigint").alias("id_title"),
        norm_title(F.col("title")).alias("k_title")
    )
    .dropDuplicates(["id_title", "k_title"])
)

dim_time = (
    spark.table("golds.dim_time")
    .select(
        F.col("id_time").cast("int").alias("id_time"),
        F.col("year").cast("int").alias("year")
    )
    .dropDuplicates(["id_time", "year"])
)

# mapear Year -> id_time (usa linhas ano-only / primeiro id_time do ano)
year_to_id_time = (
    dim_time
    .groupBy("year")
    .agg(F.min("id_time").alias("id_time_year"))
)

# --------- juntar título + tempo ----------
af_title = (
    af.join(dim_title, "k_title", "inner")
)

af_title_time = (
    af_title
    .join(year_to_id_time, af_title.year == year_to_id_time.year, "left")
    .select(
        af_title.k_name,
        af_title.id_title,
        af_title.year,
        F.col("id_time_year").cast("int").alias("id_time")
    )
)

# --------- juntar dim_person ----------
# actor_films traz só actores, portanto person_type esperado = 'actor' na dim_person
part_join = (
    af_title_time
    .join(
        dim_person.filter(F.col("person_type") == "actor"),
        on="k_name",
        how="inner"
    )
    .select(
        F.col("id_person").cast("int").alias("id_person"),
        F.col("id_title").cast("bigint").alias("id_title"),
        F.col("id_time").cast("int").alias("id_time"),
        F.col("person_type")
    )
)

participation_fact = (
    part_join
    .dropDuplicates(["id_person", "id_title", "id_time"])
)

# --------- write ----------
(
    participation_fact
    .write
    .mode("overwrite")
    .format("parquet")
    .option("path", OUT_PATH)
    .saveAsTable("golds.participation")
)

# --------- quick checks ----------
spark.sql("SELECT COUNT(*) AS n FROM golds.participation").show()

spark.sql("""
SELECT
  SUM(CASE WHEN id_person IS NULL THEN 1 ELSE 0 END) AS null_id_person,
  SUM(CASE WHEN id_title  IS NULL THEN 1 ELSE 0 END) AS null_id_title,
  SUM(CASE WHEN id_time   IS NULL THEN 1 ELSE 0 END) AS null_id_time
FROM golds.participation
""").show()

spark.sql("""
SELECT person_type, COUNT(*) AS n
FROM golds.participation
GROUP BY person_type
ORDER BY n DESC
""").show(truncate=False)

spark.sql("SELECT * FROM golds.participation LIMIT 50").show(truncate=False)


NameError: name 'spark' is not defined

In [None]:
spark.stop()