# Predictor pentru venitul si profitul filmelor

## Introducere

### Setul de date este preluat de pe Kaggle si este disponibil la: [The Movies Dataset](https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset)

### Prezentare set de date
Acest set de date conține informații detaliate despre 45.000 de filme lansate până în iulie 2017, inclusiv date despre actori, regizori, bugete, încasări, postere, cuvinte-cheie din descrierea filmelor, limbi vorbite și case de producție. Informațiile provin din baza de date TMDB și includ și numărul de voturi și scorurile acordate de utilizatori.

Folosirea acestui set de date poate fi utilă pentru a analiza tendințele din industria cinematografică, pentru a prezice succesul financiar al filmelor și pentru a înțelege mai bine preferințele publicului.

### Obiectivele proiectului
1. Familiarizarea cu PySpark și manipularea datelor folosind DataFrame API.
2. Preprocesarea datelor pentru a extrage informații relevante despre filme, actori și regizori.
3. Realizarea de agregări și analize pentru a obține statistici despre filme, cum ar fi numărul de filme pe an, bugetul mediu și venitul mediu.
4. Construirea unui model de regresie liniară pentru a prezice venitul și profitul filmelor pe baza caracteristicilor disponibile.
5. Vizualizarea rezultatelor și interpretarea acestora pentru a trasa concluzii despre succesul financiar al filmelor.
6. Explorarea relațiilor dintre diferite caracteristici ale filmelor și succesul lor financiar, cum ar fi genurile, regizorii și actorii implicați.



In [256]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, year, to_date
from pyspark.sql.types import ArrayType, StringType, LongType, StructType, StructField
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import ast
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [257]:
spark = SparkSession.builder.appName("MovieRevenuePrediction").getOrCreate()

# Încărcarea datelor
movies = spark.read.csv('movie_dataset/movies_metadata.csv', header=True, inferSchema=True)
credits = spark.read.csv('movie_dataset/credits.csv', header=True, inferSchema=True)
keywords = spark.read.csv('movie_dataset/keywords.csv', header=True, inferSchema=True)

## Implementarea de script-uri Spark

### Preprocesarea datelor
La acest pas am utilizat și UDF-uri (User Defined Functions) pentru a transforma datele din coloanele care conțin liste de dicționare în formate mai ușor de utilizat. De asemenea, am folosit funcții de Spark SQL pentru a efectua diverse agregări și analize.

#### Preprocesarea DataFrame-ului movies_metadata
Pentru a pregăti datele pentru analiza ulterioară, vom efectua următoarele operațiuni:
1. **Filtrarea intrărilor**: Vom elimina filmele care sunt marcate ca "adult" sau "video".
2. **Eliminarea coloanelor inutile**: Vom elimina coloanele care nu sunt relevante pentru analiza noastră, cum ar fi `homepage`, `original_language`, `overview`, `poster_path`, `spoken_languages`, `status`, `tagline`, `video`, `adult`, `imdb_id` și `original_language`.
3. **Transformarea coloanei `belongs_to_collection`**: Vom transforma această coloană astfel încât să conțină 1 dacă filmul aparține unei colecții și 0 în caz contrar.
4. **Extracția genurilor**: Vom extrage doar numele genurilor din coloana `genres`, care conține o listă de dicționare cu `id` și `name`.
5. **Extracția companiilor de producție**: Vom extrage doar numele companiilor de producție din coloana `production_companies`, care conține o listă de dicționare cu `name` și `id`.
6. **Extracția țărilor de producție**: Vom extrage doar numele țărilor de producție din coloana `production_countries`, care conține o listă de dicționare cu `iso_3166_1` și `name`.
7. **Conversia bugetului și veniturilor**: Vom converti coloanele `budget` și `revenue` la tipul `LongType`.
8. **Eliminarea rândurilor cu buget sau venit 0**: Vom elimina rândurile în care bugetul sau venitul este 0, deoarece acestea nu sunt relevante pentru analiza noastră.
9. **Transformarea datei de lansare**: Vom transforma coloana `release_date` astfel încât să conțină doar anul lansării filmului.
10. **Adăugarea unei coloane de profit**: Vom adăuga o coloană `profit` care va conține diferența dintre venit și buget.

