# Data processing

In [1]:
import os

# Point this kernel at the local Spark, Hadoop and Java installations.
os.environ.update({
    "SPARK_HOME": "/opt/spark",
    "HADOOP_HOME": "/usr/local/hadoop",
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
})

In [None]:
# Run Hadoop's start-dfs.sh script so HDFS is up before Spark reads from it.
!/usr/local/hadoop/sbin/start-dfs.sh

In [None]:
# List the contents of /user/OBIS on HDFS.
!/usr/local/hadoop/bin/hadoop fs -ls /user/OBIS

In [4]:
import findspark

# Let findspark locate the Spark installation so that `pyspark` can be imported below.
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

def init_spark():
    """Create (or reuse) the SparkSession used by this notebook.

    The session is configured to download the MongoDB Spark connector and to
    read from / write to the ``biodiversity_db`` database, with the write
    operation type set to ``update``.

    The MongoDB connection string is taken from the ``OBIS_MONGO_URI``
    environment variable so that credentials are never stored in the notebook.

    Returns:
        The SparkSession named ``OBIS``.

    Raises:
        RuntimeError: if ``OBIS_MONGO_URI`` is unset or empty.
    """
    import os  # local import keeps this function self-contained

    # Checked first so a missing secret fails fast, before any Spark start-up.
    mongo_conn = os.environ.get("OBIS_MONGO_URI")
    if not mongo_conn:
        raise RuntimeError(
            "Set the OBIS_MONGO_URI environment variable to the MongoDB "
            "connection string before calling init_spark()."
        )

    conf = SparkConf()

    # Download mongo-spark-connector and its dependencies.
    conf.set("spark.jars.packages",
             "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")

    # Set up read connection.
    conf.set("spark.mongodb.read.connection.uri", mongo_conn)
    conf.set("spark.mongodb.read.database", "biodiversity_db")

    # Set up write connection.
    conf.set("spark.mongodb.write.connection.uri", mongo_conn)
    conf.set("spark.mongodb.write.database", "biodiversity_db")
    conf.set("spark.mongodb.write.operationType", "update")

    # Build the session straight from the conf instead of instantiating a
    # SparkContext first: getOrCreate() reuses a live context, so re-running
    # this cell no longer fails because a SparkContext already exists.
    return SparkSession \
        .builder \
        .config(conf=conf) \
        .appName('OBIS') \
        .getOrCreate()

# Create the session once; later cells read data and stop Spark through this handle.
spark = init_spark()

In [None]:
hdfs_directory_path = "hdfs://localhost:9000/user/OBIS/data"

# Read the CSV data under the HDFS directory, treating the first line as the header.
try:
    df = spark.read.option("header", "true").csv(hdfs_directory_path)
except Exception as e:
    print("Error reading from HDFS:", str(e))
    # Re-raise: every later cell needs `df`, so swallowing the error here
    # would only resurface as a confusing NameError further down.
    raise

### Getting some stats

In [7]:
from pyspark.sql.functions import count

# Number of rows per species.
# The column is spelled "scientificName", as in every other cell; the previous
# lower-case spelling only resolved because Spark's column matching is
# case-insensitive by default (spark.sql.caseSensitive=false).
# NOTE(review): this may change the output column name — confirm downstream readers.
species_count_per_species = (
    df.groupBy("scientificName")
      .agg(count("*").alias("count_per_species"))
)

### Biodiversity hotspots

In [9]:
from pyspark.sql.functions import col
# Keep only rows that have a latitude, a longitude and a species name.
has_latitude = col('decimalLatitude').isNotNull()
has_longitude = col('decimalLongitude').isNotNull()
has_species = col('scientificName').isNotNull()

cleaned_df = df.filter(has_latitude & has_longitude & has_species)

In [10]:
from pyspark.sql.functions import col, countDistinct, floor

grid_size = 1.0  # edge length of one grid cell, in the same unit as the coordinates

# Snap each coordinate to the lower edge of its grid cell, then count the
# distinct species seen in every cell.
# floor() is used instead of cast("int"): the cast truncates toward zero, so
# e.g. -0.5 and 0.5 would both land in cell 0 and the cells touching the zero
# latitude / longitude lines would be twice as large as all the others.
species_richness_df = (
    cleaned_df
    .withColumn("lat_grid", floor(col("decimalLatitude") / grid_size) * grid_size)
    .withColumn("lon_grid", floor(col("decimalLongitude") / grid_size) * grid_size)
    .groupBy("lat_grid", "lon_grid")
    .agg(countDistinct(col('scientificName')).alias('species_richness'))
)

### Migratory phenomena

In [12]:
from pyspark.sql.functions import col
# Drop every row that lacks a coordinate, a species name, a year or an sst value.
required_columns = (
    'decimalLatitude',
    'decimalLongitude',
    'scientificName',
    'date_year',
    'sst',
)

# Applying one non-null filter per column keeps exactly the rows where all are present.
filtered_df = df
for column_name in required_columns:
    filtered_df = filtered_df.filter(col(column_name).isNotNull())

In [13]:
# Classify regions based on sea surface temperature (sst)
def classify_temperature(sst):
    """Map a sea surface temperature value to a coarse region label.

    Args:
        sst: the temperature as a number, or None.

    Returns:
        'unknown' if ``sst`` is None or NaN, 'cold' below 10,
        'temperate' from 10 to 25 inclusive, and 'warm' above 25.
    """
    # NaN compares False against every bound, so without this guard it would
    # fall through all the checks and be labelled 'warm'. NaN is the only
    # value that is not equal to itself.
    if sst is None or sst != sst:
        return 'unknown'
    if sst < 10:
        return 'cold'
    if sst <= 25:
        return 'temperate'
    return 'warm'


