In [1]:
pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-8.1.0-cp311-cp311-manylinux_2_17_x86_64.whl.metadata (2.0 kB)
Collecting protobuf<=4.21.12,>=4.21.1 (from mysql-connector-python)
  Downloading protobuf-4.21.12-cp37-abi3-manylinux2014_x86_64.whl (409 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m409.8/409.8 kB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading mysql_connector_python-8.1.0-cp311-cp311-manylinux_2_17_x86_64.whl (27.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.5/27.5 MB[0m [31m63.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: protobuf, mysql-connector-python
  Attempting uninstall: protobuf
    Found existing installation: protobuf 4.24.3
    Uninstalling protobuf-4.24.3:
      Successfully uninstalled protobuf-4.24.3
Successfully installed mysql-connector-python-8.1.0 protobuf-4.21.12
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

import mysql.connector
from pyspark.sql import SparkSession,functions as F

# Connection settings. Each value can be overridden through an environment
# variable so credentials do not have to live in the notebook; the defaults
# are the values this notebook has always used.
DB_HOST = os.environ.get("MARIADB_HOST", "mariadb_container")
DB_PORT = int(os.environ.get("MARIADB_PORT", "3306"))
DB_NAME = os.environ.get("MARIADB_DATABASE", "queensland_plant")
DB_USER = os.environ.get("MARIADB_USER", "root")
DB_PASSWORD = os.environ.get("MARIADB_PASSWORD", "root_password")

# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("MariaDBExample").getOrCreate()

# Direct connection to MariaDB through mysql.connector.
# A later cell uses it to run DML/DDL statements and then closes it.
connection = mysql.connector.connect(
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database=DB_NAME,
)

# JDBC settings used by Spark to read the same database.
# NOTE(review): the URL uses the `jdbc:mysql:` scheme together with the MariaDB
# driver class. Newer MariaDB Connector/J releases may require `jdbc:mariadb:`
# (or the `permitMysqlScheme` option) -- confirm against the driver jar in use.
jdbc_url = f"jdbc:mysql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
jdbc_properties = {"user": DB_USER, "password": DB_PASSWORD, "driver": "org.mariadb.jdbc.Driver"}

# Load the three source tables into Spark DataFrames over JDBC.
flora_2019 = spark.read.jdbc(jdbc_url, "flora_2019", properties=jdbc_properties)
flora_2022 = spark.read.jdbc(jdbc_url, "flora_2022", properties=jdbc_properties)
threaten_2019 = spark.read.jdbc(jdbc_url, "threaten_2019", properties=jdbc_properties)

# Register each DataFrame as a temporary view so it can be queried with spark.sql().
flora_2019.createOrReplaceTempView("flora_2019")
flora_2022.createOrReplaceTempView("flora_2022")
threaten_2019.createOrReplaceTempView("threaten_2019")


In [3]:
def _summarise_before_cleaning(frame, label):
    """Print the column/record counts of `frame`, preview 5 rows, and return the counts.

    Parameters:
        frame: Spark DataFrame to describe.
        label: table name exactly as it should appear in the printed sentence.

    Returns:
        (number of columns, number of records)
    """
    n_cols = len(frame.columns)
    n_rows = frame.count()
    print(f"Table {label} before data preprocessing, containing: {n_cols} columns and {n_rows} records.")
    frame.show(5)
    return n_cols, n_rows


# Size and preview of every table as loaded, before any cleanup is applied.
num_columns_flora_2019, num_records_flora_2019 = _summarise_before_cleaning(flora_2019, "flora _ 2019")
num_columns_flora_2022, num_records_flora_2022 = _summarise_before_cleaning(flora_2022, "flora _ 2022")
num_columns_threaten_2019, num_records_threaten_2019 = _summarise_before_cleaning(threaten_2019, "threaten _ 2019")


Table flora _ 2019 before data preprocessing, containing: 54 columns and 34588 records.
+----------+-------------+--------------------+-----+----------+---------+-------+----------+---------------------+----------+------------------+---------+-----+-------+----+-------------+-------------+-------------+----------+-------+--------+-------+-------------+-----------+-------------+-------+--------+----------+---+---+---+---+---+---+---+---------+---+------+-------+----------+----+---+------+-------+---------+---------------+-------+--------------------+---------+-----------+---------------+-----------------+------------------+---------------+
|Group_Name|       Family|      Botanical_Name|BC_NR|     Genus|  Species|   Rank|Infra_Name|Naturalisation_Status|NCA_Status|             Notes|Total_Qld|Burke|Burnett|Cook|Darling_Downs|Gregory_North|Gregory_South|Leichhardt|Maranoa|Mitchell|Moreton|North_Kennedy|Port_Curtis|South_Kennedy|Warrego|Wide_Bay|Queensland|ACT|NSW| NT| SA|TAS|VIC| WA|Austr

In [4]:
# WARNING: this cell rewrites the source tables in MariaDB in place and closes
# `connection` when it finishes (successfully or not). The connection cell must
# be re-run before this cell can be executed again.

# Per-state count columns that are cleaned and kept.
target_columns = ['Queensland', 'ACT', 'NSW', 'NT', 'SA', 'TAS', 'VIC', 'WA']

# Columns retained in the rebuilt tables.
columns_to_keep = ['Botanical_Name'] + target_columns
threaten_to_keep = ['Botanical_Name', 'NCA_status']

# table name -> columns that survive the rebuild
tables_to_rebuild = {
    "flora_2019": columns_to_keep,
    "flora_2022": columns_to_keep,
    "threaten_2019": threaten_to_keep,
}

cursor = connection.cursor()
try:
    # Remove rows whose Botanical_Name is the literal text 'Botanical_Name'
    # (looks like a header row that was imported as data).
    for table in tables_to_rebuild:
        cursor.execute(f"DELETE FROM {table} WHERE Botanical_Name = 'Botanical_Name'")

    # Replace empty-string counts with 0 in both flora tables.
    for col_name in target_columns:
        for table in ("flora_2019", "flora_2022"):
            cursor.execute(f"UPDATE {table} SET {col_name} = 0 WHERE {col_name} = ''")

    # Persist the row-level changes before any DDL runs.
    connection.commit()

    for table, kept in tables_to_rebuild.items():
        select_list = ', '.join(f'{col} AS {col}' for col in kept)

        # Clear leftovers from an earlier, interrupted run so the rebuild can be retried.
        cursor.execute(f"DROP TABLE IF EXISTS {table}_new")
        cursor.execute(f"DROP TABLE IF EXISTS {table}_old")

        # Build the slimmed-down copy next to the original ...
        cursor.execute(f"CREATE TABLE {table}_new AS SELECT {select_list} FROM {table}")
        # ... swap it in with a single RENAME statement so the table name never
        # disappears between a DROP and a RENAME ...
        cursor.execute(f"RENAME TABLE {table} TO {table}_old, {table}_new TO {table}")
        # ... and only then discard the original.
        cursor.execute(f"DROP TABLE {table}_old")

    connection.commit()
finally:
    # Always release the cursor and the connection, even when a statement fails.
    cursor.close()
    connection.close()

In [5]:
# Re-read the three tables now that the database-side cleanup has run.
table_names = ("flora_2019", "flora_2022", "threaten_2019")
flora_2019, flora_2022, threaten_2019 = [
    spark.read.jdbc(jdbc_url, name, properties=jdbc_properties) for name in table_names
]

# Point the Spark SQL temp views at the freshly loaded DataFrames.
for name, frame in zip(table_names, (flora_2019, flora_2022, threaten_2019)):
    frame.createOrReplaceTempView(name)

# Preview the first 10 rows of each table, each preceded by its banner.
previews = (
    ("Table flora_2019 after data cleansing: fill '' value with 0 value, cut down unnecessary columns", flora_2019),
    ("Table flora_2022 after data cleansing: fill '' value with 0 value, cut down unnecessary columns", flora_2022),
    ("Table threaten_2019 after data cleansing:cut down unnecessary columns", threaten_2019),
)
for banner, frame in previews:
    print(banner)
    frame.show(10)


Table flora_2019 after data cleansing: fill '' value with 0 value, cut down unnecessary columns
+--------------------+----------+---+---+---+---+---+---+---+
|      Botanical_Name|Queensland|ACT|NSW| NT| SA|TAS|VIC| WA|
+--------------------+----------+---+---+---+---+---+---+---+
|Achnanthes exigua...|         0|  0|  0|  0|  0|  0|  0|  0|
|Achnanthes hungar...|         0|  0|  0|  0|  0|  0|  0|  0|
|Achnanthes saccul...|         0|  0|  0|  0|  0|  0|  0|  0|
|Achnanthes ventra...|         0|  0|  0|  0|  0|  0|  0|  0|
|Achnanthidium min...|         0|  0|  0|  0|  0|  0|  0|  0|
|Planothidium freq...|         0|  0|  0|  0|  0|  0|  0|  0|
|Feldmannia globif...|         0|  0|  0|  0|  0|  1|  0|  0|
|Feldmannia indica...|         0|  0|  2|  0|  0|  0|  0|  0|
|Feldmannia irregu...|         0|  0|  2|  0|  0|  1|  0|  0|
|Hincksia breviart...|         0|  0|  0|  0|  0|  0|  0|  0|
+--------------------+----------+---+---+---+---+---+---+---+
only showing top 10 rows

Table flor

In [6]:
# Query 1: count the threatened species recorded in each state in 2019 and in 2022,
# and the difference between the two counts.
print("Table illustrates the number of endangered species are available at each state in 2019 and 2022, number of endangered species disappear after 3 years.")

# State columns present in both flora tables.
STATES = ('Queensland', 'ACT', 'NSW', 'NT', 'SA', 'TAS', 'VIC', 'WA')


def _threatened_count_cte(flora_table, count_alias):
    """Build the body of a CTE with one row per state.

    Each row holds the state name and the number of distinct species from
    `flora_table` that also appear in threaten_2019 and have a count > 0 for
    that state. Every branch yields exactly one row with a unique State value,
    so the branches are combined with UNION ALL.
    """
    per_state = [
        (
            f"    SELECT '{state}' AS State, COUNT(DISTINCT f.Botanical_Name) AS {count_alias}\n"
            f"    FROM {flora_table} f\n"
            f"    JOIN threaten_2019 t ON f.Botanical_Name = t.Botanical_Name\n"
            f"    WHERE CAST(f.{state} AS INT) > 0"
        )
        for state in STATES
    ]
    return "\n    UNION ALL\n".join(per_state)


cte_2019 = _threatened_count_cte("flora_2019", "Threatened_2019")
cte_2022 = _threatened_count_cte("flora_2022", "Threatened_2022")

# NOTE: Extinct_Plants is the net difference between the two yearly counts.
# A species that is newly counted in 2022 offsets one that is no longer
# counted, so the figure is not a list of individual species that vanished.
result = spark.sql(f"""
WITH Threatened_2019 AS (
{cte_2019}
),
Threatened_2022 AS (
{cte_2022}
)
SELECT
    t19.State,
    t19.Threatened_2019,
    COALESCE(t22.Threatened_2022, 0) AS Threatened_2022,
    (t19.Threatened_2019 - COALESCE(t22.Threatened_2022, 0)) AS Extinct_Plants
FROM Threatened_2019 t19
LEFT JOIN Threatened_2022 t22 ON t19.State = t22.State
ORDER BY Extinct_Plants DESC
""")
result.show(truncate=False)
print("CONCLUSION:")
print("Some threatened plants in 2019 are not recorded in the 2022 record. Therefore, they are considered as extinct. The computation shows that 36 species were extinct in NSW, 8 species extinct in NT, SA and VIC each lost 2 species. Other states still maintain the number of threatened species they have.")

Table illustrates the number of endangered species are available at each state in 2019 and 2022, number of endangered species disappear after 3 years.
+----------+---------------+---------------+--------------+
|State     |Threatened_2019|Threatened_2022|Extinct_Plants|
+----------+---------------+---------------+--------------+
|NSW       |123            |87             |36            |
|NT        |16             |8              |8             |
|SA        |3              |1              |2             |
|VIC       |7              |5              |2             |
|Queensland|3              |3              |0             |
|ACT       |1              |1              |0             |
|TAS       |2              |2              |0             |
|WA        |1              |1              |0             |
+----------+---------------+---------------+--------------+

CONCLUSION:
Some threatened plants in 2019 are not recorded in the 2022 record. Therefore, they are considered as extinct. The c

In [7]:
# Query 2: for each state, count the threatened species (present in both flora
# tables) whose count went up, went down, or stayed the same between 2019 and 2022.
print("Table illustrates the number of remaining threatened species that improved or decreased in distribution over 3 years")

# State columns present in both flora tables.
STATES = ('Queensland', 'ACT', 'NSW', 'NT', 'SA', 'TAS', 'VIC', 'WA')


def _state_change_select(state):
    """Return the SELECT producing the Improved/Decreased/Balanced row for one state.

    Only species with a count > 0 for that state in 2019 or in 2022 are considered.
    """
    col_2019 = f"CAST(f2019.{state} AS INT)"
    col_2022 = f"CAST(f2022.{state} AS INT)"
    return (
        f"    SELECT\n"
        f"        '{state}' AS State,\n"
        f"        COUNT(DISTINCT CASE WHEN {col_2022} > {col_2019} THEN f2022.Botanical_Name ELSE NULL END) AS Improved,\n"
        f"        COUNT(DISTINCT CASE WHEN {col_2022} < {col_2019} THEN f2022.Botanical_Name ELSE NULL END) AS Decreased,\n"
        f"        COUNT(DISTINCT CASE WHEN {col_2022} = {col_2019} THEN f2022.Botanical_Name ELSE NULL END) AS Balanced\n"
        f"    FROM flora_2019 f2019\n"
        f"    JOIN flora_2022 f2022 ON f2019.Botanical_Name = f2022.Botanical_Name\n"
        f"    JOIN threaten_2019 t2019 ON f2019.Botanical_Name = t2019.Botanical_Name\n"
        f"    WHERE ({col_2019} > 0 OR {col_2022} > 0)"
    )


state_rows_sql = "\n\n    UNION ALL\n\n".join(_state_change_select(state) for state in STATES)

result = spark.sql(f"""
WITH StateImprovement AS (
{state_rows_sql}
)
SELECT
    State,
    Improved,
    Decreased,
    Balanced
FROM StateImprovement
ORDER BY Improved DESC, Decreased DESC, State
""")

result.show(truncate=False)
print("CONCLUSION:")
print("For the remaining species, NSW is recorded to have a significant change in the distribution of threatened species. Other states didn't have change in their endangered species' distribution.")
print("There is one point that need to consider deeper, in the previous table it stated that NSW only have 87 endangered species in 2022, but according to this table there is 89 species in total of balnaced, improved and decreased. There is explanation in next queries")

Table illustrates the number of remaining threatened species that improved or decreased in distribution over 3 years
+----------+--------+---------+--------+
|State     |Improved|Decreased|Balanced|
+----------+--------+---------+--------+
|NSW       |18      |2        |69      |
|ACT       |0       |0        |1       |
|NT        |0       |0        |8       |
|Queensland|0       |0        |3       |
|SA        |0       |0        |1       |
|TAS       |0       |0        |2       |
|VIC       |0       |0        |5       |
|WA        |0       |0        |1       |
+----------+--------+---------+--------+

CONCLUSION:
For the remaining species, NSW is recorded to have a significant change in the distribution of threatened species. Other states didn't have change in their endangered species' distribution.
There is one point that need to consider deeper, in the previous table it stated that NSW only have 87 endangered species in 2022, but according to this table there is 89 species in total 

In [8]:
# Query 3: explain why the NSW figures of Query 1 and Query 2 differ.

# Threatened species whose NSW count was 0 in 2019 and is above 0 in 2022.
nsw_appeared_sql = """
SELECT DISTINCT
    f2022.Botanical_Name,
    CASE 
        WHEN CAST(f2022.NSW AS INT) > CAST(f2019.NSW AS INT) THEN 'Improved'
        ELSE NULL
    END AS Category
FROM flora_2019 f2019
JOIN flora_2022 f2022 ON f2019.Botanical_Name = f2022.Botanical_Name
JOIN threaten_2019 t2019 ON f2019.Botanical_Name = t2019.Botanical_Name
WHERE (CAST(f2019.NSW AS INT) = 0 AND CAST(f2022.NSW AS INT) > 0)
ORDER BY Category, f2022.Botanical_Name;
"""

# Threatened species whose NSW count was above 0 in 2019 and is 0 in 2022.
nsw_disappeared_sql = """
SELECT DISTINCT 
    f2022.Botanical_Name,
    CASE 
        WHEN CAST(f2022.NSW AS INT) < CAST(f2019.NSW AS INT) THEN 'Decreased'
        ELSE NULL
    END AS Category
FROM flora_2019 f2019
JOIN flora_2022 f2022 ON f2019.Botanical_Name = f2022.Botanical_Name
JOIN threaten_2019 t2019 ON f2019.Botanical_Name = t2019.Botanical_Name
WHERE (CAST(f2019.NSW AS INT) > 0 AND CAST(f2022.NSW AS INT) = 0)
ORDER BY Category, f2022.Botanical_Name;
"""

print("Tables of new endangered species that appeared in 2022 in NSW")
result = spark.sql(nsw_appeared_sql)
result.show(truncate=False)

print("Tables of endangered species that extincted in 2022 in NSW")
result = spark.sql(nsw_disappeared_sql)
result.show(truncate=False)

for line in (
    "CONCLUSION:",
    "NSW lost two threatenned species in 2022, however, they successfully by raising new two other threatenned species.",
    "Therefore, the total of threatenned species NSW has in 2022 is 87, but the table of changes in distribution is 89, which also include the increase in two new species.",
    "Can confirmed that all data is calculated correctly",
):
    print(line)

Tables of new endangered species that appeared in 2022 in NSW
+-----------------------------------------+--------+
|Botanical_Name                           |Category|
+-----------------------------------------+--------+
|Antrophyum austroqueenslandicum D.L.Jones|Improved|
|Myrsine serpenticola Jackes              |Improved|
+-----------------------------------------+--------+

Tables of endangered species that extincted in 2022 in NSW
+-------------------------------------------+---------+
|Botanical_Name                             |Category |
+-------------------------------------------+---------+
|Allocasuarina thalassoscopica L.A.S.Johnson|Decreased|
|Arthraxon hispidus (Thunb.) Makino         |Decreased|
+-------------------------------------------+---------+

CONCLUSION:
NSW lost two threatenned species in 2022, however, they successfully by raising new two other threatenned species.
Therefore, the total of threatenned species NSW has in 2022 is 87, but the table of changes in d

In [9]:
# Query 4: show the threatened species whose NSW count rose the most and fell the most.
print("Only NSW will be considered for next computations as NSW is the only states that has changes in distribution of their remaining endangered plants.")

# NSW_Diff = 2022 count minus 2019 count, for threatened species present in both flora tables.
result = spark.sql("""
SELECT DISTINCT
    f2019.Botanical_Name,
    CAST(f2022.NSW AS INT) - CAST(f2019.NSW AS INT) AS NSW_Diff
FROM flora_2019 f2019
JOIN flora_2022 f2022 ON f2019.Botanical_Name = f2022.Botanical_Name
JOIN threaten_2019 t2019 ON f2019.Botanical_Name = t2019.Botanical_Name
ORDER BY NSW_Diff DESC, Botanical_Name ASC
""")

# Fetch the largest and smallest NSW_Diff in a single aggregation instead of
# running the query once per extreme.
extremes = result.agg(
    F.max("NSW_Diff").alias("max_diff"),
    F.min("NSW_Diff").alias("min_diff"),
).first()
max_diff = extremes["max_diff"]
min_diff = extremes["min_diff"]

# Every species tied for the largest increase.
print("Rows with the highest NSW_Diff:")
result.filter(result.NSW_Diff == max_diff).show(truncate=False)
# Every species tied for the largest decrease.
print("Rows with the lowest NSW_Diff:")
result.filter(result.NSW_Diff == min_diff).show(truncate=False)
print("CONCLUSION:")
print("Those five remaining endangered species have increased in their distribution after 3 years, which increase by 2 for each. Those are also species that has the highest increase in distribution")
print("The remaining endangered species which has the most significant decrease is Arthraxon hispidus (Thunb.) Makino, which distribution decreased by 5 after 3 years.")

Only NSW will be considered for next computations as NSW is the only states that has changes in distribution of their remaining endangered plants.
Rows with the highest NSW_Diff:
+---------------------------------+--------+
|Botanical_Name                   |NSW_Diff|
+---------------------------------+--------+
|Blandfordia grandiflora R.Br.    |2       |
|Cadellia pentastylis F.Muell.    |2       |
|Eucalyptus dunnii Maiden         |2       |
|Niemeyera whitei (Aubrev.) Jessup|2       |
|Westringia blakeana B.Boivin     |2       |
+---------------------------------+--------+

Rows with the lowest NSW_Diff:
+----------------------------------+--------+
|Botanical_Name                    |NSW_Diff|
+----------------------------------+--------+
|Arthraxon hispidus (Thunb.) Makino|-5      |
+----------------------------------+--------+

CONCLUSION:
Those five remaining endangered species have increased in their distribution after 3 years, which increase by 2 for each. Those are also spec

In [10]:
# Query 5: rank the threatened species by their NSW count in 2019 and in 2022
# and show the two rankings side by side, then compare the yearly totals.
print("Table illustrates top endangered species that distribute across NSW between 2019 and 2022")

# How the query is built:
#   * Rows are de-duplicated BEFORE ROW_NUMBER() is applied. A DISTINCT placed
#     next to ROW_NUMBER() cannot remove duplicates because every row already
#     carries a unique rank.
#   * The two rankings are combined with a FULL OUTER JOIN so that a year with
#     more ranked species than the other keeps all of its rows (and therefore
#     its full total).
#   * The result is ordered by rank so the displayed table is deterministic.
result = spark.sql("""
WITH ranked_2019 AS (
    SELECT
        d.Botanical_Name,
        d.NSW,
        ROW_NUMBER() OVER (ORDER BY CAST(d.NSW AS INT) DESC, d.Botanical_Name ASC) AS rn
    FROM (
        SELECT DISTINCT f.Botanical_Name, f.NSW
        FROM flora_2019 AS f
        JOIN threaten_2019 AS t ON f.Botanical_Name = t.Botanical_Name
    ) AS d
),
ranked_2022 AS (
    SELECT
        d.Botanical_Name,
        d.NSW,
        ROW_NUMBER() OVER (ORDER BY CAST(d.NSW AS INT) DESC, d.Botanical_Name ASC) AS rn
    FROM (
        SELECT DISTINCT f.Botanical_Name, f.NSW
        FROM flora_2022 AS f
        JOIN threaten_2019 AS t ON f.Botanical_Name = t.Botanical_Name
    ) AS d
)
SELECT
    r19.Botanical_Name AS Botanical_Name_2019,
    r19.NSW AS NSW_2019,
    r22.Botanical_Name AS Botanical_Name_2022,
    r22.NSW AS NSW_2022
FROM ranked_2019 r19
FULL OUTER JOIN ranked_2022 r22 ON r19.rn = r22.rn
ORDER BY COALESCE(r19.rn, r22.rn)
""")

result.show(truncate=False)

# Sum both years in one pass. The NSW columns are cast to int explicitly,
# matching the CAST(... AS INT) used in the SQL above.
totals = result.agg(
    F.sum(F.col("NSW_2019").cast("int")).alias("total_2019"),
    F.sum(F.col("NSW_2022").cast("int")).alias("total_2022"),
).first()
total_2019 = totals["total_2019"]
total_2022 = totals["total_2022"]
print(f"Total for NSW_2019: {total_2019}")
print(f"Total for NSW_2022: {total_2022}")
print("CONCLUSION:")
print("Overall, the total distribution of endangered species decreased, which mean that NSW lost more species compared to their in 2019.")
print("Even though some species increased in their distribution such as Westringia blakeana B.Boivin climbed from 16 to 18, it still not able to recover for the rest situation.")
print("Therefore with this speed, NSW may lost more species in the future if there is no preservation work carried out")

# Stop the Spark session; no Spark call can be made after this line.
spark.stop()

Table illustrates top endangered species that distribute across NSW between 2019 and 2022
+----------------------------------------------------------+--------+----------------------------------------------------------+--------+
|Botanical_Name_2019                                       |NSW_2019|Botanical_Name_2022                                       |NSW_2022|
+----------------------------------------------------------+--------+----------------------------------------------------------+--------+
|Digitaria porrecta S.T.Blake                              |33      |Digitaria porrecta S.T.Blake                              |33      |
|Rhodamnia rubescens (Benth.) Miq.                         |26      |Rhodamnia rubescens (Benth.) Miq.                         |27      |
|Acacia ruppii Maiden & Betche                             |21      |Cryptocarya foetida R.T.Baker                             |18      |
|Hicksbeachia pinnatifolia F.Muell.                        |19      |Westringia bl