In [99]:
import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [100]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell'
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [101]:
post_data = (spark.read.format('xml')
             .option('rowTag', 'row')
             .option("timestampFormat", 'y/M/d H:m:s')
             .load('posts_sample.xml'))

posts_by_date = post_data.filter(F.col("_CreationDate").between("2010-01-01","2020-12-31"))
posts_by_date.show()

+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+--------------------+--------------------+----------+
|_AcceptedAnswerId|_AnswerCount|               _Body|_ClosedDate|_CommentCount| _CommunityOwnedDate|       _CreationDate|_FavoriteCount|    _Id|   _LastActivityDate|       _LastEditDate|_LastEditorDisplayName|_LastEditorUserId|_OwnerDisplayName|_OwnerUserId|_ParentId|_PostTypeId|_Score|               _Tags|              _Title|_ViewCount|
+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+--------------------+----------

In [102]:
language_data = (spark.read.format("csv")
                 .option("header", True)
                 .option("inferSchema", True)
                 .load("programming-languages.csv")
                 .dropna(how="all"))

language_data.show()

+------------+--------------------+
|        name|       wikipedia_url|
+------------+--------------------+
|     A# .NET|https://en.wikipe...|
|  A# (Axiom)|https://en.wikipe...|
|  A-0 System|https://en.wikipe...|
|          A+|https://en.wikipe...|
|         A++|https://en.wikipe...|
|        ABAP|https://en.wikipe...|
|         ABC|https://en.wikipe...|
|   ABC ALGOL|https://en.wikipe...|
|       ABSET|https://en.wikipe...|
|       ABSYS|https://en.wikipe...|
|         ACC|https://en.wikipe...|
|      Accent|https://en.wikipe...|
|    Ace DASL|https://en.wikipe...|
|        ACL2|https://en.wikipe...|
|     ACT-III|https://en.wikipe...|
|     Action!|https://en.wikipe...|
|ActionScript|https://en.wikipe...|
|         Ada|https://en.wikipe...|
|     Adenine|https://en.wikipe...|
|        Agda|https://en.wikipe...|
+------------+--------------------+
only showing top 20 rows



In [103]:
from operator import add

def find_names(row, language_names):
    tag = next((name for name in language_names if name.lower() in str(row._Tags).lower()), 'None')
    return (row._CreationDate, tag)

language_names = [str(row['name']) for row in language_data.collect()] # Список имен языков

year_name = posts_by_date.rdd.map(lambda row: find_names(row, language_names)) # Данные в формате дата -- имя языка
year_name_filtered = year_name.filter(lambda row: row[1] != 'None') # Отфильтрованные данные, убраны даты без языков (22 записи)
year_name_keyed = year_name_filtered.keyBy(lambda row: (row[0].year, row[1])) # Добавляем каждой строке ключ -- в формате (год, язык)
year_name_counted = year_name_keyed.aggregateByKey(0, lambda acc, row: acc + 1, lambda acc1, acc2: acc1 + acc2) # Группируем значения по ключам, считая их
year_name_sorted = year_name_counted.sortBy(lambda row: row[1], ascending=False) # Сортируем записи по посчитанному выше количеству

table = year_name_sorted.collect() # Материализуем табличку

# Собираем список языков по годам, включая для каждого из них не более 10 записей
lang_by_years_list = []
for year in range(2010,2021):
    lang_by_years_list.extend([(row[0][0], row[0][1], row[1]) for row in table if row[0][0] == year][:10])


row_template = Row('Year', 'Language', 'Count')
df = spark.createDataFrame([row_template(*x) for x in lang_by_years_list])
df.write.mode("overwrite").parquet("top_10_languages_by_years.parquet")
df.show()

+----+--------+-----+
|Year|Language|Count|
+----+--------+-----+
|2010|       E| 1510|
|2010|       C|  296|
|2010|       B|  210|
|2010|       D|   65|
|2010|       L|   14|
|2010|     Arc|   10|
|2010|       G|    9|
|2010|     ACC|    9|
|2010|       J|    8|
|2010|     PHP|    7|
|2011|       E| 2497|
|2011|       C|  466|
|2011|       B|  336|
|2011|       D|  127|
|2011|       L|   29|
|2011|       G|   15|
|2011|     PHP|   14|
|2011|       J|   14|
|2011|     Arc|   10|
|2011|       F|    9|
+----+--------+-----+
only showing top 20 rows

