In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session for the partitioning experiments; "local[*]" is the
# master URL and 'partitions' the application name.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('partitions')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/02 23:15:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 0. Prepare Sample Data

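Pull the Green Taxi trip data for 2019–2020.  
The CSV files are expected under `data/raw/green/{year}/{month}/` (one folder per month, month zero-padded, relative to the notebook's working directory).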

In [13]:
# Create the output directories that the write cells below target
# (-p: create missing parents, no error if the directory already exists).
%system mkdir -p data
%system mkdir -p data/partitions

[]

In [7]:
from pyspark.sql import types

In [8]:
# Columns of the green-taxi trip data as (name, Spark type class) pairs,
# listed in the order in which they appear in the schema.
_GREEN_COLUMNS = [
    ("VendorID", types.IntegerType),
    ("lpep_pickup_datetime", types.TimestampType),
    ("lpep_dropoff_datetime", types.TimestampType),
    ("store_and_fwd_flag", types.StringType),
    ("RatecodeID", types.IntegerType),
    ("PULocationID", types.IntegerType),
    ("DOLocationID", types.IntegerType),
    ("passenger_count", types.IntegerType),
    ("trip_distance", types.DoubleType),
    ("fare_amount", types.DoubleType),
    ("extra", types.DoubleType),
    ("mta_tax", types.DoubleType),
    ("tip_amount", types.DoubleType),
    ("tolls_amount", types.DoubleType),
    ("ehail_fee", types.DoubleType),
    ("improvement_surcharge", types.DoubleType),
    ("total_amount", types.DoubleType),
    ("payment_type", types.IntegerType),
    ("trip_type", types.IntegerType),
    ("congestion_surcharge", types.DoubleType),
]

# Explicit schema for the CSV reader; every field is declared nullable.
green_schema = types.StructType([
    types.StructField(column_name, column_type(), True)
    for column_name, column_type in _GREEN_COLUMNS
])

# Years whose monthly folders are read by populate_data.
YEARS = [2019, 2020]

In [9]:
def populate_data(taxi_color: str, schema: types.StructType, years=None):
    """Build a DataFrame over the monthly raw CSV folders of one taxi color.

    The input paths follow the layout ``data/raw/{taxi_color}/{year}/{month:02d}/``
    for every requested year and every month from 1 to 12.

    Args:
        taxi_color: Name of the taxi data set; used in the path and in the
            progress messages (e.g. "green").
        schema: Schema handed to the CSV reader.
        years: Iterable of years to read. Defaults to the module-level
            ``YEARS`` list when omitted, which preserves the original behavior.

    Returns:
        The DataFrame returned by ``spark.read...csv(input_paths)``.

    Note:
        No Spark action (count/collect/write) is called here, so the
        "finished" message only means the DataFrame has been defined.
    """
    if years is None:
        years = YEARS

    # One folder per (year, month); the month is zero-padded to two digits.
    input_paths = [
        f'data/raw/{taxi_color}/{year}/{month:02d}/'
        for year in years
        for month in range(1, 13)
    ]

    print(f'processing data for {taxi_color}_tripdata')
    reader = spark.read.option("header", "true").schema(schema)
    df = reader.csv(input_paths)
    print(f'data processing finished for {taxi_color}_tripdata')
    return df

In [10]:
from pyspark.sql.functions import year
green_df = populate_data("green",green_schema)

# Rows whose pickup year falls outside the 2019-2020 range that was loaded.
# The two conditions have to be OR-ed: chaining them as successive filters
# AND-s them, and no year is both < 2019 and > 2020, so the chained form
# could never match a row. green_df itself is intentionally left unchanged.
pickup_year_col = year("lpep_pickup_datetime")
green_df.filter((pickup_year_col < 2019) | (pickup_year_col > 2020))


processing data for green_tripdata
data processing finished for green_tripdata


DataFrame[VendorID: int, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: int, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: double, improvement_surcharge: double, total_amount: double, payment_type: int, trip_type: int, congestion_surcharge: double]

In [44]:
from pyspark.sql.functions import year, month, dayofmonth

# Derive year / month / day-of-month columns from the pickup timestamp;
# the partitioning examples below use them as partition keys.
green_df = (
    green_df
    .withColumn("pickup_year", year("lpep_pickup_datetime"))
    .withColumn("pickup_month", month("lpep_pickup_datetime"))
    .withColumn("pickup_day", dayofmonth("lpep_pickup_datetime"))
)

### 1. Partitions

#### 1.1 : How data is written without specific partition configuration

In [12]:
# Number of partitions of the DataFrame before any explicit repartitioning.
default_partition_count = green_df.rdd.getNumPartitions()
print(f'Green Partitions by default {default_partition_count}')

Green Partitions by default 15


In [15]:
# Write the DataFrame as-is, without any explicit partition configuration.
(
    green_df.write
    .mode("overwrite")
    .csv("data/partitions/default.csv", header=True)
)

                                                                                

In [16]:
! find data/partitions/default.csv ! -name ".*" ! -name "_SUCCESS" -type f | wc -l

      15


In [29]:
! ls -sh data/partitions/default.csv/*

     0 data/partitions/default.csv/_SUCCESS
163968 data/partitions/default.csv/part-00000-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
163968 data/partitions/default.csv/part-00001-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
163968 data/partitions/default.csv/part-00002-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
123008 data/partitions/default.csv/part-00003-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
120960 data/partitions/default.csv/part-00004-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
112768 data/partitions/default.csv/part-00005-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
114816 data/partitions/default.csv/part-00006-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
112768 data/partitions/default.csv/part-00007-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
108672 data/partitions/default.csv/part-00008-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
108672 data/partitions/default.csv/part-00009-3b00a939-a590-48d0-8c4e-568af0a4175a-c000.csv
229504 data/partitions/de

#### 1.2 : Coalesce

In [17]:
# coalesce() does not affect the partition count when the requested number
# is greater than the current one (24 is requested here, the count stays as
# it was), so coalesce() is only useful for reducing the number of partitions.
df = green_df.coalesce(24)
print(df.rdd.getNumPartitions())

15


In [18]:
# Requesting fewer partitions than the current count does take effect.
df = green_df.coalesce(8)
coalesced_partition_count = df.rdd.getNumPartitions()
print(coalesced_partition_count)

8


In [19]:
## coalesce() does not shuffle the data, therefore partition (file) sizes may vary
df.write.mode("overwrite").csv("data/partitions/coalesce_8.csv", header=True)

                                                                                

In [30]:
! ls -sh data/partitions/coalesce_8.csv/*

     0 data/partitions/coalesce_8.csv/_SUCCESS
163968 data/partitions/coalesce_8.csv/part-00000-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
295040 data/partitions/coalesce_8.csv/part-00001-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
262272 data/partitions/coalesce_8.csv/part-00002-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
229504 data/partitions/coalesce_8.csv/part-00003-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
229504 data/partitions/coalesce_8.csv/part-00004-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
327808 data/partitions/coalesce_8.csv/part-00005-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
327808 data/partitions/coalesce_8.csv/part-00006-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv
116864 data/partitions/coalesce_8.csv/part-00007-14ccac30-383e-4856-86d1-ccaea29bbdc8-c000.csv


####  1.3 :  Repartition: Number

In [52]:
# Repartition into a fixed number of partitions and confirm the new count.
df = green_df.repartition(8)
repartitioned_count = df.rdd.getNumPartitions()
print(repartitioned_count)



8


In [53]:
## repartition() shuffles the data, therefore the files are expected to have similar sizes
df.write.mode("overwrite").csv("data/partitions/repartition_8.csv", header=True)

                                                                                

In [54]:
! ls -sh data/partitions/repartition_8.csv/*

     0 data/partitions/repartition_8.csv/_SUCCESS
264192 data/partitions/repartition_8.csv/part-00000-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263936 data/partitions/repartition_8.csv/part-00001-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263680 data/partitions/repartition_8.csv/part-00002-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263552 data/partitions/repartition_8.csv/part-00003-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263680 data/partitions/repartition_8.csv/part-00004-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
264064 data/partitions/repartition_8.csv/part-00005-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263424 data/partitions/repartition_8.csv/part-00006-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv
263936 data/partitions/repartition_8.csv/part-00007-bc690253-3c0c-48f7-bfc5-8d1eb0619026-c000.csv


#### 1.4 : Repartition: Column (V1)

In [34]:
# Repartition by a column instead of a number; the resulting partition
# count is chosen by Spark rather than given explicitly.
df = green_df.repartition("payment_type")
payment_type_partition_count = df.rdd.getNumPartitions()
print(payment_type_partition_count)



4


In [35]:
# Persist the payment_type-repartitioned DataFrame to inspect the file sizes.
(
    df.write
    .mode("overwrite")
    .csv("data/partitions/repartition_col_v1.csv", header=True)
)

                                                                                

In [36]:
! ls -sh data/partitions/repartition_col_v1.csv/*

     0 data/partitions/repartition_col_v1.csv/_SUCCESS
262272 data/partitions/repartition_col_v1.csv/part-00000-a5c7301a-5575-41d0-a2f8-7f3a8ca6c790-c000.csv
919168 data/partitions/repartition_col_v1.csv/part-00001-a5c7301a-5575-41d0-a2f8-7f3a8ca6c790-c000.csv
 12256 data/partitions/repartition_col_v1.csv/part-00002-a5c7301a-5575-41d0-a2f8-7f3a8ca6c790-c000.csv
689920 data/partitions/repartition_col_v1.csv/part-00003-a5c7301a-5575-41d0-a2f8-7f3a8ca6c790-c000.csv


In [58]:
import glob

# Part-file names embed a UUID that changes on every write, so a hard-coded
# file name stops existing as soon as the write cell is re-run. Look the
# part files up instead and sample the last one (sorted by name).
part_files = sorted(glob.glob('data/partitions/repartition_col_v1.csv/part-*.csv'))
if not part_files:
    raise FileNotFoundError(
        'no part files found under data/partitions/repartition_col_v1.csv; '
        'run the write cell above first'
    )
df_sample = spark.read.csv(part_files[-1], header=True)

In [59]:
# Distinct payment_type values present in the sampled part file.
(
    df_sample
    .select('payment_type')
    .distinct()
    .collect()
)

                                                                                

[Row(payment_type='2')]

#### 1.5 : Repartition: Column (V2)

In [38]:
# Repartition by the combination of pickup year, month and day.
df = green_df.repartition("pickup_year", "pickup_month", "pickup_day")
date_partition_count = df.rdd.getNumPartitions()
print(date_partition_count)



13


In [39]:
# Persist the date-repartitioned DataFrame to inspect the file sizes.
(
    df.write
    .mode("overwrite")
    .csv("data/partitions/repartition_col_v2.csv", header=True)
)

                                                                                

In [40]:
! ls -sh data/partitions/repartition_col_v2.csv/*

     0 data/partitions/repartition_col_v2.csv/_SUCCESS
165120 data/partitions/repartition_col_v2.csv/part-00000-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164864 data/partitions/repartition_col_v2.csv/part-00001-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164608 data/partitions/repartition_col_v2.csv/part-00002-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164608 data/partitions/repartition_col_v2.csv/part-00003-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164864 data/partitions/repartition_col_v2.csv/part-00004-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
197504 data/partitions/repartition_col_v2.csv/part-00005-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164608 data/partitions/repartition_col_v2.csv/part-00006-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164864 data/partitions/repartition_col_v2.csv/part-00007-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164608 data/partitions/repartition_col_v2.csv/part-00008-15acf577-f098-4818-9b0b-5c6b5f04fdc6-c000.csv
164608 d

#### 1.6  PartitionBy: Column

In [61]:
# partitionBy() on the writer lays the output out as nested
# pickup_year=<y>/pickup_month=<m> directories.
(
    green_df.write
    .partitionBy("pickup_year", "pickup_month")
    .mode("overwrite")
    .csv("data/partitions/partitionBy.csv", header=True)
)

                                                                                

In [84]:
! ls data/partitions/partitionBy.csv

_SUCCESS         [1m[36mpickup_year=2010[m[m [1m[36mpickup_year=2020[m[m [1m[36mpickup_year=2041[m[m
[1m[36mpickup_year=2008[m[m [1m[36mpickup_year=2018[m[m [1m[36mpickup_year=2021[m[m [1m[36mpickup_year=2062[m[m
[1m[36mpickup_year=2009[m[m [1m[36mpickup_year=2019[m[m [1m[36mpickup_year=2035[m[m


In [99]:
! ls data/partitions/partitionBy.csv/pickup_year=2020 

ls: e: No such file or directory
data/partitions/partitionBy.csv/pickup_year=2020:
[1m[36mpickup_month=1[m[m  [1m[36mpickup_month=12[m[m [1m[36mpickup_month=4[m[m  [1m[36mpickup_month=7[m[m
[1m[36mpickup_month=10[m[m [1m[36mpickup_month=2[m[m  [1m[36mpickup_month=5[m[m  [1m[36mpickup_month=8[m[m
[1m[36mpickup_month=11[m[m [1m[36mpickup_month=3[m[m  [1m[36mpickup_month=6[m[m  [1m[36mpickup_month=9[m[m


In [93]:
# ! cd data/partitions/partitionBy.csv && tree --du -d -shaC | grep -Ev '(  *[^ ]* ){4}\['

#### 1.7 Repartition + PartitionBy : Column

In [48]:
# Repartition in memory by pickup_year first, then write one directory per
# pickup_year; in the listing below each year directory holds a single file.
(
    green_df
    .repartition("pickup_year")
    .write
    .partitionBy("pickup_year")
    .mode("overwrite")
    .csv("data/partitions/repartion_partionBy_col.csv", header=True)
)

                                                                                

In [1]:
! tree data/partitions/repartion_partionBy_col.csv/ 

[01;34mdata/partitions/repartion_partionBy_col.csv/[0m
├── [00m_SUCCESS[0m
├── [01;34mpickup_year=2008[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2009[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2010[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2018[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2019[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2020[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2021[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2035[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2041[0m
│   └── [00m

In [90]:
! tree data/partitions/repartion_partionBy_col.csv 

[01;34mdata/partitions/repartion_partionBy_col.csv[0m
├── [00m_SUCCESS[0m
├── [01;34mpickup_year=2008[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2009[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2010[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2018[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2019[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2020[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2021[0m
│   └── [00mpart-00001-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2035[0m
│   └── [00mpart-00000-fa509a03-3198-4aec-80b2-ea0d0b74e53c.c000.csv[0m
├── [01;34mpickup_year=2041[0m
│   └── [00mp

#### 1.8 Repartition + PartitionBy : Num + Column

In [106]:
# Repartition into two in-memory partitions, then write one directory per
# pickup_year; a year directory can then receive up to two part files.
(
    green_df
    .repartition(2)
    .write
    .partitionBy("pickup_year")
    .mode("overwrite")
    .csv("data/partitions/repartion_partionBy_num.csv", header=True)
)

                                                                                

In [146]:
! ls data/partitions/repartion_partionBy_num.csv

_SUCCESS         [1m[36mpickup_year=2010[m[m [1m[36mpickup_year=2020[m[m [1m[36mpickup_year=2041[m[m
[1m[36mpickup_year=2008[m[m [1m[36mpickup_year=2018[m[m [1m[36mpickup_year=2021[m[m [1m[36mpickup_year=2062[m[m
[1m[36mpickup_year=2009[m[m [1m[36mpickup_year=2019[m[m [1m[36mpickup_year=2035[m[m


In [145]:
! ls -sh data/partitions/repartion_partionBy_num.csv/pickup_year=2020

total 460288
230144 part-00000-13b54b24-2a2c-4476-bc1d-6af3fb8e3c7a.c000.csv
230144 part-00001-13b54b24-2a2c-4476-bc1d-6af3fb8e3c7a.c000.csv
