In [None]:
import datetime
import os
from functools import reduce

import pyspark
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructType

# Maven coordinates handed to spark.jars.packages: the MongoDB and Redis Spark connectors.
packages = ','.join([
    'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1',
    'com.redislabs:spark-redis_2.12:3.1.0'
])

# Connection settings are read from environment variables so the Redis password does
# not have to be hard-coded in the notebook. The defaults are the values the notebook
# originally used, so behaviour is unchanged when the variables are not set.
# TODO: remove the default password once REDIS_AUTH is always provided by the environment.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://mongo:27017/PMD2023.Mensagens")
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
REDIS_AUTH = os.environ.get("REDIS_AUTH", "123")

spark = (
    SparkSession.builder
    .appName("projeto-final-pmd-pedro-jean")
    # Reads and writes through the Mongo connector target the same collection.
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .config("spark.redis.host", REDIS_HOST)
    .config("spark.redis.port", REDIS_PORT)
    .config("spark.redis.auth", REDIS_AUTH)
    .config('spark.jars.packages', packages)
    .getOrCreate()
)

In [None]:
# Messages stored in Redis under table "mensagens"; the Redis key is exposed as column "id".
redisDf = (
    spark.read
    .format("org.apache.spark.sql.redis")
    .option("table", "mensagens")
    .option("key.column", "id")
    .load()
)

# Messages stored in MongoDB (collection configured via spark.mongodb.input.uri).
# Replace the "_id" struct by its "oid" field, then rename it to "id" so both sources
# share the same key column name for the union below.
mongoRaw = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
mongoDf = (
    mongoRaw
    .withColumn("_id", mongoRaw["_id"]["oid"])
    .withColumnRenamed("_id", "id")
)

# Union by column name (not position) and register the result for the SQL cells below.
unitedDf = redisDf.unionByName(mongoDf)
unitedDf.createOrReplaceTempView("messages")

unitedDf.show()

## Média de dinheiro doado em Reais (R$) por dia para o canal

In [None]:
# Average donated amount per day, restricted to donations whose currency is "R$".
# Built with the DataFrame API over the "messages" temp view registered by the loading cell.
media = (
    spark.table("messages")
    .where((f.col("donated") == f.lit(True)) & (f.col("currency") == "R$"))
    .groupBy("date", "currency")
    .agg(f.avg("money").alias("Media_de_dinheiro_doado"))
    .select(
        f.col("date").alias("Data"),
        f.col("currency").alias("Moeda"),
        f.col("Media_de_dinheiro_doado"),
    )
    .orderBy(f.col("Data").asc())
)

media.show()

## Total de dinheiro doado em dólar por dia para a stream

In [None]:
# Total donated amount per day, restricted to donations whose currency is "$".
# Built with the DataFrame API over the "messages" temp view registered by the loading cell.
total = (
    spark.table("messages")
    .where((f.col("donated") == f.lit(True)) & (f.col("currency") == "$"))
    .groupBy("date", "currency")
    .agg(f.sum("money").alias("Total_de_dinheiro_doado"))
    .select(
        f.col("date").alias("Data"),
        f.col("currency").alias("Moeda"),
        f.col("Total_de_dinheiro_doado"),
    )
    .orderBy(f.col("Data").asc())
)

total.show()

## Total de dinheiro arrecadado pelo canal, separado por moeda

In [None]:
# Total donated amount per currency, largest first. Currencies whose sum is null
# (e.g. no non-null "money" values) are dropped, mirroring a HAVING ... IS NOT NULL.
total2 = (
    spark.table("messages")
    .where(f.col("donated") == f.lit(True))
    .groupBy("currency")
    .agg(f.sum("money").alias("Total_de_dinheiro_doado"))
    .where(f.col("Total_de_dinheiro_doado").isNotNull())
    .select(
        f.col("currency").alias("Moeda"),
        f.col("Total_de_dinheiro_doado"),
    )
    .orderBy(f.col("Total_de_dinheiro_doado").desc())
)

total2.show(100)

## Buscar as mensagens do chat ao vivo (dados ainda no Redis) contendo a string "Brasil" (busca sensível a maiúsculas/minúsculas)

In [None]:
# Keep only the Redis rows whose "messages" column matches the case-sensitive
# LIKE pattern "%Brasil%", i.e. contains the substring "Brasil".
contains_brasil = redisDf["messages"].like("%Brasil%")
brasil = redisDf.where(contains_brasil)

brasil.show(100)

## Transferir os dados do Redis para o MongoDB e, em seguida, esvaziar a tabela do Redis

In [None]:
# Copy every Redis message into MongoDB, then empty the Redis "mensagens" table.
# The Redis key column ("id") is not copied; presumably MongoDB assigns its own _id.
redisToWrite = redisDf.drop(redisDf.id)

redisToWrite.write.format('com.mongodb.spark.sql.DefaultSource').mode("append").save()

# Truncate the Redis table by overwriting it with an empty DataFrame.
# spark-redis persists the schema of the DataFrame it writes for the table. Writing
# an empty StructType would replace the "mensagens" schema with one that has no
# columns, so the next load in the loading cell would break. Reuse the schema the
# table was read with instead.
# NOTE(review): redisDf is lazy, so the Mongo write above re-reads Redis at write time.
# Any message added to Redis after that read but before this overwrite is deleted
# without ever reaching MongoDB.
to_overwrite = spark.createDataFrame([], redisDf.schema)
to_overwrite.write.format("org.apache.spark.sql.redis") \
    .option("table", "mensagens") \
    .option("key.column", "id") \
    .mode("overwrite") \
    .save()
