In [1]:
import os, sys
# Project root directory; it is put on sys.path so the project-local imports
# that follow in this cell can be resolved. Defaults to the original hard-coded
# path; set the PROJETO_PYSPARK_ROOT environment variable to use a different
# checkout location without editing the notebook.
ROOT = os.environ.get("PROJETO_PYSPARK_ROOT", "/workspaces/projeto_pyspark")
# Insert at the front, and only once, so re-running the cell does not pile up
# duplicate sys.path entries.
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

from config.config import Config
from session.spark_session import SparkSessionManager
from dataio.data_io import DataIO
from business.logic import BusinessLogic


In [2]:
# Shared objects used by the remaining cells: the project configuration, the
# Spark session handed out by the session manager, and the I/O and
# business-logic helpers, each constructed from (spark, config).
config = Config()
spark = SparkSessionManager(config).get_session()
data_io = DataIO(spark, config)
logic = BusinessLogic(spark, config)


JAVA_HOME: /usr/local/sdkman/candidates/java/current
HADOOP_HOME: None
TEMP: None
TMP: None


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/07 14:30:52 WARN Utils: Your hostname, codespaces-68955f, resolves to a loopback address: 127.0.0.1; using 10.0.11.190 instead (on interface eth0)
25/09/07 14:30:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/07 14:30:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/07 14:30:53 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in standalone/kubernetes and LOCAL_DIRS in YARN).
25/09/07 14:30:54 WARN Utils: The configured local directories are not expected to be URIs; however, got suspicious values [C:/spark_

In [3]:
# Load both input datasets through the project's I/O helper.
pedidos_df = data_io.read_pedidos()
pagamentos_df = data_io.read_pagamentos()

# Schemas first, so column names and types are visible before any rows.
print("### Pedidos schema")
pedidos_df.printSchema()
print("### Pagamentos schema")
pagamentos_df.printSchema()

# Then a 10-row sample of each input, with long cell values left untruncated.
print("### Amostra pedidos")
pedidos_df.show(n=10, truncate=False)
print("### Amostra pagamentos")
pagamentos_df.show(n=10, truncate=False)


                                                                                

### Pedidos schema
root
 |-- ID_PEDIDO: string (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- VALOR_UNITARIO: double (nullable = true)
 |-- QUANTIDADE: integer (nullable = true)
 |-- DATA_CRIACAO: timestamp (nullable = true)
 |-- UF: string (nullable = true)
 |-- ID_CLIENTE: integer (nullable = true)

### Pagamentos schema
root
 |-- avaliacao_fraude: struct (nullable = true)
 |    |-- fraude: boolean (nullable = true)
 |    |-- score: double (nullable = true)
 |-- data_processamento: string (nullable = true)
 |-- forma_pagamento: string (nullable = true)
 |-- id_pedido: string (nullable = true)
 |-- status: boolean (nullable = true)
 |-- valor_pagamento: double (nullable = true)

### Amostra pedidos
+------------------------------------+-----------+--------------+----------+-------------------+---+----------+
|ID_PEDIDO                           |PRODUTO    |VALOR_UNITARIO|QUANTIDADE|DATA_CRIACAO       |UF |ID_CLIENTE|
+------------------------------------+-----------+--

In [5]:
# Combine the two inputs into the report DataFrame via the business-logic helper.
resultado_df = logic.process(pedidos_df, pagamentos_df)

# Inspect the result: schema, then a 20-row untruncated sample.
print("### Resultado schema")
resultado_df.printSchema()

print("### Resultado (amostra)")
resultado_df.show(n=20, truncate=False)

# count() is evaluated after the sample above has been displayed.
print(
    "Qtde linhas do resultado:",
    resultado_df.count(),
)


### Resultado schema
root
 |-- id_pedido: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- forma_pagamento: string (nullable = true)
 |-- valor_total_pedido: double (nullable = true)
 |-- data_pedido: timestamp (nullable = true)

### Resultado (amostra)


                                                                                

+------------------------------------+------+---------------+------------------+-------------------+
|id_pedido                           |estado|forma_pagamento|valor_total_pedido|data_pedido        |
+------------------------------------+------+---------------+------------------+-------------------+
|d8a94cc0-2df6-43ad-af0b-4ff5436a9581|AL    |CARTAO_CREDITO |1000.0            |2025-02-20 12:21:45|
|6a917788-20c6-4cfc-b51d-ececa0db13e1|AL    |CARTAO_CREDITO |2500.0            |2025-04-04 10:01:28|
|6076af29-68bc-4975-8ff3-89b0f5afa87a|AL    |CARTAO_CREDITO |1100.0            |2025-04-15 18:06:55|
|10a85986-e4da-446c-aed5-de1f5163a8ac|AL    |CARTAO_CREDITO |900.0             |2025-04-28 13:40:53|
|d0493b71-6626-493a-8f3c-c1dae97e095e|AL    |CARTAO_CREDITO |3000.0            |2025-05-13 20:12:54|
|94a47e1e-e26f-4f80-9105-db822612d925|AL    |CARTAO_CREDITO |1100.0            |2025-05-15 17:31:56|
|c093bbc9-bd56-43d0-afe2-5d8a52609bc9|AL    |CARTAO_CREDITO |5000.0            |2025-05-15 

In [6]:
import glob

# Persist the report through the project's I/O helper.
# NOTE(review): presumably this writes under config.output_path - confirm in DataIO.
data_io.write_parquet(resultado_df)

print("### Arquivos gerados em:", config.output_path)
# Recursively list whatever exists under the output path, capped at 50 entries
# to keep the cell output short.
for p in glob.glob(config.output_path + "/**", recursive=True)[:50]:
    print(p)


### Arquivos gerados em: /workspaces/projeto_pyspark/output/relatorio_parquet
/workspaces/projeto_pyspark/output/relatorio_parquet/
/workspaces/projeto_pyspark/output/relatorio_parquet/part-00000-265a7be1-ac60-4ef1-b54e-964ee7f2d239-c000.snappy.parquet
/workspaces/projeto_pyspark/output/relatorio_parquet/_SUCCESS


                                                                                

In [7]:
import subprocess, sys, os

# Run the project's pytest suite in a child process started from the project root.
env = os.environ.copy()
# Prepend ROOT to PYTHONPATH instead of overwriting it, so import locations
# already configured in the environment stay available to the tests; ROOT
# comes first and therefore still takes priority.
env["PYTHONPATH"] = os.pathsep.join(
    entry for entry in (ROOT, env.get("PYTHONPATH")) if entry
)
# capture_output=False lets pytest write straight to the notebook output; the
# printed CompletedProcess exposes the exit code (0 means all tests passed).
print(subprocess.run(["pytest", "-v"], cwd=ROOT, env=env, text=True, capture_output=False))


platform linux -- Python 3.12.1, pytest-8.4.1, pluggy-1.6.0 -- /usr/local/py-utils/venvs/pytest/bin/python
cachedir: .pytest_cache
rootdir: /workspaces/projeto_pyspark
configfile: pyproject.toml
plugins: anyio-4.9.0
[1mcollecting ... [0mcollected 1 item

tests/test_logic.py::test_process [32mPASSED[0m[32m                                 [100%][0m

tests/test_logic.py::test_process
  see the appropriate new directories, set the environment variable
  `JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.
  The use of platformdirs will be the default in `jupyter_core` v6
    from jupyter_core.paths import jupyter_data_dir, jupyter_runtime_dir, secure_write

CompletedProcess(args=['pytest', '-v'], returncode=0)


In [8]:
# Show up to 100 rows of the report, ordered by state, payment method and
# order date, without truncating cell values.
(
    resultado_df
    .orderBy("estado", "forma_pagamento", "data_pedido")
    .show(100, truncate=False)
)


+------------------------------------+------+---------------+------------------+-------------------+
|id_pedido                           |estado|forma_pagamento|valor_total_pedido|data_pedido        |
+------------------------------------+------+---------------+------------------+-------------------+
|d8a94cc0-2df6-43ad-af0b-4ff5436a9581|AL    |CARTAO_CREDITO |1000.0            |2025-02-20 12:21:45|
|6a917788-20c6-4cfc-b51d-ececa0db13e1|AL    |CARTAO_CREDITO |2500.0            |2025-04-04 10:01:28|
|6076af29-68bc-4975-8ff3-89b0f5afa87a|AL    |CARTAO_CREDITO |1100.0            |2025-04-15 18:06:55|
|10a85986-e4da-446c-aed5-de1f5163a8ac|AL    |CARTAO_CREDITO |900.0             |2025-04-28 13:40:53|
|d0493b71-6626-493a-8f3c-c1dae97e095e|AL    |CARTAO_CREDITO |3000.0            |2025-05-13 20:12:54|
|94a47e1e-e26f-4f80-9105-db822612d925|AL    |CARTAO_CREDITO |1100.0            |2025-05-15 17:31:56|
|c093bbc9-bd56-43d0-afe2-5d8a52609bc9|AL    |CARTAO_CREDITO |5000.0            |2025-05-15 