In [258]:
# Preprocesarea DataFrame-ului movies

# Filtrarea intrărilor pentru a elimina filmele care sunt marcate ca "adult" sau "video"
movies = movies.filter((col("adult") == "False") & (col("video") == "False"))

# Eliminarea coloanelor inutile
movies = movies.drop("homepage", "original_language", "original_title", "overview", "poster_path", "spoken_languages", "status", 'tagline', 'video', 'adult', 'imdb_id', 'original_language')

# Transformarea coloanei belongs_to_collection pentru a conține 1 dacă filmul aparține unei colecții și 0 în caz contrar
def belongs_to_collection(value):
    if value is None or value == '[]':
        return 0
    else:
        return 1
belongs_to_collection_udf = udf(belongs_to_collection, StringType())
movies = movies.withColumn("belongs_to_collection", belongs_to_collection_udf(col("belongs_to_collection")))

# Extractia genurilor care arată astfel: [{'id': 28, 'name': 'Action'}, {'id': 12, 'name': 'Adventure'}]
def extract_genres(genres):
    try:
        genres = ast.literal_eval(genres)
        return [g['name'] for g in genres]
    except:
        return []
extract_genres_udf = udf(extract_genres, ArrayType(StringType()))
movies = movies.withColumn("genres", extract_genres_udf(col("genres")))

# Extragtia numelui companiilor de producție care arată astfel: [{'name': 'Warner Bros.', 'id': 6194}, {'name': 'Village Roadshow Pictures', 'id': 25}]
def extract_production_companies(companies):
    try:
        companies = ast.literal_eval(companies)
        return [c['name'] for c in companies]
    except:
        return []
extract_production_companies_udf = udf(extract_production_companies, ArrayType(StringType()))
movies = movies.withColumn("production_companies", extract_production_companies_udf(col("production_companies")))

# Extractia numelui țărilor de producție care arată astfel: [{'iso_3166_1': 'US', 'name': 'United States of America'}, {'iso_3166_1': 'GB', 'name': 'United Kingdom'}]
def extract_production_countries(countries):
    try:
        countries = ast.literal_eval(countries)
        return [c['name'] for c in countries]
    except:
        return []
extract_production_countries_udf = udf(extract_production_countries, ArrayType(StringType()))
movies = movies.withColumn("production_countries", extract_production_countries_udf(col("production_countries")))

# Convertirea coloanelor budget și revenue la tipul LongType
movies = movies.withColumn("budget", col("budget").cast(LongType()))
movies = movies.withColumn("revenue", col("revenue").cast(LongType()))

# Eliminarea rândurilor cu buget sau venit 0
movies = movies.filter((col("budget") > 0) & (col("revenue") > 0))

# Transformarea datei de lansare
movies = movies.withColumn("release_date", to_date(col("release_date"), "yyyy-MM-dd"))
movies = movies.withColumn("release_year", year(col("release_date")))
movies = movies.drop("release_date")

# Adăugarea unei coloane de profit
movies = movies.withColumn("profit", col("revenue") - col("budget"))

#### Preprocesarea DataFrame-ului credits
Pentru a pregăti datele din DataFrame-ul `credits`, vom efectua următoarele operațiuni:
1. **Extracția informațiilor despre actori**: Vom extrage doar numele personajelor și numele actorilor din coloana `cast`, care conține o listă de dicționare cu `cast_id`, `character`, `credit_id`, `gender`, `id`, `name`, `order` și `profile_path`.
2. **Extracția numelui regizorului**: Vom extrage doar numele regizorului din coloana `crew`, care conține o listă de dicționare cu `credit_id`, `department`, `gender`, `id`, `job`, `name` și `profile_path`. Vom căuta în această listă pentru a găsi regizorul (job-ul "Director" sau "Writer").
3. **Îmbinarea cu DataFrame-ul `movies`**: Vom adăuga coloanele `director` și `cast_info` la DataFrame-ul `movies`, astfel încât să avem informații complete despre fiecare film.
4. **Filtrarea rândurilor**: Vom elimina rândurile în care nu există informații despre regizor sau despre actori.

