In [1]:
from pyspark import SparkConf
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, TimestampType

from core.csv_data_manager import CSVDataManager

from metadata.columns import StackExchangeColumns as columns
from metadata.schemas import posts_schema, comments_schema, tag_synonyms_schema, tags_schema

In [2]:
DATA_DIRECTORY_PATH = "D:/PyCharmStorage/BD_2024/data"

In [3]:
spark_session = (SparkSession.builder
                 .master("local")
                 .appName("BD_2024")
                 .config(conf=SparkConf())
                 .getOrCreate())

csv_data_manager = CSVDataManager(spark_session, DATA_DIRECTORY_PATH)

In [4]:
posts_dataframe = csv_data_manager.read(["/Posts_1.csv"], schema=posts_schema)
comments_dataframe = csv_data_manager.read(["/Comments_1.csv", "/Comments_2.csv"], schema=comments_schema)
tag_synonyms_dataframe = csv_data_manager.read(["/TagSynonyms.csv"], schema=tag_synonyms_schema)
tags_dataframe = csv_data_manager.read(["/Tags_1.csv", "/Tags_2.csv"], schema=tags_schema)

In [5]:
posts_dataframe = posts_dataframe.drop_duplicates()

In [6]:
posts_dataframe = posts_dataframe.dropna('all')

In [7]:
questions_dataframe = posts_dataframe.filter(
    (f.col(columns.post_type_id) == 1) & (~f.col(columns.tags).isNull())
)
questions_dataframe.show()

+--------+----------+----------------+--------+-------------------+------------+-----+---------+--------------------+-----------+----------------+----------------+---------------------+-------------------+-------------------+--------------------+--------------------+-----------+------------+-------------+-------------------+------------------+--------------+
|      Id|PostTypeId|AcceptedAnswerId|ParentId|       CreationDate|DeletionDate|Score|ViewCount|                Body|OwnerUserId|OwnerDisplayName|LastEditorUserId|LastEditorDisplayName|       LastEditDate|   LastActivityDate|               Title|                Tags|AnswerCount|CommentCount|FavoriteCount|         ClosedDate|CommunityOwnedDate|ContentLicense|
+--------+----------+----------------+--------+-------------------+------------+-----+---------+--------------------+-----------+----------------+----------------+---------------------+-------------------+-------------------+--------------------+--------------------+-----------

In [8]:
answers_dataframe = posts_dataframe.filter(
    (f.col(columns.post_type_id) == 2) & (f.col(columns.tags).isNull())
)
answers_dataframe.show()

+--------+----------+----------------+--------+-------------------+------------+-----+---------+--------------------+-----------+----------------+----------------+---------------------+-------------------+-------------------+-----+----+-----------+------------+-------------+----------+-------------------+--------------+
|      Id|PostTypeId|AcceptedAnswerId|ParentId|       CreationDate|DeletionDate|Score|ViewCount|                Body|OwnerUserId|OwnerDisplayName|LastEditorUserId|LastEditorDisplayName|       LastEditDate|   LastActivityDate|Title|Tags|AnswerCount|CommentCount|FavoriteCount|ClosedDate| CommunityOwnedDate|ContentLicense|
+--------+----------+----------------+--------+-------------------+------------+-----+---------+--------------------+-----------+----------------+----------------+---------------------+-------------------+-------------------+-----+----+-----------+------------+-------------+----------+-------------------+--------------+
|77740824|         2|            n

In [9]:
question_id_column = "question_id"
tags_and_question_id = questions_dataframe.select(f.col(columns.id).alias(question_id_column), columns.tags)
parent_and_date = answers_dataframe.select(columns.id, columns.parent_id, columns.creation_date)
answers_creation_date_and_tags = tags_and_question_id.join(
    parent_and_date, [tags_and_question_id[question_id_column] == parent_and_date[columns.parent_id]], "inner"
).select(columns.id, columns.creation_date, columns.tags)

In [10]:
questions_creation_date_and_tags = questions_dataframe.select(columns.id, columns.creation_date, columns.tags)

In [11]:
posts_with_tags = questions_creation_date_and_tags.union(answers_creation_date_and_tags).distinct()

In [12]:
posts_with_tags.show()

