In [39]:
import os
import requests

from datetime import datetime
from tqdm import tqdm
import geopandas as gpd
import pyproj
import pandas as pd
import plotly.express as px

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.master("local").appName("Irish Marine Institute - Data Prep").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

your 131072x1 screen size is bogus. expect trouble
23/05/08 04:35:49 WARN Utils: Your hostname, HUSMEN-ASUS resolves to a loopback address: 127.0.1.1; using 172.26.51.133 instead (on interface eth0)
23/05/08 04:35:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/08 04:35:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
dataset_dir = 'dataset'
dataset_base_url = "https://erddap.marine.ie/erddap/tabledap/"
dataset_ids = ["allDatasets", "IrishNationalTideGaugeNetwork", "IMI-TidePrediction", "IWaveBNetwork30Min", "IWaveBNetwork_spectral"]
dataset_format = "csv"

t_0 = datetime(2003, 1, 1)
t_1 = datetime(2024, 1, 1)

In [4]:
def fetch_data(dataset_id, file_format, keys = None, t_range = None, orderby = None, distinct = False):
    os.makedirs(dataset_dir, exist_ok=True)
    file_name = f"{dataset_id}{'_meta' if distinct else ''}.{file_format}"
    file_path = os.path.join(dataset_dir, file_name)
    
    keys_str = ','.join(keys) if keys else ''
    orderby_str = f"&orderBy(\"{','.join(orderby)}\")" if orderby else ''
    t_range_str = (f"&time>={t_range[0].isoformat()}" if t_range else '') + (f"&time<={t_range[1].isoformat()}" if t_range and len(t_range) == 2 else '')
    
    if os.path.isfile(file_path):
        print(f"File {file_path} already exists")
    else:
        print(f"Downloading {file_path} ...")
        url = f"{dataset_base_url}{dataset_id}.{file_format}?{keys_str}{'&distinct()' if distinct else ''}{t_range_str}{orderby_str}"
        res = requests.get(url, allow_redirects=True)
        with open(file_path, 'wb') as f:
            f.write(res.content)
    
    return file_path

In [5]:
# fetch data
data_paths = []
for dataset_id in tqdm(dataset_ids):
    if dataset_id == "allDatasets":
        data_paths.append(fetch_data("allDatasets", dataset_format, None, None, ["institution", "datasetID", "title"], True))
    else:
        s_id = "stationID" if dataset_id == 'IMI-TidePrediction' else "station_id"
        data_paths.append(fetch_data(dataset_id, dataset_format, ["longitude", "latitude", s_id], None, None, True))
        data_paths.append(fetch_data(dataset_id, dataset_format, None, [t_0, t_1], None))

100%|██████████| 5/5 [00:00<00:00, 103.31it/s]

File dataset/allDatasets_meta.csv already exists
File dataset/IrishNationalTideGaugeNetwork_meta.csv already exists
File dataset/IrishNationalTideGaugeNetwork.csv already exists
File dataset/IMI-TidePrediction_meta.csv already exists
File dataset/IMI-TidePrediction.csv already exists
File dataset/IWaveBNetwork30Min_meta.csv already exists
File dataset/IWaveBNetwork30Min.csv already exists
File dataset/IWaveBNetwork_spectral_meta.csv already exists
File dataset/IWaveBNetwork_spectral.csv already exists





In [6]:
for data_path in data_paths:
    print(data_path)
    spark.sparkContext.addFile(data_path)
spark.sparkContext.listFiles

dataset/allDatasets_meta.csv
dataset/IrishNationalTideGaugeNetwork_meta.csv
dataset/IrishNationalTideGaugeNetwork.csv
dataset/IMI-TidePrediction_meta.csv
dataset/IMI-TidePrediction.csv
dataset/IWaveBNetwork30Min_meta.csv
dataset/IWaveBNetwork30Min.csv
dataset/IWaveBNetwork_spectral_meta.csv
dataset/IWaveBNetwork_spectral.csv


