## Teste das bibliotecas de Clustering de Time-Series

In [1]:
from dtaidistance import dtw
from dtaidistance import dtw_visualisation as dtwvis
from dtaidistance import clustering
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import col
import numpy as np
import pandas as pd

In [2]:
# criar contexto e configuração para o Spark
conf = SparkConf().setAppName("Covid-19 BRAZIL")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
# ler arquivo de registro em cartorio de obitos de pessoas naturais
obito_cartorio = sqlContext.read.csv('../datasets/obito_cartorio.csv', 
                                     sep=',', 
                                     header=True)
obito_cartorio = obito_cartorio.select("date","state","deaths_covid19", "new_deaths_covid19")

In [14]:
from datetime import datetime
today = str(datetime.today().strftime('%Y-%m-%d'))
date_first_case = '2020-03-01'
obito_cartorio = obito_cartorio.filter(obito_cartorio.date >= date_first_case)
obito_cartorio = obito_cartorio.filter(obito_cartorio.date <= today)

In [15]:
obito_cartorio = obito_cartorio.fillna({'deaths_covid19':0})
obito_cartorio = obito_cartorio.fillna({'new_deaths_covid19':0})
obito_cartorio.summary().show()

+-------+----------+-----+-----------------+------------------+
|summary|      date|state|   deaths_covid19|new_deaths_covid19|
+-------+----------+-----+-----------------+------------------+
|  count|      2376| 2376|             2376|              2376|
|   mean|      null| null| 235.618265993266| 9.734848484848484|
| stddev|      null| null|842.2509026768068|29.184804658700543|
|    min|2020-03-01|   AC|                0|                 0|
|    25%|      null| null|              0.0|               0.0|
|    50%|      null| null|              2.0|               1.0|
|    75%|      null| null|             82.0|               5.0|
|    max|2020-05-27|   TO|               99|                98|
+-------+----------+-----+-----------------+------------------+



In [16]:
# converte colunas de datas em linhas com melt
obito_cartorio_pd = obito_cartorio.toPandas()
obito_cartorio_pd

Unnamed: 0,date,state,deaths_covid19,new_deaths_covid19
0,2020-05-27,AC,0,0
1,2020-05-27,AL,0,0
2,2020-05-27,AM,0,0
3,2020-05-27,AP,0,0
4,2020-05-27,BA,0,0
...,...,...,...,...
2371,2020-03-01,RS,0,0
2372,2020-03-01,SC,0,0
2373,2020-03-01,SE,0,0
2374,2020-03-01,SP,0,0


In [17]:
# converte tipos das colunas para os formatos corretos
obito_cartorio_pd.date = pd.to_datetime(obito_cartorio_pd.date)
obito_cartorio_pd.new_deaths_covid19 = obito_cartorio_pd.new_deaths_covid19.astype(int)
obito_cartorio_pd.deaths_covid19 = obito_cartorio_pd.deaths_covid19.astype(int)

In [18]:
# Recria Spark dataframes, com estrtutura nova
schema_jhu = StructType([StructField('date', TimestampType(), True),
                         StructField('state', StringType(), True),
                         StructField('deaths_covid19', IntegerType(), True),
                         StructField('new_deaths_covid19', IntegerType(), True)])

OC = sqlContext.createDataFrame(obito_cartorio_pd, schema=schema_jhu)

In [19]:
# Criar coluna com qtde_dias desde o ínicio, pra cada país
column_list = ['state', 'new_deaths_covid19', 'deaths_covid19']
OC = OC.select("state","new_deaths_covid19", "deaths_covid19",
               F.row_number().over(Window.partitionBy([col(x) for x in column_list])\
                                   .orderBy(OC['date'])).alias("num_days"))

In [20]:
# Criar Janela de Dados por País, Província e Numero de Dias (poderia ser a data também)
my_window = Window.partitionBy("state").orderBy("num_days")

In [21]:
# criar um dataframe, convertendo num_days em colunas (pivot)
clustering_pivot_df = OC.groupby("state").pivot("num_days").sum("new_deaths_covid19")

In [22]:
#clustering_array_np = np.array(clustering_pivot_df.select(clustering_pivot_df.columns[1:]).limit(20).collect(), dtype=np.double)
clustering_array_np = np.array(clustering_pivot_df.select(clustering_pivot_df.columns[1:]).collect(), dtype=np.double)
clustering_array_np.shape

(27, 77)

In [23]:
# imprimir array sem notação científica
np.set_printoptions(suppress=True)

In [None]:
# Clustering
model1 = clustering.Hierarchical(dtw.distance_matrix_fast, {})
cluster_idx1 = model1.fit(clustering_array_np)

1629986it [00:44, 36182.96it/s]       

In [None]:
# Augment Hierarchical object to keep track of the full tree
model2 = clustering.HierarchicalTree(model1)
cluster_idx2 = model2.fit(clustering_array_np)

In [None]:
# SciPy linkage clustering
model3 = clustering.LinkageTree(dtw.distance_matrix_fast, {})
cluster_idx3 = model3.fit(clustering_array_np)

In [None]:
#model3.plot("plot.png")
model1.plot()
model2.plot()
model3.plot()