In [1]:
import time
from pathlib import Path

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, year

In [2]:
# Local Spark session running on 4 threads. The legacy `nanosAsLong` option is
# switched on so that the pandas/pyarrow-written parquet file used in the
# "Compatibility problems" section can be loaded (its timestamp shows up as long).
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("ReadWriteParquet")
    .config("spark.sql.legacy.parquet.nanosAsLong", "true")
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/03 14:03:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# configuration

In [3]:
# All datasets are expected in a `data` directory that sits beside the current working directory.
data_path = Path.cwd().parent.joinpath("data")

# Un-partitioned parquet files: the raw one and the one with a Spark-readable timestamp.
fr_immo_raw_path = data_path.joinpath("fr_immo_transactions.parquet").as_posix()
fr_immo_valid_path = data_path.joinpath("fr_immo_transactions_valid_ts.parquet").as_posix()

# Partitioned parquet datasets (single partition level and multiple partition levels).
fr_immo_simple_partition_path = data_path.joinpath("fr_immo_transactions_dep_partition").as_posix()
fr_immo_multi_partition_path = data_path.joinpath("fr_immo_transactions_multi_partition").as_posix()

# CSV version of the transactions, used as the comparison baseline.
fr_immo_csv_path = data_path.joinpath("fr_immo_transactions.csv").as_posix()

# Name of the transaction timestamp column.
date_col_name = "date_transaction"


## Performance test csv vs parquet

In this part, we compare the query performance of csv and parquet on the same dataset.


1. read(data loading)
2. row_counts
3. group_by
4. filter

### Read parquet vs read csv

In [4]:
%%time
# Load the un-partitioned parquet file (the version whose timestamp Spark can read).
fr_immo_parquet_df = spark.read.format("parquet").load(fr_immo_valid_path)

                                                                                

CPU times: user 3.69 ms, sys: 3.72 ms, total: 7.4 ms
Wall time: 2.46 s


In [5]:
%%time
# CSV carries no schema: take the column names from the header row and let Spark infer the types.
fr_immo_csv_df = spark.read.csv(fr_immo_csv_path, header=True, inferSchema=True)



CPU times: user 22 ms, sys: 222 µs, total: 22.2 ms
Wall time: 26.7 s


                                                                                

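> Notice that reading the csv file takes roughly ten times longer than reading the parquet file (26.7 s vs 2.46 s). Parquet stores its schema in the file metadata, whereas `inferSchema` forces Spark to scan the csv data to work out the column types.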

In [6]:
%%time
# Load the parquet dataset written with a single partition level.
fr_immo_simple_part = spark.read.format("parquet").load(fr_immo_simple_partition_path)

CPU times: user 0 ns, sys: 3.13 ms, total: 3.13 ms
Wall time: 871 ms


In [7]:
%%time
# Load the parquet dataset written with several partition levels.
fr_immo_multi_part = spark.read.format("parquet").load(fr_immo_multi_partition_path)

CPU times: user 2.71 ms, sys: 0 ns, total: 2.71 ms
Wall time: 818 ms


### Count row number


In [11]:
%%time
# Row count on the un-partitioned parquet DataFrame.
parquet_row_count = fr_immo_parquet_df.count()
print(f"count rows with parquet: {parquet_row_count}")

count rows with parquet: 9141573
CPU times: user 1.81 ms, sys: 136 µs, total: 1.95 ms
Wall time: 129 ms


In [12]:
%%time
# Row count on the CSV-backed DataFrame. The label now names the format that is
# actually measured (it previously said "parquet").
print(f"count rows with csv: {fr_immo_csv_df.count()}")




count rows with parquet: 9141573
CPU times: user 12.1 ms, sys: 0 ns, total: 12.1 ms
Wall time: 2.04 s


                                                                                

In [13]:
%%time
# Row count on the single-partitioned dataset. The original counted
# fr_immo_parquet_df again, so the partitioned dataset was never measured.
print(f"count rows with single partitioned parquet: {fr_immo_simple_part.count()}")

count rows with single partitioned parquet: 9141573
CPU times: user 1.96 ms, sys: 0 ns, total: 1.96 ms
Wall time: 114 ms


In [14]:
%%time
# Row count on the multi-partitioned dataset. The original counted
# fr_immo_parquet_df again, so the partitioned dataset was never measured.
print(f"count rows with multiple partitioned parquet: {fr_immo_multi_part.count()}")


count rows with multiple partitioned parquet: 9141573
CPU times: user 2.17 ms, sys: 0 ns, total: 2.17 ms
Wall time: 120 ms


### Group by query performance test



In [16]:
# Column used as the grouping key in the group-by benchmarks below.
batiment_typ_col_name = "type_batiment"

In [17]:
%%time
# Count transactions per building type on the parquet DataFrame.
print("group by type_batiment and count rows with parquet")
fr_immo_parquet_df.groupBy(batiment_typ_col_name).count().show()

group by type_batiment and count rows with parquet




+-------------+-------+
|type_batiment|  count|
+-------------+-------+
|  Appartement|4079137|
|       Maison|5062436|
+-------------+-------+

CPU times: user 6.27 ms, sys: 489 µs, total: 6.76 ms
Wall time: 1.35 s


                                                                                

In [18]:
%%time
# Count transactions per building type on the CSV DataFrame.
print("group by type_batiment and count rows with csv")
fr_immo_csv_df.groupBy(batiment_typ_col_name).count().show()

group by type_batiment and count rows with csv




+-------------+-------+
|type_batiment|  count|
+-------------+-------+
|  Appartement|4079137|
|       Maison|5062436|
+-------------+-------+

CPU times: user 10.7 ms, sys: 4.29 ms, total: 15 ms
Wall time: 8.52 s


                                                                                

### Filter query performance test

In the first filter scenario, the filter columns match our partition layout. Notice that the more finely the data is partitioned on the filtered columns, the better the performance is.



In [20]:
%%time
# Filter values shared by the four "partition-matching" filter cells.
target_departement = 92
target_type_batiment = "Appartement"

# Both conditions must hold; chained where() calls are equivalent to a single AND-ed filter.
total_transaction = (
    fr_immo_parquet_df
    .where(col("departement") == target_departement)
    .where(col("type_batiment") == target_type_batiment)
    .count()
)
print(f"Total vente appartement in parquet without partition: {total_transaction}")

Total vente appartement in parquet without partition: 202414
CPU times: user 5.83 ms, sys: 0 ns, total: 5.83 ms
Wall time: 510 ms


In [21]:
%%time

# Same departement / building-type filter, evaluated against the CSV DataFrame.
total_transaction = (
    fr_immo_csv_df
    .where(col("departement") == target_departement)
    .where(col("type_batiment") == target_type_batiment)
    .count()
)
print(f"Total vente appartement in csv: {total_transaction}")



Total vente appartement in csv: 202414
CPU times: user 11.2 ms, sys: 0 ns, total: 11.2 ms
Wall time: 6.69 s


                                                                                

In [22]:
%%time

# Same departement / building-type filter, evaluated against the single-partitioned dataset.
total_transaction = (
    fr_immo_simple_part
    .where(col("departement") == target_departement)
    .where(col("type_batiment") == target_type_batiment)
    .count()
)
print(f"Total vente appartement in parquet with signle partition: {total_transaction}")

Total vente appartement in parquet with signle partition: 202414
CPU times: user 3.76 ms, sys: 0 ns, total: 3.76 ms
Wall time: 269 ms


In [23]:
%%time

# Same departement / building-type filter, evaluated against the multi-partitioned dataset.
total_transaction = (
    fr_immo_multi_part
    .where(col("departement") == target_departement)
    .where(col("type_batiment") == target_type_batiment)
    .count()
)
print(f"Total vente appartement in parquet with multiple partition: {total_transaction}")

Total vente appartement in parquet with multiple partition: 202414
CPU times: user 4.2 ms, sys: 371 µs, total: 4.57 ms
Wall time: 169 ms


### Filter that does not match the partition columns

In [19]:
%%time

# Filter values shared by the four "non-matching" filter cells: postal code and transaction year.
target_year = 2023
code_postal_montrouge = 92120

# Neither column is one of the filter columns used in the partition-matching scenario above.
total_transaction_montrouge = (
    fr_immo_parquet_df
    .where(col("code_postal") == code_postal_montrouge)
    .where(year(col("date_transaction")) == target_year)
    .count()
)
print(f"Total transaction in parquet without partition: {total_transaction_montrouge}")

Total transaction montrouge: 656
CPU times: user 3.77 ms, sys: 687 μs, total: 4.45 ms
Wall time: 172 ms


In [18]:
%%time
# Postal-code / year filter on the single-partitioned dataset.
total_transaction_montrouge = (
    fr_immo_simple_part
    .where(col("code_postal") == code_postal_montrouge)
    .where(year(col("date_transaction")) == target_year)
    .count()
)
print(f"Total transaction in parquet with single partition: {total_transaction_montrouge}")

Total transaction montrouge: 656
CPU times: user 2.25 ms, sys: 2.2 ms, total: 4.45 ms
Wall time: 353 ms


In [20]:
%%time
# Postal-code / year filter on the multi-partitioned dataset.
total_transaction_montrouge = (
    fr_immo_multi_part
    .where(col("code_postal") == code_postal_montrouge)
    .where(year(col("date_transaction")) == target_year)
    .count()
)
print(f"Total transaction in parquet with multiple partition: {total_transaction_montrouge}")

Total transaction montrouge: 656
CPU times: user 2.41 ms, sys: 2.23 ms, total: 4.64 ms
Wall time: 915 ms


                                                                                

In [21]:
%%time
# Postal-code / year filter on the CSV DataFrame.
total_transaction_montrouge = (
    fr_immo_csv_df
    .where(col("code_postal") == code_postal_montrouge)
    .where(year(col("date_transaction")) == target_year)
    .count()
)
print(f"Total transaction in csv: {total_transaction_montrouge}")



Total transaction montrouge: 656
CPU times: user 21.1 ms, sys: 4.45 ms, total: 25.5 ms
Wall time: 10.9 s


                                                                                

## Row oriented query

In the example below, we count distinct rows (a row is distinct when its combination of all column values is unique). This is a row-based operation, which parquet is not designed for.

In contrast, counting the distinct values of a single column is a column-based operation.

In [24]:
%%time
# Column-based query: only the `ville` column is needed to find the distinct cities.
distinct_city = fr_immo_parquet_df.select(col("ville")).dropDuplicates()
print(f"distinct city count(column based operation): {distinct_city.count()}")
distinct_city.show(n=5)

                                                                                

distinct city count(column based operation): 32320
+-------------------+
|              ville|
+-------------------+
|              CESSY|
|DOMPIERRE-SUR-VEYLE|
|       GRAND-CORENT|
|            CONFORT|
|LA NEUVILLE BOSMONT|
+-------------------+
only showing top 5 rows

CPU times: user 4.62 ms, sys: 3.79 ms, total: 8.41 ms
Wall time: 2.18 s


                                                                                

In [25]:
%%time
# Row-based query: de-duplicating whole rows needs every column of every row.
distinct_rows = fr_immo_parquet_df.dropDuplicates()
print(f"distinct rows count(row based operation): {distinct_rows.count()}")
distinct_rows.show(n=5)

25/02/03 14:23:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:23:37 WARN RowBasedKeyValueBatch: Calling spill() on

distinct rows count(row based operation): 9141573


25/02/03 14:24:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/03 14:24:09 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+--------------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|id_transaction|    date_transaction|    prix|departement|id_ville|               ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|       longitude|
+--------------+--------------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|        144226|2014-01-18 10:23:...|110000.0|         01|     345|SAINT-DENIS-EN-BUGEY|       1500|  VILLAGE SAINT D...|  Appartement|       4|              100| 45.951437939058|5.32844001943718|
|        140148|2014-01-19 10:23:...| 83600.0|         01|      53|     BOURG-EN-BRESSE|       1000|        3 RUE GOUNOD|  Appartement|       2|               50|46.1965265769704|5.22048990188062|
|        148148

                                                                                

In [11]:
%%time
# Same distinct-city query, this time against the CSV DataFrame.
distinct_city = fr_immo_csv_df.select(col("ville")).dropDuplicates()
print(f"distinct city count: {distinct_city.count()}")
distinct_city.show(n=5)

                                                                                

distinct city count: 32320




+-------------------+
|              ville|
+-------------------+
|              CESSY|
|DOMPIERRE-SUR-VEYLE|
|       GRAND-CORENT|
|            CONFORT|
|LA NEUVILLE BOSMONT|
+-------------------+
only showing top 5 rows

CPU times: user 31.4 ms, sys: 6.81 ms, total: 38.2 ms
Wall time: 10.6 s


                                                                                

In [13]:
%%time
# Same distinct-row query, this time against the CSV DataFrame.
distinct_rows = fr_immo_csv_df.dropDuplicates()
print(f"distinct rows count: {distinct_rows.count()}")
distinct_rows.show(n=5)

25/01/28 10:09:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:09:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:09:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:09:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:09:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:06 WARN RowBasedKeyValueBatch: Calling spill() on

distinct rows count: 9141573


25/01/28 10:10:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 10:10:49 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+--------------------+--------+-----------+--------+------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|id_transaction|    date_transaction|    prix|departement|id_ville|       ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|       longitude|
+--------------+--------------------+--------+-----------+--------+------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|        143595|2014-01-06 10:24:...|206800.0|          1|     446|  VILLENEUVE|       1480|10 ALL DES MARGUE...|       Maison|       5|              100| 46.017215697925|4.83236118261167|
|        146931|2014-02-24 10:17:...| 50000.0|          1|     430|    VARAMBON|       1160|      5154  VARAMBON|       Maison|       2|               56|46.0400470273731|5.31682925744396|
|        146921|2014-02-25 10:17:...|236000.0|         

                                                                                

In [16]:

def random_batch(df: DataFrame, fmt: str, fraction: float = 0.05, seed=None):
    """Time how long it takes to collect a random sample of ``df`` on the driver.

    Prints one CSV-style line ``<fmt>,random_batch,<elapsed seconds>`` and
    returns nothing.

    Args:
        df: DataFrame to sample from.
        fmt: Label of the storage format being measured (e.g. "parquet", "csv");
            only used in the printed line.
        fraction: Fraction of rows to sample, without replacement. Must be a
            float. Defaults to 0.05, the value the benchmark originally hard-coded.
        seed: Optional integer seed for the sampler. Pass a fixed value to draw
            the same sample for every format being compared; ``None`` keeps the
            original behaviour of an unseeded random sample.
    """
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() can.
    start = time.perf_counter()
    # collect() forces Spark to materialise the sample; the rows themselves are
    # not needed, so they are not bound to a name.
    df.sample(withReplacement=False, fraction=fraction, seed=seed).collect()
    elapsed = time.perf_counter() - start
    print("{},{},{}".format(fmt, "random_batch", elapsed))

In [17]:
# Random-sample benchmark on the parquet DataFrame.
random_batch(df=fr_immo_parquet_df, fmt="parquet")

                                                                                

parquet,random_batch,9.600819110870361


In [18]:
# Random-sample benchmark on the CSV DataFrame.
random_batch(df=fr_immo_csv_df, fmt="csv")

                                                                                

csv,random_batch,20.17941665649414


## Compatibility problems

As there are many libraries that can write parquet files, there are some compatibility problems.

### Timestamp data type

The default Timestamp data type implementation in **pyarrow/pandas is INT64 (TIMESTAMP(NANOS))**.

The default Timestamp data type implementation in **spark is INT64 (TIMESTAMP(MICROS)) or INT96 (NANOS)**.

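INT96 (NANOS) is a legacy encoding and is deprecated. Spark timestamps do not carry nanosecond precision, so newer Spark versions reject INT64 (TIMESTAMP(NANOS)) columns by default. You can still read such files by enabling `config("spark.sql.legacy.parquet.nanosAsLong", "true")`, in which case the column is loaded as a plain `long` (nanoseconds since the epoch) instead of a timestamp.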

In the example below, we use pyspark to read a parquet file that was generated with pandas/pyarrow. Without the option above, you may receive the following error message (depending on your Spark version):

```java
org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).
```

In [31]:
# The raw parquet file was written with pandas/pyarrow using their default configuration.
fr_immo_raw_df = spark.read.format("parquet").load(fr_immo_raw_path)


In [15]:
# Note that date_transaction is not recognised as a timestamp here: Spark reports it as long.
fr_immo_raw_df.printSchema()

root
 |-- id_transaction: integer (nullable = true)
 |-- date_transaction: long (nullable = true)
 |-- prix: double (nullable = true)
 |-- departement: string (nullable = true)
 |-- id_ville: integer (nullable = true)
 |-- ville: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- adresse: string (nullable = true)
 |-- type_batiment: string (nullable = true)
 |-- n_pieces: integer (nullable = true)
 |-- surface_habitable: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [32]:
# Peek at the first values of the date column as read from the raw file.
fr_immo_raw_df.select(date_col_name).show(n=5)

+-------------------+
|   date_transaction|
+-------------------+
|1388620800000000000|
|1388620800000000000|
|1388620800000000000|
|1388620800000000000|
|1388707200000000000|
+-------------------+
only showing top 5 rows



In [34]:
# The "valid" file was written with a timestamp configuration that Spark can read.
fr_immo_valid_df = spark.read.format("parquet").load(fr_immo_valid_path)



In [35]:
# Note that date_transaction is now recognised as a timestamp type.
fr_immo_valid_df.printSchema()

root
 |-- id_transaction: integer (nullable = true)
 |-- date_transaction: timestamp (nullable = true)
 |-- prix: double (nullable = true)
 |-- departement: string (nullable = true)
 |-- id_ville: integer (nullable = true)
 |-- ville: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- adresse: string (nullable = true)
 |-- type_batiment: string (nullable = true)
 |-- n_pieces: integer (nullable = true)
 |-- surface_habitable: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [29]:
# Peek at the first values of the date column as read from the valid file.
fr_immo_valid_df.select(date_col_name).show(n=5)

+--------------------+
|    date_transaction|
+--------------------+
|2013-12-31 10:25:...|
|2013-12-31 10:25:...|
|2013-12-31 10:25:...|
|2013-12-31 10:25:...|
|2014-01-01 10:25:...|
+--------------------+
only showing top 5 rows

