# 📂 Part 1: Static Analysis (Spark SQL + PostgreSQL + S3)

### 1 - Read the apostas table from PostgreSQL and convert the timestamp column correctly.

In [1]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Leitura PostgreSQL") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.27,org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/betalert"
properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

table = "apostas"

df = spark.read.jdbc(url=url, table=table, properties=properties)

# convert the timestamp column from string to timestamp
df = df.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))

df.printSchema()
df.show(10)
spark.stop()



root
 |-- aposta_id: string (nullable = true)
 |-- apostador_id: string (nullable = true)
 |-- jogo_id: string (nullable = true)
 |-- valor: decimal(38,18) (nullable = true)
 |-- odd: decimal(38,18) (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- resultado: string (nullable = true)



                                                                                

+---------+------------+-------+--------------------+--------------------+-------------------+---------+
|aposta_id|apostador_id|jogo_id|               valor|                 odd|          timestamp|resultado|
+---------+------------+-------+--------------------+--------------------+-------------------+---------+
| b61f4f08|         u79| jogo31|1813.570000000000...|4.490000000000000000|2025-01-05 15:42:00|   perdeu|
| 88c44f52|         u23| jogo33|830.6000000000000...|2.870000000000000000|2025-01-05 09:54:00| pendente|
| 4c32051c|         u72| jogo72|1829.920000000000...|2.090000000000000000|2025-01-03 23:34:00| pendente|
| 1baa492c|         u70| jogo67|1614.200000000000...|4.210000000000000000|2025-01-04 16:50:00|   perdeu|
| 2820b7bb|         u40| jogo50|989.7300000000000...|2.980000000000000000|2025-01-02 20:37:00|   perdeu|
| 9f953950|         u10| jogo66|1786.550000000000...|3.300000000000000000|2025-01-04 13:58:00|   perdeu|
| 1549ba26|         u95| jogo42|683.4800000000000...|3.
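
The pattern passed to `to_timestamp` in the cell above, `yyyy-MM-dd HH:mm:ss`, corresponds to `%Y-%m-%d %H:%M:%S` in Python's `strptime` notation. As a minimal pure-Python sketch (no Spark involved; the helper name `parse_bet_timestamp` is hypothetical), the same parsing rule can be checked on a value taken from the output above:

```python
from datetime import datetime
from typing import Optional

# Python equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss".
PY_PATTERN = "%Y-%m-%d %H:%M:%S"


def parse_bet_timestamp(raw: str) -> Optional[datetime]:
    """Parse a timestamp string; return None when it does not match the pattern."""
    try:
        return datetime.strptime(raw, PY_PATTERN)
    except ValueError:
        return None


parsed = parse_bet_timestamp("2025-01-05 15:42:00")
unparsed = parse_bet_timestamp("05/01/2025 15:42")  # different layout, so no match
print(parsed)
print(unparsed)
```

Returning `None` for a non-matching string is a choice made for this sketch; how Spark treats unparseable values depends on its configuration, so it is worth checking for nulls in the converted column.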

### 2 - Read the transacoes_financeiras table and normalize the name of the value column.

In [2]:
from pyspark.sql.types import DecimalType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Leitura PostgreSQL") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.27") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/betalert"
properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

table = "transacoes_financeiras"

df = spark.read.jdbc(url=url, table=table, properties=properties)

# keep only two decimal places by casting instead of calling round();
# note that the cast itself rounds half-up if a value has more than two decimal digits
df = df.withColumn("valor", col("valor").cast(DecimalType(10, 2)))

df.printSchema()
df.show(10)
spark.stop()


root
 |-- id: integer (nullable = true)
 |-- apostador_id: string (nullable = true)
 |-- valor: decimal(10,2) (nullable = true)
 |-- tipo: string (nullable = true)
 |-- data: timestamp (nullable = true)



                                                                                

+---+------------+--------+--------+-------------------+
| id|apostador_id|   valor|    tipo|               data|
+---+------------+--------+--------+-------------------+
|  1|         u69|14890.81|deposito|2025-01-01 10:00:00|
|  2|         u94| 5616.48|deposito|2025-01-01 10:01:00|
|  3|         u95|16376.42|   saque|2025-01-01 10:02:00|
|  4|         u11|15335.80|deposito|2025-01-01 10:03:00|
|  5|         u88|19559.30|   saque|2025-01-01 10:04:00|
|  6|         u32| 4797.95|deposito|2025-01-01 10:05:00|
|  7|         u14| 1494.86|   saque|2025-01-01 10:06:00|
|  8|         u23|15609.39|deposito|2025-01-01 10:07:00|
|  9|          u8|14916.72|deposito|2025-01-01 10:08:00|
| 10|         u92|12104.17|   saque|2025-01-01 10:09:00|
+---+------------+--------+--------+-------------------+
only showing top 10 rows
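
The comment in the cell above contrasts rounding with simply keeping two decimal places. The difference can be sketched with Python's standard `decimal` module. This is not Spark code, the helper name `to_two_places` is hypothetical, and the first sample value is made up for illustration. To my understanding, Spark's cast to a `DecimalType` with a smaller scale behaves like the half-up case, which is worth confirming on your Spark version.

```python
from decimal import Decimal, ROUND_DOWN, ROUND_HALF_UP

TWO_PLACES = Decimal("0.01")


def to_two_places(value: Decimal, truncate: bool = False) -> Decimal:
    """Reduce a decimal to two places, either truncating or rounding half-up."""
    mode = ROUND_DOWN if truncate else ROUND_HALF_UP
    return value.quantize(TWO_PLACES, rounding=mode)


sample = Decimal("1494.865")  # hypothetical value with a third decimal digit
rounded = to_two_places(sample)
truncated = to_two_places(sample, truncate=True)
print(rounded)    # 1494.87
print(truncated)  # 1494.86

# A value that already has only two significant decimal places is unchanged.
print(to_two_places(Decimal("14890.810000000000000000")))  # 14890.81
```

For the rows shown in the output above, which already carry two decimal places, both strategies would give the same result.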



### 3 - Join apostas with the apostadores.csv file from S3 to include the country and extra data.

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Leitura PostgreSQL") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "admin123") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/betalert"
properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

# In theory, the join pairs every withdrawal with every bet, so it would only be necessary to compare
# the differences between withdrawals and deposits.

transactions = spark.read.jdbc(url=url, table="transacoes_financeiras", properties=properties)
bets = spark.read.jdbc(url=url, table="apostas", properties=properties)

apostadores = spark.read.csv("s3a://betalogs/apostadores.csv", header=True, inferSchema=True)

bets = bets.withColumnRenamed("valor", "bet_valor")
transactions = transactions.withColumnRenamed("valor", "transaction_valor")

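# withColumnRenamed does nothing when the column is missing, so these two calls
# only take effect if the tables carry a "pais" column
bets = bets.withColumnRenamed("pais", "pais_bet")
transactions = transactions.withColumnRenamed("pais", "pais_transacao")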


bets_transactions = transactions.join(
    bets, transactions.apostador_id == bets.apostador_id, "inner"
).drop(bets["apostador_id"])

resultado_final = bets_transactions.join(
    apostadores, bets_transactions.apostador_id == apostadores.id, "inner"
).drop(apostadores["id"])

resultado_final.show(10)


25/06/02 22:26:02 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
| id|apostador_id|   transaction_valor|    tipo|               data|aposta_id|jogo_id|           bet_valor|                 odd|          timestamp|resultado|      nome|               email|pais|
+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
|411|         u42|12961.78000000000...|deposito|2025-01-01 16:50:00| be7645ac|  jogo2|1776.830000000000...|4.700000000000000000|2025-01-03 11:29:00| pendente|Jogador 42|jogador42@exemplo...|  ES|
|403|         u42|3849.670000000000...|deposito|2025-01-01 16:42:00| be7645ac|  jogo2|1776.830000000000...|4.700000000000000000|2025-01-03 11:29:00| pendente|Jogador 42|jogador42@exemplo...|  ES|
|277|         u42|13

# 🔍 Part 2: Pattern Detection

Checking whether a bet is a flash bet by subtracting two timestamps converted to Unix time (seconds since 1970).

In [4]:
from pyspark.sql.functions import unix_timestamp, abs

deposits = resultado_final.where(resultado_final.tipo == "deposito")

# deposits where the difference between data and timestamp is less than 10 seconds
flash_deposits = deposits.where(
    abs(unix_timestamp("data") - unix_timestamp("timestamp")) < 10
)

flash_deposits.show(20)

+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
| id|apostador_id|   transaction_valor|    tipo|               data|aposta_id|jogo_id|           bet_valor|                 odd|          timestamp|resultado|      nome|               email|pais|
+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
|505|         u73|15000.00000000000...|deposito|2025-01-11 20:04:00| db280beb|  jogo3|15000.00000000000...|24.54000000000000...|2025-01-11 20:04:07|   ganhou|Jogador 73|jogador73@exemplo...|  ES|
|510|         u66|15000.00000000000...|deposito|2025-01-11 20:09:00| 0c7defda|  jogo6|15000.00000000000...|12.23000000000000...|2025-01-11 20:09:09|   ganhou|Jogador 66|jogador66@exemplo...|  AR|
|173|         u19|85
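
The flash-bet rule above compares a deposit time and a bet time in epoch seconds. A minimal pure-Python sketch of the same check follows, using `data` and `timestamp` values that appear in the outputs above. The function name `is_flash_bet` and its `window_seconds` parameter are illustrative, not part of the notebook.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def is_flash_bet(deposit_time: str, bet_time: str, window_seconds: int = 10) -> bool:
    """Return True when the deposit and the bet are less than window_seconds apart."""
    deposit = datetime.strptime(deposit_time, FMT)
    bet = datetime.strptime(bet_time, FMT)
    # Absolute difference, as in the Spark filter: the order of the two events is ignored.
    return abs((bet - deposit).total_seconds()) < window_seconds


print(is_flash_bet("2025-01-11 20:04:00", "2025-01-11 20:04:07"))  # True (7 s apart)
print(is_flash_bet("2025-01-11 20:09:00", "2025-01-11 20:09:09"))  # True (9 s apart)
print(is_flash_bet("2025-01-01 16:50:00", "2025-01-03 11:29:00"))  # False (days apart)
```

Because the difference is taken in absolute value, a bet placed up to 10 seconds before the deposit is also flagged, which matches the filter in the cell above.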

Show flash bets with a value above R$500.

In [5]:
gt_500_flash_deposits = flash_deposits.where(flash_deposits.bet_valor > 500)
gt_500_flash_deposits.show(20)

+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
| id|apostador_id|   transaction_valor|    tipo|               data|aposta_id|jogo_id|           bet_valor|                 odd|          timestamp|resultado|      nome|               email|pais|
+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
|505|         u73|15000.00000000000...|deposito|2025-01-11 20:04:00| db280beb|  jogo3|15000.00000000000...|24.54000000000000...|2025-01-11 20:04:07|   ganhou|Jogador 73|jogador73@exemplo...|  ES|
|510|         u66|15000.00000000000...|deposito|2025-01-11 20:09:00| 0c7defda|  jogo6|15000.00000000000...|12.23000000000000...|2025-01-11 20:09:09|   ganhou|Jogador 66|jogador66@exemplo...|  AR|
|173|         u19|85

Show flash bets with a value above R$10,000.

In [6]:
gt_10000_flash_deposits = flash_deposits.where(flash_deposits.bet_valor > 10000)
gt_10000_flash_deposits.show()

+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
| id|apostador_id|   transaction_valor|    tipo|               data|aposta_id|jogo_id|           bet_valor|                 odd|          timestamp|resultado|      nome|               email|pais|
+---+------------+--------------------+--------+-------------------+---------+-------+--------------------+--------------------+-------------------+---------+----------+--------------------+----+
|505|         u73|15000.00000000000...|deposito|2025-01-11 20:04:00| db280beb|  jogo3|15000.00000000000...|24.54000000000000...|2025-01-11 20:04:07|   ganhou|Jogador 73|jogador73@exemplo...|  ES|
|510|         u66|15000.00000000000...|deposito|2025-01-11 20:09:00| 0c7defda|  jogo6|15000.00000000000...|12.23000000000000...|2025-01-11 20:09:09|   ganhou|Jogador 66|jogador66@exemplo...|  AR|
|508|         u99|15

Detect players who placed 10 or more bets on the same game.

In [7]:
from pyspark.sql.functions import count

heavy_bettors = bets.groupBy("apostador_id", "jogo_id") \
    .agg(count("*").alias("num_apostas")) \
    .where(col("num_apostas") >= 10)

heavy_bettors.show()

+------------+-------+-----------+
|apostador_id|jogo_id|num_apostas|
+------------+-------+-----------+
|         u88| jogo77|         13|
+------------+-------+-----------+
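
The `groupBy` plus `count` filter above amounts to counting bets per (bettor, game) pair and keeping the pairs at or above the threshold. A small pure-Python sketch of that logic, with invented sample pairs and a hypothetical variable naming, might look like this:

```python
from collections import Counter

# Invented (apostador_id, jogo_id) pairs, one entry per bet.
sample_bets = (
    [("u88", "jogo77")] * 13
    + [("u42", "jogo2")] * 3
    + [("u88", "jogo5")] * 9
)

# Count bets per (bettor, game) pair, then keep pairs with 10 or more bets.
bets_per_pair = Counter(sample_bets)
heavy = {pair: n for pair, n in bets_per_pair.items() if n >= 10}

print(heavy)  # {('u88', 'jogo77'): 13}
```

The same bettor can appear with several games; only the pair that reaches the threshold is reported.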



Show the total and the average amount bet per country.

In [8]:
from pyspark.sql.functions import sum, avg

bets_by_country = resultado_final.groupBy("pais").agg(
    sum("bet_valor").alias("total_apostado"),
    avg("bet_valor").alias("media_apostada")
)

bets_by_country.show()

spark.stop()


                                                                                

+----+--------------------+--------------------+
|pais|      total_apostado|      media_apostada|
+----+--------------------+--------------------+
|  PT|3343853.040000000...|1029.511403940886...|
|  BR|5107855.500000000...|1078.744561774023...|
|  ES|7238634.930000000...|1082.169970100164...|
|  FR|6773318.370000000...|1025.017913135593...|
|  AR|4756412.960000000...|1100.512022211938...|
+----+--------------------+--------------------+
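
One caveat about these figures: `resultado_final` was built by joining every transaction with every bet of the same bettor, so each bet appears once per transaction of that bettor. Aggregating `bet_valor` over that table therefore counts each bet several times. The pure-Python sketch below uses invented rows to illustrate the effect; whether it matters here depends on what the totals are meant to represent.

```python
# Invented sample rows: one bettor with 2 bets and 3 transactions.
sample_bets = [
    {"apostador_id": "u1", "bet_valor": 100.0},
    {"apostador_id": "u1", "bet_valor": 300.0},
]
sample_transactions = [
    {"apostador_id": "u1", "tipo": "deposito"},
    {"apostador_id": "u1", "tipo": "deposito"},
    {"apostador_id": "u1", "tipo": "saque"},
]

# Inner join on apostador_id: every transaction pairs with every bet.
joined = [
    {**t, **b}
    for t in sample_transactions
    for b in sample_bets
    if t["apostador_id"] == b["apostador_id"]
]

total_from_bets = sum(b["bet_valor"] for b in sample_bets)
total_from_join = sum(row["bet_valor"] for row in joined)

print(len(joined))      # 6 rows = 3 transactions x 2 bets
print(total_from_bets)  # 400.0
print(total_from_join)  # 1200.0, each bet counted once per transaction
```

If the goal is the amount actually bet per country, one option would be to aggregate over the bets joined only with `apostadores`, leaving the transactions out of that particular calculation.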



# 📡 Part 3: Real-Time Streaming (Kafka + Spark Structured Streaming)


I heavily modified the docker compose file so that I could use Jupyter Notebook in the project. I extracted the Python scripts from the image and removed the network from the docker compose file, mapping all ports to my machine, since it was not possible to consume from Kafka over Docker's virtual network.

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, when, avg, count
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType

spark = SparkSession.builder \
    .appName("KafkaStreamingApp") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.27,org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "admin123") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

# Raise the log level to WARN to keep the streaming output clean
spark.sparkContext.setLogLevel("WARN")

kafka_stream = spark.readStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093") \
    .option('subscribe', 'stream_apostas') \
    .load()

# Read the CSV with the bettors from S3
bettors_df = spark.read.csv("s3a://betalogs/apostadores.csv", header=True, inferSchema=True)

# Schema of the Kafka message
message_schema = StructType() \
    .add("bet_id", StringType()) \
    .add("bettor_id", StringType()) \
    .add("game_id", StringType()) \
    .add("amount", FloatType()) \
    .add("odd", FloatType()) \
    .add("timestamp", TimestampType())

# Parse the JSON and flatten it into columns
parsed_stream = kafka_stream.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), message_schema).alias("data")) \
    .select("data.*")

# Join with the bettors
joined_df = parsed_stream.join(bettors_df, bettors_df.id == parsed_stream.bettor_id, "inner")

joined_df.printSchema()

# Suspicion flag column
enriched_df = joined_df.withColumn(
    "suspicious",
    when((col("amount") > 12000) & (col("odd") > 15), True).otherwise(False)
)

# Average bet amount per country.
avg_amount_by_country = enriched_df.groupBy("pais").agg(avg("amount").alias("media_valor"))

# Count of suspicious bets per country.
suspicious_count_by_country = enriched_df.groupBy("pais") \
    .agg(count(when(col("suspicious") == True, "suspicious")).alias("suspicious_bets"))

# Total volume of bets.
total_bets = enriched_df.groupBy().agg(count("*").alias("total_bets"))

# Count of suspicious bets per bettor (no code was written for this item).

# No outputMode is set on the writer, so the default (append) applies. Streaming aggregations
# without a watermark need "complete" or "update" mode, hence the AnalysisException below.
query = avg_amount_by_country.writeStream \
    .format("console") \
    .trigger(processingTime='1 minute') \
    .option("truncate", False) \
    .start()

query.awaitTermination()


root
 |-- bet_id: string (nullable = true)
 |-- bettor_id: string (nullable = true)
 |-- game_id: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- odd: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- pais: string (nullable = true)



25/06/02 22:51:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f9fba928-588a-4525-883b-aa5e19b9a74a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/02 22:51:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
Aggregate [pais#3417], [pais#3417, avg(amount#3429) AS media_valor#3483]
+- Project [bet_id#3426, bettor_id#3427, game_id#3428, amount#3429, odd#3430, timestamp#3431, id#3414, nome#3415, email#3416, pais#3417, CASE WHEN ((amount#3429 > cast(12000 as float)) AND (odd#3430 > cast(15 as float))) THEN true ELSE false END AS suspicious#3459]
   +- Join Inner, (id#3414 = bettor_id#3427)
      :- Project [data#3424.bet_id AS bet_id#3426, data#3424.bettor_id AS bettor_id#3427, data#3424.game_id AS game_id#3428, data#3424.amount AS amount#3429, data#3424.odd AS odd#3430, data#3424.timestamp AS timestamp#3431]
      :  +- Project [from_json(StructField(bet_id,StringType,true), StructField(bettor_id,StringType,true), StructField(game_id,StringType,true), StructField(amount,FloatType,true), StructField(odd,FloatType,true), StructField(timestamp,TimestampType,true), json_str#3422, Some(America/Belem)) AS data#3424]
      :     +- Project [cast(value#3384 as string) AS json_str#3422]
      :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@17627fd3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@63397291, [kafka.bootstrap.servers=localhost:9092,localhost:9093, subscribe=stream_apostas], [key#3383, value#3384, topic#3385, partition#3386, offset#3387L, timestamp#3388, timestampType#3389], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3bebac7f,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092,localhost:9093, subscribe -> stream_apostas),None), kafka, [key#3376, value#3377, topic#3378, partition#3379, offset#3380L, timestamp#3381, timestampType#3382]
      +- Relation [id#3414,nome#3415,email#3416,pais#3417] csv
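
The `AnalysisException` above is raised because the query aggregates a stream without a watermark while the writer uses the default append output mode. One possible fix, sketched below, is to set the output mode to `complete` (or `update`) on the writer. The helper name `start_console_query` is hypothetical; it only chains `DataStreamWriter` methods on whatever streaming DataFrame it receives, and it has not been run against the Kafka topic used in this notebook.

```python
def start_console_query(aggregated_df, output_mode="complete"):
    """Start a console sink for a streaming aggregation.

    aggregated_df is expected to be a streaming DataFrame that already contains
    an aggregation, such as avg_amount_by_country in the cell above.
    """
    return (
        aggregated_df.writeStream
        .outputMode(output_mode)             # "complete" re-emits the full result table
        .format("console")
        .trigger(processingTime="1 minute")  # same trigger interval as the original cell
        .option("truncate", False)
        .start()
    )
```

In the notebook this would be used as `query = start_console_query(avg_amount_by_country)` followed by `query.awaitTermination()`. With `complete` mode the whole per-country table is printed on every trigger, which is manageable here because the number of countries is small.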


GitHub repository: https://github.com/MateusGurgel/Infnet-Works/tree/main/creation_of_big_data_solutions_with_spark/tp-3