In [259]:
# Preprocesarea DataFrame-ului credits

# Extracția informațiilor despre actori și personaje care arată așa: [{'cast_id': 25, 'character': 'Lt. Vincent Hanna', 'credit_id': '52fe4292c3a36847f80291f5', 'gender': 2, 'id': 1158, 'name': 'Al Pacino', 'order': 0, 'profile_path': '/ks7Ba8x9fJUlP9decBr6Dh5mThX.jpg'}, {'cast_id': 26, 'character': 'Neil McCauley', 'credit_id': '52fe4292c3a36847f80291f9', 'gender': 2, 'id': 380, 'name': 'Robert De Niro', 'order': 1, 'profile_path': '/lvTSwUcvJRLAJ2FB5qFaukel516.jpg'}]
cast_schema = ArrayType(
    StructType([
        StructField("character", StringType(), True),
        StructField("actor_name", StringType(), True)
    ])
)

def extract_cast(cast_str):
    try:
        cast_list = ast.literal_eval(cast_str)
        return [(c.get('character', ''), c.get('name', '')) for c in cast_list]
    except:
        return []

extract_cast_udf = udf(extract_cast, cast_schema)
credits = credits.withColumn("cast_info", extract_cast_udf(col("cast")))
credits = credits.drop("cast")

# Extracția numelui directorului din echipaj care arată așa: [{'credit_id': '52fe44b79251416c7503e7fb', 'department': 'Editing', 'gender': 2, 'id': 4057, 'job': 'Editor', 'name': 'Adam Weiss', 'profile_path': None}, {'credit_id': '52fe44b79251416c7503e82f', 'department': 'Writing', 'gender': 2, 'id': 14639, 'job': 'Screenplay', 'name': 'Mel Brooks', 'profile_path': '/ndFo3LOYNCUghQTK833N1Wtuynr.jpg'}]
def extract_director(crew_str):
    try:
        crew_list = ast.literal_eval(crew_str)
        for member in crew_list:
            if member.get('job', '') == 'Director' or member.get('job', '') == 'Writer':
                return member.get('name', '')
        return None
    except:
        return None

extract_director_udf = udf(extract_director, StringType())
credits = credits.withColumn("director", extract_director_udf(col("crew")))
credits = credits.drop("crew")
credits = credits.filter(col("director").isNotNull() & col('cast_info').isNotNull())

# Îmbinarea cu DataFrame-ul movies
movies = movies.join(credits.select("id", "director", "cast_info"), on="id", how="left")

#### Preprocesarea DataFrame-ului keywords
Pentru a pregăti datele din DataFrame-ul `keywords`, vom efectua următoarele operațiuni:
1. **Extracția cuvintelor-cheie**: Vom extrage doar numele cuvintelor-cheie din coloana `keywords`, care conține o listă de dicționare cu `id` și `name`.
2. **Îmbinarea cu DataFrame-ul `movies`**: Vom adăuga coloana `keywords` la DataFrame-ul `movies`, astfel încât să avem informații complete despre fiecare film.

In [260]:
# Preprocesarea DataFrame-ului keywords

# Extracția cuvintelor-cheie care arată așa: [{'id': 1, 'name': 'action'}, {'id': 2, 'name': 'adventure'}]
def extract_keywords(keywords):
    try:
        keywords = ast.literal_eval(keywords)
        return [k['name'] for k in keywords]
    except:
        return []
