In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()
spark

#### TURN OFF AQE

In [4]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [5]:
# Checking AQE status
spark.conf.get("spark.sql.adaptive.enabled")

'false'

In [6]:
from pyspark.sql.functions import *

#### Data Reading

In [7]:
df = spark.read.format("csv")\
        .option("inferSchema", True)\
        .option("header", True)\
        .load("BigMart Sales.csv")

In [8]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [9]:
df.count()

8523

In [10]:
df.rdd.getNumPartitions()

1

#### Changing DEFAULT Partition Size from 128mb to 128kb

In [11]:
# Changind the default partition size to 128kb
spark.conf.set("spark.sql.files.maxPartitionBytes", 1000)

In [12]:
spark.conf.get("spark.sql.files.maxPartitionBytes")

'1000'

In [13]:
df.rdd.getNumPartitions()

1

#### Repartitioning

In [14]:
df = df.repartition(10)

In [15]:
df.rdd.getNumPartitions()

10

#### Get Partition Info

In [16]:
df.withColumn("partition_id", spark_partition_id()).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|partition_id|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|          FDY07|       11.8|         Low Fat|            0.0|Fruits and Vegeta...| 45.5402|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|        1516.0266|           0|
|          NCB30|       14.6|         Low Fat|    0.025698134|           Household|196.5084|           OUT035|              

#### Data Writing

In [17]:
df.write.format("parquet")\
    .mode("overwrite")\
    .save('parquetWrite')

#### Scanning Optimization

In [18]:
df.write.format("parquet")\
        .mode("overwrite")\
        .partitionBy("Outlet_Location_Type")\
        .save("parquetWriteOptimization")

In [19]:
df_new = spark.read.format("parquet")\
                    .load("parquetWriteOptimization")

df_new = df_new.filter(col("Outlet_Location_Type") == 'Tier 1')