### ML with PySpark
+ Classify/Predict 

In [1]:
# Load our Pkgs

import sys; sys.path.append("../")
from DataPreprocessing.DataPreprocessing import read_data
from pyspark import SparkContext

In [2]:
sc = SparkContext(master='local')

In [3]:
# Load Pkgs 
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col

In [4]:
# Spark
spark = SparkSession.builder.appName("MLwithSpark").getOrCreate()

In [5]:
# Load our dataset
df= read_data(spark,file_name='train')

In [6]:
# Rearrange
df = df.select('App Id', 'App Name', 'Category', 'Rating', 'Rating Count', 'Installs', 'Minimum Installs', 'Maximum Installs', 'Free', 'Price', 'Size', 'Minimum Android', 'Developer Id', 'Developer Email', 'Released', 'Last Updated', 'Content Rating', 'Ad Supported', 'In App Purchases', 'Editors Choice')

In [7]:
#cast some columns to numeric values not strings
df = df.na.drop(how='any', subset=['Rating', 'Rating Count', 'Minimum Installs', 'Maximum Installs', 'Free', 'Price', 'Content Rating', 'Category'])

In [8]:
import pyspark.ml

In [9]:
# Load ML Pkgs
from pyspark.ml.feature import VectorAssembler,StringIndexer

