In [None]:
import os
import requests
from bs4 import BeautifulSoup as bs
from datetime import datetime
import zipfile
import shutil
import re

In [None]:
#Ustalenie katalogu roboczego ze zmiennej środowiskowej. 
if os.getenv("Gdrive_studia") is not None:
    os.chdir(os.getenv("Gdrive_studia"))
else:
    print(f"Obecny folder roboczy: {os.getcwdb()}")

In [None]:
# Foldery docelowy na pobrane pliki
foldery_docelowe = {
    "klimat":"Big Data/Projekt/Dane/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/klimat/",
    "opad":"Big Data/Projekt/Dane/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/opad/",
    "synop":"Big Data/Projekt/Dane/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/synop/"
}
bazowe_url = {
   "klimat": "https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/klimat/",
    "opad": "https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/opad/"
    #,"synop": "https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/synop/"
}
skróty = ("k","o"
          #,"s"
         )
lata_początkowe = (1951,1950
                   #,1960
                  )

In [None]:
for docelowy_folder in foldery_docelowe.values():
# Tworzy folder docelowy jeżeli nie istnieje
    os.makedirs(docelowy_folder, exist_ok=True)

#### Stworzenie listy linków na bazie formatu używanego przez IMGW

In [None]:
def stworz_liste_linkow_do_pobrania(baza_url:str ,skrót: str) -> list:
    """
    Funkcja przygotowuję listę linków do plików według szablonu używanego przez IMGW.
    Args:
    baza_url: szablon linku
    skrót: "k" - klimat, "o" - opady
    
    """
    #baza_url = "https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/dobowe/klimat/"    
    lista_linków = []
    rok_poczatkowy = 1951
    # Pierwszy folder opadów ma inny format w związku z rozpoczęciem pomiarów od 1950 więc wymaga specjalnego potraktowania
    rok = 1950 if skrót == "o" else rok_poczatkowy
    if skrót == "o":
        for i in range(0,6):
            download_link = f"{baza_url}{1950}_{1955}/{1950+i}_{skrót}.zip"
            print(download_link)
            lista_linków.append(download_link)
        rok = 1956
    while rok <2000:
        for i in range(0,5):
            download_link = f"{baza_url}{rok}_{rok+4}/{rok+i}_{skrót}.zip"
            print(download_link)
            lista_linków.append(download_link)
        rok += 5
    obecny_rok = datetime.now().year
    while rok <= obecny_rok:
        for miesiac in range(1,13):
            miesiac_sformatowany = f"{miesiac:02}"
            download_link = f"{baza_url}{rok}/{rok}_{miesiac_sformatowany}_{skrót}.zip"
            lista_linków.append(download_link)
        rok += 1
    return lista_linków
linki_do_wywolania_klimat = stworz_liste_linkow_do_pobrania(bazowe_url['klimat'],skrót="k")
linki_do_wywolania_opady = stworz_liste_linkow_do_pobrania(bazowe_url['opad'],skrót="o")

#### Funkcja pobierająca dane

In [None]:
def pobierz_dane_z_linkow(lista_linków, folder_docelowy):
    """
    Pobiera dane z listy linków i zapisuje je w podanym folderze.

    Args:
        linki_do_wywolania (list): Lista linków do pobrania.
        folder_docelowy (str): Ścieżka do folderu, w którym mają być zapisane pliki.
    Returns:
        dict: Podsumowanie pobrań (sukcesy i błędy).
    """
    # Sprawdzenie czy folder docelowy istnieje
    os.makedirs(folder_docelowy, exist_ok=True)

    sukcesy = 0
    niepowodzenia = 0

    for link in lista_linków:
        try:
            # Pobranie nazwy pliku z linku
            nazwa_pliku = link.split('/')[-1]
            sciezka_do_pliku = os.path.join(folder_docelowy, nazwa_pliku)
            if nazwa_pliku not in os.listdir(folder_docelowy):
                # Pobierz dane z linku
                print(f"Pobieranie: {link}")
                response = requests.get(link, stream=True)
                response.raise_for_status()  # Sprawdzenie, czy nie było błędów HTTP

                # Zapisanie pliku na dysku
                with open(sciezka_do_pliku, 'wb') as plik:
                    for chunk in response.iter_content(chunk_size=8192):
                        plik.write(chunk)

                print(f"Pobrano i zapisano: {sciezka_do_pliku}")
                sukcesy += 1
        except requests.exceptions.RequestException as e:
            print(f"Błąd podczas pobierania {link}: {e}")
            niepowodzenia += 1

    # Podsumowanie
    print( {
        "sukcesy": sukcesy,
        "niepowodzenia": niepowodzenia
    })

