In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType
from functools import reduce
import pyspark
import pyspark.sql.functions as f
import datetime

# Maven coordinates of the extra packages the session should load (only the
# MongoDB Spark connector here), joined with commas into a single string.
packages = ','.join((
    'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1',
))

# Build the session (or reuse one that already exists). The MongoDB input and
# output URIs are set to the same value: database PMD2023, collection Mensagens.
spark = (
    SparkSession.builder
    .appName("projeto-final-pmd-pedro-jean")
    .config("spark.mongodb.input.uri", "mongodb://mongo:27017/PMD2023.Mensagens")
    .config("spark.mongodb.output.uri", "mongodb://mongo:27017/PMD2023.Mensagens")
    .config('spark.jars.packages', packages)
    .getOrCreate()
)

In [2]:
def _shifted_date_string(date, days):
    """Return `date` advanced by `days - 1` days, formatted as YYYY-MM-DD.

    `days` is 1-based: passing 1 yields the date itself.
    """
    shifted = date + datetime.timedelta(days=days - 1)
    return shifted.strftime("%Y-%m-%d")


# Spark UDF wrapper; no return type is given, so the UDF uses the default one.
dateUDF = f.udf(_shifted_date_string)

In [3]:
# Day numbers of the CSV exports to ingest (files are named day<N>.csv).
# The sequence is not contiguous: 9, 11, 13 and 14 are absent.
daysList = [1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 16, 17, 18, 19, 20, 21, 22, 23]
dfList = []

# Calendar date assigned to day 1; day N maps to initDate + (N - 1) days.
initDate = datetime.datetime(2022, 4, 12)

# Splits a raw `money` value into a leading run of non-digits (group 1) and a
# numeric part (group 2: digits, an optional single '.' or ',', more digits).
moneyPattern = r'(\D*)(\d+(\.|,)?\d*)'

for i in daysList:
    df = spark.read.options(header='True', inferSchema='True', quote="\"", escape="\"").csv(f'./work/Mongo/day{i}.csv')

    # The date is identical for every row of a given file, so it is computed
    # once on the driver and attached as a date literal. This replaces a
    # per-row Python UDF call (plus a string -> date re-parse) and does not
    # depend on how a timestamp literal is converted for the Python worker.
    dayDate = (initDate + datetime.timedelta(days=i - 1)).date()

    # currency: the non-digit prefix of `money`, only when money is present and not '0'.
    # money:    the numeric part of the raw value, cast to DECIMAL(20, 2).
    # donated:  whether the parsed amount differs from zero.
    # date:     the calendar date this day number maps to.
    treated = df.withColumn('currency', f.when((df.money.isNotNull()) & (df.money != '0'), f.regexp_replace('money', moneyPattern, '$1')).otherwise(None)) \
        .withColumn('money',  f.regexp_replace('money', moneyPattern, '$2').cast(DecimalType(20,2))) \
        .withColumn('donated', (f.col('money') != '0')) \
        .withColumn('date', f.lit(dayDate))

    dfList.append(treated)


# Stack the per-day frames into one, matching columns by name rather than position.
final_sdf = reduce(pyspark.sql.dataframe.DataFrame.unionByName, dfList)

final_sdf.show()

+--------------------+------------------+----------+-----+--------+-------+----------+
|            messages|       all_authors|timestamps|money|currency|donated|      date|
+--------------------+------------------+----------+-----+--------+-------+----------+
|being pretty mean...|     Ashlin Hiscox|     -8:15| 0.00|    null|  false|2022-04-12|
|yeah pretty evil ...|PeppermintPisces99|     -8:14| 0.00|    null|  false|2022-04-12|
|He has no backgro...|      IAM#FREE2BME|     -8:06| 0.00|    null|  false|2022-04-12|
|Only person anyon...|  Nolan A Hannigan|     -8:01| 0.00|    null|  false|2022-04-12|
|When's it going t...|      mina coltart|     -7:48| 0.00|    null|  false|2022-04-12|
|are they doing th...|          Kayleigh|     -7:32| 0.00|    null|  false|2022-04-12|
|        starts at 10|PeppermintPisces99|     -7:28| 0.00|    null|  false|2022-04-12|
|Yes, amber heard ...|          SJ Daily|     -7:26| 0.00|    null|  false|2022-04-12|
|            24 mins |          Kayleigh|  

In [4]:
# Persist the combined frame through the MongoDB connector. No destination is
# passed here, so it presumably comes from the session-level
# spark.mongodb.output.uri setting (confirm against the session config).
# Mode is "append": re-running this cell adds the same rows again.
(
    final_sdf.write
    .format('com.mongodb.spark.sql.DefaultSource')
    .mode("append")
    .save()
)