In [None]:
import requests
import os
import tarfile
import gzip
from pathlib import Path
import shutil
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from bs4 import BeautifulSoup
from PIL import Image
from datetime import datetime, timedelta
import rasterio

### Convert 1D calculated coordinates to 2D

In [None]:
# 1by1 based on new coordinates: [517915, 517916, 519016, 519017]
# 1by1 based on old_coordinates: [517445, 517446, 518546, 517446]
# 1by1 based on old_coordinates  = [(470, 444), (471, 444), (471, 445), (470, 445)]
# Window coordinates: [514612, 514620, 523420, 523428]
#coordinates (8*8)= [(467, 444), (467, 452), (475, 444), (475, 452)]
def reshape_1D_to_2D(array_1D, shape=(901, 1101)):
    """Lay a flat grid out row-major as ``shape`` and return its transpose.

    Args:
        array_1D: 1D sequence holding exactly ``rows * cols`` values.
        shape: ``(rows, cols)`` of the row-major layout. Defaults to the 901 x 1101
            grid this function originally hard-coded.

    Returns:
        numpy.ndarray of shape ``(cols, rows)``: element ``k`` of ``array_1D`` (0-based)
        ends up at ``[k % cols, k // cols]``.

    Raises:
        ValueError: if ``len(array_1D) != rows * cols``.
    """
    rows, cols = shape
    expected = rows * cols
    if len(array_1D) != expected:
        # For the default shape this reads "The length of the 1D array must be 992,001."
        raise ValueError(f"The length of the 1D array must be {expected:,}.")
    return np.reshape(array_1D, (rows, cols)).T

# Reference grid: the numbers 1..992001 laid out row-major as 901 rows x 1101 columns.
GRID_SHAPE = (901, 1101)
array_1D = np.arange(1, GRID_SHAPE[0] * GRID_SHAPE[1] + 1)

# Reshape the 1D array into a 2D array, then look up a user-supplied number.
try:
    array_2D = reshape_1D_to_2D(array_1D)
    print("Successfully reshaped to 2D array.")

    # Non-numeric input makes int() raise ValueError, which is reported by the handler below.
    user_input = int(input("Enter a number from 1 to 992001: "))

    if 1 <= user_input <= array_1D.size:
        # (row, col) in the row-major 901 x 1101 layout, i.e. BEFORE the transpose applied by
        # reshape_1D_to_2D: the value therefore sits at array_2D[col, row], not array_2D[row, col].
        index_2D = divmod(user_input - 1, GRID_SHAPE[1])
        assert array_2D[index_2D[1], index_2D[0]] == user_input
        print(f"The 2D index of {user_input} is {index_2D}.")
    else:
        print("Number out of range.")

except ValueError as e:
    print(e)

### Use the cell below to process a single file

In [None]:
directory_path = "/home/arman_abouali/Downloads/DWD"

# Strip stray whitespace and zero-pad the month ("3" -> "03") so the values match the
# archive names built from them ("..._YYYYMM_asc.tar").
year = input('Please insert Year as YYYY (2001-2022): ').strip()
month = input('Please insert Month as MM (01 - 12): ').strip().zfill(2)

### Complete these steps to store and process the image data

In [None]:
def download_weather_data(directory_path, year, month):
    """Download the monthly RADOLAN ASCII archive for *year*/*month* into *directory_path*.

    The file fetched is
    ``<base_url><year>/YW2017.002_<year><month>_asc.tar`` and it is saved under its own
    name inside *directory_path* (via ``download_file``).

    Args:
        directory_path: Existing folder that receives the ``.tar`` file.
        year: Four-digit year string, e.g. ``"2003"``.
        month: Two-digit month string, e.g. ``"01"``.

    Returns:
        None. Prints a message and returns early if *directory_path* is not an existing folder.
    """
    base_url = 'https://opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/asc/'

    # Validate the target before touching the network. The previous version also fetched and
    # parsed the index page here but never used the result, and chdir'ed into the folder, which
    # broke relative paths because download_file joins directory_path again.
    if not os.path.isdir(directory_path):
        print(f"Directory '{directory_path}' doesn't exist. Please enter an existing directory.")
        return

    print("Please be patient!")

    file_url = f"{base_url}{year}/YW2017.002_{year}{month}_asc.tar"
    download_file(file_url, directory_path)
   