In [10]:
# Convert the string into numerical code
# label encoding
categoryEncoder = StringIndexer(inputCol='Category',outputCol='Category_enc', handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

categoryEncoder = StringIndexer(inputCol='Free',outputCol='Free_enc', handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

categoryEncoder = StringIndexer(inputCol='Content Rating',outputCol='Content Rating_enc',  handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

categoryEncoder = StringIndexer(inputCol='Ad Supported',outputCol='Ad Supported_enc', handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

categoryEncoder = StringIndexer(inputCol='In App Purchases',outputCol='In App Purchases_enc', handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

categoryEncoder = StringIndexer(inputCol='Editors Choice',outputCol='Editors Choice_enc', handleInvalid='skip').fit(df)
df = categoryEncoder.transform(df)

df.na.fill(value=0.0)



DataFrame[App Id: string, App Name: string, Category: string, Rating: float, Rating Count: int, Installs: string, Minimum Installs: int, Maximum Installs: int, Free: string, Price: float, Size: string, Minimum Android: string, Developer Id: string, Developer Email: string, Released: string, Last Updated: string, Content Rating: string, Ad Supported: string, In App Purchases: string, Editors Choice: string, Category_enc: double, Free_enc: double, Content Rating_enc: double, Ad Supported_enc: double, In App Purchases_enc: double, Editors Choice_enc: double]

In [11]:
# encoded column back to String
from pyspark.ml.feature import IndexToString
converter = IndexToString(inputCol='Category_enc',outputCol='orig_category')
converted_df = converter.transform(df)

### Features


In [12]:
df2 = df.select('Rating Count','Minimum Installs', 'Maximum Installs', 'Free_enc', 'Price', 'Content Rating_enc', 'Ad Supported_enc', 'In App Purchases_enc', 'Editors Choice_enc', 'Category_enc', 'Rating')

In [13]:
required_features = df2.columns[:-1]      #all except the target variable 'Rating' which is at the last index
print(required_features)

['Rating Count', 'Minimum Installs', 'Maximum Installs', 'Free_enc', 'Price', 'Content Rating_enc', 'Ad Supported_enc', 'In App Purchases_enc', 'Editors Choice_enc', 'Category_enc']


In [14]:
# VectorAsm
vec_assembler = VectorAssembler(inputCols=required_features,outputCol='Features')

In [15]:
vec_df = vec_assembler.setHandleInvalid("skip").transform(df2)

In [16]:
vec_df.na.drop(how='any',subset='Features')

DataFrame[Rating Count: int, Minimum Installs: int, Maximum Installs: int, Free_enc: double, Price: float, Content Rating_enc: double, Ad Supported_enc: double, In App Purchases_enc: double, Editors Choice_enc: double, Category_enc: double, Rating: float, Features: vector]

### Train,Test Split

In [17]:
train_df,test_df = vec_df.select(['Features', 'Rating']).randomSplit([0.7,0.3])

#### Model Building
+ Pyspark.ml: DataFrame
+ Pyspark.mllib: RDD /Legacy

In [18]:
from pyspark.ml.regression import LinearRegression

In [19]:
# Regr Model
lr = LinearRegression(featuresCol='Features',labelCol='Rating')

In [20]:
lr_model = lr.fit(train_df)

#just some training metrics
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1.874600
r2: 0.217931


In [21]:
y_pred = lr_model.transform(test_df)

#notice another column called 'prediction' is automatically made
y_pred.show(5)

+----------+------+------------------+
|  Features|Rating|        prediction|
+----------+------+------------------+
|(10,[],[])|   0.0|1.2354008672863137|
|(10,[],[])|   0.0|1.2354008672863137|
|(10,[],[])|   0.0|1.2354008672863137|
|(10,[],[])|   0.0|1.2354008672863137|
|(10,[],[])|   0.0|1.2354008672863137|
+----------+------+------------------+
only showing top 5 rows



#### Model Evaluation

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Rating",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(y_pred))

R Squared (R2) on test data = 0.189946


In [23]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 1.90788


#### Decision Tree Regression

In [24]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='Features', labelCol = 'Rating', maxBins=48)   #setting a larger maxbins(default=32) is an ad-hoc step just to solve an error I dont understand :)
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="Rating", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 48) to be at least as large as the number of values in each categorical feature, but categorical feature 9 has 53 values. Consider removing this and other categorical features with a large number of values, or add more training examples.

In [None]:
#Feature Importance
dt_model.featureImportances

#notice that Feature at index 0 has higher importance, this feature is the 'Rating Count'

SparseVector(10, {0: 0.9917, 1: 0.0, 2: 0.0061, 3: 0.0001, 4: 0.0, 6: 0.0003, 9: 0.0018})

### Gradient-Boosted Decision Tree Regressor

In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'Features', labelCol = 'Rating', maxIter=10, maxBins=48) #same as above, maxbins is set just to avoid an error I dont understand
# maxIter is just a hyperparameter we put by hand
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'Rating', 'Features').show(5)

+--------------------+------+----------+
|          prediction|Rating|  Features|
+--------------------+------+----------+
|-0.00190186993011...|   0.0|(10,[],[])|
|-0.00190186993011...|   0.0|(10,[],[])|
|-0.00190186993011...|   0.0|(10,[],[])|
|-0.00190186993011...|   0.0|(10,[],[])|
|-0.00190186993011...|   0.0|(10,[],[])|
+--------------------+------+----------+
only showing top 5 rows



In [None]:
gbt_evaluator = RegressionEvaluator(
    labelCol="Rating", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.45322


### Saving Model


In [None]:
#this is giving me errors I still cannot resolve

from pyspark.ml.pipeline import PipelineModel
gbt_model.save("D:/CLASSSWORKKK/BD/BD_PROJECT/GBT_Model")
#to overwrite an existing model
gbt_model.write().overwrite().save("D:/CLASSSWORKKK/BD/BD_PROJECT/GBT_Model")

Py4JJavaError: An error occurred while calling o913.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
	at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:265)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:79)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1593)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1593)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1579)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1579)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.tree.EnsembleModelReadWrite$.saveImpl(treeModels.scala:473)
	at org.apache.spark.ml.regression.GBTRegressionModel$GBTRegressionModelWriter.saveImpl(GBTRegressor.scala:367)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:341)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:331)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:370)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 23 more


### Loading Model

In [None]:
from pyspark.ml.pipeline import PipelineModel
LoadedModel = PipelineModel.load("./GBT_Model")

Py4JJavaError: An error occurred while calling o1696.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/d:/CLASSSWORKKK/BD/BD_PROJECT/GBT_Model/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Input path does not exist: file:/d:/CLASSSWORKKK/BD/BD_PROJECT/GBT_Model/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more
