1. Настройка окружения и загрузка библиотек

In [24]:
!pip install pyspark
!pip install --upgrade pyspark cloudpickle



In [25]:
import os
import sys
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col

In [26]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell'

2. Создание Spark-сессии

In [27]:
spark = SparkSession.builder.getOrCreate()

3. Загрузка данных

In [28]:
# Скачиваем файл, если его нет
if not os.path.exists('posts_sample.xml'):
    !wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml

# Чтение XML-файла
posts_df = spark.read.format('xml') \
    .option('rowTag', 'row') \
    .option("timestampFormat", 'yyyy/MM/dd HH:mm:ss') \
    .load('posts_sample.xml')

# Проверка данных
print("Количество строк:", posts_df.count())

start_date, end_date = "2010-01-01", "2020-12-31"
posts_filtered = posts_df.filter(col("_CreationDate").between(start_date, end_date))

Количество строк: 46006


In [29]:
# Скачиваем файл, если его нет
if not os.path.exists('programming-languages.csv'):
    !wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv

# Чтение CSV-файла
languages_df = spark.read.format('csv') \
    .option('header', 'true') \
    .option("inferSchema", True) \
    .load('programming-languages.csv') \
    .dropna()

# Проверка данных
print("Всего языков:", languages_df.count())

language_names = [row['name'] for row in languages_df.collect()]

Всего языков: 699


4. Функция для поиска языков в тегах

In [30]:
def includes_name(row):
    creation_date = row["_CreationDate"]
    tags = str(row["_Tags"]).lower() if row["_Tags"] else ""
    for name in language_names:
        if f"<{name.lower()}>" in tags:
            return (creation_date, name)
    return (creation_date, 'None')

5. Обработка постов и подсчет языков

In [31]:
posts_filtered_rdd = posts_filtered.rdd \
    .map(includes_name) \
    .filter(lambda x: x[1] != 'None')

posts_aggregate = posts_filtered_rdd \
    .keyBy(lambda row: (row[0].year, row[1])) \
    .aggregateByKey(0, lambda x, _: x + 1, lambda x1, x2: x1 + x2) \
    .sortBy(lambda x: x[1], ascending=False) \
    .collect()

years = list(range(2010, 2021))
years_df = []

for year in years:
    first_languages = [row for row in posts_aggregate if row[0][0] == year][:10]
    years_df.extend(first_languages)

row_name = Row('Year', 'Language', 'Count')
result_df = spark.createDataFrame([
    row_name(year, lang, count) for ((year, lang), count) in years_df
])

result_df.show(100 ,truncate=False)
result_df.write.mode("overwrite").parquet("result.parquet")

+----+-----------+-----+
|Year|Language   |Count|
+----+-----------+-----+
|2010|Java       |52   |
|2010|JavaScript |44   |
|2010|PHP        |42   |
|2010|Python     |25   |
|2010|Objective-C|23   |
|2010|C          |20   |
|2010|Ruby       |11   |
|2010|Delphi     |7    |
|2010|R          |3    |
|2010|AppleScript|3    |
|2011|PHP        |97   |
|2011|Java       |92   |
|2011|JavaScript |82   |
|2011|Python     |35   |
|2011|Objective-C|33   |
|2011|C          |24   |
|2011|Ruby       |17   |
|2011|Delphi     |8    |
|2011|Perl       |8    |
|2011|Bash       |7    |
|2012|PHP        |136  |
|2012|JavaScript |129  |
|2012|Java       |124  |
|2012|Python     |65   |
|2012|Objective-C|45   |
|2012|C          |27   |
|2012|Ruby       |25   |
|2012|R          |9    |
|2012|Bash       |9    |
|2012|MATLAB     |6    |
|2013|JavaScript |196  |
|2013|Java       |191  |
|2013|PHP        |173  |
|2013|Python     |87   |
|2013|Objective-C|40   |
|2013|C          |36   |
|2013|Ruby       |30   |


6. Сохранение результатов в Parquet

In [32]:
result_df.write.mode("overwrite").parquet("result.parquet")