In [1]:
!wget -qq https://downloads.apache.org/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz

In [28]:
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler,OneHotEncoder,OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from catboost import CatBoostRegressor
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def preprocess_movie_data(data_path):
    data = pd.read_csv(data_path)
    no_genre = data[pd.isnull(data['genres'])]
    i = data[pd.isnull(data['genres'])].index
    listgenre = ['Science Fiction & Fantasy','Drama','Animation','Animation','Animation','Action & Adventure','Musical & Performing Arts','Romance','Action & Adventure','Drama','Comedy','Animation','Action & Adventure','Horror','Action & Adventure','Mystery & Suspense','Science Fiction & Fantasy','Documentary','Animation']
    for g in range(0, len(no_genre)):
        data.loc[i[g], 'genres'] = listgenre[g]
    df = data['genres'].str.get_dummies(',')
    data = pd.concat([data, df], axis=1)
    drop_column = ["movie_title","rotten_tomatoes_link","movie_info","original_release_date","streaming_release_date","critics_consensus","genres","directors","authors","actors","production_company"]
    data.drop(drop_column, axis=1, inplace=True)
    cont_col = list(data.describe())
    cat_col = [c for c in data.columns if c not in cont_col]
    cont_data = data[cont_col]
    cat_data = data[cat_col]
    imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
    cont_data = imputer.fit_transform(cont_data)
    cont_data = pd.DataFrame(cont_data, columns=cont_col)
    imputer2 = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
    cat_data = imputer2.fit_transform(cat_data)
    cat_data = pd.DataFrame(cat_data, columns=cat_col)
    oe = OrdinalEncoder(categories=[['Rotten','Fresh','Certified-Fresh']])
    cat_data['tomatometer_status'] = oe.fit_transform(cat_data[['tomatometer_status']])
    final_data = pd.concat([cat_data, cont_data], axis=1)
    column_name = 'tomatometer_rating'
    column = final_data.pop(column_name)
    final_data[column_name] = column
    final_data.to_csv('bda.csv', index=False)
    return final_data

data_path = '/kaggle/input/rotten-tomatoes-movies-and-critic-reviews-dataset/rotten_tomatoes_movies.csv'
preprocessed_data = preprocess_movie_data(data_path)
preprocessed_data.sample(5)

Unnamed: 0,content_rating,tomatometer_status,audience_status,runtime,tomatometer_count,audience_rating,audience_count,tomatometer_top_critics_count,tomatometer_fresh_critics_count,tomatometer_rotten_critics_count,...,Horror,Kids & Family,Musical & Performing Arts,Mystery & Suspense,Romance,Science Fiction & Fantasy,Special Interest,Television,Western,tomatometer_rating
10799,PG-13,0.0,Upright,100.0,10.0,65.0,1889.0,6.0,3.0,7.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0
14873,R,0.0,Upright,109.0,14.0,65.0,106.0,10.0,6.0,8.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,43.0
11678,NR,1.0,Upright,102.214048,5.0,85.0,47.0,3.0,5.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0
10539,NR,1.0,Upright,96.0,15.0,83.0,2850.0,5.0,15.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0
10081,PG,0.0,Upright,117.0,77.0,76.0,6179.0,26.0,43.0,34.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.0


In [29]:
preprocessed_data.columns