#### Pobranie danych

In [None]:
pobierz_dane_z_linkow(lista_linków=linki_do_wywolania_klimat ,folder_docelowy=foldery_docelowe['klimat'])
pobierz_dane_z_linkow(lista_linków=linki_do_wywolania_opady ,folder_docelowy=foldery_docelowe['opad'])

#### Wypakowanie wszystkich plików zip

In [None]:
def wypakuj_pliki_zip(folder_docelowy: str):
    """
    Wypakowuje wszystkie pliki ZIP w podanym folderze. Finalnie pokazuje listę plików błędów przy rozpakowaniu.

    Args:
        folder_docelowy (str): Ścieżka do folderu, w którym znajdują się pliki ZIP.
    """
    błędy = {}
    
    # Wypakowanie wszystkich plików ZIP
    for nazwa_pliku in os.listdir(folder_docelowy):
        sciezka_pliku = os.path.join(folder_docelowy, nazwa_pliku)

        # Sprawdzenie, czy plik jest archiwum ZIP
        if zipfile.is_zipfile(sciezka_pliku):
            try:
                print(f"Wypakowywanie: {nazwa_pliku}")
                with zipfile.ZipFile(sciezka_pliku, 'r') as zip_ref:
                    zip_ref.extractall(folder_docelowy)
                print(f"Pomyślnie wypakowano: {nazwa_pliku}")
            except Exception as e:
                błędy[nazwa_pliku] = e
                print(f"Błąd podczas wypakowywania {nazwa_pliku}: {e}")
    
    # Usunięcie plików ZIP po udanym wypakowaniu     
    for nazwa_pliku in os.listdir(folder_docelowy):
        if nazwa_pliku not in list(błędy.keys()):      
            sciezka_pliku = os.path.join(folder_docelowy, nazwa_pliku)
            if sciezka_pliku.endswith('.zip') and os.path.isfile(sciezka_pliku):
                try:
                    os.remove(sciezka_pliku)
                    print(f"Usunięto plik ZIP: {nazwa_pliku}")
                except Exception as e:
                    błędy[nazwa_pliku] = e
                    print(f"Błąd podczas usuwania {nazwa_pliku}: {e}")
        else:
            continue

    print(błędy)


In [None]:
wypakuj_pliki_zip(foldery_docelowe['klimat'])
wypakuj_pliki_zip(foldery_docelowe['opad'])

In [None]:
def partycjonowanie_pionowe_rok(folder: str):
    """
    Partycjonuje pliki CSV w folderze względem roku, tworząc podfoldery dla każdego roku.

    Args:
        folder (str): Ścieżka do folderu z plikami.
    """
    # Iteruj przez wszystkie pliki w folderze
    for nazwa_pliku in os.listdir(folder):
        sciezka_pliku = os.path.join(folder, nazwa_pliku)

        # Sprawdź, czy plik jest CSV i dopasuj jego nazwę do wzorca
        if nazwa_pliku.endswith('.csv'):
            # Wzorce dla nazw plików
            wzorzec_nowy_format = r'^k?o?_d_?t?_\d{2}_(\d{4})\.csv$'  # np. k_d_01_2022.csv
            wzorzec_stary_format = r'^k?o?_d_?t?_(\d{4})\.csv$'       # np. k_d_2000.csv            
            dopasowanie_nowy = re.match(wzorzec_nowy_format, nazwa_pliku)
            dopasowanie_stary = re.match(wzorzec_stary_format, nazwa_pliku)

            if dopasowanie_nowy:
                rok = dopasowanie_nowy.group(1)
            elif dopasowanie_stary:
                rok = dopasowanie_stary.group(1)
            else:
                print(f"Plik {nazwa_pliku} nie pasuje do żadnego wzorca, pomijam.")
                continue

            # Tworzenie folderu dla danego roku
            folder_rok = os.path.join(folder, rok)
            os.makedirs(folder_rok, exist_ok=True)

            # Przeniesienie pliku do folderu
            try:
                shutil.move(sciezka_pliku, os.path.join(folder_rok, nazwa_pliku))
                print(f"Przeniesiono {nazwa_pliku} do {folder_rok}")
            except Exception as e:
                print(f"Błąd podczas przenoszenia pliku {nazwa_pliku}: {e}")



In [None]:
partycjonowanie_pionowo_rok(foldery_docelowe['klimat'])

In [None]:
partycjonowanie_pionowe_rok(foldery_docelowe['opad'])