In [1]:
import findspark
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable



In [2]:

findspark.init()

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import *



In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles




In [5]:
spark = SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri","mongodb://localhost:27017/project.walmart?readPreference=primaryPreferred").config("spark.mongodb.output.uri","mongodb://localhost:27017/project.walmart").config("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").getOrCreate()

In [6]:
url="https://raw.githubusercontent.com/kirtigupta10007/Walmart-Store-Sales-Forecasting/master/data/features.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
SparkFiles.get("features.csv")
features=spark.read.csv("file:///"+SparkFiles.get("features.csv"), header=True, inferSchema= True)

In [7]:
url="https://raw.githubusercontent.com/kirtigupta10007/Walmart-Store-Sales-Forecasting/master/data/stores.csv"

spark.sparkContext.addFile(url)
SparkFiles.get("stores.csv")
stores=spark.read.csv("file:///"+SparkFiles.get("stores.csv"), header=True, inferSchema= True)

In [8]:
url="https://raw.githubusercontent.com/kirtigupta10007/Walmart-Store-Sales-Forecasting/master/data/train.csv"

spark.sparkContext.addFile(url)
SparkFiles.get("train.csv")
train=spark.read.csv("file:///"+SparkFiles.get("train.csv"), header=True, inferSchema= True)

In [9]:
url="https://raw.githubusercontent.com/kirtigupta10007/Walmart-Store-Sales-Forecasting/master/data/test.csv"

spark.sparkContext.addFile(url)
SparkFiles.get("test.csv")
test=spark.read.csv("file:///"+SparkFiles.get("test.csv"), header=True, inferSchema= True)

In [10]:
features.show(5)

+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|               Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05 00:00:00|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    false|
|    1|2010-02-12 00:00:00|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|     true|
|    1|2010-02-19 00:00:00|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|    false|
|    1|2010-02-26 00:00:00|      46.63|     2.561|       NA|       NA|       NA|       NA|       NA|211.3196429|       8.106|    false|
|    1|2010-03-05 00:00:00|       46.5|     2.62

In [11]:
stores.show(5)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [12]:
train.show(5)

+-----+----+-------------------+------------+---------+
|Store|Dept|               Date|Weekly_Sales|IsHoliday|
+-----+----+-------------------+------------+---------+
|    1|   1|2010-02-05 00:00:00|     24924.5|    false|
|    1|   1|2010-02-12 00:00:00|    46039.49|     true|
|    1|   1|2010-02-19 00:00:00|    41595.55|    false|
|    1|   1|2010-02-26 00:00:00|    19403.54|    false|
|    1|   1|2010-03-05 00:00:00|     21827.9|    false|
+-----+----+-------------------+------------+---------+
only showing top 5 rows



In [13]:
test.show(5)

+-----+----+-------------------+---------+
|Store|Dept|               Date|IsHoliday|
+-----+----+-------------------+---------+
|    1|   1|2012-11-02 00:00:00|    false|
|    1|   1|2012-11-09 00:00:00|    false|
|    1|   1|2012-11-16 00:00:00|    false|
|    1|   1|2012-11-23 00:00:00|     true|
|    1|   1|2012-11-30 00:00:00|    false|
+-----+----+-------------------+---------+
only showing top 5 rows



In [14]:
chcekNullValues = {col:features.filter(features[col].isNull()).count() for col in features.columns}
print(chcekNullValues)

{'Store': 0, 'Date': 0, 'Temperature': 0, 'Fuel_Price': 0, 'MarkDown1': 0, 'MarkDown2': 0, 'MarkDown3': 0, 'MarkDown4': 0, 'MarkDown5': 0, 'CPI': 0, 'Unemployment': 0, 'IsHoliday': 0}


In [15]:
chcekNullValues = {col:stores.filter(stores[col].isNull()).count() for col in stores.columns}
print(chcekNullValues)

{'Store': 0, 'Type': 0, 'Size': 0}


In [16]:
chcekNullValues = {col:train.filter(train[col].isNull()).count() for col in train.columns}
print(chcekNullValues)

{'Store': 0, 'Dept': 0, 'Date': 0, 'Weekly_Sales': 0, 'IsHoliday': 0}


In [17]:
chcekNullValues = {col:test.filter(test[col].isNull()).count() for col in test.columns}
print(chcekNullValues)

{'Store': 0, 'Dept': 0, 'Date': 0, 'IsHoliday': 0}


# Data Pre-Processing

In [18]:
df_aux=train.join(stores, ["Store"])


