In [40]:
# https://examples.dask.org/machine-learning.html
# https://ml.dask.org/
# https://medium.com/analytics-vidhya/dask-for-python-and-machine-learning-dbe1356b5d7a
# https://www.manifold.ai/dask-and-machine-learning-preprocessing-tutorial
import pandas as pd
import hvplot.pandas
import dask
import hvplot.dask
import dask.dataframe as dd
import json
import tqdm
from scipy.spatial.distance import cdist
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as stats
import numpy as np
import statsmodels.api as sm
from statsmodels.formula.api import ols
import pingouin as pg
import panel as pn
pn.extension(comms="vscode")

In [41]:
print("Load Data")
DATA_FOLDER = "preflight"
ddf_views = dd.read_csv(f'data_dpg_testdata/{DATA_FOLDER}/reduced_views.csv', dtype={'GEO_ZIPCODE': object,
        'REFR_MEDIUM': object, 
        'IS_LOGGED_IN': object, 
        'file_name': object,
        "privacy_advertising":int,       
        "privacy_analytics":int,       
        "privacy_functional":int,       
        "privacy_geo_location":int,       
        "privacy_marketing":int,       
        "privacy_non-personalised_ads":int,       
        "privacy_personalisation":int,       
        "privacy_social_media":int,       
        "privacy_target_advertising":int,       
       })
ddf_views["DERIVED_TSTAMP"] = dd.to_datetime(ddf_views["DERIVED_TSTAMP"])
ddf_views["hour"] = ddf_views["DERIVED_TSTAMP"].dt.hour
ddf_views["weekday"] = ddf_views["DERIVED_TSTAMP"].dt.weekday
ddf_views["dayofmonth"] = ddf_views["DERIVED_TSTAMP"].dt.day
ddf_views["month"] = ddf_views["DERIVED_TSTAMP"].dt.month
ddf_views = ddf_views.replace("nu.web.advertorial", "nu.web")
device_counts = ddf_views["APP_ID"].value_counts().compute()
sample_amount = device_counts.min()
unique_devices = device_counts.index.unique()
columns_privacy = list(ddf_views.columns[ddf_views.columns.str.startswith("privacy_")].values)
columns_time = ["hour", "weekday", "dayofmonth", "month"]
ddf_views = ddf_views.groupby('APP_ID').apply(lambda df: df.sample(sample_amount), meta=ddf_views.partitions[0]).reset_index(drop=True)
ddf_views.head().T

Load Data


Unnamed: 0,0,1,2,3,4
APP_ID,nu.web,nu.web,nu.web,nu.web,nu.web
ARTICLE_ID,6192559,6210666,6193349,6195684,6213465
DERIVED_TSTAMP,2022-04-01 08:04:31.754000+00:00,2022-07-08 10:27:58.122000+00:00,2022-04-06 07:34:56.660000+00:00,2022-04-17 09:30:34.530000+00:00,2022-07-24 08:04:57.607000+00:00
GEO_CITY,25,218,543,15,625
GEO_COUNTRY,NL,NL,NL,NL,NL
GEO_REGION,UT,UT,ZH,ZH,NB
GEO_ZIPCODE,3584,3446,2645,3014,5106
REFR_MEDIUM,unknown,internal,internal,internal,internal
REFR_URLHOST,3,1,1,1,1
QUASI_USER_ID,2bf8afba500fade30b2ba547792cdbd6,,7ba7802faf4ba90a94afb045e5b0fd3c,693feb4251d02eadde5a81325b521df3,241febda53c76dd55bdf93af98b1e6b7


In [42]:
DATA_FOLDER_ARTICLES = "reduced"
ddf_articles = dd.read_csv(f'data_dpg_testdata/{DATA_FOLDER_ARTICLES}/reduced_articles_df_article.csv', dtype={'sub_section': 'object', 'url': 'object'})
ddf_articles.head().T

