<a href="https://colab.research.google.com/github/Samgomes2510/dashborad-ecommerce/blob/main/2_PreprocessamentoDados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from google.colab import files
uploaded = files.upload()

Saving avaliacoes_pedido.csv to avaliacoes_pedido.csv
Saving clientes.csv to clientes.csv
Saving itens_pedido.csv to itens_pedido.csv
Saving pagamentos_pedido.csv to pagamentos_pedido.csv
Saving pedidos.csv to pedidos.csv
Saving produtos.csv to produtos.csv
Saving vendedores.csv to vendedores.csv


In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max
from pyspark.ml.feature import StringIndexer, MinMaxScaler, PCA, VectorAssembler

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
spark = SparkSession.builder.getOrCreate()

In [8]:
produtos = spark.read.csv('produtos.csv', header=True, inferSchema=True)
pagamentos_pedido = spark.read.csv('pagamentos_pedido.csv', header=True, inferSchema=True)

In [9]:
# Valores categóricos
indexador = StringIndexer(inputCol='categoria_produto', outputCol='categoria_produto_index')
modelo_indexer = indexador.fit(produtos)
produtos_indexado_df = modelo_indexer.transform(produtos)


produtos_indexado_df.show()


+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+-----------------------+
|          id_produto|   categoria_produto|tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|categoria_produto_index|
+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+-----------------------+
|1e9e8ef04dbcff454...|          perfumaria|                  40|                      287|                       1|           225|                    16|               10|                14|                   11.0|
|3aa071139cb16b67c...|               artes|                  44|                      276|                       1|          1000|          

In [10]:
# Normalização manual
min_val, max_val = pagamentos_pedido.select(min('valor_pagamento'), max('valor_pagamento')).first()
pagamentos_normalizado_df = pagamentos_pedido.withColumn('valor_pagamento_normalizado', (col('valor_pagamento') - min_val) / (max_val - min_val))

min_parc, max_parc = pagamentos_normalizado_df.select(min('parcelas_pagamento'), max('parcelas_pagamento')).first()
pagamentos_normalizado_df = pagamentos_normalizado_df.withColumn('parcelas_pagamento_normalizado', (col('parcelas_pagamento') - min_parc) / (max_parc - min_parc))
pagamentos_normalizado_df.show()


+--------------------+-------------------+--------------+------------------+---------------+---------------------------+------------------------------+
|           id_pedido|sequencia_pagamento|tipo_pagamento|parcelas_pagamento|valor_pagamento|valor_pagamento_normalizado|parcelas_pagamento_normalizado|
+--------------------+-------------------+--------------+------------------+---------------+---------------------------+------------------------------+
|b81ef226f3fe1789b...|                  1|   credit_card|                 8|          99.33|       0.007269424652080491|            0.3333333333333333|
|a9810da82917af2d9...|                  1|   credit_card|                 1|          24.39|       0.001784971984941...|          0.041666666666666664|
|25e8ea4e93396b6fa...|                  1|   credit_card|                 1|          65.71|       0.004808958963940492|          0.041666666666666664|
|ba78997921bbcdc13...|                  1|   credit_card|                 8|         107

In [11]:
montar_vetor = VectorAssembler(
    inputCols=["peso_produto_g", "comprimento_produto_cm", "altura_produto_cm", "largura_produto_cm"],
    outputCol="caracteristicas")
produtos_caracteristicas_df = montar_vetor.transform(produtos)
produtos_caracteristicas_df.show()

