In [1]:
import pandas as pd

In [None]:
# Quick look at the raw CSV with pandas before switching to Spark below.
data = pd.read_csv(filepath_or_buffer='bvg.csv')

In [None]:
data.head(n=5)  # first five rows of the pandas frame

Unnamed: 0,Date,Grand Total,Territory,Zone,Item Name,Quantity,Amount
0,10/12/2025,883576.512,T - Nawan Lahore,A - Painsra,Lemon up-1500 ml,208.0,224640.0
1,10/12/2025,883576.512,T - Nawan Lahore,A - Painsra,Cola-1500 ml,641.0,692280.0
2,10/12/2025,883576.512,T - Nawan Lahore,A - Painsra,Cola-1500 ml,399.0,430920.0
3,10/12/2025,985119.632,T - Chaudry Colony (Jhang),A - Jhang-Ii,Cola-1500 ml,520.0,561600.0
4,10/12/2025,985119.632,T - Chaudry Colony (Jhang),A - Jhang-Ii,Cola-2250 ml,600.0,528000.0


In [None]:
data.keys()  # for a DataFrame this is the column Index

Index(['Date', 'Grand Total', 'Territory', 'Zone', 'Item Name', 'Quantity',
       'Amount'],
      dtype='object')

In [13]:
# PySpark imports
# NOTE(review): lit, when, lag, avg, spark_sum, spark_count, Window, F and the
# pyspark.sql.types classes are not used in any of the cells shown.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, lag, avg, sum as spark_sum, count as spark_count
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, IntegerType, DateType, TimestampType

# PySpark ML imports (rough counterparts of the sklearn tools)
# NOTE(review): the classifiers and the Binary/Multiclass evaluators below are not
# used in the cells shown; the model trained later is a RandomForestRegressor.
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression as SparkLogisticRegression, \
                                      LinearSVC as SparkSVC, \
                                      GBTClassifier as SparkXGBoost  # GBTClassifier is Spark's own gradient-boosted trees, NOT XGBoost; the alias is only a name
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Plotting is done with matplotlib/seaborn on pandas data
# (i.e. Spark results are converted to pandas before plotting).
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# NOTE(review): this silences every Python warning for the whole kernel, which can
# hide deprecation or data problems; consider a narrower filter.
warnings.filterwarnings('ignore')