extract_keywords_udf = udf(extract_keywords, ArrayType(StringType()))
keywords = keywords.withColumn("keywords", extract_keywords_udf(col("keywords")))

# Îmbinarea cu DataFrame-ul movies
movies = movies.join(keywords.select("id", "keywords"), on="id", how="left")

In [261]:
# Verificarea și afișarea primelor 5 rânduri din DataFrame-ul movies
movies.show(5, truncate=False)

                                                                                

+------+---------------------+--------+--------------------------------------------------+----------+------------------------------------------------------------------------------------+---------------------------------+---------+-------+---------------------+------------+----------+------------+---------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id    |belongs_to_collection|budget  |genres                            

### Grupări și agregări
Diversele agregări și grupări au fost realizate atât prin DataFrame API, cât și prin Spark SQL. Acestea includ:
1. **Numărul de filme pe an**: Numărul total de filme lansate în fiecare an.
2. **Bugetul și venitul mediu pe an**: Calcularea bugetului și veniturilor medii pentru filmele lansate în fiecare an.
3. **Profitul mediu pe gen**: Calcularea profitului mediu pentru fiecare gen de film.
4. **Regizorii cu profitul mediu cel mai mare**: Identificarea regizorilor care au realizat filme cu profitul mediu cel mai mare.
5. **Top 10 filme după profit**: Selectarea celor mai profitabile filme.

In [262]:
# Agregări și analize folosind DataFrame API

# Exemplu: Numărul de filme pe an
movies_by_year = movies.groupBy("release_year").agg(F.count("id").alias("movie_count"))

# Exemplu: Bugetul și venitul mediu pe an (ca Long)
avg_budget_revenue_by_year = movies.groupBy("release_year").agg(
    F.avg("budget").alias("avg_budget"),
    F.avg("revenue").alias("avg_revenue")
).withColumn(
    "avg_budget", col("avg_budget").cast(LongType())
).withColumn(
    "avg_revenue", col("avg_revenue").cast(LongType())
).orderBy(col("release_year").desc())
movies_by_year = movies_by_year.join(avg_budget_revenue_by_year, on="release_year", how="left")
movies_by_year.show(5, truncate=False)

# Exemplu: Profitul mediu pe gen
avg_profit_by_genre = movies.select(
    F.explode(col("genres")).alias("genre"),
    col("profit")
).groupBy("genre").agg(
    F.avg("profit").alias("avg_profit")
).withColumn(
    "avg_profit", col("avg_profit").cast(LongType())
).orderBy(col("avg_profit").desc())
avg_profit_by_genre.show(5, truncate=False)

# Exemplu: Regizorii cu profitul mediu cel mai mare
avg_profit_by_director = movies.groupBy("director").agg(
    F.avg("profit").alias("avg_profit")
).withColumn(
    "avg_profit", col("avg_profit").cast(LongType())
).orderBy(col("avg_profit").desc())
avg_profit_by_director.show(5, truncate=False)

# Exemplu: Top 10 filme după profit
top_profit_movies = movies.select("title", "profit", "release_year").orderBy(col("profit").desc()).limit(10)
top_profit_movies.show(truncate=False)


                                                                                

+------------+-----------+----------+-----------+
|release_year|movie_count|avg_budget|avg_revenue|
+------------+-----------+----------+-----------+
|1959        |6          |6063974   |43529166   |
|1990        |53         |22115679  |88648401   |
|1975        |18         |3555872   |51746518   |
|1977        |20         |7660000   |93140772   |
|1924        |1          |1135654   |1213880    |
+------------+-----------+----------+-----------+
only showing top 5 rows


                                                                                

+---------------+----------+
|genre          |avg_profit|
+---------------+----------+
|Animation      |170974483 |
|Family         |143940038 |
|Fantasy        |141087569 |
|Adventure      |140277362 |
|Science Fiction|97177327  |
+---------------+----------+
only showing top 5 rows


                                                                                

