In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd

In [2]:
file_path = 'zamowienia.csv'
zamowienia_df = pd.read_csv(file_path)

# Wprowadzenie wartości NaN w losowych miejscach
rows_with_nan = np.random.choice(
    range(10, len(zamowienia_df)), size=5, replace=False)
num_cols_to_nan = min(3, len(zamowienia_df.columns))
cols_with_nan = np.random.choice(
    zamowienia_df.columns, size=num_cols_to_nan, replace=False)

for row in rows_with_nan:
    for col in cols_with_nan:
        zamowienia_df.at[row, col] = np.nan

missing_file_path = 'zamowienia_missing.csv'
zamowienia_df.to_csv(missing_file_path, index=False)

In [5]:
dask_df = dd.read_csv('zamowienia_missing.csv')

dask_df_types = dask_df.dtypes
print("Typy danych w ramce Dask:")
print(dask_df_types)

original_df = pd.read_csv('zamowienia.csv')
print("\nTypy danych w oryginalnej ramce pandas:")
print(original_df.dtypes)

try:
    computed_sum = dask_df.select_dtypes(include=['number']).sum().compute()
    print("\nSuma wartości liczbowych w ramce Dask:", computed_sum)
except Exception as e:
    print("\nBłąd podczas sumowania:", e)

for sample_size in [10, 50, 100, 500]:
    try:
        dask_df = dd.read_csv('zamowienia_missing.csv', sample=sample_size)
        print(f"\nTypy danych przy sample={sample_size}:")
        print(dask_df.dtypes)
    except Exception as e:
        print(f"\nBłąd przy sample={sample_size}:", e)

Typy danych w ramce Dask:
Kraj;Sprzedawca;Data zamowienia;idZamowienia;Utarg    string[pyarrow]
dtype: object

Typy danych w oryginalnej ramce pandas:
Kraj;Sprzedawca;Data zamowienia;idZamowienia;Utarg    object
dtype: object

Suma wartości liczbowych w ramce Dask: Series([], dtype: float64)

Błąd przy sample=10: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: Sample is not large enough to include at least one row of data. Please increase the number of bytes in `sample` in the call to `read_csv`/`read_table`

Błąd przy sample=50: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: Sample is not large enough to include at least one row of data. Please increase the number of bytes in `sample` in the call to `read_csv`/`read_table`

Typy danych przy sample=100:
Kraj;Sprzedawca;Data zamowienia;idZamowienia;Utarg    string[pyarrow]
dtype: object

Typy danych przy sample=500:
Kraj;Sprz

In [15]:
from dask.distributed import Client
import time
client = Client(
    memory_limit='8GB',
    n_workers=2,
    threads_per_worker=2,
    local_directory='C:/path/to/local/temp',
    dashboard_address=':9000'
)
client

ImportError: Dask diagnostics requirements are not installed.

Please either conda or pip install as follows:

  conda install dask                     # either conda install
  python -m pip install "dask[diagnostics]" --upgrade  # or python -m pip install

<Client: 'tcp://127.0.0.1:62139' processes=2 threads=4, memory=14.90 GiB>

In [16]:
import dask
dask.config.refresh()
client.close()

In [17]:
print(client.scheduler_info())

RuntimeError: IOLoop is closed

In [13]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import time

# Konfiguracja lokalnego klastra z ograniczonymi zasobami
cluster = LocalCluster(
    n_workers=4,  # Więcej workerów, aby lepiej rozłożyć obciążenie
    threads_per_worker=1,  # Mniej wątków na workera
    memory_limit='2GB'  # Ograniczenie pamięci na workera, aby zapobiec przeciążeniu
)
client = Client(cluster, timeout=120)  # Zwiększony timeout

# Ścieżki do plików .parquet
file_paths = [
    "0000.parquet",
    "0001.parquet"
]

# Wczytaj dane z wybranymi kolumnami i ograniczoną liczbą partycji
df = dd.read_parquet(file_paths, columns=[
                     'username', 'likes', 'date'], npartitions=10)

# Podziel dane na więcej partycji, aby lepiej zarządzać zasobami
df = df.repartition(npartitions=20)

# Trwałe załadowanie danych do pamięci dla większej wydajności
persisted_df = df.persist()

# 1. Top 10 użytkowników z najwyższą liczbą like'ów (próbka danych dla testów)
start_time = time.time()
# Przetwórz 10% danych dla szybszego testu
sample_df = persisted_df.sample(frac=0.1).persist()
user_likes = sample_df.groupby('username')['likes'].sum().compute()
top_users = user_likes.nlargest(10)
end_time = time.time()
print(f"Top 10 użytkowników z najwyższą liczbą like'ów:\n{top_users}")
print(f"Czas wykonania operacji: {end_time - start_time} sekund")

# 2. Pobranie danych tylko za pierwsze półrocze 2019 roku
start_time = time.time()
persisted_df['date'] = dd.to_datetime(persisted_df['date'])
filtered_data = persisted_df[
    (persisted_df['date'] >=
     '2019-01-01') & (persisted_df['date'] < '2019-07-01')
].compute()
end_time = time.time()
print(f"Liczba rekordów za pierwsze półrocze 2019 roku: {len(filtered_data)}")
print(f"Czas wykonania operacji: {end_time - start_time} sekund")

# Monitorowanie obciążenia zasobów za pomocą dashboardu Dask
print(f"Link do dashboardu: {client.dashboard_link}")

# Zamknięcie klienta po zakończeniu operacji
client.close()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 53515 instead
  next(self.gen)
2024-10-27 01:42:06,319 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "C:\Users\M\miniconda3\Lib\site-packages\distributed\protocol\core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "C:\Users\M\miniconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2024-10-27 01:42:06,362 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "C:\Users\M\miniconda3\Lib\site-packages\distributed\core.py", line 831, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "C:\Users\M\miniconda3\Lib\site-packages\distributed\scheduler.py", line 5899, in add_client
    await self.handle_stream(comm=com

FutureCancelledError: ('repartitiontofewer-b8065787f05c2a4785b3b4cca224e054', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.