# Data Loading

In [1]:
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [2]:
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.functions import from_unixtime, unix_timestamp,to_date, max, when, collect_list
from pyspark.sql.functions import col, sum, greatest, expr, count, countDistinct, concat,substring, lit, length, lpad
from pyspark.sql.types import *
from pyspark.sql.functions import broadcast
from pyspark.ml.recommendation import ALS, ALSModel
import pandas as pd
import matplotlib.pylab as plt
from pyspark.mllib.fpm import FPGrowth

from pyspark.ml.clustering import KMeans, KMeansModel, BisectingKMeans, BisectingKMeansModel
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, PCAModel, ElementwiseProduct
from pyspark.ml import Pipeline, PipelineModel
#import datetime

In [3]:
#FUNCIONES PARA EXTRAER NULOS POR COLUMNA
def count_null(col_name):
  return sum(col(col_name).isNull().cast('integer')).alias(col_name)

def mostrarNulos(dataframe):
    # Run the aggregation. The *exprs converts the list of expressions into
    # variable function arguments.
    exprs = []
    for col_name in dataframe.columns:
      exprs.append(count_null(col_name))
    return dataframe.agg(*exprs)

In [4]:
#PARA LOCAL DEL TRABAJO
#spark = SparkSession.builder.getOrCreate()

## Buffer Ticket

#### PRODUTOFAM

In [5]:
#LECTURA DE TABLA PARQUET
dfProdutofam=spark.read.parquet("/user/sparta-server/trusted/bufferticket/PRODUTOFAM")
dfProdutofam.persist()