+-------------+----------+
|director     |avg_profit|
+-------------+----------+
|David Yates  |1217000000|
|Kyle Balda   |1082730962|
|Pierre Coffin|894761885 |
|Chris Renaud |800457937 |
|Michael Bay  |686297228 |
+-------------+----------+
only showing top 5 rows


                                                                                

+--------------------------------------------+----------+------------+
|title                                       |profit    |release_year|
+--------------------------------------------+----------+------------+
|Avatar                                      |2550965087|2009        |
|Star Wars: The Force Awakens                |1823223624|2015        |
|Titanic                                     |1645034188|1997        |
|Jurassic World                              |1363528810|2015        |
|Furious 7                                   |1316249360|2015        |
|The Avengers                                |1299557910|2012        |
|Harry Potter and the Deathly Hallows: Part 2|1217000000|2011        |
|Avengers: Age of Ultron                     |1125403694|2015        |
|Frozen                                      |1124219009|2013        |
|Beauty and the Beast                        |1102886337|2017        |
+--------------------------------------------+----------+------------+



In [263]:
# Agregări și analize folosind Spark SQL

# Înregistrarea DataFrame-ului movies ca o vedere temporară pentru a putea utiliza SQL
movies.createOrReplaceTempView("movies")

# Exemplu: Numărul de filme pe an
movies_by_year = spark.sql("""
    SELECT release_year, COUNT(id) AS movie_count
    FROM movies
    GROUP BY release_year
""")

# Exemplu: Bugetul și venitul mediu pe an (ca Long)
avg_budget_revenue_by_year = spark.sql("""
    SELECT
        release_year,
        CAST(AVG(budget) AS BIGINT) AS avg_budget,
        CAST(AVG(revenue) AS BIGINT) AS avg_revenue
    FROM movies
    GROUP BY release_year
    ORDER BY release_year DESC
""")

# Îmbinarea rezultatelor pentru a avea o vedere completă
movies_by_year.createOrReplaceTempView("movies_by_year")
avg_budget_revenue_by_year.createOrReplaceTempView("avg_budget_revenue_by_year")
final_movies_by_year = spark.sql("""
    SELECT a.*, b.avg_budget, b.avg_revenue
    FROM movies_by_year a
    LEFT JOIN avg_budget_revenue_by_year b
    ON a.release_year = b.release_year
    ORDER BY a.release_year DESC
""")
final_movies_by_year.show(5, truncate=False)

# Exemplu: Profitul mediu pe gen
from pyspark.sql.functions import explode
movies_exploded = movies.select(explode(col("genres")).alias("genre"), col("profit"))
movies_exploded.createOrReplaceTempView("movies_exploded")
avg_profit_by_genre = spark.sql("""
    SELECT
        genre,
        CAST(AVG(profit) AS BIGINT) AS avg_profit
    FROM movies_exploded
    GROUP BY genre
    ORDER BY avg_profit DESC
""")
avg_profit_by_genre.show(5, truncate=False)

# Exemplu: Regizorii cu profitul mediu cel mai mare
avg_profit_by_director = spark.sql("""
    SELECT
        director,
        CAST(AVG(profit) AS BIGINT) AS avg_profit
    FROM movies
    GROUP BY director
    ORDER BY avg_profit DESC
""")
avg_profit_by_director.show(5, truncate=False)

# Exemplu: Top 10 filme după profit
top_profit_movies = spark.sql("""
    SELECT title, profit, release_year
    FROM movies
    ORDER BY profit DESC
    LIMIT 10
""")
top_profit_movies.show(truncate=False)

                                                                                

+------------+-----------+----------+-----------+
|release_year|movie_count|avg_budget|avg_revenue|
+------------+-----------+----------+-----------+
|2017        |69         |60815007  |207432040  |
|2016        |240        |39763207  |121881832  |
|2015        |203        |36991712  |132837657  |
|2014        |203        |34918705  |114506952  |
|2013        |217        |39330317  |108860148  |
+------------+-----------+----------+-----------+
only showing top 5 rows


                                                                                

