<a href="https://colab.research.google.com/github/achmadgani/colletothricum_model_development/blob/main/Disease_prediction_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Weather-based plant disease prediction

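This notebook loads data from two sources: weather observations and disease-severity measurements. It then combines them into a dataset for predicting disease severity from the preceding weather.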

# 1. Connect to Google Drive

In [None]:
from google.colab import drive

# Mount Google Drive so the input CSVs under MyDrive can be read from /content/drive.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


# 2. Install PySpark in Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Weather-Disease Prediction") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Ign:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [945 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,560 kB]
Get:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Get:13 http://archive.ubuntu

# 3. Load and preprocess the data

In [None]:
#Load the weather data

import pyspark.sql.functions as F

# Temperature columns that arrive in Kelvin and are converted to Celsius below.
TEMP_LIST = [
    "temp",
    "temp_min",
    "temp_max",
    "feels_like",
    "dew_point",
]

def convert_to_celcius(df, columns):
    """
    Rewrite the given Kelvin columns of a PySpark DataFrame as Celsius.

    Each listed column is replaced in place by (value - 273.15), rounded to two
    decimal places. Columns not listed are left untouched.

    Parameters:
    - df: PySpark DataFrame
    - columns: iterable of column names holding Kelvin values

    Returns:
    - PySpark DataFrame whose listed columns are now in Celsius
      (the input DataFrame itself when `columns` is empty)
    """
    kelvin_offset = 273.15
    converted = df
    for name in columns:
        celsius = F.round(F.col(name) - kelvin_offset, 2)
        converted = converted.withColumn(name, celsius)
    return converted

# Combined weather export for all cities, stored in Google Drive
csv_file_path = "/content/drive/MyDrive/病害診断データ/Italy & India.csv"

df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Split the weather data into one filtered DataFrame per distinct city_name
city_names = [r.city_name for r in df.select("city_name").distinct().collect()]
city_dfs = {name: df.filter(F.col("city_name") == name) for name in city_names}

# Per-city frames with their temperature columns converted to Celsius
sardinia_df = convert_to_celcius(city_dfs.get("Sardinia"), TEMP_LIST)
nagayalanka_df = convert_to_celcius(city_dfs.get("Nagayalanka"), TEMP_LIST)

sardinia_df.show(10)

+---------+--------------------+--------+---------+---------+--------+-----+----------+---------+----------+--------+--------+--------+---------+----------+--------+----------+--------+---------+-------+-------+-------+-------+----------+----------+------------+-------------------+------------+
|       dt|              dt_iso|timezone|city_name|      lat|     lon| temp|visibility|dew_point|feels_like|temp_min|temp_max|pressure|sea_level|grnd_level|humidity|wind_speed|wind_deg|wind_gust|rain_1h|rain_3h|snow_1h|snow_3h|clouds_all|weather_id|weather_main|weather_description|weather_icon|
+---------+--------------------+--------+---------+---------+--------+-----+----------+---------+----------+--------+--------+--------+---------+----------+--------+----------+--------+---------+-------+-------+-------+-------+----------+----------+------------+-------------------+------------+
|283996800|1979-01-01 00:00:...|    3600| Sardinia|40.120875|9.012893|11.37|      NULL|    10.44|     11.02|   1

In [None]:
# Disease-severity measurements (one row per image file), stored in Google Drive
csv_pr = "/content/drive/MyDrive/病害診断データ/severity_results_pr.csv"

sar_disease_df = spark.read.csv(path=csv_pr, header=True, inferSchema=True)
sar_disease_df.show(5)

import pyspark.sql.functions as F
from pyspark.sql import types as T

def round_to_nearest_hour(dt_col):
    """
    Round a timestamp column to the nearest whole hour (minute >= 30 rounds up).

    Parameters:
    - dt_col: PySpark Column (or column name) holding timestamps

    Returns:
    - PySpark Column of rounded timestamps, e.g. 13:29 -> 13:00,
      13:30 -> 14:00 and 23:45 -> 00:00 on the following day
    """
    if isinstance(dt_col, str):
        dt_col = F.col(dt_col)
    # Shift by half an hour, then truncate to the hour. Rebuilding the timestamp
    # from its parts with `hour + 1` yields an invalid hour 24 for times from
    # 23:30 onwards (NULL, or an error under ANSI mode); date_trunc rolls over
    # into the next day correctly.
    return F.date_trunc("hour", dt_col + F.expr("INTERVAL 30 MINUTES"))

# Snap every observation time to its nearest whole hour and keep it as Round_dt
rounded_dt = round_to_nearest_hour(F.col("Date Time"))
sar_disease_df = sar_disease_df.withColumn("Round_dt", rounded_dt)

sar_disease_df.show(5)


+----------+-------------------+--------------------+
|  Filename|          Date Time|            Severity|
+----------+-------------------+--------------------+
|u113_0.png|2020-05-04 13:21:00|0.008749119323640557|
|u115_0.png|2020-05-04 13:21:00|0.030556496044815168|
|u119_0.png|2020-05-04 13:22:00|0.022122323435466697|
| u12_0.png|2020-05-02 09:53:00|0.006733798089336...|
| u12_1.png|2020-05-02 09:53:00|0.003651097218967...|
+----------+-------------------+--------------------+
only showing top 5 rows

+----------+-------------------+--------------------+-------------------+
|  Filename|          Date Time|            Severity|           Round_dt|
+----------+-------------------+--------------------+-------------------+
|u113_0.png|2020-05-04 13:21:00|0.008749119323640557|2020-05-04 13:00:00|
|u115_0.png|2020-05-04 13:21:00|0.030556496044815168|2020-05-04 13:00:00|
|u119_0.png|2020-05-04 13:22:00|0.022122323435466697|2020-05-04 13:00:00|
| u12_0.png|2020-05-02 09:53:00|0.00673379808

# 4. Feature engineering

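This section derives additional features from the raw weather data. For example, it computes the wind velocity components `Wx` and `Wy` from wind speed and wind direction.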

In [None]:
import numpy as np

def calculate_wind_velocity(df):
    """
    Append wind velocity components to a weather DataFrame.

    Parameters:
    - df: PySpark DataFrame with `wind_speed` and `wind_deg` (direction in degrees) columns

    Returns:
    - PySpark DataFrame with two extra columns:
      Wx = wind_speed * cos(wind_deg) and Wy = wind_speed * sin(wind_deg),
      where the angle is first converted from degrees to radians
    """
    speed = F.col('wind_speed')
    direction_rad = F.col('wind_deg') * np.pi / 180  # degrees -> radians

    with_wx = df.withColumn('Wx', speed * F.cos(direction_rad))
    return with_wx.withColumn('Wy', speed * F.sin(direction_rad))

# Add the Wx / Wy wind components to the Sardinia weather data
featured_sardinia_df = sardinia_df.transform(calculate_wind_velocity)
featured_sardinia_df.show(10)

+---------+--------------------+--------+---------+---------+--------+-----+----------+---------+----------+--------+--------+--------+---------+----------+--------+----------+--------+---------+-------+-------+-------+-------+----------+----------+------------+-------------------+------------+--------------------+-------------------+
|       dt|              dt_iso|timezone|city_name|      lat|     lon| temp|visibility|dew_point|feels_like|temp_min|temp_max|pressure|sea_level|grnd_level|humidity|wind_speed|wind_deg|wind_gust|rain_1h|rain_3h|snow_1h|snow_3h|clouds_all|weather_id|weather_main|weather_description|weather_icon|                  Wx|                 Wy|
+---------+--------------------+--------+---------+---------+--------+-----+----------+---------+----------+--------+--------+--------+---------+----------+--------+----------+--------+---------+-------+-------+-------+-------+----------+----------+------------+-------------------+------------+--------------------+----------

# 5. Create the dataset

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.functions import to_timestamp, date_format, regexp_replace
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, ArrayType, FloatType

class WindowGenerator():
    """Pair disease-severity observations with the weather that preceded them.

    Window geometry: a window spans ``input_width + shift`` steps; the first
    ``input_width`` steps are inputs and the last ``label_width`` steps are labels.

    Parameters:
    - input_width: number of past weather timesteps per sample
    - label_width: number of label timesteps at the end of the window
    - shift: distance from the end of the inputs to the end of the labels
    - label_columns: optional list of label column names
    - disease_df: Spark DataFrame with ``Round_dt`` and ``Severity`` columns
    - weather_df: Spark DataFrame with a ``dt_iso`` column comparable to
      ``Round_dt`` and every column listed in ``WEATHER_FEATURES``
    - spark: SparkSession used to build the resulting DataFrame
    """

    # Weather columns packed into each sample, in this order.
    WEATHER_FEATURES = ['temp', 'dew_point', 'feels_like', 'temp_min', 'temp_max',
                        'pressure', 'humidity', 'Wx', 'Wy', 'rain_1h', 'rain_3h']

    def __init__(self, input_width, label_width, shift, label_columns=None,
                 disease_df=None, weather_df=None, spark=None):
        # Map label column names to their positions; empty when no labels are
        # given so the attribute always exists.
        self.label_columns = label_columns
        self.label_columns_indices = (
            {name: i for i, name in enumerate(label_columns)}
            if label_columns is not None else {}
        )

        # Window parameters and data sources.
        self.input_width = input_width
        self.label_width = label_width
        self.shift = shift
        self.disease_df = disease_df
        self.weather_df = weather_df
        self.spark = spark

        self.total_window_size = input_width + shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

    def __repr__(self):
        return '\n'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
            f'Label column name(s): {self.label_columns}'])

    def _weather_window(self, round_dt):
        """Return one list per weather feature for the window ending at ``round_dt``.

        Selects ``weather_df`` rows whose ``dt_iso`` lies in
        [round_dt - (input_width - 1) hours, round_dt], keeps the ``input_width``
        most recent of them and returns their values newest first. Null values
        stay ``None``; a ``None`` timestamp matches no rows and yields empty lists.
        """
        lookback_hours = max(self.input_width - 1, 0)
        window_end = F.lit(round_dt).cast(TimestampType())
        # The lower bound is expressed in hours; F.date_sub would step back whole days.
        window_start = window_end - F.expr(f'INTERVAL {lookback_hours} HOURS')

        rows = (self.weather_df
                .filter(F.col('dt_iso').between(window_start, window_end))
                .orderBy(F.col('dt_iso').desc())
                .limit(self.input_width)
                .select(self.WEATHER_FEATURES)
                .collect())
        return [[float(r[feature]) if r[feature] is not None else None for r in rows]
                for feature in self.WEATHER_FEATURES]

    def create_dataset(self):
        """Pair every disease observation with its preceding weather window.

        Returns:
        - Spark DataFrame with columns ``Round_dt`` (timestamp), ``Severity``
          (float) and ``Features``: one array per entry of ``WEATHER_FEATURES``,
          each ordered from the most recent timestep backwards.

        Raises:
        - ValueError: if ``disease_df``/``weather_df`` or ``spark`` is missing.
        """
        if self.disease_df is None or self.weather_df is None:
            raise ValueError('Insufficient dataset to perform dataset creation')
        if self.spark is None:
            raise ValueError('A SparkSession is required to perform dataset creation')

        # Several disease rows can share one rounded timestamp (e.g. images taken
        # in the same hour); query Spark once per distinct timestamp.
        windows = {}
        results = []
        for row in self.disease_df.select('Round_dt', 'Severity').collect():
            round_dt = row['Round_dt']
            if round_dt not in windows:
                windows[round_dt] = self._weather_window(round_dt)
            results.append((round_dt, row['Severity'], windows[round_dt]))

        schema = StructType([
            StructField("Round_dt", TimestampType(), True),
            StructField("Severity", FloatType(), True),
            StructField("Features", ArrayType(ArrayType(FloatType())), True)
        ])
        return self.spark.createDataFrame(results, schema=schema)



In [None]:
# Shared window shape: 48 input timesteps followed by a single label step.
WINDOW_SHAPE = dict(input_width=48, label_width=1, shift=1)

# Pear dataset
w1 = WindowGenerator(**WINDOW_SHAPE,
                     label_columns=['Disease severity'],
                     disease_df=sar_disease_df,
                     weather_df=featured_sardinia_df,
                     spark=spark)

# Blackgram dataset (no data attached yet)
w2 = WindowGenerator(**WINDOW_SHAPE, label_columns=['Disease severity'])

w1, w2

(Total window size: 49
 Input indices: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
  24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
 Label indices: [48]
 Label column name(s): ['Disease severity'],
 Total window size: 49
 Input indices: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
  24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
 Label indices: [48]
 Label column name(s): ['Disease severity'])

In [None]:
def fixed_timestamp(self):
  """Parse the join keys of both DataFrames into Spark timestamps.

  ``dt_iso`` in the weather data has a trailing " +dddd word" suffix removed
  (presumably an offset/zone such as " +0000 UTC" — confirm against the raw
  CSV) before parsing with 'yyyy-MM-dd HH:mm:ss'. ``Round_dt`` in the disease
  data is passed through to_timestamp. Both attributes are replaced on ``self``.
  """
  zone_suffix = r'\s\+\d{4}\s\w+'
  self.weather_df = (
      self.weather_df
      .withColumn('dt_iso', regexp_replace('dt_iso', zone_suffix, ''))
      .withColumn('dt_iso', to_timestamp('dt_iso', 'yyyy-MM-dd HH:mm:ss'))
  )
  self.disease_df = self.disease_df.withColumn('Round_dt', to_timestamp('Round_dt'))

# Attach the helper to the class so existing instances (w1) can call it.
WindowGenerator.fixed_timestamp = fixed_timestamp

w1.fixed_timestamp()

In [None]:
sardinia_parquet_path = '/content/drive/MyDrive/病害診断データ/sardinia_dataset.parquet'

sardinia = w1.create_dataset()

# Persist one partition directory per rounded timestamp, replacing any previous output
(sardinia.write
    .mode('overwrite')
    .partitionBy('Round_dt')
    .parquet(sardinia_parquet_path))

sardinia.show(10, truncate=False)

+-------------------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------