def download_file(url, directory_path, timeout=60, chunk_size=1024 * 1024):
    """Download *url* into *directory_path*, naming the file after the URL's last path segment.

    Args:
        url: HTTP(S) URL of the file.
        directory_path: Destination folder.
        timeout: Seconds to wait for the connection and between received bytes
            (not a limit on the total download time).
        chunk_size: Bytes written per streamed chunk.

    Raises:
        requests.HTTPError: for a non-success status. Previously an error page (e.g. a 404 for a
            month that is not published) was saved as the ``.tar`` and only failed later, in
            extraction.
        requests.RequestException: for connection/timeout failures.
        Any partially written file is removed before the exception propagates.
    """
    file_name = url.split('/')[-1]
    file_path = os.path.join(directory_path, file_name)

    try:
        # Stream straight to disk instead of holding the whole archive in memory.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
    except Exception:
        # Do not leave a truncated .tar behind for the extraction step to trip over.
        if os.path.exists(file_path):
            os.remove(file_path)
        raise

    print(f"Downloaded {file_name} to {file_path}")
    print("Step 1: Download and Extraction completed!")
        
#download_weather_data(directory_path,year,month)

In [None]:
def extract_tar(directory_path):
    """Unpack every ``*.tar`` file directly inside *directory_path*, then delete the archive.

    ``X.tar`` is extracted into ``directory_path/X/``, which is created if missing. Only the top
    level of *directory_path* is scanned; subfolders and directories whose name ends in ``.tar``
    are ignored.

    Args:
        directory_path: Folder containing the downloaded archives.
    """
    for file_name in os.listdir(directory_path):
        file_path = os.path.join(directory_path, file_name)
        if not file_name.endswith('.tar') or not os.path.isfile(file_path):
            continue

        # Strip only the trailing suffix; str.replace would also drop any ".tar" inside the name.
        folder_name = file_name[:-len('.tar')]
        folder_path = os.path.join(directory_path, folder_name)
        if os.path.exists(folder_path):
            print(f'Folder {folder_name} already exists. Extracting {file_name} into {folder_name}/')
        else:
            os.makedirs(folder_path, exist_ok=True)

        # The context manager closes the archive even when extraction fails.
        with tarfile.open(file_path) as tar:
            # Interpreters with extraction filters (3.12+ and security backports) use the 'data'
            # filter, which rejects absolute paths, links escaping the target and device files.
            if hasattr(tarfile, 'data_filter'):
                tar.extractall(path=folder_path, filter='data')
            else:
                tar.extractall(path=folder_path)

        os.remove(file_path)
    print('Step 2: Extraction and cleanup completed!')
#extract_tar(directory_path)

In [None]:
def Unzip_gz_files(directory_path):
    """Gunzip every ``*.gz`` file found anywhere below *directory_path*.

    ``dir/name.gz`` is decompressed to ``dir/name/name``: a new folder named after the
    decompressed file holds that file. The ``.gz`` source is deleted afterwards.

    Args:
        directory_path: Root folder that is walked recursively.
    """
    for current_dir, _subdirs, names in os.walk(directory_path):
        for gz_name in (n for n in names if n.endswith('.gz')):
            gz_file = os.path.join(current_dir, gz_name)
            target_dir = os.path.splitext(gz_file)[0]
            plain_name = gz_name[:-3]
            os.makedirs(target_dir, exist_ok=True)
            with gzip.open(gz_file, 'rb') as src, open(target_dir + '/' + plain_name, 'wb') as dst:
                shutil.copyfileobj(src, dst)
            os.remove(gz_file)
    print('Step 3: Unzip completed!')
#Unzip_gz_files(directory_path)

In [None]:
def extract_zip(directory_path):
    """Unpack every ``*.tar`` found anywhere below *directory_path* in place, then delete it.

    Despite its name, this handles tar archives (in the pipeline, the ``.tar`` files produced by
    ``Unzip_gz_files``). Each archive is extracted into the folder that contains it. Archives that
    appear as a result of an extraction are not picked up in the same call, because ``os.walk`` has
    already listed those folders.

    Args:
        directory_path: Root folder that is walked recursively.
    """
    for dirpath, dirnames, filenames in os.walk(directory_path):
        for filename in filenames:
            if not filename.endswith('.tar'):
                continue
            tar_path = os.path.join(dirpath, filename)
            with tarfile.open(tar_path, 'r') as tar:
                # Use the 'data' extraction filter where the interpreter provides it (3.12+ and
                # security backports) to reject path traversal, escaping links and device files.
                if hasattr(tarfile, 'data_filter'):
                    tar.extractall(path=dirpath, filter='data')
                else:
                    tar.extractall(path=dirpath)
            os.remove(tar_path)
    print('Step 4: Extraction completed!')
