## I. Preprocesamiento en PySpark


1. Lectura de archivo
2. Limpieza de datos
3. Exploración y limpieza adicional
4. Creación de features para el modelo

### 1. Lectura de archivo

In [3]:
# 1.1 Creamos la sesion
from pyspark.sql import SparkSession
from pyspark import SparkContext

SpSession = SparkSession \
  .builder \
  .appName("DBA - Proyecto Final") \
  .getOrCreate()

SpContext = SpSession.sparkContext

### 2. Limpieza de datos

In [5]:
# 2.1 Leemos los datos y los insertamos a un dataframe
inputPath = '/FileStore/tables/data.csv'
salesdf = sqlContext.read.format('csv').options(header='True').load(inputPath)
salesdf.cache() #para tener la tabla en memoria y agilizar las transformaciones
salesdf.show(5) #mostramos las primeras 5 lineas
print((salesdf.count(), len(salesdf.columns)))
print(salesdf.columns)

In [6]:
# 2.2 Contamos nulls y nans de cada columna
from pyspark.sql.functions import isnan, when, count, col #importamos librerias necesarias

for column in salesdf.columns:
  salesdf.select([count(when(isnan(column) | col(column).isNull(),column)).alias(column)]).show()
  

  

In [7]:
# 2.3 Borramos los registros que no han sido asignados a clientes identificados. El proposito es clusterizar clientes, por lo tanto, no nos sirven datos que no esten asociados a un customerId.

salesdf = salesdf.dropna(subset=['CustomerID']) #esto deja un df libre de nulls

columntype = [column.dataType for column in salesdf.schema.fields]

print(columntype) #todas las columnas fueron importas como string. editaremos esto mas adelante




In [8]:
# 2.4 Buscamos si existen filas repetidas y las eliminamos. No queremos contabilizar dos veces la misma venta.

distinctrows = salesdf.distinct().count()
allrows = salesdf.count()
print('Existe una diferencia de {} entre el total de rows y las rows unicas. Por lo tanto hay duplicados y debemos elminarlos'.format(str(allrows-distinctrows)))

salesdf_ = salesdf.dropDuplicates() #usamos esta funcion que elimina los duplicados
assert salesdf_.count() == distinctrows #validamos que la nueva tabla tenga la cantidad de filas distintas
del salesdf
salesdf = salesdf_ #almacenamos en la variables salesdf
salesdf.cache()

### 3. Exploración y limpieza adicional

**Información de las variables**

En el dataframe encontramos 8 variables:

* **InvoiceNo**: Invoice number. Dígito de 6 enteros asignado a cada transacción. Si empieza con la letra 'C', indica cancelación
* **StockCode**: Código del producto. Nominal, dígito de 5 enteros asignado a cada producto diferente.
* **Description**: Nombre del producto
* **Quantity**: Cantidades vendidas de cada producto por transacción. Numérico
* **InvoiceDate**: Fecha y hora de facturación. Numérico.
* **UnitPrice**: Precio unitario es libras esterlinas. Numérico
* **CustomerID**: Identificador del cliente. Nominal. 5 dígitos enteros
* **Country**: Nombre del país de residencia de cada cliente. Nominal.

In [11]:
# 3.1. Aprovechamos el el contexto sql para seguir explorando los datos

salesdf.groupBy("country").count().orderBy('count', ascending=False).show() #los paises de los clientes

salesdf.groupBy("Description").count().orderBy('count',ascending=False).show() #los productos que mas aparecen en la lista

salesdf.groupBy("Description").agg({"Quantity":"sum", "UnitPrice":"avg"}).orderBy('sum(Quantity)', ascending=False).show(5)
#los productos mas vendidos, con su cantidad total y precio promedio

# ¿cuantos productos, clientes y transacciones hay en la base?
productos = salesdf.select("StockCode").distinct().count()
transacciones = salesdf.select("InvoiceNo").distinct().count()
clientes = salesdf.select("CustomerID").distinct().count()

print('Productos: ', productos )
print('Transacciones: ', transacciones )
print('Clientes: ', clientes )

# ¿cuantos productos hay en cada transaccion?
salesdf.groupBy("CustomerID","InvoiceNo").count().orderBy("CustomerID").show(10) 

In [12]:
# 3.2. En la exploración de datos encontramos que existen facturas canceladas. Para reducir distorsión en el dataset, eliminaremos las facturas canceladas y sus contrapartes.

