**Get the environment properties**
---

In [None]:
print("\t---Get distro and release information---")
!lsb_release -a 
print("\n\t---Get cpu properties---")
!lscpu | grep -E '^Model name|^Thread|^Core|^Socket|^CPU\('
print("\n\t---Get available memory---")
!free -m -h 
print("\n\t---Get file system attributes---")
!df -h

	---Get distro and release information---
No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 18.04.6 LTS
Release:	18.04
Codename:	bionic

	---Get cpu properties---
CPU(s):              2
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):           1
Model name:          Intel(R) Xeon(R) CPU @ 2.20GHz

	---Get available memory---
              total        used        free      shared  buff/cache   available
Mem:            12G        1.3G        8.9G        1.3M        2.5G         11G
Swap:            0B          0B          0B

	---Get file system attributes---
Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   25G   84G  23% /
tmpfs            64M     0   64M   0% /dev
shm             5.8G     0  5.8G   0% /dev/shm
/dev/root       2.0G  1.1G  910M  54% /sbin/docker-init
tmpfs           6.4G   72K  6.4G   1% /var/colab
/dev/sda1        41G   27G   14G  67% /etc/hosts
tmpfs           6.4G     0  6.4G   0% /proc/acpi
tmpfs           6.4G

**Requirements installation**
---

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip -qq install findspark
print("Java version: ")
!java -version
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"
!pip -qq install pyspark
!pip -qq install geopandas attrs shapely rtree apache-sedona[spark]


Java version: 
openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu118.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu118.04, mixed mode, sharing)


**Get data from drive**
---

In [None]:
#gets data from csv files in google and puts them in a data folder
!gdown --fuzzy 'https://drive.google.com/file/d/1QztphAhscVlaY5vB2s6b2LuCiTzDp39N/view?usp=share_link' -O /content/
!unzip -o data.zip 

Downloading...
From: https://drive.google.com/uc?id=1QztphAhscVlaY5vB2s6b2LuCiTzDp39N
To: /content/data.zip
  0% 0.00/3.60M [00:00<?, ?B/s]100% 3.60M/3.60M [00:00<00:00, 173MB/s]
Archive:  data.zip
  inflating: data/CottonYield_1980-2020.csv  
  inflating: data/SurfaceWaterQuality.csv  
   creating: data/toxicityData/
  inflating: data/toxicityData/chihuahua_data.csv  
  inflating: data/toxicityData/chihuahua_site.csv  
  inflating: data/toxicityData/coahuila_data.csv  
  inflating: data/toxicityData/coahuila_site.csv  
  inflating: data/toxicityData/durango_data.csv  
  inflating: data/toxicityData/durango_site.csv  
  inflating: data/toxicityData/nuevoleon_data.csv  
  inflating: data/toxicityData/nuevoleon_sites.csv  
  inflating: data/toxicityData/simbology.csv  
  inflating: data/toxicityData/sinaloa_data.csv  
  inflating: data/toxicityData/sinaloa_site.csv  
  inflating: data/toxicityData/sonora_data.csv  
  inflating: data/toxicityData/sonora_site.csv  
  inflating: data/toxi

**Setup Apache Spark and Sedona**
---

In [None]:
import findspark
findspark.init()
import shapely
import pandas as pd 
import geopandas as gpd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
spark = SparkSession. \
builder. \
appName('GeoBigData'). \
config("spark.serializer", KryoSerializer.getName). \
config("spark.executor.memory", "5g"). \
config("spark.driver.memory", "10g"). \
config('spark.driver.maxResultSize', '5g'). \
config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,'
           'org.datasyslab:geotools-wrapper:1.1.0-25.2'). \
getOrCreate()
SedonaRegistrator.registerAll(spark)

True

**Load DB**
---

Load the csv files into databases with spark for later use

In [None]:
DB_SWQ = spark.read.option("header",True).csv(f"/content/data/SurfaceWaterQuality.csv")
DB_SWQ_Q = DB_SWQ.select('CLAVE','SITIO','ESTADO', 'TOX_D_48_UT', 'CALIDAD_TOX_D_48')
DB_SWQ_Q.cache()
DB_cotton = spark.read.option("header",True).csv(f"/content/data/CottonYield_1980-2020.csv")
DB_Cotton_Q = DB_cotton.select('Ano','Estado','Ciclo', 'Modalidad', 'Produccion','Superficie')
DB_Cotton_Q.cache()
DB_ToxChi = spark.read.option("header",True).csv(f"/content/data/toxicityData/chihuahua_data.csv")
DB_ToxChi_Q = DB_ToxChi.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxChi_Q.cache()
DB_ToxCoa = spark.read.option("header",True).csv(f"/content/data/toxicityData/coahuila_data.csv")
DB_ToxCoa_Q = DB_ToxCoa.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxCoa_Q.cache()
DB_ToxDur = spark.read.option("header",True).csv(f"/content/data/toxicityData/durango_data.csv")
DB_ToxDur_Q = DB_ToxDur.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxDur_Q.cache()
DB_ToxNL = spark.read.option("header",True).csv(f"/content/data/toxicityData/nuevoleon_data.csv")
DB_ToxNL_Q = DB_ToxNL.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxNL_Q.cache()
DB_ToxSin = spark.read.option("header",True).csv(f"/content/data/toxicityData/sinaloa_data.csv")
DB_ToxSin_Q = DB_ToxSin.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxSin_Q.cache()
DB_ToxSon = spark.read.option("header",True).csv(f"/content/data/toxicityData/sonora_data.csv")
DB_ToxSon_Q = DB_ToxSon.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxSon_Q.cache()
DB_ToxTam = spark.read.option("header",True).csv(f"/content/data/toxicityData/tamaulipas_data.csv")
DB_ToxTam_Q = DB_ToxTam.select('C_SITIO', 'C_MONITOREO', 'FECHA', 'AS_SOL')
DB_ToxTam_Q.cache()
DB_ToxTam_Q.printSchema()
DB_ToxTam_Q.show()