df_tests=test.join(stores,["Store"])


In [19]:
df_train = df_aux.join(features, on=['Store', 'Date','IsHoliday'], how='inner')


df_test = df_tests.join(features, on=['Store', 'Date','IsHoliday'], how='inner')

In [20]:
df_train.printSchema()
print("*******************")
df_test.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)

*******************
root
 |-- Store: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown

In [21]:
from pyspark.sql.functions import when
df3 = df_train.withColumn("MarkDown1", when(df_train.MarkDown1 == "NA",0))
df3 = df3.withColumn("MarkDown2", when(df3.MarkDown2 == "NA",0)) 
df3 = df3.withColumn("MarkDown3", when(df3.MarkDown3 == "NA",0))
df3 = df3.withColumn("MarkDown4", when(df3.MarkDown4 == "NA",0))
df_train = df3.withColumn("MarkDown5", when(df3.MarkDown5 == "NA",0))
                          

df_train.show(5)
df_train.printSchema()


+-----+-------------------+---------+----+------------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Store|               Date|IsHoliday|Dept|Weekly_Sales|Type|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----+-------------------+---------+----+------------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|    1|2010-02-05 00:00:00|    false|   1|     24924.5|   A|151315|      42.31|     2.572|        0|        0|        0|        0|        0|211.0963582|       8.106|
|    1|2010-02-12 00:00:00|     true|   1|    46039.49|   A|151315|      38.51|     2.548|        0|        0|        0|        0|        0|211.2421698|       8.106|
|    1|2010-02-19 00:00:00|    false|   1|    41595.55|   A|151315|      39.93|     2.514|        0|        0|        0|        0|        0|211.2891429|       8.106|
|   

In [22]:
df_train=df_train.withColumn('MarkDown1',df_train.MarkDown1.cast("float"))
df_train=df_train.withColumn('MarkDown2',df_train.MarkDown2.cast("float"))
df_train=df_train.withColumn('MarkDown3',df_train.MarkDown3.cast("float"))
df_train=df_train.withColumn('MarkDown4',df_train.MarkDown4.cast("float"))
train=df_train.withColumn('MarkDown5',df_train.MarkDown5.cast("float"))

In [23]:
train.printSchema()
train.show(2)

root
 |-- Store: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)

+-----+-------------------+---------+----+------------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Store|               Date|IsHoliday|Dept|Weekly_Sales|Type|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----+------------------

In [24]:
df4 = df_test.withColumn("MarkDown1", when(df_test.MarkDown1 == "NA",0))
df4 = df4.withColumn("MarkDown2", when(df4.MarkDown2 == "NA",0)) 
df4 = df4.withColumn("MarkDown3", when(df4.MarkDown3 == "NA",0))
df4 = df4.withColumn("MarkDown4", when(df4.MarkDown4 == "NA",0))
df_test = df4.withColumn("MarkDown5", when(df4.MarkDown5 == "NA",0))
                          

df_test.show(5)



+-----+-------------------+---------+----+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Store|               Date|IsHoliday|Dept|Type|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----+-------------------+---------+----+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|    1|2012-11-02 00:00:00|    false|   1|   A|151315|      55.32|     3.386|     null|     null|     null|     null|     null|223.4627793|       6.573|
|    1|2012-11-09 00:00:00|    false|   1|   A|151315|      61.24|     3.314|     null|     null|     null|     null|     null|223.4813073|       6.573|
|    1|2012-11-16 00:00:00|    false|   1|   A|151315|      52.92|     3.252|     null|     null|     null|     null|     null|223.5129105|       6.573|
|    1|2012-11-23 00:00:00|     true|   1|   A|151315|      56.23|     3.211|     

In [25]:
df_test=df_test.withColumn('MarkDown1',df_test.MarkDown1.cast("float"))
df_test=df_test.withColumn('MarkDown2',df_test.MarkDown2.cast("float"))
df_test=df_test.withColumn('MarkDown3',df_test.MarkDown3.cast("float"))
df_test=df_test.withColumn('MarkDown4',df_test.MarkDown4.cast("float"))
test=df_test.withColumn('MarkDown5',df_test.MarkDown5.cast("float"))
test.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)



In [26]:
ml = spark.read.format("mongo").option("uri","mongodb://localhost:27017/walmart.data_train").load()
train.write.format("mongo").option("uri","mongodb://localhost:27017/walmart.data_train").save()




