In [None]:
# Cell 1 - Install required packages
# Installs the notebook's third-party dependencies into the kernel's environment.
# NOTE(review): no version specifiers are given, so a fresh environment may
# resolve to different releases on each install — consider pinning.
%pip install pyspark pandas numpy duckdb

In [None]:
# Cell 2 - Import libraries
import duckdb
import numpy as np
import pandas as pd
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, split, size, year, udf
from pyspark.sql.types import DoubleType, ArrayType, FloatType

In [None]:
# Cell 3 - Create Spark Session
# Build the session (or pick up an already-running one) with 4 GB configured
# for both the driver and the executors.
spark = (
    SparkSession.builder
    .appName("MovieRatingPrediction")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [None]:
# Cell 4 - Load and prepare data
# Aggregate per-movie rating statistics in DuckDB, then hand the result to Spark.

# Query to join MOVIES and USER_REVIEWS to get relevant features.
#  - HAVING drops movies without any rating: the LEFT JOIN would otherwise
#    give them a NULL avg_rating, which turns into a NaN label downstream
#    and corrupts both training and the evaluation metrics.
#  - ORDER BY makes the LIMIT sample the same rows on every run.
query = """
SELECT
    m.*,
    AVG(r.rating) as avg_rating,
    COUNT(r.rating) as review_count
FROM MOVIES m
LEFT JOIN USER_REVIEWS r ON m.movieId = r.movieId
GROUP BY m.movieId, m.title, m.originalTitle, m.genres,
         m.originalLanguage, m.budget, m.revenue, m.runtime,
         m.releaseDate
HAVING COUNT(r.rating) > 0
ORDER BY m.movieId
LIMIT 1000  -- Remove this limit after testing
"""

# Execute query and convert to pandas; the connection is closed even when
# the query raises, so the database file is never left locked.
conn = duckdb.connect('grupo2_bigdata.db')
try:
    movies_df = conn.execute(query).fetchdf()
finally:
    conn.close()

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(movies_df)

# Show schema and sample data
print("DataFrame Schema:")
spark_df.printSchema()
print("\nSample Data:")
spark_df.show(5)

In [None]:
# Cell 5 - Feature Engineering (Fixed)
def prepare_features(df):
    """Build the model-ready ``features`` vector column for the movie data.

    Steps:
      1. Derive ``genre_count`` (number of ``|``-separated entries in
         ``genres``) and ``release_year`` (year of ``releaseDate``).
      2. Index ``originalLanguage`` with a StringIndexer; invalid values are
         kept (``handleInvalid="keep"``) rather than raising.
      3. Assemble the numeric columns into one vector and scale it with a
         StandardScaler using its default settings.
      4. Expose every scaled component as its own ``<name>_scaled`` column.
      5. Assemble the ``*_scaled`` columns followed by
         ``originalLanguage_idx`` into the final ``features`` vector.

    Args:
        df: Spark DataFrame providing the columns ``genres``, ``releaseDate``,
            ``budget``, ``revenue``, ``runtime``, ``review_count`` and
            ``originalLanguage``.

    Returns:
        A Spark DataFrame holding the input columns, every intermediate
        column created above, and the ``features`` vector column.

    Note:
        The indexer and scaler are fitted on the DataFrame passed in. When
        this runs before the train/test split, test rows contribute to the
        fitted statistics.
    """
    numeric_features = ['budget', 'revenue', 'runtime', 'review_count',
                        'genre_count', 'release_year']
    categorical_features = ['originalLanguage']

    # Derived numeric inputs.
    df = df.withColumn('genre_count', size(split(col('genres'), '\\|')))
    df = df.withColumn('release_year', year(col('releaseDate')))

    # Index categoricals, then assemble + scale the numeric block.
    indexers = [
        StringIndexer(inputCol=feature_name,
                      outputCol=feature_name + "_idx",
                      handleInvalid="keep")
        for feature_name in categorical_features
    ]
    assembler1 = VectorAssembler(inputCols=numeric_features,
                                 outputCol="numeric_features")
    scaler = StandardScaler(inputCol="numeric_features",
                            outputCol="scaled_numeric_features")

    pipeline = Pipeline(stages=indexers + [assembler1, scaler])
    transformed_df = pipeline.fit(df).transform(df)

    # Split the scaled vector into one column per feature. vector_to_array
    # runs inside the JVM, so no Python UDF (and no lambda closing over the
    # loop index) is needed.
    scaled_array = vector_to_array(col("scaled_numeric_features"))
    for position, feature_name in enumerate(numeric_features):
        transformed_df = transformed_df.withColumn(
            feature_name + "_scaled", scaled_array[position]
        )

    # Final vector: scaled numerics first, then the indexed categoricals.
    feature_cols = (
        [feature_name + "_scaled" for feature_name in numeric_features]
        + [feature_name + "_idx" for feature_name in categorical_features]
    )
    final_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    return final_assembler.transform(transformed_df)

# Apply feature engineering
# The error is reported and then re-raised: every later cell needs
# `featured_df`, so swallowing the failure here would only resurface as a
# confusing NameError further down the notebook.
try:
    featured_df = prepare_features(spark_df)
    print("Feature engineering completed successfully!")
    print("\nTransformed DataFrame:")
    featured_df.select("features", "avg_rating").show(5, truncate=False)
except Exception as e:
    print("Error during feature engineering:", str(e))
    raise

In [None]:
# Cell 6 - Split Data
# 80/20 train/test partition of the engineered DataFrame, with a fixed seed.
train_data, test_data = featured_df.randomSplit(weights=[0.8, 0.2], seed=42)

# Report how many rows landed in each partition.
n_train = train_data.count()
n_test = test_data.count()
print("Training Set Size:", n_train)
print("Testing Set Size:", n_test)

In [None]:
# Cell 7 - Train Model
# `originalLanguage_idx` carries nominal metadata from the StringIndexer, so
# Spark's tree learners treat it as a categorical feature and refuse to train
# when it has more levels than `maxBins` (default 32). Read the level count
# from the column metadata and size maxBins accordingly.
language_attr = train_data.schema["originalLanguage_idx"].metadata.get("ml_attr", {})
language_levels = language_attr.get("num_vals", len(language_attr.get("vals", [])))

# Create and train Random Forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="avg_rating",
    numTrees=20,
    maxDepth=10,
    maxBins=max(32, language_levels),
    seed=42,  # explicit seed so the fitted forest is reproducible across runs
)