#df sin cancels
salesdf_no_cancels = salesdf.where("InvoiceNo not like '%C%' ")
salesdf_no_cancels.show()

#eliminamos contrapartes de invoice cancelados
Canceled_invoices = salesdf.select("InvoiceNo").where("InvoiceNo like '%C%' ").collect()
canceled_list = [str(row.InvoiceNo) for row in Canceled_invoices]
canceled = []

for i in canceled_list:
  canceled.append(i.strip('C')) # al borrar el 'c', estamos buscando la contraparte de las facturas
  
canceled = list(set(canceled))
salesdf_nocancels_norcounterpart = salesdf_no_cancels.filter(col("InvoiceNo").isin(canceled) == False)

#la nueva tabla ya no tiene facturas de cancelaciones ni contrapartes
salesdf_nocancels_norcounterpart.select("InvoiceNo").where("InvoiceNo like '%C%' ").show()
salesdf_nocancels_norcounterpart.filter(col("InvoiceNo").isin(canceled)).show()
del salesdf
salesdf = salesdf_nocancels_norcounterpart
salesdf.cache()


In [13]:
salesdf.show()

In [14]:
# 3.3. Preparamos las columnas para poder realizar calculos

print(salesdf.dtypes) #actualmente todas estan como string 

from pyspark.sql.types import DoubleType, TimestampType #importamos los types que usaremos
from pyspark.sql.functions import to_timestamp

# la función withColumn nos permite crear nuevas columnas directamente en el df
salesdf = salesdf.withColumn("Quantity", salesdf["Quantity"].cast(DoubleType()))\
.withColumn("UnitPrice",salesdf["UnitPrice"].cast(DoubleType()))

salesdf = salesdf.withColumn("InvoiceDate", to_timestamp(salesdf["InvoiceDate"], 'MM/dd/yyyy HH:mm'))
salesdf = salesdf.dropna(subset=['InvoiceDate']) #Algunas fechas se convierten en null al formatearlas. Mientras no arreglemos el bug, usaremos un dropna.


print(salesdf.dtypes)

salesdf.show(3)

### 4. Creación de features para el modelo

In [16]:
# 4.1. Un feature será el precio total de cada fila. Multiplicación de cantidad y precio unitario
salesdf = salesdf.withColumn("TotalPrice", col("Quantity")*col("UnitPrice"))

In [17]:
# 4.2. La descripción de cada producto será llevada a una representación vectorial para posteriormente clusterizarlas y obtener un feature para el dataset.

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans


#dividir en palabras y luego construir tfidf
tokenizer = Tokenizer(inputCol="Description", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=50)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=3)

#kmeans para crear 5 categorias de producto
kmeans = KMeans(k=5, seed=1, featuresCol='features', predictionCol='ProductCat')

#encapsulamos todo en un pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])
model = pipeline.fit(salesdf)

results = model.transform(salesdf)
results.cache()

#validamos los clusters
results.groupBy("ProductCat").count().show()



In [18]:
# 4.3. Creamos 5 columnas para lo gastado en cada categoria. Esto servira a la hora que aplanemos filas
from pyspark.sql import functions as F

for i in range(5):
  column_name = 'categ_'+str(i)
  results = results.withColumn( column_name, F.when((F.col("ProductCat") ==str(i)), F.col("TotalPrice")).otherwise(0))
  




In [19]:
# 4.4. Agrupamos a una linea por orden  (customerid, invoiceno)

#groupby
orderdf_ = results.groupBy("CustomerID", "InvoiceNo")\
.agg({"InvoiceDate":"avg",\
      "TotalPrice":"sum",\
     "categ_0":"sum",\
     "categ_1":"sum",\
     "categ_2":"sum",\
     "categ_3":"sum",\
     "categ_4":"sum"})


#renombramos las columnas
orderdf = orderdf_.select(col("CustomerID"),
                col("InvoiceNo"),
                col("sum(categ_0)").alias("categ_0"),
                col("sum(categ_1)").alias("categ_1"),
                col("sum(categ_2)").alias("categ_2"),
                col("sum(categ_3)").alias("categ_3"),
                col("sum(categ_4)").alias("categ_4"),
                col("sum(TotalPrice)").alias("TotalPrice"),
                col("avg(InvoiceDate)").alias("InvoiceDate"))

In [20]:
# 4.5. Agrupamos a una linea por cliente  (customerID)


