In [108]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score
import joblib
from dask.distributed import Client, progress
from dask_ml.model_selection import train_test_split
import dask.dataframe as dd
import pandas as pd
import warnings

# NOTE(review): with no category/module filter this suppresses every warning
# raised for the rest of the session; consider narrowing it to specific
# categories once the notebook is stable.
warnings.filterwarnings("ignore")

In [109]:
# Start a local Dask cluster: 4 workers with 2 threads each and a 2GB
# memory limit per worker.
client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='2GB',
)
client  # display the scheduler / dashboard summary

0,1
Client  Scheduler: tcp://127.0.0.1:58549  Dashboard: http://127.0.0.1:58548/status,Cluster  Workers: 4  Cores: 8  Memory: 7.45 GiB


In [110]:
# Load the Play Store CSV into a Dask DataFrame.
# NOTE(review): an explicit `dtype=` override (for a 'Time' column) was once
# sketched here but is not applied; pass `dtype=` if type inference fails.
play_df = dd.read_csv('Google-Playstore.csv')

In [111]:
# Total number of rows in the dataset (this triggers a computation).
len(play_df)

1118136

In [112]:
# Remove the columns that are not used in the rest of the notebook.
play_df = play_df.drop(
    columns=[
        'App Name',
        'App Id',
        'Developer Website',
        'Developer Email',
        'Privacy Policy',
        'Ad Supported',
        'Content Rating',
        'In App Purchases',
        'Released',
        'Last Updated',
        'Category',
        'Developer Id',
        'Currency',
        'Size',
        'Minimum Android',
        'Installs',
    ]
)

In [113]:
# Encode the boolean flag columns as 0/1 integers.
# A vectorised comparison replaces the per-element Python lambda, which was
# slow and (being called without `meta`) made Dask infer the output type.
# Any value that is not equal to True -- including missing values -- maps to 0,
# exactly as the lambda did.
for flag_column in ('Free', 'Editors Choice'):
    play_df[flag_column] = play_df[flag_column].eq(True).astype('int64')

In [114]:
# Preview the first five rows of the reduced frame.
play_df.head(n=5)

Unnamed: 0,Rating,Rating Count,Minimum Installs,Maximum Installs,Free,Price,Editors Choice
0,3.6,2848.0,100000.0,351560,1,0.0,0
1,4.3,17297.0,1000000.0,2161778,1,0.0,0
2,4.2,488639.0,50000000.0,79304739,1,0.0,0
3,4.2,1224420.0,100000000.0,163660067,1,0.0,0
4,4.2,665.0,50000.0,73463,1,0.0,0


In [115]:
# Sanity check: print any column that is still object-typed (non-numeric).
object_columns = play_df.select_dtypes(include=['object']).columns
for column_name in object_columns:
    print(column_name)

In [116]:
# Fill missing values in every column with that column's mean.
for column_name in play_df.columns:
    column_mean = play_df[column_name].mean()
    play_df[column_name] = play_df[column_name].fillna(column_mean)

In [117]:
# Feature set: every remaining column except the target.
X = play_df.drop("Price", axis=1)

# Target variable.
Y = play_df["Price"]

# Fix the seed so the split is reproducible across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42
)

# Since our data can fit into memory we persist it to RAM.
# `persist()` returns a NEW collection backed by in-memory results; it does
# not modify the object it is called on, so the result must be re-assigned.
# Without this, every later `.compute()` rebuilds the data from the CSV.
X_train = X_train.persist()
X_test = X_test.persist()
y_train = y_train.persist()
y_test = y_test.persist()

# Show the structure of one persisted split as the cell output.
y_test

Dask Series Structure:
npartitions=6
    float64
        ...
     ...   
        ...
        ...
Name: Price, dtype: float64
Dask Name: split, 6 tasks

In [118]:
lr = LinearRegression()

# Materialise each split once as in-memory pandas objects and reuse them.
# Previously X_train was computed twice, and the model was fitted on a
# DataFrame but asked to predict on bare ndarrays (`.values`), which drops
# the feature names the estimator was fitted with.
X_train_local = X_train.compute()
y_train_local = y_train.compute()
X_test_local = X_test.compute()

# Route any joblib-parallel work done during fitting to the Dask cluster.
with joblib.parallel_backend('dask'):
    lr.fit(X_train_local, y_train_local)

# Predictions for both splits, returned as NumPy arrays.
preds_train = lr.predict(X_train_local)
preds_test = lr.predict(X_test_local)

In [119]:
# Display the predictions for the training split.
preds_train

array([-0.01371023, -0.02245598, -0.02008315, ...,  0.03144047,
        0.03144048, -0.02626264])

In [120]:
# Display the predictions for the test split.
preds_test

array([-0.00615427, -0.00869984, -0.02186948, ...,  0.03144047,
        0.03144047,  0.03144047])

In [None]:
# R^2 on the training split. scikit-learn expects in-memory array-likes, so
# the lazy Dask collections are materialised explicitly (as done for `fit`)
# instead of relying on implicit conversion.
lr.score(X_train.compute(), y_train.compute())

In [None]:
# R^2 on the held-out test split. scikit-learn expects in-memory array-likes,
# so the lazy Dask collections are materialised explicitly (as done for `fit`)
# instead of relying on implicit conversion.
lr.score(X_test.compute(), y_test.compute())