# Импорты

In [5]:
import os
import pyspark
from pyspark.sql.functions import row_number, rank, col
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Создаем сессию

In [6]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
ss = SparkSession.builder.appName("Big_Data_Lab_2").master("local[*]").getOrCreate()

# Считываем данные

In [9]:
lang_df = ss.read.option("header", True)\
                  .option("inferSchema", True)\
                  .csv("data/programming-languages.csv")
posts_df = ss.read.format("xml")\
                      .options(rowTag="row")\
                      .load("data/posts_sample.xml")

# Производим обработку данных

In [10]:
lang_list = [row[0] for row in lang_df.select('name').collect()]

In [11]:
def get_id_lang_year_by_row(row):
    global lang_list
    language_tag = None
    for lang in lang_list:
        if "<" + lang.lower() + ">" in row._Tags.lower():
            language_tag = lang
            return (row._Id, language_tag, row._CreationDate.year)

In [12]:
# Исключаем посты без тегов
# Преобразуем записи
# Убираем пустые записи
# Фильтруем записи по году
id_lang_year_rdd = posts_df.rdd\
                           .filter(lambda row: row._Tags)\
                           .map(get_id_lang_year_by_row)\
                           .filter(lambda row: row)\
                           .filter(lambda row: 2010 <= row[2] and row[2] <= 2020)

In [13]:
# Группируем по (язык, год)
# Считаем количество упоминаний языков в год
year_lang_count_rdd = id_lang_year_rdd\
    .keyBy(lambda row: (row[2], row[1]))\
    .aggregateByKey( 
        0,
        lambda acc, value: acc + 1,
        lambda acc1, acc2: acc1 + acc2,
    )\
    .map(lambda row: (*row[0], row[1])) # ((year, lang), count) -> (year, lang, count)

In [14]:
# Преобразуем в DataFrame
year_lang_count_df = year_lang_count_rdd.toDF(schema=["year", "lang", "count"])

In [16]:
top_n = 10

# Группируем по году и сортируем по количеству вхождений
# Оставляем только top_n наиболее упоминаемых языков для каждого года

window_group_by_columns = Window.partitionBy(year_lang_count_df["year"])\
                                    .orderBy(year_lang_count_df["count"].desc())
ordered_df = year_lang_count_df.select(year_lang_count_df.columns + [
    row_number().over(window_group_by_columns).alias('row_rank')])

topN_df = ordered_df.filter(f"row_rank <= {top_n}").drop("row_rank")

In [17]:
topN_df.where(col("year") == 2010).show()

+----+-----------+-----+
|year|       lang|count|
+----+-----------+-----+
|2010|       Java|   52|
|2010| JavaScript|   44|
|2010|        PHP|   42|
|2010|     Python|   25|
|2010|Objective-C|   23|
|2010|          C|   20|
|2010|       Ruby|   11|
|2010|     Delphi|    7|
|2010|          R|    3|
|2010|       Perl|    3|
+----+-----------+-----+



In [18]:
# Т.к., например за 2020 нет статей
max_non_empty_year = topN_df.agg({"year": "max"}).collect()[0]["max(year)"]
min_non_empty_year = topN_df.agg({"year": "min"}).collect()[0]["min(year)"]

In [19]:
for year in range(min_non_empty_year, max_non_empty_year+1):
  topN_df.select(col("lang"), col("count").alias(f"Entries in {year} year")).where(col("year") == year).show()

+-----------+--------------------+
|       lang|Entries in 2010 year|
+-----------+--------------------+
|       Java|                  52|
| JavaScript|                  44|
|        PHP|                  42|
|     Python|                  25|
|Objective-C|                  23|
|          C|                  20|
|       Ruby|                  11|
|     Delphi|                   7|
|          R|                   3|
|       Perl|                   3|
+-----------+--------------------+

+-----------+--------------------+
|       lang|Entries in 2011 year|
+-----------+--------------------+
|        PHP|                  97|
|       Java|                  92|
| JavaScript|                  82|
|     Python|                  35|
|Objective-C|                  33|
|          C|                  24|
|       Ruby|                  17|
|     Delphi|                   8|
|       Perl|                   8|
|       Bash|                   7|
+-----------+--------------------+

+-----------+-----

In [20]:
# Сохранить отчет по частям (по годам)
#for year in range(min_non_empty_year, max_non_empty_year+1):
#  year_df = topN_df.select(col("lang"), col("count").alias(f"Entries in {year} year")).where(col("year") == year)
#  year_df.write.format("parquet").save(f"/content/report/top_langs_year_{year}")
# Сохранить отчет одним целым
topN_df.write.format("parquet").save(f"/content/report")