#extract_zip(directory_path)

In [None]:
def aggregate_array(directory_path, year, month):
    """Sum each day's ASCII grids three at a time and store the results as NumPy arrays.

    For every day 01-31, the folder ``YW2017.002_{year}{month}{DD}_asc.tar`` is searched for
    anywhere below *directory_path*. Its ``*.asc`` files (found recursively, sorted by path) are
    read with the first 6 lines skipped, negative cells are clipped to 0, and consecutive triples
    are summed. For each triple the sum is saved as ``<stem of the 3rd file>.npy`` in the sibling
    folder ``agg_YW2017.002_{year}{month}{DD}_asc``. That folder also receives
    ``agg_array_{year}{month}{DD}.npy``, which stacks all triples along axis 0. The ``.asc`` files
    and the day folder are then deleted.

    Days with no folder or no ``.asc`` files are skipped. Later days are still processed.

    Args:
        directory_path: Root folder to search.
        year: Four-digit year string.
        month: Two-digit month string.

    Raises:
        ValueError: if a day folder holds a number of ``.asc`` files that is not a multiple of 3.
            This is checked before anything for that day is deleted.
    """
    root = Path(directory_path)

    for day in range(1, 32):
        day_str = str(day).zfill(2)
        sub_folder_name = f"YW2017.002_{year}{month}{day_str}_asc.tar"
        sub_folder_path = next(root.rglob(sub_folder_name), None)

        if sub_folder_path is None:
            # Day past the end of a short month, or a gap in the archive: skip only this day.
            # (The old `break` silently left every later day of the month unprocessed.)
            continue

        asc_files = sorted(sub_folder_path.rglob('*.asc'))
        if not asc_files:
            print(f"No .asc files found in {sub_folder_path}; skipping.")
            continue
        if len(asc_files) % 3:
            # Fail before deleting any source file, instead of an IndexError halfway through.
            raise ValueError(
                f"{sub_folder_path} contains {len(asc_files)} .asc files; expected a multiple of 3."
            )

        agg_folder_path = sub_folder_path.parent / ("agg_" + sub_folder_name.replace(".tar", ""))
        agg_folder_path.mkdir(parents=True, exist_ok=True)

        aggregated_files = []
        for i in range(0, len(asc_files), 3):
            group = asc_files[i:i + 3]
            # Negative cells are treated as 0 before summing
            # (presumably no-data markers in the ASCII grids — TODO confirm).
            aggregated_file = np.sum(
                [np.maximum(np.loadtxt(f, skiprows=6), 0) for f in group], axis=0
            )

            # Save as .npy instead of .asc, named after the last file of the triple.
            np.save(agg_folder_path / (group[-1].stem + '.npy'), aggregated_file)

            for f in group:
                os.remove(f)

            aggregated_files.append(aggregated_file)

        aggregated_array = np.stack(aggregated_files, axis=0)
        np.save(agg_folder_path / f"agg_array_{year}{month}{day_str}.npy", aggregated_array)

        # Remove the original folder that contained the ASCII files.
        shutil.rmtree(sub_folder_path)

    print("Step 5: The Numpy arrays derived from the aggregated files are complete!")
    
#aggregate_array(directory_path, year, month)

In [None]:
import zipfile

def compress_aggregated_folder(directory_path, year, month, output_compressed_path):
    """Zip the month folder ``YW2017.002_{year}{month}_asc`` into *output_compressed_path*.

    The archive is named ``YW2017.002_{year}{month}_asc.zip``. Member names are stored relative
    to the month folder, and empty directories are not stored.

    Args:
        directory_path: Folder that contains the month folder.
        year: Four-digit year string.
        month: Two-digit month string.
        output_compressed_path: Destination folder for the zip; created if it does not exist.

    Returns:
        None. Prints a message and returns early if the month folder does not exist.
    """
    print(f"Entered compress_aggregated_folder with {directory_path}, {year}, {month}, {output_compressed_path}")

    folder_name_comp = f'YW2017.002_{year}{month}_asc'
    folder_path_comp = os.path.join(directory_path, folder_name_comp)

    print(f"Constructed folder path: {folder_path_comp}")

    if not os.path.exists(folder_path_comp):
        print(f"Folder '{folder_path_comp}' doesn't exist. Skipping.")
        return

    # On a fresh setup the archive folder may not exist yet, and ZipFile(..., 'w') would then
    # raise FileNotFoundError.
    os.makedirs(output_compressed_path, exist_ok=True)

    zip_name = f"{folder_name_comp}.zip"
    zip_path = os.path.join(output_compressed_path, zip_name)

    print(f"Starting compression from {folder_path_comp} to {zip_path}")

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _dirs, files in os.walk(folder_path_comp):
            for file in files:
                full_path = os.path.join(root, file)
                zipf.write(full_path, os.path.relpath(full_path, folder_path_comp))

    print("Step 6: Compression complete.")

