In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# List the files that were uploaded to the DBFS FileStore tables directory.
dbutils.fs.ls('/FileStore/tables/')

Out[53]: [FileInfo(path='dbfs:/FileStore/tables/2011_summary.csv', name='2011_summary.csv', size=7069, modificationTime=1702969429000),
 FileInfo(path='dbfs:/FileStore/tables/bad_records/', name='bad_records/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/bucket_by_id/', name='bucket_by_id/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/corrupted_json.json', name='corrupted_json.json', size=218, modificationTime=1703184986000),
 FileInfo(path='dbfs:/FileStore/tables/csv_Write/', name='csv_Write/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/emp_data-1.csv', name='emp_data-1.csv', size=230, modificationTime=1703056833000),
 FileInfo(path='dbfs:/FileStore/tables/emp_data.csv', name='emp_data.csv', size=230, modificationTime=1703056812000),
 FileInfo(path='dbfs:/FileStore/tables/file5.json', name='file5.json', size=669503, modificationTime=1703187988000),
 FileInfo(path='dbfs:/FileStore/tables/lec8.csv', name='lec8.csv'

In [0]:
%fs
ls FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/2011_summary.csv,2011_summary.csv,7069,1702969429000
dbfs:/FileStore/tables/bad_records/,bad_records/,0,0
dbfs:/FileStore/tables/bucket_by_id/,bucket_by_id/,0,0
dbfs:/FileStore/tables/corrupted_json.json,corrupted_json.json,218,1703184986000
dbfs:/FileStore/tables/csv_Write/,csv_Write/,0,0
dbfs:/FileStore/tables/emp_data-1.csv,emp_data-1.csv,230,1703056833000
dbfs:/FileStore/tables/emp_data.csv,emp_data.csv,230,1703056812000
dbfs:/FileStore/tables/file5.json,file5.json,669503,1703187988000
dbfs:/FileStore/tables/lec8.csv,lec8.csv,443,1704647534000
dbfs:/FileStore/tables/line_delimited_json.json,line_delimited_json.json,219,1703184986000


In [0]:
# Load the 2011 flight summary CSV: the first row supplies the column names
# and Spark infers the column types from the data.
flight_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('/FileStore/tables/2011_summary.csv')
)
flight_df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|       Saint Martin|    2|
|       United States|             Guinea|    2|
|       United States|            Croatia|    1|
|       United States|            Romania|    3|
|       United States|            Ireland|  268|
|               Egypt|      United States|   13|
|       United States|              India|   76|
|       United States|          Singapore|   24|
|       United States|            Grenada|   59|
|          Costa Rica|      United States|  494|
|             Senegal|      United States|   29|
|              Guyana|      United States|   26|
|       United States|   Marshall Islands|   49|
|       United States|       Sint Maarten|  223|
|               Malta|      United States|    1|
|             Bolivia|      United States|   61|
|            Anguilla|      United States|   21|
|       United State

In [0]:
# Column names taken from the CSV header row.
flight_df.columns

Out[55]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
# Total number of records in the flight summary.
flight_df.count()

Out[56]: 255

In [0]:
# The partition count is read from the DataFrame's underlying RDD.
flight_df.rdd.getNumPartitions()

Out[57]: 1

In [0]:
# Shuffle the rows into 4 partitions. The next cell tags every row with its
# partition id so the number of rows per partition can be inspected.
partitioned_flight_df = flight_df.repartition(4)

In [0]:
# Tag each row with the id of the partition holding it, then count rows per partition.
(
    partitioned_flight_df
    .withColumn('partitionId', spark_partition_id())
    .groupBy('partitionId')
    .count()
    .show()
)

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|   63|
|          1|   64|
|          2|   64|
|          3|   64|
+-----------+-----+



##### The data above is evenly distributed across the 4 partitions.
### Another `repartition` option: partitioning on a column

In [0]:
# Repartition by the values of ORIGIN_COUNTRY_NAME into 300 partitions.
# The partition count argument is optional when partitioning on a column.
partitioned_on_column = flight_df.repartition(300, 'ORIGIN_COUNTRY_NAME')

In [0]:
# The requested partition count (300) is honoured even though the data has
# only 255 rows. Rows are hash-partitioned on ORIGIN_COUNTRY_NAME, so all rows
# with the same origin country land in the same partition. At most one
# partition per distinct country can be non-empty; the remaining partitions
# are simply empty (they contain no rows, not "null records").
partitioned_on_column.rdd.getNumPartitions()

Out[61]: 300

In [0]:
# Rows per partition for the column-based repartition; show up to 300 rows so
# every partition id that actually holds data is listed.
(
    partitioned_on_column
    .withColumn('partitionId', spark_partition_id())
    .groupBy('partitionId')
    .count()
    .show(300)
)

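### Coalesce
`coalesce(n)` reduces the number of partitions by merging existing partitions instead of reshuffling every row.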

In [0]:
# Display the first 20 rows without truncating long cell values.
flight_df.show(n=20, truncate=False)

In [0]:
# Despite its name, this frame is produced by repartition(): it spreads the
# data over 8 partitions so the coalesce cells below have partitions to merge.
coalesce_flight_df = flight_df.repartition(8)

In [0]:
# Rows per partition after repartition(8): the starting point for coalescing.
(
    coalesce_flight_df
    .withColumn('partitionId', spark_partition_id())
    .groupBy('partitionId')
    .count()
    .show()
)

In [0]:
# Merge the 8 existing partitions down to 3 without reshuffling the rows.
three_coalesce_df = coalesce_flight_df.coalesce(3)

In [0]:
# Rows per partition after coalesce(3). Partitions are merged rather than
# reshuffled, so their sizes can be uneven.
(
    three_coalesce_df
    .withColumn('partitionId', spark_partition_id())
    .groupBy('partitionId')
    .count()
    .show()
)

In [0]:
# Same target partition count (3), but produced by repartition() so the
# resulting distribution can be compared with the coalesced one.
three_repartition_df = three_coalesce_df.repartition(3)

In [0]:
# Rows per partition after repartition(3): the rows are reshuffled across
# the 3 partitions.
(
    three_repartition_df
    .withColumn('partitionId', spark_partition_id())
    .groupBy('partitionId')
    .count()
    .show()
)

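So `coalesce` merges existing partitions without redistributing the rows, which can leave the partitions unevenly sized, while `repartition` shuffles the data and spreads the rows evenly across the partitions.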

In [0]:
# Partition count of the repartition(8) frame before any coalesce is applied.
coalesce_flight_df.rdd.getNumPartitions()

In [0]:
# coalesce() can only merge partitions, never split them. Asking for more
# partitions (10) than currently exist (8) leaves the count unchanged;
# use repartition() to increase the partition count.
coalesce_flight_df.coalesce(10).rdd.getNumPartitions()

Out[63]: 8

In [0]:

# Small in-memory sample of integer 2-tuples. Presumably (id, group) pairs,
# but TODO confirm the intended column names. No cell shown here uses it.
data = [
    (1, 1),
    (2, 1),
    (3, 1),
    (4, 2),
    (5, 1),
    (6, 2),
    (7, 2),
]

In [0]:
# Print the column types Spark inferred, then display the column names.
flight_df.printSchema()
flight_df.columns

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Out[76]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']