Unnamed: 0,0,1,2,3,4
article_id,6178608,6178610,6178739,6178681,6178734
cds_content_id,article-mm-61e525b134c4a700014017f7,article-mm-61e5302d6ae9490001e6d524,article-mm-61e5b36154be560001ebf681,article-mm-61e570a560f2eb0001784c84,article-mm-61e5ac5d3637950001c51671
brands,nu.nl,nu.nl,nu.nl,nu.nl,nu.nl
title,Unilever geeft strijd om overname Aquafresh-ma...,"Had de metro één meter verder gereden, dan was...",Stalker van Kylie Jenner krijgt permanent cont...,Karatsev na bijna 5 uur en 107 onnodige fouten...,Lees alles over de tweede award op rij voor Le...
text,Unilever stelt maandag in een verklaring dat d...,_Dit artikel is afkomstig uit HLN. Elke dag ve...,De man kreeg vorig jaar al een tijdelijk conta...,De krachtmeting op Melbourne Park duurde om pr...,
authors,NU.nl/ANP,HLN/Jeffrey Dujardin en Robby Dierickx,NU.nl/ANP,NU.nl,NU.nl
url,https://nu.nl/economie/6178608/unilever-geeft-...,https://nu.nl/uit-andere-media/6178610/had-de-...,https://nu.nl/achterklap/6178739/stalker-van-k...,https://nu.nl/sport/6178681/karatsev-na-bijna-...,https://nu.nl/sport/6178734/lees-alles-over-de...
main_section,Economie,Uit andere media,Achterklap,Sport,Sport
sub_section,,,,,
num_words,0,0,0,0,0


In [43]:
ddf_userneeds = dd.read_csv(f'data_dpg_testdata/{DATA_FOLDER_ARTICLES}/reduced_articles_df_fixed_set_userneeds.csv', dtype={'sub_section': 'object','url': 'object'})
columns_userneeds = list(ddf_userneeds.columns[ddf_userneeds.columns.str.startswith("userneed_")].values)
ddf_userneeds = ddf_userneeds.assign(argmax_userneeds = ddf_userneeds[columns_userneeds].idxmax(axis=1))
ddf_userneeds.head().T

Unnamed: 0,0,1,2,3,4
article_id,6178608,6178610,6178739,6178681,6178734
userneed_hou_me_op_de_hoogte,0.987202,0.018365,0.891345,0.481953,0.033568
userneed_geef_me_context,0.007797,0.010142,0.025051,0.379622,0.048142
userneed_vermaak_me,0.001776,0.098376,0.049026,0.118641,0.808459
userneed_raak_me_verbind_me,0.00223,0.234295,0.029105,0.010875,0.091164
userneed_help_me,0.000995,0.638822,0.005473,0.008909,0.018666
file_name,0,0,0,0,0
argmax_userneeds,userneed_hou_me_op_de_hoogte,userneed_help_me,userneed_hou_me_op_de_hoogte,userneed_hou_me_op_de_hoogte,userneed_vermaak_me


In [44]:
print("Running Merge")
merge_cols_view = ["ARTICLE_ID", "APP_ID", "QUASI_USER_ID", "IS_LOGGED_IN", "GEO_REGION"]+columns_privacy+columns_time
merge_cols_article = ["article_id", "main_section", "sub_section"]
merge_cols_drop = ['argmax_userneeds', 'file_name']
ddf_merged = ddf_userneeds.drop(merge_cols_drop, axis=1)
ddf_merged = ddf_merged.merge(ddf_views[merge_cols_view], how="inner", left_on='article_id', right_on='ARTICLE_ID')
ddf_merged = ddf_merged.merge(ddf_articles[merge_cols_article], how="inner", left_on='article_id', right_on='article_id')

ddf_merged.head().T

Running Merge


Unnamed: 0,0,1,2,3,4
article_id,6178608.0,6178610.0,6178739.0,6178681.0,6178734.0
userneed_hou_me_op_de_hoogte,0.9872023,0.01836487,0.8913448,0.4819525,0.03356804
userneed_geef_me_context,0.007797464,0.01014207,0.02505052,0.3796216,0.04814172
userneed_vermaak_me,0.001775515,0.09837589,0.0490263,0.1186413,0.8084594
userneed_raak_me_verbind_me,0.002229572,0.2342948,0.02910537,0.01087536,0.09116449
userneed_help_me,0.0009951068,0.6388225,0.005472925,0.008909226,0.01866633


In [45]:
ddf_merged.head().T