+--------+-------------------+--------------------+
|      Id|       CreationDate|                Tags|
+--------+-------------------+--------------------+
|77757424|2024-01-04 10:10:17|<reactjs><iis><re...|
|77747897|2024-01-02 18:25:13|       <google-maps>|
|77750360|2024-01-03 07:45:34|<javascript><html...|
|77779159|2024-01-08 11:00:56|<node.js><next.js...|
|77795684|2024-01-10 18:36:19|<algorithm><stm32...|
|77759874|2024-01-04 16:44:16|         <c++><aslr>|
|77785204|2024-01-09 08:13:59|          <autocode>|
|77772424|2024-01-07 08:45:06|     <xml><xslt-2.0>|
|77794509|2024-01-10 15:29:46|<java><date><pars...|
|77753935|2024-01-03 18:23:33|<sqlanywhere><mob...|
|77793381|2024-01-10 12:30:29|   <java><glassfish>|
|77764923|2024-01-05 13:31:50|<django><websocke...|
|77775747|2024-01-08 01:34:32|<sql><google-bigq...|
|77749588|2024-01-03 03:30:59|<php><microsoft-g...|
|77795274|2024-01-10 17:23:18|<mongodb><mongo-s...|
|77779970|2024-01-08 12:19:43|       <linux><bash>|
|77790798|20

In [13]:
posts_with_tags_for_comments = posts_with_tags.select(columns.id, columns.tags)
comments_post_id_and_date = comments_dataframe.select(columns.post_id, columns.creation_date)
comments_dates_with_tags = posts_with_tags_for_comments.join(
    comments_post_id_and_date, [posts_with_tags_for_comments[columns.id] == comments_post_id_and_date[columns.post_id]], "inner"
).select(columns.creation_date, columns.tags)

In [14]:
posts_dates_with_tags = posts_with_tags.select(columns.creation_date, columns.tags)

dates_with_tags = posts_dates_with_tags.union(comments_dates_with_tags).distinct()

In [15]:
dates_with_tags.show()

+-------------------+--------------------+
|       CreationDate|                Tags|
+-------------------+--------------------+
|2024-01-06 05:39:47|       <html><forms>|
|2024-01-05 07:10:39|           <angular>|
|2024-01-10 18:28:11|<typescript><elec...|
|2024-01-02 13:35:15|<python><numpy><n...|
|2024-01-02 02:49:59|              <rust>|
|2024-01-05 10:08:42|     <python><typer>|
|2024-01-09 10:24:08|<c#><build><refer...|
|2024-01-10 02:14:51| <python><nevergrad>|
|2024-01-10 21:02:15|<actionscript-3><...|
|2024-01-03 10:31:31|<r><dataframe><jo...|
|2024-01-04 12:31:36|<python><image-pr...|
|2024-01-08 13:29:26|<python><python-3...|
|2024-01-02 22:21:53| <pandas><dataframe>|
|2024-01-10 12:03:24|<javascript><html...|
|2024-01-04 10:34:09|<angular><spring>...|
|2024-01-10 10:56:04|<python><active-d...|
|2024-01-04 07:58:25|<javascript><c#><...|
|2024-01-08 21:57:41|<javascript><html...|
|2024-01-02 20:00:25|<can-bus><autosar...|
|2024-01-09 20:03:10|   <xamarin.android>|
+----------

In [16]:
date_and_single_tag = dates_with_tags.withColumn(
    columns.tag,
    f.explode(f.array_remove(f.split(f.regexp_replace(columns.tags, "<", ""), ">"), ""))
).select(columns.creation_date, columns.tag)

In [17]:
date_and_single_tag.show()

+-------------------+-----------------+
|       CreationDate|              Tag|
+-------------------+-----------------+
|2024-01-06 05:39:47|             html|
|2024-01-06 05:39:47|            forms|
|2024-01-05 07:10:39|          angular|
|2024-01-10 18:28:11|       typescript|
|2024-01-10 18:28:11|         electron|
|2024-01-10 18:28:11|             vite|
|2024-01-02 13:35:15|           python|
|2024-01-02 13:35:15|            numpy|
|2024-01-02 13:35:15|numerical-methods|
|2024-01-02 02:49:59|             rust|
|2024-01-05 10:08:42|           python|
|2024-01-05 10:08:42|            typer|
|2024-01-09 10:24:08|               c#|
|2024-01-09 10:24:08|            build|
|2024-01-09 10:24:08|        reference|
|2024-01-09 10:24:08|         json.net|
|2024-01-10 02:14:51|           python|
|2024-01-10 02:14:51|        nevergrad|
|2024-01-10 21:02:15|   actionscript-3|
|2024-01-10 21:02:15|              air|
+-------------------+-----------------+
only showing top 20 rows



In [18]:
date_and_single_tag.count()

221886

