In [167]:
# pip install "dask[complete]"

In [168]:
# pip install dask-ml

Starting Dask Client

In [169]:
from dask.distributed import Client 
client = Client() # local Dask Client
print("Dashboard: ", client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59060 instead


Dashboard:  http://127.0.0.1:59060/status


Loading Black Friday Dataset w Dask Dataframe

In [170]:
import dask.dataframe as dd

In [171]:
df = dd.read_csv(r"C:\Users\hp\OneDrive\Desktop\SEM7PRACS\BDA\exp2\blackfriday_train.csv")

In [172]:
df.head()

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1,1000001,P00248942,F,0-17,10,A,2,0,1,6.0,14.0,15200
2,1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
3,1000001,P00085442,F,0-17,10,A,2,0,12,14.0,,1057
4,1000002,P00285442,M,55+,16,C,4+,0,8,,,7969


Checking for missing values

In [173]:
print(df.isnull().sum().compute())

User_ID                            0
Product_ID                         0
Gender                             0
Age                                0
Occupation                         0
City_Category                      0
Stay_In_Current_City_Years         0
Marital_Status                     0
Product_Category_1                 0
Product_Category_2            173638
Product_Category_3            383247
Purchase                           0
dtype: int64


Filling up missing values

In [174]:
df['Product_Category_2'] = df['Product_Category_2'].fillna(0)
df['Product_Category_3'] = df['Product_Category_3'].fillna(0)

In [175]:
print(df.isnull().sum().compute())

User_ID                       0
Product_ID                    0
Gender                        0
Age                           0
Occupation                    0
City_Category                 0
Stay_In_Current_City_Years    0
Marital_Status                0
Product_Category_1            0
Product_Category_2            0
Product_Category_3            0
Purchase                      0
dtype: int64


Converting categorical columns into categorical type

In [176]:
categorical_cols = ['City_Category', 'Stay_In_Current_City_Years']
df = df.categorize(columns=categorical_cols).persist()

In [177]:
df = df.categorize(columns=['City_Category', 'Stay_In_Current_City_Years']).persist()

Code them into integers (One Hot Encoding)

In [178]:
from dask_ml.preprocessing import OneHotEncoder

In [179]:
encoder = OneHotEncoder(sparse_output=True)
encoded_city = encoder.fit_transform(df[['City_Category']])
encoded_stay = encoder.fit_transform(df[['Stay_In_Current_City_Years']])
age_map = {
    '0-17': 8.5,
    '18-25': 21.5,
    '26-35': 30.5,
    '36-45': 40.5,
    '46-50': 48,
    '51-55': 53,
    '55+': 60
}
df['Age'] = df['Age'].map(age_map)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map function that you are using.
  Before: .map(func)
  After:  .map(func, meta=('Age', 'float64'))



Persisting keeps the transformed data in distributed memory for faster later access

In [180]:
df = df.persist()

Train-test (80/20)

In [181]:
from dask_ml.model_selection import train_test_split # works lazily on large datasets

In [182]:
X = dd.concat([df[['User_ID', 'Age', 'Occupation', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3']], 
			   encoded_city, encoded_stay], axis=1) 
y = df['Purchase']

In [183]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)

In [184]:
print(f"Training set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")

Training set size: 440339
Test set size: 109729


Converting dataframes to Dask arrays for compatibility with Dask ML models

In [185]:
import dask.array as da

In [186]:
X_train = X_train.persist()
X_test = X_test.persist()
y_train = y_train.persist()
y_test = y_test.persist()
# Converts dataframes to Dask arrays for compatibility with Dask ML models.
X_train = X_train.to_dask_array(lengths=True)
X_test = X_test.to_dask_array(lengths=True)
y_train = y_train.to_dask_array(lengths=True)
y_test = y_test.to_dask_array(lengths=True)

Train Linear Regression model

In [187]:
from dask_ml.linear_model import LinearRegression

In [188]:
model = LinearRegression()
model.fit(X_train, y_train)

0,1,2
,penalty,'l2'
,dual,False
,tol,0.0001
,C,1.0
,fit_intercept,True
,intercept_scaling,1.0
,class_weight,
,random_state,
,solver,'admm'
,max_iter,100


Predictions

In [189]:
y_pred = model.predict(X_test)
print(y_pred.compute())

[ 9251.79979929  8894.09012137 12165.17155586 ...  7819.97630246
  7377.52503267 11058.74274892]


R2 score

In [190]:
from sklearn.metrics import r2_score

In [191]:
score = r2_score(y_test.compute(), y_pred)
print("R² score:", score)

R² score: 0.15039522746744927


In [192]:
print(f"\nDask Dashboard was available at: {client.dashboard_link}")
client.close() # to prevent the port being open forever
print("Dask client closed successfully!")


Dask Dashboard was available at: http://127.0.0.1:59060/status
Dask client closed successfully!
