In [1]:
from pyspark.sql import SparkSession
# from pyspark.sql.function import col, sum, when, count

In [2]:
# Step 1: Initialize Spark session.
# getOrCreate() hands back the already-active session if one exists in this
# kernel, so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("Forestry ETL Pipeline")
    .getOrCreate()
)

In [11]:
# Load the per-state forest reserve table. The first row supplies column names,
# and inferSchema asks Spark to detect column types from the data (costs an
# extra pass over the file); the resulting schema is printed below.
# NOTE(review): relative path, so this depends on the kernel's working directory.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("datasets/forest_reserve_state.csv")
)
df.show(5)
print("The data schema is:")
df.printSchema()


+----------+---------------+--------+
|      date|          state|    area|
+----------+---------------+--------+
|2003-01-01|          Johor|356922.0|
|2003-01-01|          Kedah|344530.0|
|2003-01-01|       Kelantan|629687.0|
|2003-01-01|         Melaka|  5468.0|
|2003-01-01|Negeri Sembilan|165639.0|
+----------+---------------+--------+
only showing top 5 rows

The data schema is:
root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- area: double (nullable = true)



In [13]:
from pyspark.sql.functions import year, to_date, col
# Derive a calendar-year column from `date` (typed `date` in the schema above).
# withColumn replaces an existing `year` column, so re-running this cell is safe.
df = df.withColumn('year', year('date'))
df.show(5)

+----------+---------------+--------+----+
|      date|          state|    area|year|
+----------+---------------+--------+----+
|2003-01-01|          Johor|356922.0|2003|
|2003-01-01|          Kedah|344530.0|2003|
|2003-01-01|       Kelantan|629687.0|2003|
|2003-01-01|         Melaka|  5468.0|2003|
|2003-01-01|Negeri Sembilan|165639.0|2003|
+----------+---------------+--------+----+
only showing top 5 rows



In [14]:
# Summary statistics (count, mean, stddev, min, max) for the numeric and string
# columns; `date` does not appear in the output below. In the run shown, `area`
# has count 321 versus 323 for the other columns, i.e. two rows lack an area.
summary_stats = df.describe()
summary_stats.show()


+-------+--------------+------------------+-----------------+
|summary|         state|              area|             year|
+-------+--------------+------------------+-----------------+
|  count|           323|               321|              323|
|   mean|          NULL|1017556.8255451714|           2012.0|
| stddev|          NULL|1551939.5109304378|5.485723990674799|
|    min|         Johor|               0.0|             2003|
|    max|W.P. Putrajaya|         4936290.0|             2021|
+-------+--------------+------------------+-----------------+



In [15]:
# Isolate rows whose forest reserve area is exactly 0.0 and summarise them.
# Rows with a null `area` evaluate to null in the comparison and are excluded.
df_zeros = df.where(df.area == 0)
df_zeros.describe().show()

+-------+--------------+----+-----------------+
|summary|         state|area|             year|
+-------+--------------+----+-----------------+
|  count|            38|  38|               38|
|   mean|          NULL| 0.0|           2012.0|
| stddev|          NULL| 0.0|5.550748671198406|
|    min|   W.P. Labuan| 0.0|             2003|
|    max|W.P. Putrajaya| 0.0|             2021|
+-------+--------------+----+-----------------+



In [16]:
# Rank states by the smallest forest reserve `area` they recorded in any year.
# NOTE(review): this is each state's historical minimum, not its current
# (latest-year) area; filter to a single year first if "lowest today" is meant.
# States with area == 0.0 (W.P. Labuan and W.P. Putrajaya in the outputs shown)
# rank first; confirm whether 0.0 means "no reserve" or "not reported".
# Nulls are pushed last: Spark's default ascending sort places nulls first,
# which would report a state with no recorded area as the "lowest".
state_min_area = df.groupBy('state').min('area')
state_min_area.orderBy(state_min_area['min(area)'].asc_nulls_last()).show(5)

+-----------------+---------+
|            state|min(area)|
+-----------------+---------+
|      W.P. Labuan|      0.0|
|   W.P. Putrajaya|      0.0|
|W.P. Kuala Lumpur|     61.0|
|           Melaka|   4818.0|
|     Pulau Pinang|   5101.0|
+-----------------+---------+
only showing top 5 rows