['file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IWaveBNetwork_spectral.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IWaveBNetwork30Min.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/allDatasets_meta.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IMI-TidePrediction_meta.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IMI-TidePrediction.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IWaveBNetwork30Min_meta.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IrishNationalTideGaugeNetwork_meta.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IrishNationalTideGaugeNetwork.csv',
 'file:/mnt/d/ws/IrelandMarineDataAnalysis/dataset/IWaveBNetwork_spectral_meta.csv']

In [7]:
df = spark.read.csv(SparkFiles.get(os.path.join(os.getcwd(), 'dataset/allDatasets_meta.csv')), header=True, inferSchema=True)
# df = spark.read.csv('dataset/allDatasets_meta.csv', header=True, inferSchema=True)
df.printSchema()
df.show(2, vertical=True)

                                                                                

root
 |-- datasetID: string (nullable = true)
 |-- accessible: string (nullable = true)
 |-- institution: string (nullable = true)
 |-- dataStructure: string (nullable = true)
 |-- cdm_data_type: string (nullable = true)
 |-- class: string (nullable = true)
 |-- title: string (nullable = true)
 |-- minLongitude: string (nullable = true)
 |-- maxLongitude: string (nullable = true)
 |-- longitudeSpacing: string (nullable = true)
 |-- minLatitude: string (nullable = true)
 |-- maxLatitude: string (nullable = true)
 |-- latitudeSpacing: string (nullable = true)
 |-- minAltitude: string (nullable = true)
 |-- maxAltitude: string (nullable = true)
 |-- minTime: string (nullable = true)
 |-- maxTime: string (nullable = true)
 |-- timeSpacing: string (nullable = true)
 |-- griddap: string (nullable = true)
 |-- subset: string (nullable = true)
 |-- tabledap: string (nullable = true)
 |-- MakeAGraph: string (nullable = true)
 |-- sos: string (nullable = true)
 |-- wcs: string (nullable = true)


23/05/08 04:37:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0--------------------------------
 datasetID        | null                 
 accessible       | null                 
 institution      | null                 
 dataStructure    | null                 
 cdm_data_type    | null                 
 class            | null                 
 title            | null                 
 minLongitude     | degrees_east         
 maxLongitude     | degrees_east         
 longitudeSpacing | degrees_east         
 minLatitude      | degrees_north        
 maxLatitude      | degrees_north        
 latitudeSpacing  | degrees_north        
 minAltitude      | m                    
 maxAltitude      | m                    
 minTime          | UTC                  
 maxTime          | UTC                  
 timeSpacing      | seconds              
 griddap          | null                 
 subset           | null                 
 tabledap         | null                 
 MakeAGraph       | null                 
 sos              | null          

In [96]:
tide_real_sdf = spark.read.csv(SparkFiles.get(os.path.join(os.getcwd(), 'dataset/IrishNationalTideGaugeNetwork_meta.csv')), header=True, inferSchema=True).dropna()
tide_real_sdf.printSchema()
tide_real_sdf.show()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- station_id: string (nullable = true)

+---------+---------+--------------------+
|longitude| latitude|          station_id|
+---------+---------+--------------------+
|-10.27732| 52.13924|      Dingle Harbour|
| -9.90442| 53.76235|        Roonagh Pier|
|  -9.9034|  51.6496| Castletownbere Port|
|  -9.8928|  54.2536|  Ballyglass Harbour|
|  -9.6669|  53.1178|           Inishmore|
| -9.50208| 52.63191|       Kilrush Lough|
| -9.13349|51.558964|   Union Hall Harbor|
|   -9.048|   53.269|         Galway Port|
| -8.93758| 53.14052|Kinvara - Unrefer...|
|   -8.582|  54.3099|               Sligo|
|  -8.4955|  54.9905|Aranmore Island -...|
|  -8.3949|  54.6364|      Killybegs Port|
|  -8.0007|  51.8278| Ballycotton Harbour|
|  -7.3344|  55.3717|Malin Head - Port...|
| -6.99188| 52.14767|Dunmore East Harbour|
|  -6.4589|  52.3385|     Wexford Harbour|
|-6.334861|  52.2546|            Rosslare|
|  -6.2217|

