<a href="https://colab.research.google.com/github/chrispi21/python-dataeng/blob/main/04_obsluga_plikow_i_manager_kontekstu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Przygotowanie do zajęć



Zacznijmy od pobrania plików z danymi:

In [None]:
!wget -O pracownicy.csv https://raw.githubusercontent.com/chrispi21/python-dataeng/refs/heads/main/pracownicy.csv

UWAGA: powyższą linię trzeba wykonać po każdym restarcie środowiska wykonawczego

`!` służy do wykonywania poleceń powłoki systemowej (ang. `shell`).

Sprawdźmy, czy udało się pobieranie naszego pliku i gdzie się znajduje:

In [None]:
!ls -la pracownicy.csv

In [None]:
!pwd

# Standardowe podejście

Docs:
1. https://realpython.com/working-with-files-in-python/
2. https://docs.python.org/3/library/filesys.html
3. https://docs.python.org/3/library/io.html#io-overview
4. https://docs.python.org/3/tutorial/inputoutput.html#reading-and-writing-files


Ćwiczenia:
1. https://www.w3resource.com/python-exercises/file/

Otwórzmy plik:

In [None]:
sciezka = "/content/pracownicy.csv"
plik = open(sciezka)

Odczytajmy dane:

In [None]:
dane = plik.readlines()

Dane są w postaci niesparsowanej:

In [None]:
dane

Zamknijmy plik:

In [None]:
plik.close()

Wady powyższego podejścia:

* Konieczność parsowania danych
* Trzeba zamknąć plik
* Musimy zadbać o obsługę wyjątków i wymusić zamknięcie pliku

Domyślnie pliki otwierane są w następującym trybie:
1. Do odczytu (`read mode`). Można zmienić na `write` albo otworzyć w trybie zapis i odczyt.
2. Tekstowym (domyślnie kodowanie zgodne z platformą). Można zmienić kodowanie. Można zmienić na tryb binarny.

Później zajmiemy się zapisem danych - spróbujmy znaleźć obejścia dla problemów.

# Manager kontekstu (ang. `context manager`)

Rozwiążemy problem pamiętania o zamknięciu pliku.

Docs:
1. https://book.pythontips.com/en/latest/context_managers.html
2. https://realpython.com/python-with-statement/#managing-resources-in-python

Dla chętnych:
1. https://docs.python.org/3/library/contextlib.html + https://realpython.com/python-with-statement/#creating-function-based-context-managers


In [None]:
with open(sciezka) as plik:
  dane_v2 = plik.readlines()

Manager kontekstu pozwala na bezpieczne otwarcie i zamknięcie pliku (oraz innych zasobów).

Możemy wyświetlić dane:

In [None]:
dane_v2

Nie możemy już odczytać zamkniętego pliku:

In [None]:
plik.readlines()

Ćwiczenie

Co się stanie w przypadku błędu?

In [None]:
with open(sciezka) as plik2:
  # Wymuszamy błąd
  1 / 0
  plik2.readlines()

In [None]:
# czy mogę odczytać dabe
plik2.readlines()

A teraz?

In [None]:
plik3 = open(sciezka)
1 / 0
plik3.readlines()

In [None]:
plik3.readlines()

In [None]:
plik3.close()

Ćwiczenie dla chętnych

Zapoznaj się z:
1. https://realpython.com/python-with-statement/#creating-function-based-context-managers
2. https://docs.python.org/3/library/os.html#os.environ

Utwórz manager kontekstu, który będzie inicjalizował zmienne środowiskowe a następnie je czyścił korzystając z `contextlib.contextmanager`.

Dla uproszczenie nie przejmujemy się obecnym stanem zmiennych środowiskowych i przywracaniem ich pierwotnych wartości.

Przykład:
```python
# powinno zadziałać wyświetlając kolejno:
# moja zmienna 1
# moja_zmienna_2
with env_var(MY_ENV_1="moja zmienna 1", MY_ENV_2="moja_zmienna_2"):
  print(os.environ["MY_ENV_1"])
  print(os.environ["MY_ENV_2"])

# powinno zakończyć się błędem:
print(os.environ["MY_ENV_1"])

```

In [None]:
# @title Rozwiazanie

In [None]:
# @title Podpowiedź

import os
import contextlib

@contextlib.contextmanager
def env_var(**kwargs):
  os.environ.update(kwargs)
  yield
  for k in kwargs.keys():
    del os.environ[k]

