In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Transform_Data_Spark") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

In [12]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import split, explode, trim, col, monotonically_increasing_id

def create_dim_table_from_column(df: DataFrame, column_name: str, delimiter: str = ",", id_column: str = "id", value_column: str = "value") -> DataFrame:
    """
    Create a dimension table with distinct values from a delimited string column,
    and assign a unique ID using monotonically_increasing_id.

    Parameters:
    - df: Input Spark DataFrame
    - column_name: Name of the column to extract distinct values from
    - delimiter: Delimiter to split the values (default is comma)
    - id_column: Name for the generated ID column (default is 'id')
    - value_column: Name for the value column (default is 'value')

    Returns:
    - A DataFrame with columns [id_column, value_column]
    """
    return (
        df.select(explode(split(col(column_name), delimiter + r"\s*")).alias(value_column))
          .select(trim(col(value_column)).alias(value_column))
          .distinct()
          .withColumn(id_column, monotonically_increasing_id())
          .select(id_column, value_column)
    )

def create_bridge_table(
    fact_df: DataFrame,
    dim_df: DataFrame,
    fact_id_col: str,
    fact_list_col: str,
    dim_value_col: str,
    dim_id_col: str,
    delimiter: str = ", ",
    bridge_id_col: str = "bridge_id"
) -> DataFrame:
    """
    Create a bridge (many-to-many) table between a fact table and a dimension table.

    Parameters:
    - fact_df: Source DataFrame containing list column (e.g. genres)
    - dim_df: Dimension DataFrame with distinct values and IDs
    - fact_id_col: Name of ID column in the fact table (e.g. movie_id)
    - fact_list_col: Name of delimited string column in the fact table (e.g. genres)
    - dim_value_col: Name of value column in the dimension table (e.g. genre_name)
    - dim_id_col: Name of ID column in the dimension table (e.g. genre_id)
    - delimiter: Delimiter to split the values (default: ', ')
    - bridge_id_col: Name of ID column to generate in the bridge table

    Returns:
    - DataFrame with bridge table [bridge_id, fact_id_col, dim_id_col]
    """
    exploded_df = (
        fact_df.select(col(fact_id_col), col(fact_list_col))
               .withColumn(fact_list_col, split(col(fact_list_col), delimiter))
               .withColumn(fact_list_col, explode(col(fact_list_col)))
               .select(col(fact_id_col), trim(col(fact_list_col)).alias(fact_list_col))
    )

    joined_df = (
        exploded_df.join(
            dim_df,
            exploded_df[fact_list_col] == dim_df[dim_value_col],
            "inner"
        )
    )

    bridge_df = (
        joined_df.select(col(fact_id_col), col(dim_id_col))
                 .withColumn(bridge_id_col, monotonically_increasing_id())
                 .select(bridge_id_col, fact_id_col, dim_id_col)
    )

    return bridge_df

In [6]:
# CLEAN_DATA_PATH = "/opt/bitnami/spark/resources/dataset/cleaned_data"

# df = spark.read \
#     .option("header", True) \
#     .option("inferSchema", True) \
#     .csv(CLEAN_DATA_PATH)

# # Create Dim Movie
# dim_movie_df = df.select("id", "status", "release_date", "adult", "original_language", "overview") \
#                  .withColumnRenamed("id", "movie_id")

# # Create Fact Movie
# fact_movie_df = df.select("id", "vote_average", "popularity", "vote_count", "budget", "revenue", "runtime") \
#                   .withColumnRenamed("id", "movie_id")

# # Create Dim Genres
# dim_genres_df = create_dim_table_from_column(
#     df, "genres", id_column="genre_id", value_column="genre_name"
# )

# # Create Dim Keyword
# dim_keyword_df = create_dim_table_from_column(
#     df, "keywords", id_column="keyword_id", value_column="keyword_name"
# )

# # Create Dim Production Company
# dim_production_company_df = create_dim_table_from_column(
#     df, "production_companies", id_column="company_id", value_column="company_name"
# )

# # Create Dim Production Country
# dim_production_country_df = create_dim_table_from_column(
#     df, "production_countries", id_column="country_id", value_column="country_name"
# )

# # Create Dim Spoken Language
# dim_spoken_language_df = create_dim_table_from_column(
#     df, "spoken_languages", id_column="language_id", value_column="language_name"
# )