In [9]:
df = tide_real_sdf.toPandas()
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude))
gdf.set_crs(epsg=4326, inplace=True)
gdf.explore(column='station_id', legend=False, marker_type='marker', map_kwds={'scrollWheelZoom': False})

In [10]:
tide_pred_sdf = spark.read.csv(SparkFiles.get(os.path.join(os.getcwd(), 'dataset/IMI-TidePrediction_meta.csv')), header=True, inferSchema=True).dropna()
tide_pred_sdf = tide_pred_sdf.withColumnRenamed('stationID', 'station_id')
tide_pred_sdf = tide_pred_sdf[~tide_pred_sdf['station_id'].endswith('MODELLED')]
tide_pred_sdf.printSchema()
tide_pred_sdf.show()

df = tide_pred_sdf.toPandas()
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude))
gdf.set_crs(epsg=4326, inplace=True)
gdf.explore(column='station_id', legend=False, marker_type='marker', map_kwds={'scrollWheelZoom': False})

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- station_id: string (nullable = true)

+---------+--------+-----------------+
|longitude|latitude|       station_id|
+---------+--------+-----------------+
|-10.27732|52.13924|           Dingle|
| -9.90442|53.76235|          Roonagh|
|  -9.9034| 51.6496|   Castletownbere|
|    -9.89|  54.253|       Ballyglass|
|  -9.8644|52.27129|            Fenit|
|    -9.66|  53.126|        Inishmore|
|-9.562056|53.26693|        Rossaveel|
| -9.50208|52.63191|          Kilrush|
|  -9.1335|  51.559|       Union_Hall|
| -9.04796|53.26895|           Galway|
|  -8.5689| 54.3046|            Sligo|
| -8.49562| 54.9896|         Aranmore|
|  -8.3949| 54.6364|        Killybegs|
|   -8.304|   51.84|      Ringaskiddy|
|  -8.0007|51.82776|      Ballycotton|
| -7.33432|55.37168|       Malin_Head|
| -6.99166|52.14754|          Dunmore|
|  -6.4589|52.33852|          Wexford|
|-6.334861| 52.2546|         Rosslare|
|-6.227383|53

In [11]:
tide_real_sdf.toPandas()

Unnamed: 0,longitude,latitude,station_id
0,-10.27732,52.13924,Dingle Harbour
1,-9.90442,53.76235,Roonagh Pier
2,-9.9034,51.6496,Castletownbere Port
3,-9.8928,54.2536,Ballyglass Harbour
4,-9.6669,53.1178,Inishmore
5,-9.50208,52.63191,Kilrush Lough
6,-9.13349,51.558964,Union Hall Harbor
7,-9.048,53.269,Galway Port
8,-8.93758,53.14052,Kinvara - Unreferenced
9,-8.582,54.3099,Sligo


In [12]:
tide_pred_sdf.toPandas()

Unnamed: 0,longitude,latitude,station_id
0,-10.27732,52.13924,Dingle
1,-9.90442,53.76235,Roonagh
2,-9.9034,51.6496,Castletownbere
3,-9.89,54.253,Ballyglass
4,-9.8644,52.27129,Fenit
5,-9.66,53.126,Inishmore
6,-9.562056,53.26693,Rossaveel
7,-9.50208,52.63191,Kilrush
8,-9.1335,51.559,Union_Hall
9,-9.04796,53.26895,Galway


In [13]:
station_id_mapping = {
    'Dingle Harbour': 'Dingle',
    'Roonagh Pier': 'Roonagh',
    'Castletownbere Port': 'Castletownbere',
    'Ballyglass Harbour': 'Ballyglass',
    'Inishmore': 'Inishmore',
    'Kilrush Lough': 'Kilrush',
    'Union Hall Harbor': 'Union_Hall',
    'Galway Port': 'Galway',
    'Sligo': 'Sligo',
    'Aranmore Island - Leabgarrow': 'Aranmore',
    'Killybegs Port': 'Killybegs',
    'Ballycotton Harbour': 'Ballycotton',
    'Malin Head - Portmore Pier': 'Malin_Head',
    'Dunmore East Harbour': 'Dunmore',
    'Wexford Harbour': 'Wexford',
    'Rosslare': 'Rosslare',
    'Skerries Harbour': 'Skerries',
    'Howth Water Level 1': 'Howth'
}

