In [20]:
import os
import re

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, current_timestamp

from delta.tables import DeltaTable

conf = (
    SparkConf()
    .setAppName("Spark minIO Test")
    .set("spark.hadoop.fs.s3a.endpoint", "http://192.168.86.192:9000")
    .set("spark.hadoop.fs.s3a.access.key", os.getenv('MINIO_ROOT_USER'))
    .set("spark.hadoop.fs.s3a.secret.key", os.getenv('MINIO_ROOT_PASSWORD'))
    .set("spark.hadoop.fs.s3a.path.style.access", True)
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "8g")
    .set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") 
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
)
sc = SparkContext(conf=conf).getOrCreate()
spark = SparkSession(sc).builder.getOrCreate()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Spark minIO Test, master=local[*]) created by __init__ at /tmp/ipykernel_446/3269140975.py:25 

In [21]:
weather_stations_file = sc.textFile('s3a://landing-knmi/weather_stations/*')

In [22]:
weather_stations_file.take(10)

['# Opmerking: door stationsverplaatsingen en veranderingen in waarneemmethodieken zijn deze tijdreeksen van uurwaarden mogelijk inhomogeen! Dat betekent dat deze reeks van gemeten waarden niet geschikt is voor trendanalyse. Voor studies naar klimaatverandering verwijzen we naar de gehomogeniseerde dagreeksen <http://www.knmi.nl/nederland-nu/klimatologie/daggegevens> of de Centraal Nederland Temperatuur <http://www.knmi.nl/kennis-en-datacentrum/achtergrond/centraal-nederland-temperatuur-cnt>.',
 '# ',
 '# SOURCE: ROYAL NETHERLANDS METEOROLOGICAL INSTITUTE (KNMI)',
 '# Comment: These time series are inhomogeneous because of station relocations and changes in observation techniques. As a result these series are not suitable for trend analysis. For climate change studies we refer to the homogenized series of daily data <http://www.knmi.nl/nederland-nu/klimatologie/daggegevens> or the Central Netherlands Temperature <http://www.knmi.nl/kennis-en-datacentrum/achtergrond/centraal-nederland-t

In [23]:
weather_stations = (weather_stations_file
                        .filter(lambda x: x.startswith('# STN         ') or re.match('# \d\d\d', x))
                        .map(lambda x: x.replace('# ', ''))
                        .map(lambda x: re.sub('\s\s+', ';', x.strip()))
                        .map(lambda x: x.split(';'))
                   )

In [24]:
weather_stations.take(5)

[['STN', 'LON(east)', 'LAT(north)', 'ALT(m)', 'NAME'],
 ['209', '4.518', '52.465', '0.00', 'IJmond'],
 ['210', '4.430', '52.171', '-0.20', 'Valkenburg Zh'],
 ['215', '4.437', '52.141', '-1.10', 'Voorschoten'],
 ['225', '4.555', '52.463', '4.40', 'IJmuiden']]

In [25]:
header = weather_stations.first()
header

['STN', 'LON(east)', 'LAT(north)', 'ALT(m)', 'NAME']

In [26]:
data = weather_stations.filter(lambda row : row != header)

In [27]:
df = data.toDF(header)
df.show(5)

+---+---------+----------+------+------------------+
|STN|LON(east)|LAT(north)|ALT(m)|              NAME|
+---+---------+----------+------+------------------+
|209|    4.518|    52.465|  0.00|            IJmond|
|210|    4.430|    52.171| -0.20|     Valkenburg Zh|
|215|    4.437|    52.141| -1.10|       Voorschoten|
|225|    4.555|    52.463|  4.40|          IJmuiden|
|235|    4.781|    52.928|  1.20|           De Kooy|
|240|    4.790|    52.318| -3.30|          Schiphol|
|242|    4.921|    53.241| 10.80|          Vlieland|
|248|    5.174|    52.634|  0.80|          Wijdenes|
|249|    4.979|    52.644| -2.40|          Berkhout|
|251|    5.346|    53.392|  0.70|Hoorn Terschelling|
|257|    4.603|    52.506|  8.50|      Wijk aan Zee|
|258|    5.401|    52.649|  7.30|       Houtribdijk|
|260|    5.180|    52.100|  1.90|           De Bilt|
|265|    5.274|    52.130| 13.90|       Soesterberg|
|267|    5.384|    52.898| -1.30|          Stavoren|
|269|    5.520|    52.458| -3.70|          Lel

In [28]:
weather_stations_df = df.select(col('STN').alias('weather_station_code')
                               ,col('LON(east)').alias('longitude')
                               ,col('LAT(north)').alias('latitude')
                               ,col('ALT(m)').alias('altitude')
                               ,col('NAME').alias('weather_station'))
weather_stations_df.show(5)

+--------------------+---------+--------+--------+------------------+
|weather_station_code|longitude|latitude|altitude|   weather_station|
+--------------------+---------+--------+--------+------------------+
|                 209|    4.518|  52.465|    0.00|            IJmond|
|                 210|    4.430|  52.171|   -0.20|     Valkenburg Zh|
|                 215|    4.437|  52.141|   -1.10|       Voorschoten|
|                 225|    4.555|  52.463|    4.40|          IJmuiden|
|                 235|    4.781|  52.928|    1.20|           De Kooy|
|                 240|    4.790|  52.318|   -3.30|          Schiphol|
|                 242|    4.921|  53.241|   10.80|          Vlieland|
|                 248|    5.174|  52.634|    0.80|          Wijdenes|
|                 249|    4.979|  52.644|   -2.40|          Berkhout|
|                 251|    5.346|  53.392|    0.70|Hoorn Terschelling|
|                 257|    4.603|  52.506|    8.50|      Wijk aan Zee|
|                 25

In [29]:
weather_stations_df = weather_stations_df.withColumn('load_datetime', current_timestamp())

In [30]:
weather_stations_df.show(5)

+--------------------+---------+--------+--------+------------------+--------------------+
|weather_station_code|longitude|latitude|altitude|   weather_station|       load_datetime|
+--------------------+---------+--------+--------+------------------+--------------------+
|                 209|    4.518|  52.465|    0.00|            IJmond|2021-08-25 20:23:...|
|                 210|    4.430|  52.171|   -0.20|     Valkenburg Zh|2021-08-25 20:23:...|
|                 215|    4.437|  52.141|   -1.10|       Voorschoten|2021-08-25 20:23:...|
|                 225|    4.555|  52.463|    4.40|          IJmuiden|2021-08-25 20:23:...|
|                 235|    4.781|  52.928|    1.20|           De Kooy|2021-08-25 20:23:...|
|                 240|    4.790|  52.318|   -3.30|          Schiphol|2021-08-25 20:23:...|
|                 242|    4.921|  53.241|   10.80|          Vlieland|2021-08-25 20:23:...|
|                 248|    5.174|  52.634|    0.80|          Wijdenes|2021-08-25 20:23:...|

In [32]:
if not DeltaTable.isDeltaTable(spark, 's3a://bronze-knmi/weather_stations'):
    print("Not a delta table, write the full df")
    weather_stations_df.dropDuplicates().coalesce(1).write.format("delta").mode("overwrite").option("mergeSchema", "true").save('s3a://bronze-knmi/weather_stations')
else:
    weather_stations_df.createOrReplaceTempView('weather_stations')
    spark.sql("""
        create table if not exists bronze_knmi_weather_stations
        using delta 
        location 's3a://bronze-knmi/weather_stations'
        """)
    # We don't want to update values, only insert with new load_date_time when data is changed, so we don't use a when matched
    spark.sql("""
    MERGE INTO bronze_knmi_weather_stations
    USING weather_stations
        ON bronze_knmi_weather_stations.weather_station_code = weather_stations.weather_station_code
        AND bronze_knmi_weather_stations.longitude = weather_stations.longitude
        AND bronze_knmi_weather_stations.latitude = weather_stations.latitude
        AND bronze_knmi_weather_stations.altitude = weather_stations.altitude
        AND bronze_knmi_weather_stations.weather_station = weather_stations.weather_station
    WHEN NOT MATCHED
      THEN INSERT *
    """)
    