In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java - uncomment if software is not installed
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Get:8 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [43.3 kB]
Get:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [850 kB]
Get:13 http://archive.ubuntu.com/ubuntu ja

In [2]:
# Import Remaining Packages (Post-Install)
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, count, year, month, avg, when, max, min, round, cast
from pyspark.sql.types import IntegerType

In [3]:
# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [5]:
# Prep for file reading from google drive
from google.colab import drive

drive.mount('/content/gdrive/')

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).


In [6]:
#read in the csv data from google drive
spark.sparkContext.addFile('/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/train_timeseries.csv')
train_timeseries_df = spark.read.csv(SparkFiles.get('train_timeseries.csv'), sep=",", header=True, inferSchema=True)

spark.sparkContext.addFile('/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/test_timeseries.csv')
test_timeseries_df = spark.read.csv(SparkFiles.get('test_timeseries.csv'), sep=",", header=True, inferSchema=True)

spark.sparkContext.addFile('/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/validation_timeseries.csv')
validation_timeseries_df = spark.read.csv(SparkFiles.get('validation_timeseries.csv'), sep=",", header=True, inferSchema=True)

spark.sparkContext.addFile('/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/soil_data.csv')
soil_data_df = spark.read.csv(SparkFiles.get('soil_data.csv'), sep=",", header=True, inferSchema=True)

In [7]:
train_timeseries_df.show(20)

+----+----------+-------+------+-----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|fips|      date|PRECTOT|    PS| QV2M|  T2M|T2MDEW|T2MWET|T2M_MAX|T2M_MIN|T2M_RANGE|   TS|WS10M|WS10M_MAX|WS10M_MIN|WS10M_RANGE|WS50M|WS50M_MAX|WS50M_MIN|WS50M_RANGE|score|
+----+----------+-------+------+-----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|1001|2000-01-01|   0.22|100.51| 9.65|14.74| 13.51| 13.51|  20.96|  11.46|      9.5|14.65|  2.2|     2.94|     1.49|       1.46| 4.85|     6.04|     3.23|       2.81| null|
|1001|2000-01-02|    0.2|100.55|10.42|16.69| 14.71| 14.71|   22.8|  12.61|    10.18| 16.6| 2.52|     3.43|     1.83|        1.6| 5.33|     6.13|     3.72|       2.41| null|
|1001|2000-01-03|   3.65|100.15|11.76|18.49| 16.52| 16.52|  22.73|  15.32|     7.41|18.41| 4.03|     5.33|     2.66|       2.67| 7.53| 

In [8]:
test_timeseries_df.show()

+----+----------+-------+------+----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|fips|      date|PRECTOT|    PS|QV2M|  T2M|T2MDEW|T2MWET|T2M_MAX|T2M_MIN|T2M_RANGE|   TS|WS10M|WS10M_MAX|WS10M_MIN|WS10M_RANGE|WS50M|WS50M_MAX|WS50M_MIN|WS50M_RANGE|score|
+----+----------+-------+------+----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|1001|2019-01-01|   2.25|100.51|9.69|14.71| 13.55| 13.52|  17.38|  10.92|     6.46|14.63|  1.2|      1.5|     0.79|       0.71| 2.74|     4.01|     1.23|       2.78|  0.0|
|1001|2019-01-02|   4.94|100.48|8.65|13.05| 11.83| 11.74|  17.76|   9.54|     8.23|13.06| 1.02|     1.35|     0.32|       1.03| 2.13|     3.37|     0.56|       2.81| null|
|1001|2019-01-03|  20.74|100.03|8.59|12.12| 11.67| 11.67|  13.74|  10.44|      3.3|12.12| 1.83|     4.23|     0.34|       3.88| 3.41|     7.

In [9]:
validation_timeseries_df.show(50)

+----+----------+-------+------+-----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|fips|      date|PRECTOT|    PS| QV2M|  T2M|T2MDEW|T2MWET|T2M_MAX|T2M_MIN|T2M_RANGE|   TS|WS10M|WS10M_MAX|WS10M_MIN|WS10M_RANGE|WS50M|WS50M_MAX|WS50M_MIN|WS50M_RANGE|score|
+----+----------+-------+------+-----+-----+------+------+-------+-------+---------+-----+-----+---------+---------+-----------+-----+---------+---------+-----------+-----+
|1001|2017-01-01|   32.5|100.02|10.47|14.69| 14.47| 14.47|  17.68|  10.53|     7.15|14.63| 2.14|     2.71|     1.52|       1.19|  4.4|     5.96|     2.25|       3.71| null|
|1001|2017-01-02|  63.52|100.04|12.75|17.96| 17.75| 17.75|   20.3|  16.14|     4.16|17.85| 2.75|     4.31|      1.6|       2.71|  5.5|     8.16|     4.05|       4.11| null|
|1001|2017-01-03|  18.82| 99.69| 9.74|14.24| 13.44| 13.44|  18.48|   9.29|      9.2|14.06| 2.25|     3.73|     1.64|       2.09|  4.8| 

