In [None]:
# findspark.init() is called before any pyspark import; presumably it makes the
# local Spark installation importable -- keep this order.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Single shared session for the whole notebook: app name "cross_sell", 8g of driver memory.
spark = SparkSession.builder.config('spark.driver.memory','8g').appName("cross_sell").getOrCreate()
from pyspark.sql.functions import col,struct,concat,collect_list,row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Standard-library helpers
from os import listdir
from functools import reduce

# Transformando os dados para parquet

In [None]:
class RawToProcessed:
    """Convert raw CSV files into parquet files.

    Parameters
    ----------
    spark : session object exposing ``read.csv`` (a SparkSession in this notebook).
    path_in : directory that holds the raw CSV files (no trailing slash needed).
    path_out : directory where the parquet output is written.
    """
    def __init__(self,spark,path_in,path_out):
        self.path_in = path_in
        self.path_out = path_out
        self.spark = spark

    def csv_to_parquet(self,filename_in,filename_out,*,write_mode="overwrite",**kwargs):
        """Read ``path_in/filename_in`` as CSV and write ``path_out/filename_out.parquet``.

        Parameters
        ----------
        filename_in : name of the CSV file inside ``path_in``.
        filename_out : output name; the ``.parquet`` suffix is appended here.
        write_mode : save mode handed to the parquet writer. Defaults to
            ``"overwrite"`` so the notebook can be re-run from a fresh kernel
            without failing because the output already exists. Pass
            ``"errorifexists"`` to restore the strict behaviour.
        **kwargs : forwarded unchanged to ``spark.read.csv`` (e.g. ``sep``, ``header``).
        """
        df = self.spark.read.csv(f'{self.path_in}/{filename_in}',**kwargs)
        # The name is `write_mode`, not `mode`, because `mode` is already a
        # CSV-reader option that callers may pass through **kwargs.
        df.write.parquet(f'{self.path_out}/{filename_out}.parquet',mode=write_mode)

In [None]:
# Input/output directories and the raw files to convert
path_in = 'data/raw'
path_out = 'data/processed'
# listdir returns entries in arbitrary order and also lists hidden entries
# (e.g. notebook checkpoint folders), so skip dot-entries and sort for a
# reproducible run.
files = sorted(file for file in listdir(path_in) if not file.startswith('.'))
# Drop only the last extension so that dotted names (a.b.csv) stay distinct
names = [file.rsplit('.', 1)[0] for file in files]

In [None]:
# Build the raw -> processed converter
processor = RawToProcessed(spark,path_in,path_out)
# Convert each raw file to parquet; the CSV reader is told to use ';' as the
# separator and to treat the first row as a header.
# A plain loop is used because the calls are made only for their side effect.
for raw_file, parquet_name in zip(files, names):
    processor.csv_to_parquet(raw_file, parquet_name, sep = ';', header = True)

# Gerando pares de produtos

In [None]:
# List the processed entries whose name starts with 'venda'.
# Sorted because listdir order is arbitrary and a later cell slices this list.
files = sorted(filename for filename in listdir("data/processed/") if filename.startswith('venda'))