# powinno zadziałać
with env_var(MY_ENV_1="moja zmienna 1", MY_ENV_2="moja_zmienna_2"):
  print(os.environ["MY_ENV_1"])
  print(os.environ["MY_ENV_2"])

# powinno zakończyć się błędem:
print(os.environ["MY_ENV_1"])

Inne zastosowania:
1. Obsługa połączeń (np. do baz danych)
2. Obsługa transakcji w bazach danych
3. Obsługa plików temporalnych i innych zasobów tymczasowych
4. Obsługa innych zasobów, dla których wymagane jest obsłużenie zamknięcia zasobu

Do tworzenie własnych managerów kontekstu można wykorzystać bibiotekę `contextlib` ([link](https://docs.python.org/3/library/contextlib.html)). Posiada ona też wiele gotowych managerów kontekstu.

# Odczyt *csv*

Docs:
1. https://docs.python.org/3/library/csv.html#csv.reader

Rozwiążemy teraz problem samodzielnego parsowania zawartości pliku *csv*

In [None]:
import csv

In [None]:
sciezka = "/content/pracownicy.csv"
with open(sciezka,  newline='') as plik_csv:
  dane_csv = list(csv.reader(plik_csv, delimiter=","))

## Przetwarzanie linia po linii

Docs dla chętnych:
1. https://realpython.com/python-iterators-iterables/

Wczytywanie wszystkich danych do listy nie jest dobrym pomysłem w przypadku dużych plików. Za pomocą iteratora możemy przetwarzać dane linia po linii za pomocą pętli lub wyrażenia `for` jak w naszym przykładzie:

In [None]:
sciezka = "/content/pracownicy.csv"
with open(sciezka,  newline='') as plik_csv:
  csv_gen = csv.reader(plik_csv, delimiter=",")
  header = next(csv_gen)

  # to zadziała tak samo:
  # dane_csv = (linia for linia in csv.reader(plik_csv, delimiter=","))

  # jak to:
  dane_csv = (linia for linia in csv_gen)

  # co się stanie, jak wykonamy poniższy kod poza with?
  for wiersz in dane_csv:
    print(wiersz)

Ćwiczenie

Wyciągnij zestawienie zawierające unikalne imiona z pliku `pracownicy.csv`.

In [None]:
# @title Rozwiązanie

In [None]:
# @title Podpowiedź
sciezka = "/content/pracownicy.csv"
with open(sciezka,  newline='') as plik_csv:
  csv_gen = csv.reader(plik_csv, delimiter=",")
  header = next(csv_gen)
  imiona = {imie for _, imie, *_ in csv_gen}

print(imiona)

## `pandas`

Docs:
1. https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
2. https://pandas.pydata.org/pandas-docs/stable/user_guide/cookbook.html#cookbook-csv
3. Ogólnie o obsłudze wejścia/wyjścia: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html

Docs dla chętnych:
1. Zapoznaj się z opisem parametrów: `low_memory` i `engine`. Zastanów się jak wpływają na wydajność i utylizację pamięci operacyjnej:
* https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#specifying-the-parser-engine
* https://www.geeksforgeeks.org/pandas-read_csv-low_memory-and-dtype-options/
* https://www.kaggle.com/code/timetraveller98/testing-pandas-read-csv-performance


### Odczyt

Nie musimy pobierać pliku lokalnie. Odczyt jest bardzo prosty!

In [None]:
import pandas as pd

url = "https://raw.githubusercontent.com/chrispi21/python-dataeng/refs/heads/main/pracownicy.csv"

In [None]:
pd.read_csv(url)

Inne formaty wspierane przez `pandas` - https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#

### Odczyt w kawałkach (`chunk`) - dla chętnych

Docs:
1. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#iterating-through-files-chunk-by-chunk
2. https://www.geeksforgeeks.org/how-to-load-a-massive-file-as-small-chunks-in-pandas/

Domyślnie parametr `low_memory=True`, więc duże pliki nie są odczytywane w całości. Jeśli chcemy mieć większą kontrolę nad odczytywanym plikiem możemy skorzystać z parametrów:
1. `iterator` - zwraca iterator, który umożliwia iterowanie po fragmentach pliku
2. `chunksize` - zwraca iterator i ustawia domyślną liczbę linii w zwracanym fragmencie

Przykłady

Bez ustawiania `chunksize`

In [None]:
csv_iter = pd.read_csv(url, iterator=True)


In [None]:
csv_iter.get_chunk(3)

In [None]:
csv_iter.get_chunk(10)

In [None]:
csv_iter.get_chunk(30)

In [None]:
csv_iter.get_chunk(30)

W pętli - w tym przypadku danych jest **mało**:

In [None]:
for chunk in  pd.read_csv(url, iterator=True):
  print(chunk)

Z ustawieniem `chunksize`:

In [None]:
csv_iter = pd.read_csv(url, chunksize=10)

Możemy odczytać wskazaną liczbę wierszy:

In [None]:
csv_iter.get_chunk(2)

Lub w prosty sposób iterować (po jeszcze nie odczytanych wierszach):

In [None]:
for chunk in csv_iter:
  print(chunk)

# Zapis `csv`

Docs:
1. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#
2. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-store-in-csv
3. https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html

## `pandas`

Posłużmy się przykładem z imionami:

In [None]:
imiona = pd.read_csv(url)[["Imię"]].drop_duplicates()

Zapiszemy dane do pliku w następujący sposób:

In [None]:
imiona.to_csv("/content/imiona.csv", index=False)

Warto zwrócić uwage na parametr `mode`:
* `w` - domyślna wartość - czyści plik przed zapisem albo go tworzy
* `x` - tworzy nowy plik, albo błąd, gdy plik istnieje
* `a` - dodaje nowe rekordy do istniejącego pliku albo tworzy nowy

`mode="a"` jest przydatny gdy musimy przetwarzać dane w kawałkach, np. gdy odczytywane dane nie mieszczą się w pamięci operacyjnej. Dobrze jest przed operacją zapisu czyścić zawartość pliku (zobacz [idempotencja / idempotency](https://airbyte.com/data-engineering-resources/idempotency-in-data-pipelines))

Ćwiczenie dla chętnych

Odczytaj dane z pliku pracownicy w pętli ustawiając `chunksize=10`. W pętli odfiltruj pracowników z działu HR. Zapisz imię i nazwisko ustawiając `mode="a"` do pliku pracownicy_hr.csv. Jak zapewnić, żeby proces wczytywania, przetwarzania i zapisu danych był idempotentny?

In [None]:
# @title Rozwiązanie

In [None]:
# @title Podpowiedź

import os
from contextlib import suppress

out = "pracownicy_hr.csv"

# dobra praktyka - usuwamy plik o ile istnieje
with suppress(FileNotFoundError):
  os.remove(out)

for chunk in pd.read_csv(url, chunksize=10):
  hr = chunk[chunk["Departament"] == "HR"][["Imię", "Nazwisko"]]
  hr.to_csv(out, index=False, mode="a")

# Zapis i odczyt *JSON*


## `pandas`

Docs:
1. https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html

Docs dla chętnych:
1. Normalizacja (tj. "wypłaszczenie") danych w przypadku zagnieżdżonych dokumentów: https://pandas.pydata.org/docs/reference/api/pandas.json_normalize.html

Zacznijmy tym razem od zapisu danych w formacie JSON. Najczęściej spotykanym wariantem jest umieszczanie każdego dokumentu w osobnej linii. Można to zrobić jak pokazano poniżej:

In [None]:
dane_csv = pd.read_csv(url)
dane_csv.to_json("/content/pracownicy.json", orient="records", lines=True)

Odczyt danych

In [None]:
pd.read_json("/content/pracownicy.json", orient="records", lines=True)

## Biblioteka wbudowana `json`

Docs:
1. https://realpython.com/python-json/

Zaczerpnijmy przykładowy dokument z https://json.org/example.html:

In [None]:
json_doc = """
{
    "glossary": {
        "title": "example glossary",
		"GlossDiv": {
            "title": "S",
			"GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
					"SortAs": "SGML",
					"GlossTerm": "Standard Generalized Markup Language",
					"Acronym": "SGML",
					"Abbrev": "ISO 8879:1986",
					"GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
						"GlossSeeAlso": ["GML", "XML"]
                    },
					"GlossSee": "markup"
                }
            }
        }
    }
}
"""

Parsowanie JSON do postaci słownika:

In [None]:
import json

doc = json.loads(json_doc)

In [None]:
doc["glossary"]["title"]

Konwersja na JSON:

In [None]:
json.dumps(doc)

Jakie podejście wybrać?
* `pandas`, gdy:
 * nasze dane można (efektywnie) reprezentować w formacie tabelarycznym
 * jest to dla nas wygodniejsze
 * chcemy przetwarzać dane w kawałkach (`chunks`)
* Biblioteka wbudowana `json`, gdy:
 * chcemy pełnej kontroli nad parsowaniem
 * nie możemy skorzystać z `pandas` albo byłoby to nieefektywne (np. dlatego, że trzeba zainstalować dodatkową zależność przy starcie kontenera albo dlatego, że trzeba użyć własnego obrazu do wystartowania kontenera)
 * parsujemy dane dokument po dokumencie (np. streaming, funkcje typu serverless np. AWS lambda, GCP Cloud Run Functions, Azure Functions)


# Extract Transform Load

Potrafimy już odczytywać dane z plików oraz je zapisywać. Potrafimy również robić proste transformacje. Połączmy to wszystko!

Uwaga: Tutaj celowo przechodzimy na angielski.

Nasz przykład jest bardzo prosty. Definiujemy 3 funkcje odpowiedzialne za przetwarzanie danych:
1. `extract` - odpowiada za pobranie danych źródłowych
2. `transform` - przekształca dane z postaci źródłowej na docelową
3. `load` - zapisuje dane

In [None]:
import pandas as pd

def extract(path):
  return pd.read_csv(path)

def transform(df):
  return df[["Imię"]].drop_duplicates()

def load(df, path):
  return df.to_json(path, orient="records", index=False, lines=True)


def job(input_path, output_path):
  source_data = extract(input_path)
  transformed_data = transform(source_data)
  load(transformed_data, output_path)


Możemy uruchomić nasze przetwarzanie:

In [None]:
job(
    input_path="https://raw.githubusercontent.com/chrispi21/python-dataeng/refs/heads/main/pracownicy.csv",
    output_path="/content/imiona_v2.json"
)

Będziemy inspirować się powyższym podejściem w kolejnych materiałach, aby pokazać jak:
1. Tworzyć modularny kod
2. Uprościć testowanie

Ćwiczenie

Zrób refactoring powyższego kodu, tak aby korzystać tylko z bibliotek standardowych (`json`, `csv`). Funkcja `load` została już zaimplementowana. Tym razem musimy odczytać dane z zasobów dyskowych notebook'a.

In [None]:
# @title Rozwiązanie

import csv
import json

def extract(path):
  # implement me
  pass

def transform(employees):
  # implement me
  pass

def load(names, path):
  with open(path, mode="w") as file_handler:
    json_data = [
        json.dumps({"Imię": name}) + "\n" for name in names
    ]
    file_handler.writelines(json_data)


def job(input_path, output_path):
  source_data = extract(input_path)
  transformed_data = transform(source_data)
  load(transformed_data, output_path)



job(
    input_path="/content/pracownicy.csv",
    output_path="/content/imiona_v3.json"
)

In [None]:
# @title Podpowiedź

import csv
import json

def extract(path):
  with open(path, newline="") as file_handler:
    return list(csv.reader(file_handler, delimiter=","))[1:]

def transform(employees):
  return {
      e[1] for e in employees
  }

def load(names, path):
  with open(path, mode="w") as file_handler:
    json_data = [
        json.dumps({"Imię": name}) + "\n" for name in names
    ]
    file_handler.writelines(json_data)


def job(input_path, output_path):
  source_data = extract(input_path)
  transformed_data = transform(source_data)
  load(transformed_data, output_path)



job(
    input_path="/content/pracownicy.csv",
    output_path="/content/imiona_v3.json"
)

# Bonus - uruchomienie aplikacji z linii poleceń

Uruchomimy nasz przykład z linii poleceń:
https://github.com/chrispi21/python-dataeng/blob/main/04_main.py

Możemy uruchomić poniższy kod w:
1. Github codespaces
2. Colab notebook
3. Inne środowisko terminalowe (może być wymagana instalacja dodatkowych bibliotek)

In [None]:
!git clone https://github.com/chrispi21/python-dataeng.git

In [None]:
!python python-dataeng/04_main.py "pracownicy.csv" "imiona_v4.json"

Nasz pierwszy przykład ma wady - trzeba w odpowiedniej kolejności przekazywać argumenty. Możemy ten problem rozwiązać używając `argparse`.

https://github.com/chrispi21/python-dataeng/blob/main/04_main_argparse.py


In [None]:
!python python-dataeng/04_main_argparse.py --input_path="pracownicy.csv" --output_path="imiona_v5.json"

Więcej można dowiedzieć się tutaj:
1. https://docs.python.org/3/howto/argparse.html
2. https://realpython.com/command-line-interfaces-python-argparse/

Inne narzędzie, które polecam:
1. https://github.com/google/python-fire
2. https://click.palletsprojects.com/en/stable/