#compress_aggregated_folder(directory_path, year, month, output_compressed_path)


In [None]:
def generate_new_array(directory_path, year, month, coordinates):
    """Cut a rectangular window out of each daily aggregate of a month, then drop the month folder.

    The window is the half-open box ``[min_y:max_y, min_x:max_x]`` spanned by the ``(x, y)`` pairs in
    *coordinates*, applied to axes 1 and 2 of every
    ``YW2017.002_{year}{month}_asc/agg_YW2017.002_{year}{month}{DD}_asc/agg_array_{year}{month}{DD}.npy``.
    Each result is written to ``Window_array_{year}{month}/window_{year}{month}{DD}.npy`` under
    *directory_path*. Missing days are reported and skipped. Afterwards, the whole month folder is
    deleted if it exists.

    Returns:
        None. Prints an error and returns early (nothing deleted) if the box is empty along x or y.
    """
    xs = [coord[0] for coord in coordinates]
    ys = [coord[1] for coord in coordinates]
    x_lo, x_hi = min(xs), max(xs)
    y_lo, y_hi = min(ys), max(ys)

    if not (x_lo < x_hi and y_lo < y_hi):
        print("Error: Minimum value must be less than maximum value for x and y.")
        return

    base = Path(directory_path)
    month_folder = base / f"YW2017.002_{year}{month}_asc"
    window_folder = base / f"Window_array_{year}{month}"

    for day in range(1, 32):
        day_str = f"{day:02d}"
        source = month_folder / f"agg_YW2017.002_{year}{month}{day_str}_asc" / f"agg_array_{year}{month}{day_str}.npy"

        if not source.exists():
            print(f"No .npy file found for the date {day_str}/{month}/{year}")
            continue

        window = np.load(source)[:, y_lo:y_hi, x_lo:x_hi]

        target = window_folder / f"window_{year}{month}{day_str}.npy"
        target.parent.mkdir(parents=True, exist_ok=True)
        np.save(target, window)

        print(f"Shape of the sliced array for the date {day_str}/{month}/{year} is:", window.shape)

    if month_folder.exists():
        shutil.rmtree(month_folder)
        print(f"Original folder {month_folder} has been removed.")

    print("Step 7: New arrays have been generated for all available files in the directory!")
#generate_new_array(directory_path, year, month, coordinates)

### Run the complete processing pipeline over a range of years (every month of each year)

In [None]:
def pipeline(directory_path, start_year, end_year, coordinates, output_compressed_path):
    """Run every processing step for each month from January *start_year* to December *end_year*.

    Per month, in order: download the archive, extract it, gunzip the daily files, extract the
    daily tars, aggregate to NumPy arrays, zip the month folder into *output_compressed_path*,
    and finally cut the coordinate window with ``generate_new_array``.
    """
    months = [
        (str(y), str(m).zfill(2))
        for y in range(start_year, end_year + 1)
        for m in range(1, 13)
    ]

    for year_str, month_str in months:
        print(f"Processing data for {year_str}-{month_str}...")

        download_weather_data(directory_path, year_str, month_str)
        extract_tar(directory_path)
        Unzip_gz_files(directory_path)
        extract_zip(directory_path)
        aggregate_array(directory_path, year_str, month_str)
        compress_aggregated_folder(directory_path, year_str, month_str, output_compressed_path)
        generate_new_array(directory_path, year_str, month_str, coordinates)

    print("All steps have been completed successfully!")

# Inclusive year range to process, entered interactively.
start_year = int(input("Enter the start year: "))
end_year = int(input("Enter the end year: "))

# Window corners as (x, y) pairs; only the min/max along each axis are used.
coordinates = [(466, 445), (466, 452), (475, 445), (475, 452)]

# Working folder for downloads and intermediates, plus the folder that receives the monthly zips.
directory_path = "/home/arman_abouali/Downloads/DWD"
output_compressed_path = os.path.join(directory_path, "Original_files")

pipeline(directory_path, start_year, end_year, coordinates, output_compressed_path)


### To slice the archived original files according to new coordinates, use the following two cells.