In [None]:
class MakePairs:
    """Count how often two products appear in the same sale.

    For one parquet file of sales lines, produces a table with columns
    ``value_col1``, ``value_col2`` and ``count`` and writes it as parquet.

    Parameters
    ----------
    spark : session object exposing ``read.parquet`` (a SparkSession in this notebook).
    path_in : directory containing the input parquet files.
    path_out : directory where the pair counts are written.
    """
    def __init__(self,spark,path_in = 'data/processed/',path_out = 'data/gold/'):
        self.path_in = path_in
        self.path_out = path_out
        self.spark = spark

    @staticmethod
    def _join(base, name):
        """Join a directory and a file name with exactly one '/' between them."""
        if not base:
            return name
        return f"{base}{name}" if base.endswith('/') else f"{base}/{name}"

    def read_parquet(self,file):
        """Load ``file`` from ``self.path_in`` and return it as a DataFrame."""
        # Honour the configured input directory; it used to be hard-coded.
        # No cache() here: the frame was unpersisted before any action ran,
        # so the cache was never populated.
        return self.spark.read.parquet(self._join(self.path_in, file))

    def transform(self,data):
        """Return the co-occurrence count of every product pair in ``data``.

        ``data`` must have the columns ``COD_ID_VENDA_UNICO`` and
        ``COD_ID_PRODUTO``. The result has one row per
        (``value_col1``, ``value_col2``) with ``value_col1 < value_col2`` and
        a ``count`` column.
        """
        # One row per sale, holding the list of its products.
        # collect_list keeps duplicates, so a product repeated inside one sale
        # contributes more than once to the pair counts.
        grouped = data.groupBy("COD_ID_VENDA_UNICO").agg(collect_list("COD_ID_PRODUTO").alias("values"))
        # Self-join on the sale id yields every ordered pair of products in the sale
        left = grouped.selectExpr("COD_ID_VENDA_UNICO", "explode(values) as value_col1")
        right = grouped.selectExpr("COD_ID_VENDA_UNICO", "explode(values) as value_col2")
        pairs = left.join(right, "COD_ID_VENDA_UNICO")
        # The strict '<' keeps each unordered pair once and drops (x, x) pairs
        df_pairs_count = pairs.filter(col("value_col1") < col("value_col2")).groupby("value_col1", "value_col2").count()
        # The result is not cached: in this class it is consumed by one write only.
        return df_pairs_count

    def write_parquet(self,df_pairs_count,file,mode = "overwrite"):
        """Write ``df_pairs_count`` to ``self.path_out/file`` and report it.

        ``mode`` is the parquet save mode; it defaults to ``"overwrite"`` so a
        re-run of the notebook does not fail on already existing output.
        """
        df_pairs_count.write.parquet(self._join(self.path_out, file), mode=mode)
        print(f"Arquivo {file} salvo")

    def do(self,file):
        """Run read -> transform -> write for a single input file."""
        data = self.read_parquet(file)
        df_pairs_count = self.transform(data)
        # write_parquet already prints the confirmation; it is not repeated here
        self.write_parquet(df_pairs_count,file)
    

In [None]:
maker = MakePairs(spark)
# NOTE(review): only the first two files are processed -- looks like a
# development shortcut; confirm whether the slice should be removed.
# A plain loop is used because the calls are made only for their side effect.
for sales_file in files[:2]:
    maker.do(sales_file)

# Unindo os DataFrames, somando as contagens de todos os arquivos e ranqueando o Top 5 por produto

In [None]:
def count_from_months_rank_top_5(path = 'data/gold/vendas*.parquet/', top_n = 5):
    """Sum the pair counts of all files under ``path`` and rank them per product.

    Parameters
    ----------
    path : parquet path (globs allowed) whose files have the columns
        ``value_col1``, ``value_col2`` and ``count``.
    top_n : how many partners to keep for each ``value_col1`` (default 5).

    Returns
    -------
    DataFrame with ``value_col1``, ``value_col2``, ``total`` and ``rank``,
    restricted to ``rank <= top_n``. The first rows are also printed with
    ``show()``.

    Uses the notebook-level ``spark`` session.
    """
    # Total count of every pair across all matched files
    pair_counts = spark.read.parquet(path)
    totals = (pair_counts.groupBy("value_col1","value_col2")
              .agg({"count":"sum"})
              .withColumnRenamed("sum(count)", "total"))
    # Rank the partners of each value_col1 by total, highest first.
    # value_col2 is a tiebreak: without it row_number() orders equal totals
    # arbitrarily, so the result could differ between runs.
    # NOTE(review): if the input only holds pairs with value_col1 < value_col2
    # (as MakePairs.transform produces), a product is only ranked against
    # partners with a larger id -- confirm this is intended.
    w = Window.partitionBy("value_col1").orderBy(col("total").desc(), col("value_col2").asc())
    ranked = totals.withColumn("rank", row_number().over(w))
    # Keep only the best top_n partners per product
    top = ranked.filter(col("rank") <= top_n)
    top.show()
    return top

In [None]:
# Run the aggregation and ranking with the function's default path
count_from_months_rank_top_5()