In [1]:
from pyspark.sql import SparkSession
import os

os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["SPARK_HOME"]  = r"C:\spark"
os.environ["PATH"] = os.path.join(os.environ["HADOOP_HOME"], "bin") + ";" + os.environ["PATH"]

spark = (
    SparkSession.builder
    .appName("SparkLocalTest")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", r"C:\spark-warehouse")
    .config("spark.hadoop.fs.defaultFS", "file:///")

    # Force: NO native Windows Hadoop calls
    .config("spark.hadoop.io.native.lib.available", "false")
    .config("spark.hadoop.io.native.lib", "false")

    # Less fragile commit path on Windows
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

    .getOrCreate()
)

print("Spark:", spark.version)
print("Hadoop:", spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())

df = spark.read.csv(r"C:\datasets\datasets\\employee.csv", header=True, inferSchema=True)
df.write.mode("overwrite").csv(r"C:\spark_output")


Spark: 3.5.7
Hadoop: 3.3.4


In [2]:
sales_df = spark.read.csv("C:\datasets\datasets\superstore.csv",header=True,inferSchema=True)

In [4]:
sales_df.rdd.getNumPartitions()

3

In [6]:
sales_df.write.mode("overwrite").parquet("C:\\targets\sales_parquet")

In [7]:
sales_df.repartition(8).write.mode("overwrite").parquet("C:\\targets\sales_parquet")

In [8]:
sales_df.coalesce(2).write.mode("overwrite").orc("C:\\targets\sales_orc")

In [None]:
sales_df.coalesce(8).write.mode("overwrite").orc("C:\\targets\sales_orc")

In [10]:
sales_df.write.partitionBy("country","state").csv("c:\\targets\sales_partition")

In [11]:
sales_US_part = spark.read.csv("c:\\targets\sales_partition",header=True,inferSchema=True)

In [12]:
sales_US_part.count()

48346

In [13]:
sales_df.filter("country=='United States'").show()

+-----+--------------+----------+----------+--------------+----------+--------------------+-----------+----------------+--------------+-------------+----------+------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+----------+------------+-------------+
|   ID|       OrderID| OrderDate|  ShipDate|      ShipMode|CustomerID|        CustomerName|    Segment|            City|         State|      Country|PostalCode|Market| Region|      ProductID|       Category|Sub-Category|         ProductName|   Sales|Quantity|Discount|    Profit|ShippingCost|OrderPriority|
+-----+--------------+----------+----------+--------------+----------+--------------------+-----------+----------------+--------------+-------------+----------+------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+----------+------------+-------------+
|32298|CA-2012-124891|31/07/2012|31/07/2012|      Same Day|  RH-19495|         

In [14]:
sales_US_part.filter("country=='United States'").show()

+-----+--------------+----------+----------+--------------+--------+-------------------+-----------+--------------+-----+---+----+---------------+---------------+-----------+--------------------+--------------------+------------+------------+-------+--------+------+-------------+----------+
|38225|CA-2013-140382|24/06/2013|26/06/2013|  Second Class|RD-19900|        Ruben Dartt|   Consumer| San Francisco|94109| US|West|OFF-LA-10001934|Office Supplies|     Labels|           Avery 516|               21.93|           3|           0|10.3071|    3.38|  High|      country|     state|
+-----+--------------+----------+----------+--------------+--------+-------------------+-----------+--------------+-----+---+----+---------------+---------------+-----------+--------------------+--------------------+------------+------------+-------+--------+------+-------------+----------+
|38736|US-2014-163657|03/09/2014|07/09/2014|Standard Class|JL-15235|          Janet Lee|   Consumer|   Los Angeles|90049| US

In [16]:
spark.sql("create database ltim")
spark.sql("use ltim")
sales_df.write.partitionBy("country","state")\
.bucketBy(4,"city")\
.saveAsTable("ltim.sales_bucket")


In [17]:
spark.sql("select * from sales_bucket where country = 'India'")

DataFrame[ID: int, OrderID: string, OrderDate: string, ShipDate: string, ShipMode: string, CustomerID: string, CustomerName: string, Segment: string, City: string, PostalCode: int, Market: string, Region: string, ProductID: string, Category: string, Sub-Category: string, ProductName: string, Sales: string, Quantity: string, Discount: string, Profit: string, ShippingCost: string, OrderPriority: string, Country: string, State: string]

In [18]:
sc = spark.sparkContext
rdd1 = sc.parallelize([1,2,3])

