In [19]:
from pyspark.sql import SparkSession
from vars import *
from datetime import date
from functions import flatten_json, loadConfigs
from pyspark.sql.functions import lit
from pyspark.sql.functions import col,explode

# Build (or reuse) the Spark session. Two settings are applied up front:
#   * the S3A connector is told to use SimpleAWSCredentialsProvider
#   * the PostgreSQL JDBC driver jar is registered through spark.jars
spark = (
    SparkSession.builder
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.jars", "/jars/postgresql-42.2.5.jar")
    .getOrCreate()
)
# loadConfigs is defined in the project-local `functions` module (not shown here);
# presumably it applies further storage settings to the context -- TODO confirm.
loadConfigs(spark.sparkContext)

# Use the public IPython.display module rather than the internal
# IPython.core.display path, and import `display` explicitly instead of
# relying on the global that the notebook kernel injects.
from IPython.display import HTML, display

# Stop <pre> blocks from wrapping so wide DataFrame.show() tables stay aligned.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [20]:
# Snapshot date in YYYYMMDD form, kept as an int. It is interpolated into the
# input/output paths and written to the `dateid` column further down.
# PINNED_RUN_DATE freezes the notebook on one snapshot; set it to None to
# process the current day's snapshot instead. Both branches yield an int so
# the type of `today` does not depend on whether the pin is active.
PINNED_RUN_DATE = 20230326
today = PINNED_RUN_DATE if PINNED_RUN_DATE is not None else int(date.today().strftime('%Y%m%d'))
# Dataset name, used for both the output folder and the output file prefix.
output_file = "author_flair_richtext"

In [40]:
# Read the raw popular_<today>.json snapshot from the bucket named by `minio_bucket`.
# The former option("header", "true") was dropped: "header" is a CSV reader
# option and is not used by the JSON source, so it only misled readers.
df_raw = spark.read.json(f"s3a://{minio_bucket}/raw/popular_{today}.json")

In [41]:
# Emit one row per element of data.children.data, then promote the fields of
# that struct to top-level columns.
df_raw = (
    df_raw
    .select(explode("data.children.data").alias("data"))
    .select("data.*")
)

In [42]:
# Keep only the post id, the author, and the author-flair columns, and expose
# the post id as `post_id`.
# "author_flair_template_id" used to be listed twice, which produced two
# identically named columns in this frame; it is now selected once.
df_author_flair_richtext = (
    df_raw
    .select(
        "id",
        "author",
        "author_flair_richtext",
        "author_flair_template_id",
        "author_flair_text",
        "author_flair_type",
    )
    .withColumnRenamed("id", "post_id")
)

In [43]:
# Produce one row per element of the author_flair_richtext array while carrying
# the identifying columns along. explode() emits nothing for a null or empty
# array, so rows without any richtext element are not present in the result.
df_author_flair_exploded = df_author_flair_richtext.select(
    "post_id",
    "author",
    "author_flair_template_id",
    "author_flair_text",
    "author_flair_type",
    explode("author_flair_richtext").alias("author_flair_richtext"),
)

In [44]:
# Flatten the richtext struct: append its fields as top-level columns next to
# the existing ones, then discard the struct itself together with its `u` field.
df_author_flair_cleaned = (
    df_author_flair_exploded
    .select("*", "author_flair_richtext.*")
    .drop("author_flair_richtext", "u")
)

In [45]:
# Give the single-letter richtext fields descriptive column names.
# The renames are applied one after another, in the order listed.
df_author_flair_renamed = df_author_flair_cleaned
for short_name, long_name in (
    ("a", "additional_attributes"),
    ("e", "type"),
    ("t", "text"),
):
    df_author_flair_renamed = df_author_flair_renamed.withColumnRenamed(short_name, long_name)

In [46]:
# Remove rows that are identical across every column.
df_final = df_author_flair_renamed.distinct()

In [47]:
# Stamp every row with the snapshot date as a constant `dateid` column.
df_final = df_final.withColumn(colName="dateid", col=lit(today))

In [48]:
# Preview the result: first 20 rows, long cell values truncated (the defaults).
df_final.show(n=20, truncate=True)

+-------+------------------+------------------------+--------------------+-----------------+---------------------+-----+--------------------+--------+
|post_id|            author|author_flair_template_id|   author_flair_text|author_flair_type|additional_attributes| type|                text|  dateid|
+-------+------------------+------------------------+--------------------+-----------------+---------------------+-----+--------------------+--------+
|1223h92|MarvelsGrantMan136|                    null|r/Movies contributor|         richtext|                 null| text|r/Movies contributor|20230326|
|1225o1s|      gandalf45435|    32fb913a-fd7d-11e...|:natsm: Dyrus Mic...|         richtext|              :natsm:|emoji|                null|20230326|
|11isfra|            elch3w|    c1aefe16-027b-11e...|:MayMayMaker: MAY...|         richtext|        :MayMayMaker:|emoji|                null|20230326|
|12251en|            cbbBot|                    null|:rcbb: :ncaa: /r/...|         richtext|  

In [49]:
# Persist the result as Parquet under processed/<output_file>/, replacing any
# existing output for the same snapshot date.
(
    df_final.write
    .mode("overwrite")
    .parquet(f"s3a://{minio_bucket}/processed/{output_file}/{output_file}_{today}")
)