Py4JJavaError: An error occurred while calling o251.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 96.0 failed 1 times, most recent failure: Lost task 1.0 in stage 96.0 (TID 91) (DESKTOP-376FPEQ executor driver): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast 0.0 into a BsonValue. FloatType has no matching BsonValue.
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$dataTypeToBsonValueMapper$15(MapFunctions.scala:122)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$dataTypeToBsonValueMapper$16(MapFunctions.scala:125)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$wrappedDataTypeToBsonValueMapper$2(MapFunctions.scala:82)
	at scala.util.Try$.apply(Try.scala:213)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$wrappedDataTypeToBsonValueMapper$1(MapFunctions.scala:82)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$3(MapFunctions.scala:62)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$5(MapFunctions.scala:73)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$4(MapFunctions.scala:70)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$4(MongoSpark.scala:154)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$2(MongoSpark.scala:119)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$2$adapted(MongoSpark.scala:118)
	at com.mongodb.spark.MongoConnector.$anonfun$withCollectionDo$1(MongoConnector.scala:186)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$1(MongoSpark.scala:118)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$1$adapted(MongoSpark.scala:117)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1011)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1009)
	at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:117)
	at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:159)
	at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:78)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast 0.0 into a BsonValue. FloatType has no matching BsonValue.
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$dataTypeToBsonValueMapper$15(MapFunctions.scala:122)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$dataTypeToBsonValueMapper$16(MapFunctions.scala:125)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$wrappedDataTypeToBsonValueMapper$2(MapFunctions.scala:82)
	at scala.util.Try$.apply(Try.scala:213)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$wrappedDataTypeToBsonValueMapper$1(MapFunctions.scala:82)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$3(MapFunctions.scala:62)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$5(MapFunctions.scala:73)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$rowToDocumentMapper$4(MapFunctions.scala:70)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$4(MongoSpark.scala:154)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$2(MongoSpark.scala:119)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$2$adapted(MongoSpark.scala:118)
	at com.mongodb.spark.MongoConnector.$anonfun$withCollectionDo$1(MongoConnector.scala:186)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$1(MongoSpark.scala:118)
	at com.mongodb.spark.MongoSpark$.$anonfun$save$1$adapted(MongoSpark.scala:117)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
ml_test = spark.read.format("mongo").option("uri","mongodb://localhost:27017/walmart.data_test").load()
test.write.format("mongo").option("uri","mongodb://localhost:27017/walmart.data_test").save()

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pymongo
from pymongo import MongoClient
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [None]:
client = MongoClient('localhost',27017)
db = client.walmart
data = db.data_for_ml
new_df = pd.DataFrame(list(data.find()))
new_df

In [None]:
walmart_data = new_df.drop(['_id'],axis=1)
walmart_data

In [None]:
client = MongoClient('localhost',27017)
db_test = client.walmart
data_test = db.data_for_ml_test
new_test = pd.DataFrame(list(data.find()))
new_test

In [None]:
walmart_data_test = new_test.drop(['_id'],axis=1)
walmart_data_test

In [None]:
print(walmart_data.info())
print ("*****************************************")
print(walmart_data_test.info())

In [None]:
# Train Numerical Data
numeric_walmart_data=[key for key in dict(walmart_data.dtypes) if dict(walmart_data.dtypes)[key] in ['float64', 'int64', 'float32', 'int32']]
walmart_data_num=walmart_data[numeric_walmart_data]
print(walmart_data_num)
# Train Categorical Data
cat_walmart_data=[key for key in dict(walmart_data.dtypes) if dict(walmart_data.dtypes)[key] in ['object']]
walmart_data_cat=walmart_data[cat_walmart_data]
print(walmart_data_cat)


In [None]:
walmart_data_num.describe().transpose()

In [None]:
walmart_data_cat.describe().transpose()

In [None]:
# Test Numerical Data
numeric_walmart_data_test=[key for key in dict(walmart_data_test.dtypes) if dict(walmart_data_test.dtypes)[key] in ['float64', 'int64', 'float32', 'int32']]
walmart_data_test_num=walmart_data_test[numeric_walmart_data_test]
print(walmart_data_test_num)
# Test Categorical Data
cat_walmart_data_test=[key for key in dict(walmart_data_test.dtypes) if dict(walmart_data_test.dtypes)[key] in ['object']]
walmart_data_test_cat=walmart_data_test[cat_walmart_data_test]
print(walmart_data_test_cat)


In [None]:
walmart_data_test_num.describe().transpose()

In [None]:
walmart_data_test_cat.describe().transpose()