# Create (or reuse) the SparkSession that every Spark cell below depends on.
# NOTE(review): static settings such as spark.driver.memory only take effect when this
# call launches the JVM; they are not applied to an already-running session.
spark = (
    SparkSession.builder
    .appName("Beverages Price Prediction")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # Arrow-backed Spark <-> pandas conversion
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

# Keep notebook output readable: only WARN and above from Spark's logger.
spark.sparkContext.setLogLevel("WARN")

print("PySpark session created successfully!")
print(f"PySpark version: {spark.version}")

PySpark session created successfully!
PySpark version: 3.3.2


In [52]:
# Re-load the same CSV as a Spark DataFrame (first row = header, column types inferred).
data = spark.read.options(header=True, inferSchema=True).csv('bvg.csv')
  

In [53]:
data.show(n=5)  # first five rows of the Spark DataFrame

+----------+-----------+--------------------+------------+----------------+--------+--------+
|      Date|Grand Total|           Territory|        Zone|       Item Name|Quantity|  Amount|
+----------+-----------+--------------------+------------+----------------+--------+--------+
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|Lemon up-1500 ml|   208.0|224640.0|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   641.0|692280.0|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   399.0|430920.0|
|10/12/2025| 985119.632|T - Chaudry Colon...|A - Jhang-Ii|    Cola-1500 ml|   520.0|561600.0|
|10/12/2025| 985119.632|T - Chaudry Colon...|A - Jhang-Ii|    Cola-2250 ml|   600.0|528000.0|
+----------+-----------+--------------------+------------+----------------+--------+--------+
only showing top 5 rows



In [54]:

from pyspark.sql.functions import col, to_timestamp, year, month, dayofweek, weekofyear, quarter

# `Date` is read as a plain string (printSchema shows `Date: string`), e.g. "10/12/2025".
# Applying year()/month()/... to that string relies on Spark's implicit string->date cast,
# which does not understand this format, so the derived columns come out null.
# Parse the string explicitly first, then derive the calendar parts from the timestamp.
# NOTE(review): day-first format assumed, consistent with the Date_ts output further down
# (10/12/2025 -> 2025-12-10); confirm against the data source.
data = data.withColumn("Date_ts", to_timestamp("Date", "dd/MM/yyyy"))

# Fail fast if any non-null date string did not match the expected format.
n_unparsed_dates = data.filter(col("Date").isNotNull() & col("Date_ts").isNull()).count()
assert n_unparsed_dates == 0, f"{n_unparsed_dates} Date values did not match dd/MM/yyyy"

data = (
    data
    .withColumn("Year", year("Date_ts"))
    .withColumn("Month", month("Date_ts"))
    .withColumn("DayOfWeek", dayofweek("Date_ts"))
    .withColumn("WeekOfYear", weekofyear("Date_ts"))
    .withColumn("Quarter", quarter("Date_ts"))
)


In [56]:
from pyspark.sql.functions import to_timestamp

# Parse the day-first date strings into a proper timestamp column named Date_ts.
data = data.withColumn(colName="Date_ts", col=to_timestamp(col="Date", format="dd/MM/yyyy"))


In [57]:
from pyspark.sql.functions import year, month, dayofweek, weekofyear, quarter

# Calendar features derived from the parsed timestamp. Existing columns of the same
# name are replaced in place. Spark's dayofweek counts 1 = Sunday ... 7 = Saturday.
data = (
    data
    .withColumn("Year", year("Date_ts"))
    .withColumn("Month", month("Date_ts"))
    .withColumn("DayOfWeek", dayofweek("Date_ts"))
    .withColumn("WeekOfYear", weekofyear("Date_ts"))
    .withColumn("Quarter", quarter("Date_ts"))
)


In [58]:
data.show(n=5)  # check the derived calendar columns

+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-------------------+
|      Date|Grand Total|           Territory|        Zone|       Item Name|Quantity|  Amount|Year|Month|DayOfWeek|WeekOfYear|Quarter|            Date_ts|
+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-------------------+
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|Lemon up-1500 ml|   208.0|224640.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   641.0|692280.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   399.0|430920.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 985119.632|T - Chaudry Colon...|A - Jhang-Ii|    Cola-1500 ml| 

In [59]:
# Replace spaces in column names so the ML stages can reference them by plain names.
# withColumnRenamed keeps each column's position and is a no-op if the source is absent.
data = (
    data
    .withColumnRenamed("Grand Total", "Grand_Total")
    .withColumnRenamed("Item Name", "Item_Name")
)


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Text columns that are turned into one-hot vectors for the model.
categorical_cols = ["Territory", "Zone", "Item_Name"]

# One StringIndexer per column: <col> -> <col>_idx (numeric category index).
# handleInvalid="keep" maps labels not seen during fitting to an extra index instead of
# raising at transform time -- needed because a random split can put a rare label
# (e.g. "Chiniot City", which previously failed with "Unseen label") only in the test set.
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categorical_cols
]

# A single OneHotEncoder stage handles all index columns: <col>_idx -> <col>_vec.
encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_vec" for c in categorical_cols],
)



In [60]:
from pyspark.ml import Pipeline

# Encoding-only pipeline: every StringIndexer first, then the shared OneHotEncoder.
pipeline = Pipeline(stages=[*indexers, encoder])


In [62]:
data.show(n=5)  # confirm the renamed columns

+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-------------------+
|      Date|Grand_Total|           Territory|        Zone|       Item_Name|Quantity|  Amount|Year|Month|DayOfWeek|WeekOfYear|Quarter|            Date_ts|
+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-------------------+
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|Lemon up-1500 ml|   208.0|224640.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   641.0|692280.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   399.0|430920.0|2025|   12|        4|        50|      4|2025-12-10 00:00:00|
|10/12/2025| 985119.632|T - Chaudry Colon...|A - Jhang-Ii|    Cola-1500 ml| 

In [61]:

# Fit the indexing/encoding stages and attach the *_idx / *_vec columns for inspection.
# NOTE(review): `model` is reassigned to the regressor in a later cell.
model = pipeline.fit(dataset=data)
encoded_df = model.transform(dataset=data)


