## **1. Installing Dependencies**



*   Installs **Dask** (a parallel computing framework) with all optional dependencies.


*   Installs **Dask-ML**, an extension of Dask for scalable machine learning.


*   **--quiet** reduces the amount of output pip prints during installation.






In [1]:
!pip install "dask[complete]" dask-ml --quiet


## **2. Importing Required Libraries**

*   **dask.dataframe:** Mirrors much of the pandas API but splits the data into partitions, so it can handle datasets larger than memory.
*   **dask.array:** Similar to NumPy but operates on chunks of data in parallel.
*   **Client:** Connects to a Dask scheduler and manages distributed execution.
*   **pandas and numpy:** Used for creating the initial data.




In [None]:
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
import pandas as pd
import numpy as np



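## **3. Initializing a Dask Client**

*   Initializes a **Dask Client** in single-machine mode: calling Client() with no arguments starts a local scheduler and workers on this machine.
*   Enables monitoring through the diagnostic dashboard and runs tasks in parallel across the workers.
*   Prints the client details, such as the scheduler address, the number of worker processes and threads, and the available memory.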








In [None]:
# Initialize a Dask Client (single-machine mode)
client = Client()
print(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42633 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:34465
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:42633/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33651'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33125'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:43053 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43053
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42940
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:46057 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46057
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42944
INFO:distributed.scheduler:Receive client conne

<Client: 'tcp://127.0.0.1:34465' processes=2 threads=2, memory=12.67 GiB>
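
The cluster shape does not have to be left to the defaults. The following is a minimal sketch, assuming an in-process cluster is acceptable for experimentation: it requests two single-threaded workers, disables the dashboard, and uses a context manager so the client is closed automatically. The name `local_client` is hypothetical, and `set_as_default=False` is passed so that the sketch does not replace the notebook's own `client`.

```python
from dask.distributed import Client

# Assumption: a small in-process (threads only) cluster is enough for a demo.
with Client(
    n_workers=2,
    threads_per_worker=1,
    processes=False,         # run workers as threads inside this process
    dashboard_address=None,  # do not start the diagnostic dashboard
    set_as_default=False,    # leave any existing default client untouched
) as local_client:
    local_client.wait_for_workers(2)
    worker_count = len(local_client.scheduler_info()["workers"])

    # Submit one task to the cluster and wait for its result.
    future = local_client.submit(sum, [1, 2, 3])
    total = future.result()

print("Workers:", worker_count)
print("Result:", total)
```

Leaving the `with` block shuts down both the client and the temporary cluster it created.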


## **4. Creating a Large Pandas DataFrame**

Simulates a big dataset with:

*   id: Unique identifiers (0 to 999,999).
*   value: Random integers from 1 to 99 (inclusive).
*   category: Randomly assigned categories ("A", "B", or "C").






In [None]:
# Create a large pandas DataFrame (simulating big data)
n_rows = 1_000_000  # 1 million rows
df = pd.DataFrame({
    "id": np.arange(n_rows),
    "value": np.random.randint(1, 100, n_rows),
    "category": np.random.choice(["A", "B", "C"], n_rows),
})



## **5. Converting Pandas DataFrame to Dask DataFrame**



*   Converts the large Pandas DataFrame into a **Dask DataFrame**.
*   Splits the rows into 10 **partitions**, each of which is an ordinary pandas DataFrame that Dask can process in parallel.



In [None]:
# Convert the pandas DataFrame to a Dask DataFrame
ddf = dd.from_pandas(df, npartitions=10)  # split into 10 partitions

## **6. Exploring the Dask DataFrame**



*   **head()**: Fetches the first few rows (triggers computation, but by default only on the first partition).
*   **dtypes**: Shows column types (metadata only, no computation).





In [None]:
# Display the first few rows (head() computes immediately, reading only the first partition)
print(ddf.head())

# Get data types (metadata only, doesn't load the full data)
print(ddf.dtypes)

   id  value category
0   0     94        A
1   1     44        A
2   2     59        C
3   3     60        C
4   4     91        C
id                    int64
value                 int64
category    string[pyarrow]
dtype: object


## **7. Filtering Data (Lazy Execution)**

*   Filters rows where value > 50.
*   **Lazy execution**: Dask only records this operation in a task graph and doesn't execute it yet.







## **8. Aggregation (Lazy Execution)**

*   Groups by category and computes the mean of value.
*   Again, execution is **deferred** until .compute() is called.





## **9. Computing Results (Trigger Execution)**

*   **.compute()** forces execution and returns the result as a pandas object (a DataFrame or a Series).
*   **.compute().head()** on filtered_ddf: Materializes the filtered rows in pandas, then shows the first few.
*   **.compute()** on grouped_ddf: Performs the aggregation.





In [None]:
# Display the first rows (head() computes immediately, reading only the first partition)
print(ddf.head())

# Get data types (metadata only, doesn't load the full data)
print(ddf.dtypes)

# Perform filtering (lazy execution)
filtered_ddf = ddf[ddf["value"] > 50]

# Perform aggregation (lazy execution)
grouped_ddf = ddf.groupby("category")["value"].mean()

# Compute results (triggers execution)
print(filtered_ddf.compute().head())  # convert to pandas for viewing
print(grouped_ddf.compute())  # perform the actual aggregation

   id  value category
0   0     94        A
1   1     44        A
2   2     59        C
3   3     60        C
4   4     91        C
id                    int64
value                 int64
category    string[pyarrow]
dtype: object


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


   id  value category
0   0     94        A
2   2     59        C
3   3     60        C
4   4     91        C
5   5     56        B




category
A    49.944410
B    50.036332
C    50.108784
Name: value, dtype: float64


## **10. Creating a Large Dask Array**

*   Creates a **Dask Array** with **10 million** random numbers drawn uniformly from [0, 1).
*   Uses **chunks** of 1 million elements, giving 10 chunks that can be processed in parallel.



## **11. Computing Mean & Sum (Lazy Until .compute() is Called)**

*   **.mean():** Builds a lazy task graph for the mean; nothing is calculated until .compute() is called.
*   **.sum():** Builds a lazy task graph for the sum in the same way.
*   **.compute():** Triggers the actual calculation and returns a concrete number.






In [None]:
# Create a large Dask array (10 million elements, in chunks of 1 million)
arr = da.random.random(size=(10_000_000,), chunks=(1_000_000,))

# Build the mean and sum lazily; each '.compute()' call triggers the calculation
mean_value = arr.mean().compute()
sum_value = arr.sum().compute()

print("Mean:", mean_value)
print("Sum:", sum_value)

Mean: 0.5001505350535234
Sum: 5001505.3505352335
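
Calling `.compute()` twice walks the array's task graph twice. As a hedged alternative, `dask.compute` accepts several lazy results at once and lets them share intermediate work. The sketch below uses a hypothetical deterministic array, `numbers`, instead of random data so that the expected values can be checked by hand.

```python
import dask
import dask.array as da

# Hypothetical deterministic array: 0, 1, ..., 999_999 in ten chunks.
numbers = da.arange(1_000_000, chunks=100_000)
block_count = numbers.numblocks  # number of chunks along each axis

# Build both reductions lazily, then evaluate them in a single pass.
lazy_mean = numbers.mean()
lazy_sum = numbers.sum()
mean_result, sum_result = dask.compute(lazy_mean, lazy_sum)

print("Blocks:", block_count)
print("Mean:", mean_result)
print("Sum:", sum_result)
```

For the array in the notebook cell above, the equivalent call would be `dask.compute(arr.mean(), arr.sum())`.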


## **12. Training a Linear Regression Model with Dask-ML**

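*   Imports **LinearRegression** from Dask-ML, which trains on Dask arrays in parallel.
*   Generates synthetic data for regression (10,000 rows, 2 features).
*   Splits the data into chunks of 1,000 rows for efficient parallel processing.
*   Initializes and trains a linear regression model using Dask-ML.
*   Unlike scikit-learn's in-memory estimator, this one works directly on chunked Dask arrays, so it can scale better with large datasets.
*   **predict()** makes predictions lazily and returns a Dask array.
*   **.compute()** converts the lazy predictions into a NumPy array.
*   **[:5]** selects the first five predicted values for printing.
*   Because y here is random noise that does not depend on X, the predictions all land close to the mean of y (about 0.5).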

In [None]:
from dask_ml.linear_model import LinearRegression

# Create a synthetic dataset
X = da.random.random((10_000,2), chunks=(1_000,2))
y = da.random.random(10_000, chunks=(1_000,))

# Train a scalable Linear Regression model
model = LinearRegression()
model.fit(X,y)

# Predict values (lazy), then compute and print the first five
predictions = model.predict(X)
print(predictions.compute()[:5])

[0.50064557 0.50399265 0.5004294  0.49909728 0.50151214]


## **13. Closing the Dask Client**

*   Shuts down the Dask Client to free resources.



In [None]:
client.close()

INFO:distributed.scheduler:Remove client Client-79fe885d-0ae0-11f0-855f-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:33822; closing.
INFO:distributed.scheduler:Remove client Client-79fe885d-0ae0-11f0-855f-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-79fe885d-0ae0-11f0-855f-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1743064522.4764302') (0, 1)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:38997'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:34217'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:33802; closing.
INFO:distributed.scheduler:Remove worker addr: tcp://127.0.0.1:36021 name: 1 (stimulus_id='handle-worker-cleanup-1743064522.4915686')
INFO:distr