orderdf.createOrReplaceTempView("order")

#group by
customerdf = sqlContext.sql("select CustomerID as id, count(InvoiceNo) as ordenes, \
                min(InvoiceDate) as f_primera_compra, \
                max(InvoiceDate) as f_ultima_compra, \
                sum(TotalPrice) as precio_total, \
                min(TotalPrice) as min_compra, \
                max(TotalPrice) as max_compra, \
                avg(TotalPrice) as prom_compra, \
                sum(categ_0) as categ_0, \
                sum(categ_1) as categ_1, \
                sum(categ_2) as categ_2, \
                sum(categ_3) as categ_3, \
                sum(categ_4) as categ_4 \
                from order group by CustomerID ")

#redondeo y renombre de columnas
from pyspark.sql.functions import round #, col
customerdf_ = customerdf.select("id", "ordenes",  \
                  "f_primera_compra", \
                  "f_ultima_compra", \
                    round(col("precio_total"),2).alias("precio_total"),
                  round(col("min_compra"),2).alias("min_compra"), 
                  round(col("max_compra"),2).alias("max_compra"),
                  round(col("prom_compra"),2).alias("prom_compra"),\
                  round(col("categ_0"),2).alias("categ_0"),\
                  round(col("categ_1"),2).alias("categ_1"),\
                  round(col("categ_2"),2).alias("categ_2"),\
                  round(col("categ_3"),2).alias("categ_3"),\
                  round(col("categ_4"),2).alias("categ_4") )

#se guarda en archivo "dataset_preprocessed.csv"

In [21]:
# 4.6. Estandarizamos las variables

from pyspark.ml.feature import StandardScaler, VectorAssembler, Normalizer


features = ['ordenes','f_primera_compra', 'f_ultima_compra', 'precio_total','min_compra','max_compra','prom_compra','categ_0',\
            'categ_1', 'categ_2', 'categ_3', 'categ_4']

#pasamos todas las feature columns a una sola columna de tipo vectores
assembler = VectorAssembler(
    inputCols=features,\
    outputCol="features")

customerdf_vector = assembler.transform(customerdf_)

#estandarizamos
scaler = StandardScaler(inputCol='features', outputCol='features_norm')
scalerModel = scaler.fit(customerdf_vector)
customerdf_norm = scalerModel.transform(customerdf_vector)

In [22]:
# 4.7. PCA para reducir a 4 dimensiones

from pyspark.ml.feature import PCA

salesPCA = PCA(k=4, inputCol = 'features_norm', outputCol = 'pcafeatures') # k para definir cantidad de dimensiones
pcaModel = salesPCA.fit(customerdf_norm)
pcaResult = pcaModel.transform(customerdf_norm).select('id', 'pcaFeatures')

pcaResult.cache
pcaResult.show()


In [23]:
# 4.8. Separamos el vector denso en 4 columnas

def extract(row):
    return (row.id, ) + tuple(row.pcaFeatures.toArray().tolist())

customerdf_split = pcaResult.rdd.map(extract).toDF(["id"]) 

customerdf_split.show()

In [24]:
# 4.9 Renombramos columnas

customerdf_final = customerdf_split.select("id",
                                     col("_2").alias("var1"),
                                     col("_3").alias("var2"),
                                     col("_4").alias("var3"),
                                     col("_5").alias("var4") 
                                          )

In [25]:
# 4.10. se lleva a objeto pandas para realizar la descarga
print((customerdf_final.count(), len(customerdf_final.columns)))
customersPandasDF = customerdf_final.toPandas()

In [26]:
display(customersPandasDF) # se usa la funcionalidad de display para exportar los datos como csv


id,var1,var2,var3,var4
15555,-1.4824188422916365,2.1783083089471464,-197.36808604529745,-21.08241074217314
15574,0.1210999253678188,1.5759363330995215,-196.22814297530215,-19.6458770029883
15634,0.4755198474888189,1.2420510090267154,-199.4993368571819,-18.892966551310195
13610,-0.072380654190681,1.6979903364012188,-197.8936422116078,-20.35739043037193
13192,0.1122821232005275,1.2220918081362964,-197.8155447088002,-19.21803288919643
14157,0.1517478061928603,1.5406409009133135,-197.86596605129856,-19.94352388583019
17686,-1.2451803495681515,1.523063301188282,-197.8532812407072,-20.304516304890623
13865,0.16327352704206,1.6065168178805038,-197.55898272290523,-19.917383571098583
16250,0.2872225671585227,1.4029339386111683,-195.63932020223024,-19.062537963730165
14204,0.5052826760827269,1.3242354297623145,-199.6956214265708,-18.8948130438989