In [None]:
import zipfile
def generate_new_array_from_zip(directory_path, temp_unzip_folder, output_path, year, month, coordinates):
    """Cut the coordinate window out of every daily aggregate unpacked from a monthly zip.

    This used to be named ``generate_new_array``. It was renamed because redefining that name
    replaced the four-argument pipeline function of the same name defined earlier in the
    notebook, so re-running the pipeline cell afterwards failed with a TypeError.

    Reads ``<temp_unzip_folder>/agg_YW2017.002_{year}{month}{DD}_asc/agg_array_{year}{month}{DD}.npy``
    for every day and writes the half-open window ``[:, min_y:max_y, min_x:max_x]`` (from the
    ``(x, y)`` pairs in *coordinates*) to
    ``<output_path>/Window_array_{year}{month}/window_{year}{month}{DD}.npy``.

    Args:
        directory_path: Folder holding the zip archives.
        temp_unzip_folder: Folder the zip was extracted into, used as given (process_single_zip
            passes the already-joined path; joining it onto directory_path again doubled
            relative paths).
        output_path: Root folder for the window arrays.
        year: Four-digit year string.
        month: Two-digit month string.
        coordinates: Iterable of ``(x, y)`` pairs.
    """
    min_x = min(coord[0] for coord in coordinates)
    max_x = max(coord[0] for coord in coordinates)
    min_y = min(coord[1] for coord in coordinates)
    max_y = max(coord[1] for coord in coordinates)

    if min_x >= max_x or min_y >= max_y:
        print("Error: Minimum value must be less than maximum value for x and y.")
        return

    unzip_root = Path(temp_unzip_folder)
    for day in range(1, 32):
        day_str = str(day).zfill(2)
        npy_file_name = f"agg_array_{year}{month}{day_str}.npy"
        npy_file_path = unzip_root / f"agg_YW2017.002_{year}{month}{day_str}_asc" / npy_file_name

        if not npy_file_path.exists():
            print(f"No .npy file found for the date {day_str}/{month}/{year}")
            continue

        window = np.load(npy_file_path)[:, min_y:max_y, min_x:max_x]

        new_file_path = Path(output_path) / f"Window_array_{year}{month}" / f"window_{year}{month}{day_str}.npy"
        new_file_path.parent.mkdir(parents=True, exist_ok=True)
        np.save(new_file_path, window)

        print(f"Shape of the sliced array for the date {day_str}/{month}/{year} is:", window.shape)

    # Carried over from the pipeline version: deletes a month folder directly in directory_path.
    # NOTE(review): the zip workflow unpacks into temp_unzip_folder, so this normally finds
    # nothing here — confirm it is still wanted.
    original_folder_path = Path(directory_path) / f"YW2017.002_{year}{month}_asc"
    if original_folder_path.exists():
        shutil.rmtree(original_folder_path)
        print(f"Original folder {original_folder_path} has been removed.")

    print("Step 6: New arrays have been generated for all available files in the directory!")


def process_single_zip(directory_path, output_path, year, month, coordinates):
    """Unzip one monthly archive to a temporary folder, cut the windows, then clean up.

    Args:
        directory_path: Folder holding ``YW2017.002_{year}{month}_asc.zip``; the temporary folder
            ``temp_unzip_folder`` is created inside it.
        output_path: Root folder for the window arrays.
        year: Four-digit year string.
        month: Two-digit month string.
        coordinates: Iterable of ``(x, y)`` pairs defining the window.

    Returns:
        None. Prints a message and returns early if the zip does not exist.
    """
    zip_file_name = f"YW2017.002_{year}{month}_asc.zip"
    original_zip_path = os.path.join(directory_path, zip_file_name)

    if not os.path.exists(original_zip_path):
        print(f"{zip_file_name} does not exist in {directory_path}")
        return

    temp_unzip_folder = os.path.join(directory_path, 'temp_unzip_folder')
    try:
        with zipfile.ZipFile(original_zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_unzip_folder)

        generate_new_array_from_zip(directory_path, temp_unzip_folder, output_path, year, month, coordinates)
    finally:
        # Always remove the unpacked copy, even if slicing fails, so no stale files remain.
        if os.path.isdir(temp_unzip_folder):
            shutil.rmtree(temp_unzip_folder)
    print("Temporary folder removed, and no redundant unzipping in output_path.")

