In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('read_csv').getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [2]:
t = spark.read.csv('./train.csv', header=True, inferSchema=True)
t.show()

+----------+-----+----+-----+
|      date|store|item|sales|
+----------+-----+----+-----+
|2013-01-01|    1|   1|   13|
|2013-01-02|    1|   1|   11|
|2013-01-03|    1|   1|   14|
|2013-01-04|    1|   1|   13|
|2013-01-05|    1|   1|   10|
|2013-01-06|    1|   1|   12|
|2013-01-07|    1|   1|   10|
|2013-01-08|    1|   1|    9|
|2013-01-09|    1|   1|   12|
|2013-01-10|    1|   1|    9|
|2013-01-11|    1|   1|    9|
|2013-01-12|    1|   1|    7|
|2013-01-13|    1|   1|   10|
|2013-01-14|    1|   1|   12|
|2013-01-15|    1|   1|    5|
|2013-01-16|    1|   1|    7|
|2013-01-17|    1|   1|   16|
|2013-01-18|    1|   1|    7|
|2013-01-19|    1|   1|   18|
|2013-01-20|    1|   1|   15|
+----------+-----+----+-----+
only showing top 20 rows



In [3]:
# date column에서 주차, 요일 추출
from pyspark.sql.functions import *

t = t.withColumn('year', date_format(col('date'), 'y').cast("int"))\
        .withColumn('month', date_format(col('date'), 'M').cast("int"))\
            .withColumn('week', date_format(col('date'), 'W').cast("int"))


In [4]:
t.createOrReplaceTempView('rawdf')
# 주 단위 데이터 생성(DataFrame 활용)
week_it_df = t.groupBy(['year', 'month', 'week', 'item']).agg(sum('sales').alias('week_sales')).orderBy(['year','month','week','item'])
week_si_df = t.groupBy(['year', 'month', 'week', 'store', 'item']).agg(sum('sales').alias('week_sales')).orderBy(['year','month','week','store','item'])

In [18]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

assembler = VectorAssembler(inputCols = ['week_sales'], outputCol = 'week_sales_vector') 
scaler = MinMaxScaler(inputCol = 'week_sales_vector', outputCol = 'scaled_sales') 
pipeline = Pipeline(stages=[assembler, scaler]) 
scalerModel = pipeline.fit(week_si_df) 
scalerData = scalerModel.transform(week_si_df) 

scalerData.show(5)

