## Retrieve data using Spark

#### Google Colab Option

In [None]:
#Installing pyspark and findspark in order to build our machine learning using Google colabotary.
!pip install pyspark
!pip install findspark

In [None]:
# Authorize and mount Google Drive so the project CSVs stored there are
# readable from this Google Colaboratory notebook.
import google.colab.drive as drive

drive.mount('/content/drive')

In [None]:
#Dependencies
from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
import tensorflow as tf

# Importing packages to retrieve data from Spark.
# findspark.init() adds the Spark installation's pyspark to sys.path, so it has to
# run before anything is imported from pyspark.
from pathlib import Path
import time

import findspark
findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession

In [None]:
# List the top level of "My Drive" to confirm the mount worked and to locate the
# project folder used to build the CSV paths in the next cell.
!ls "/content/drive/My Drive/"

In [None]:
# Read all four CSV parts from the project folder on Google Drive.
DRIVE_RESOURCES = Path('/content/drive/My Drive/Project 4/Resources')
colab_csv_paths = [DRIVE_RESOURCES / f'movies_dataset_{i}.csv' for i in range(4)]

# pandas copies of each part, kept under their original names for ad-hoc inspection.
data0, data1, data2, data3 = (pd.read_csv(path) for path in colab_csv_paths)

# The cleaning section starts from the Spark DataFrame `merged_df`. Build it here from the
# same files, with the same reader options as the Visual Studio Code option, so the rest
# of the notebook also runs in Colab.
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
merged_df = spark.read.csv(
    [str(path) for path in colab_csv_paths],
    sep=",",
    header=True,
    inferSchema=True,
    quote='"',      # Handles quoted strings
    escape='"',     # Escape character for embedded quotes
    multiLine=True  # Handles multiline fields in case of long text
)

#### Visual Studio Code Option

In [1]:
#Dependencies
from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
import tensorflow as tf

In [2]:
# Importing packages to retrieve data from Spark.
# findspark.init() adds the Spark installation's pyspark to sys.path, so it has to
# run before anything is imported from pyspark.
from pathlib import Path
import time

import findspark
findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession

In [3]:
from functools import reduce

# Initialize Spark session (getOrCreate reuses an existing session when the cell is re-run)
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Define the absolute paths of the four CSV parts, relative to the notebook's working directory
data0, data1, data2, data3 = (
    Path(f'Resources/movies_dataset_{i}.csv').resolve() for i in range(4)
)

# Reader options shared by every part:
#   quote='"' / escape='"' -> quoted strings with embedded "" quotes parse correctly
#   multiLine=True         -> quoted fields spanning several lines (long text) stay one record
csv_options = dict(
    sep=",",
    header=True,
    inferSchema=True,
    quote='"',
    escape='"',
    multiLine=True,
)

# Read each CSV file using its absolute path
df0, df1, df2, df3 = (
    spark.read.csv(str(path), **csv_options) for path in (data0, data1, data2, data3)
)

# Merge the DataFrames. unionByName matches columns by name; union() matches them by
# position and would silently shift values if one file's columns were ordered differently.
merged_df = reduce(lambda left, right: left.unionByName(right), (df0, df1, df2, df3))
merged_df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 10:49:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/15 10:49:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


+----------+--------------------+-------+--------------------+---------+----------+----------+-----------+----------------------+-----------------+---------+----------+-----------------+-------------------+--------------------+--------------------+---------+----------+----------+------------------+--------------------+------------+-------+
|   imdb_id|               title|runtime|            overview|    rated|imdb_votes|popularity|imdb_rating|rotten_tomatoes_rating|metacritic_rating|   budget|   revenue|financial_success|             star_1|              star_2|              star_3|  genre_1|   genre_2|   genre_3|        director_1|          director_2|release_year|outcome|
+----------+--------------------+-------+--------------------+---------+----------+----------+-----------+----------------------+-----------------+---------+----------+-----------------+-------------------+--------------------+--------------------+---------+----------+----------+------------------+-----------------

24/09/15 19:09:18 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 626832 ms exceeds timeout 120000 ms
24/09/15 19:09:18 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/15 19:09:25 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at sc

## Clean, normalize, and standardize data before modeling

In [None]:
# Collect the merged Spark DataFrame into a pandas DataFrame for scikit-learn / TensorFlow
movies_df = merged_df.toPandas()

# Preview the first three rows
movies_df.iloc[:3]

