# 02.Use pyspark in CASD

In this tutorial, we will learn:
- how to install spark and pyspark in CASD
- how to create a spark session
- read a csv file
- read a parquet file


## 1. Install spark and pyspark in CASD

As we explained before,
- `Apache Spark` is a distributed computation framework written mostly in Scala and Java, runs in a JVM(Java Virtual Machine)
- `PySpark` is a Python API for Apache Spark.
- PySpark talks to Spark engine via `Py4J`. Your Python code → Py4J(serialized and sent to JVM) → Spark core executes it → results sent back to python

> Spark framework installation is essential, without it, pyspark will never work.

### 1.1 Install spark framework

CASD provides an installation script(`InstallSpark.ps1`) to install the `latest spark framework` and underlying JDK available in CASD.
You can find this script in `Bureau->Raccourcis->Spark`.

Open a powershell terminal and run the below command

```powershell
# goto the target folder
cd C:\Users\Public\Desktop\Raccourcis\Spark

# run the installation script
.\InstallSpark.ps1
```

> If everything works well, this script will install spark in `C:\Users\<your-id>\AppData\Local\spark\spark-3.5.5-bin-hadoop3`. It will also install open-jdk, winutils, and set up your
> windows env vars.

Now let's check if your spark works or not. Open a new powershell terminal and run the below command

```powershell
# check the installed spark version
spark-shell --version

## it may take few seconds to show the output, be patient
# expected output
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.5
      /_/

Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user ubuntu on 2024-08-06T11:36:15Z
Revision bb7846dd487f259994fdc69e18e03382e3f64f42
Url https://github.com/apache/spark
Type --help for more information.
```

The
> If you can't see the spark output, contact `service@casd.eu`

### 1.2 Install pyspark

As we mentioned before, CASD recommends you to create a `python virtual environment` for each for your python project.

Suppose we will start a new project called `docs_in_paris`, let's create a python virtual environment with this name


```powershell
# create a python virtual environment
conda create --name docs_in_paris python --offline

# activate the virtual environment
conda activate docs_in_paris

# check python version
python -V

# check installed packages
pip list

# install pyspark
pip install pyspark==3.5.5

# check the installed pyspark version
pip show pyspark

# expected output
Name: pyspark
Version: 3.5.5
Summary: Apache Spark Python API
......
```

> You can notice that we have installed a specific version of pyspark. Because the pyspark version must be the same as the spark framework version. As the output in `section 1.1` is **spark-3.5.5**. So we need to
> install pyspark-3.5.5

## 2. Create a spark session

A `Spark session` is the entry point to Apache Spark. A spark session allows us to interact with Spark’s core engine, no matter if you’re working in Python (PySpark), Scala, Java, or R.


It encapsulates:

- Cluster connection (or local JVM if local mode)
- Configuration settings (memory, partitions, serializer, etc.)
- Access to Spark’s APIs: Spark SQL API, DataFrame and Dataset API, RDD API (via .sparkContext), Streaming and machine learning APIs

To create a spark session, you need to
- import the required module
- configure the spark session settings
- create the spark session instance

### 2.1 A minimum spark session creation

Below shows a minimum spark session creation.

In [2]:
from pyspark.sql import SparkSession, DataFrame

In [3]:
# create a spark session in local mode
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Use_pyspark_in_CASD") \
    .getOrCreate()

