In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, round, year, month, dayofmonth, count as spark_count
import requests
import zipfile
import os
from concurrent.futures import ThreadPoolExecutor

# Create the Spark session (or reuse the active one) shared by the code below
spark = (
    SparkSession.builder
    .appName("DataFrameInit")
    .getOrCreate()
)

# KNMI station numbers to process; each one selects an etmgeg_<n> archive.
# Append more numbers to this list to process additional stations.
regions = [260, 344]


class DataFrameInit:
    """Download KNMI daily station files and expose them as cleaned Spark DataFrames.

    For every region number, ``etmgeg_<region>.zip`` is fetched from the KNMI
    CDN, extracted under ``/tmp``, parsed with Spark, reduced to the date and
    the ``TG`` column (divided by 10 into degrees Celsius) plus derived,
    month-weighted degree days, cached, and written to
    ``extracted_files/etmgeg_<region>.csv``.

    Attributes:
        dataframes: dict mapping each successfully built region to its cached
            DataFrame with columns Date, Year, Month, Day, Degrees Celsius,
            Degree Days.
    """

    # Lines at the top of the KNMI text file that precede the CSV header row.
    # Tied to the current file layout -- NOTE(review): re-check if KNMI changes it.
    HEADER_SKIP_LINES = 51
    # Seconds to wait on the CDN before giving up on a single download.
    DOWNLOAD_TIMEOUT = 60
    # Reference temperature (degrees Celsius) below which a day accrues degree days.
    REF_TEMP = 18

    def __init__(self, regions):
        """Download, parse and cache the data for every region in *regions*."""
        self.dataframes = self.load_dataframes(regions)

    def download_and_extract(self, region):
        """Download and unzip the KNMI daily-data archive for *region*.

        Args:
            region: station number used in the ``etmgeg_<region>`` file names.

        Returns:
            Path of the extracted ``/tmp/etmgeg_<region>.txt`` file, or ``None``
            when the download, the archive, or the disk write fails (the error
            is printed). These failures never raise, so one bad region cannot
            abort the parallel downloads of the others.
        """
        url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/daggegevens/etmgeg_{region}.zip"
        local_zip_path = f"/tmp/etmgeg_{region}.zip"
        local_txt_path = f"/tmp/etmgeg_{region}.txt"

        try:
            # Without a timeout, requests can block indefinitely on a stalled
            # connection and hang the whole thread pool.
            response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file for region {region}: {e}")
            return None

        try:
            with open(local_zip_path, 'wb') as file:
                file.write(response.content)
            with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
                zip_ref.extractall("/tmp")
        except (zipfile.BadZipFile, OSError) as e:
            # A corrupt archive or disk error must not escape executor.map.
            print(f"Error extracting file for region {region}: {e}")
            return None

        if not os.path.exists(local_txt_path):
            print(f"Error extracting file for region {region}: "
                  f"{local_txt_path} not found in archive")
            return None

        return local_txt_path

    def load_dataframes(self, regions):
        """Build one cleaned, cached DataFrame per region.

        Args:
            regions: iterable of region (station) numbers.

        Returns:
            dict of region -> DataFrame. Regions whose download fails, or whose
            DataFrame cannot be built, are skipped with a printed message. If
            only the CSV export fails, the error is printed but the region's
            DataFrame is kept.
        """
        dataframes = {}

        # Downloads are I/O-bound, so fetch all regions concurrently.
        with ThreadPoolExecutor() as executor:
            txt_paths = list(executor.map(self.download_and_extract, regions))

        for region, txt_path in zip(regions, txt_paths):
            if not txt_path:
                continue

            try:
                df = self._build_dataframe(txt_path)
                # Cache so the CSV write and later queries reuse the parsed data.
                df.cache()
                dataframes[region] = df
                self._save_csv(df, region)
            except Exception as e:
                # Best effort: report and continue with the remaining regions.
                print(f"Error processing file for region {region}: {e}")

        return dataframes

    def _build_dataframe(self, txt_path):
        """Parse one KNMI text file into the Date/Year/Month/Day/Celsius/DegreeDays frame."""
        # Bind to a local so the Spark lambda closes over an int, not over self.
        skip = self.HEADER_SKIP_LINES
        rdd = (
            spark.sparkContext.textFile(txt_path)
            .zipWithIndex()
            .filter(lambda line_and_idx: line_and_idx[1] >= skip)
            .keys()
        )
        df = spark.read.csv(rdd, header=True, sep=",", inferSchema=True)

        # Header names in the file carry leading spaces, hence "   TG".
        df = df.select(col("YYYYMMDD").alias("Date"),
                       col("   TG").alias("Degrees Celsius"))
        df = df.withColumn("Date", to_date(col("Date"), "yyyyMMdd"))
        # TG values are divided by 10 to obtain degrees Celsius.
        df = df.withColumn("Degrees Celsius",
                           col("Degrees Celsius").cast("float") / 10)
        df = df.na.drop(subset=["Degrees Celsius"])
        df = df.withColumn("Degree Days", self.temp_to_degreeday(df))

        # Round both measures to 2 decimals (pyspark.sql.functions.round).
        df = df.withColumn("Degrees Celsius", round(col("Degrees Celsius"), 2))
        df = df.withColumn("Degree Days", round(col("Degree Days"), 2))

        # Split Date into Year, Month and Day columns.
        df = df.withColumn("Year", year(col("Date")))
        df = df.withColumn("Month", month(col("Date")))
        df = df.withColumn("Day", dayofmonth(col("Date")))

        return df.select("Date", "Year", "Month", "Day",
                         "Degrees Celsius", "Degree Days")

    def _save_csv(self, df, region):
        """Write *df* with a header row to extracted_files/etmgeg_<region>.csv, overwriting."""
        output_dir = "extracted_files"
        # NOTE(review): os.makedirs creates a local directory, while Spark resolves
        # the relative path against its default filesystem -- confirm they match.
        os.makedirs(output_dir, exist_ok=True)
        output_file = f"{output_dir}/etmgeg_{region}.csv"
        df.write.csv(output_file, header=True, mode="overwrite")

    def temp_to_degreeday(self, df):
        """Return a Column expression with month-weighted degree days per row.

        Raw degree days are ``max(REF_TEMP - Degrees Celsius, 0)``. They are
        weighted by month: April-September x0.8, March and October x1.0, all
        other months (and rows with a null Date) x1.1.

        Args:
            df: unused; kept for backward compatibility. The expression refers
                to the "Date" and "Degrees Celsius" columns by name.
        """
        calc = self.REF_TEMP - col("Degrees Celsius")
        degree_days = when(calc > 0, calc).otherwise(0)
        # month() reads the DateType column directly instead of relying on an
        # implicit date->string cast and fixed character positions.
        month_num = month(col("Date"))

        return (
            when(month_num.between(4, 9), degree_days * 0.8)
            .when(month_num.isin(3, 10), degree_days)
            .otherwise(degree_days * 1.1)
        )