Unnamed: 0,imdb_id,title,runtime,overview,rated,imdb_votes,popularity,imdb_rating,rotten_tomatoes_rating,metacritic_rating,...,star_1,star_2,star_3,genre_1,genre_2,genre_3,director_1,director_2,release_year,outcome
0,tt0094721,Beetlejuice,92 min,A newly dead New England couple seeks help fro...,PG,348874,941.557,7.5,86.0,70.0,...,Alec Baldwin,Geena Davis,Michael Keaton,Comedy,Fantasy,,Tim Burton,,1988,True
1,tt27682129,Prey,100 min,A young couple is compelled to leave their Chr...,R,233550,436.919,7.1,94.0,71.0,...,Amber Midthunder,Dakota Beavers,Dane DiLiegro,Action,Adventure,Horror,Dan Trachtenberg,,2024,False
2,tt0295701,xXx,124 min,Xander Cage is your standard adrenaline junkie...,PG-13,187525,369.083,5.8,48.0,48.0,...,Vin Diesel,Asia Argento,Marton Csokas,Action,Adventure,Thriller,Rob Cohen,,2002,False


In [None]:
# List every column available before choosing which ones to drop
movies_df.keys()

Index(['imdb_id', 'title', 'runtime', 'overview', 'rated', 'imdb_votes',
       'popularity', 'imdb_rating', 'rotten_tomatoes_rating',
       'metacritic_rating', 'budget', 'revenue', 'financial_success', 'star_1',
       'star_2', 'star_3', 'genre_1', 'genre_2', 'genre_3', 'director_1',
       'director_2', 'release_year', 'outcome'],
      dtype='object')

In [None]:
# Drop columns that are not used as model features: identifiers and free text,
# audience/critic reception metrics, and the box-office fields. revenue and
# financial_success are presumably closely tied to `outcome` (TODO confirm), so
# keeping them would leak the target.
movies_df = movies_df.drop(
    columns=[
        'imdb_id', 'title', 'overview', 'imdb_votes', 'popularity',
        'imdb_rating', 'rotten_tomatoes_rating', 'metacritic_rating',
        'revenue', 'financial_success',
    ]
)

In [None]:
# Check the data types of the remaining columns: `runtime` is still text (values like
# "92 min") and the categorical columns are object dtype, so both need converting
# before modeling.
movies_df.dtypes

runtime         object
rated           object
budget           int32
star_1          object
star_2          object
star_3          object
genre_1         object
genre_2         object
genre_3         object
director_1      object
director_2      object
release_year     int32
outcome           bool
dtype: object

In [None]:
# Reorder the columns and convert the boolean 'outcome' target into 0/1 integers.
# .assign() returns a new DataFrame instead of writing into a column-subset of
# movies_df, which pandas flags with SettingWithCopyWarning.
movies_df = movies_df[
    ['release_year', 'runtime', 'rated', 'budget', 'star_1', 'star_2', 'star_3',
     'genre_1', 'genre_2', 'genre_3', 'director_1', 'director_2', 'outcome']
].assign(outcome=lambda frame: frame['outcome'].astype(int))
movies_df.head(3)

Unnamed: 0,release_year,runtime,rated,budget,star_1,star_2,star_3,genre_1,genre_2,genre_3,director_1,director_2,outcome
0,1988,92 min,PG,15000000,Alec Baldwin,Geena Davis,Michael Keaton,Comedy,Fantasy,,Tim Burton,,1
1,2024,100 min,R,0,Amber Midthunder,Dakota Beavers,Dane DiLiegro,Action,Adventure,Horror,Dan Trachtenberg,,0
2,2002,124 min,PG-13,70000000,Vin Diesel,Asia Argento,Marton Csokas,Action,Adventure,Thriller,Rob Cohen,,0


In [None]:
#Extracting the number of minutes from the 'runtime' column
def runtime_cleaner(string):
    """Return the leading integer of a runtime value such as ``"92 min"``.

    Parameters
    ----------
    string : str
        Runtime text whose first whitespace-separated token is the number of
        minutes (the ``runtime`` column holds values like ``"92 min"``).
        Non-string values are passed through ``str()`` first, so an already
        cleaned integer (e.g. when the cell is re-run) is returned unchanged.

    Returns
    -------
    int
        The number of minutes.

    Raises
    ------
    ValueError
        If the value is empty/blank, or its first token is not an integer
        (for example a missing value such as ``NaN``).
    """
    # split() with no argument tolerates leading, trailing and repeated whitespace
    tokens = str(string).split()
    if not tokens:
        raise ValueError(f"runtime value has no minutes component: {string!r}")
    return int(tokens[0])

# Replace each runtime string with its integer number of minutes, then preview
movies_df = movies_df.assign(runtime=movies_df['runtime'].map(runtime_cleaner))
movies_df.head(3)