# correlation


In [None]:
train_corr=pd.DataFrame(walmart_data.corr())

train_corr.head()

In [None]:
test_corr=pd.DataFrame(walmart_data_test.corr())
#test_corr.to_excel(writer,'Test_Data Corr',index=True)
test_corr.head()

In [None]:
# visualize correlation matrix in Seaborn using a heatmap
sns.heatmap(train_corr.corr())

In [None]:
# visualize correlation matrix in Seaborn using a heatmap
sns.heatmap(test_corr.corr())

# EDA

In [None]:
sns.distplot(walmart_data.Weekly_Sales)

In [None]:
walmart_data.plot(kind='line', x='Weekly_Sales', y='Store', alpha=0.5)

sales vs type

In [None]:
sns.barplot(x=walmart_data["Weekly_Sales"],y=walmart_data["Type"])

sales vs department

In [None]:
print(walmart_data.isnull().sum())
print("*"*30)
print(walmart_data_test.isnull().sum())

outliers treatment

In [None]:
walmart_data.Weekly_Sales=np.where(walmart_data.Weekly_Sales>100000, 100000,walmart_data.Weekly_Sales)

In [None]:
walmart_data.Weekly_Sales.plot.hist(bins=25)

Feature Extraction

In this section, we select the appropriate features to train our classifier. Here, we create new features based on existing features. We also convert categorical features into numeric form.

Date Feature

In [None]:
walmart_data.info()

In [None]:
walmart_data['Date'] = pd.to_datetime(walmart_data['Date'])
walmart_data_test['Date'] = pd.to_datetime(walmart_data_test['Date'])

In [None]:
# Extract date features
walmart_data['Date_dayofweek'] =walmart_data['Date'].dt.dayofweek
walmart_data['Date_month'] =walmart_data['Date'].dt.month 
walmart_data['Date_year'] =walmart_data['Date'].dt.year
walmart_data['Date_day'] =walmart_data['Date'].dt.day 

walmart_data_test['Date_dayofweek'] =walmart_data_test['Date'].dt.dayofweek
walmart_data_test['Date_month'] =walmart_data_test['Date'].dt.month 
walmart_data_test['Date_year'] =walmart_data_test['Date'].dt.year
walmart_data_test['Date_day'] =walmart_data_test['Date'].dt.day 

In [None]:
print(walmart_data.Type.value_counts())
print("*"*30)
print(walmart_data_test.Type.value_counts())


In [None]:
print(walmart_data.IsHoliday.value_counts())
print("*"*30)
print(walmart_data_test.IsHoliday.value_counts())

In [None]:
train_test_data = [walmart_data, walmart_data_test]

Converting Categorical Variable 'Type' into Numerical Variable 
For A=1 , B=2, C=3

In [None]:
type_mapping = {"A": 1, "B": 2, "C": 3}
for dataset in train_test_data:
    dataset['Type'] = dataset['Type'].map(type_mapping)

Converting Categorical Variable 'IsHoliday' into Numerical Variable 

In [None]:
type_mapping = {False: 0, True: 1}
for dataset in train_test_data:
    dataset['IsHoliday'] = dataset['IsHoliday'].map(type_mapping)

In [None]:
from datetime import datetime

Creating Extra Holiday Variable.


If that week comes under extra holiday then 1(=Yes) else 2(=No)



Making New Holiday Variable Based on Given Data....