Index(['content_rating', 'tomatometer_status', 'audience_status', 'runtime',
       'tomatometer_count', 'audience_rating', 'audience_count',
       'tomatometer_top_critics_count', 'tomatometer_fresh_critics_count',
       'tomatometer_rotten_critics_count', ' Animation', ' Anime & Manga',
       ' Art House & International', ' Classics', ' Comedy', ' Cult Movies',
       ' Documentary', ' Drama', ' Faith & Spirituality', ' Gay & Lesbian',
       ' Horror', ' Kids & Family', ' Musical & Performing Arts',
       ' Mystery & Suspense', ' Romance', ' Science Fiction & Fantasy',
       ' Special Interest', ' Sports & Fitness', ' Television', ' Western',
       'Action & Adventure', 'Animation', 'Art House & International',
       'Classics', 'Comedy', 'Cult Movies', 'Documentary', 'Drama', 'Horror',
       'Kids & Family', 'Musical & Performing Arts', 'Mystery & Suspense',
       'Romance', 'Science Fiction & Fantasy', 'Special Interest',
       'Television', 'Western', 'tomatometer_ratin

In [30]:
!tar -xzvf hadoop-3.3.3.tar.gz

hadoop-3.3.3/
hadoop-3.3.3/licenses-binary/
hadoop-3.3.3/licenses-binary/LICENSE-dust.txt
hadoop-3.3.3/licenses-binary/LICENSE-re2j.txt
hadoop-3.3.3/licenses-binary/LICENSE-slf4j.txt
hadoop-3.3.3/licenses-binary/LICENSE-jquery.txt
hadoop-3.3.3/licenses-binary/LICENSE-zstd-jni.txt
hadoop-3.3.3/licenses-binary/LICENSE-hsql.txt
hadoop-3.3.3/licenses-binary/LICENSE-datatables.txt
hadoop-3.3.3/licenses-binary/LICENSE-jaf.txt
hadoop-3.3.3/licenses-binary/LICENSE-hamcrest.txt
hadoop-3.3.3/licenses-binary/LICENSE-ojalgo.txt
hadoop-3.3.3/licenses-binary/LICENSE-mssql-jdbc.txt
hadoop-3.3.3/licenses-binary/LICENSE-jdom.txt
hadoop-3.3.3/licenses-binary/LICENSE-jsr305.txt
hadoop-3.3.3/licenses-binary/LICENSE-nvd3.txt
hadoop-3.3.3/licenses-binary/LICENSE-angular-nvd3.txt
hadoop-3.3.3/licenses-binary/LICENSE-bloomfilter.txt
hadoop-3.3.3/licenses-binary/LICENSE-azure-keyvault.txt
hadoop-3.3.3/licenses-binary/LICENSE-xtermjs.txt
hadoop-3.3.3/licenses-binary/LICENSE-leveldb.txt
hadoop-3.3.3/licenses-bin

In [31]:
!cp -r hadoop-3.3.3/ /usr/local/

In [32]:
!readlink -f /usr/bin/java | sed "s:bin/java::"

/usr/lib/jvm/java-11-openjdk-amd64/


In [33]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64/'

In [34]:
!/usr/local/hadoop-3.3.3/bin/hadoop

Usage: hadoop [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
 or    hadoop [OPTIONS] CLASSNAME [CLASSNAME OPTIONS]
  where CLASSNAME is a user-provided Java class

  OPTIONS is none or any of:

--config dir                     Hadoop config directory
--debug                          turn on shell script debug mode
--help                           usage information
buildpaths                       attempt to add class files from build tree
hostnames list[,of,host,names]   hosts to use in slave mode
hosts filename                   list of hosts to use in slave mode
loglevel level                   set the log4j level for this command
workers                          turn on worker mode

  SUBCOMMAND is one of:


    Admin Commands:

daemonlog     get/set the log level for each daemon

    Client Commands:

archive       create a Hadoop archive
checknative   check native Hadoop and compression libraries availability
classpath     prints the class path needed to get the Hadoop jar and the
    

In [35]:
!/usr/local/hadoop-3.3.3/bin/hadoop version

Hadoop 3.3.3
Source code repository https://github.com/apache/hadoop.git -r d37586cbda38c338d9fe481addda5a05fb516f71
Compiled by stevel on 2022-05-09T16:36Z
Compiled with protoc 3.7.1
From source with checksum eb96dd4a797b6989ae0cdb9db6efc6
This command was run using /usr/local/hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar


In [36]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -ls /

Found 29 items
-rwxr-xr-x   1 root root          0 2023-10-30 16:55 /.dockerenv
drwxr-xr-x   - root root       4096 2023-09-05 17:29 /.jupyter
-rw-r--r--   1 root root        523 2023-09-05 17:29 /=2.*
-rw-r--r--   1 root root      12831 2023-09-05 17:13 /=2023.0.1
drwxr-xr-x   - root root       4096 2023-09-05 17:28 /bin
drwxr-xr-x   - root root       4096 2020-04-15 11:09 /boot
drwxr-xr-x   - root root        340 2023-10-30 16:55 /dev
-rwxr-xr-x   1 root root        219 2023-06-26 03:34 /entrypoint.sh
drwxr-xr-x   - root root       4096 2023-10-30 16:55 /etc
drwxr-xr-x   - root root       4096 2023-06-26 03:52 /home
-rwxr-xr-x   1 root root       1084 2023-06-26 03:34 /install_packages.sh
drwxr-xr-x   - root root       4096 2023-10-30 16:55 /kaggle
drwxr-xr-x   - root root       4096 2023-09-05 17:20 /lib
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /lib32
drwxr-xr-x   - root root       4096 2023-06-05 14:05 /lib64
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /libx32


In [37]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -df /

Filesystem           Size           Used      Available  Use%
file:///    8656922775552  5388028657664  3268894117888   62%


In [38]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -mkdir /hadoop

In [39]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -rm -r /hadoop

2023-10-30 17:03:33,067 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted /hadoop


### Hadoop Streaming

In [40]:
!find / -name 'hadoop-streaming*.jar'

/usr/local/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-sources.jar
/usr/local/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-test-sources.jar
/usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar
/kaggle/working/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-test-sources.jar
/kaggle/working/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-sources.jar
/kaggle/working/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar


### Write the mapper and reducer files

In [49]:
%%writefile mapper.py
import sys
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import pickle
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler,OneHotEncoder,OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from catboost import CatBoostRegressor
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
import joblib

rf_model = RandomForestRegressor(n_estimators=100, random_state=42)

X = []  
y = []

first_line = True
X_data = []  
y_data = []  
for line in sys.stdin:
    data = line.strip().split(',')
    
    if first_line:
        first_line = False
        continue
    try:
        features = data[:-1]
        label = data[-1]
    except Exception as e:
        continue
    X_data.append(features)
    y_data.append(label)
X_columns = ['content_rating', 'tomatometer_status', 'audience_status', 'runtime',
       'tomatometer_count', 'audience_rating', 'audience_count',
       'tomatometer_top_critics_count', 'tomatometer_fresh_critics_count',
       'tomatometer_rotten_critics_count', ' Animation', ' Anime & Manga',
       ' Art House & International', ' Classics', ' Comedy', ' Cult Movies',
       ' Documentary', ' Drama', ' Faith & Spirituality', ' Gay & Lesbian',
       ' Horror', ' Kids & Family', ' Musical & Performing Arts',
       ' Mystery & Suspense', ' Romance', ' Science Fiction & Fantasy',
       ' Special Interest', ' Sports & Fitness', ' Television', ' Western',
       'Action & Adventure', 'Animation', 'Art House & International',
       'Classics', 'Comedy', 'Cult Movies', 'Documentary', 'Drama', 'Horror',
       'Kids & Family', 'Musical & Performing Arts', 'Mystery & Suspense',
       'Romance', 'Science Fiction & Fantasy', 'Special Interest',
       'Television', 'Western']

X = pd.DataFrame(X_data, columns=X_columns)
y = pd.DataFrame(y_data, columns=['tomatometer_rating'])
numerical_columns = [
    'runtime', 'tomatometer_count', 'audience_rating',
    'audience_count', 'tomatometer_top_critics_count',
    'tomatometer_fresh_critics_count', 'tomatometer_rotten_critics_count'
]

X[numerical_columns] = X[numerical_columns].astype(float)
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.2,random_state=1)
X_test.to_csv('test.csv', index=False)
x_cont_col = list(X.describe())
x_cat_col = list(c for c in X.columns if c not in x_cont_col)
x_train_cont_data = X_train.loc[:, x_cont_col]
x_train_cat_data = X_train.loc[:, x_cat_col]
x_test_cont_data = X_test.loc[:, x_cont_col]
x_test_cat_data = X_test.loc[:, x_cat_col]

sc = StandardScaler()
y_train = sc.fit_transform(y_train)
y_test = sc.transform(y_test)
cont_transform = Pipeline(steps=[('scaler', StandardScaler())])
cat_transform = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore',sparse=False))])
ct = ColumnTransformer(
    transformers=[
        ('num', cont_transform, x_cont_col),
        ('cat', cat_transform, x_cat_col)],
        remainder='passthrough')
x_train = pd.concat([x_train_cont_data, x_train_cat_data], axis=1)
x_test = pd.concat([x_test_cont_data, x_test_cat_data], axis=1)
xt_train = ct.fit_transform(x_train)
xt_test = ct.transform(x_test)
file_name = 'label_array.npy'
np.save(file_name, y_train)
rf = RandomForestRegressor()
rf.fit(xt_train,y_train)

with open('RF_model.pkl', 'wb') as model_file:
    pickle.dump(rf, model_file)
    
joblib.dump(sc, 'LabelEncoder.pkl')

for i in range(len(y_test)):
    print(f"{','.join(map(str, xt_test[i]))},{y_test[i][0]}")

Overwriting mapper.py


In [50]:
%%writefile reducer.py
import sys
import pickle
from sklearn.metrics import accuracy_score, classification_report
import sys
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import pickle
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler,OneHotEncoder,OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from catboost import CatBoostRegressor
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split

with open('RF_model.pkl', 'rb') as model_file:
    rf = pickle.load(model_file)

first_line = True
X_data = []  # Initialize a list to store the features
y_data = []  # Initialize a list to store the labels

for line in sys.stdin:
    data = line.strip().split(',')
    
    if first_line:
        first_line = False
        continue
    try:
        features = data[:-1]
        label = data[-1]
    except Exception as e:
        continue

    X_data.append(features)
    y_data.append(label)

X_test = X_data
y_test = y_data
y_pred = rf.predict(X_test)
y_test = np.array(y_test).astype(float)  # Convert y_test to a numeric array
y_pred = y_pred.astype(float)
y_pred = y_pred.reshape(-1,1)
r1 = r2_score(y_test,y_pred)
print("Random Forest r2: ", r1)
with open('RF_model.pkl', 'wb') as model_file:
    pickle.dump(rf, model_file)

Overwriting reducer.py


In [51]:
!chmod u+rwx /kaggle/working/mapper.py
!chmod u+rwx /kaggle/working/reducer.py

In [18]:
# !/usr/local/hadoop-3.3.3/bin/hadoop jar /usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar -input /kaggle/working/your_data.csv -output /kaggle/working/output1 -file /kaggle/working/mapper.py  -file /kaggle/working/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

In [52]:
!cat /kaggle/working/bda.csv | python mapper.py | python reducer.py 2>&1


  rf.fit(xt_train,y_train)
Random Forest r2:  0.9975076195604055


Random Forest r2:  0.9975076195604055

RF_model.pkl and LabelEncoder.pkl are saved to host flask model. server.py have the flask deployment , the data is being displayed using postman api requests