In [100]:
tide_real_sdf = spark.read.csv(SparkFiles.get(os.path.join(os.getcwd(), 'dataset/IrishNationalTideGaugeNetwork.csv')), header=True, inferSchema=True).dropna()
tide_real_sdf.printSchema()
tide_real_sdf.show()



root
 |-- time: string (nullable = true)
 |-- altitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- datasourceid: integer (nullable = true)
 |-- Water_Level_LAT: string (nullable = true)
 |-- Water_Level_OD_Malin: string (nullable = true)
 |-- QC_Flag: integer (nullable = true)

+--------------------+--------+--------+---------+----------------+------------+---------------+--------------------+-------+
|                time|altitude|latitude|longitude|      station_id|datasourceid|Water_Level_LAT|Water_Level_OD_Malin|QC_Flag|
+--------------------+--------+--------+---------+----------------+------------+---------------+--------------------+-------+
|2006-10-26T13:00:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|          4.679|                1.82|      1|
|2006-10-26T14:00:00Z|     0.0|  53.585|  -6.1081|Skerries Harbour|          11|          4.939|           

                                                                                

In [16]:
print(tide_real_sdf.count())
print(tide_real_sdf.select('station_id').distinct().count())

                                                                                

17052727




20


                                                                                

In [104]:
tide_real_sdf.columns

['time',
 'station_id',
 'datasourceid',
 'Water_Level_LAT',
 'Water_Level_OD_Malin']

In [103]:
# drop unnecessary columns
tide_real_sdf = tide_real_sdf.drop('altitude', 'latitude', 'longitude', 'datasourceid', 'QC_Flag')

# filter out stations that are not in tide_pred_sdf
tide_real_sdf = tide_real_sdf.filter(tide_real_sdf.station_id.isin(list(station_id_mapping.keys())))

# fix data types
tide_real_sdf = tide_real_sdf.withColumn("time", tide_real_sdf.time.cast(TimestampType()))
tide_real_sdf = tide_real_sdf.withColumn("Water_Level_LAT", tide_real_sdf.Water_Level_LAT.cast(FloatType()))
tide_real_sdf = tide_real_sdf.withColumn("Water_Level_OD_Malin", tide_real_sdf.Water_Level_OD_Malin.cast(FloatType()))

In [106]:
# map station_id to tide_pred_sdf
tide_real_sdf = tide_real_sdf.rdd.map(lambda x: (x.time ,station_id_mapping[x.station_id], x.Water_Level_LAT, x.Water_Level_OD_Malin)
    ).toDF(tide_real_sdf.columns)

In [107]:
tide_real_sdf.show()

+-------------------+----------+------------+------------------+--------------------+
|               time|station_id|datasourceid|   Water_Level_LAT|Water_Level_OD_Malin|
+-------------------+----------+------------+------------------+--------------------+
|2006-10-26 16:00:00|  Skerries|          11| 4.678999900817871|  1.8200000524520874|
|2006-10-26 17:00:00|  Skerries|          11| 4.939000129699707|  2.0799999237060547|
|2006-10-26 17:06:00|  Skerries|          11| 4.968999862670898|   2.109999895095825|
|2006-10-26 17:12:00|  Skerries|          11| 4.859000205993652|                 2.0|
|2006-10-26 17:18:00|  Skerries|          11| 4.809000015258789|  1.9500000476837158|
|2006-10-26 17:24:00|  Skerries|          11| 4.789000034332275|  1.9299999475479126|
|2006-10-26 17:30:00|  Skerries|          11| 4.738999843597412|  1.8799999952316284|
|2006-10-26 17:36:00|  Skerries|          11| 4.669000148773193|   1.809999942779541|
|2006-10-26 17:42:00|  Skerries|          11| 4.609000

