In [1]:
!pip install dask[complete]

Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.4.5-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (3.8 kB)
Collecting distributed<2025.12.1,>=2025.12.0 (from dask[complete])
  Downloading distributed-2025.12.0-py3-none-any.whl.metadata (3.4 kB)
Collecting sortedcontainers>=2.0.5 (from distributed<2025.12.1,>=2025.12.0->dask[complete])
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib!=3.2.0,!=3.2.1,>=1.6.0 (from distributed<2025.12.1,>=2025.12.0->dask[complete])
  Downloading tblib-3.2.2-py3-none-any.whl.metadata (27 kB)
Collecting zict>=3.0.0 (from distributed<2025.12.1,>=2025.12.0->dask[complete])
  Downloading zict-3.0.0-py2.py3-none-any.whl.metadata (899 bytes)
Downloading lz4-4.4.5-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m18.8 MB/s[0m eta [36m0:00:00

In [2]:
import pandas as pd
import numpy as np
import dask.dataframe as dd

In [4]:
#utworzenie pliku csv z 5mln rekordów
n = 5_000_000
df = pd.DataFrame({
    "id":np.arange(n),
    "value":np.random.normal(100, 20, size=n),
    "category":np.random.choice(["A", "B", "C"], size=n)
})

In [5]:
df.to_csv('big_data.csv',index=False)

In [6]:
#utworzenie ramki dask na bazie pliku csv
ddf = dd.read_csv("big_data.csv")
print(ddf.head(10))

   id       value category
0   0   72.467291        C
1   1   99.815543        A
2   2   68.634648        A
3   3  129.542669        C
4   4  119.084839        B
5   5  137.266854        A
6   6  118.959417        C
7   7   83.945246        A
8   8  107.694441        C
9   9   87.673579        B


In [7]:
#proste operacje - średnia arytmetyczna - średnia leniwa podział na elementy grafu - uwzględniając partycje..,
#średnia obliczeniowa - wykonanie obliczeń na każdym elemencie grafu i połączenie ich w wartość skalarną.
mean_val = ddf["value"].mean()
print(f"Średnia (leniwa) wynosi: {mean_val}")
print(f"Średnia (obliczeniowa) wynosi: {mean_val.compute()}")

Średnia (leniwa) wynosi: <dask_expr.expr.Scalar: expr=ArrowStringConversion(frame=FromMapProjectable(6389d78))['value'].mean(), dtype=float64>
Średnia (obliczeniowa) wynosi: 99.99078584704856


In [8]:
#obliczenie średnich arytmetycznych dla każdej z kategorii
grouped = ddf.groupby("category")["value"].mean()
print(f"Średnia per kategoria (leniwa): {grouped}")
print(f"Średnia per kategoria (obliczona): {grouped.compute()}")

Średnia per kategoria (leniwa): Dask Series Structure:
npartitions=1
    float64
        ...
Dask Name: getitem, 5 expressions
Expr=((ArrowStringConversion(frame=FromMapProjectable(6389d78))[['value', 'category']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='value'))['value']
Średnia per kategoria (obliczona): category
A    100.003532
B     99.960282
C    100.008521
Name: value, dtype: float64


In [16]:
#wypisanie partycji,  partycja -> najmnijesza jednostka przetwarzania równoległego danych.  - kawałek danych
#na którym system obliczeniowy pracuje niezależnie od innych kawałków..
print(f"liczba partycji: {ddf.partitions}")
print(f"partycje:\n{ddf.map_partitions(len).compute()}")
print(f"rozmiar danych szacowany:\n{ddf.memory_usage(deep=True).compute()}")

liczba partycji: <dask.utils.IndexCallable object at 0x794f632411e0>
partycje:
0    2519592
0    2480408
dtype: int64
rozmiar danych szacowany:
Index            264
id          40000000
value       40000000
category    45000000
dtype: int64


In [18]:
#zmiana rozmiaru max partycji - z czego wynika liczba partycji
ddf = ddf.repartition(partition_size="20MB")

In [19]:
print(f"liczba partycji: {ddf.partitions}")
print(f"partycje:\n{ddf.map_partitions(len).compute()}")
print(f"rozmiar danych szacowany:\n{ddf.memory_usage(deep=True).compute()}")

liczba partycji: <dask.utils.IndexCallable object at 0x794f63258070>
partycje:
0    629898
0    629898
0    629898
0    629898
0    620102
0    620102
0    620102
0    620102
dtype: int64
rozmiar danych szacowany:
Index           1056
id          40000000
value       40000000
category    45000000
dtype: int64