from pyspark.sql.functions import udf, col
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType

# Replace sst with its double-typed version. The cast itself is what decides
# whether a value becomes a double or null, so it is applied directly.
sst_as_double = F.col('sst').cast(DoubleType())
filtered_df = filtered_df.withColumn('sst', sst_as_double)

# Expose the Python classifier to Spark as a string-returning UDF and use it
# to label every row.
temperature_class_udf = udf(classify_temperature, StringType())
temperature_region = temperature_class_udf(col('sst'))
classified_df = filtered_df.withColumn('temperature_region', temperature_region)

In [14]:
from pyspark.sql.functions import col, avg
# Mean position of each species, per year and per temperature region.
mean_latitude = avg(col('decimalLatitude')).alias('avg_latitude')
mean_longitude = avg(col('decimalLongitude')).alias('avg_longitude')

species_movement_df = (
    classified_df
    .groupBy("scientificName", "date_year", "temperature_region")
    .agg(mean_latitude, mean_longitude)
)

### Shannon indices

In [16]:
from pyspark.sql import functions as F

# Only rows that carry both a species name and a year can contribute.
valid_data_df = df.filter(
    F.col("scientificName").isNotNull() & F.col("date_year").isNotNull()
)

# Number of rows for every (year, species) pair.
species_counts_per_year = (
    valid_data_df
    .groupBy("date_year", "scientificName")
    .count()
    .withColumnRenamed("count", "species_count")
)

# Number of rows per year, summed over all species.
yearly_total = F.sum("species_count").alias("total_count_per_year")
total_counts_per_year = species_counts_per_year.groupBy("date_year").agg(yearly_total)

# Attach the yearly total to every (year, species) row and derive the share
# p_i of that species within its year.
species_with_proportion = (
    species_counts_per_year
    .join(total_counts_per_year, on="date_year", how="inner")
    .withColumn("proportion", F.col("species_count") / F.col("total_count_per_year"))
)

# Per-species Shannon term: -p_i * ln(p_i).
proportion = F.col("proportion")
species_with_shannon_component = species_with_proportion.withColumn(
    "shannon_component", -proportion * F.log(proportion)
)

# The Shannon index of a year is the sum of its per-species terms.
shannon_indices = (
    species_with_shannon_component
    .groupBy("date_year")
    .agg(F.sum("shannon_component").alias("shannon_index"))
)


### Evaluation of data coverage

In [20]:
from pyspark.sql import functions as F


grid_size = 1.0  # edge length of one grid cell, in the same unit as the coordinates

# Spatial and temporal coverage: number of rows per grid cell and year.
has_position_and_year = (
    F.col("decimalLatitude").isNotNull()
    & F.col("decimalLongitude").isNotNull()
    & F.col("date_year").isNotNull()
)

# floor() is used instead of cast("int"): the cast truncates toward zero, so
# e.g. -0.5 and 0.5 would both land in cell 0 and the cells touching the zero
# latitude / longitude lines would cover twice the area of all the others.
coverage_df = (
    df
    .filter(has_position_and_year)
    .withColumn("lat_grid", F.floor(F.col("decimalLatitude") / grid_size) * grid_size)
    .withColumn("lon_grid", F.floor(F.col("decimalLongitude") / grid_size) * grid_size)
    .groupBy("lat_grid", "lon_grid", "date_year")
    .agg(F.count("*").alias("record_count"))
)

# Store output data in the database (MongoDB)

### Stats

In [None]:
# Append the per-species counts to the "statistics" collection of biodiversity_db.
(
    species_count_per_species.write
    .format("mongodb")
    .mode("append")
    .option("database", "biodiversity_db")
    .option("collection", "statistics")
    .save()
)

### Hotspots

In [None]:
# Append the per-cell species richness to the "biodiversity_hotspots" collection of biodiversity_db.
(
    species_richness_df.write
    .format("mongodb")
    .mode("append")
    .option("database", "biodiversity_db")
    .option("collection", "biodiversity_hotspots")
    .save()
)

### Migration

In [None]:
# Append the per-species mean positions to the "migration" collection of biodiversity_db.
(
    species_movement_df.write
    .format("mongodb")
    .mode("append")
    .option("database", "biodiversity_db")
    .option("collection", "migration")
    .save()
)

### Shannon indices

In [None]:
# Append the yearly Shannon indices to the "shannon" collection of biodiversity_db.
(
    shannon_indices.write
    .format("mongodb")
    .mode("append")
    .option("database", "biodiversity_db")
    .option("collection", "shannon")
    .save()
)

### Data coverage

In [None]:
# Append the per-cell, per-year record counts to the "data_coverage" collection of biodiversity_db.
(
    coverage_df.write
    .format("mongodb")
    .mode("append")
    .option("database", "biodiversity_db")
    .option("collection", "data_coverage")
    .save()
)

# Stop app

In [24]:
# Stop the Spark session created by init_spark().
spark.stop()

In [None]:
# Run Hadoop's stop-dfs.sh script to shut HDFS down again.
!/usr/local/hadoop/sbin/stop-dfs.sh