root
 |-- C_SITIO: string (nullable = true)
 |-- C_MONITOREO: string (nullable = true)
 |-- FECHA: string (nullable = true)
 |-- AS_SOL: string (nullable = true)

+--------------+--------------------+---------+------+
|       C_SITIO|         C_MONITOREO|    FECHA|AS_SOL|
+--------------+--------------------+---------+------+
|OCGNO0001RNL21|OCGNO0001RNL21-30...|4/30/2021|  null|
|OCGNO0002RNL21|OCGNO0002RNL21-20...|5/20/2021|  null|
|OCGNO0003RNL21|OCGNO0003RNL21-20...|5/20/2021|  null|
|OCGNO0004RNL21|OCGNO0004RNL21-20...|5/20/2021|  null|
|OCGNO0005RNL21|OCGNO0005RNL21-20...|5/20/2021|  null|
|OCGNO0006RNL21|OCGNO0006RNL21-20...|5/20/2021|  null|
|OCGNO0007RNL21|OCGNO0007RNL21-20...|5/20/2021|  null|
|OCGNO0008RNL21|OCGNO0008RNL21-20...|5/20/2021|  null|
|OCGNO0009RNL21|OCGNO0009RNL21-20...|5/20/2021|  null|
|OCGNO0010RNL21|OCGNO0010RNL21-20...|5/20/2021|  null|
|OCGNO0011RNL21|OCGNO0011RNL21-20...|5/20/2021|  null|
|OCGNO0012RNL21|OCGNO0012RNL21-20...|5/20/2021|  null|
|OCGNO0013RN

# Select valid values

In [None]:
DB_ToxChi_Q.createOrReplaceTempView("ToxChi")
DB_ToxChi_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxChi where !isnull(AS_SOL)""")
DB_ToxChi_Qf.cache()
DB_ToxChi_Qf.show()
DB_ToxCoa_Q.createOrReplaceTempView("ToxCoa")
DB_ToxCoa_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxCoa where !isnull(AS_SOL)""")
DB_ToxCoa_Qf.cache()
DB_ToxDur_Q.createOrReplaceTempView("ToxDur")
DB_ToxDur_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxDur where !isnull(AS_SOL)""")
DB_ToxDur_Qf.cache()
DB_ToxNL_Q.createOrReplaceTempView("ToxNL")
DB_ToxNL_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxNL where !isnull(AS_SOL)""")
DB_ToxNL_Qf.cache()
DB_ToxSin_Q.createOrReplaceTempView("ToxSin")
DB_ToxSin_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxSin where !isnull(AS_SOL)""")
DB_ToxSin_Qf.cache()
DB_ToxSon_Q.createOrReplaceTempView("ToxSon")
DB_ToxSon_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxSon where !isnull(AS_SOL)""")
DB_ToxSon_Qf.cache()
DB_ToxTam_Q.createOrReplaceTempView("ToxTam")
DB_ToxTam_Qf = spark.sql("""select C_SITIO, C_MONITOREO, FECHA, if(AS_SOL = '<0.01', 0.005, AS_SOL) as AS_SOL from ToxTam where !isnull(AS_SOL)""")
DB_ToxTam_Qf.cache()

+----------+-----------------+----------+------+
|   C_SITIO|      C_MONITOREO|     FECHA|AS_SOL|
+----------+-----------------+----------+------+
|  DLCHI291|  DLCHI291-120921|09/07/2021|0.0247|
|  DLCHI296|  DLCHI296-270920| 9/22/2020| 0.005|
|  DLCHI296|  DLCHI296-120921|09/07/2021| 0.005|
|  DLCHI297|  DLCHI297-270920| 9/22/2020| 0.005|
|  DLCHI297|  DLCHI297-120921|09/07/2021| 0.005|
|  DLCHI300|  DLCHI300-270920| 9/22/2020| 0.005|
|  DLCHI301|  DLCHI301-270920| 9/21/2020| 0.005|
|  DLCHI301|  DLCHI301-120921|09/07/2021| 0.005|
|  DLCHI307|  DLCHI307-041020|10/04/2020|0.0239|
|  DLCHI308|  DLCHI308-120921|09/06/2021|0.0157|
|  DLCHI309|  DLCHI309-041020| 9/25/2020| 0.005|
|  DLCHI311|  DLCHI311-041020| 9/24/2020| 0.005|
|  DLCHI315|  DLCHI315-111020|10/11/2020| 0.005|
|  DLCHI316|  DLCHI316-111020|10/11/2020| 0.005|
|  DLCHI318|  DLCHI318-111020|10/11/2020| 0.005|
|DLCHI326M1|DLCHI326M1-270920| 9/18/2020|0.3935|
|DLCHI327M1|DLCHI327M1-270920| 9/18/2020| 0.005|
|  DLCHI329|  DLCHI3

DataFrame[C_SITIO: string, C_MONITOREO: string, FECHA: string, AS_SOL: string]