#directory_path = "/home/arman_abouali/Downloads/DWD/Original_files"
#output_path = '/home/arman_abouali/Downloads/DWD/Original_files/DWD_1by1_old'
#coordinates = [(470, 444), (470, 445), (471, 444), (471, 445)]
#year = "2003"
#month = "11"

#process_single_zip(directory_path, output_path, year, month, coordinates)

In [None]:
def process_all_zips(directory_path, output_path, coordinates, start_year, end_year):
    """Re-slice every monthly zip from January *start_year* through December *end_year*.

    Each month is delegated to ``process_single_zip``, with the year as a string and the month as
    a zero-padded two-digit string.
    """
    for year in range(start_year, end_year + 1):
        year_str = str(year)
        for month in range(1, 13):
            process_single_zip(directory_path, output_path, year_str, f"{month:02d}", coordinates)

# Example usage: re-slice the archived monthly zips for the years entered below.
start_year = int(input("Enter the start year: "))
end_year = int(input("Enter the end year: "))
directory_path = "/home/arman_abouali/Downloads/DWD/Original_files"
output_path = os.path.join(directory_path, 'DWD_1by1_old')
# 1x1 window; the 8x8 alternative is [(467, 444), (467, 452), (475, 444), (475, 452)].
coordinates = [(470, 444), (470, 445), (471, 444), (471, 445)]
process_all_zips(directory_path, output_path, coordinates, start_year, end_year)

### Store the processed files into a Parquet file

In [None]:
def create_parquet_file(directory_path, output_file):
    """Collect all dated ``.npy`` stacks under *directory_path* into a single Parquet file.

    Every ``*.npy`` whose name has the form ``<prefix>_YYYYMMDD.npy`` (e.g. ``window_20030101.npy``)
    is loaded. Slice ``j`` along axis 0 is keyed by the timestamp ``YYYYMMDD_HHMM``, counting from
    00:00 of that date in 15-minute steps. Slices are bucketed 96 at a time into "Channel k"
    groups before being flattened into one table. Files whose name carries no parseable date are
    reported and skipped.

    Args:
        directory_path: Folder searched recursively.
        output_file: Path of the Parquet file to write (overwritten).

    Returns:
        pandas.DataFrame with columns ``Key`` (timestamp string) and ``Value`` (the 2D slice as a
        nested list). It is empty, but still has those columns, when no usable file is found.
    """
    steps_per_channel = 96  # 96 x 15 min = 24 h of slices per channel bucket
    dictionary = {}

    for root, dirs, files in os.walk(directory_path):
        for file_name in files:
            if not file_name.endswith('.npy'):
                continue

            try:
                # "window_20030101.npy" -> "20030101"
                date = file_name.split('_')[1][:-4]
                current_time = datetime.strptime(date, '%Y%m%d')
            except (IndexError, ValueError):
                # ValueError covers names such as "agg_array_20030101.npy", whose second
                # "_"-separated field is not a date; previously this crashed the whole run.
                print(f"Error: Could not extract date from file name: {file_name}")
                continue

            # Load only after the name has been validated.
            arr = np.load(os.path.join(root, file_name))

            for start in range(0, arr.shape[0], steps_per_channel):
                channel_dict = dictionary.setdefault(f"Channel {start // steps_per_channel + 1}", {})
                for j in range(start, min(start + steps_per_channel, arr.shape[0])):
                    channel_dict[current_time.strftime("%Y%m%d_%H%M")] = arr[j, :, :].tolist()
                    current_time += timedelta(minutes=15)

    if not dictionary:
        # The stack/rename chain below fails on an empty frame; return the expected schema instead.
        parquet_df = pd.DataFrame(columns=['Key', 'Value'])
    else:
        parquet_df = pd.DataFrame.from_dict(dictionary, orient='index')
        parquet_df = (
            parquet_df.stack()
            .reset_index()
            .rename(columns={'level_0': 'Key', 'level_1': 'Timestamp', 0: 'Value'})
            .reset_index(drop=True)
            .drop(columns=['Key'])
            .rename(columns={'Timestamp': 'Key'})
        )

    table = pa.Table.from_pandas(parquet_df)
    pq.write_table(table, output_file)

    return parquet_df

output_file = "/home/arman_abouali/Downloads/DWD/X_data.parquet"
# NOTE(review): directory_path is whatever the most recently executed cell assigned (the zip
# re-slicing cell sets it to .../DWD/Original_files); set it explicitly if this cell runs on its own.
parquet_df = create_parquet_file(directory_path, output_file)
# Report success only after the file has actually been written.
print(f"Parquet file '{output_file}' created successfully.")
print(parquet_df)