Célula para importação

In [159]:
from pyspark.sql.functions import sum, avg, count, when, col, std, format_number, year, month, day, desc, asc, cast,mean, expr
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, StringType, DoubleType, TimestampType, FloatType, LongType
import pandas as pd
import numpy as np
import statistics as st
import os

Célula para pegar o caminho raiz do arquivo

In [160]:
#Pega a pasta atual em que o arquivo se encontra
RAIZ: str = os.getcwd()
#Junta o caminho raiz com o arquivo desejado
BASE_DIR: str = os.path.join(RAIZ, 'Retail_Transaction.xlsx')
#Importa o arquivo, como é CSV é necessário separar por vírgula

Inicialização do Pyspark junto com a importação do arquivo com pandas para mais na frente converter para spark

In [161]:
# Criar uma SparkSession
spark: SparkSession = SparkSession.builder \
    .appName("Consumo") \
    .getOrCreate()
# Ler dados de um arquivo excel
df: pd.DataFrame  = pd.read_excel(BASE_DIR, sheet_name='Sheet1')

Formatação prévia das datas devido a problemas que podem acontecer no futuro 

In [162]:
# transforma o dataframe em uma lista devido a problemas com formatação
transacao_datas: list[str] | list[list[str]] = list(df['TransactionDate'])
# separa os valores das datas
for index, datas in enumerate(transacao_datas):
    transacao_datas[index]: list[str] | str = datas.split("/")
# caso a data ou mês só tenha 1 digito, adiciona um 0 na frfente
for index, datas in enumerate(transacao_datas):
    for internal_index, valor in enumerate(datas):
        if len(valor) < 2:
            transacao_datas[index][internal_index] : str = f"0{valor}"
# junta as datas
for index, datas in enumerate(transacao_datas):
    transacao_datas[index]: list[str] | str = "/".join(datas)
# transforma as datas em uma coluna dataframe
df_datas = pd.DataFrame({"TransactionDate": transacao_datas})
# atribui as datas a coluna do datrame
df['TransactionDate'] = df_datas
# converte as datas pra datetime
df['TransactionDate'] = pd.to_datetime(df.TransactionDate)

Inspeção Inicial de dados

In [163]:
# Converte um dataframe pandas para um dataframe spark
Transaction: DataFrame = spark.createDataFrame(df)
# Mostrar os primeiros registros do DataFrame
Transaction.show(5)

# Mostrar o esquema do DataFrame
Transaction.printSchema()

+----------+----------+---------+--------+-----------+-------------------+-------------+--------------------+---------------+------------------+-----------+
|Unnamed: 0|CustomerID|ProductID|Quantity|      Price|    TransactionDate|PaymentMethod|       StoreLocation|ProductCategory|DiscountApplied(%)|TotalAmount|
+----------+----------+---------+--------+-----------+-------------------+-------------+--------------------+---------------+------------------+-----------+
|         0|    109318|        C|       7|80.07984415|2023-12-26 12:32:00|         Cash|176 Andrew Cliffs...|          Books|        18.6770995|455.8627638|
|         1|    993229|        C|       4|75.19522942|2023-08-05 00:00:00|         Cash|11635 William Wel...|     Home Decor|       14.12136502|258.3065464|
|         2|    579675|        A|       8|31.52881648|2024-03-11 18:51:00|         Cash|910 Mendez Ville ...|          Books|       15.94370066|212.0156509|
|         3|    799826|        D|       5|98.88021828|2023

Limpeza e Tratamento de dados

In [164]:
# Contar valores nulos por coluna
Transaction.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# Remover linhas com qualquer valor nulo
Transaction_Clean: DataFrame = Transaction.dropna()
# Remove duplicados
Transaction_Clean: DataFrame = Transaction_Clean.dropDuplicates()
# Filtrar linhas com quantidade ou preço negativos
Transaction_Clean: DataFrame = Transaction_Clean.filter((Transaction_Clean["Quantity"] > 0) & (Transaction_Clean["Price"] > 0) & (Transaction_Clean["DiscountApplied(%)"] >= 0))
# formata as colunas que possuem números reais para possuir no máximo 2 casas decimais e também converte o tipo das determinadas columas para número real
Transaction_Clean: DataFrame = (Transaction_Clean.withColumn("Price", col("Price").cast("float").withColumn("Price", format_number(Transaction_Clean["Price"], 2)))
                                .withColumn("DiscountApplied(%)", col("DiscountApplied(%)").cast("float")).withColumn("DiscountApplied(%)", format_number(Transaction_Clean["DiscountApplied(%)"], 2))
                                .withColumn("TotalAmount"), col("TotalAmount").cast("float")).withColumn("TotalAmount", format_number(Transaction_Clean["TotalAmount"], 2))

+----------+----------+---------+--------+-----+---------------+-------------+-------------+---------------+------------------+-----------+
|Unnamed: 0|CustomerID|ProductID|Quantity|Price|TransactionDate|PaymentMethod|StoreLocation|ProductCategory|DiscountApplied(%)|TotalAmount|
+----------+----------+---------+--------+-----+---------------+-------------+-------------+---------------+------------------+-----------+
|         0|         0|        0|       0|    0|              0|            0|            0|              0|                 0|          0|
+----------+----------+---------+--------+-----+---------------+-------------+-------------+---------------+------------------+-----------+



TypeError: 'Column' object is not callable

In [None]:
Transaction_Clean.show()
Transaction_Clean.printSchema()

Calcular o total de vendas por categoria de produto

In [None]:
Transaction_Agrupado: DataFrame = Transaction_Clean.groupBy("ProductCategory").agg(sum("TotalAmount").alias("TotalSales"))
Transaction_Agrupado.show()