## II. Fuzzy C-Means (FCM)
Desarrollado en MRJob en un script aparte

## III. Resultados
1. Concatenado de Datasets
1. Gráficas
2. Silhouette Score y ubicación de centroides

### 1. Concatenado de Datasets 
Se concatena el dataset resultante de MRJob con las variables originales del dataset para permitir el análisis de los clusters

In [30]:
# 1.1. Se hace lectura del dataset de membresíaa resultante del MRJob. Se transforma a un Pandas Dataframe para los gráficos que se quieren generar

results_Path = '/FileStore/tables/archivo_membresia-2.txt'
clustersdf = sqlContext.read.format('csv').options(header='False').load(results_Path)
clustersdf.count()
clustersdf = clustersdf.toPandas()
clustersdf.columns=['1','2','3']

In [31]:
# 1.2. Se crea una columna "prediction" con el cluster de mayor membresía

clusterdf = clustersdf.astype({'1': 'float','2': 'float','3': 'float'})
clusterdf['prediction'] = clusterdf.idxmax(axis=1).astype('int32')

In [32]:
# 1.3. Se concatenan ambos datasets

customerdf_ = customerdf_.toPandas() # si no se tiene el objeto customerdf_, se puede cargar el archivo "dataset_preprocessed.csv"
results = customerdf_[:clusterdf.shape[0]]
results['prediction'] = clusterdf.prediction

### 2. Gráficas
Se generan gráficas de barras, radar plots e histogramas para caracterizar los grupos encontrados

In [34]:
!pip install plotly
import numpy as np
import pandas as pd
import seaborn as sns
import plotly.express as px
import matplotlib.pyplot as plt

In [35]:
# 2.1. Tamaño de los grupos

size = pd.DataFrame(results.prediction.value_counts())
size.plot.pie('prediction')

In [36]:
# 2.2. Se calcula una tabla con valores centrales que serviran como descriptivos para caracterizar los grupos.

#Dataframe "desriptives"
descriptives = results.groupby('prediction').agg({'prom_compra':'mean','ordenes':'mean','categ_0':'mean','categ_1':'mean',
                                               'categ_2':'mean','categ_3':'mean','categ_4':'mean' })
descriptives['prediction'] = descriptives.index

display(descriptives)

prom_compra,ordenes,categ_0,categ_1,categ_2,categ_3,categ_4,prediction
497.56916585839,10.761396702230844,1033.4778273520865,1351.0106595538325,99.71350145489816,2291.703608147432,318.03774975751645,1
338.21360000000016,2.1154545454545453,142.14769696969702,183.37671818181823,16.824745454545475,300.9175090909086,44.49312121212112,2
22819.28875,57.5,29399.8225,44864.098750000005,5399.711249999999,85521.69625000001,5532.92,3


In [37]:
# 2.3. Se compara el promedio de compra de cada grupo

sns.barplot(x="prediction", y="prom_compra", data=descriptives)

In [38]:
sns.barplot(x="prediction", y="prom_compra", data=descriptives[0:2])

In [39]:
# 2.4. Se visualiza un radar plot de cada cluster según sus gastos en las 5 categorías de productos

#Cluster 1

fig = plt.figure(i)
val = descriptives[descriptives.prediction==1].values.flatten().tolist()[2:-1]
df = pd.DataFrame(dict(
  r=val,
  theta=['categ_0','categ_1','categ_2',
         'categ_3', 'categ_4']))
fig = px.line_polar(df, r='r', theta='theta', line_close=True)
fig.update_traces(fill='toself')
fig.show()

In [40]:
#Cluster 2

fig = plt.figure(i)
val = descriptives[descriptives.prediction==2].values.flatten().tolist()[2:-1]
df = pd.DataFrame(dict(
  r=val,
  theta=['categ_0','categ_1','categ_2',
         'categ_3', 'categ_4']))
fig = px.line_polar(df, r='r', theta='theta', line_close=True)
fig.update_traces(fill='toself')
fig.show()

In [41]:
#Cluster 3

fig = plt.figure(i)
val = descriptives[descriptives.prediction==3].values.flatten().tolist()[2:-1]
df = pd.DataFrame(dict(
  r=val,
  theta=['categ_0','categ_1','categ_2',
         'categ_3', 'categ_4']))