Py4JJavaError: An error occurred while calling o830.fit.
: org.apache.spark.SparkException: Input column Item Name does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [46]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest regressor that predicts Quantity (switch labelCol to "Amount" to model revenue)
# from the scaled feature vector produced by the scaler stage of the full pipeline.
model = RandomForestRegressor(labelCol="Quantity", featuresCol="scaledFeatures")


In [47]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Model inputs: the one-hot vectors from the encoder plus the calendar parts from Date_ts.
# Amount and Grand_Total are deliberately excluded. In the rows shown earlier, Amount is
# Quantity times a fixed per-item price (208 -> 224640 = 208 * 1080; 600 -> 528000 = 600 * 880),
# so using it to predict Quantity would leak the label.
# NOTE(review): Grand_Total looks like an order-level total built from Amount -- confirm before using it.
feature_cols = encoder.getOutputCols() + ["Year", "Month", "DayOfWeek", "WeekOfYear", "Quarter"]

# Previously `assembler` and `scaler` were used here without ever being defined in the notebook.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Tree ensembles do not need scaled inputs; this stage exists because the regressor reads
# `scaledFeatures`. withMean is left at its default (False), so sparse one-hot vectors stay sparse.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, model])
# The pipeline is fitted later on train_df only. Fitting it here on the full `data` would let
# the indexers, scaler and forest see the test rows, and the result was overwritten anyway.


In [48]:
data.show(n=5)  # snapshot of the modelling frame before splitting

+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-----+-------------------+
|      Date|Grand Total|           Territory|        Zone|       Item Name|Quantity|  Amount|Year|Month|DayOfWeek|WeekOfYear|Quarter| temp|            Date_ts|
+----------+-----------+--------------------+------------+----------------+--------+--------+----+-----+---------+----------+-------+-----+-------------------+
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|Lemon up-1500 ml|   208.0|224640.0|2025|   12|        4|        50|      4|10/12|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   641.0|692280.0|2025|   12|        4|        50|      4|10/12|2025-12-10 00:00:00|
|10/12/2025| 883576.512|    T - Nawan Lahore| A - Painsra|    Cola-1500 ml|   399.0|430920.0|2025|   12|        4|        50|      4|10/12|2025-12-10 00:00:00|
|10/12/2025| 985119.632|T - Chaudry Colo

In [38]:
# 80/20 random split with a fixed seed so the split is reproducible between runs.
train_df, test_df = data.randomSplit(weights=[0.8, 0.2], seed=42)


In [39]:
model_fitted = pipeline.fit(dataset=train_df)  # every stage is fitted on training rows only


In [40]:
predictions = model_fitted.transform(dataset=test_df)  # adds the `prediction` column for held-out rows


In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the held-out predictions. RMSE is in units of Quantity and is hard to judge
# on its own, so MAE and R^2 are reported alongside it.
rmse_eval = RegressionEvaluator(
    labelCol="Quantity",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = rmse_eval.evaluate(predictions)
print("RMSE =", rmse)

# Reuse the same evaluator with a param-map override for the other metrics.
mae = rmse_eval.evaluate(predictions, {rmse_eval.metricName: "mae"})
r2 = rmse_eval.evaluate(predictions, {rmse_eval.metricName: "r2"})
print("MAE  =", mae)
print("R2   =", r2)


Py4JJavaError: An error occurred while calling o804.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 62.0 failed 1 times, most recent failure: Lost task 0.0 in stage 62.0 (TID 53) (MRizwanRaj.mshome.net executor driver): org.apache.spark.SparkException: Failed to execute user defined function (StringIndexerModel$$Lambda$3643/0x0000000101598840: (string) => double)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Unseen label: Chiniot City. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1174)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1168)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1267)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1228)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1214)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1214)
	at org.apache.spark.mllib.stat.Statistics$.colStats(Statistics.scala:58)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:70)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:62)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:106)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:115)
	at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:101)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (StringIndexerModel$$Lambda$3643/0x0000000101598840: (string) => double)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label: Chiniot City. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 31 more


In [50]:
# Show every column with its Spark type; Date is inferred as a string, which is why it is
# parsed separately into Date_ts.
# NOTE(review): the saved output below still lists "Grand Total", "Item Name" and a "temp"
# column, so it predates the rename cell -- re-run to refresh it.
data.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Grand Total: double (nullable = true)
 |-- Territory: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- Item Name: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- WeekOfYear: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- temp: string (nullable = true)
 |-- Date_ts: timestamp (nullable = true)