In [17]:




# breakdown timestamp for easy access
# tide_real_sdf = tide_real_sdf.withColumn('year', year(tide_real_sdf.time))
# tide_real_sdf = tide_real_sdf.withColumn('month', month(tide_real_sdf.time))

# other: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions

In [18]:
# save as parquet, partitioned by station_id

tide_real_sdf.write.parquet("dataset/IrishNationalTideGaugeNetwork.parquet", mode="overwrite", partitionBy=["station_id"])

                                                                                

In [19]:
tmp = spark.read.parquet("dataset/IrishNationalTideGaugeNetwork.parquet")
tmp.printSchema()
tmp.show()

root
 |-- time: timestamp (nullable = true)
 |-- datasourceid: integer (nullable = true)
 |-- Water_Level_LAT: float (nullable = true)
 |-- Water_Level_OD_Malin: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- station_id: string (nullable = true)

+-------------------+------------+---------------+--------------------+----+-----+-----------+
|               time|datasourceid|Water_Level_LAT|Water_Level_OD_Malin|year|month| station_id|
+-------------------+------------+---------------+--------------------+----+-----+-----------+
|2009-04-08 15:50:00|          17|          3.202|               0.697|2009|    4|Dublin Port|
|2009-04-08 15:55:00|          17|           3.12|               0.615|2009|    4|Dublin Port|
|2009-04-08 16:00:00|          17|          3.035|                0.53|2009|    4|Dublin Port|
|2009-04-08 16:05:00|          17|          2.957|               0.452|2009|    4|Dublin Port|
|2009-04-08 16:10:00|          

In [20]:
tide_pred_sdf = spark.read.csv(SparkFiles.get(os.path.join(os.getcwd(), 'dataset/IMI-TidePrediction.csv')), header=True, inferSchema=True).dropna()

                                                                                

In [21]:
tide_pred_sdf.printSchema()
tide_pred_sdf.show()

root
 |-- time: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- stationID: string (nullable = true)
 |-- Water_Level: string (nullable = true)
 |-- Water_Level_ODM: string (nullable = true)

+--------------------+---------+--------+--------------------+-----------+---------------+
|                time|longitude|latitude|           stationID|Water_Level|Water_Level_ODM|
+--------------------+---------+--------+--------------------+-----------+---------------+
|2023-01-01T00:15:00Z| -10.1016| 53.9522|Achill_Island_MOD...|       3.35|           0.92|
|2023-01-01T00:20:00Z| -10.1016| 53.9522|Achill_Island_MOD...|       3.36|           0.93|
|2023-01-01T00:25:00Z| -10.1016| 53.9522|Achill_Island_MOD...|       3.37|           0.94|
|2023-01-01T00:30:00Z| -10.1016| 53.9522|Achill_Island_MOD...|       3.38|           0.95|
|2023-01-01T00:35:00Z| -10.1016| 53.9522|Achill_Island_MOD...|       3.38|           0.95|
|2023-01-01T00:40:

In [None]:

    
# drop unnecessary columns and rows
tide_pred_sdf = tide_pred_sdf.drop('latitude', 'longitude')
tide_pred_sdf = tide_pred_sdf.withColumnRenamed('stationID', 'station_id')
tide_pred_sdf = tide_pred_sdf[~tide_pred_sdf['station_id'].endswith('MODELLED')]

# fix data types
tide_pred_sdf = tide_pred_sdf.withColumn("time", tide_real_sdf.time.cast(TimestampType()))
tide_pred_sdf = tide_pred_sdf.withColumn("Water_Level", tide_real_sdf.Water_Level_LAT.cast(FloatType()))
tide_pred_sdf = tide_pred_sdf.withColumn("Water_Level_ODM", tide_real_sdf.Water_Level_OD_Malin.cast(FloatType()))
# tide_real_sdf = tide_real_sdf.withColumn("QC_Flag", tide_real_sdf.QC_Flag.cast(BooleanType()))