+----+-----+----+-----+----+----------+-----------------+--------------------+
|year|month|week|store|item|week_sales|week_sales_vector|        scaled_sales|
+----+-----+----+-----+----+----------+-----------------+--------------------+
|2013|    1|   1|    1|   1|        61|           [61.0]|[0.04695222405271...|
|2013|    1|   1|    1|   2|       151|          [151.0]|[0.12108731466227...|
|2013|    1|   1|    1|   3|        92|           [92.0]|[0.07248764415156...|
|2013|    1|   1|    1|   4|        60|           [60.0]|[0.04612850082372...|
|2013|    1|   1|    1|   5|        42|           [42.0]|[0.03130148270181...|
+----+-----+----+-----+----+----------+-----------------+--------------------+
only showing top 5 rows



In [21]:
import numpy as np
from itertools import product
ts_lst = []
st_it = [list(range(1,11)), list(range(1,51))]
st_it = list(product(*st_it))

for i,j in st_it:
    ts_lst.append(np.array(scalerData.filter((scalerData['store']==i) & (scalerData['item'] == j)).select('scaled_sales').collect()))
ts_lst = np.array(ts_lst)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\spark\spark-3.2.1-bin-hadoop2.7\python\lib\py4j-0.10.9.3-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\spark\spark-3.2.1-bin-hadoop2.7\python\lib\py4j-0.10.9.3-src.zip\py4j\clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\hyosk\miniconda3\envs\mobis\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [27]:
np.array(ts_lst).flatten()

array([0.04695222, 0.05271829, 0.05848435, ..., 0.092257  , 0.09637562,
       0.01400329])

In [11]:
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesScalerMinMax,TimeSeriesScalerMeanVariance
from tslearn.utils import to_time_series_dataset
from tslearn.clustering import silhouette_score

for i in range(2,6):
    ts_dtw_model = TimeSeriesKMeans(n_clusters=i,metric='dtw', max_iter=10)
#ts_dtw_model.fit(to_time_series_dataset(ts_lst))
    print(f'cluster_{i} silhoutte_score: ',silhouette_score(ts_lst, ts_dtw_model.fit_predict(to_time_series_dataset(ts_lst)), metric='dtw', n_jobs=-1,verbose=1))

#ts_dtw_model = TimeSeriesKMeans(n_clusters=5,metric='dtw', max_iter=10)
#ts_stdtw_model = TimeSeriesKMeans(n_clusters=5,metric='softdtw', max_iter=10)
#ts_stdtw_model.fit(to_time_series_dataset(ts_lst))
#print(silhouette_score(ts_lst, ts_stdtw_model.fit_predict(to_time_series_dataset(ts_lst)), metric='softdtw', verbose=1))
#ts_stdtw_model.cluster_centers_.shape

[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.5s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    0.9s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.5s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_2 silhoutte_score:  0.5582086230146244


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.2s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.6s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    0.9s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.1s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.6s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_3 silhoutte_score:  0.5286270488309759


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.2s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.6s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    0.9s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.1s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.5s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_4 silhoutte_score:  0.49367386488194553


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.2s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.5s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    0.9s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.1s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.5s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_5 silhoutte_score:  0.45684827245062604


[Parallel(n_jobs=-1)]: Done 124750 out of 124750 | elapsed:   28.3s finished


In [16]:
for i in range(2,6):
    ts_dtw_model = TimeSeriesKMeans(n_clusters=i,metric='dtw', max_iter=10)
#ts_dtw_model.fit(to_time_series_dataset(ts_lst))
    print(f'cluster_{i} silhoutte_score: ',silhouette_score(ts_lst, ts_dtw_model.fit_predict(TimeSeriesScalerMinMax().fit_transform(ts_lst)), metric='dtw', n_jobs=-1,verbose=1))

[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.6s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.8s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    1.4s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.7s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    2.0s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    2.3s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    2.5s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_2 silhoutte_score:  0.1109716008541304


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.6s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.8s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.2s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.5s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    2.1s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

cluster_3 silhoutte_score:  -0.060073212842995515


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:    0.0s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1234 tasks      | elapsed:    0.1s
[Parallel(n_jobs=-1)]: Done 1784 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2434 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 3184 tasks      | elapsed:    0.6s
[Parallel(n_jobs=-1)]: Done 4034 tasks      | elapsed:    0.8s
[Parallel(n_jobs=-1)]: Done 4984 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done 6034 tasks      | elapsed:    1.2s
[Parallel(n_jobs=-1)]: Done 7184 tasks      | elapsed:    1.4s
[Parallel(n_jobs=-1)]: Done 8434 tasks      | elapsed:    1.7s
[Parallel(n_jobs=-1)]: Done 9784 tasks      | elapsed:    2.0s
[Parallel(n_jobs=-1)]: Done 11234 tasks  

KeyboardInterrupt: 

In [14]:
TimeSeriesScalerMinMax().fit_transform(ts_lst)

array([[[1.24615385],
        [1.28205128],
        [1.31794872],
        ...,
        [1.44615385],
        [1.56410256],
        [1.05128205]],

       [[1.19852941],
        [1.32720588],
        [1.29963235],
        ...,
        [1.47242647],
        [1.54227941],
        [1.04411765]],

       [[1.20186335],
        [1.28881988],
        [1.28881988],
        ...,
        [1.48447205],
        [1.50310559],
        [1.00621118]],

       ...,

       [[1.17637271],
        [1.31946755],
        [1.32445923],
        ...,
        [1.4608985 ],
        [1.48419301],
        [1.0249584 ]],

       [[1.22580645],
        [1.34017595],
        [1.29032258],
        ...,
        [1.46920821],
        [1.49266862],
        [1.07331378]],

       [[1.21097046],
        [1.30801688],
        [1.29817159],
        ...,
        [1.55414909],
        [1.51758087],
        [1.03375527]]])

In [15]:
ts_lst

array([[[ 61],
        [ 68],
        [ 75],
        ...,
        [100],
        [123],
        [ 23]],

       [[151],
        [221],
        [206],
        ...,
        [300],
        [338],
        [ 67]],

       [[ 92],
        [120],
        [120],
        ...,
        [183],
        [189],
        [ 29]],

       ...,

       [[151],
        [237],
        [240],
        ...,
        [322],
        [336],
        [ 60]],

       [[ 94],
        [133],
        [116],
        ...,
        [177],
        [185],
        [ 42]],

       [[208],
        [277],
        [270],
        ...,
        [452],
        [426],
        [ 82]]])

In [None]:
print(silhouette_score(ts_lst, ts_stdtw_model.fit_predict(to_time_series_dataset(ts_lst)), metric='softdtw', verbose=1))