In [1]:
import os
import sys
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [2]:
# Point PySpark's worker and driver Python at this kernel's interpreter, and ask
# spark-submit to fetch the spark-xml package used by `format('xml')` below.
# These are set before getOrCreate() so the freshly started session picks them up.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell"

spark = SparkSession.builder.getOrCreate()

In [3]:
# Скачиваем датасеты из репа и загружаем датасеты (Spark)
!wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml
!wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv

postsData = spark.read.format('xml').option('rowTag', 'row').option("timestampFormat", 'y/M/d H:m:s').load('posts_sample.xml')
langData = spark.read.format('csv').option('header', 'true').option("inferSchema", True).load('programming-languages.csv').dropna()

--2025-04-29 18:05:59--  https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml
Resolving git.ai.ssau.ru (git.ai.ssau.ru)... 91.222.131.161
Connecting to git.ai.ssau.ru (git.ai.ssau.ru)|91.222.131.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 74162295 (71M) [text/plain]
Saving to: ‘posts_sample.xml.4’


2025-04-29 18:08:16 (532 KB/s) - ‘posts_sample.xml.4’ saved [74162295/74162295]

--2025-04-29 18:08:16--  https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv
Resolving git.ai.ssau.ru (git.ai.ssau.ru)... 91.222.131.161
Connecting to git.ai.ssau.ru (git.ai.ssau.ru)|91.222.131.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 40269 (39K) [text/plain]
Saving to: ‘programming-languages.csv.3’


2025-04-29 18:08:18 (128 KB/s) - ‘programming-languages.csv.3’ saved [40269/40269]



In [9]:
# Quick look at both inputs: a header, the schema, two sample rows and summary statistics.
for header, frame in (
    ("For POST \n\n\nSchema: ", postsData),
    ("\n\n\nFor LANG \n\n\nSchema: ", langData),
):
    print(header)
    frame.printSchema()
    frame.show(n=2)
    frame.describe().show()

For POST 


Schema: 
root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)

+-----------------+------------+--------------------+---

In [5]:
# Posts created from 2010-01-01 through 2020-12-31 inclusive.
# A half-open range is used on purpose: `between(..., "2020-12-31")` compares against
# 2020-12-31 00:00:00 and would silently drop every post made later on Dec 31, 2020.
PERIOD_START = "2010-01-01"
PERIOD_END_EXCLUSIVE = "2021-01-01"
posts_in_period = postsData.filter(
    (F.col("_CreationDate") >= PERIOD_START) & (F.col("_CreationDate") < PERIOD_END_EXCLUSIVE)
)

# Programming-language names: the first column of every row of the languages CSV.
lang_names = [str(row[0]) for row in langData.collect()]
lang_names[:10]

['A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET',
 'ABSYS']

In [6]:
# Extract the programming-language tag from a post.
def find_language(post, languages=None):
    """Return ``(creation_date, language)`` for the first known language tagged on a post.

    Args:
        post: a posts row exposing ``_Tags`` (a string such as ``"<python><pandas>"``, or None)
            and ``_CreationDate``.
        languages: candidate language names, checked in order; defaults to the global
            ``lang_names``.

    Returns:
        ``(post._CreationDate, name)`` for the first name whose lowercased ``<name>`` tag occurs
        in the lowercased ``post._Tags``; ``(post._CreationDate, 'NoLang')`` if none matches or
        the post has no tags.
    """
    if languages is None:
        languages = lang_names
    # Read the date by field name instead of the positional index, which breaks if the schema changes.
    created = post._CreationDate
    # _Tags is nullable; without this guard str(None) would be searched as the text "none".
    if post._Tags is None:
        return (created, 'NoLang')
    post_tags = post._Tags.lower()
    for lang in languages:
        # Matching the full "<name>" token keeps e.g. "java" from matching "<javascript>".
        if f"<{lang.lower()}>" in post_tags:
            return (created, lang)
    return (created, 'NoLang')

# Keep only posts tagged with a known language, as (creation_date, language) pairs.
filtered_posts = (
    posts_in_period.rdd
    .map(find_language)
    .filter(lambda date_and_lang: date_and_lang[1] != 'NoLang')
)

# Grouping: key each record by (year, language) and count it once.
def pair_with_one(record):
    """Turn a ``(creation_date, language)`` record into ``((year, language), 1)``."""
    created, language = record[0], record[1]
    key = (created.year, language)
    return (key, 1)

def sum_counts(a, b):
    """Reducer for ``reduceByKey``: combine two partial counts into one."""
    total = a + b
    return total

# Number of posts for every (year, language) pair.
counts_rdd = filtered_posts.map(pair_with_one).reduceByKey(sum_counts)

# Sort by count, descending. Ties are broken by language name so the top-N cut-off is
# reproducible; sorting on the count alone leaves equal counts in an unspecified order.
sorted_counts = counts_rdd.sortBy(lambda kv: (-kv[1], kv[0][1])).collect()

# Years of the analysis period, newest first.
years_list = list(range(2020, 2009, -1))
TOP_N = 10

# Top TOP_N languages for each year, concatenated in years_list order.
top_per_year = []
for year in years_list:
    top_for_year = [item for item in sorted_counts if item[0][0] == year][:TOP_N]
    top_per_year.extend(top_for_year)


In [8]:
# Save the final table: one row per (Year, Language) with its post Count.
row_template = Row('Year', 'Language', 'Count')
# Explicit schema keeps the column types fixed (same as the inferred long/string/long) and
# still works when top_per_year is empty, where schema inference would fail.
result_df = spark.createDataFrame(
    [row_template(year, language, count) for (year, language), count in top_per_year],
    schema="Year BIGINT, Language STRING, Count BIGINT",
)

# Overwrite so that re-running the notebook does not fail on an existing output path.
result_df.write.mode("overwrite").parquet("top_langs.parquet")
result_df.show()

+----+----------+-----+
|Year|  Language|Count|
+----+----------+-----+
|2019|    Python|  162|
|2019|JavaScript|  131|
|2019|      Java|   95|
|2019|       PHP|   59|
|2019|         R|   36|
|2019|         C|   14|
|2019|      Dart|    9|
|2019|    MATLAB|    9|
|2019|        Go|    9|
|2019|      Bash|    8|
|2018|    Python|  214|
|2018|JavaScript|  196|
|2018|      Java|  145|
|2018|       PHP|   99|
|2018|         R|   63|
|2018|         C|   24|
|2018|     Scala|   22|
|2018|TypeScript|   21|
|2018|PowerShell|   13|
|2018|      Bash|   12|
+----+----------+-----+
only showing top 20 rows

