# Clustering with the KMeans model

In this example we use the aggregated data joined with the churn column, obtained by running the `simple_aggregation_join.ipynb` example.

In [6]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [7]:
import os
import operator

import jsonlines
import pandas as pd
import pyspark
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.clustering import KMeans, GaussianMixture

In [8]:
spark = (
    pyspark.sql.SparkSession
    .builder
    .appName("Python Spark K-means minimal example")
    .enableHiveSupport()
    .getOrCreate()
)

We create a variable holding the path to the input DataFrame and assign it a value (in this notebook the data is read from a `csv` file; when we write the pyspark job, the values of such variables will be passed as command-line parameters).

In [21]:
path_aggregated_df = "../data/output/sample_aggregated_usage_with_churn.csv"

In [31]:
clustering_df = spark.read.csv(path_aggregated_df, header=True, inferSchema=True, sep = ',')

clustering_df

DataFrame[user_account_id: int, user_lifetime: double, user_no_outgoing_activity_in_days: double, user_account_balance_last: double, user_spendings: double, reloads_inactive_days: double, reloads_count: double, reloads_sum: double, calls_outgoing_count: double, calls_outgoing_spendings: double, calls_outgoing_duration: double, calls_outgoing_spendings_max: double, calls_outgoing_duration_max: double, calls_outgoing_inactive_days: double, calls_outgoing_to_onnet_count: double, calls_outgoing_to_onnet_spendings: double, calls_outgoing_to_onnet_duration: double, calls_outgoing_to_onnet_inactive_days: double, calls_outgoing_to_offnet_count: double, calls_outgoing_to_offnet_spendings: double, calls_outgoing_to_offnet_duration: double, calls_outgoing_to_offnet_inactive_days: double, calls_outgoing_to_abroad_count: double, calls_outgoing_to_abroad_spendings: double, calls_outgoing_to_abroad_duration: double, calls_outgoing_to_abroad_inactive_days: double, sms_outgoing_count: double, sms_outgo

We create a VectorAssembler object that will combine the column values into a single vector. In this case the clustering feature names are stored in a separate file (`config_columns.py` in the `customer_churn` package), from which we imported them.

In [32]:
columns_clustering_features

['user_lifetime',
 'user_no_outgoing_activity_in_days',
 'user_account_balance_last',
 'user_spendings',
 'reloads_inactive_days',
 'reloads_count',
 'reloads_sum',
 'calls_outgoing_count',
 'calls_outgoing_spendings',
 'calls_outgoing_duration',
 'calls_outgoing_spendings_max',
 'calls_outgoing_duration_max',
 'calls_outgoing_inactive_days',
 'calls_outgoing_to_onnet_count',
 'calls_outgoing_to_onnet_spendings',
 'calls_outgoing_to_onnet_duration',
 'calls_outgoing_to_onnet_inactive_days',
 'calls_outgoing_to_offnet_count',
 'calls_outgoing_to_offnet_spendings',
 'calls_outgoing_to_offnet_duration',
 'calls_outgoing_to_offnet_inactive_days',
 'calls_outgoing_to_abroad_count',
 'calls_outgoing_to_abroad_spendings',
 'calls_outgoing_to_abroad_duration',
 'calls_outgoing_to_abroad_inactive_days',
 'sms_outgoing_count',
 'sms_outgoing_spendings',
 'sms_outgoing_spendings_max',
 'sms_outgoing_inactive_days',
 'sms_outgoing_to_onnet_count',
 'sms_outgoing_to_onnet_spendings',
 'sms_outgoing

We can also define the variable here (in the notebook or in the pyspark job's JSON parameters file):

In [33]:
columns_clustering_features = [
    'user_lifetime',
    'user_no_outgoing_activity_in_days',
    'user_account_balance_last',
    'user_spendings',
    'reloads_inactive_days',
    'reloads_count',
    'reloads_sum',
    'calls_outgoing_count',
    'calls_outgoing_spendings',
    'calls_outgoing_duration',
    'calls_outgoing_spendings_max',
    'calls_outgoing_duration_max',
    'calls_outgoing_inactive_days',
    'calls_outgoing_to_onnet_count',
    'calls_outgoing_to_onnet_spendings',
    'calls_outgoing_to_onnet_duration',
    'calls_outgoing_to_onnet_inactive_days',
    'calls_outgoing_to_offnet_count',
    'calls_outgoing_to_offnet_spendings',
    'calls_outgoing_to_offnet_duration',
    'calls_outgoing_to_offnet_inactive_days',
    'calls_outgoing_to_abroad_count',
    'calls_outgoing_to_abroad_spendings',
    'calls_outgoing_to_abroad_duration',
    'calls_outgoing_to_abroad_inactive_days',
    'sms_outgoing_count',
    'sms_outgoing_spendings',
    'sms_outgoing_spendings_max',
    'sms_outgoing_inactive_days',
    'sms_outgoing_to_onnet_count',
    'sms_outgoing_to_onnet_spendings',
    'sms_outgoing_to_onnet_inactive_days',
    'sms_outgoing_to_offnet_count',
    'sms_outgoing_to_offnet_spendings',
    'sms_outgoing_to_offnet_inactive_days',
    'sms_outgoing_to_abroad_count',
    'sms_outgoing_to_abroad_spendings',
    'sms_outgoing_to_abroad_inactive_days',
    'sms_incoming_count',
    'sms_incoming_spendings',
    'sms_incoming_from_abroad_count',
    'sms_incoming_from_abroad_spendings',
    'gprs_session_count',
    'gprs_usage',
    'gprs_spendings',
    'gprs_inactive_days',
    'last_100_reloads_count',
    'last_100_reloads_sum',
    'last_100_calls_outgoing_duration',
    'last_100_calls_outgoing_to_onnet_duration',
    'last_100_calls_outgoing_to_offnet_duration',
    'last_100_calls_outgoing_to_abroad_duration',
    'last_100_sms_outgoing_count',
    'last_100_sms_outgoing_to_onnet_count',
    'last_100_sms_outgoing_to_offnet_count',
    'last_100_sms_outgoing_to_abroad_count',
    'last_100_gprs_usage',
    'n_months'
]

In [34]:
vector_assembler = VectorAssembler(
    inputCols=columns_clustering_features, 
    outputCol="initial_features")

In [35]:
standard_scaler = StandardScaler(
    inputCol="initial_features", 
    outputCol="features", 
    withStd=True, 
    withMean=True)

In [36]:
vectorized_df = vector_assembler.transform(clustering_df)
model_scaler = standard_scaler.fit(vectorized_df)
featurized_clustering_df = model_scaler.transform(vectorized_df)

In [37]:
featurization_pipeline = Pipeline(stages=[vector_assembler, standard_scaler])

In [38]:
featurization_pipeline_model = featurization_pipeline.fit(clustering_df)

In [39]:
model_scaler = featurization_pipeline_model.stages[-1]

In [40]:
featurized_clustering_df = featurization_pipeline_model.transform(clustering_df)

## Training a single KMeans model

We create a variable for the number of clusters $k$ (when writing a pyspark job that trains only one model, the value of $k$ will be passed as a command-line parameter).

In [41]:
k = 5

In [42]:
kmeans = KMeans(featuresCol="features", k=k)

In [43]:
model_kmeans = kmeans.fit(featurized_clustering_df)

### Computing and saving the SSE metric

We create a variable holding the path to the file in which we will store the $SSE$ metrics (in pyspark jobs this value is passed as a command-line parameter):

In [44]:
path_metrics_kmeans_sse = "../data/metrics_kmeans_see.jsonl"

#### When $K=1$

Apache Spark only lets us train KMeans models with $K > 1$, but to compute the value of the $f(K)$ metric we need to know the $SSE$ value for $K = 1$. There is only one cluster, so the coordinates of its center are the means of the individual features. Because we cluster normalized data, the mean of every variable is $0$, so we already know the coordinates of the cluster center. What remains is to compute the $SSE$ metric by hand. Since the center is at the origin, the observation values are already the distances from the cluster center along each coordinate (feature). To compute the $SSE$ metric, we sum the squared Euclidean norms (squared distances from the center) of the observation vectors $\mathbf{x}$, where $\|\mathbf{x}\|_2 = \sqrt{x_1^2 + x_2^2 + \dots + x_n^2}$ and $n$ is the number of features used for clustering.
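Written out, the argument above reads as follows (the symbols $N$ for the number of observations and $\mathbf{c}$ for the single cluster center are introduced only for this derivation):

$$
SSE_{K=1} = \sum_{i=1}^{N} \|\mathbf{x}_i - \mathbf{c}\|_2^2 = \sum_{i=1}^{N} \|\mathbf{x}_i\|_2^2 = \sum_{i=1}^{N} \sum_{j=1}^{n} x_{ij}^2, \qquad \text{since } \mathbf{c} = \mathbf{0}.
$$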

If `x` is a single observation with normalized coordinates

In [50]:
x = featurized_clustering_df.select("features").first()[0]
x

DenseVector([-0.3639, -0.2462, 0.9187, 0.7733, -0.6784, 0.353, 1.2461, 0.4085, 0.8686, 1.2229, 0.0133, 0.4312, -0.4108, -0.2249, -0.195, -0.1952, -0.4108, 0.2785, 1.2261, 0.9776, -0.4108, -0.2979, -0.1989, -0.1962, -0.4108, 0.2991, 0.2758, 1.0247, -0.7056, 0.5964, 0.43, -0.7056, 0.4319, 0.4482, -0.7056, -0.2253, -0.2305, -0.7056, -0.2559, -0.1327, -0.3269, -0.1335, -0.0839, -0.0663, -0.0839, 0.3094, 0.5615, 1.6495, 1.7619, -0.1807, 1.4101, -0.2215, 0.3433, 0.8388, 0.4591, -0.228, -0.0719, 0.3941])

then its squared distance from the center is computed as

In [20]:
x.norm(2) ** 2

13.261252893202956

We then compute the $SSE$ value for the normalized features using the `pyspark.rdd` API:

In [21]:
sse_k_1 = (
    featurized_clustering_df.select("features").rdd
    .map(operator.itemgetter(0))
    .map(lambda x: x.norm(2) ** 2)
    .sum()
)

In [22]:
sse_k_1

294639.99999999988
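A quick sanity check on this value, as a minimal pure-Python sketch on made-up toy data (not the notebook's data). It assumes the features were scaled with the corrected sample standard deviation, which is what Spark's `StandardScaler` documents. Under that assumption each normalized feature contributes exactly $N - 1$ to the sum of squares, where $N$ is the number of observations, so $SSE$ for $K = 1$ should equal $(N - 1) \cdot n$. With the 5081 observations and 58 features of this dataset that gives $5080 \cdot 58 = 294640$, which agrees with the output above up to floating-point error.

```python
import random
import statistics


def standardize(columns):
    """Scale each column to zero mean and unit sample standard deviation."""
    result = []
    for col in columns:
        mean = statistics.mean(col)
        std = statistics.stdev(col)  # corrected (N - 1) sample standard deviation
        result.append([(v - mean) / std for v in col])
    return result


def sse_single_cluster(columns):
    """SSE for K = 1 on normalized data: the center is the origin."""
    return sum(v ** 2 for col in columns for v in col)


random.seed(0)
n_obs, n_features = 200, 3  # toy sizes, chosen only for illustration
toy_columns = [
    [random.gauss(10.0, 5.0) for _ in range(n_obs)]
    for _ in range(n_features)
]

sse_toy = sse_single_cluster(standardize(toy_columns))
expected = (n_obs - 1) * n_features
```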

We save the metric value to the file:

In [23]:
with jsonlines.open(path_metrics_kmeans_sse, "w") as f:
    f.write({"k": 1, "sse": sse_k_1})

In [24]:
! cat ../data/metrics_kmeans_see.jsonl

{"k": 1, "sse": 294639.9999999999}


If $k = 1$, we can consider the data already clustered: every observation belongs to the single cluster, so there is no need to train a model or to assign clusters to observations.

**Note:** whenever possible, we use the `pyspark.dataframe` API rather than the `pyspark.rdd` API.

#### When $K > 1$

We compute the metric with the trained model:

In [25]:
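# Note: `computeCost` was deprecated in Spark 2.4 and removed in Spark 3.0;
# on newer versions `model_kmeans.summary.trainingCost` gives the cost on the training data.
sse = model_kmeans.computeCost(featurized_clustering_df)
sse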

193611.6796785052

We save the $SSE$ metric value in `jsonlines` format by appending it to the file. We open the file in `append` mode (the "a" option) so that new lines are added without erasing the existing contents:

In [26]:
metrics_row = {"k": k, "sse": sse}

with jsonlines.open(path_metrics_kmeans_sse, "a") as f:
    f.write(metrics_row)

We can view the file contents from this Jupyter Notebook:

In [27]:
! cat ../data/metrics_kmeans_see.jsonl

{"k": 1, "sse": 294639.9999999999}
{"k": 5, "sse": 193611.6796785052}


Suppose that $K=5$ is our best number of clusters. Most likely it is not, and you will have to determine your best $K$ from plots of the clustering metrics $SSE$ and $f(K)$ against $K$.

Once we have the best value of $K$, we need to:
- save the restored cluster centers for interpretation
- save the data with the assigned cluster values, because we will have to build a separate `churn` prediction model for each cluster

### Restoring and saving the cluster centers

We restore the cluster centers to the original feature scale and compute the percentage of observations that belong to each cluster.
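Restoring a center means inverting the normalization coordinate by coordinate: original value = normalized value × standard deviation + mean. A minimal sketch in plain Python, using the first two coordinates of the first cluster center and the scaler statistics as they are printed (rounded) in the cell outputs that follow; the helper name `denormalize` is hypothetical.

```python
def denormalize(center, mean, std):
    """Map a normalized cluster center back to the original feature units."""
    return [c * s + m for c, s, m in zip(center, std, mean)]


# First two coordinates only: user_lifetime, user_no_outgoing_activity_in_days.
center_0 = [-0.42882882, -0.21206692]
mean = [3856.6874, 40.8785]  # rounded values as printed by scaler_mean
std = [5807.433, 160.4974]   # rounded values as printed by scaler_std

restored = denormalize(center_0, mean, std)

# Share of observations per cluster, in percent.
cluster_sizes = [3033, 940, 668, 438, 2]
n_obs = sum(cluster_sizes)
cluster_pct = [100 * size / n_obs for size in cluster_sizes]
```

The restored values agree with the first row of the restored centers computed in the notebook, up to the rounding of the printed statistics.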

In [28]:
normalized_cluster_centers = model_kmeans.clusterCenters()

normalized_cluster_centers

[array([-0.42882882, -0.21206692,  0.11985678, -0.05975954, -0.17973427,
        -0.09582999, -0.05220308, -0.00440011, -0.06316899, -0.05318186,
         0.03404044,  0.08995054, -0.39293021, -0.0363793 , -0.05072465,
        -0.0535321 , -0.39293021, -0.00155153, -0.02882553, -0.03985544,
        -0.39293021, -0.0253303 , -0.05448482, -0.06072522, -0.39293021,
        -0.02102862, -0.02086909,  0.10215744, -0.64351189,  0.02532133,
         0.0409934 , -0.64351189, -0.00988745, -0.00893891, -0.64351189,
        -0.0198785 , -0.02502434, -0.64351189,  0.15117126, -0.00464374,
         0.08873809,  0.01437177, -0.02381759, -0.02713019, -0.01379998,
        -0.08471177, -0.05019147, -0.04247959, -0.04599262, -0.05537415,
        -0.03691763, -0.05680315, -0.01597915,  0.03828162, -0.00361144,
        -0.01752814, -0.02813   ,  0.31538293]),
 array([ 0.2556159 ,  0.02085874, -0.15395238, -0.46940366,  0.94410916,
        -0.50824321, -0.47449542, -0.43821724, -0.39006125, -0.43278304,
  

In [29]:
scaler_mean = model_scaler.mean
scaler_std = model_scaler.std

In [30]:
scaler_mean

DenseVector([3856.6874, 40.8785, 9.6406, 6.7465, 358.841, 0.8947, 7.1885, 34.4963, 4.6283, 31.0982, 0.88, 4.7224, 175.2331, 0.4517, 0.1916, 0.5683, 175.2331, 14.9531, 2.6703, 19.4404, 175.2331, 2.1128, 0.3899, 1.6548, 175.2331, 34.3214, 1.8845, 0.091, 403.8553, 2.3697, 0.1063, 403.8553, 21.0217, 1.2315, 403.8553, 1.617, 0.1901, 403.8553, 8.9411, 0.1187, 1.3225, 0.0148, 10.7202, 3.7864, 0.2337, 1209.5396, 3.5641, 22.2013, 98.0725, 1.4236, 63.7401, 4.1846, 102.4544, 7.0349, 62.7356, 3.9729, 7.6214, 2.7321])

In [31]:
scaler_std

DenseVector([5807.433, 160.4974, 15.5259, 12.3291, 511.4737, 1.2192, 12.2094, 55.636, 9.5226, 56.9104, 1.5903, 6.1234, 412.4304, 2.0705, 1.0505, 3.1572, 412.4304, 30.3168, 5.3543, 42.11, 412.4304, 8.1544, 2.0716, 8.0812, 412.4304, 82.1266, 4.836, 0.2378, 567.7779, 9.4627, 0.4459, 567.7779, 59.6546, 3.5112, 567.7779, 6.9409, 1.0966, 567.7779, 10.5023, 0.9331, 3.6193, 0.1316, 123.7969, 54.0219, 2.4874, 319.3671, 5.987, 41.0582, 177.6011, 7.6357, 131.5162, 21.4524, 245.7946, 26.2908, 172.7262, 16.1295, 98.0067, 0.6726])

In [32]:
cluster_sizes = model_kmeans.summary.clusterSizes
cluster_sizes

[3033, 940, 668, 438, 2]

In [33]:
n_obs = clustering_df.count()
n_obs

5081

In [34]:
denormalized_cluster_centers = [
    (cluster_id,) + (size, 100 * size / n_obs) + tuple(center * scaler_std + scaler_mean)
    for cluster_id, (size, center) in 
    enumerate(zip(cluster_sizes, normalized_cluster_centers))
]

denormalized_cluster_centers

[(0,
  3033,
  59.692973824050384,
  1366.292779426311,
  6.8422903615782005,
  11.501447961314431,
  6.0097027145840203,
  266.91169359270248,
  0.77783272887130483,
  6.5511451807890975,
  34.251456203978456,
  4.0267507418397628,
  28.071616661171554,
  0.93416144631278164,
  5.2731772722277173,
  13.176722716781825,
  0.3763600395647877,
  0.13833882844268566,
  0.39930651719969068,
  13.176722716781825,
  14.906088581162765,
  2.5160067040334102,
  17.762071656225956,
  13.176722716781825,
  1.9062534344433475,
  0.27706451258379983,
  1.1640213210242882,
  13.176722716781825,
  32.594351027585446,
  1.7835646774370808,
  0.11529453786130346,
  38.483459720848714,
  2.6093526761182568,
  0.12456643587207386,
  38.483459720848714,
  20.431915595120344,
  1.2000956149027364,
  38.483459720848714,
  1.4790636333663048,
  0.16262666227057945,
  38.483459720848714,
  10.528739421914496,
  0.11433124519177934,
  1.643642158478954,
  0.016738103088251487,
  7.7716232553027957,
  2.320801

We create a pandas DataFrame of the cluster centers:

In [35]:
cluster_centers_pddf = pd.DataFrame.from_records(denormalized_cluster_centers)
cluster_centers_pddf.columns = (
    ["cluster_id", "cluster_size", "cluster_size_pct"] + 
    columns_clustering_features
)

To display all the columns in this Jupyter Notebook, we change the maximum number of columns option:

In [36]:
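# Use the fully qualified option name; the short form "max_columns"
# can be ambiguous in newer pandas versions.
pd.set_option("display.max_columns", 999)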

In [37]:
cluster_centers_pddf

Unnamed: 0,cluster_id,cluster_size,cluster_size_pct,user_lifetime,user_no_outgoing_activity_in_days,user_account_balance_last,user_spendings,reloads_inactive_days,reloads_count,reloads_sum,calls_outgoing_count,calls_outgoing_spendings,calls_outgoing_duration,calls_outgoing_spendings_max,calls_outgoing_duration_max,calls_outgoing_inactive_days,calls_outgoing_to_onnet_count,calls_outgoing_to_onnet_spendings,calls_outgoing_to_onnet_duration,calls_outgoing_to_onnet_inactive_days,calls_outgoing_to_offnet_count,calls_outgoing_to_offnet_spendings,calls_outgoing_to_offnet_duration,calls_outgoing_to_offnet_inactive_days,calls_outgoing_to_abroad_count,calls_outgoing_to_abroad_spendings,calls_outgoing_to_abroad_duration,calls_outgoing_to_abroad_inactive_days,sms_outgoing_count,sms_outgoing_spendings,sms_outgoing_spendings_max,sms_outgoing_inactive_days,sms_outgoing_to_onnet_count,sms_outgoing_to_onnet_spendings,sms_outgoing_to_onnet_inactive_days,sms_outgoing_to_offnet_count,sms_outgoing_to_offnet_spendings,sms_outgoing_to_offnet_inactive_days,sms_outgoing_to_abroad_count,sms_outgoing_to_abroad_spendings,sms_outgoing_to_abroad_inactive_days,sms_incoming_count,sms_incoming_spendings,sms_incoming_from_abroad_count,sms_incoming_from_abroad_spendings,gprs_session_count,gprs_usage,gprs_spendings,gprs_inactive_days,last_100_reloads_count,last_100_reloads_sum,last_100_calls_outgoing_duration,last_100_calls_outgoing_to_onnet_duration,last_100_calls_outgoing_to_offnet_duration,last_100_calls_outgoing_to_abroad_duration,last_100_sms_outgoing_count,last_100_sms_outgoing_to_onnet_count,last_100_sms_outgoing_to_offnet_count,last_100_sms_outgoing_to_abroad_count,last_100_gprs_usage,n_months
0,0,3033,59.692974,1366.292779,6.84229,11.501448,6.009703,266.911694,0.777833,6.551145,34.251456,4.026751,28.071617,0.934161,5.273177,13.176723,0.37636,0.138339,0.399307,13.176723,14.906089,2.516007,17.762072,13.176723,1.906253,0.277065,1.164021,13.176723,32.594351,1.783565,0.115295,38.48346,2.609353,0.124566,38.48346,20.431916,1.200096,38.48346,1.479064,0.162627,38.48346,10.528739,0.114331,1.643642,0.0167381,7.771623,2.320802,0.199389,1182.485438,3.2636,20.45716,89.90421,1.000824,58.884788,2.965995,98.526816,8.041323,62.111825,3.690186,4.864428,2.94428
1,1,940,18.500295,5341.159574,44.226241,7.250312,0.959149,841.728014,0.275,1.39522,10.115603,0.913874,6.468356,0.265518,1.613475,51.564362,0.05461,0.019316,0.056777,51.564362,3.4,0.546278,3.902761,51.564362,0.347518,0.031752,0.121319,51.564362,0.902305,0.043626,0.003438,1132.912589,0.059929,0.003202,1132.912589,0.56844,0.030787,1132.912589,0.052128,0.004468,1132.912589,3.024468,0.010493,0.43156,0.004198582,0.090071,0.011106,0.001649,1294.790426,0.682979,3.458539,20.11491,0.224798,12.673876,0.47633,2.834929,0.091135,2.066312,0.10922,0.149,2.734043
2,2,668,13.147018,14951.564621,215.641717,0.898802,0.203618,322.618014,0.741766,0.626931,0.884481,0.116951,0.736831,0.045155,0.228268,1196.275948,0.001996,0.000609,0.002096,1196.275948,0.289421,0.056023,0.373214,1196.275948,0.017964,0.001243,0.009316,1196.275948,0.658683,0.04268,0.004197,1278.345559,0.032435,0.001108,1278.345559,0.423154,0.023802,1278.345559,0.00499,0.000908,1278.345559,0.544411,0.002994,0.040419,-2.0816680000000002e-17,0.706088,0.178932,0.043987,1323.700848,0.831836,0.678825,0.711397,0.002096,0.364027,0.008548,0.682136,0.039421,0.435629,0.003992,0.171717,1.621257
3,3,438,8.62035,1003.409817,3.032344,13.323447,33.068269,15.863775,3.250381,33.264323,139.189498,22.748352,149.507074,2.896377,14.217766,6.413623,2.478311,1.187641,3.594087,6.413623,62.216134,12.246982,93.217527,6.413623,10.402207,2.428135,10.626256,6.413623,169.082953,9.271332,0.239197,37.429604,9.219559,0.36121,37.429604,100.436454,5.868562,37.429604,8.208143,1.008368,37.429604,23.165906,0.557405,2.898021,0.04694064,60.1914,23.781511,1.0486,1045.316971,15.884323,103.595085,464.854867,8.617572,302.666617,25.914338,497.729452,25.563546,292.226408,19.694825,41.301476,2.952055
4,4,2,0.039362,2017.0,0.666667,424.221667,264.973333,23.333333,4.5,177.568333,163.166667,201.121667,406.138333,44.873333,52.145,1.166667,7.666667,7.658333,23.768333,1.166667,63.333333,10.941667,78.523333,1.166667,29.333333,23.361667,51.378333,1.166667,90.833333,17.586667,0.933333,0.833333,5.166667,0.14,0.833333,16.666667,1.0,0.833333,41.333333,13.008333,0.833333,71.333333,0.08,16.166667,0.08,1988.833333,826.788333,46.268333,4.166667,27.833333,839.540333,1318.741667,105.418333,270.53,230.973333,306.5,23.0,73.0,131.166667,2812.743333,3.0


We create a variable holding the path to the file in which we will store the restored cluster centers (in pyspark jobs this value is passed as a command-line parameter):

In [38]:
path_cluster_centers = "../data/cluster_centers_kmeans__k_{}.csv".format(k)
path_cluster_centers

'../data/cluster_centers_kmeans__k_5.csv'

We save the cluster centers in `csv` format:

In [39]:
cluster_centers_pddf.to_csv(path_cluster_centers, index=False)

We can print the file contents in this Jupyter Notebook:

In [40]:
! cat ../data/cluster_centers_kmeans__k_5.csv

cluster_id,cluster_size,cluster_size_pct,user_lifetime,user_no_outgoing_activity_in_days,user_account_balance_last,user_spendings,reloads_inactive_days,reloads_count,reloads_sum,calls_outgoing_count,calls_outgoing_spendings,calls_outgoing_duration,calls_outgoing_spendings_max,calls_outgoing_duration_max,calls_outgoing_inactive_days,calls_outgoing_to_onnet_count,calls_outgoing_to_onnet_spendings,calls_outgoing_to_onnet_duration,calls_outgoing_to_onnet_inactive_days,calls_outgoing_to_offnet_count,calls_outgoing_to_offnet_spendings,calls_outgoing_to_offnet_duration,calls_outgoing_to_offnet_inactive_days,calls_outgoing_to_abroad_count,calls_outgoing_to_abroad_spendings,calls_outgoing_to_abroad_duration,calls_outgoing_to_abroad_inactive_days,sms_outgoing_count,sms_outgoing_spendings,sms_outgoing_spendings_max,sms_outgoing_inactive_days,sms_outgoing_to_onnet_count,sms_outgoing_to_onnet_spendings,sms_outgoing_to_onnet_inactive_days,sms_outgoing_to_offnet_count,sms_outgoing_to_offnet_spendings

### Saving the data with the clustering results

We use the trained model to assign clusters to the observations:

In [41]:
clustered_kmeans_df = model_kmeans.transform(featurized_clustering_df)

clustered_kmeans_df

DataFrame[user_account_id: int, user_lifetime: double, user_no_outgoing_activity_in_days: double, user_account_balance_last: double, user_spendings: double, reloads_inactive_days: double, reloads_count: double, reloads_sum: double, calls_outgoing_count: double, calls_outgoing_spendings: double, calls_outgoing_duration: double, calls_outgoing_spendings_max: double, calls_outgoing_duration_max: double, calls_outgoing_inactive_days: double, calls_outgoing_to_onnet_count: double, calls_outgoing_to_onnet_spendings: double, calls_outgoing_to_onnet_duration: double, calls_outgoing_to_onnet_inactive_days: double, calls_outgoing_to_offnet_count: double, calls_outgoing_to_offnet_spendings: double, calls_outgoing_to_offnet_duration: double, calls_outgoing_to_offnet_inactive_days: double, calls_outgoing_to_abroad_count: double, calls_outgoing_to_abroad_spendings: double, calls_outgoing_to_abroad_duration: double, calls_outgoing_to_abroad_inactive_days: double, sms_outgoing_count: double, sms_outgo

We will need the `prediction` column to train a separate model for each cluster. Splitting this DataFrame into `training`, `validation`, and `testing` sets, for the whole dataset and for each cluster separately, can be done with the pyspark job `churn_predicition/pyspark_jobs/split_for_classification.py`.

That job needs the path to the DataFrame with the assigned cluster values, so we save the DataFrame.

We can see that calling `model_kmeans.transform` added an extra column, `prediction`, to the DataFrame. For each observation, the `prediction` column now holds the cluster assigned by the KMeans model trained with $k=5$, indexed from `0` to `4`:

In [42]:
clustered_kmeans_df.select(clustered_kmeans_df["prediction"]).limit(10).show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         2|
|         4|
|         0|
|         0|
|         0|
|         0|
|         1|
|         0|
+----------+



In [43]:
clustered_kmeans_df.select(pyspark.sql.functions.max("prediction")).first()

Row(max(prediction)=4)

In [44]:
clustered_kmeans_df.select(pyspark.sql.functions.max("prediction")).first()[0]

4

In [45]:
clustered_kmeans_df.select(pyspark.sql.functions.min("prediction")).first()[0]

0

We create a variable holding the path under which we will save the data with the clustering results (in pyspark jobs this value is passed as a command-line parameter):

In [46]:
path_clustered_df = "../data/clustered_kmeans__k_{}_parquet".format(k)
path_clustered_df

'../data/clustered_kmeans__k_5_parquet'

We save the data in `parquet` format:

In [47]:
clustered_kmeans_df.write.parquet(path_clustered_df)

In [48]:
! tree  ../data/clustered_kmeans__k_5_parquet

../data/clustered_kmeans__k_5_parquet
├── part-00000-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00001-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00002-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00003-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00004-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00005-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
├── part-00006-9f47f601-c515-4724-8869-077dc314fd10-c000.snappy.parquet
└── _SUCCESS

0 directories, 8 files


---

**Task**: turn the code used here into an Apache Spark script that reads the aggregated `customer_usage` data joined with `customer_churn` and accepts a single parameter: the path to a JSON file containing the values of the number of clusters $K$, the output paths, and any other required parameters. The script trains K-means models with the given $K$ values, evaluates the models' $SSE$ metrics, and saves them to a file (if the specified metrics file already exists, the script prints an error message and exits). During modelling, the metrics are computed and appended to the file after each model is trained, not once all models have been trained. The *JSON* parameters file also stores the parameters needed to:
- save the cluster centers in separate files in the specified directory (if the directory already exists, the script prints an error message and exits).
- save the datasets with the clustering results in the specified directory (if the directory already exists, the script prints an error message and exits).
- save the trained models (with the trained model's `.save(...)` method) in the specified directory (if the directory already exists, the script prints an error message and exits).
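The requirement that metrics are appended after each model, rather than once at the end, could be sketched as follows. This is only an outline under stated assumptions: `train_and_score` is a hypothetical stand-in for fitting a KMeans model and computing its $SSE$ (here it returns a dummy number so that the sketch runs without Spark), `run_models` is likewise a hypothetical name, and the check for an already existing metrics file is assumed to have been done beforehand.

```python
import json
import os
import tempfile


def train_and_score(k):
    """Hypothetical placeholder: a real job would fit KMeans(k=k) and return its SSE."""
    return 1000.0 / k  # dummy value, not a real metric


def run_models(k_values, path_metrics):
    """Train one model per k and append its metric to the file immediately."""
    for k in k_values:
        sse = train_and_score(k)
        # Open in append mode inside the loop so that the metrics of finished
        # models are already on disk even if a later model fails.
        with open(path_metrics, "a") as f:
            f.write(json.dumps({"k": k, "sse": sse}) + "\n")


path_demo = os.path.join(tempfile.mkdtemp(), "metrics.jsonl")
run_models([2, 3, 4], path_demo)

with open(path_demo) as f:
    written = [json.loads(line) for line in f]
```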

We recommend the following parameters in the JSON file for this pyspark job:
1. path to the aggregated and joined data (saved in `parquet` format)
1. path to the SSE metrics file.
1. path to the directory in which the cluster centers will be stored.
1. path to the directory in which the datasets with the clustering results (DataFrames with a `prediction` column) will be stored.
1. path to the directory in which the trained KMeans models will be stored.
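One possible shape for such a parameters file, together with the fail-fast checks, is sketched below. All key names (`path_input`, `k_values`, and so on) and the helper `load_params` are assumptions made for illustration; the task does not prescribe them. The sketch raises exceptions, whereas a real job would catch them, print the error message, and exit.

```python
import json
import os
import tempfile

# Hypothetical key names; the task does not prescribe them.
OUTPUT_KEYS = ["path_metrics", "dir_cluster_centers", "dir_clustered_data", "dir_models"]
REQUIRED_KEYS = ["path_input", "k_values"] + OUTPUT_KEYS


def load_params(path_json):
    """Load job parameters; fail if a key is missing or an output already exists."""
    with open(path_json) as f:
        params = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise ValueError("Missing parameters: {}".format(missing))
    existing = [params[key] for key in OUTPUT_KEYS if os.path.exists(params[key])]
    if existing:
        raise FileExistsError("Output already exists: {}".format(existing))
    return params


# Demo with a temporary directory standing in for the real data directory.
base = tempfile.mkdtemp()
demo_params = {
    "path_input": os.path.join(base, "aggregated_parquet"),
    "k_values": [2, 3, 4, 5],
    "path_metrics": os.path.join(base, "metrics_kmeans_sse.jsonl"),
    "dir_cluster_centers": os.path.join(base, "cluster_centers"),
    "dir_clustered_data": os.path.join(base, "clustered"),
    "dir_models": os.path.join(base, "models"),
}
path_params = os.path.join(base, "params.json")
with open(path_params, "w") as f:
    json.dump(demo_params, f, indent=2)

params = load_params(path_params)  # passes: no output exists yet

os.mkdir(demo_params["dir_models"])  # simulate the leftovers of a previous run
try:
    load_params(path_params)
    rerun_rejected = False
except FileExistsError:
    rerun_rejected = True
```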

**Note:** before writing the pyspark job itself, we recommend writing the code in a Jupyter notebook and splitting it into 3 cells:
1. a cell with all the command-line parameter variables and their values
1. a cell with all the job logic in a function that takes the variables defined in the first cell (in the pyspark job itself we will decorate this function with the [`click`](http://click.pocoo.org/6/) package). At the end of the cell, the function is called with the variables defined in the first cell.
1. a cell in which we check that the function did what we wanted. The cell uses only Linux shell commands (e.g. `ls`, `tree -L `, `head -n `, `cat`, etc.), each written with a `!` prefix, and the outputs of consecutive commands are separated by running `! echo "\n\n"`