Calcular a média de descontos aplicados por método de pagamento

In [None]:
Transaction_Descontos = Transaction_Clean.groupBy("PaymentMethod").agg(avg("DiscountApplied(%)").alias("AverageDiscount"))
Transaction_Descontos.show()

In [None]:
# Adicionar colunas de ano e mês
Transaction_Clean = Transaction_Clean.withColumn("Year", year("TransactionDate"))
Transaction_Clean = Transaction_Clean.withColumn("Month", month("TransactionDate"))
Transaction_Clean = Transaction_Clean.withColumn("Day", day("TransactionDate"))
# Calcular vendas mensais
Transaction_venda_diaria = Transaction_Clean.groupBy("Month", "Day").agg(sum("TotalAmount").alias("DailySales")).orderBy(desc("DailySales"))
Transaction_venda_diaria.show()

In [92]:
Transaction_venda_mensal = Transaction_Clean.groupBy("Year", "Month").agg(sum("TotalAmount").alias("MonthlySales")).orderBy(desc("MonthlySales"))
Transaction_venda_mensal.show()

+----+-----+------------------+
|Year|Month|      MonthlySales|
+----+-----+------------------+
|2023|    7|2132550.4899999998|
|2024|    1| 2128345.700000001|
|2023|   12|2125650.6799999992|
|2023|    8|2109352.3199999994|
|2024|    3|        2108247.75|
|2023|    5|2099576.0999999996|
|2023|    6|2066364.4400000002|
|2023|   11|        2051277.17|
|2023|    9|2050334.5499999998|
|2023|   10|2049451.0400000003|
|2024|    2|1973154.1399999997|
|2024|    4|1878349.4199999988|
|2023|    4|          60840.85|
+----+-----+------------------+



In [None]:
numeric_cols: list = []
categorical_cols: list = []

Transaction_schema = Transaction_Clean.schema
# Separar colunas com base no tipo de dado
for field in Transaction_schema:
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType, LongType)):
        numeric_cols.append(field.name)
    elif isinstance(field.dataType, (StringType, TimestampType)):
        categorical_cols.append(field.name)

numeric_cols.remove('Unnamed: 0')
numeric_cols.remove("CustomerID")
Transaction_numerico = Transaction_Clean.select(numeric_cols)
Transaction_categorico = Transaction_Clean.select(categorical_cols)

In [165]:
def calcula_outlier(dataframe: DataFrame, col_name: str) -> tuple:
    q1 = dataframe.approxQuantile(col_name, [0.25], 0.01)[0]
    q3 = dataframe.approxQuantile(col_name, [0.75], 0.01)[0]
    iqr = q3 - q1
    outliers_inferior = q1 - 1.5 * iqr
    outliers_superior = q3 + 1.5 * iqr
    return outliers_inferior, outliers_superior

# Calcular os limites para cada coluna numérica
outliers_limite = {col_name: calcula_outlier(Transaction_numerico, col_name) for col_name in numeric_cols}
print(outliers_limite)

{'Quantity': (-3.0, 13.0), 'Price': (-34.145917880000006, 143.66259860000002), 'DiscountApplied(%)': (-9.8155356225, 29.6791015975), 'TotalAmount': (-299.96853877499996, 750.2632981449999)}


In [166]:
# Função para identificar outliers
def identifica_outlier(dataframe: DataFrame, col_name: str, outliers_inferiores: float, outliers_superiores: float) -> DataFrame:
    return dataframe.filter((col(col_name) < outliers_inferiores) | (col(col_name) > outliers_superiores))

# Identificar e mostrar os outliers
outliers = {}
for col_name, (outlier_inferior, outlier_superior) in outliers_limite.items():
    transaction_outlier = identifica_outlier(Transaction_numerico, col_name, outlier_inferior, outlier_superior)
    outliers[col_name] = transaction_outlier

# Mostrar os outliers para cada coluna
for col_name, transaction_outlier in outliers.items():
    print(f"Outliers na coluna {col_name}:")
    transaction_outlier.show()

Outliers na coluna Quantity:
+--------+-----+------------------+-----------+
|Quantity|Price|DiscountApplied(%)|TotalAmount|
+--------+-----+------------------+-----------+
+--------+-----+------------------+-----------+

Outliers na coluna Price:
+--------+-----+------------------+-----------+
|Quantity|Price|DiscountApplied(%)|TotalAmount|
+--------+-----+------------------+-----------+
+--------+-----+------------------+-----------+

Outliers na coluna DiscountApplied(%):
+--------+-----+------------------+-----------+
|Quantity|Price|DiscountApplied(%)|TotalAmount|
+--------+-----+------------------+-----------+
+--------+-----+------------------+-----------+

Outliers na coluna TotalAmount:
+--------+-----------+------------------+-----------+
|Quantity|      Price|DiscountApplied(%)|TotalAmount|
+--------+-----------+------------------+-----------+
|       9| 90.3351513|       5.341062648|769.5926485|
|       9|96.09278818|       3.435480217|835.1238551|
|       9| 98.7028445|   

In [157]:
Transaction_media: list = [int(Transaction_Clean.select(mean(column).alias("Média")).collect()[0]['Média'])for column in numeric_cols]
Transaction_desvio_padrao: list = [int(Transaction_Clean.select(std(column).alias("Desvio Padrão")).collect()[0]["Desvio Padrão"]) for column in numeric_cols]
print(Transaction_media)
print(Transaction_desvio_padrao)

Py4JJavaError: An error occurred while calling o2548.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 420.0 failed 1 times, most recent failure: Lost task 2.0 in stage 420.0 (TID 1617) (DESKTOP-1593J9O executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more