In [4]:
# you can get and set configuration of your spark session any moments
# get all conf
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.app.name', 'Use_pyspark_in_CASD'),
 ('spark.app.startTime', '1755593027837'),
 ('spark.executo

### 2.2 Check if the spark session works

Let's create a dataframe and do some basic operations.

In [5]:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType

# create a dataframe by using List
dept = [("Alice", "Finance", 10),
        ("Bob", "Marketing", 20),
        ("Charlie", "Sales", 30),
        ("Toto", "IT", 40)
        ]

# give an explicit schema
deptSchema = StructType([
    StructField('name', StringType(), True),
    StructField('dept_name', StringType(), True),
    StructField('age', IntegerType(), True)
])

# create dataframe
deptDF1 = spark.createDataFrame(data=dept, schema=deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---------+---+
|name   |dept_name|age|
+-------+---------+---+
|Alice  |Finance  |10 |
|Bob    |Marketing|20 |
|Charlie|Sales    |30 |
|Toto   |IT       |40 |
+-------+---------+---+



In [7]:
# show the basic stats
deptDF1.describe().show()

+-------+-----+---------+------------------+
|summary| name|dept_name|               age|
+-------+-----+---------+------------------+
|  count|    4|        4|                 4|
|   mean| NULL|     NULL|              25.0|
| stddev| NULL|     NULL|12.909944487358056|
|    min|Alice|  Finance|                10|
|    max| Toto|    Sales|                40|
+-------+-----+---------+------------------+



## 2.2 Optimize the spark session config (Optional)

When the data volume which you treat is much bigger than your memory, you may need to optimize the spark session config. Here we only give
you an example, it may be not suitable for your workflow, because the config depends on your `server configuration` and `data volume`.

In general, spark tries to store the data source and intermediate data(shuffle results) in memory. If the memory is not big enough to store the
data, we can ask spark to spill the data on hard drive.

In the below example, we configure:
- how many memory the spark can use
- how many memory spark will use for storage and calculation
- ETC.

> You don't need to understand all, just keep in mind we can optimize spark for large data volume calculation


In [2]:
spark = (
    SparkSession.builder
    .appName("LocalMode_memo_config")
    .master("local[*]")
    # JVM memory allocation
    .config("spark.driver.memory", "16g")  # Half of RAM for driver
    .config("spark.driver.maxResultSize", "4g")  # Avoid OOM on collect()
    # Shuffle & partition tuning
    .config("spark.sql.shuffle.partitions", "12")  # Lower than default 200
    .config("spark.sql.files.maxPartitionBytes", "128m")  # Avoid large partitions in memory
    .config("spark.reducer.maxSizeInFlight", "48m")  # Limit shuffle buffer
    # Unified memory management
    .config("spark.memory.fraction", "0.7")  # Reduce pressure on execution memory
    .config("spark.memory.storageFraction", "0.3")  # Smaller cache area
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")
    # Spill to disk early instead of crashing
    .config("spark.shuffle.spill", "true")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.compress", "true")
    # optimize jvm GC
    .config("spark.driver.extraJavaOptions",
            "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError")
    # Use Kryo serializer
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Optional: buffer size for serialization
    .config("spark.kryoserializer.buffer", "64m")
    .config("spark.kryoserializer.buffer.max", "512m")
    .getOrCreate()
)

> In this tutorial, we only focus on spark on local mode. CASD also proposes spark on `yarn` and `k8s` mode. For more information, please contact `service@casd.eu`.

## 3. Use spark to treat data

In this section, we will show the basics of spark
- read data (e.g. csv, parquet, etc.)
- basic data validation
- basic data cleaning
- basic data transformation
- basic statistics
- write data(e.g. csv, parquet, etc.)

The data which we will use in this repo is from [kaggle](https://www.kaggle.com/datasets/benoitfavier/immobilier-france). It contains several interesting datasets. In this repo,
we use two of them:

- transaction.npz: It contains all French real state transactions between 2012 and 2024
- transactions_sample.csv: A subset of all transactions

### 3.1 Read csv file

CSV files are semi-structure data(the column types are not provided), spark offers the `inferSchema` functionality. It works good enough for most of the case, but you need to
always check the inferred column types.

> The dataset is all france immo transaction records.

In [8]:
from pathlib import Path

project_root_dir = Path.cwd().parent
data_dir = project_root_dir / "data"
print(data_dir)

C:\Users\PLIU\Documents\git\Seminar_PySpark_Sedona_GeoParquet\data


In [9]:
csv_immo_file_path = data_dir / "transactions_sample.csv"

# the option header
immo_csv_df = spark.read.csv(csv_immo_file_path.as_posix(), header=True, inferSchema=True)

In [10]:
immo_csv_df.show(5)

+--------------+----------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+-----+--------+-----------------+--------------------+----------------+----------------+-------------------+--------------------------+--------------------------+---------------------+-----------------------+
|id_transaction|date_transaction|    prix|departement|id_ville|               ville|code_postal|             adresse|type_batiment| vefa|n_pieces|surface_habitable|id_parcelle_cadastre|        latitude|       longitude|surface_dependances|surface_locaux_industriels|surface_terrains_agricoles|surface_terrains_sols|surface_terrains_nature|
+--------------+----------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+-----+--------+-----------------+--------------------+----------------+----------------+-------------------+--------------------------+--------------------------+---------------------+-----

In [11]:
immo_csv_df.printSchema()

root
 |-- id_transaction: integer (nullable = true)
 |-- date_transaction: date (nullable = true)
 |-- prix: double (nullable = true)
 |-- departement: integer (nullable = true)
 |-- id_ville: integer (nullable = true)
 |-- ville: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- adresse: string (nullable = true)
 |-- type_batiment: string (nullable = true)
 |-- vefa: string (nullable = true)
 |-- n_pieces: integer (nullable = true)
 |-- surface_habitable: integer (nullable = true)
 |-- id_parcelle_cadastre: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- surface_dependances: string (nullable = true)
 |-- surface_locaux_industriels: string (nullable = true)
 |-- surface_terrains_agricoles: string (nullable = true)
 |-- surface_terrains_sols: string (nullable = true)
 |-- surface_terrains_nature: string (nullable = true)



In [12]:
immo_csv_df.describe().show()

+-------+-----------------+------------------+------------------+------------------+---------------+------------------+----------------+-------------+----+------------------+-----------------+--------------------+-----------------+------------------+-------------------+--------------------------+--------------------------+---------------------+-----------------------+
|summary|   id_transaction|              prix|       departement|          id_ville|          ville|       code_postal|         adresse|type_batiment|vefa|          n_pieces|surface_habitable|id_parcelle_cadastre|         latitude|         longitude|surface_dependances|surface_locaux_industriels|surface_terrains_agricoles|surface_terrains_sols|surface_terrains_nature|
+-------+-----------------+------------------+------------------+------------------+---------------+------------------+----------------+-------------+----+------------------+-----------------+--------------------+-----------------+------------------+--------

## 3.2 Read a parquet file



In [13]:
fr_immo_transaction_path = "C:/Users/PLIU/Documents/git/Seminar_PySpark_Sedona_GeoParquet/data/fr_immo_transaction.parquet"
fr_immo_transactions_df = spark.read.parquet(fr_immo_transaction_path)

In [14]:
required_col = ["id_transaction", "date_transaction", "prix", "departement", "ville", "code_postal", "adresse",
                "type_batiment", "n_pieces", "surface_habitable", "latitude", "longitude"]
clean_fr_immo_df = fr_immo_transactions_df.select(required_col)

In [15]:
# cache the dataframe for better performance
# clean_fr_immo_df.cache()
clean_fr_immo_df.show(5)

+--------------+----------------+--------+-----------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|id_transaction|date_transaction|    prix|departement|               ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|       longitude|
+--------------+----------------+--------+-----------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|        141653|      2014-01-02|197000.0|         01|             TREVOUX|       1600|  6346 MTE DES LILAS|  Appartement|       4|               84|45.9423014034837|4.77069364742062|
|        141970|      2014-01-02|157500.0|         01|              VIRIAT|       1440|1369 RTE DE STRAS...|       Maison|       4|              103|46.2364072868351|5.26293493674271|
|        139240|      2014-01-02|112000.0|         01|SAINT-JEAN-SUR-VEYLE|     

In [16]:
clean_fr_immo_df.describe().show()

+-------+-----------------+------------------+-----------------+---------+-----------------+--------------------+-------------+------------------+-----------------+-----------------+-----------------+
|summary|   id_transaction|              prix|      departement|    ville|      code_postal|             adresse|type_batiment|          n_pieces|surface_habitable|         latitude|        longitude|
+-------+-----------------+------------------+-----------------+---------+-----------------+--------------------+-------------+------------------+-----------------+-----------------+-----------------+
|  count|          9141573|           9141573|          9141573|  9141573|          9141573|             9141573|      9141573|           9141573|          9141573|          9141573|          9141573|
|   mean|7739952.499090474|225329.25414140944|61.77079502619517|     NULL|52936.02979454411|   611.3308270676691|         NULL| 3.524251132709874|82.18102442544625| 46.2699373612654|2.385461679804

In [21]:
from pyspark.sql.functions import col
# get the most expensive house
print(clean_fr_immo_df.filter(col("prix")>1E9).show())

+--------------+----------------+-------+-----------+------------+-----------+--------------------+-------------+--------+-----------------+----------------+-----------------+
|id_transaction|date_transaction|   prix|departement|       ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|        longitude|
+--------------+----------------+-------+-----------+------------+-----------+--------------------+-------------+--------+-----------------+----------------+-----------------+
|       3454911|      2019-02-21| 1.75E9|         29|PLOUGOURVEST|      29400|    7015A  KERVICHEN|       Maison|       4|               97|48.5687995867852|-4.06245022089523|
|       3450591|      2019-04-19|2.086E9|         29|  PLOUZEVEDE|      29440|  33  CORNIC AN HENT|       Maison|       5|              132|48.5995097745626|-4.11136334325347|
|       8848963|      2014-06-26| 1.72E9|         60|     CHAMBLY|      60230|80 RUE DES MARCHANDS|  Appartement|       

In [20]:
# get the biggest house
print(clean_fr_immo_df.filter(col("n_pieces")>100).show())

+--------------+----------------+--------+-----------+-----------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|id_transaction|date_transaction|    prix|departement|      ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|       longitude|
+--------------+----------------+--------+-----------+-----------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|      14017595|      2020-12-17|119560.0|         91|    DRAVEIL|      91210|15 ALL DES PASTOU...|       Maison|     109|               61|48.6860409415854|2.43481957002938|
|      14413157|      2017-03-31|275000.0|         93|MONTFERMEIL|      93370|    42 AV DES CHENES|       Maison|     112|               93|48.8883232214767|2.56155159324066|
+--------------+----------------+--------+-----------+-----------+-----------+--------------------+-------------+--------+---

> You can notice the data quality is not very good, even thought the data is provided by the French government.

### 3.3 Data validation

To make sure your statistics make sense, always validate the raw data before you start. In general, we have two categories:
- general data validation rules: detect duplication, null value in non-null column, Etc.
- domain specific validation rules: column values in acceptable range or list, Etc.

In [27]:
from pyspark.sql.functions import length
def find_bad_immo_rows(df:DataFrame)->DataFrame:
    """
    This function takes an immo dataframe and returns a dataframe where rows contains anomaly values
    The price is negative, the house surface is negative, and the postal code is not 5 digits number
    """
    return df.filter((col("prix") <= 0) | (col("surface_habitable") <= 0) | (length(col("code_postal")) != 5))

In [28]:
bad_rows = find_bad_immo_rows(immo_csv_df)
bad_rows.show()

+--------------+----------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+-----+--------+-----------------+--------------------+----------------+----------------+-------------------+--------------------------+--------------------------+---------------------+-----------------------+
|id_transaction|date_transaction|    prix|departement|id_ville|               ville|code_postal|             adresse|type_batiment| vefa|n_pieces|surface_habitable|id_parcelle_cadastre|        latitude|       longitude|surface_dependances|surface_locaux_industriels|surface_terrains_agricoles|surface_terrains_sols|surface_terrains_nature|
+--------------+----------------+--------+-----------+--------+--------------------+-----------+--------------------+-------------+-----+--------+-----------------+--------------------+----------------+----------------+-------------------+--------------------------+--------------------------+---------------------+-----

We have found the error which we introduced, now let's check the real data


In [30]:
bad_rows = find_bad_immo_rows(clean_fr_immo_df)
print(f"Total bad rows count: {bad_rows.count()}")
bad_rows.show()

Total bad rows count: 604235
+--------------+----------------+--------+-----------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|id_transaction|date_transaction|    prix|departement|               ville|code_postal|             adresse|type_batiment|n_pieces|surface_habitable|        latitude|       longitude|
+--------------+----------------+--------+-----------+--------------------+-----------+--------------------+-------------+--------+-----------------+----------------+----------------+
|        141653|      2014-01-02|197000.0|         01|             TREVOUX|       1600|  6346 MTE DES LILAS|  Appartement|       4|               84|45.9423014034837|4.77069364742062|
|        141970|      2014-01-02|157500.0|         01|              VIRIAT|       1440|1369 RTE DE STRAS...|       Maison|       4|              103|46.2364072868351|5.26293493674271|
|        139240|      2014-01-02|112000.0|         

### 3.4 Clean dataframe

After we have detected the bad rows, we need to clean them. Even though the best way it to correct the bad values, but in general, we don't have the
 knowledge to correct them. So we just drop the bad rows:
- drop duplicated rows
- drop bad value rows
- handle mission values(e.g. null, empty strings, NaN, etc.)


In [8]:
null_col_stats = get_empty_row_count_per_column(clean_fr_immo_df)

In [9]:
null_col_stats.show(20)

+-----------------+----------+---------+-----------+-----------------+---------------------+---------------+
|      column_name|null_count|nan_count|blank_count|null_symbol_count|total_empty_row_count|total_row_count|
+-----------------+----------+---------+-----------+-----------------+---------------------+---------------+
|   id_transaction|         0|        0|          0|                0|                    0|        9141573|
| date_transaction|         0|        0|          0|                0|                    0|        9141573|
|             prix|         0|        0|          0|                0|                    0|        9141573|
|      departement|         0|        0|          0|                0|                    0|        9141573|
|            ville|         0|        0|          0|                0|                    0|        9141573|
|      code_postal|         0|        0|          0|                0|                    0|        9141573|
|          adresse|

In [10]:
sample_null_col_stats = get_empty_row_count_per_column(sample_df)
sample_null_col_stats.show(20)

NameError: name 'sample_df' is not defined

In [18]:
get_duplicated_row_count(clean_fr_immo_df)

Duplicate row count: 0


In [20]:
from pyspark.sql.functions import max as spark_max


def has_value(df: DataFrame):
    exprs = []
    nullSymbols = ["?", "-"]
    for colName in df.columns:
        colRef = col(colName)
        colType = df.schema[colName].dataType
        # base condition, the given column is not null
        conditions = [colRef.isNotNull()]

        # for numeric column
        if isinstance(colType, spark_types.NumericType):
            conditions.append(~isnan(colRef))

        # for string column
        if isinstance(colType, spark_types.StringType):
            conditions.append(trim(colRef) != "")
            conditions.append(~colRef.isin(nullSymbols))

        # build final filter condition
        hasValCond = conditions[0]
        for cond in conditions[1:]:
            hasValCond = hasValCond & cond
        exprs.append(spark_max(when(hasValCond, 1).otherwise(0)).alias(colName))
    result = df.agg(*exprs).collect()[0].asDict()
    result["toto"] = 0
    return [c for c, has_value in result.items() if has_value == 0]
    print(result)



In [21]:
has_value(clean_fr_immo_df)

['toto']

In [None]:
# creating a geometry column