# Train the model
model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Show sample predictions
print("Sample Predictions:")
predictions.select("avg_rating", "prediction").show(5)

In [None]:
# Cell 8 - Evaluate Model
# One evaluator, re-pointed at each metric in turn and scored against the
# same held-out predictions.
evaluator = RegressionEvaluator(labelCol="avg_rating", predictionCol="prediction")

rmse, mae, r2 = (
    evaluator.setMetricName(metric).evaluate(predictions)
    for metric in ("rmse", "mae", "r2")
)

print("Model Performance Metrics:")
print(f"Root Mean Square Error (RMSE): {rmse:.3f}")
print(f"Mean Absolute Error (MAE): {mae:.3f}")
print(f"R-squared (R2): {r2:.3f}")

In [None]:
# Cell 9 - Feature Importance
# Pair each importance score from the fitted forest with a feature name.
# The names are listed in the order the final feature vector is assembled:
# the six numeric features first, then the indexed language column.
feature_importance = model.featureImportances
feature_cols = ['budget', 'revenue', 'runtime', 'review_count',
                'genre_count', 'release_year', 'originalLanguage_idx']

# Build the table and rank it from most to least important in one chain.
importance_df = (
    pd.DataFrame({
        'Feature': feature_cols,
        'Importance': feature_importance.toArray(),
    })
    .sort_values('Importance', ascending=False)
)

print("Feature Importance:")
print(importance_df)

In [None]:
# Cell 10 - Save Model
# Persist the fitted model. A plain `model.save(path)` raises when the target
# directory already exists, which breaks every re-run of the notebook, so the
# writer is told to overwrite a previous save instead.
model_path = "movie_rating_prediction_model"
model.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")