### Read data and import necessary libraries and functions

In [138]:
# Import necessary functionality
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, udf, cos, radians, sin, sqrt, atan2, year, concat, substring, lit
from pyspark.sql.types import FloatType
import math
import os

# Create a SparkSession
spark = SparkSession.builder.appName("WindEnergy").getOrCreate()

# Read wind core sites data
df = spark.read.option("sep", ";").csv("data/metobs_wind_core_sites.csv", header=True, inferSchema=True)

# Read wind turbine data
df_2 = spark.read.option("sep", ",").csv("data/VBK_export_allman_prod - Vindkraftverk.csv", header=True, inferSchema=True)

df.show(n=5)
df_2.show(n=5)


+------+---------------+-------+--------+--------+-----+
|    Id|           Namn|Latitud|Longitud|Höjd (m)|Aktiv|
+------+---------------+-------+--------+--------+-----+
|154860|Abelvattnet Aut|  65.53|   14.97|   665.0|  Nej|
|188790|     Abisko Aut|68.3538| 18.8164| 392.303|   Ja|
|158990|         Abraur|65.9857| 18.9195| 368.079|  Nej|
| 97280|       Adelsö A|59.3579| 17.5213|   5.612|   Ja|
|117330|            Agö|  61.55| 17.4666|    20.0|  Nej|
+------+---------------+-------+--------+--------+-----+
only showing top 5 rows

+--------------+----------+-------------------+-------------------+--------------------+-----------+--------------+----+---------------+------------+------+--------------------------+-----------------------+-----------------+---------------------+----------+-------------+----------+-----------+-----------+------------+-------------+-----------+-----------------+--------------+----------------------------+------------+--------------+--------------------+-----

### Define helper functions

In [139]:
# Function for transformation sweref99 tm to longitude and latitude coordinates
def sweref99_to_latlon(E, N):
    # Constants for SWEREF99 TM projection
    E0 = 500000  # False Easting in meters
    N0 = 0       # False Northing in meters
    F0 = 0.9996  # Scale factor at central meridian
    lo0 = math.radians(15)  # Central meridian in radians
    a = 6378137.0  # Semi-major axis of WGS 84 ellipsoid in meters
    la0 = 0  # Latitude of projection origin in radians

    # Convert
    lat = la0 + (N - N0) / (a * F0)
    long = lo0 + (E - E0) / (a * F0 * cos(lat))

    return lat, long

# Calculate distance between points (E, N) and (lat, long)
def distance(E, N, lat, long):

    # Convert E-kooridnat and N-kooridnat to (lat1, lon1)
    lat1, lon1 = sweref99_to_latlon(E, N)
    
    # Convert to radians
    lat2 = radians(lat)
    lon2 = radians(long)
    
    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    
    # Radius of the Earth in kilometers (mean value)
    radius_earth = 6371.0
    
    # Calculate the distance
    distance = radius_earth * c
    
    return distance

distance_udf = udf(distance, FloatType())

### Clean data and map turbine to closest wind core site

In [140]:
# Clean uneccesary spaces after Elområde (Electricity zone)
df_2 = df_2.withColumn("Elområde", regexp_replace(col("Elområde"), "\\s+", ""))

# Filter out turbines:
#    not on land, 
#    not mounted and 
#    not in Elområde 1

df_2 = df_2.filter((col("Status") == "Uppfört") & (col("Placering") == "Land") & (col("Elområde") == "Luleå"))

# Make life easier
df_2.createOrReplaceTempView("df2")

# Get relevant fields
df_2 = spark.sql("SELECT `Verk-ID`, `E-koordinat`, `N-koordinat`, `Uppfört`, `Maxeffekt (MW)` FROM df2")

# Join tables ignoring no longer active wind core sites
df_2 = df_2.crossJoin(df.filter((col("Aktiv") == "Ja")))

# Add column that calculate distance between all turbines and wind core sites
df_final = df_2.withColumn("distance (km)", distance(df_2["E-koordinat"], df_2["N-koordinat"], df_2["Latitud"], df_2["Longitud"]))

df_final.createOrReplaceTempView("dfinal")
# Select the closest wind core site for each wind turbine (defined by Verk-ID)
df_final = spark.sql("SELECT * FROM dfinal WHERE `distance (km)` IN (SELECT MIN(`distance (km)`) FROM dfinal GROUP BY `Verk-ID`)")


df_final.createOrReplaceTempView("dfinal")
# Find needed wind core sites
temp = spark.sql("SELECT Id, Namn, AVG(`distance (km)`) as `avg distance`, COUNT(Namn) as turbines FROM dfinal GROUP BY Namn, Id ORDER BY Id")

temp.show(n=50, truncate=False)

+------+----------------------+------------------+--------+
|Id    |Namn                  |avg distance      |turbines|
+------+----------------------+------------------+--------+
|140460|Holmön A              |31.593722317795642|7       |
|140480|Umeå Flygplats        |40.71140988554128 |25      |
|149340|Petisträsk A          |31.695296320914863|27      |
|149560|Norsjö A              |27.311162906168107|67      |
|151380|Skellefteå Flygplats  |20.60428277675917 |4       |
|157870|Buresjön A            |26.160925654400817|7       |
|158740|Malå-Brännan A        |21.49654244211856 |41      |
|159880|Arvidsjaur A          |46.30123260037012 |16      |
|161710|Pite-Rönnskär A       |39.674710253650446|210     |
|161910|Älvsbyn A             |44.27760914653739 |206     |
|162860|Luleå-Kallax Flygplats|16.62211951817588 |1       |
|163900|Storön A              |9.099583528119986 |4       |
|163960|Haparanda A           |19.251840822187297|5       |
|172940|Paharova A            |22.443951

In [141]:
# List to store individual DataFrames
dataframes = []

# Loop through all files in the folder
for filename in os.listdir("data/weather_data_mean/"):
    if filename.endswith('.csv'):  # Assuming your files have a .csv extension
        file_path = os.path.join("data/weather_data_mean/", filename)
        parts = filename.split('_')
        id = parts[3]
        df = spark.read.option("sep", ";").csv(file_path, header=True, inferSchema=True)
        df = df.filter(year('Datum') >= 2020)
        df = df.withColumn('timestamp', concat(col('Datum'), substring(col('Tid (UTC)').cast('string'), 11, 100)))

        df = df.drop(*["Datum", "Tid (UTC)", "Vindriktning", "Kvalitet3", "Kvalitet5", "_c6", "Tidsutsnitt:"])
        df = df.withColumn('Id', lit(id))
        
        dataframes.append(df)

# Concatenate the DataFrames
concatenated_df = dataframes[0]  # Initialize with the first DataFrame
for df in dataframes[1:]:
    concatenated_df = concatenated_df.union(df)

result_df = concatenated_df.join(temp, "Id", "inner")

# Show the result DataFrame
result_df.show()

                                                                                

+------+-------------+-------------------+------------+------------------+--------+
|    Id|Vindhastighet|          timestamp|        Namn|      avg distance|turbines|
+------+-------------+-------------------+------------+------------------+--------+
|149340|          0.0|2020-01-01 00:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.0|2020-01-01 01:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.8|2020-01-01 02:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.7|2020-01-01 03:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          1.1|2020-01-01 04:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.0|2020-01-01 05:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.0|2020-01-01 06:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.6|2020-01-01 07:00:00|Petisträsk A|31.695296320914863|      27|
|149340|          0.8|2020-01-01 08:00:00|Petisträsk A|31.695296320914863|  