In [10]:
soil_data_df.show()

+----+---------+----------+---------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------------+-----------------+----------------+-----------------+-----------------+----------------+-----------------+-------------------+-------------------+---+---+---+---+---+---+---+
|fips|      lat|       lon|elevation|slope1|slope2|slope3|slope4|slope5|slope6|slope7|slope8|aspectN|aspectE|aspectS|aspectW|aspectUnknown|         WAT_LAND|        NVG_LAND|         URB_LAND|         GRS_LAND|        FOR_LAND|      CULTRF_LAND|        CULTIR_LAND|          CULT_LAND|SQ1|SQ2|SQ3|SQ4|SQ5|SQ6|SQ7|
+----+---------+----------+---------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------------+-----------------+----------------+-----------------+-----------------+----------------+-----------------+-------------------+-------------------+---+---+---+---+---+---+---+
|1001|32.536382| -86.64449|       63|0.0419|0.2788|0.2984|

In [11]:
print(f"Number of rows is {soil_data_df.count()}")

Number of rows is 3109


In [12]:
# combine all the weather data into one table
weather_df = train_timeseries_df.union(validation_timeseries_df).union(test_timeseries_df)

In [13]:
weather_df.head()

Row(fips=1001, date=datetime.date(2000, 1, 1), PRECTOT=0.22, PS=100.51, QV2M=9.65, T2M=14.74, T2MDEW=13.51, T2MWET=13.51, T2M_MAX=20.96, T2M_MIN=11.46, T2M_RANGE=9.5, TS=14.65, WS10M=2.2, WS10M_MAX=2.94, WS10M_MIN=1.49, WS10M_RANGE=1.46, WS50M=4.85, WS50M_MAX=6.04, WS50M_MIN=3.23, WS50M_RANGE=2.81, score=None)

In [14]:
# Create a temporary view of the DataFrames.

soil_data_df.createOrReplaceTempView('soil')
weather_df.createOrReplaceTempView('weather')

In [15]:
# Pull unique fips locations from the soil DB
locations_df = spark.sql("""SELECT DISTINCT fips, lat, lon
   from soil
   ORDER BY fips ASC""")
locations_df.show()

+----+---------+----------+
|fips|      lat|       lon|
+----+---------+----------+
|1001|32.536382| -86.64449|
|1003|30.659218|-87.746067|
|1005| 31.87067|-85.405456|
|1007|33.015893|-87.127148|
|1009|33.977448|-86.567246|
|1011|32.101759|-85.717261|
|1013|31.751667|-86.681969|
|1015|33.771706|-85.822513|
|1017|32.917943|-85.391812|
|1019|34.069515|-85.654242|
|1021| 32.85406|-86.726628|
|1023|31.991008|-88.248887|
|1025|31.685521|-87.818624|
|1027|33.267809|-85.862051|
|1029|33.671981|-85.516109|
|1031|31.402183|-85.989201|
|1033|34.703112|-87.801457|
|1035|31.428293|-86.992029|
|1037|32.931445|-86.243482|
|1039|31.243987|-86.448721|
+----+---------+----------+
only showing top 20 rows



In [45]:
print(f"Number of locations is {locations_df.count()}")

Number of locations is 3109


In [16]:
# Pull weather readings, separate date into year and month
scores_df = spark.sql("""SELECT fips, date, score, year(date) as year, month(date) as month
   from weather
   ORDER BY fips ASC""")
scores_df.show()

+----+----------+-----+----+-----+
|fips|      date|score|year|month|
+----+----------+-----+----+-----+
|1001|2000-01-01| null|2000|    1|
|1001|2019-01-01|  0.0|2019|    1|
|1001|2017-01-01| null|2017|    1|
|1001|2019-01-02| null|2019|    1|
|1001|2000-01-02| null|2000|    1|
|1001|2019-01-03| null|2019|    1|
|1001|2017-01-02| null|2017|    1|
|1001|2019-01-04| null|2019|    1|
|1001|2000-01-03| null|2000|    1|
|1001|2019-01-05| null|2019|    1|
|1001|2017-01-03|  2.0|2017|    1|
|1001|2019-01-06| null|2019|    1|
|1001|2000-01-04|  1.0|2000|    1|
|1001|2019-01-07| null|2019|    1|
|1001|2017-01-04| null|2017|    1|
|1001|2019-01-08|  0.0|2019|    1|
|1001|2000-01-05| null|2000|    1|
|1001|2019-01-09| null|2019|    1|
|1001|2017-01-05| null|2017|    1|
|1001|2019-01-10| null|2019|    1|
+----+----------+-----+----+-----+
only showing top 20 rows