Unnamed: 0,0,1,2,3,4
article_id,6178608.0,6178610.0,6178739.0,6178681.0,6178734.0
userneed_hou_me_op_de_hoogte,0.9872023,0.01836487,0.8913448,0.4819525,0.03356804
userneed_geef_me_context,0.007797464,0.01014207,0.02505052,0.3796216,0.04814172
userneed_vermaak_me,0.001775515,0.09837589,0.0490263,0.1186413,0.8084594
userneed_raak_me_verbind_me,0.002229572,0.2342948,0.02910537,0.01087536,0.09116449
userneed_help_me,0.0009951068,0.6388225,0.005472925,0.008909226,0.01866633


In [46]:
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import Categorizer, DummyEncoder, StandardScaler
from sklearn.pipeline import make_pipeline
from dask_ml.cluster import KMeans
from dask_ml.decomposition import PCA, IncrementalPCA

col_categoricals = ["GEO_REGION", "APP_ID", "main_section", "sub_section", "IS_LOGGED_IN"]
col_numericals = columns_userneeds + columns_time 
col_passed = columns_privacy 
col_dropped = ["ARTICLE_ID", "article_id", "QUASI_USER_ID"] 
prep_categoricals = make_pipeline(Categorizer(), DummyEncoder(drop_first=True), verbose=True)
prep_numericals = StandardScaler()

dim_reducer = IncrementalPCA(n_components=10)


col_transformer = ColumnTransformer([
    ("cat", prep_categoricals, col_categoricals),
    ("num", prep_numericals, col_numericals),
    ("dropped", 'drop', col_dropped),
    ("unchanged", 'passthrough', col_passed),
], n_jobs=4, preserve_dataframe=True, remainder='drop')
# col_transformer_1 = ColumnTransformer([
#     ("cat", prep_categoricals, col_categoricals),
#     # ("num", prep_numericals, col_numericals),
#     # ("dropped", 'drop', col_dropped),
#     # ("unchanged", 'passthrough', col_passed),
# ], n_jobs=4, preserve_dataframe=True, remainder='passthrough')
# col_transformer_2 = ColumnTransformer([
#     # ("cat", prep_categoricals, col_categoricals),
#     ("num", prep_numericals, col_numericals),
#     # ("dropped", 'drop', col_dropped),
#     # ("unchanged", 'passthrough', col_passed),
# ], n_jobs=4, preserve_dataframe=True, remainder='passthrough')
# col_transformer_3 = ColumnTransformer([
#     # ("cat", prep_categoricals, col_categoricals),
#     # ("num", prep_numericals, col_numericals),
#     # ("dropped", 'drop', col_dropped),
#     ("unchanged", 'passthrough', col_passed),
# ], n_jobs=4, preserve_dataframe=True, remainder='passthrough')
# estimators = [('reduce_dim', PCA(3)), ('cluster', KMeans())]



# pipe = make_pipeline(
#     col_transformer, 
#     # col_transformer_1, 
#     # col_transformer_2, 
#     # col_transformer_3, 
#     IncrementalPCA(n_components=10, svd_solver='auto'), 
#     # KMeans(),
#     verbose=True,
# )
# pipe = pipe.fit(ddf_merged)
# pipe

In [47]:
print("Running Clustering")
print("Step Transform")
ddf_transformed = col_transformer.fit_transform(ddf_merged).dropna().to_dask_array(lengths=True).rechunk()
ddf_transformed

Running Clustering
Step Transform


ValueError: A given column is not a column of the dataframe

In [None]:
# xxx = np.isnan(ddf_transformed).sum(axis=0).compute()
# xxx, xxx.shape

In [None]:
# ddf_transformed.visualize()

In [None]:
print("Step Dimension Reduction")
ddf_reduced = dim_reducer.fit_transform(ddf_transformed) #.compute_chunk_sizes() #.compute_chunksizes(lengths=True)
ddf_reduced


In [None]:
print("Step Clustering")
cluster_alg = KMeans().fit(ddf_transformed.rechunk({1: ddf_transformed.shape[1]}))
print("Done")
cluster_alg

In [None]:
ddf_merged.to_dask_array(lengths=True).compute()

In [None]:
ddf_merged