In [1]:
from pyspark.sql import SparkSession
from operator import add

In [2]:
# Start (or reuse) the Spark session used by every analysis in this notebook.
spark = (
    SparkSession.builder
    .appName("FilmTV RDD Analysis")
    .getOrCreate()
)

# Read the FilmTV CSV: first line is a header, column types are inferred.
path = "dataset/filmtv_movies.csv"
csv_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
)
df = csv_reader.csv(path)

# The analyses below use the row-level RDD API rather than DataFrame operations.
rdd = df.rdd

# Top 5 films by average vote; rows with a missing vote are excluded first.
print("\n5 Film dengan rating tertinggi:")
voted_films = rdd.filter(lambda row: row['avg_vote'] is not None)
top_rated = voted_films.takeOrdered(5, key=lambda row: -row['avg_vote'])

for film in top_rated:
    title, year, vote = film['title'], film['year'], film['avg_vote']
    print(f"{title} ({year}): {vote}")

print("\nFilm dengan durasi terpanjang:")
def max_duration(a, b):
    """Pick the row with the larger ``'duration'`` value; usable as an RDD reducer.

    Row *a* wins only when its duration is strictly greater than *b*'s;
    otherwise (ties, or incomparable values such as NaN) *b* is returned.
    """
    if a['duration'] > b['duration']:
        return a
    return b

# Longest film by duration. Rows without a duration are dropped first because
# max_duration compares the values directly. RDD.reduce raises ValueError on an
# empty RDD, so that case is guarded explicitly, like the horror section's
# empty check further down.
films_with_duration = rdd.filter(lambda row: row['duration'] is not None)
if films_with_duration.isEmpty():
    print("Tidak ada film dengan data durasi.")
else:
    longest_film = films_with_duration.reduce(max_duration)
    print(f"{longest_film['title']} ({longest_film['year']}): {longest_film['duration']} menit")

print("\nRata-rata tension film horror:")

def is_valid_tension(value):
    """Return True if *value* converts to a finite float, else False.

    Used as a row filter before ``float(row['tension'])`` is applied, so every
    value accepted here converts without raising. NaN and +/-inf are rejected
    because a single such value would turn the computed average into nan/inf.
    """
    import math

    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, and ints too large for a float.
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return False
    return math.isfinite(number)

# Horror films (genre string contains "Horror") whose tension value converts
# cleanly to a finite float.
horror_rdd = rdd.filter(
    lambda row: row['genre']
    and "Horror" in row['genre']
    and row['tension'] is not None
    and is_valid_tension(row['tension'])
)

tension_values = horror_rdd.map(lambda row: float(row['tension']))

# Compute (sum, count) in a single Spark job. Calling count() and then reduce()
# would run two jobs, each re-reading the CSV and re-applying the filter, since
# nothing in this lineage is cached.
total_tension, count = tension_values.aggregate(
    (0.0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

if count > 0:
    avg_tension = total_tension / count
    print(f"Rata-rata tension: {avg_tension:.2f}")
else:
    print("Tidak ada film horror dengan data tension.")

spark.stop()



5 Film dengan rating tertinggi:
Kiss the Bride (2007): 10.0
England, My England (1995): 10.0
Rembrandt's J'accuse (2008): 10.0
Sex & Mrs. X (2000): 10.0
The Godfather Trilogy: 1901-1980 (1992): 9.8

Film dengan durasi terpanjang:
Die Zweite Heimat - Chronik einer Jugend (1992): 1525 menit

Rata-rata tension film horror:
Rata-rata tension: 0.97
