In [21]:
import pyspark as sp
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
import pyspark.ml as ml
from pyspark.ml.regression import LinearRegression
spark = SparkSession.builder.appName('BigMartSales').getOrCreate()

In [2]:
df = spark.read.csv('data/cleaned_train.csv', header=True, inferSchema=True)

In [3]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+---------+-----------+--------------------+-----------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|outletAge|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+---------+-----------+--------------------+-----------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|       19|     Medium|                   1|         S1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|        9|     Medium|                   3|         S2|         443.4228|
|          FDN15|       17.5|         Low Fat|    0.016760075|                Me

In [4]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- outletAge: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: integer (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



In [5]:
df.describe().show()

+-------+---------------+------------------+----------------+-------------------+-------------+-----------------+-----------------+------------------+-----------+--------------------+-----------+------------------+
|summary|Item_Identifier|       Item_Weight|Item_Fat_Content|    Item_Visibility|    Item_Type|         Item_MRP|Outlet_Identifier|         outletAge|Outlet_Size|Outlet_Location_Type|Outlet_Type| Item_Outlet_Sales|
+-------+---------------+------------------+----------------+-------------------+-------------+-----------------+-----------------+------------------+-----------+--------------------+-----------+------------------+
|  count|           8523|              8523|            8523|               8523|         8523|             8523|             8523|              8523|       8523|                8523|       8523|              8523|
|   mean|           NULL|12.875361375103129|            NULL|0.06613202877895127|         NULL|140.9927819781768|             NULL|20.168133

In [6]:
to_encode = {'Item_Identifier': 'itemID',
             'Item_Fat_Content': 'isLF',
             'Item_Type': 'itemTypeID',
             'Outlet_Size': 'outletSize',
             'Outlet_Type': 'outletType'}

for column, new_column in to_encode.items():
    indexer = StringIndexer(inputCol=column, outputCol=new_column)
    df = indexer.fit(df).transform(df)

In [7]:
df = df.drop('Item_Identifier').drop('Item_Fat_Content').drop('Item_Type').drop('Outlet_Size').drop('Outlet_Identifier').drop('Outlet_Type')

In [8]:
df.show()

+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+
|Item_Weight|Item_Visibility|Item_MRP|outletAge|Outlet_Location_Type|Item_Outlet_Sales|itemID|isLF|itemTypeID|outletSize|outletType|
+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+
|        9.3|    0.016047301|249.8092|       19|                   1|         3735.138|  40.0| 0.0|       4.0|       0.0|       0.0|
|       5.92|    0.019278216| 48.2692|        9|                   3|         443.4228| 392.0| 1.0|       8.0|       0.0|       3.0|
|       17.5|    0.016760075| 141.618|       19|                   1|          2097.27| 243.0| 0.0|       9.0|       0.0|       0.0|
|       19.2|            0.0| 182.095|       20|                   3|           732.38| 671.0| 1.0|       0.0|       1.0|       1.0|
|       8.93|            0.0| 53.8614|       31|                   3|

In [9]:
df.printSchema()

root
 |-- Item_Weight: double (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- outletAge: integer (nullable = true)
 |-- Outlet_Location_Type: integer (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- itemID: double (nullable = false)
 |-- isLF: double (nullable = false)
 |-- itemTypeID: double (nullable = false)
 |-- outletSize: double (nullable = false)
 |-- outletType: double (nullable = false)



In [10]:
df.show()

+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+
|Item_Weight|Item_Visibility|Item_MRP|outletAge|Outlet_Location_Type|Item_Outlet_Sales|itemID|isLF|itemTypeID|outletSize|outletType|
+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+
|        9.3|    0.016047301|249.8092|       19|                   1|         3735.138|  40.0| 0.0|       4.0|       0.0|       0.0|
|       5.92|    0.019278216| 48.2692|        9|                   3|         443.4228| 392.0| 1.0|       8.0|       0.0|       3.0|
|       17.5|    0.016760075| 141.618|       19|                   1|          2097.27| 243.0| 0.0|       9.0|       0.0|       0.0|
|       19.2|            0.0| 182.095|       20|                   3|           732.38| 671.0| 1.0|       0.0|       1.0|       1.0|
|       8.93|            0.0| 53.8614|       31|                   3|

In [11]:
independent_variables = ["itemID", "Item_Weight", "isLF", "Item_Visibility", "itemTypeID", "Item_MRP", "outletAge", "outletSize", "Outlet_Location_Type", "outletType"]
featuresVector = VectorAssembler(inputCols=independent_variables,outputCol="featuresVector")
output = featuresVector.transform(df)

In [12]:
output.show()

+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+--------------------+
|Item_Weight|Item_Visibility|Item_MRP|outletAge|Outlet_Location_Type|Item_Outlet_Sales|itemID|isLF|itemTypeID|outletSize|outletType|      featuresVector|
+-----------+---------------+--------+---------+--------------------+-----------------+------+----+----------+----------+----------+--------------------+
|        9.3|    0.016047301|249.8092|       19|                   1|         3735.138|  40.0| 0.0|       4.0|       0.0|       0.0|[40.0,9.3,0.0,0.0...|
|       5.92|    0.019278216| 48.2692|        9|                   3|         443.4228| 392.0| 1.0|       8.0|       0.0|       3.0|[392.0,5.92,1.0,0...|
|       17.5|    0.016760075| 141.618|       19|                   1|          2097.27| 243.0| 0.0|       9.0|       0.0|       0.0|[243.0,17.5,0.0,0...|
|       19.2|            0.0| 182.095|       20|                   3|       

In [13]:
for col in independent_variables:
    output = output.drop(col)
output = output.withColumnRenamed('Item_Outlet_Sales', 'totalSales')

In [14]:
output.show()

+----------+--------------------+
|totalSales|      featuresVector|
+----------+--------------------+
|  3735.138|[40.0,9.3,0.0,0.0...|
|  443.4228|[392.0,5.92,1.0,0...|
|   2097.27|[243.0,17.5,0.0,0...|
|    732.38|[671.0,19.2,1.0,0...|
|  994.7052|[719.0,8.93,0.0,0...|
|  556.6088|[1450.0,10.395,1....|
|  343.5528|[72.0,13.65,1.0,0...|
| 4022.7636|[254.0,19.0,0.0,0...|
| 1076.5986|[197.0,16.2,1.0,0...|
|  4710.535|[1024.0,19.2,1.0,...|
| 1516.0266|(10,[0,1,5,6,8],[...|
|  2187.153|[427.0,18.5,1.0,0...|
| 1589.2646|[325.0,15.1,1.0,0...|
| 2145.2076|[1003.0,17.6,1.0,...|
|  1977.426|[872.0,16.35,0.0,...|
| 1547.3192|[258.0,9.0,1.0,0....|
| 1621.8888|[714.0,11.8,0.0,0...|
|  718.3982|[258.0,9.0,1.0,0....|
|  2303.668|[151.0,8.26,0.0,0...|
| 2748.4224|[297.0,13.35,0.0,...|
+----------+--------------------+
only showing top 20 rows



In [15]:
train,test = output.randomSplit([0.8, 0.2])

In [45]:
def autoTest(report, label, pred):
    metrics = {"mae": 0,
               'mse': 0,
               'rmse': 0,
               'r2': 0,
               'var': 0}
    for metric in metrics.keys():
        evaluator = ml.evaluation.RegressionEvaluator(labelCol=label, predictionCol=pred, metricName=metric)
        metrics[metric] = evaluator.evaluate(report)
    print(*[f'{met}: {result}' for met, result in metrics.items()],sep='\n')

In [16]:
model = LinearRegression(featuresCol='featuresVector', labelCol='totalSales')
model = model.fit(train)

In [17]:
prediction = model.evaluate(test)

In [18]:
prediction.r2

0.39113478814056146

In [19]:
prediction.meanAbsoluteError

985.1593032620809

In [20]:
prediction.rootMeanSquaredError

1313.6292274371697

In [30]:
model = ml.regression.DecisionTreeRegressor(featuresCol='featuresVector', labelCol='totalSales', maxBins=2048)
model = model.fit(train)

In [67]:
model.save('DecisionTreeRegressor')

Py4JJavaError: An error occurred while calling o1110.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
	at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:265)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:79)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1620)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1620)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1606)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.regression.DecisionTreeRegressionModel$DecisionTreeRegressionModelWriter.saveImpl(DecisionTreeRegressor.scala:303)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [31]:
prediction = model.transform(train)

In [48]:
prediction.withColumn('difference', prediction['totalSales'] - prediction['prediction']).select('difference').describe().show()

+-------+--------------------+
|summary|          difference|
+-------+--------------------+
|  count|                6786|
|   mean|-6.56455209148806...|
| stddev|   892.0919695940172|
|    min| -3748.9939273631753|
|    max|   5024.769238596492|
+-------+--------------------+



In [46]:
autoTest(report=prediction, label='totalSales', pred='prediction')

mae: 620.9995603331128
mse: 795710.8072241251
rmse: 892.0262368473951
r2: 0.728575108723412
var: 2135895.0726997573