+---------------+----------+
|genre          |avg_profit|
+---------------+----------+
|Animation      |170974483 |
|Family         |143940038 |
|Fantasy        |141087569 |
|Adventure      |140277362 |
|Science Fiction|97177327  |
+---------------+----------+
only showing top 5 rows


                                                                                

+-------------+----------+
|director     |avg_profit|
+-------------+----------+
|David Yates  |1217000000|
|Kyle Balda   |1082730962|
|Pierre Coffin|894761885 |
|Chris Renaud |800457937 |
|Michael Bay  |686297228 |
+-------------+----------+
only showing top 5 rows




+--------------------------------------------+----------+------------+
|title                                       |profit    |release_year|
+--------------------------------------------+----------+------------+
|Avatar                                      |2550965087|2009        |
|Star Wars: The Force Awakens                |1823223624|2015        |
|Titanic                                     |1645034188|1997        |
|Jurassic World                              |1363528810|2015        |
|Furious 7                                   |1316249360|2015        |
|The Avengers                                |1299557910|2012        |
|Harry Potter and the Deathly Hallows: Part 2|1217000000|2011        |
|Avengers: Age of Ultron                     |1125403694|2015        |
|Frozen                                      |1124219009|2013        |
|Beauty and the Beast                        |1102886337|2017        |
+--------------------------------------------+----------+------------+



                                                                                

### Aplicarea metodelor de Machine Learning
În această secțiune vom prezice venitul filmelor folosind modele de regresie liniară și Random Forest. Vom folosi caracteristici precum bugetul, anul lansării, regizorul, genul principal, compania de producție și țara de producție. De asemenea, vom aplica tehnici de preprocesare pentru a transforma caracteristicile categorice în formate numerice utilizabile de către modelele de machine learning.

În această etapă am utiliat Pipeline-uri pentru a organiza fluxul de lucru de preprocesare și antrenare a modelului și am făcut hyperaparameter tuning folosind Cross-Validation pentru a optimiza modelul Random Forest. (Cerințele 4 și 5)

#### Pregătirea datelor pentru modelare
Vom pregăti datele pentru modelare prin următorii pași:
1. **Filtrarea datelor**: Vom filtra filmele care au genuri, companii de producție și țări de producție disponibile, precum și regizori.
2. **Selectarea caracteristicilor relevante**: Vom selecta coloanele relevante pentru modelare, inclusiv bugetul, anul lansării, regizorul, venitul și profitul.
3. **Indexarea și codificarea caracteristicilor categorice**: Vom utiliza `StringIndexer` pentru a transforma regizorii, genurile, companiile de producție și țările de producție în indici numerici.
4. **One-hot encoding**: Vom aplica `OneHotEncoder` pentru a transforma indici numerici în vectori one-hot.
5. **Asamblarea caracteristicilor**: Vom utiliza `VectorAssembler` pentru a combina toate caracteristicile într-un singur vector de caracteristici.
6. **Împărțirea datelor în seturi de antrenament și testare**: Vom împărți datele în seturi de antrenament și testare pentru a evalua performanța modelului.
7. **Definirea evaluatorului pentru regresie**: Vom utiliza `RegressionEvaluator` pentru a evalua performanța modelului pe baza RMSE (Root Mean Square Error).

In [264]:
# Pregătirea datelor pentru modelare

data = movies.filter(
    (size(col("genres")) > 0) &
    (size(col("production_companies")) > 0) &
    (size(col("production_countries")) > 0) &
    col("director").isNotNull()
)

data = data.select(
    "budget", "release_year", "director", "revenue", "profit",
    col("genres").getItem(0).alias("main_genre"),
    col("production_companies").getItem(0).alias("production_company"),
    col("production_countries").getItem(0).alias("production_country")
).dropna()