# # Create Bridge Table
# bridge_movie_genre_df = create_bridge_table(
#     fact_df=df,
#     dim_df=dim_genres_df,
#     fact_id_col="id",
#     fact_list_col="genres",
#     dim_value_col="genre_name",
#     dim_id_col="genre_id",
#     bridge_id_col="bridge_id"
# )
# bridge_movie_genre_df = bridge_movie_genre_df.withColumnRenamed("id", "movie_id")

# bridge_movie_keyword_df = create_bridge_table(
#     fact_df=df,
#     dim_df=dim_keyword_df,
#     fact_id_col="id",
#     fact_list_col="keywords",
#     dim_value_col="keyword_name",
#     dim_id_col="keyword_id",
#     bridge_id_col="bridge_id"
# )
# bridge_movie_keyword_df = bridge_movie_keyword_df.withColumnRenamed("id", "movie_id")

# bridge_movie_company_df = create_bridge_table(
#     fact_df=df,
#     dim_df=dim_production_company_df,
#     fact_id_col="id",
#     fact_list_col="production_companies",
#     dim_value_col="company_name",
#     dim_id_col="company_id",
#     bridge_id_col="bridge_id"
# )
# bridge_movie_company_df = bridge_movie_company_df.withColumnRenamed("id", "movie_id")

# bridge_movie_country_df = create_bridge_table(
#     fact_df=df,
#     dim_df=dim_production_country_df,
#     fact_id_col="id",
#     fact_list_col="production_countries",
#     dim_value_col="country_name",
#     dim_id_col="country_id",
#     bridge_id_col="bridge_id"
# )
# bridge_movie_country_df = bridge_movie_country_df.withColumnRenamed("id", "movie_id")

# bridge_movie_language_df = create_bridge_table(
#     fact_df=df,
#     dim_df=dim_spoken_language_df,
#     fact_id_col="id",
#     fact_list_col="spoken_languages",
#     dim_value_col="language_name",
#     dim_id_col="language_id",
#     bridge_id_col="bridge_id"
# )
# bridge_movie_language_df = bridge_movie_language_df.withColumnRenamed("id", "movie_id")

In [13]:
CLEAN_DATA_PATH = "/opt/bitnami/spark/resources/dataset/cleaned_data"

# Read data
df = spark.read.option("header", True).option("inferSchema", True).csv(CLEAN_DATA_PATH).cache()

# === Dimension Tables ===
dim_movie_df = df.select("id", "status", "release_date", "adult", "original_language", "overview") \
                 .withColumnRenamed("id", "movie_id")

# === Fact Tables ===
fact_movie_df = df.select("id", "vote_average", "popularity", "vote_count", "budget", "revenue", "runtime") \
                  .withColumnRenamed("id", "movie_id")

# === Create dimension tables from multi-value columns ===
dim_specs = [
    ("genres", "genre_id", "genre_name"),
    ("keywords", "keyword_id", "keyword_name"),
    ("production_companies", "company_id", "company_name"),
    ("production_countries", "country_id", "country_name"),
    ("spoken_languages", "language_id", "language_name")
]

dim_tables = {}
for col_name, id_col, value_col in dim_specs:
    dim_tables[col_name] = create_dim_table_from_column(
        df, col_name, id_column=id_col, value_column=value_col
    )

# === Create bridge tables dynamically ===
bridge_tables = {}

for col_name, id_col, value_col in dim_specs:
    bridge_df = create_bridge_table(
        fact_df=df,
        dim_df=dim_tables[col_name],
        fact_id_col="id",
        fact_list_col=col_name,
        dim_value_col=value_col,
        dim_id_col=id_col,
        bridge_id_col="bridge_id"
    ).withColumnRenamed("id", "movie_id")
    
    bridge_tables[col_name] = bridge_df

                                                                                

In [27]:
dim_genres_df = dim_tables["genres"]
dim_keyword_df = dim_tables["keywords"]
dim_production_company_df = dim_tables["production_companies"]
dim_production_country_df = dim_tables["production_countries"]
dim_spoken_language_df = dim_tables["spoken_languages"]