In [20]:
emprdd = sc.textFile("C:\datasets\datasets\emp.csv")
emprdd.collect()

['111,zzz,8000', '111,aaa,8888', '121,bbb,8000']

In [21]:
rdd1.collect()

[1, 2, 3]

In [4]:
empdf = spark.read.csv("C:\datasets\datasets\employee.csv",header=True,inferSchema=True)

In [5]:
sales_df.show()

+-----+---------------+----------+----------+--------------+----------+----------------+-----------+-------------+---------------+-------------+----------+------+------------+----------------+---------------+------------+--------------------+----------------+--------+--------+----------+------------+-------------+
|   ID|        OrderID| OrderDate|  ShipDate|      ShipMode|CustomerID|    CustomerName|    Segment|         City|          State|      Country|PostalCode|Market|      Region|       ProductID|       Category|Sub-Category|         ProductName|           Sales|Quantity|Discount|    Profit|ShippingCost|OrderPriority|
+-----+---------------+----------+----------+--------------+----------+----------------+-----------+-------------+---------------+-------------+----------+------+------------+----------------+---------------+------------+--------------------+----------------+--------+--------+----------+------------+-------------+
|32298| CA-2012-124891|31/07/2012|31/07/2012|      S

In [6]:
empdf.show()

+-----+-----+----+
|empno|ename| sal|
+-----+-----+----+
|  111|  zzz|8000|
|  111|  aaa|8888|
|  121|  bbb|8000|
| NULL|  ccc|9000|
|false|  ddd|6000|
|  555|  eee|7000|
|  765| NULL|NULL|
|  666|  fff|8890|
|  aaa| NULL|NULL|
| True| NULL|NULL|
| true| NULL|NULL|
|    a|    b|   c|
|    x| NULL|NULL|
| 1000| null|null|
|  222| NULL|NULL|
+-----+-----+----+



In [8]:
sc = spark.sparkContext
sc.uiWebUrl

'http://10.33.198.42:4041'

In [9]:
sales_df.cache()

DataFrame[ID: int, OrderID: string, OrderDate: string, ShipDate: string, ShipMode: string, CustomerID: string, CustomerName: string, Segment: string, City: string, State: string, Country: string, PostalCode: int, Market: string, Region: string, ProductID: string, Category: string, Sub-Category: string, ProductName: string, Sales: string, Quantity: string, Discount: string, Profit: string, ShippingCost: string, OrderPriority: string]

In [12]:
sales_df.count()

51290

In [13]:
empdf.count()

Py4JJavaError: An error occurred while calling o61.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 26) (10.33.198.42 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/C:/datasets/datasets/employee.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2898)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2834)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2833)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2833)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1253)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3102)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3036)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3025)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/C:/datasets/datasets/employee.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [14]:
sales_df.unpersist()

DataFrame[ID: int, OrderID: string, OrderDate: string, ShipDate: string, ShipMode: string, CustomerID: string, CustomerName: string, Segment: string, City: string, State: string, Country: string, PostalCode: int, Market: string, Region: string, ProductID: string, Category: string, Sub-Category: string, ProductName: string, Sales: string, Quantity: string, Discount: string, Profit: string, ShippingCost: string, OrderPriority: string]

In [15]:
sales_df.count()

Py4JJavaError: An error occurred while calling o49.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 1 times, most recent failure: Lost task 1.0 in stage 22.0 (TID 28) (10.33.198.42 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/C:/datasets/datasets/superstore.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2898)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2834)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2833)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2833)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1253)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3102)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3036)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3025)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/C:/datasets/datasets/superstore.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [17]:
from pyspark.storagelevel import StorageLevel

df2 = sales_df.persist(StorageLevel.MEMORY_AND_DISK)


In [18]:
df2.count()

51290

In [19]:
df2.unpersist()

DataFrame[ID: int, OrderID: string, OrderDate: string, ShipDate: string, ShipMode: string, CustomerID: string, CustomerName: string, Segment: string, City: string, State: string, Country: string, PostalCode: int, Market: string, Region: string, ProductID: string, Category: string, Sub-Category: string, ProductName: string, Sales: string, Quantity: string, Discount: string, Profit: string, ShippingCost: string, OrderPriority: string]

In [21]:
from pyspark.storagelevel import StorageLevel
df = sales_df.persist(StorageLevel.MEMORY_AND_DISK_2)

In [24]:
df.count()

51290

In [25]:
from pyspark.storagelevel import StorageLevel
df = sales_df.persist(StorageLevel.MEMORY_ONLY)

In [26]:
df.count()

51290