In [64]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
from datetime import datetime

# Create the Spark session for this report, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("ProgrammingLanguageReport")
    .getOrCreate()
)

In [65]:
# Report configuration: input file names, the years covered (as strings,
# 2010 through 2020 inclusive) and the size of each yearly top list
posts_path = "posts_sample.xml"
programming_languages_path = "programming-languages.csv"
years = [str(year) for year in range(2010, 2021)]
top_count = 10

In [16]:
# Upload both input files to the working directory of the Hadoop filesystem.
# -f overwrites an existing destination, so re-running this cell does not fail
# when the files were already uploaded.
!hadoop fs -put -f /home/Lab2/posts_sample.xml ./
!hadoop fs -put -f /home/Lab2/programming-languages.csv ./

In [66]:
# Read the posts file as an RDD of text lines and count them
posts_rdd = spark.sparkContext.textFile(posts_path)
posts_count = posts_rdd.count()

# Number every line, keep the ones whose index lies strictly between 2 and the
# last index (posts_count - 1), then drop the index again.
# NOTE(review): this discards the lines at indices 0, 1 and 2. If the file has
# only two header lines before the records, the first record is lost here --
# confirm against the file layout.
posts_raw = (
    posts_rdd
    .zipWithIndex()
    .filter(lambda indexed: 2 < indexed[1] < posts_count - 1)
    .map(lambda indexed: indexed[0])
)

In [67]:
# Total number of lines in the posts file (a separate read of posts_path)
spark.sparkContext.textFile(posts_path).count()

46009

In [68]:
# Read the language file as text lines and drop the line at index 0
# (presumably the CSV header row -- confirm against the file)
programming_languages_raw = (
    spark.sparkContext.textFile(programming_languages_path)
    .zipWithIndex()
    .filter(lambda indexed: indexed[1] > 0)
    .map(lambda indexed: indexed[0])
)

# Keep only lines that split into exactly two comma-separated fields and
# collect the lower-cased first field of each into a local list
programming_languages = (
    programming_languages_raw
    .map(lambda line: line.split(","))
    .filter(lambda fields: len(fields) == 2)
    .map(lambda fields: fields[0].lower())
    .collect()
)

In [81]:
import xml.etree.ElementTree as ET

# Parse each remaining line on its own into an XML Element
posts_xml = posts_raw.map(ET.fromstring)

In [86]:
# Inspect the third parsed post: strip the first and last character of its
# Tags attribute and split the rest on "><" to get the individual tags
third_post = posts_xml.take(3)[2]
third_post.get("Tags")[1:-1].split("><")

['c#', '.net', 'datetime']

In [71]:
# IPython help lookup for ET.fromstring (interactive aid; it does not feed
# into the report computed below)
? ET.fromstring