In [17]:
scores_df.printSchema()

root
 |-- fips: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- score: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [18]:
# replace null scores with 0, and shift all other scores +1, giving us a scale of 0-6, where 0 is no drought and 6 is the most drought
scaled_df = scores_df.withColumn("score", when(scores_df.score.isNull(), 0).otherwise(scores_df["score"]+1))

In [19]:
 # reduce the variance in the data by rounding the drought scores
 scaled_df = scaled_df.withColumn("score", round(scaled_df["score"]).cast(IntegerType()))

In [20]:
score_distrib = scaled_df.groupBy('score').agg(count("*").alias("count")).sort(col("count").desc())
score_distrib.show()

+-----+--------+
|score|   count|
+-----+--------+
|    0|20435100|
|    1| 2120440|
|    2|  565845|
|    3|  345986|
|    4|  218728|
|    5|  114419|
|    6|   40950|
+-----+--------+



In [22]:
scaled_df.createOrReplaceTempView('scores')

In [23]:
# reduce the volumne of data further by replacing the daily measures for each location with the maximum monthly measure for each location
monthly_scores_df = spark.sql("""SELECT fips, year, month, MAX(score) as max_score
   from scores
   GROUP BY year, fips, month
   ORDER BY fips ASC""")
monthly_scores_df.show(50)

+----+----+-----+---------+
|fips|year|month|max_score|
+----+----+-----+---------+
|1001|2014|    7|        1|
|1001|2006|    2|        1|
|1001|2014|    1|        1|
|1001|2005|    5|        1|
|1001|2002|    7|        3|
|1001|2012|    9|        3|
|1001|2003|    6|        1|
|1001|2001|    9|        1|
|1001|2004|   11|        1|
|1001|2013|    3|        1|
|1001|2008|    3|        5|
|1001|2003|   11|        1|
|1001|2005|    2|        1|
|1001|2006|    7|        4|
|1001|2012|   11|        3|
|1001|2003|   10|        1|
|1001|2000|    4|        2|
|1001|2009|   12|        1|
|1001|2012|    2|        3|
|1001|2006|    9|        4|
|1001|2003|    1|        1|
|1001|2010|   12|        1|
|1001|2004|    1|        1|
|1001|2011|    8|        5|
|1001|2006|   12|        2|
|1001|2012|    1|        3|
|1001|2008|    4|        5|
|1001|2009|    4|        1|
|1001|2006|   10|        3|
|1001|2016|    3|        1|
|1001|2010|    1|        1|
|1001|2005|   11|        2|
|1001|2008|    6|   

In [31]:
# collect all scores from 2000-2010 in one dataframe
scores_2000s_decade_df = monthly_scores_df.filter(scores_df.year < 2011)
scores_2000s_decade_df.show()

+----+----+-----+---------+
|fips|year|month|max_score|
+----+----+-----+---------+
|1001|2001|    1|        4|
|1001|2008|    4|        5|
|1001|2010|   12|        1|
|1001|2003|    2|        1|
|1001|2006|    1|        2|
|1001|2007|    5|        5|
|1001|2001|   10|        1|
|1001|2007|    7|        6|
|1001|2002|    1|        1|
|1001|2000|    1|        3|
|1001|2005|   10|        1|
|1001|2008|    3|        5|
|1001|2002|    5|        3|
|1001|2005|    8|        1|
|1001|2003|   10|        1|
|1001|2009|    3|        2|
|1001|2009|    8|        1|
|1001|2004|    9|        1|
|1001|2007|    3|        3|
|1001|2004|    1|        1|
+----+----+-----+---------+
only showing top 20 rows



In [32]:
# collect all scores from 2011-2020 in one dataframe
scores_2010s_decade_df = monthly_scores_df.filter(scores_df.year > 2010)
scores_2010s_decade_df.show()

+----+----+-----+---------+
|fips|year|month|max_score|
+----+----+-----+---------+
|1001|2014|    3|        2|
|1001|2016|    4|        1|
|1001|2015|   11|        2|
|1001|2018|    5|        2|
|1001|2012|    9|        3|
|1001|2015|   10|        2|
|1001|2015|    9|        2|
|1001|2012|   11|        3|
|1001|2013|    3|        1|
|1001|2013|    5|        1|
|1001|2011|   12|        4|
|1001|2014|   10|        2|
|1001|2012|    1|        3|
|1001|2012|    7|        4|
|1001|2011|    6|        3|
|1001|2015|    6|        1|
|1001|2016|    3|        1|
|1001|2013|    7|        2|
|1001|2015|    7|        1|
|1001|2011|    5|        2|
+----+----+-----+---------+
only showing top 20 rows



In [33]:
monthly_scores_df.createOrReplaceTempView('monthly_scores')