fig = px.line_polar(df, r='r', theta='theta', line_close=True)
fig.update_traces(fill='toself')
fig.show()

In [42]:
# 2.5. Distribución (histograma) de cantidad de ordenes para cada grupo

for i in sorted(results.prediction.unique().tolist()):
  x = results[results.prediction == i].ordenes
  fig = plt.figure(i)
  fig.suptitle('Distribución de cantidad de órdenes para cluster '+str(i))
  sns.distplot(x)

In [43]:
# 2.6. Gráfico de dispersión de compra mínima y máxima con los clusters representados con color

# se eliminan outliers
graph_results = results[ (results.max_compra <100000) & (results.min_compra<20000)]

fig, ax = plt.subplots()
cmap = plt.cm.get_cmap('jet')
for i, cluster in graph_results.groupby('prediction'):
    _ = ax.scatter(cluster['min_compra'], cluster['max_compra'], c=cmap(i/3), label=i)
ax.legend()

### 3. Silhouette Score y ubicación de centroides

In [45]:
# 3.1. Silhouette Score

from sklearn.metrics import silhouette_score
import warnings
warnings.simplefilter("ignore")

X = results[['id', 'ordenes', 'f_primera_compra', 'f_ultima_compra', 'precio_total',
       'min_compra', 'max_compra', 'prom_compra', 'categ_0', 'categ_1',
       'categ_2', 'categ_3', 'categ_4']]
labels = results[['prediction']]
 
ss = silhouette_score(X, labels, metric='euclidean')

print('El Silhouette Score de los custers es: {}'.format(str(np.round(ss,2)))) #-1 a +1


In [46]:
# 3.2. Leemos archivo de centroides

from sklearn.manifold import TSNE

centroides_path = '/FileStore/tables/centroides_fuzzy.txt'
centroides = sqlContext.read.format('csv').options(header='False').load(centroides_path)
centroidesdf = centroides.toPandas()
centroidesdf.columns=['var1', 'var2', 'var3', 'var4']

centroidesdf



Unnamed: 0,var1,var2,var3,var4
0,-1.1854869544215965,1.7770693234008172,-197.5048583444957,-20.52033220089467
1,0.2529802792892114,1.2670840286984526,-197.801115118013,-19.10082874668942
2,-51.8693475,0.0453825000000005,-198.32119666666668,-11.64179


In [47]:
# 3.3. Se proyectan las coordenadas de los clusters a representaciones bidimensionales con el algoritmo t-SNE

tsne = TSNE(n_components=2, verbose=1, perplexity=20, n_iter=300)
tsne_results = tsne.fit_transform(centroidesdf)

In [48]:
# 3.4. Se grafica los centroides en un scatterplot bidimensional

df_subset = pd.DataFrame(tsne_results)
df_subset.columns=['dim1', 'dim2']

plt.figure(figsize=(8,8))
sns.scatterplot(
    x='dim1', y='dim2',
    data=df_subset,
    legend="full",
    alpha=1
)

## IV. Anexos

1. Referencias
2. Trabajo Futuro

### 1. Referencias

* https://www.kaggle.com/fabiendaniel/customer-segmentation

* https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
  
* https://stackoverflow.com/questions/40287237/pyspark-dataframe-operator-is-not-in

* https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2281281647728580/33322862051354/347222873603598/latest.html

* http://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html#word2vec

* http://spark.apache.org/docs/latest/ml-clustering.html#k-means

* Archivo "5_clustering - Databricks.pdf" en contenido del curso de Big Data Analytics

* https://stackoverflow.com/questions/43863373/tf-idf-document-clustering-with-k-means-in-apache-spark-putting-points-into-one

* http://www.datasciencemadesimple.com/round-up-round-down-and-round-off-in-pyspark-ceil-floor/

* https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark

* https://plotly.com/python/radar-chart/

* https://stackoverflow.com/questions/39091515/matplotlib-does-not-show-legend-in-scatter-plot

* https://towardsdatascience.com/visualising-high-dimensional-datasets-using-pca-and-t-sne-in-python-8ef87e7915b

### 2. Future work

* Experimentar con algoritmos de clustering que produzcan grupos de cantidades iguales
* Optimizar el PCA de las variables previo al FCM
* Utilizar las categorías de clientes para producir predicciones sobre su comportamiento de compra