bridge_movie_genre_df = bridge_tables["genres"]
bridge_movie_keyword_df = bridge_tables["keywords"]
bridge_movie_company_df = bridge_tables["production_companies"]
bridge_movie_country_df = bridge_tables["production_countries"]
bridge_movie_language_df = bridge_tables["spoken_languages"]

In [19]:
dim_movie_df.show(5)
dim_movie_df.count()

+--------+--------+-------------------+-----+-----------------+--------------------+
|movie_id|  status|       release_date|adult|original_language|            overview|
+--------+--------+-------------------+-----+-----------------+--------------------+
|       5|Released|1995-12-09 00:00:00|false|               en|It's Ted the Bell...|
|       6|Released|1993-10-15 00:00:00|false|               en|While racing to a...|
|      12|Released|2003-05-30 00:00:00|false|               en|Nemo, an adventur...|
|      13|Released|1994-06-23 00:00:00|false|               en|A man with a low ...|
|      15|Released|1941-04-17 00:00:00|false|               en|Newspaper magnate...|
+--------+--------+-------------------+-----+-----------------+--------------------+
only showing top 5 rows



8441

In [20]:
fact_movie_df.show(5)
fact_movie_df.count()

+--------+------------+----------+----------+--------+---------+-------+
|movie_id|vote_average|popularity|vote_count|  budget|  revenue|runtime|
+--------+------------+----------+----------+--------+---------+-------+
|       5|       5.784|    15.295|      2436| 4000000|  4257354|     98|
|       6|       6.533|    13.564|       302|21000000| 12136938|    109|
|      12|       7.824|    55.456|     18061|94000000|940335536|    100|
|      13|       8.477|    92.693|     25409|55000000|677387716|    142|
|      15|       8.015|    28.218|      5034|  839727| 23218000|    119|
+--------+------------+----------+----------+--------+---------+-------+
only showing top 5 rows



8441

In [21]:
dim_tables.keys()

dict_keys(['genres', 'keywords', 'production_companies', 'production_countries', 'spoken_languages'])

In [23]:
for key, table in dim_tables.items():
    print(key)
    table.show(5)
    print(table.count())

genres
+--------+----------+
|genre_id|genre_name|
+--------+----------+
|       0|     Crime|
|       1|   Romance|
|       2|  TV Movie|
|       3|  Thriller|
|       4| Adventure|
+--------+----------+
only showing top 5 rows

19
keywords
+----------+------------+
|keyword_id|keyword_name|
+----------+------------+
|         0|       1970s|
|         1| pirate ship|
|         2|       anime|
|         3|inflammatory|
|         4|  hoverboard|
+----------+------------+
only showing top 5 rows

15225
production_companies
+----------+--------------------+
|company_id|        company_name|
+----------+--------------------+
|         0|                 PEA|
|         1|            ZDF/Arte|
|         2|    Relativity Media|
|         3|    Koppelman-Levien|
|         4|Win's Entertainme...|
+----------+--------------------+
only showing top 5 rows

8950
production_countries
+----------+------------+
|country_id|country_name|
+----------+------------+
|         0|     finland|
|         1

In [24]:
 bridge_tables.keys()

dict_keys(['genres', 'keywords', 'production_companies', 'production_countries', 'spoken_languages'])

In [25]:
for key, table in bridge_tables.items():
    print(key)
    table.show(5)
    print(table.count())

genres
+---------+--------+--------+
|bridge_id|movie_id|genre_id|
+---------+--------+--------+
|        0| 1139554|       0|
|        1| 1061412|       0|
|        2| 1014066|       0|
|        3|  958006|       0|
|        4|  919573|       0|
+---------+--------+--------+
only showing top 5 rows

21580
keywords
+---------+--------+----------+
|bridge_id|movie_id|keyword_id|
+---------+--------+----------+
|        0|  878361|         0|
|        1|  760104|         0|
|        2|  756999|         0|
|        3|  718032|         0|
|        4|  555285|         0|
+---------+--------+----------+
only showing top 5 rows

81539
production_companies
+---------+--------+----------+
|bridge_id|movie_id|company_id|
+---------+--------+----------+
|        0|    1643|         0|
|        1|     429|         0|
|        2|  888321|         1|
|        3|  336804|         1|
|        4|  126250|         1|
+---------+--------+----------+
only showing top 5 rows

27226
production_countries
+--

In [28]:
spark.stop()