In [None]:
walmart_data['Super_Bowl'] = np.where((walmart_data['Date']==datetime(2010, 2, 12)) | (walmart_data['Date']==datetime(2011, 2, 11)) | (walmart_data['Date']==datetime(2012, 2, 10)) | (walmart_data['Date']==datetime(2013, 2, 8)),1,0)
walmart_data['Labour_Day'] = np.where((walmart_data['Date']==datetime(2010, 9, 10)) | (walmart_data['Date']==datetime(2011, 9, 9)) | (walmart_data['Date']==datetime(2012, 9, 7)) | (['Date']==datetime(2013, 9, 6)),1,0)
walmart_data['Thanksgiving'] = np.where((walmart_data['Date']==datetime(2010, 11, 26)) | (walmart_data['Date']==datetime(2011, 11, 25)) | (walmart_data['Date']==datetime(2012, 11, 23)) | (walmart_data['Date']==datetime(2013, 11, 29)),1,0)
walmart_data['Christmas'] = np.where((walmart_data['Date']==datetime(2010, 12, 31)) | (walmart_data['Date']==datetime(2011, 12, 30)) | (walmart_data['Date']==datetime(2012, 12, 28)) | (walmart_data['Date']==datetime(2013, 12, 27)),1,0)
#........................................................................
walmart_data_test['Super_Bowl'] = np.where((walmart_data_test['Date']==datetime(2010, 2, 12)) | (walmart_data_test['Date']==datetime(2011, 2, 11)) | (walmart_data_test['Date']==datetime(2012, 2, 10)) | (walmart_data_test['Date']==datetime(2013, 2, 8)),1,0)
walmart_data_test['Labour_Day'] = np.where((walmart_data_test['Date']==datetime(2010, 9, 10)) | (walmart_data_test['Date']==datetime(2011, 9, 9)) | (walmart_data_test['Date']==datetime(2012, 9, 7)) | (walmart_data_test['Date']==datetime(2013, 9, 6)),1,0)
walmart_data_test['Thanksgiving'] = np.where((walmart_data_test['Date']==datetime(2010, 11, 26)) | (walmart_data_test['Date']==datetime(2011, 11, 25)) | (walmart_data_test['Date']==datetime(2012, 11, 23)) | (walmart_data_test['Date']==datetime(2013, 11, 29)),1,0)
walmart_data_test['Christmas'] = np.where((walmart_data_test['Date']==datetime(2010, 12, 31)) | (walmart_data_test['Date']==datetime(2011, 12, 30)) | (walmart_data_test['Date']==datetime(2012, 12, 28)) | (walmart_data_test['Date']==datetime(2013, 12, 27)),1,0)


In [None]:
walmart_data['IsHoliday']=walmart_data['IsHoliday']|walmart_data['Super_Bowl']|walmart_data['Labour_Day']|walmart_data['Thanksgiving']|walmart_data['Christmas']
walmart_data_test['IsHoliday']=walmart_data_test['IsHoliday']|walmart_data_test['Super_Bowl']|walmart_data_test['Labour_Day']|walmart_data_test['Thanksgiving']|walmart_data_test['Christmas']

In [None]:
print (walmart_data.Christmas.value_counts())
print (walmart_data.Super_Bowl.value_counts())
print (walmart_data.Thanksgiving.value_counts())
print (walmart_data.Labour_Day.value_counts())

In [None]:
print (walmart_data_test.Christmas.value_counts())
print (walmart_data_test.Super_Bowl.value_counts())
print (walmart_data_test.Thanksgiving.value_counts())
print (walmart_data_test.Labour_Day.value_counts())

In [None]:
# Since we have Imputed IsHoliday according to Extra holidays..These extra holiday variable has redundant..
# Droping the Extra holiday variables because its redundant..
dp=['Super_Bowl','Labour_Day','Thanksgiving','Christmas']
walmart_data.drop(dp,axis=1,inplace=True)
walmart_data_test.drop(dp,axis=1,inplace=True)

Feature Selection


In [None]:
walmart_data.info()

Droping irrevelent variable:

In [None]:
features_drop=['CPI','Unemployment']
walmart_data=walmart_data.drop(features_drop, axis=1)
walmart_data_test=walmart_data_test.drop(features_drop, axis=1)

In [None]:
walmart_data.head(3)

In [None]:
walmart_data_test.head(3)

In [None]:
#Converting all float var int integer..
#for var in walmart_data:
    #if walmart_data[var].dtypes == float:
       # walmart_data[var]=walmart_data[var].astype(int)
        
#for var in walmart_data_test:
   # if walmart_data_test[var].dtypes == float:
       # walmart_data_test[var]=walmart_data_test[var].astype(int)

Classification & Accuracy


Define training and testing set

In [None]:
#### train X= Exery thing except Weekly_Sales
train_X=walmart_data.drop(['Weekly_Sales','Date'], axis=1)

#### train Y= Only Weekly_Sales 
train_y=walmart_data['Weekly_Sales'] 
test_X=walmart_data_test.drop(['Date','Fuel_Price'],axis=1).copy()

train_X.shape, train_y.shape, test_X.shape

In [None]:
test_X.info()

train_X.info()

Building models & comparing their RMSE values


1.Linear Regression

In [None]:
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import AdaBoostRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.svm import SVC, LinearSVC
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [None]:
## Methood 1..
clf = LinearRegression()
clf.fit(train_X, train_y)
y_pred_linear=clf.predict(test_X)
acc_linear=round( clf.score(train_X, train_y) * 100, 2)
print ('scorbe:'+str(acc_linear) + ' percent')