In [0]:
%fs ls FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/clientes_transform-1.csv,clientes_transform-1.csv,74822,1666835411000
dbfs:/FileStore/tables/clientes_transform.csv,clientes_transform.csv,74822,1666835311000
dbfs:/FileStore/tables/compras_transform-1.csv,compras_transform-1.csv,78535,1666835411000
dbfs:/FileStore/tables/compras_transform.csv,compras_transform.csv,78535,1666835311000
dbfs:/FileStore/tables/estados_transform-1.csv,estados_transform-1.csv,551,1666835412000
dbfs:/FileStore/tables/estados_transform.csv,estados_transform.csv,551,1666835312000
dbfs:/FileStore/tables/produtos_transform-1.csv,produtos_transform-1.csv,3680,1666835412000
dbfs:/FileStore/tables/produtos_transform.csv,produtos_transform.csv,3680,1666835312000


In [0]:
df_cliente = spark.read.csv("dbfs:/FileStore/tables/clientes_transform.csv",header="true",inferSchema="true")
df_compra = spark.read.csv("dbfs:/FileStore/tables/compras_transform.csv",header="true",inferSchema="true")
df_estado = spark.read.csv("dbfs:/FileStore/tables/estados_transform.csv",header="true",inferSchema="true")
df_produto = spark.read.csv("dbfs:/FileStore/tables/produtos_transform.csv",header="true",inferSchema="true")

In [0]:
df_cliente.createOrReplaceTempView("cliente")
df_compra.createOrReplaceTempView("compras")
df_estado.createOrReplaceTempView("estados")
df_produto.createOrReplaceTempView("produtos")

In [0]:
spark.sql("SELECT * FROM cliente LIMIT 1").show()
spark.sql("SELECT * FROM compras LIMIT 1").show()
spark.sql("SELECT * FROM estados LIMIT 1").show()
spark.sql("SELECT * FROM produtos LIMIT 1").show()

+---+-----------+----+-----+----------+------------+----------+----------+--------+
|_c0|cod_cliente|sexo|idade|qtd_filhos|estado_civil|cod_estado|hipertenso|diabetes|
+---+-----------+----+-----+----------+------------+----------+----------+--------+
|  0|          1|   1|   26|         2|           3|        13|         0|       1|
+---+-----------+----+-----+----------+------------+----------+----------+--------+

+---+-----------+-----------+-----------+--------------+------------------+
|_c0|cod_cliente|cod_produto|qtd_produto|valor_unitario|valor_total_compra|
+---+-----------+-----------+-----------+--------------+------------------+
|  0|          1|       1011|          3|          5.99|             17.97|
+---+-----------+-----------+-----------+--------------+------------------+

+---+----------+-----------+----------+
|_c0|cod_estado|nome_estado|sgl_estado|
+---+----------+-----------+----------+
|  0|         1|       Acre|        AC|
+---+----------+-----------+----------

In [0]:
df_geral = spark.sql("SELECT cli.cod_cliente, cli.sexo, cli.idade, cli.qtd_filhos, cli.estado_civil, cli.cod_estado, cli.hipertenso, cli.diabetes, com.cod_produto, com.qtd_produto, com.valor_unitario, com.valor_total_compra, pro.nome_produto, pro.classe_produto from cliente as cli JOIN compras as com ON cli.cod_cliente == com.cod_cliente JOIN produtos as pro ON pro.cod_produto == com.cod_produto")
df_geral.createOrReplaceTempView("compra_inform")

In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler

df_cluster = spark.sql("SELECT * FROM compra_inform")
df_cluster.printSchema()


root
 |-- cod_cliente: integer (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- idade: integer (nullable = true)
 |-- qtd_filhos: integer (nullable = true)
 |-- estado_civil: integer (nullable = true)
 |-- cod_estado: integer (nullable = true)
 |-- hipertenso: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- cod_produto: integer (nullable = true)
 |-- qtd_produto: integer (nullable = true)
 |-- valor_unitario: double (nullable = true)
 |-- valor_total_compra: double (nullable = true)
 |-- nome_produto: string (nullable = true)
 |-- classe_produto: string (nullable = true)



In [0]:
assembler = VectorAssembler(inputCols=["idade", "valor_total_compra"],outputCol="features")
output = assembler.transform(df_cluster)
kmeans = KMeans().setK(4).setSeed(0)
model = kmeans.fit(output)


In [0]:
predictions = model.transform(output)
predictions.createOrReplaceTempView("predictions")

In [0]:
spark.sql("SELECT * FROM predictions")

Out[64]: DataFrame[cod_cliente: int, sexo: int, idade: int, qtd_filhos: int, estado_civil: int, cod_estado: int, hipertenso: int, diabetes: int, cod_produto: int, qtd_produto: int, valor_unitario: double, valor_total_compra: double, nome_produto: string, classe_produto: string, features: vector, prediction: int]

## Qual é o desvio padrão dos valores de produtos do dataset de produtos cuja classe do alimento é bebidas?

In [0]:
spark.sql("SELECT STD(valor_unitario) FROM predictions WHERE classe_produto == 'Bebidas'").show()


+-------------------+
|std(valor_unitario)|
+-------------------+
| 1.6376843439326543|
+-------------------+



## Qual a proporção de homens e mulheres hipertensos que compraram produtos da classe "Alimentação"?

In [0]:
spark.sql("SELECT COUNT(*) as homem FROM predictions as h  WHERE h.sexo == 1").show()
spark.sql("SELECT COUNT(*) as mulher FROM predictions as m  WHERE m.sexo == 0").show()
spark.sql("SELECT COUNT(*) as total FROM predictions").show()

+-----+
|homem|
+-----+
| 1490|
+-----+

+------+
|mulher|
+------+
|  1500|
+------+

+-----+
|total|
+-----+
| 2990|
+-----+



## Após aplicar o algoritmo de agrupamento, gere uma estatística com describe da variável valor total compra agrupando os resultados por cluster

In [0]:
spark.sql("SELECT prediction, COUNT(valor_total_compra) as cont, AVG(valor_total_compra) as mean, STD(valor_total_compra) as desvio_padrao, MAX(valor_total_compra) as max, MIN(valor_total_compra) as min FROM predictions GROUP BY prediction").show()

+----------+----+------------------+------------------+------+------+
|prediction|cont|              mean|     desvio_padrao|   max|   min|
+----------+----+------------------+------------------+------+------+
|         1|  39| 204.1458974358975|29.897137137890564|259.96|170.97|
|         3|1373| 14.33136926438447|10.769994627452329|  54.0|  1.25|
|         2| 199| 86.20175879396979| 26.90161705969317| 140.0|  50.0|
|         0|1379|13.333386511965106| 9.921838994420185|  50.0|  1.25|
+----------+----+------------------+------------------+------+------+



## Quantidade de solteiros e hipertensos estão presentes no cluster de identificador 3?

In [0]:
spark.sql("SELECT COUNT(*) FROM predictions WHERE (estado_civil == 0 and hipertenso == 1) and prediction == 3").show()

+--------+
|count(1)|
+--------+
|     160|
+--------+



## Qual é o número total de diabéticos para o cluster de identificador 0

In [0]:
spark.sql("SELECT COUNT(*) FROM predictions WHERE diabetes == 1 and prediction == 0").show()

+--------+
|count(1)|
+--------+
|     689|
+--------+