# Build the loader once; its constructor downloads, parses and caches every region
df_init = DataFrameInit(regions)

# For every region that loaded, show its newest 365 daily rows (change the
# limit below to show more or fewer) and a per-month tally.
for region, df in df_init.dataframes.items():
    print(f"\nDataframe for region {region}")
    recent_df = df.sort(col("Date").desc()).limit(365)
    display(recent_df)

    # Per (Year, Month): the NUMBER of days whose Degree Days value is positive
    # (a count, not a sum), newest month first.
    degree_days_df = (
        df.where(col("Degree Days") > 0)
        .groupBy("Year", "Month")
        .agg(spark_count("Degree Days").alias("Amount of Degree Days"))
        .sort(col("Year").desc(), col("Month").desc())
    )

    print(f"\nAmount of Degree Days per month for region {region}")
    display(degree_days_df)



Dataframe for region 260


Date,Year,Month,Day,Degrees Celsius,Degree Days
2024-08-06,2024,8,6,21.4,0.0
2024-08-05,2024,8,5,19.2,0.0
2024-08-04,2024,8,4,17.8,0.16
2024-08-03,2024,8,3,20.3,0.0
2024-08-02,2024,8,2,19.9,0.0
2024-08-01,2024,8,1,19.5,0.0
2024-07-31,2024,7,31,21.6,0.0
2024-07-30,2024,7,30,22.8,0.0
2024-07-29,2024,7,29,19.5,0.0
2024-07-28,2024,7,28,18.3,0.0



Amount of Degree Days per month for region 260


Year,Month,Amount of Degree Days
2024,8,1
2024,7,16
2024,6,25
2024,5,26
2024,4,29
2024,3,31
2024,2,29
2024,1,31
2023,12,31
2023,11,30



Dataframe for region 344


Date,Year,Month,Day,Degrees Celsius,Degree Days
2024-08-06,2024,8,6,22.3,0.0
2024-08-05,2024,8,5,19.8,0.0
2024-08-04,2024,8,4,18.2,0.0
2024-08-03,2024,8,3,20.6,0.0
2024-08-02,2024,8,2,20.9,0.0
2024-08-01,2024,8,1,19.7,0.0
2024-07-31,2024,7,31,21.7,0.0
2024-07-30,2024,7,30,23.3,0.0
2024-07-29,2024,7,29,20.8,0.0
2024-07-28,2024,7,28,18.5,0.0



Amount of Degree Days per month for region 344


Year,Month,Amount of Degree Days
2024,7,14
2024,6,24
2024,5,28
2024,4,30
2024,3,31
2024,2,29
2024,1,31
2023,12,31
2023,11,30
2023,10,27