+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+--------------------+
|          id_produto|   categoria_produto|tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|     caracteristicas|
+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+--------------------+
|1e9e8ef04dbcff454...|          perfumaria|                  40|                      287|                       1|           225|                    16|               10|                14|[225.0,16.0,10.0,...|
|3aa071139cb16b67c...|               artes|                  44|                      276|                       1|          1000|                    30

In [16]:
from pyspark.ml.feature import Imputer, VectorAssembler, MinMaxScaler

# Imputar valores nulos com a média
imputer = Imputer(
    inputCols=["peso_produto_g", "comprimento_produto_cm", "altura_produto_cm", "largura_produto_cm"],
    outputCols=["peso_produto_g", "comprimento_produto_cm", "altura_produto_cm", "largura_produto_cm"]
)
produtos_imputados = imputer.fit(produtos).transform(produtos)

# Montar vetor de características
montar_vetor = VectorAssembler(
    inputCols=["peso_produto_g", "comprimento_produto_cm", "altura_produto_cm", "largura_produto_cm"],
    outputCol="caracteristicas"
)
produtos_caracteristicas_df = montar_vetor.transform(produtos_imputados)

# Normalizar os dados
scaler = MinMaxScaler(
    inputCol="caracteristicas",
    outputCol="caracteristicas_normalizado"
)
modelo_scaler = scaler.fit(produtos_caracteristicas_df)
produtos_caracteristicas_normalizado_df = modelo_scaler.transform(produtos_caracteristicas_df)

# Mostrar o resultado
produtos_caracteristicas_normalizado_df.show(truncate=False)

+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------------------------------------------------------------------+
|id_produto                      |categoria_produto               |tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|caracteristicas         |caracteristicas_normalizado                                                        |
+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------------------------------------------------------------------+
|1e9e8ef04dbcff4541ed26657ea

In [17]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Normalizar
scaler = MinMaxScaler(inputCol='caracteristicas', outputCol='caracteristicas_normalizado')
modelo_scaler = scaler.fit(produtos_caracteristicas_df)

produtos_caracteristicas_normalizado_df = modelo_scaler.transform(produtos_caracteristicas_df)
produtos_caracteristicas_normalizado_df.show(truncate=False)

+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------------------------------------------------------------------+
|id_produto                      |categoria_produto               |tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|caracteristicas         |caracteristicas_normalizado                                                        |
+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------------------------------------------------------------------+
|1e9e8ef04dbcff4541ed26657ea

In [18]:
# Reduzir dimensionalidade
pca = PCA(k=2, inputCol='caracteristicas_normalizado', outputCol='caracteristicas_pca')
modelo_pca = pca.fit(produtos_caracteristicas_normalizado_df)
produtos_caracteristicas_reduzido_df = modelo_pca.transform(produtos_caracteristicas_normalizado_df)


produtos_caracteristicas_reduzido_df.show(truncate=False)

+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------------------------------------------------------------------+--------------------------------------------+
|id_produto                      |categoria_produto               |tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|caracteristicas         |caracteristicas_normalizado                                                        |caracteristicas_pca                         |
+--------------------------------+--------------------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+------------------------+-----------------------

In [19]:
produtos_caracteristicas_reduzido_df.write.mode('overwrite').parquet('produtos_caracteristicas_reduzido.parquet')

df = spark.read.option('header', 'true').parquet('produtos_caracteristicas_reduzido.parquet')
df.show(5)
df.printSchema()

+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+--------------------+---------------------------+--------------------+
|          id_produto|   categoria_produto|tamanho_nome_produto|tamanho_descricao_produto|quantidade_fotos_produto|peso_produto_g|comprimento_produto_cm|altura_produto_cm|largura_produto_cm|     caracteristicas|caracteristicas_normalizado| caracteristicas_pca|
+--------------------+--------------------+--------------------+-------------------------+------------------------+--------------+----------------------+-----------------+------------------+--------------------+---------------------------+--------------------+
|1e9e8ef04dbcff454...|          perfumaria|                  40|                      287|                       1|           225|                    16|               10|                14|[225.0,16.0,10.0,...|      

In [25]:
produtos_caracteristicas_reduzido_df.write.mode('overwrite').parquet('drive/MyDrive/Colab Notebooks/spark/produtos_caracteristicas_reduzido.parquet')

df = spark.read.option('header', 'true').parquet('produtos_caracteristicas_reduzido.parquet')
df.show(5)
df.printSchema()

Py4JJavaError: An error occurred while calling o641.parquet.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:239)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics$(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics$lzycompute(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics$lzycompute(commands.scala:109)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics(commands.scala:109)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:63)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [21]:
pagamentos_normalizado_df.write.mode('overwrite').parquet('pagamentos_normalizado.parquet')

df = spark.read.option('header', 'true').parquet('pagamentos_normalizado.parquet')
df.show(5)
df.printSchema()

+--------------------+-------------------+--------------+------------------+---------------+---------------------------+------------------------------+
|           id_pedido|sequencia_pagamento|tipo_pagamento|parcelas_pagamento|valor_pagamento|valor_pagamento_normalizado|parcelas_pagamento_normalizado|
+--------------------+-------------------+--------------+------------------+---------------+---------------------------+------------------------------+
|b81ef226f3fe1789b...|                  1|   credit_card|                 8|          99.33|       0.007269424652080491|            0.3333333333333333|
|a9810da82917af2d9...|                  1|   credit_card|                 1|          24.39|       0.001784971984941...|          0.041666666666666664|
|25e8ea4e93396b6fa...|                  1|   credit_card|                 1|          65.71|       0.004808958963940492|          0.041666666666666664|
|ba78997921bbcdc13...|                  1|   credit_card|                 8|         107

In [27]:
pagamentos_normalizado_df.write.mode('overwrite').parquet('drive/MyDrive/Colab Notebooks/spark/pagamentos_normalizado.parquet')

df = spark.read.option('header', 'true').parquet('pagamentos_normalizado.parquet')
df.show(5)
df.printSchema()

Py4JJavaError: An error occurred while calling o649.parquet.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:239)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics$(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics$lzycompute(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics$lzycompute(commands.scala:109)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics(commands.scala:109)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:63)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [24]:
spark.stop()