# Indexarea și codificarea caracteristicilor categorice
director_indexer = StringIndexer(inputCol="director", outputCol="director_index", handleInvalid="skip")
genre_indexer = StringIndexer(inputCol="main_genre", outputCol="genre_index", handleInvalid="skip")
company_indexer = StringIndexer(inputCol="production_company", outputCol="company_index", handleInvalid="skip")
country_indexer = StringIndexer(inputCol="production_country", outputCol="country_index", handleInvalid="skip")

# One-hot encoding pentru caracteristicile categorice
director_encoder = OneHotEncoder(inputCol="director_index", outputCol="director_vec")
genre_encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_vec")
company_encoder = OneHotEncoder(inputCol="company_index", outputCol="company_vec")
country_encoder = OneHotEncoder(inputCol="country_index", outputCol="country_vec")

# Asamblarea caracteristicilor într-un singur vector
assembler = VectorAssembler(
    inputCols=[
        "budget", "release_year",
        "director_vec", "genre_vec", "company_vec", "country_vec"
    ],
    outputCol="features"
)

# Împărțirea datelor în seturi de antrenament și testare
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Definirea evaluatorului pentru regresie
evaluator = RegressionEvaluator(labelCol="revenue", predictionCol="revenue_pred", metricName="rmse")

In [265]:
# Folosirea unui model de regresie liniară pentru a prezice venitul filmelor
lr_revenue = LinearRegression(featuresCol="features", labelCol="revenue", predictionCol="revenue_pred")
pipeline_revenue = Pipeline(stages=[
    director_indexer, genre_indexer, company_indexer, country_indexer,
    director_encoder, genre_encoder, company_encoder, country_encoder,
    assembler, lr_revenue
])
model_revenue = pipeline_revenue.fit(train_data)
predictions_revenue = model_revenue.transform(test_data)
rmse_revenue = evaluator.evaluate(predictions_revenue)
print(f"Linear Regression Test RMSE: {rmse_revenue}")

25/06/15 00:22:01 WARN Instrumentation: [99eed7dd] regParam is zero, which might cause numerical instability and overfitting.
25/06/15 00:22:03 WARN Instrumentation: [99eed7dd] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.

Linear Regression Test RMSE: 131742129.93671373


                                                                                

In [266]:
# Folosirea unui model de regresie Random Forest pentru a prezice venitul filmelor
rf_revenue = RandomForestRegressor(featuresCol="features", labelCol="revenue", predictionCol="revenue_pred")
pipeline_rf = Pipeline(stages=[
    director_indexer, genre_indexer, company_indexer, country_indexer,
    director_encoder, genre_encoder, company_encoder, country_encoder,
    assembler, rf_revenue
])
model_rf = pipeline_rf.fit(train_data)
predictions_rf = model_rf.transform(test_data)
rmse_rf = evaluator.evaluate(predictions_rf)
print(f"Random Forest Test RMSE: {rmse_rf}")



Random Forest Test RMSE: 80849427.42287876


                                                                                

In [267]:
# Încercarea cu un model simplu (fără caracteristici categorice)
simple_data = data.select("budget", "release_year", "revenue").dropna()

# Asamblarea caracteristicilor pentru modelul simplu
assembler = VectorAssembler(inputCols=["budget", "release_year"], outputCol="features")
simple_data = assembler.transform(simple_data)

# Împărțirea datelor în seturi de antrenament și testare
train_data, test_data = simple_data.randomSplit([0.8, 0.2], seed=42)

# Definirea evaluatorului pentru regresie
evaluator = RegressionEvaluator(labelCol="revenue", predictionCol="prediction", metricName="rmse")

# Regresie liniară
lr = LinearRegression(featuresCol="features", labelCol="revenue")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)
lr_rmse = evaluator.evaluate(lr_predictions)
print(f"Linear Regression Test RMSE with simple model: {lr_rmse}")

