<a href="https://colab.research.google.com/github/PiotrMaciejKowalski/BigData2024Project/blob/Szablon-wczytywania-danych-w-sparku/colabs/Wczytywanie_danych_NASA_w_sparku.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Loading data in Spark

Setting up the PySpark environment for computation:

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField
from pyspark.sql.functions import max, col  # note: this `max` shadows Python's built-in max
import pyspark.pandas as ps

from google.colab import drive

from functools import reduce
from typing import Optional, List, Tuple
import math
import random

import numpy as np
import pandas as pd

Creating the session:

In [4]:
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    # Memory limits have to be supplied before the session starts;
    # changing them on an already running session has no effect.
    .config("spark.executor.memory", "100g")
    .config("spark.driver.memory", "64g")
    .config("spark.driver.maxResultSize", "80g")
    # Spark 3.x name of the former 'spark.sql.execution.arrow.enabled' option
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)



Mounting Google Drive:

In [5]:
drive.mount('/content/drive')

Mounted at /content/drive


Loading the NASA data stored on the drive into Spark:

In [6]:
# Load the sampled dataset in order to obtain the column names
sampled = pd.read_csv('/content/drive/MyDrive/BigMess/NASA/sampled_NASA_200k.csv')

# Build a schema specifying the type of each variable
schemat = StructType()
for i in sampled.columns:
  if i == "Date":
    schemat = schemat.add(i, StringType(), True)
  else:
    schemat = schemat.add(i, FloatType(), True)

In [7]:
# Load the NASA dataset into Spark
nasa = spark.read.format('csv').option("header", True).schema(schemat).load('/content/drive/MyDrive/BigMess/NASA/NASA.csv')
nasa.show(5)

+---------+-------+------+---------+---------+---------+---------+---------+---------+----------+---+-----+-----+---------+---+----------+---+---------+---------+---+---------+--------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+----------+------------+-------------+--------------+---------------+---------------+---------------+---------+----------+---------+----------+-------+------------+-----------+------------+----------+----------+----------+----------+---------+------------+---------+----------+----------+
|      lon|    lat|  Date|   SWdown|   LWdown|    SWnet|    LWnet|      Qle|       Qh|        Qg| Qf|Snowf|Rainf|     Evap| Qs|       Qsb|Qsm| AvgSurfT|   Albedo|SWE|SnowDepth|SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10cm|SoilM_10_40cm|SoilM_40_100cm|SoilM_100_200cm|SoilM_0_100cm|SoilM_0_200cm| RootMoist|SMLiq_0_10cm|SMLiq_10_40cm|SMLiq_40_100cm|

Before writing queries, we still need to register our DataFrame as a temporary view in Spark's table namespace:

In [8]:
nasa.createOrReplaceTempView("nasa")

Splitting the "Date" column into "Year" and "Month" columns:

In [9]:
nasa = spark.sql("""
          SELECT
          CAST(SUBSTRING(CAST(Date AS STRING), 1, 4) AS INT) AS Year,
          CAST(SUBSTRING(CAST(Date AS STRING), 5, 2) AS INT) AS Month,
          n.*
          FROM nasa n
          """)

In [10]:
nasa = nasa.drop("Date")
nasa.show(5)

+----+-----+---------+-------+---------+---------+---------+---------+---------+---------+----------+---+-----+-----+---------+---+----------+---+---------+---------+---+---------+--------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+----------+------------+-------------+--------------+---------------+---------------+---------------+---------+----------+---------+----------+-------+------------+-----------+------------+----------+----------+----------+----------+---------+------------+---------+----------+----------+
|Year|Month|      lon|    lat|   SWdown|   LWdown|    SWnet|    LWnet|      Qle|       Qh|        Qg| Qf|Snowf|Rainf|     Evap| Qs|       Qsb|Qsm| AvgSurfT|   Albedo|SWE|SnowDepth|SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10cm|SoilM_10_40cm|SoilM_40_100cm|SoilM_100_200cm|SoilM_0_100cm|SoilM_0_200cm| RootMoist|SMLiq_0_10cm|SMLiq_10_40cm|SMLiq_4
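
The `SUBSTRING` offsets in the query above suggest that `Date` is stored as a `YYYYMM` string; that format is an assumption inferred from the query, not something the source states. A minimal pure-Python sketch of the same split, using a hypothetical helper `split_date`. Note that SQL `SUBSTRING` is 1-indexed, whereas Python slices are 0-indexed:

```python
from typing import Tuple


def split_date(date: str) -> Tuple[int, int]:
    """Split an assumed 'YYYYMM' string into (year, month)."""
    year = int(date[0:4])   # corresponds to SUBSTRING(Date, 1, 4)
    month = int(date[4:6])  # corresponds to SUBSTRING(Date, 5, 2)
    return year, month


print(split_date("197901"))  # (1979, 1)
```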

# Extracting the test and training sets

**1. Test and training sets for the desertification-prediction problem**

In [126]:
max_year = nasa.agg(max('Year').alias("max_year")).first()["max_year"]

In [127]:
test_set = nasa.where(nasa.Year >= max_year-1)

In [85]:
%%time
test_set.show(5)

+----+-----+---------+-------+--------+--------+--------+---------+--------+--------+---------+---+-----+-----+--------+---+---------+---+--------+--------+---+---------+--------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+---------+------------+-------------+--------------+---------------+---------------+---------------+--------+---------+--------+--------+-------+----------+-----------+-----------+---------+---------+---------+---------+-----+------+--------+---------+----------+
|Year|Month|      lon|    lat|  SWdown|  LWdown|   SWnet|    LWnet|     Qle|      Qh|       Qg| Qf|Snowf|Rainf|    Evap| Qs|      Qsb|Qsm|AvgSurfT|  Albedo|SWE|SnowDepth|SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10cm|SoilM_10_40cm|SoilM_40_100cm|SoilM_100_200cm|SoilM_0_100cm|SoilM_0_200cm|RootMoist|SMLiq_0_10cm|SMLiq_10_40cm|SMLiq_40_100cm|SMLiq_100_200cm|SMAvail_0_100cm|SMAvai

In [44]:
%%time
train_set = nasa.where(nasa.Year < max_year-1)
train_set.show(5)

+----+-----+---------+-------+---------+---------+---------+---------+---------+---------+----------+---+-----+-----+---------+---+----------+---+---------+---------+---+---------+--------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+----------+------------+-------------+--------------+---------------+---------------+---------------+---------+----------+---------+----------+-------+------------+-----------+------------+----------+----------+----------+----------+---------+------------+---------+----------+----------+
|Year|Month|      lon|    lat|   SWdown|   LWdown|    SWnet|    LWnet|      Qle|       Qh|        Qg| Qf|Snowf|Rainf|     Evap| Qs|       Qsb|Qsm| AvgSurfT|   Albedo|SWE|SnowDepth|SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10cm|SoilM_10_40cm|SoilM_40_100cm|SoilM_100_200cm|SoilM_0_100cm|SoilM_0_200cm| RootMoist|SMLiq_0_10cm|SMLiq_10_40cm|SMLiq_4

In [42]:
test_set.count()

1597323

In [45]:
train_set.count()

20132607
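
The temporal split keeps the last two years (`Year >= max_year - 1`) as the test set and everything earlier as the training set. As a quick sanity check, the share of rows that lands in the test set can be derived from the two counts reported above; this is plain arithmetic on those numbers:

```python
test_rows = 1_597_323    # test_set.count()
train_rows = 20_132_607  # train_set.count()

test_share = test_rows / (test_rows + train_rows)
print(f"{test_share:.2%}")  # 7.35%
```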

**2. Test and training sets for the classification problem (desert / non-desert)**

The biggest challenge and the fundamental problem here is the spatial autocorrelation typical of spatial data. Consequently, the test set must be chosen so that its spatial dependency on the training set is as low as possible, to prevent *data leakage* from the training set into the test set.



In [11]:
min_lat = 25.0625
max_lat = 52.9375
min_lon = -124.9375
max_lon = -67.0625

In [31]:
coordinatesDF = spark.sql("""SELECT DISTINCT lon, lat FROM nasa""")   # keep the DataFrame; .show() itself returns None
coordinatesDF.show()

**First approach**: we choose a rectangle, or a union of rectangles (squares) touching at their vertices. The code below generates the list of coordinates that fall inside a given rectangle. If the sides of the rectangle are parallel to the axes, provide the coordinates of the lower-left and upper-right vertices; if the rectangle has a different orientation, additionally provide the coordinates of a third vertex.

In [88]:

# Input format:
# coords: Spark DataFrame containing coordinates (with columns "lon" and "lat")
# lower_left_vertex = [longitude of the lower left vertex, latitude of the lower left vertex]
# upper_right_vertex = [longitude of the upper right vertex, latitude of the upper right vertex]
# orientation = True/False; True if the rectangle's sides are parallel to the axes
# if the rectangle's sides are not parallel to the axes, one more argument is required: lower_right (the coordinates of the lower right vertex)

def get_rectangle_coordinates(coords: DataFrame, lower_left_vertex: List[float], upper_right_vertex: List[float], orientation: bool = True, lower_right: Optional[List[float]] = None) -> List[List[float]]:
    rect_coords = []   # coordinates inside the rectangle
    coordinates = coords.collect()

    if orientation:

        assert lower_left_vertex[0] < upper_right_vertex[0] and lower_left_vertex[1] < upper_right_vertex[1]

        for row in coordinates:
            lon = row["lon"]
            lat = row["lat"]
            # strict inequalities: points lying exactly on the boundary are excluded
            if lower_left_vertex[0] < lon < upper_right_vertex[0]:
                if lower_left_vertex[1] < lat < upper_right_vertex[1]:
                    rect_coords.append([lon, lat])
    else:

        assert lower_right is not None, "lower_right is required when orientation=False"
        # Is this sufficient, or should we also check that the sides formed by these vertices are perpendicular?
        assert lower_left_vertex[0] < lower_right[0] < upper_right_vertex[0] and lower_right[1] < upper_right_vertex[1]

        # A = lower left, B = lower right, C = upper right
        AB = [lower_right[0]-lower_left_vertex[0], lower_right[1]-lower_left_vertex[1]]
        BC = [upper_right_vertex[0]-lower_right[0], upper_right_vertex[1]-lower_right[1]]
        AB_AB = np.inner(AB, AB)
        BC_BC = np.inner(BC, BC)

        for row in coordinates:
            lon = row["lon"]
            lat = row["lat"]

            AM = [lon-lower_left_vertex[0], lat-lower_left_vertex[1]]
            BM = [lon-lower_right[0], lat-lower_right[1]]
            AM_AB = np.inner(AM, AB)
            BC_BM = np.inner(BC, BM)

            # M is inside iff its projections onto AB and BC fall within those sides
            if 0 <= AM_AB <= AB_AB and 0 <= BC_BM <= BC_BC:
                rect_coords.append([lon, lat])

    return rect_coords
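
The non-axis-aligned branch relies on a projection test: with A, B, C the lower-left, lower-right and upper-right vertices, a point M lies inside the rectangle when 0 ≤ AM·AB ≤ AB·AB and 0 ≤ BM·BC ≤ BC·BC. The sketch below isolates that test in plain Python, without Spark or NumPy; the helper names `dot` and `in_rectangle` are hypothetical, and the sides AB and BC are assumed to be perpendicular.

```python
from typing import Sequence


def dot(u: Sequence[float], v: Sequence[float]) -> float:
    """2D dot product."""
    return u[0] * v[0] + u[1] * v[1]


def in_rectangle(m, a, b, c) -> bool:
    """a = lower-left, b = lower-right, c = upper-right; AB and BC assumed perpendicular."""
    ab = (b[0] - a[0], b[1] - a[1])
    bc = (c[0] - b[0], c[1] - b[1])
    am = (m[0] - a[0], m[1] - a[1])
    bm = (m[0] - b[0], m[1] - b[1])
    # Both projections must fall within the corresponding side.
    return 0 <= dot(am, ab) <= dot(ab, ab) and 0 <= dot(bm, bc) <= dot(bc, bc)


# Tilted rectangle with vertices A=(0,0), B=(2,-1), C=(3,1) (the fourth vertex is (1,2))
print(in_rectangle((1, 1), (0, 0), (2, -1), (3, 1)))  # True
print(in_rectangle((0, 2), (0, 0), (2, -1), (3, 1)))  # False
```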

It remains to estimate the area of the rectangle (or union of rectangles) that should make up the test set. The calculation accounts for three facts: about 7% of the area of the USA is water, for which the dataset contains no observations; the region between latitudes 25 and 30 degrees is, roughly, more than 3/4 water; and the region between latitudes 30 and 40 degrees and longitudes -80 and -67 degrees is about half water:

In [15]:
area = (max_lat-40)*(max_lon-min_lon)*0.93 + 0.20*(30-min_lat)*(max_lon-min_lon) + 0.5*(40-30)*(max_lon-(-80)) +0.93*(40-30)*(-80-min_lon)

test_area = 0.15*area
test_area


185.41538671875
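
To make the estimate above easier to follow, here is the same calculation split into four bands, each with its assumed land fraction (the fractions are the ones used in the formula above; the band labels are added here for illustration only). Areas are in square degrees.

```python
min_lat, max_lat = 25.0625, 52.9375
min_lon, max_lon = -124.9375, -67.0625
width = max_lon - min_lon

# name: (height in degrees, width in degrees, assumed land fraction)
bands = {
    "north, lat 40 to 52.9375": (max_lat - 40, width, 0.93),
    "south, lat 25.0625 to 30": (30 - min_lat, width, 0.20),
    "middle east, lat 30 to 40, lon -80 to -67.0625": (40 - 30, max_lon - (-80), 0.5),
    "middle west, lat 30 to 40, lon -124.9375 to -80": (40 - 30, -80 - min_lon, 0.93),
}

for name, (height, band_width, land_fraction) in bands.items():
    print(f"{name}: {height * band_width * land_fraction:.2f}")

land_area = sum(h * w * f for h, w, f in bands.values())
print(round(land_area, 2), round(0.15 * land_area, 2))  # 1236.1 185.42
```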

In [18]:
coordinatesDF.count()*0.10  # approx. number of points in test set (a Spark DataFrame has no len())

7636.0

An example test set is generated below. (When creating other test sets, make sure that the chosen rectangle or rectangles contain some desert terrain.)

Illustrative graphic of the test set:
![testset.png](images/testset.png)

In [58]:
rectangles = [([-122, 25.5], [-115, 37]), ([-124, 37], [-122, 52]),
              ([-93, 24.5], [-79.5, 34]), ([-79.5, 34], [-75, 37]),
              ([-115, 25.5], [-111, 30])]

test_set_coordinates = reduce(
    lambda rect1, rect2: rect1 + rect2,
    [
        get_rectangle_coordinates(coordinatesDF, lower_left, upper_right, orientation=True)
        for lower_left, upper_right in rectangles
    ]
)

In [59]:
len(test_set_coordinates)

8228

In [16]:

schema_c = StructType([StructField("lon", FloatType(), True), StructField("lat", FloatType(), True)])
test_coordsDF = spark.createDataFrame(test_set_coordinates, schema_c)
test_coordsDF.show(5)

+---------+-------+
|      lon|    lat|
+---------+-------+
|-117.8125|35.1875|
|-115.6875|30.1875|
|-119.5625|35.6875|
|-116.8125|35.8125|
|-120.1875|36.5625|
+---------+-------+
only showing top 5 rows



In [38]:
train_coordsDF = coordinatesDF.exceptAll(test_coordsDF)
train_coordsDF.show(5)

+---------+-------+
|      lon|    lat|
+---------+-------+
|-107.6875|38.5625|
| -82.9375|48.9375|
|-111.6875|51.1875|
|-106.5625|27.8125|
| -98.0625|43.4375|
+---------+-------+
only showing top 5 rows



In [39]:
trainset_df = nasa.join(train_coordsDF, ["lon", "lat"], "inner").select(nasa['*'])   # took "only" 11 minutes
trainset_df.show()

+----+-----+---------+-------+---------+---------+----------+-----------+---------+---------+-----------+------------+----------+----------+---------+-----------+----------+-----------+---------+---------+------------+------------+------------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+---------+------------+-------------+--------------+---------------+---------------+---------------+---------+-----------+----------+----------+------------+------------+------------+------------+----------+----------+----------+-----------+---------+------------+----------+-----------+----------+
|Year|Month|      lon|    lat|   SWdown|   LWdown|     SWnet|      LWnet|      Qle|       Qh|         Qg|          Qf|     Snowf|     Rainf|     Evap|         Qs|       Qsb|        Qsm| AvgSurfT|   Albedo|         SWE|   SnowDepth|    SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10c

In [69]:
# The solution first suggested by ChatGPT does not give the result we want: it returns every row whose value in column 1 is on the first list and
# whose value in column 2 is on the second list (all combinations), instead of only the specific pairs (here: coordinates) we are after:
#filtered_df = df.filter((col("col1").isin(filter_list1)) & (col("col2").isin(filter_list2))

#test_set2 = nasa.filter((col('lon'), col('lat')).isin(test_set_coordinates)) # this does not work, contrary to what ChatGPT suggested
#test_set2.show(5)

# ChatGPT also suggests a solution of the form
#filtered_df = df.filter((col("column1").isin([pair[0] for pair in list_of_pairs])) & (col("column2").isin([pair[1] for pair in list_of_pairs])))
# which is, of course, also incorrect

# ChatGPT also suggested a construction that applies "isin" directly to a Spark DataFrame ("df["column1", "column2"].isin(...)"), which is incorrect as well

# this does not work either:
#filter_conditions = [(col('lon') == pair[0]) & (col('lat') == pair[1]) for pair in test_set_coordinates]
#combined_condition = reduce(lambda a, b: a | b, filter_conditions)
#filtered_df = nasa.filter(combined_condition)
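
The pitfall described in the comments above can be reproduced without Spark. The sketch below uses made-up rows to show why filtering each column independently (the `isin(...) & isin(...)` pattern) also accepts mixed-up combinations, whereas matching on the pair itself, which is what the join on `["lon", "lat"]` does, accepts only the wanted coordinates.

```python
wanted_pairs = {(-117.8125, 35.1875), (-115.6875, 30.1875)}

rows = [
    (-117.8125, 35.1875),  # wanted pair
    (-115.6875, 30.1875),  # wanted pair
    (-117.8125, 30.1875),  # NOT wanted: lon from one pair, lat from the other
]

lons = {lon for lon, _ in wanted_pairs}
lats = {lat for _, lat in wanted_pairs}

# Column-wise membership: the equivalent of isin(lons) & isin(lats)
column_wise = [row for row in rows if row[0] in lons and row[1] in lats]

# Pair-wise membership: the equivalent of an inner join on (lon, lat)
pair_wise = [row for row in rows if row in wanted_pairs]

print(len(column_wise))  # 3
print(len(pair_wise))    # 2
```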


In [19]:
testset_df = nasa.join(test_coordsDF, ["lon", "lat"], "inner").select(nasa['*'])
testset_df.show()

+----+-----+---------+-------+----------+---------+----------+----------+---------+---------+----------+------------+---------+---------+---------+-----------+-----------+-----------+---------+---------+-----------+-------------+------------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+---------+------------+-------------+--------------+---------------+---------------+---------------+----------+----------+----------+---------+------------+------------+-----------+------------+----------+----------+----------+-----------+---------+-----------+---------+----------+-----------+
|Year|Month|      lon|    lat|    SWdown|   LWdown|     SWnet|     LWnet|      Qle|       Qh|        Qg|          Qf|    Snowf|    Rainf|     Evap|         Qs|        Qsb|        Qsm| AvgSurfT|   Albedo|        SWE|    SnowDepth|    SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0_10cm|SoilM_

In [22]:
# Second way of extracting the training set (took 38 minutes to compute!):
trainset_df2 = nasa.subtract(testset_df)   # keep the DataFrame; .show() itself returns None
trainset_df2.show()

+----+-----+---------+-------+----------+---------+---------+----------+----------+----------+----------+-------------+-----------+----------+----------+-----------+----------+-----------+---------+---------+-----------+------------+------------+------------+-------------+--------------+---------------+------------+-------------+--------------+---------------+-------------+-------------+---------+------------+-------------+--------------+---------------+---------------+---------------+----------+----------+-----------+---------+------------+------------+------------+------------+----------+-----------+----------+-----------+---------+-----------+---------+------------+----------+
|Year|Month|      lon|    lat|    SWdown|   LWdown|    SWnet|     LWnet|       Qle|        Qh|        Qg|           Qf|      Snowf|     Rainf|      Evap|         Qs|       Qsb|        Qsm| AvgSurfT|   Albedo|        SWE|   SnowDepth|    SnowFrac|SoilT_0_10cm|SoilT_10_40cm|SoilT_40_100cm|SoilT_100_200cm|SoilM_0
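
The two set operations used in this section differ in how they treat duplicates: `DataFrame.subtract` behaves like SQL `EXCEPT DISTINCT`, while `DataFrame.exceptAll` behaves like `EXCEPT ALL`. A small pure-Python analogy with toy values (not the NASA data) shows the difference:

```python
from collections import Counter

left = ["a", "a", "b", "c"]
right = ["a"]

# EXCEPT DISTINCT (like DataFrame.subtract): set difference, duplicates collapse
except_distinct = sorted(set(left) - set(right))

# EXCEPT ALL (like DataFrame.exceptAll): multiset difference, duplicates are kept
except_all = sorted((Counter(left) - Counter(right)).elements())

print(except_distinct)  # ['b', 'c']
print(except_all)       # ['a', 'b', 'c']
```

For the coordinate DataFrame the distinction is harmless because the pairs are already distinct, but on the full dataset `subtract` would also collapse any fully identical rows.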

In [87]:
testset_df.count()

2337170

**Second variant:** spatial block cross-validation, which we can try to modify slightly by adding a previously "cut out" fragment of desert (perhaps *Baja California*?) to the test *fold* each time.

The code below divides the coordinates into square blocks (generates a grid) of a given size:

In [85]:
# grid_cell_size - the grid cell size (the length of the side of a square) in degrees
def get_grid(grid_cell_size: float) -> Tuple[np.ndarray, np.ndarray]:

    min_lat = 25.0625
    max_lat = 52.9375
    min_lon = -124.9375
    max_lon = -67.0625

    area = (max_lat-min_lat)*(max_lon-min_lon)
    cells_num = area//(grid_cell_size*grid_cell_size)
    actual_grid_size = math.sqrt(area/cells_num)

    # The stop value is extended by one step so that the last edge lies beyond the
    # maximum coordinate; otherwise points past the last edge would fall into no block.
    xx = np.arange(min_lon, max_lon + actual_grid_size, step=actual_grid_size)
    yy = np.arange(min_lat, max_lat + actual_grid_size, step=actual_grid_size)

    return xx, yy

# coords_set: Spark DataFrame containing coordinates
# block_size: approximate size of a block
def block_partition(coords_set: DataFrame, block_size: float) -> List[List[List[float]]]:

    coords = coords_set.collect()
    blocks = []
    xx, yy = get_grid(block_size)

    for y in range(len(yy)-1):
        for x in range(len(xx)-1):
            block = []
            for row in coords:
                lon = row['lon']
                lat = row['lat']
                # half-open intervals, so a point on a shared edge belongs to exactly one block
                if xx[x] <= lon < xx[x+1]:
                    if yy[y] <= lat < yy[y+1]:
                        block.append([lon, lat])

            if len(block) > 0:   # keep only non-empty blocks (coordinates corresponding to water areas are missing)
                blocks.append(block)

    return blocks
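
The triple loop above visits every point once per grid cell. As an alternative sketch, and not the notebook's own method, each point can be assigned to its cell directly by integer division of its offset from the grid origin, which needs only one pass over the points. The helper `assign_blocks` and the fixed `cell_size` are assumptions made for this illustration.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def assign_blocks(points: Iterable[Tuple[float, float]], min_lon: float,
                  min_lat: float, cell_size: float) -> Dict[Tuple[int, int], List[List[float]]]:
    """Group (lon, lat) points by the index of the grid cell that contains them."""
    blocks: Dict[Tuple[int, int], List[List[float]]] = defaultdict(list)
    for lon, lat in points:
        ix = math.floor((lon - min_lon) / cell_size)  # column index of the cell
        iy = math.floor((lat - min_lat) / cell_size)  # row index of the cell
        blocks[(ix, iy)].append([lon, lat])
    return dict(blocks)  # only non-empty cells appear as keys


points = [(-124.9375, 25.0625), (-124.0625, 25.9375),
          (-122.4375, 25.0625), (-67.0625, 52.9375)]
demo_blocks = assign_blocks(points, min_lon=-124.9375, min_lat=25.0625, cell_size=2.0)
print(sorted(demo_blocks))  # [(0, 0), (1, 0), (28, 13)]
```

Empty cells, such as those lying entirely over water, never appear as keys, so no separate emptiness check is needed.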



We cut out the fragment of desert terrain mentioned earlier:

In [79]:
block_with_desert = get_rectangle_coordinates(coordinatesDF, [-115, 25.5], [-111, 31])   #block containing desert, to be removed from dataset before generating blocks and added to test fold afterwards

coordinates2 = coordinatesDF.exceptAll(spark.createDataFrame(block_with_desert, schema_c))
coordinates2.show(5)


+---------+-------+
|      lon|    lat|
+---------+-------+
|-107.6875|38.5625|
| -82.9375|48.9375|
|-111.6875|51.1875|
|-106.5625|27.8125|
| -98.0625|43.4375|
+---------+-------+
only showing top 5 rows



Function generating the split into folds:

In [67]:
# k - number of folds
# blocks - list of blocks, each block being a list of [lon, lat] coordinates
def Kfolds(k: int, blocks: List[List[List[float]]]) -> List[List[List[List[float]]]]:

    remaining = list(blocks)   # work on a copy so that the caller's list is not emptied
    n = len(remaining)//k
    folds = []

    for i in range(k):
        fold = []
        for j in range(n):
            r = random.randint(0, len(remaining)-1)
            fold.append(remaining.pop(r))
        folds.append(fold)

    # distribute the leftover blocks (fewer than k of them) over randomly chosen folds
    for b in remaining:
        n_fol = random.randint(0, k-1)
        folds[n_fol].append(b)

    return folds
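
For reproducible experiments it can help to seed the random assignment. The sketch below is a variant, not the function above: it shuffles a copy of the blocks with a seeded generator and deals them out round-robin, so fold sizes differ by at most one (the function above instead assigns leftover blocks to random folds). The name `kfolds_seeded` and the `seed` parameter are assumptions introduced here.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def kfolds_seeded(k: int, blocks: Sequence[T], seed: int = 0) -> List[List[T]]:
    """Split blocks into k folds reproducibly, without modifying the input."""
    rng = random.Random(seed)
    shuffled = list(blocks)  # copy, so the caller's sequence stays intact
    rng.shuffle(shuffled)
    # Round-robin dealing: fold i receives elements i, i+k, i+2k, ...
    return [shuffled[i::k] for i in range(k)]


demo_blocks = [f"block_{i}" for i in range(20)]
demo_folds = kfolds_seeded(6, demo_blocks, seed=42)
print([len(fold) for fold in demo_folds])  # [4, 4, 3, 3, 3, 3]
```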

As an example, we generate folds for k = 6 and block_size = 2:

In [80]:
blocks =  block_partition(coordinates2, block_size=2)
folds = Kfolds(6, blocks)

Then, each time, the *block_with_desert* generated above must be added to the test fold.
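
A minimal sketch of that last step, assuming folds are nested lists as returned by the fold-generating function (fold → block → `[lon, lat]` point). The helper `fold_coordinates` is hypothetical: it flattens the chosen test fold, appends the desert block to it, and flattens the remaining folds into the training coordinates.

```python
from typing import List, Tuple

Point = List[float]
Block = List[Point]


def fold_coordinates(folds: List[List[Block]], test_index: int,
                     extra_test_block: Block) -> Tuple[List[Point], List[Point]]:
    """Return (train_points, test_points) for the fold chosen as the test fold."""
    test = [p for block in folds[test_index] for p in block] + list(extra_test_block)
    train = [p for i, fold in enumerate(folds) if i != test_index
             for block in fold for p in block]
    return train, test


demo_folds = [
    [[[0.0, 0.0], [0.0, 1.0]]],    # fold 0: one block with two points
    [[[2.0, 0.0]], [[2.0, 2.0]]],  # fold 1: two blocks with one point each
]
demo_desert = [[9.0, 9.0]]         # stands in for block_with_desert

train, test = fold_coordinates(demo_folds, 0, demo_desert)
print(train)  # [[2.0, 0.0], [2.0, 2.0]]
print(test)   # [[0.0, 0.0], [0.0, 1.0], [9.0, 9.0]]
```

The resulting coordinate lists can then be turned into Spark DataFrames and joined with the data on `["lon", "lat"]`, as was done for the rectangle-based test set.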