In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType, BooleanType
from graphframes import GraphFrame
import ast

# Build (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("GraphFrames")
    # GraphFrames is supplied as a jar from an absolute local path; adjust per machine.
    .config("spark.jars", "/root/graphframes-0.8.3-spark3.5-s_2.12.jar")
    .config("spark.driver.memory", "192g")
    .config("spark.executor.memory", "192g")
    .getOrCreate()
)
# Only surface ERROR-level JVM logs in the notebook output.
spark.sparkContext.setLogLevel("ERROR")

24/01/01 06:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/01 06:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
def parse_list_from_string(list_str):
    """Parse a Python-literal list such as "['A', 'B']" into a list.

    Used to decode the stringified ``Directors`` / ``Cast`` CSV columns.

    Args:
        list_str: The raw cell value; may be ``None`` or malformed.

    Returns:
        The parsed elements as a list (a tuple literal is converted to a
        list), or ``[]`` when the value is missing, malformed, or is not a
        list/tuple literal.
    """
    try:
        value = ast.literal_eval(list_str)
    # These are the documented failure modes of ast.literal_eval. A bare
    # ``except`` would also swallow KeyboardInterrupt / SystemExit.
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return []
    # The UDF wrapping this function is declared ArrayType(StringType()).
    # A non-sequence literal (a bare quoted string, a number, ...) cannot be
    # represented as such an array, so it is treated as "no entries".
    if isinstance(value, (list, tuple)):
        return list(value)
    return []

# Spark UDF wrapper: decodes a stringified list column into array<string>.
parse_udf = F.udf(f=parse_list_from_string, returnType=ArrayType(StringType()))

def process_movie_row(df):
    """Decode the list-valued credit columns of a movie DataFrame.

    ``Directors`` and ``Cast`` are replaced by their parsed array form, and a
    ``Participants`` column holding the union of both (via ``array_union``)
    is added. Despite its name, this operates on a whole DataFrame.
    """
    # Participants must be derived from the *parsed* Directors/Cast arrays,
    # so the three columns are added strictly one after another.
    return (
        df.withColumn("Directors", parse_udf("Directors"))
        .withColumn("Cast", parse_udf("Cast"))
        .withColumn("Participants", F.array_union("Directors", "Cast"))
    )

def build_oscar_dict(oscar_df):
    """Collect Oscar nominations into a driver-side lookup keyed by name.

    Args:
        oscar_df: DataFrame with at least ``name``, ``year_ceremony`` and
            ``winner`` columns.

    Returns:
        dict mapping each ``name`` to a list of ``(year_ceremony, winner)``
        tuples, in the order the rows are returned by ``collect()``.
    """
    oscar_dict = {}
    # Only these three columns are used, so project them before collect()
    # instead of shipping every column of the awards table to the driver.
    for row in oscar_df.select("name", "year_ceremony", "winner").collect():
        oscar_dict.setdefault(row["name"], []).append(
            (row["year_ceremony"], row["winner"])
        )
    return oscar_dict

def get_oscar_info(name, oscar_dict):
    """Return the ``(year_ceremony, winner)`` entries recorded for ``name``.

    Names absent from ``oscar_dict`` yield a fresh empty list.
    """
    if name in oscar_dict:
        return oscar_dict[name]
    return []

def _build_vertices(movie_details_df, oscar_awards_df):
    """One vertex per distinct participant, annotated with their Oscar entries."""
    vertices = movie_details_df.select(F.explode("Participants").alias("id")).distinct()

    oscar_dict = build_oscar_dict(oscar_awards_df)
    oscar_schema = ArrayType(StructType([
        StructField("year_ceremony", IntegerType(), True),
        StructField("winner", BooleanType(), True),
    ]))
    # The lookup dict is captured in the UDF closure and serialized with it.
    get_oscar_info_udf = F.udf(lambda name: get_oscar_info(name, oscar_dict), oscar_schema)
    return vertices.withColumn("oscars", get_oscar_info_udf("id"))


def _build_edges(movie_details_df):
    """One undirected edge per co-credited pair, with their collaboration dates."""
    pairs = (
        movie_details_df
        .select("Participants", "Release Date")
        .withColumn("src", F.explode("Participants"))
        .select("src", F.explode("Participants").alias("dst"), "Release Date")
        # The self cross-product yields (a, b), (b, a) and (a, a) for every
        # movie. Keeping only src < dst retains each unordered pair exactly
        # once per movie, oriented like least/greatest, without a global
        # distinct(). A distinct() would wrongly merge different movies that
        # share a pair and a release date.
        .filter(F.col("src") < F.col("dst"))
    )
    return pairs.groupBy("src", "dst").agg(
        F.collect_list("Release Date").alias("collaborations"),
        # Count rows, not non-null dates, so undated films still count as a
        # collaboration (collect_list skips nulls, so the list may be shorter).
        F.count(F.lit(1)).alias("total_collaborations"),
    )


def build_film_industry_network(movie_details_df, oscar_awards_df):
    """Build a GraphFrame of people who share movie credits.

    Args:
        movie_details_df: Movie table whose ``Directors`` and ``Cast`` columns
            hold stringified Python lists, plus a ``Release Date`` column.
        oscar_awards_df: Awards table with ``name``, ``year_ceremony`` and
            ``winner`` columns.

    Returns:
        GraphFrame whose vertices have ``id`` (person name) and ``oscars``
        (array of ``{year_ceremony, winner}`` structs). Its edges have
        ``src`` < ``dst``, ``collaborations`` (list of release dates) and
        ``total_collaborations`` (number of shared movies).

    Note:
        Every input movie row counts as one collaboration, so exact duplicate
        rows in ``movie_details_df`` should be removed upstream if present.
    """
    movie_details_df = process_movie_row(movie_details_df)
    vertices = _build_vertices(movie_details_df, oscar_awards_df)
    edges = _build_edges(movie_details_df)
    return GraphFrame(vertices, edges)

if __name__ == "__main__":
    # Input/output locations, relative to the notebook's working directory.
    MOVIES_CSV = "./datasets/movies_1990_to_2010.csv"
    OSCARS_CSV = "./datasets/oscar_before_2010.csv"
    OUTPUT_DIR = "./networkModel/1990_to_2010"

    movie_details_df = spark.read.csv(MOVIES_CSV, header=True, inferSchema=True)
    oscar_awards_df = spark.read.csv(OSCARS_CSV, header=True, inferSchema=True)
    network = build_film_industry_network(movie_details_df, oscar_awards_df)
    # The default save mode is "errorifexists", which makes a re-run of the
    # notebook fail once the output exists. Overwrite to keep it re-runnable.
    network.vertices.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/vertices")
    network.edges.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/edges")
    spark.stop()

                                                                                