In [34]:
# reduce the volumne of data further by replacing the maximum monthly measure for each location with the the maximum annual measure for each location
annual_scores_df = spark.sql("""SELECT fips, year, MAX(max_score) as annual_max_score
   from monthly_scores
   GROUP BY year, fips
   ORDER BY fips ASC""")
annual_scores_df.show(50)

+----+----+----------------+
|fips|year|annual_max_score|
+----+----+----------------+
|1001|2000|               6|
|1001|2004|               2|
|1001|2014|               3|
|1001|2018|               2|
|1001|2016|               5|
|1001|2019|               4|
|1001|2013|               3|
|1001|2020|               2|
|1001|2001|               4|
|1001|2008|               6|
|1001|2017|               3|
|1001|2009|               2|
|1001|2003|               1|
|1001|2007|               6|
|1001|2012|               4|
|1001|2011|               5|
|1001|2010|               4|
|1001|2006|               4|
|1001|2005|               2|
|1001|2015|               2|
|1001|2002|               4|
|1003|2003|               2|
|1003|2016|               4|
|1003|2013|               3|
|1003|2002|               4|
|1003|2012|               4|
|1003|2007|               4|
|1003|2020|               4|
|1003|2010|               4|
|1003|2011|               6|
|1003|2004|               3|
|1003|2014|   

In [35]:
print(f"Number of rows is {annual_scores_df.count()}")

Number of rows is 65268


In [36]:
print(f"Number of rows is {scores_2000s_decade_df.count()}")

Number of rows is 410256


In [37]:
decade_samples_df = scores_2000s_decade_df.join(locations_df,on='fips', how="left").sort("year","month","fips")

In [38]:
full_samples_df = annual_scores_df.join(locations_df,on='fips', how="left").sort("year","fips")

In [39]:
decade_samples_df.show()

+----+----+-----+---------+---------+----------+
|fips|year|month|max_score|      lat|       lon|
+----+----+-----+---------+---------+----------+
|1001|2000|    1|        3|32.536382| -86.64449|
|1003|2000|    1|        2|30.659218|-87.746067|
|1005|2000|    1|        3| 31.87067|-85.405456|
|1007|2000|    1|        3|33.015893|-87.127148|
|1009|2000|    1|        3|33.977448|-86.567246|
|1011|2000|    1|        3|32.101759|-85.717261|
|1013|2000|    1|        2|31.751667|-86.681969|
|1015|2000|    1|        4|33.771706|-85.822513|
|1017|2000|    1|        4|32.917943|-85.391812|
|1019|2000|    1|        4|34.069515|-85.654242|
|1021|2000|    1|        3| 32.85406|-86.726628|
|1023|2000|    1|        3|31.991008|-88.248887|
|1025|2000|    1|        2|31.685521|-87.818624|
|1027|2000|    1|        3|33.267809|-85.862051|
|1029|2000|    1|        4|33.671981|-85.516109|
|1031|2000|    1|        2|31.402183|-85.989201|
|1033|2000|    1|        3|34.703112|-87.801457|
|1035|2000|    1|   

In [40]:
full_samples_df.show()

+----+----+----------------+---------+----------+
|fips|year|annual_max_score|      lat|       lon|
+----+----+----------------+---------+----------+
|1001|2000|               6|32.536382| -86.64449|
|1003|2000|               6|30.659218|-87.746067|
|1005|2000|               6| 31.87067|-85.405456|
|1007|2000|               6|33.015893|-87.127148|
|1009|2000|               6|33.977448|-86.567246|
|1011|2000|               6|32.101759|-85.717261|
|1013|2000|               6|31.751667|-86.681969|
|1015|2000|               6|33.771706|-85.822513|
|1017|2000|               6|32.917943|-85.391812|
|1019|2000|               6|34.069515|-85.654242|
|1021|2000|               6| 32.85406|-86.726628|
|1023|2000|               6|31.991008|-88.248887|
|1025|2000|               6|31.685521|-87.818624|
|1027|2000|               6|33.267809|-85.862051|
|1029|2000|               6|33.671981|-85.516109|
|1031|2000|               6|31.402183|-85.989201|
|1033|2000|               6|34.703112|-87.801457|


In [41]:
print(f"Number of rows is {decade_samples_df.count()}")

Number of rows is 410256


In [42]:
locations_df.toPandas().to_csv("/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/locations.csv", index = False)

In [43]:
# Saving the monthly drought samples to a CSV file
decade_samples_df.toPandas().to_csv("/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/monthly_samples_2000s.csv", index=False)

In [44]:
# Saving the annual drought samples to a CSV file
full_samples_df.toPandas().to_csv("/content/gdrive/MyDrive/Colab Notebooks/Project4/Data/annual_samples_full.csv", index=False)