# TASK 1: Preparing Data

### Step 1: Start

- Import the required libraries.
- Move the data under the datasets folder on HDFS.
- Create a Spark session.

**Note:** Read the data with `inferSchema` set to **True**, and choose the `delimiter` that matches the relevant CSV file.
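
One way to choose the `delimiter` is to look at the header line of the file before handing it to Spark. The plain-Python sketch below assumes the header has already been read into a string; `guess_delimiter` is a hypothetical helper for illustration, not a Spark or notebook function, and the candidate list is an assumption.

```python
# Hypothetical helper (not part of the notebook): guess a CSV delimiter
# by counting candidate separators in the header line.
def guess_delimiter(header_line, candidates=(",", ";", "|", "\t")):
    counts = {sep: header_line.count(sep) for sep in candidates}
    # The separator that occurs most often in the header wins.
    return max(counts, key=counts.get)


# Example header built from three column names used in this notebook.
header = "master_id|order_channel|platform_type"
print(guess_delimiter(header))  # prints: |
```

The result can then be passed to `.option('delimiter', ...)` when reading the file.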

In [184]:
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

In [186]:
!ls "/home/train/datasets"

Advertising.csv      churn-telecom  flo.csv   retail_db
Churn_Modelling.csv  flo100k.csv    iris.csv  train.csv


In [187]:
!hdfs dfs -ls '/user/train/datasets/'

Found 7 items
-rw-r--r--   1 train supergroup       4556 2020-09-23 20:56 /user/train/datasets/Advertising.csv
drwxr-xr-x   - train supergroup          0 2020-11-19 21:02 /user/train/datasets/churn-telecom
-rw-r--r--   1 train supergroup 1568689691 2022-06-07 11:38 /user/train/datasets/flo.csv
-rw-r--r--   1 train supergroup   19589922 2022-06-07 13:58 /user/train/datasets/flo100k.csv
-rw-r--r--   1 train supergroup     460676 2022-06-05 09:26 /user/train/datasets/house-prices.csv
drwxr-xr-x   - train supergroup          0 2020-11-21 11:16 /user/train/datasets/retail_db
-rw-r--r--   1 train supergroup     460676 2022-06-05 11:15 /user/train/datasets/train.csv


In [189]:
# Moving data to HDFS
!hdfs dfs -put "/home/train/datasets/flo100k.csv" "/user/train/datasets/"

In [183]:
import findspark
findspark.init('/opt/manual/spark')

In [185]:
# Create or get the Spark session
spark = (
    SparkSession.builder
    .appName("Flo_Clustering")
    .master("yarn")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate())

In [378]:

df = (spark.read.format('csv')
      .option('header', True)
      .option('delimiter', '|')
      .option('inferSchema', True)
      .load('/user/train/datasets/flo100k.csv')
     )

### Step 2: Glance
For the data set, examine:

- The first 5 observations
- The total number of observations
- The total number of variables
- The variable types

In [379]:
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: string, store_type: string]

In [461]:
# First 5 observations
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,,,2019-02-23,A,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,,,2019-12-01,A,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,,,2018-12-31,A,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],,,2021-05-05,A,2.0,283.95


In [462]:
# Total number of observations
df.count()

100000

In [463]:
# Total number of variables
len(df.columns)

19

In [464]:
# Examine the variable types.
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = false)
 |-- order_num_total_ever_offline: double (nullable = false)
 |-- customer_value_total_ever_offline: double (nullable = false)
 |-- customer_value_total_ever_online: double (nullable = false)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- store_type: string (nullable = true)
 |-- order_num_total: double (nullable = false)
 |-- customer_

### Step 3: Missing observations

Count the missing observations for each variable in the data set.

In [384]:
def null_count(df, col_name):
    """Count rows where col_name is null, an empty string, or the literal 'NA'."""
    nc = df.select(col_name).filter(
        (F.col(col_name) == 'NA') |
        (F.col(col_name) == '') |
        (F.col(col_name).isNull())).count()

    return nc

In [385]:
total_rows = df.count()  # count once instead of on every iteration

for col_name, _ in df.dtypes:
    nc = null_count(df, col_name)

    if nc > 0:
        print("{} has {} % {}".format(col_name, nc, (nc / total_rows) * 100))

last_order_date_online has 70784 % 70.784
last_order_date_offline has 21703 % 21.703
order_num_total_ever_online has 70784 % 70.784
order_num_total_ever_offline has 21703 % 21.703
interested_in_categories_12 has 56590 % 56.589999999999996
online_product_group_amount_top_name_12 has 88295 % 88.295
offline_product_group_name_12 has 77209 % 77.209
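
Each output line reads as column name, missing count, then the missing percentage. The value `56.589999999999996` is a floating-point artifact of the division rather than a data issue. The plain-Python sketch below repeats the arithmetic with two of the counts printed above and shows that rounding tidies the display; `missing_pct` is a hypothetical helper name.

```python
# Counts copied from the output above; total_rows is the df.count() result.
total_rows = 100000
missing = {
    "last_order_date_online": 70784,
    "interested_in_categories_12": 56590,
}


def missing_pct(nc, total):
    """Percentage of missing rows, computed like the loop above."""
    return nc / total * 100


for name, nc in missing.items():
    raw = missing_pct(nc, total_rows)
    # Print the raw float next to a value rounded to three decimals.
    print(name, raw, round(raw, 3))
```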


# TASK 2: Data Analysis

### Step 1: Unique Variable Check

Check whether the `master_id` variable is unique.

In [386]:
# master_id is unique if its distinct count equals the total number of rows in the data set.
from pyspark.sql.functions import countDistinct
df.select(countDistinct("master_id")).show()

+-------------------------+
|count(DISTINCT master_id)|
+-------------------------+
|                   100000|
+-------------------------+



In [387]:
df.count()

100000
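
The same idea can be shown on a small list in plain Python: a column is unique when its distinct count equals its length, and a counter reveals which values repeat when it is not. The ids below are made up for illustration and `duplicated_ids` is a hypothetical helper.

```python
from collections import Counter


def duplicated_ids(ids):
    """Return the ids that occur more than once (empty list if all are unique)."""
    return [value for value, n in Counter(ids).items() if n > 1]


sample_ids = ["a1", "b2", "c3", "b2"]  # made-up ids for illustration
print(len(set(sample_ids)) == len(sample_ids))  # False: "b2" repeats
print(duplicated_ids(sample_ids))               # ['b2']
```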

### Step 2: Understanding the Data - Groupby
Look at the number of observations broken down by `platform_type` and `order_channel`.

In [388]:
df.groupby(["platform_type","order_channel"]).count().show()

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|  OmniChannel|      Offline| 4793|
|       Online|      Ios App| 3008|
|      Offline|      Offline|65991|
|       Online|       Mobile| 6451|
|       Online|      Desktop| 3253|
|       Online|  Android App| 8728|
|  OmniChannel|       Mobile| 2061|
|  OmniChannel|  Android App| 3261|
|  OmniChannel|      Ios App|  956|
|  OmniChannel|      Desktop| 1498|
+-------------+-------------+-----+
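
As a sanity check, the two-key counts can be rolled up into per-platform and per-channel totals, which should add up to the 100000 rows reported by `df.count()`. The sketch below is plain Python over the numbers copied from the table above, not Spark code.

```python
from collections import defaultdict

# (platform_type, order_channel) -> count, copied from the table above.
counts = {
    ("OmniChannel", "Offline"): 4793,
    ("Online", "Ios App"): 3008,
    ("Offline", "Offline"): 65991,
    ("Online", "Mobile"): 6451,
    ("Online", "Desktop"): 3253,
    ("Online", "Android App"): 8728,
    ("OmniChannel", "Mobile"): 2061,
    ("OmniChannel", "Android App"): 3261,
    ("OmniChannel", "Ios App"): 956,
    ("OmniChannel", "Desktop"): 1498,
}

by_platform = defaultdict(int)
by_channel = defaultdict(int)
for (platform, channel), n in counts.items():
    by_platform[platform] += n   # marginal total per platform
    by_channel[channel] += n     # marginal total per channel

print(dict(by_platform))
print(dict(by_channel))
print(sum(counts.values()))  # 100000
```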



### Step 3: Omnichannel
- Omnichannel means that customers shop on both online and offline platforms.
- Create new variables for each customer's total number of purchases and total spending.
    - Name the variable for the total number of purchases `order_num_total`.
    - Name the variable for the total spending `customer_value_total`.

**Note:** Before creating the new variables, convert the null values of the source variables to zero and make sure all of their values are greater than or equal to zero.

In [390]:
# Fill NA values with 0
df = df.na.fill(value=0,
                subset=["order_num_total_ever_online", "order_num_total_ever_offline",
                        "customer_value_total_ever_offline","customer_value_total_ever_online"])

In [391]:
# Filter to make sure the values are greater than or equal to zero.
df = df.filter((df.order_num_total_ever_online >= 0) &
               (df.order_num_total_ever_offline >= 0) &
               (df.customer_value_total_ever_offline >= 0) &
               (df.customer_value_total_ever_online >= 0))

In [392]:
# Create the order_num_total variable
df = df.withColumn('order_num_total',
                   df.order_num_total_ever_online + df.order_num_total_ever_offline)


In [393]:
# Create the customer_value_total variable
df = df.withColumn('customer_value_total', 
                   df.customer_value_total_ever_offline + df.customer_value_total_ever_online)
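
Filling nulls before the addition matters because Spark SQL arithmetic propagates nulls: a null plus a number is null, so a customer with only offline orders would otherwise end up with a null total. The plain-Python sketch below mimics that behaviour with `None`; both function names are hypothetical and exist only for illustration.

```python
def add_nullable(a, b):
    """Mimic SQL-style addition: the result is None if either operand is None."""
    if a is None or b is None:
        return None
    return a + b


def add_filled(a, b, fill=0.0):
    """Replace None with `fill` first, as the na.fill step above does."""
    return (fill if a is None else a) + (fill if b is None else b)


online, offline = None, 1.0  # an offline-only customer before filling
print(add_nullable(online, offline))  # None
print(add_filled(online, offline))    # 1.0
```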

### Step 4: Sales Values

Look at the total number of customers, the average number of purchases, and the average revenue for each channel.

In [394]:
df.groupby("order_channel").agg({"master_id" : "count", 
                                 "order_num_total":"mean",
                                 "customer_value_total":"mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|order_channel|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           70784|       218.16394849129728|  1.6003475361663653|
|      Ios App|            3964|        568.4640312815283|   3.377648839556004|
|       Mobile|            8512|        391.7418761748012|   2.798637218045113|
|  Android App|           11989|        532.8462840937639|    3.50971724080407|
|      Desktop|            4751|       376.91355504103785|   2.538623447695222|
+-------------+----------------+-------------------------+--------------------+



### Step 5: Sales Values by Platform

Look at the total number of customers, the average number of purchases, and the average revenue for each platform.

In [395]:
df.groupby("platform_type").agg({"master_id" : "count", 
                                 "order_num_total":"mean",
                                 "customer_value_total":"mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|platform_type|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           65991|        215.7303068601296|  1.5837917291751906|
|  OmniChannel|           12569|        500.7937512928769|  3.4947092051873656|
|       Online|           21440|        404.7896576492903|  2.6207089552238805|
+-------------+----------------+-------------------------+--------------------+
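
Because both tables summarise the same 100000 customers, the count-weighted mean of the group averages should come out the same whichever grouping is used. The sketch below checks this for `avg(order_num_total)` in plain Python with the values copied from the two tables above; `overall_mean` is a hypothetical helper.

```python
# (count, avg_order_num_total) per group, copied from the two tables above.
by_channel = {
    "Offline": (70784, 1.6003475361663653),
    "Ios App": (3964, 3.377648839556004),
    "Mobile": (8512, 2.798637218045113),
    "Android App": (11989, 3.50971724080407),
    "Desktop": (4751, 2.538623447695222),
}
by_platform = {
    "Offline": (65991, 1.5837917291751906),
    "OmniChannel": (12569, 3.4947092051873656),
    "Online": (21440, 2.6207089552238805),
}


def overall_mean(groups):
    """Count-weighted mean of the group means."""
    total_n = sum(n for n, _ in groups.values())
    total_sum = sum(n * avg for n, avg in groups.values())
    return total_sum / total_n


print(overall_mean(by_channel))
print(overall_mean(by_platform))
```

Both groupings give about 2.05 orders per customer, which is a quick way to confirm that the two aggregations are consistent with each other.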



# TASK 3: Calculating the K-Means Metrics

### Step 1: RFM

Create a new DataFrame named `rfm` from the `master_id`, `last_order_date_new`, `order_num_total`, and `customer_value_total` variables.

In [396]:
rfm = df[["master_id","last_order_date_new", "order_num_total","customer_value_total"]]

In [397]:
rfm.show(5)

+--------------------+-------------------+---------------+--------------------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|
+--------------------+-------------------+---------------+--------------------+
|b3ace094-a17f-11e...|         2019-02-23|            1.0|              212.98|
|c57d7c4c-a950-11e...|         2019-12-01|            1.0|              199.98|
|602897a6-cdac-11e...|         2020-07-24|            1.0|              140.49|
|388e4c4e-af86-11e...|         2018-12-31|            1.0|              174.99|
|80664354-adf0-11e...|         2021-05-05|            2.0|              283.95|
+--------------------+-------------------+---------------+--------------------+
only showing top 5 rows



### Step 2: Last Purchase Date

Find the last purchase date using the `last_order_date` variable.

In [398]:
# Last purchase date
last_order_date = df.agg({"last_order_date": "max"}).collect()[0][0]  # 2021-05-30
last_order_date.date()

datetime.date(2021, 5, 30)

### Step 3: Recency

Taking the date two days after the last purchase date as the reference, calculate the number of days since each customer's last purchase and store it in a variable named `Recency`.

In [399]:
from pyspark.sql.functions import expr

rfm = rfm.withColumn("Recency", expr("datediff('2021-6-1', last_order_date_new)"))
rfm.show(5)

+--------------------+-------------------+---------------+--------------------+-------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|Recency|
+--------------------+-------------------+---------------+--------------------+-------+
|b3ace094-a17f-11e...|         2019-02-23|            1.0|              212.98|    829|
|c57d7c4c-a950-11e...|         2019-12-01|            1.0|              199.98|    548|
|602897a6-cdac-11e...|         2020-07-24|            1.0|              140.49|    312|
|388e4c4e-af86-11e...|         2018-12-31|            1.0|              174.99|    883|
|80664354-adf0-11e...|         2021-05-05|            2.0|              283.95|     27|
+--------------------+-------------------+---------------+--------------------+-------+
only showing top 5 rows
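
The reference date `'2021-6-1'` is hard-coded in the cell above. It can also be derived from the maximum `last_order_date` found earlier, as this plain-Python sketch with `datetime` shows. `recency_days` is a hypothetical helper, and the two test dates are rows from the table above.

```python
from datetime import date, timedelta

last_order_date = date(2021, 5, 30)  # maximum last_order_date found earlier
reference_date = last_order_date + timedelta(days=2)


def recency_days(last_purchase, reference=reference_date):
    """Days between the reference date and a customer's last purchase."""
    return (reference - last_purchase).days


print(reference_date)                   # 2021-06-01
print(recency_days(date(2019, 2, 23)))  # 829
print(recency_days(date(2021, 5, 5)))   # 27
```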



### Step 4: Renaming

- Rename the `order_num_total` variable to `Frequency`.
- Rename the `customer_value_total` variable to `Monetary`.


In [400]:
rfm = (rfm.withColumnRenamed("order_num_total","Frequency")
       .withColumnRenamed("customer_value_total", "Monetary"))
       
rfm.show(5)

+--------------------+-------------------+---------+--------+-------+
|           master_id|last_order_date_new|Frequency|Monetary|Recency|
+--------------------+-------------------+---------+--------+-------+
|b3ace094-a17f-11e...|         2019-02-23|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|         2019-12-01|      1.0|  199.98|    548|
|602897a6-cdac-11e...|         2020-07-24|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|         2018-12-31|      1.0|  174.99|    883|
|80664354-adf0-11e...|         2021-05-05|      2.0|  283.95|     27|
+--------------------+-------------------+---------+--------+-------+
only showing top 5 rows



### Step 5: Filter

Drop the `last_order_date_new` variable from the `rfm` DataFrame.

In [401]:
rfm = rfm.drop("last_order_date_new")

In [402]:
rfm.show(5)

+--------------------+---------+--------+-------+
|           master_id|Frequency|Monetary|Recency|
+--------------------+---------+--------+-------+
|b3ace094-a17f-11e...|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|      1.0|  199.98|    548|
|602897a6-cdac-11e...|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|      1.0|  174.99|    883|
|80664354-adf0-11e...|      2.0|  283.95|     27|
+--------------------+---------+--------+-------+
only showing top 5 rows



# TASK 4: Preparing for the Model

### Step 1: Vector Assembler
Apply the **Vector Assembler** to the following variables:

- Recency
- Frequency
- Monetary

In [403]:
rfm_col = ["Recency", "Frequency", "Monetary"]

In [404]:
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
             .setHandleInvalid("skip")
             .setInputCols(rfm_col)
             .setOutputCol("unscaled_features"))

### Step 2: Standard Scaler

Apply the **Standard Scaler** to the variables you assembled with the **Vector Assembler**.

In [405]:
from pyspark.ml.feature import StandardScaler
scaler = (StandardScaler()
          .setInputCol("unscaled_features")
          .setOutputCol("features"))

### Step 3: Pipeline

Create a Pipeline using the `rfm` DataFrame.

In [406]:
from pyspark.ml import Pipeline
pipeline_obj = Pipeline().setStages([assembler, scaler])

In [407]:
pipeline_model = pipeline_obj.fit(rfm)

transformed_df = pipeline_model.transform(rfm)

transformed_df.show(5)


+--------------------+---------+--------+-------+------------------+--------------------+
|           master_id|Frequency|Monetary|Recency| unscaled_features|            features|
+--------------------+---------+--------+-------+------------------+--------------------+
|b3ace094-a17f-11e...|      1.0|  212.98|    829|[829.0,1.0,212.98]|[3.24793811935259...|
|c57d7c4c-a950-11e...|      1.0|  199.98|    548|[548.0,1.0,199.98]|[2.14700855175539...|
|602897a6-cdac-11e...|      1.0|  140.49|    312|[312.0,1.0,140.49]|[1.22238443092642...|
|388e4c4e-af86-11e...|      1.0|  174.99|    883|[883.0,1.0,174.99]|[3.45950465547447...|
|80664354-adf0-11e...|      2.0|  283.95|     27| [27.0,2.0,283.95]|[0.10578326806094...|
+--------------------+---------+--------+-------+------------------+--------------------+
only showing top 5 rows
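
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation without subtracting the mean, which is why the scaled values above stay positive. The plain-Python sketch below illustrates that rule on a toy list and then recovers the implied standard deviation of `Recency` from two rows of the table. The scaled values are truncated by `show()`, so the result is approximate; `scale_by_std` is a hypothetical helper.

```python
import math
import statistics


def scale_by_std(values):
    """Divide each value by the sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # n - 1 in the denominator
    return [v / std for v in values]


print(scale_by_std([1.0, 2.0, 3.0]))  # stdev is 1.0, so values are unchanged

# Implied Recency standard deviation from two rows of the table above.
std_from_row1 = 829.0 / 3.24793811935259
std_from_row2 = 548.0 / 2.14700855175539
print(std_from_row1, std_from_row2)
print(math.isclose(std_from_row1, std_from_row2, rel_tol=1e-6))
```

Both rows imply a standard deviation of roughly 255 days, consistent with a single divisor being applied to the whole column.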



# TASK 5: K-Means

### Step 1: Optimal Number of Clusters

Fit K-Means for a range of cluster counts, compute the score for each, and choose a number of clusters.

In [408]:
from pyspark.ml.clustering import KMeans

def compute_kmeans_model(df,k):
    kmeansObject = KMeans() \
    .setSeed(142) \
    .setK(k) 
    return kmeansObject.fit(df)

### Evaluate Model

In [419]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
score_list = []

for k in range(2,11):
    kmeans_model = compute_kmeans_model(transformed_df, k)
    
    transformed = kmeans_model.transform(transformed_df)
    
    
    score = evaluator.evaluate(transformed)
    score_list.append(score)
    
    print("k: {}, score {}".format(k, score))

k: 2, score 0.4403139929816374
k: 3, score 0.655033488104594
k: 4, score 0.6658979549823151
k: 5, score 0.6777415637605754
k: 6, score 0.5104147694093096
k: 7, score 0.6780801245958924
k: 8, score 0.6494848988252858
k: 9, score 0.5039729078361186
k: 10, score 0.5132605636800474
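
The scores above can be ranked programmatically. This plain-Python sketch copies them from the loop output and sorts the candidate values of k from best to worst. It shows that k = 7 and k = 5 are nearly tied; the notebook goes on to use k = 5 without stating why, so the sketch should not be read as its selection rule.

```python
# Silhouette scores copied from the loop output above.
scores = {
    2: 0.4403139929816374,
    3: 0.655033488104594,
    4: 0.6658979549823151,
    5: 0.6777415637605754,
    6: 0.5104147694093096,
    7: 0.6780801245958924,
    8: 0.6494848988252858,
    9: 0.5039729078361186,
    10: 0.5132605636800474,
}

# Rank the candidate k values from highest to lowest score.
ranked = sorted(scores, key=scores.get, reverse=True)
best_k = ranked[0]

print(ranked[:3])                  # [7, 5, 4]
print(scores[best_k] - scores[5])  # gap between the top score and k = 5
```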


### Step 2: Final Model

Build the new model with the number of clusters you chose.

In [450]:
# PySpark's ClusteringEvaluator uses the silhouette score by default; a cluster count whose score is closer to 1 is preferable.

kmeans = (KMeans()
          .setSeed(142)
          .setK(5))

In [451]:
kmeans_model = kmeans.fit(transformed_df)

In [452]:
transformed = kmeans_model.transform(transformed_df)

In [453]:
transformed.show()

+--------------------+---------+------------------+-------+--------------------+--------------------+----------+
|           master_id|Frequency|          Monetary|Recency|   unscaled_features|            features|prediction|
+--------------------+---------+------------------+-------+--------------------+--------------------+----------+
|b3ace094-a17f-11e...|      1.0|            212.98|    829|  [829.0,1.0,212.98]|[3.24793811935259...|         2|
|c57d7c4c-a950-11e...|      1.0|            199.98|    548|  [548.0,1.0,199.98]|[2.14700855175539...|         2|
|602897a6-cdac-11e...|      1.0|            140.49|    312|  [312.0,1.0,140.49]|[1.22238443092642...|         0|
|388e4c4e-af86-11e...|      1.0|            174.99|    883|  [883.0,1.0,174.99]|[3.45950465547447...|         2|
|80664354-adf0-11e...|      2.0|            283.95|     27|   [27.0,2.0,283.95]|[0.10578326806094...|         0|
|47511f36-aeb4-11e...|      1.0|            139.98|    933|  [933.0,1.0,139.98]|[3.6553995963280

### Step 3: Descriptive Statistics
Examine the descriptive statistics of the clusters you created.

In [484]:
from pyspark.sql.functions import count, desc, mean

(transformed.groupBy("prediction")
 .agg(count("Monetary").alias("count"),
      mean("Monetary").alias("avg_monetary"),
      mean("Recency").alias("avg_recency"),
      mean("Frequency").alias("avg_frequency"))
 .sort(desc("avg_monetary"))
 .show())

+----------+-----+------------------+------------------+------------------+
|prediction|count|      avg_monetary|       avg_recency|     avg_frequency|
+----------+-----+------------------+------------------+------------------+
|         1|    3|          37469.96| 536.6666666666666| 306.3333333333333|
|         3|   17|13068.847058823529|169.05882352941177| 91.23529411764706|
|         4| 4072|1544.8937328094296|189.84037328094303| 9.520383104125736|
|         0|41794| 307.9166727281978|173.99028568694072|2.0373020050724984|
|         2|54114|179.52782311424278| 615.5692427098348| 1.445928964778061|
+----------+-----+------------------+------------------+------------------+
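
The cluster sizes are very uneven, which is easier to see as percentages. The plain-Python sketch below uses the counts copied from the table above and also confirms that every customer was assigned to a cluster.

```python
# prediction -> count, copied from the table above.
cluster_sizes = {1: 3, 3: 17, 4: 4072, 0: 41794, 2: 54114}
total = sum(cluster_sizes.values())

# Share of each cluster as a percentage of all customers.
shares = {k: round(n / total * 100, 3) for k, n in cluster_sizes.items()}

print(total)   # 100000
print(shares)
```

Clusters 1 and 3 together hold only 20 customers with very high average spending, so they behave more like outlier groups than like broad segments.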