[0;31mSignature:[0m  [0mET[0m[0;34m.[0m[0mfromstring[0m[0;34m([0m[0mtext[0m[0;34m,[0m [0mparser[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Parse XML document from string constant.

This function can be used to embed "XML Literals" in Python code.

*text* is a string containing XML data, *parser* is an
optional parser instance, defaulting to the standard XMLParser.

Returns an Element instance.
[0;31mFile:[0m      /opt/miniconda3/lib/python3.9/xml/etree/ElementTree.py
[0;31mType:[0m      function


In [87]:
# Extract the creation date and the tags of a post
def extract_creation_date_and_tags(e):
    """Return the "CreationDate" and "Tags" values of a post as a 2-tuple.

    Args:
        e: object exposing ``get(name)``; in this notebook an XML Element
            produced by ``ET.fromstring``.

    Returns:
        ``(creation_date, tags)``; whatever ``e.get`` returns for each name,
        so a missing attribute shows up as ``None`` for Elements.
    """
    return e.get("CreationDate"), e.get("Tags")

# Pull (CreationDate, Tags) out of every post and discard posts for which
# either value is None
post_creation_date_and_tags = (
    posts_xml
    .map(extract_creation_date_and_tags)
    .filter(lambda pair: not (pair[0] is None or pair[1] is None))
)

In [88]:
# Parse the creation date and the tags
def parse_creation_date_and_tags(e):
    """Turn a ``(creation_date, tags)`` pair into ``(year, tag_list)``.

    Args:
        e: 2-item sequence of strings: the creation date and the raw tags
            string (e.g. ``"<c#><.net>"``).

    Returns:
        ``(year, tags_array)`` where ``year`` is the first four characters of
        the creation date and ``tags_array`` is the raw tags string with its
        first and last character removed, split on ``"><"``.
    """
    timestamp, raw_tags = e
    # Dropping the outer characters removes the leading "<" and trailing ">",
    # leaving "><" as the only separator between tags.
    return timestamp[:4], raw_tags[1:-1].split("><")

# (year, tags) pairs for every post. Persisted because process_data filters
# this same RDD once per report year; without caching each of those jobs
# would re-read and re-parse the whole posts file.
post_year_tags = post_creation_date_and_tags.map(parse_creation_date_and_tags).cache()

In [89]:
# Schema of the report DataFrame: one row per (year, language) with its count;
# every column is nullable
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Year", StringType()),
        ("Language", StringType()),
        ("Count", IntegerType()),
    )
])

In [90]:
# Per-year aggregation
def process_data(report_year):
    """Return the most frequent programming-language tags for one year.

    Reads the module-level ``post_year_tags`` RDD, ``programming_languages``
    list and ``top_count`` limit.

    Args:
        report_year: year as a string, compared for equality with the year
            stored in ``post_year_tags``.

    Returns:
        A list of ``(report_year, language, count)`` tuples, at most
        ``top_count`` long, ordered by descending count; equal counts are
        ordered by language name so the result is deterministic.
    """
    # A frozenset gives constant-time membership tests inside the per-tag
    # filter instead of scanning the whole list for every tag.
    known_languages = frozenset(programming_languages)

    # No .cache() here: the intermediate RDD is consumed by a single action,
    # so caching it would only hold executor memory that is never released.
    language_counts = (
        post_year_tags
        .filter(lambda post: post[0] == report_year)
        .flatMap(lambda post: post[1])
        .filter(lambda tag: tag in known_languages)
        .map(lambda tag: (tag, 1))
        .reduceByKey(lambda left, right: left + right)
        .map(lambda pair: (report_year, pair[0], pair[1]))
    )

    # takeOrdered selects the top rows without a full distributed sort.
    return language_counts.takeOrdered(top_count, key=lambda row: (-row[2], row[1]))

In [91]:
# Start the timer before any work so the printed duration covers building,
# saving and showing the report (previously it was started right before the
# print and therefore always reported roughly zero)
start_time = datetime.now()

# Build the top list for each year
years_tag_counts = [process_data(year) for year in years]

# Flatten the per-year lists and build the final report; the rows are already
# local, so they are passed to createDataFrame directly
report_rows = [row for year_rows in years_tag_counts for row in year_rows]
final_report = spark.createDataFrame(report_rows, schema=schema)

# Save in Parquet format, replacing any previous output
parquet_output_path = "programming_language_report.parquet"
final_report.write.mode("overwrite").parquet(parquet_output_path)

# Show the result (enough rows for every year's full top list)
final_report.show(len(years) * top_count, truncate=False)

# Execution time
print(f"Duration: {datetime.now() - start_time}")




+----+-----------+-----+
|Year|Language   |Count|
+----+-----------+-----+
|2010|java       |52   |
|2010|php        |46   |
|2010|javascript |44   |
|2010|python     |26   |
|2010|objective-c|23   |
|2010|c          |20   |
|2010|ruby       |12   |
|2010|delphi     |8    |
|2010|applescript|3    |
|2010|bash       |3    |
|2011|php        |102  |
|2011|java       |93   |
|2011|javascript |83   |
|2011|python     |37   |
|2011|objective-c|34   |
|2011|c          |24   |
|2011|ruby       |20   |
|2011|perl       |9    |
|2011|delphi     |8    |
|2011|bash       |7    |
|2012|php        |154  |
|2012|javascript |132  |
|2012|java       |124  |
|2012|python     |69   |
|2012|objective-c|45   |
|2012|ruby       |27   |
|2012|c          |27   |
|2012|bash       |10   |
|2012|r          |9    |
|2012|xpath      |6    |
|2013|javascript |198  |
|2013|php        |198  |
|2013|java       |194  |
|2013|python     |90   |
|2013|objective-c|40   |
|2013|c          |36   |
|2013|ruby       |32   |


In [95]:
# Copy programming_language_report.parquet from the Hadoop filesystem into
# the current local directory
!hadoop fs -get programming_language_report.parquet ./

In [96]:
# Shut down the Spark session
spark.stop()