Unnamed: 0,release_year,runtime,rated,budget,star_1,star_2,star_3,genre_1,genre_2,genre_3,director_1,director_2,outcome
0,1988,92,PG,15000000,Alec Baldwin,Geena Davis,Michael Keaton,Comedy,Fantasy,,Tim Burton,,1
1,2024,100,R,0,Amber Midthunder,Dakota Beavers,Dane DiLiegro,Action,Adventure,Horror,Dan Trachtenberg,,0
2,2002,124,PG-13,70000000,Vin Diesel,Asia Argento,Marton Csokas,Action,Adventure,Thriller,Rob Cohen,,0


In [None]:
# Identify categorical columns: every column still stored with pandas' object dtype
movies_categorical = list(movies_df.select_dtypes(include="object").columns)

In [None]:
# Show which columns will be one-hot encoded
list(movies_categorical)

['rated',
 'star_1',
 'star_2',
 'star_3',
 'genre_1',
 'genre_2',
 'genre_3',
 'director_1',
 'director_2']

In [None]:
# One-hot encode every categorical column. sparse_output=False returns a dense array
# that can be wrapped directly in a DataFrame.
# NOTE(review): the encoder is fit on the full dataset before the train/test split,
# so categories that appear only in test rows still get their own columns.
enc = OneHotEncoder(sparse_output=False)
encoded_values = enc.fit_transform(movies_df[movies_categorical])

# Name the columns "<column>_<category>" and reuse movies_df's index, so the
# index-based merge that follows lines rows up even if movies_df does not have a
# default 0..n-1 index.
encode_df = pd.DataFrame(
    encoded_values,
    columns=enc.get_feature_names_out(movies_categorical),
    index=movies_df.index,
)

# Display the first few rows of the encoded DataFrame
encode_df.head()

Unnamed: 0,rated_Approved,rated_G,rated_GP,rated_N/A,rated_NC-17,rated_Not Rated,rated_PG,rated_PG-13,rated_Passed,rated_R,...,director_2_ Tyler Gillett,director_2_ Vicky Jenson,"director_2_ Vicky Jenson, Rob Letterman",director_2_ Walt Dohrn,"director_2_ Warren Coleman, Judy Morris","director_2_ Wilfred Jackson, Hamilton Luske",director_2_ Will Merrick,director_2_ Will Speck,"director_2_ Wolfgang Reitherman, Art Stevens",director_2_None
0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0


In [None]:
# Swap the raw categorical columns for their one-hot encoded versions; rows are
# matched on the shared index and only labels present in both frames are kept.
movies_df = movies_df.drop(columns=movies_categorical).join(encode_df, how="inner")
movies_df.head()

Unnamed: 0,release_year,runtime,budget,outcome,rated_Approved,rated_G,rated_GP,rated_N/A,rated_NC-17,rated_Not Rated,...,director_2_ Tyler Gillett,director_2_ Vicky Jenson,"director_2_ Vicky Jenson, Rob Letterman",director_2_ Walt Dohrn,"director_2_ Warren Coleman, Judy Morris","director_2_ Wilfred Jackson, Hamilton Luske",director_2_ Will Merrick,director_2_ Will Speck,"director_2_ Wolfgang Reitherman, Art Stevens",director_2_None
0,1988,92,15000000,1,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
1,2024,100,0,0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
2,2002,124,70000000,0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
3,2018,149,300000000,1,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,2024,116,5000000,1,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0


In [None]:
# Peek at the first five column names of the model-ready frame
column_list = list(movies_df.columns)
column_list[:5]

['release_year', 'runtime', 'budget', 'outcome', 'rated_Approved']

## Separate the Features (X) from the Target (y)

In [None]:
# Separate the 0/1 target array (y) from the feature matrix (X)
y = movies_df["outcome"].to_numpy()
X = movies_df.drop(columns=["outcome"]).to_numpy()

## Split and Scale the Data

In [None]:
# Hold out a stratified 25% test set (the train_test_split default) so both splits keep
# the same outcome balance; random_state pins the shuffle for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.25,
    stratify=y,
    random_state=1,
)

In [None]:
# Double-check the shapes of the training split, test split and full feature matrix
print(*(matrix.shape for matrix in (X_train, X_test, X)), sep="\n")

(1532, 4840)
(511, 4840)
(2043, 4840)


In [None]:
# Standardize features with statistics learned from the training split only,
# then apply that same transform to the test split.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# The fitted scaler, under the name used elsewhere in the notebook
X_scaler = scaler
X_test_scaled = X_scaler.transform(X_test)

## Create the model using TensorFlow