In [19]:
source_target = tag_synonyms_dataframe.select(columns.source_tag_name, columns.target_tag_name)
cleaned_date_and_single_tag = date_and_single_tag.join(
    source_target, [date_and_single_tag[columns.tag] == source_target[columns.source_tag_name]], 'left'
).withColumn(
    columns.tag, f.when(~f.col(columns.target_tag_name).isNull(), f.col(columns.target_tag_name)).otherwise(f.col(columns.tag))
).select(columns.creation_date, columns.tag)

In [20]:
cleaned_date_and_single_tag.show()

+-------------------+-----------------+
|       CreationDate|              Tag|
+-------------------+-----------------+
|2024-01-06 05:39:47|             html|
|2024-01-06 05:39:47|            forms|
|2024-01-05 07:10:39|          angular|
|2024-01-10 18:28:11|       typescript|
|2024-01-10 18:28:11|         electron|
|2024-01-10 18:28:11|             vite|
|2024-01-02 13:35:15|           python|
|2024-01-02 13:35:15|            numpy|
|2024-01-02 13:35:15|numerical-methods|
|2024-01-02 02:49:59|             rust|
|2024-01-05 10:08:42|           python|
|2024-01-05 10:08:42|            typer|
|2024-01-09 10:24:08|               c#|
|2024-01-09 10:24:08|            build|
|2024-01-09 10:24:08|        reference|
|2024-01-09 10:24:08|         json.net|
|2024-01-10 02:14:51|           python|
|2024-01-10 02:14:51|        nevergrad|
|2024-01-10 21:02:15|   actionscript-3|
|2024-01-10 21:02:15|              air|
+-------------------+-----------------+
only showing top 20 rows



In [21]:
day_tag_records = cleaned_date_and_single_tag.withColumn(
    columns.date, f.date_format(columns.creation_date, "yyyy-MM-dd")
).select(columns.date, columns.tag)

In [22]:
day_tag_count = (day_tag_records.withColumn(columns.count, f.lit(1))
                    .groupBy(columns.date, columns.tag)
                    .agg(f.sum(columns.count)
                    .alias(columns.count)))

In [23]:
day_tag_count.show()

+----------+--------------------+-----+
|      Date|                 Tag|Count|
+----------+--------------------+-----+
|2024-01-10|           nevergrad|    1|
|2024-01-04|   template-matching|    1|
|2024-01-03|                json|   85|
|2024-01-05|           gstreamer|    2|
|2024-01-05|              vb.net|   25|
|2024-01-08|          typescript|  272|
|2024-01-01|salesforce-lightning|    1|
|2024-01-10|             backend|   15|
|2024-01-08|               godot|    3|
|2024-01-10|             rscript|    7|
|2024-01-09| google-calendar-api|    2|
|2024-01-07|           wxwidgets|    3|
|2024-01-05|                grob|    4|
|2024-01-10|              median|   13|
|2024-01-09|              prolog|   13|
|2024-01-03|           app-store|    5|
|2024-01-05|                rest|   17|
|2024-01-03|           telemetry|    3|
|2024-01-01|       deep-learning|   11|
|2024-01-08|              ubuntu|   11|
+----------+--------------------+-----+
only showing top 20 rows



In [24]:
tags_dataframe_for_join = tags_dataframe.withColumnRenamed(columns.count, "GlobalCount")
day_id_tag_count_df = day_tag_count.join(
    tags_dataframe_for_join, [day_tag_count[columns.tag] == tags_dataframe_for_join[columns.tag_name]], "inner"
).select(columns.date, f.col(columns.id).alias(columns.tag_id), columns.tag, columns.count)

In [25]:
day_id_tag_count_df.show()

+----------+------+--------------------+-----+
|      Date| TagId|                 Tag|Count|
+----------+------+--------------------+-----+
|2024-01-10|144568|           nevergrad|    1|
|2024-01-04| 54257|   template-matching|    1|
|2024-01-03|  1508|                json|   85|
|2024-01-05| 24851|           gstreamer|    2|
|2024-01-05|    41|              vb.net|   25|
|2024-01-08| 84518|          typescript|  272|
|2024-01-01|124469|salesforce-lightning|    1|
|2024-01-10|  5664|             backend|   15|
|2024-01-08|124165|               godot|    3|
|2024-01-10| 44486|             rscript|    7|
|2024-01-09|  5780| google-calendar-api|    2|
|2024-01-07|  1706|           wxwidgets|    3|
|2024-01-05| 93682|                grob|    4|
|2024-01-10| 22955|              median|   13|
|2024-01-09|  8746|              prolog|   13|
|2024-01-03| 28129|           app-store|    5|
|2024-01-05|   364|                rest|   17|
|2024-01-03|106469|           telemetry|    3|
|2024-01-01|1

In [26]:
csv_data_manager.write(day_id_tag_count_df, "/Test")