DataFrame[PRF_CD_SUBFAMILIA: bigint, PRF_DS_SUBFAMILIA: string, PRF_CD_FAMILIA: bigint, PRF_DT_INCLUSAO: timestamp, PRF_DT_ALTERACAO: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [6]:
dfProdutofam.printSchema()

root
 |-- PRF_CD_SUBFAMILIA: long (nullable = true)
 |-- PRF_DS_SUBFAMILIA: string (nullable = true)
 |-- PRF_CD_FAMILIA: long (nullable = true)
 |-- PRF_DT_INCLUSAO: timestamp (nullable = true)
 |-- PRF_DT_ALTERACAO: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [None]:
dfProdutofam.describe().show()

In [14]:
#DETECTAR DUPLICADOS
dfProdutofam.count()

3076

In [7]:
dfProdutofam.select("PRF_CD_SUBFAMILIA").distinct().count()

3076

In [13]:
dfProdutofam=dfProdutofam.dropDuplicates(["PRF_CD_SUBFAMILIA"])

In [None]:
#CONVERSION A FECHA, PARA CUANDO SEA NECESARIO

#dfProdutofam.select("PRF_DT_INCLUSAO").cast(DateType)
#dfProdutofam.withColumn("record_date",dfProdutofam['PRF_DT_INCLUSAO'].cast(DateType())).show()
#dfProdutofam.withColumn("parsed", to_date(to_timestamp("PRF_DT_INCLUSAO", "dd/MM/yy"))).show()
dfProdutofam.withColumn("parsed", to_date(from_unixtime(unix_timestamp("PRF_DT_INCLUSAO", "dd/MM/yy")))).show()

#ESCRITURA A PARQUET

#df.write.parquet("testparquet")
#df2= spark.read.parquet("testparquet")

In [None]:
mostrarNulos(dfProdutofam).show()

In [58]:
dfProdutofam.show(5)

+-----------------+--------------------+--------------+--------------------+----------------+--------------------+----------+
|PRF_CD_SUBFAMILIA|   PRF_DS_SUBFAMILIA|PRF_CD_FAMILIA|     PRF_DT_INCLUSAO|PRF_DT_ALTERACAO|          TM_TRUSTED|DT_TRUSTED|
+-----------------+--------------------+--------------+--------------------+----------------+--------------------+----------+
|       1015552000|PAO FORMA ESP INT...|      10155520|2018-02-26 23:02:...|            null|2018-02-27 22:14:...|  20180227|
|       2020044800|         PROVOLONETE|      20200448|2018-02-26 23:02:...|            null|2018-02-27 22:14:...|  20180227|
|       1014685700|ORGANICOS SABORIZ...|      10146857|2018-02-26 23:02:...|            null|2018-02-27 22:14:...|  20180227|
|       2023201800|      BOLO CHOCOLATE|      20232018|2018-02-26 23:02:...|            null|2018-02-27 22:14:...|  20180227|
|       1011541000|ACESSO LIMPA FERR...|      10115410|2018-02-26 23:02:...|            null|2018-02-27 22:14:...|  20

In [55]:
dfProdutofam.agg(max('PRF_CD_SUBFAMILIA').alias('FECHAMAX')).show()

+----------+
|  FECHAMAX|
+----------+
|9099010100|
+----------+



#### PRODUTOFAMDEP

In [6]:
#LECTURA DE TABLA PARQUET
dfProdutofamdep=spark.read.parquet("/user/sparta-server/trusted/bufferticket/PRODUTOFAMDEP")
dfProdutofamdep.persist()

DataFrame[PRF_CD_DEPARTAMENTO: int, PRF_DS_DEPARTAMENTO: string, PRF_DT_INCLUSAO: timestamp, PRF_DT_ALTERACAO: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [8]:
dfProdutofamdep.printSchema()

root
 |-- PRF_CD_DEPARTAMENTO: integer (nullable = true)
 |-- PRF_DS_DEPARTAMENTO: string (nullable = true)
 |-- PRF_DT_INCLUSAO: timestamp (nullable = true)
 |-- PRF_DT_ALTERACAO: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [20]:
#DETECTAR DUPLICADOS
dfProdutofamdep.count()

9

In [18]:
dfProdutofamdep.select("PRF_CD_DEPARTAMENTO").distinct().count()

9

In [19]:
dfProdutofamdep=dfProdutofamdep.dropDuplicates(["PRF_CD_DEPARTAMENTO"])

In [None]:
dfProdutofamdep.describe().show()

In [None]:
mostrarNulos(dfProdutofamdep).show()

#### PRODUTOFAMFAM

In [7]:
#LECTURA DE TABLA PARQUET
dfProdutofamfam=spark.read.parquet("/user/sparta-server/trusted/bufferticket/PRODUTOFAMFAM")
dfProdutofamfam.persist()

DataFrame[PRF_CD_FAMILIA: bigint, PRF_DS_FAMILIA: string, PRF_CD_GRUPO: bigint, PRF_DT_INCLUSAO: timestamp, PRF_DT_ALTERACAO: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [10]:
dfProdutofamfam.printSchema()

root
 |-- PRF_CD_FAMILIA: long (nullable = true)
 |-- PRF_DS_FAMILIA: string (nullable = true)
 |-- PRF_CD_GRUPO: long (nullable = true)
 |-- PRF_DT_INCLUSAO: timestamp (nullable = true)
 |-- PRF_DT_ALTERACAO: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [None]:
#DETECTAR DUPLICADOS
dfProdutofamfam.count()==dfProdutofamfam.distinct().count()

In [None]:
dfProdutofamfam.describe().show()

In [12]:
dfProdutofamfam.count()

3079

In [10]:
dfProdutofamfam.select("PRF_CD_FAMILIA").distinct().count()

3079

In [11]:
dfProdutofamfam=dfProdutofamfam.dropDuplicates(["PRF_CD_FAMILIA"])

In [None]:
dfProdutofamfam.groupBy("PRF_CD_FAMILIA").count().where(col('count')>1).show(50)

In [None]:
mostrarNulos(dfProdutofamfam).show()

#### PRODUTOFAMGRP

In [8]:
#LECTURA DE TABLA PARQUET
dfProdutofamgrp=spark.read.parquet("/user/sparta-server/trusted/bufferticket/PRODUTOFAMGRP")
dfProdutofamgrp.persist()

DataFrame[PRF_CD_GRUPO: bigint, PRF_DS_GRUPO: string, PRF_CD_SETOR: int, PRF_DT_INCLUSAO: timestamp, PRF_DT_ALTERACAO: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [12]:
dfProdutofamgrp.printSchema()

root
 |-- PRF_CD_GRUPO: long (nullable = true)
 |-- PRF_DS_GRUPO: string (nullable = true)
 |-- PRF_CD_SETOR: integer (nullable = true)
 |-- PRF_DT_INCLUSAO: timestamp (nullable = true)
 |-- PRF_DT_ALTERACAO: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [None]:
#DETECTAR DUPLICADOS
dfProdutofamgrp.count()==dfProdutofamgrp.distinct().count()

In [None]:
dfProdutofamgrp.count()

In [23]:
dfProdutofamgrp.select("PRF_CD_GRUPO").distinct().count()

335

In [25]:
dfProdutofamgrp.count()

335

In [24]:
dfProdutofamgrp=dfProdutofamgrp.dropDuplicates(["PRF_CD_GRUPO"])

In [None]:
groupList=[i.PRF_CD_GRUPO for i in dfProdutofamgrp.select('PRF_CD_GRUPO').distinct().orderBy('PRF_CD_GRUPO').collect()]

In [None]:
len(groupList)

In [None]:
dfProdutofamgrp.describe().show()

In [None]:
mostrarNulos(dfProdutofamgrp).show()

#### PRODUTOFAMSET

In [9]:
#LECTURA DE TABLA PARQUET
dfProdutofamset=spark.read.parquet("/user/sparta-server/trusted/bufferticket/PRODUTOFAMSET")
dfProdutofamset.persist()

DataFrame[PRF_CD_SETOR: int, PRF_DS_SETOR: string, PRF_CD_DEPARTAMENTO: int, PRF_DT_INCLUSAO: timestamp, PRF_DT_ALTERACAO: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [14]:
dfProdutofamset.printSchema()

root
 |-- PRF_CD_SETOR: integer (nullable = true)
 |-- PRF_DS_SETOR: string (nullable = true)
 |-- PRF_CD_DEPARTAMENTO: integer (nullable = true)
 |-- PRF_DT_INCLUSAO: timestamp (nullable = true)
 |-- PRF_DT_ALTERACAO: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [None]:
#DETECTAR DUPLICADOS
dfProdutofamset.count()==dfProdutofamset.distinct().count()

In [30]:
dfProdutofamset.count()

40

In [28]:
dfProdutofamset.select("PRF_CD_SETOR").distinct().count()

40

In [29]:
dfProdutofamset=dfProdutofamset.dropDuplicates(["PRF_CD_SETOR"])

In [None]:
dfProdutofamset.describe().show()

In [None]:
mostrarNulos(dfProdutofamset).show()

#### ITEM_VAREJO

In [10]:
#LECTURA DE TABLA PARQUET
dfItemvarejo=spark.read.parquet("/user/sparta-server/trusted/bufferticket/ITEM_VAREJO")
dfItemvarejo.persist()

DataFrame[IVA_CD_ITEM: bigint, PRF_CD_SUBFAMILIA: bigint, FOR_CD_FORNECEDOR: bigint, IVA_DS_ITEM: string, IVA_DT_INSERT: timestamp, IVA_DT_UPDATE: timestamp, TM_TRUSTED: timestamp, DT_TRUSTED: int]

In [16]:
dfItemvarejo.printSchema()

root
 |-- IVA_CD_ITEM: long (nullable = true)
 |-- PRF_CD_SUBFAMILIA: long (nullable = true)
 |-- FOR_CD_FORNECEDOR: long (nullable = true)
 |-- IVA_DS_ITEM: string (nullable = true)
 |-- IVA_DT_INSERT: timestamp (nullable = true)
 |-- IVA_DT_UPDATE: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [None]:
#DETECTAR DUPLICADOS
dfItemvarejo.count()==dfItemvarejo.distinct().count()

In [None]:
dfItemvarejo=dfItemvarejo.dropDuplicates()

In [35]:
dfItemvarejo.count()

456658

In [33]:
dfItemvarejo.select("IVA_CD_ITEM").distinct().count()

456658

In [34]:
dfItemvarejo=dfItemvarejo.dropDuplicates(["IVA_CD_ITEM"])

In [None]:
dfItemvarejo.describe().show()

In [None]:
mostrarNulos(dfItemvarejo).show()

#### ITEM_CUPOM

In [4]:
#LECTURA DE TABLA PARQUET
dfItemcupom=spark.read.parquet("/user/sparta-server/trusted/bufferticket/ITEM_CUPOM")
# dfItemcupom=spark.read.parquet("/bkp/ITEM_CUPOM")

In [18]:
dfItemcupom.printSchema()

root
 |-- CUP_ID_CUPOM: long (nullable = true)
 |-- IVA_CD_ITEM: long (nullable = true)
 |-- CUP_NU_CUPOM: integer (nullable = true)
 |-- CUP_DT_CUPOM: timestamp (nullable = true)
 |-- ICU_QT_ITECOMPRADOS: double (nullable = true)
 |-- ICU_VL_ITENS: double (nullable = true)
 |-- ICU_FG_PROMOCAO: string (nullable = true)
 |-- ICU_FG_ITEPESADO: string (nullable = true)
 |-- ICU_DT_INSERT: timestamp (nullable = true)
 |-- ICU_DT_UPDATE: timestamp (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [46]:
dfItemcupom.count()
#3171783077
#3176595289

3176595289

In [None]:
dfItemcupom.describe().show()

In [None]:
mostrarNulos(dfItemcupom).show()

#### CUPOM

In [5]:
#LECTURA DE TABLA PARQUET
dfCupom=spark.read.parquet("/user/sparta-server/trusted/bufferticket/CUPOM")

In [20]:
dfCupom.printSchema()

root
 |-- CUP_ID_CUPOM: long (nullable = true)
 |-- TRA_CD_TRANSACAO: long (nullable = true)
 |-- CUP_NU_CUPOM: integer (nullable = true)
 |-- CUP_CD_ESTABELECIMENTO: long (nullable = true)
 |-- CUP_CD_PDV: string (nullable = true)
 |-- CUP_DT_CUPOM: timestamp (nullable = true)
 |-- CUP_HR_CUPOM: integer (nullable = true)
 |-- CUP_VL_CUPOM: double (nullable = true)
 |-- CUP_DT_INSERT: timestamp (nullable = true)
 |-- CUP_DT_UPDATE: timestamp (nullable = true)
 |-- PES_NU_CPFCNPJ: long (nullable = true)
 |-- SITE_COD: integer (nullable = true)
 |-- FISCAL_COUPON_COD: integer (nullable = true)
 |-- TRSACT_COD: integer (nullable = true)
 |-- SALE_TERMNL_COD: integer (nullable = true)
 |-- CUP_VL_TROCO: double (nullable = true)
 |-- TM_TRUSTED: timestamp (nullable = true)
 |-- DT_TRUSTED: integer (nullable = true)



In [20]:
dfCupom.count()

545664710

In [49]:
dfCupom.select('CUP_ID_CUPOM').distinct().count()

514428341

In [18]:
dfCupom.describe('PES_NU_CPFCNPJ').show()

+-------+--------------------+
|summary|      PES_NU_CPFCNPJ|
+-------+--------------------+
|  count|           104865042|
|   mean|2.843997501977631...|
| stddev|2.499903923385492...|
|    min|                   4|
|    max|         99999986234|
+-------+--------------------+



In [None]:
mostrarNulos(dfCupom).toPandas()

In [19]:
mostrarNulos(dfCupom.select('PES_NU_CPFCNPJ')).toPandas()

Unnamed: 0,PES_NU_CPFCNPJ
0,440799668


In [None]:
#dfCupom.select("PES_NU_CPFCNPJ").show(10)

#### TRANSACAO

In [6]:
#LECTURA DE TABLA PARQUET
dfTransacao=spark.read.parquet("/user/sparta-server/trusted/bufferticket/TRANSACAO")

In [22]:
dfTransacao.printSchema()

root
 |-- TRA_CD_TRANSACAO: long (nullable = true)
 |-- TTR_CD_TIPTRANSACAO: integer (nullable = true)
 |-- CTA_CD_CONTA: long (nullable = true)
 |-- MTR_CD_MEITRANSACAO: string (nullable = true)
 |-- EST_CD_ESTABELECIMENTO: long (nullable = true)
 |-- PLA_CD_PLANO: long (nullable = true)
 |-- CUP_NU_CUPOM: long (nullable = true)
 |-- PRD_CD_PROORIGINAL: string (nullable = true)
 |-- TRA_CD_NATTRANSACAO: string (nullable = true)
 |-- TRA_DT_TRANSACAO: timestamp (nullable = true)
 |-- TRA_HR_TRANSACAO: string (nullable = true)
 |-- TRA_DT_EFETRANSACAO: timestamp (nullable = true)
 |-- TRA_VL_TRANSACAO: double (nullable = true)
 |-- TRA_VL_COMISSAO: double (nullable = true)
 |-- TRA_VL_LIQTRANSACAO: double (nullable = true)
 |-- TRA_VL_PARC_1: double (nullable = true)
 |-- TRA_VL_PARC_DEMAIS: double (nullable = true)
 |-- TRA_TP_MOETRANSACAO: string (nullable = true)
 |-- TRA_QT_PARCELAS: integer (nullable = true)
 |-- TRA_NU_AUTORIZACAO: long (nullable = true)
 |-- TRA_NU_NSU: string (n

In [51]:
dfTransacao.count()

470287150

In [52]:
dfTransacao.select("TRA_CD_TRANSACAO").distinct().count()

470287141

In [None]:
dfTransacao.describe().show()

In [None]:
mostrarNulos(dfTransacao).show()

# Predictive Tables

### Estrutura Mercadologica

In [6]:
#CARGAR LA EM (no hace faltar ejecutar lo siguiente)
dfEM=spark.read.parquet("/analytics/bc1_clustering/tables/dfEM")

dfEM.persist()

dfEM.show(5)

+-----------+-----------------+--------------------+-----------------+-----------------+--------------+--------------+------------+------------+------------+-------------------+-------------------+-------------------+
|IVA_CD_ITEM|FOR_CD_FORNECEDOR|         IVA_DS_ITEM|PRF_CD_SUBFAMILIA|PRF_DS_SUBFAMILIA|PRF_CD_FAMILIA|PRF_DS_FAMILIA|PRF_CD_GRUPO|PRF_DS_GRUPO|PRF_CD_SETOR|       PRF_DS_SETOR|PRF_CD_DEPARTAMENTO|PRF_DS_DEPARTAMENTO|
+-----------+-----------------+--------------------+-----------------+-----------------+--------------+--------------+------------+------------+------------+-------------------+-------------------+-------------------+
|    2999641|           290009|PRODUTO GENERICO ...|       5066990100|         DIVERSOS|      50669901|      DIVERSOS|      506699|       GENCO|        5066|TEXTIL - PERMANENTE|                  5|             TEXTIL|
|    2999633|           290009|PRODUTO GENERICO ...|       5066990100|         DIVERSOS|      50669901|      DIVERSOS|      5066

In [11]:
dfEMtmp = dfItemvarejo.join(dfProdutofam, dfItemvarejo.PRF_CD_SUBFAMILIA==dfProdutofam.PRF_CD_SUBFAMILIA,'left_outer') \
    .join(dfProdutofamfam, dfProdutofamfam.PRF_CD_FAMILIA==dfProdutofam.PRF_CD_FAMILIA,'left_outer') \
    .join(dfProdutofamgrp, dfProdutofamgrp.PRF_CD_GRUPO==dfProdutofamfam.PRF_CD_GRUPO,'left_outer') \
    .join(dfProdutofamset, dfProdutofamset.PRF_CD_SETOR==dfProdutofamgrp.PRF_CD_SETOR,'left_outer') \
    .join(dfProdutofamdep, dfProdutofamdep.PRF_CD_DEPARTAMENTO==dfProdutofamset.PRF_CD_DEPARTAMENTO,'left_outer') 
dfEMtmp.printSchema()

NameError: name 'dfItemvarejo' is not defined

In [37]:
dfEM=dfEMtmp.select("IVA_CD_ITEM","FOR_CD_FORNECEDOR","IVA_DS_ITEM",dfProdutofam["PRF_CD_SUBFAMILIA"],"PRF_DS_SUBFAMILIA", \
              dfProdutofamfam["PRF_CD_FAMILIA"],"PRF_DS_FAMILIA",dfProdutofamgrp["PRF_CD_GRUPO"],"PRF_DS_GRUPO", \
              dfProdutofamset["PRF_CD_SETOR"],"PRF_DS_SETOR",dfProdutofamdep["PRF_CD_DEPARTAMENTO"],"PRF_DS_DEPARTAMENTO").persist()

In [38]:
dfEM.printSchema()

root
 |-- IVA_CD_ITEM: long (nullable = true)
 |-- FOR_CD_FORNECEDOR: long (nullable = true)
 |-- IVA_DS_ITEM: string (nullable = true)
 |-- PRF_CD_SUBFAMILIA: long (nullable = true)
 |-- PRF_DS_SUBFAMILIA: string (nullable = true)
 |-- PRF_CD_FAMILIA: long (nullable = true)
 |-- PRF_DS_FAMILIA: string (nullable = true)
 |-- PRF_CD_GRUPO: long (nullable = true)
 |-- PRF_DS_GRUPO: string (nullable = true)
 |-- PRF_CD_SETOR: integer (nullable = true)
 |-- PRF_DS_SETOR: string (nullable = true)
 |-- PRF_CD_DEPARTAMENTO: integer (nullable = true)
 |-- PRF_DS_DEPARTAMENTO: string (nullable = true)



In [39]:
mostrarNulos(dfEM).toPandas()

Unnamed: 0,IVA_CD_ITEM,FOR_CD_FORNECEDOR,IVA_DS_ITEM,PRF_CD_SUBFAMILIA,PRF_DS_SUBFAMILIA,PRF_CD_FAMILIA,PRF_DS_FAMILIA,PRF_CD_GRUPO,PRF_DS_GRUPO,PRF_CD_SETOR,PRF_DS_SETOR,PRF_CD_DEPARTAMENTO,PRF_DS_DEPARTAMENTO
0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [40]:
dfEM.is_cached

True

In [41]:
dfEM.count()

456658

In [None]:
dfEM.write.parquet("/analytics/bc1_clustering/tables/dfEM")

In [12]:
dfEM.select('PRF_CD_SUBFAMILIA').distinct().count()

3017

### Retail Transactions 

In [11]:
dfCupom.where((to_date(col('CUP_DT_CUPOM'))>='2017-05-01') & (to_date(col('CUP_DT_CUPOM'))<'2017-11-01')) \
    .select('CUP_DT_CUPOM',to_date(col('CUP_DT_CUPOM')).alias('FECHA')).agg({"FECHA": "min", "CUP_DT_CUPOM": "max"}).show()

+----------+--------------------+
|min(FECHA)|   max(CUP_DT_CUPOM)|
+----------+--------------------+
|2017-05-01|2017-10-31 00:00:...|
+----------+--------------------+



In [9]:
dfCupomFilter=dfCupom.where((to_date(col('CUP_DT_CUPOM'))>='2017-05-01') & (to_date(col('CUP_DT_CUPOM'))<'2017-11-01')) \
    .select('CUP_ID_CUPOM').distinct()

In [10]:
dfCupomFilter.printSchema()

root
 |-- CUP_ID_CUPOM: long (nullable = true)



In [17]:
dfProductsTransactions = dfItemcupom.join(dfCupomFilter,dfItemcupom.CUP_ID_CUPOM==dfCupomFilter.CUP_ID_CUPOM) \
    .select(dfItemcupom.CUP_ID_CUPOM,dfItemcupom.IVA_CD_ITEM).distinct() \
    .join(dfEM,dfItemcupom.IVA_CD_ITEM==dfEM.IVA_CD_ITEM) \
    .select(dfItemcupom.CUP_ID_CUPOM,dfEM.PRF_CD_FAMILIA,dfEM.PRF_DS_FAMILIA).distinct()

In [18]:
dfProductsTransactions.printSchema()

root
 |-- CUP_ID_CUPOM: long (nullable = true)
 |-- PRF_CD_FAMILIA: long (nullable = true)
 |-- PRF_DS_FAMILIA: string (nullable = true)



In [24]:
dfFinal=dfProductsTransactions.groupBy('CUP_ID_CUPOM').agg(collect_list('PRF_DS_FAMILIA').alias('FAMILIES'))

In [25]:
dfFinal.persist().count()

109556023

In [27]:
dfFinal.show(5,False)

+------------+--------------------------------------------------------------------------------------------------------------+
|CUP_ID_CUPOM|FAMILIES                                                                                                      |
+------------+--------------------------------------------------------------------------------------------------------------+
|1469169457  |[BALAS MASTIGAVEIS, ATE 250ML GUARANA]                                                                        |
|1469169872  |[BISCOITO WAFER CHOCOLATE]                                                                                    |
|1498701371  |[MAQUINA TABLETE, LEITE UHT DESNATADO, INTIMA BASICO KIT SOUTIEN, REGULAR DESNATADO, DESENGORDURANTE APARELHO]|
|1498705855  |[LINHA NOITE FEM PIJAMA LONGO, SACOLAS PLASTICA, MEIAS ESPORTE]                                               |
|1498706949  |[ESPORTE BERMUDA FITNESS, SUCRALOSE LIQUIDO, ACIMA DE 601ML SEM GAS]                                    

In [2]:
dfFinal.write.parquet("/analytics/bc3_mba/tables/dfFinalMBA", mode='overwrite')

In [1]:
dfFinal=spark.read.parquet("/analytics/bc3_mba/tables/dfFinalMBA")

In [8]:
dfSample=dfFinal.sample(fraction=0.1,withReplacement=False)

In [9]:
dfSample.persist().count()

10956311

In [10]:
dfSample.write.parquet("/analytics/bc3_mba/tables/dfSample10M", mode='overwrite')

In [83]:
#dfSample.select('FAMILIES').coalesce(1).write.csv('/analytics/bc3_mba/tables/dfSamplecsv')

Py4JJavaError: An error occurred while calling o1971.csv.
: java.lang.UnsupportedOperationException: CSV data source does not support array<string> data type.
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.org$apache$spark$sql$execution$datasources$csv$CSVFileFormat$$verifyType$1(CSVFileFormat.scala:233)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.verifySchema(CSVFileFormat.scala:237)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:121)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:579)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [28]:
dfSample=spark.read.parquet("/analytics/bc3_mba/tables/dfSample10M")

In [58]:
testLista=dfSample.rdd.map(lambda x: list(set(x.FAMILIES))).collect()

In [None]:
testLista

In [60]:
data=sc.parallelize(testLista)

In [11]:
testFPG=FPGrowth.train(dfSample.rdd.map(lambda x: list(set(x.FAMILIES))),0.005,400)

In [12]:
testFPG.save(sc,'/analytics/bc3_mba/models/testFPG10M')

In [92]:
result = testFPG.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['APARELHO DESCARTAVEL'], freq=7998)
FreqItemset(items=['APARELHO DESCARTAVEL', 'CARIOCA'], freq=1262)
FreqItemset(items=['APARELHO DESCARTAVEL', 'REFINADO ATE 1KG'], freq=1234)
FreqItemset(items=['APARELHO DESCARTAVEL', 'CREME DENTAL MULTIBENEFICIO'], freq=1474)
FreqItemset(items=['APARELHO DESCARTAVEL', 'BRANCO ACIMA 1KG'], freq=1320)
FreqItemset(items=['APARELHO DESCARTAVEL', 'ACHOCOLATADO REGULAR'], freq=1114)
FreqItemset(items=['APARELHO DESCARTAVEL', 'COM SAL REGULAR'], freq=1355)
FreqItemset(items=['APARELHO DESCARTAVEL', 'TRADICIONAL TP'], freq=1316)
FreqItemset(items=['APARELHO DESCARTAVEL', 'LEITE UHT INTEGRAL'], freq=2064)
FreqItemset(items=['APARELHO DESCARTAVEL', 'CEBOLA'], freq=1189)
FreqItemset(items=['APARELHO DESCARTAVEL', 'REFOGADOS TRADICIONAL'], freq=1466)
FreqItemset(items=['APARELHO DESCARTAVEL', 'MASSAS INSTANTANEAS INDIVIDUAL'], freq=1176)
FreqItemset(items=['APARELHO DESCARTAVEL', 'ROUPA COMUM PO'], freq=1739)
FreqItemset(items=['APARELHO DESC

In [93]:
len(result)

21883

In [126]:
dir()

['ALS',
 'ALSModel',
 'ArrayType',
 'BinaryType',
 'BisectingKMeans',
 'BisectingKMeansModel',
 'BooleanType',
 'ByteType',
 'Comm',
 'DataType',
 'DateType',
 'DecimalType',
 'DoubleType',
 'ElementwiseProduct',
 'FloatType',
 'In',
 'IntegerType',
 'KMeans',
 'KMeansModel',
 'LongType',
 'Magics',
 'MapType',
 'ModelRepositoryMagics',
 'MyClass',
 'NullType',
 'Out',
 'PCA',
 'PCAModel',
 'Path',
 'Pipeline',
 'PipelineModel',
 'RankBasedEvaluator',
 'RankBasedEvaluator2',
 'SQLContext',
 'ShortType',
 'SimpleSparkSerializer',
 'SparkContext',
 'SparkSession',
 'StandardScaler',
 'StorageLevel',
 'StringType',
 'StructField',
 'StructType',
 'TimestampType',
 'VectorAssembler',
 '_',
 '_101',
 '_108',
 '_113',
 '_115',
 '_119',
 '_120',
 '_121',
 '_124',
 '_30',
 '_36',
 '_38',
 '_39',
 '_40',
 '_42',
 '_44',
 '_45',
 '_47',
 '_49',
 '_5',
 '_50',
 '_51',
 '_53',
 '_65',
 '_81',
 '_85',
 '_94',
 '_95',
 '_96',
 '_97',
 '__',
 '___',
 '__builtin__',
 '__builtins__',
 '__doc__',
 '__lo