In [None]:
# Define the model - deep neural net.
# Seed Python, NumPy and TensorFlow so weight initialization and batch shuffling
# are reproducible across Restart & Run All.
tf.keras.utils.set_random_seed(1)

# Number of input features and hidden nodes for each layer
number_input_features = X_train.shape[1]
hidden_nodes_layer_1 = 1000
hidden_nodes_layer_2 = 1500

nn_model = tf.keras.models.Sequential()

# Declare the input shape with an explicit Input; passing input_dim to a Dense layer
# is deprecated in Keras 3 and emits a UserWarning.
nn_model.add(tf.keras.Input(shape=(number_input_features,)))

# First hidden layer
nn_model.add(tf.keras.layers.Dense(units=hidden_nodes_layer_1, activation="relu"))

# Second hidden layer
nn_model.add(tf.keras.layers.Dense(units=hidden_nodes_layer_2, activation="relu"))

# Output layer: one sigmoid unit giving the probability that outcome == 1
nn_model.add(tf.keras.layers.Dense(units=1, activation="sigmoid"))

# NOTE(review): roughly 6.3M weights for about 1.5k training rows. The gap between
# training and test accuracy reported later suggests heavy overfitting; consider
# smaller layers or Dropout.

# Check the structure of the model
nn_model.summary()


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [None]:
# Compile the model: Adam optimizer, binary cross-entropy for the 0/1 target,
# and accuracy tracked during training and evaluation.
nn_model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Train the model for 20 epochs on the scaled training data.
# NOTE: fit() continues from the network's current weights, so re-running only this
# cell keeps training the same model. Re-run the model-definition cell to start fresh.
fit_model = nn_model.fit(x=X_train_scaled, y=y_train, epochs=20)

Epoch 1/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 183ms/step - accuracy: 0.9931 - loss: 0.0158
Epoch 2/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 176ms/step - accuracy: 0.9911 - loss: 0.0140
Epoch 3/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 199ms/step - accuracy: 0.9924 - loss: 0.0170
Epoch 4/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 206ms/step - accuracy: 0.9932 - loss: 0.0159
Epoch 5/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 192ms/step - accuracy: 0.9902 - loss: 0.0140
Epoch 6/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 192ms/step - accuracy: 0.9909 - loss: 0.0135
Epoch 7/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 184ms/step - accuracy: 0.9935 - loss: 0.0115
Epoch 8/20
[1m48/48[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 181ms/step - accuracy: 0.9941 - loss: 0.0103
Epoch 9/20
[1m48/48[0m [32m━━━━━━━━

In [None]:
# Evaluate the model on the held-out test data; evaluate() returns [loss, accuracy]
model_loss, model_accuracy = nn_model.evaluate(x=X_test_scaled, y=y_test, verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")

16/16 - 1s - 32ms/step - accuracy: 0.5930 - loss: 1.0200
Loss: 1.0200446844100952, Accuracy: 0.5929549932479858


24/09/13 22:01:07 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1962679 ms exceeds timeout 120000 ms
24/09/13 22:01:07 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/13 22:01:09 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at s

## Random Forest Classifier


In [None]:
# Random forest classifier: 37 trees, a node needs at least 10 samples before it
# may be split, and a fixed seed keeps the result reproducible.
random_forest_model = RandomForestClassifier(
    random_state=42,
    min_samples_split=10,
    n_estimators=37,
)
random_forest_model

## Fit Model Using Training Data

In [None]:
# Fit the forest on the unscaled training features (tree splits do not depend on feature scale)
random_forest_model.fit(X=X_train, y=y_train)

In [None]:
# Mean accuracy on the training split vs. the held-out test split
train_score = random_forest_model.score(X_train, y_train)
test_score = random_forest_model.score(X_test, y_test)
print(f"Training Data Score: {train_score}")
print(f"Testing Data Score: {test_score}")

Training Data Score: 0.97911227154047
Testing Data Score: 0.7064579256360078


## Make Predictions

In [None]:
# Predict the test split and line the predictions up against the true labels
predictions = random_forest_model.predict(X_test)
results = pd.DataFrame(
    {
        "Prediction": predictions,
        "Actual": y_test,
    }
)
results.iloc[:10]

Unnamed: 0,Prediction,Actual
0,0,1
1,0,0
2,0,0
3,0,1
4,0,0
5,0,0
6,0,0
7,0,1
8,0,0
9,0,0


## Calculate the Accuracy Score

In [None]:
# Fraction of test rows whose predicted outcome matches the actual outcome
accuracy_score(y_true=y_test, y_pred=predictions)

0.7064579256360078