# Regresie Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="revenue")
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"Random Forest Test RMSE with simple model: {rf_rmse}")

25/06/15 00:22:20 WARN Instrumentation: [1d7f3fa3] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Linear Regression Test RMSE with simple model: 74371055.41896947




Random Forest Test RMSE with simple model: 79111818.17373627


                                                                                

#### Optimizarea modelului Random Forest cu Cross-Validation
Pentru a îmbunătăți performanța modelului Random Forest, vom utiliza tehnica de Cross-Validation. Aceasta ne va permite să găsim cei mai buni parametri pentru model, cum ar fi numărul de arbori și adâncimea maximă a acestora. Vom defini o grilă de parametri și vom evalua modelul folosind RMSE ca metrică de performanță.

In [268]:
# Optimizarea modelului Random Forest cu Cross-Validation

# Definirea valorilor pentru parametrii modelului
paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50])
    .addGrid(rf.maxDepth, [5, 10])
    .build())

# Configurarea evaluatorului pentru regresie
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Antrenarea modelului cu Cross-Validation
cvModel = crossval.fit(train_data)
bestModel = cvModel.bestModel

# Evaluarea modelului optimizat pe setul de testare
predictions = bestModel.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(f"Best Random Forest Test RMSE: {rmse}")

25/06/15 00:22:36 WARN DAGScheduler: Broadcasting large task binary with size 1051.2 KiB
25/06/15 00:22:36 WARN DAGScheduler: Broadcasting large task binary with size 1343.4 KiB
25/06/15 00:22:42 WARN DAGScheduler: Broadcasting large task binary with size 1174.9 KiB
25/06/15 00:22:43 WARN DAGScheduler: Broadcasting large task binary with size 1490.2 KiB
25/06/15 00:22:49 WARN DAGScheduler: Broadcasting large task binary with size 1102.9 KiB
25/06/15 00:22:49 WARN DAGScheduler: Broadcasting large task binary with size 1387.0 KiB

Best Random Forest Test RMSE: 79111818.17373627


                                                                                

#### Analiză și interpretare a rezultatelor
RMSE (Root Mean Square Error) este o măsură a erorii de predicție a modelului. Cu cât RMSE este mai mic, cu atât modelul prezice mai bine venitul filmelor. În cazul nostru, am obținut următoarele rezultate:
- **Modelul de regresie liniară cu caractersitici**: RMSE ~ 131 milioane USD
- **Modelul Random Forest cu toate datele**: RMSE ~ 80 milioane USD
- **Modelul de regresie liniară fără caracteristici**: RMSE ~ 74 milioane USD
- **Modelul Random Forest fără caracteristici**: RMSE ~ 79 milioane USD
- **Modelul Random Forest optimizat cu Cross-Validation și fără caracteristici**: RMSE ~ 79 milioane USD

Concluziile pe care le putem trasa din aceste rezultate sunt:
1. Încorporarea caracteristicilor categorice (regizori, genuri, companii de producție și țări de producție) a determinat o scădere a performanței, ceea ce sugerează că acestea nu pot fi folosite eficient sub forma de indici numerici și vectori one-hot.
2. Modelul Random Forest a avut o performanță semnificativ mai bună decât cel de regresie în cazul în care am folosit toate caracteristicile, ceea ce sugerează că Random Forest este întradevăr mai potrivit pebtru corelații complexe între caracteristici.
3. Optimizarea modelului Random Forest cu Cross-Validation nu a adus o îmbunătățire semnificativă a performanței, ceea ce sugerează ca parametrii aleși nu au avut un impact semnificativ asupra modelului în acest caz.
4. Pentru rezultate mai clare și mai precise, ar fi util să explorăm alte tehnici de preprocesare a caracteristicilor categorice, cum ar fi utilizarea de embedding-uri sau tehnici de reducere a dimensionalității.