# Лабораторная работа №2 - Reports with Apache Spark
*  Установка **Spark**

In [1]:
!pip3 install pyspark



*  Подключение библиотек

In [2]:
import os
import sys

import pyspark.sql.functions as F

from pyspark.sql import Row
from pyspark.sql import SparkSession

*  Объявление переменных окружения

In [3]:
# Make Spark's Python workers and driver use this kernel's interpreter, and ask
# spark-submit to fetch the spark-xml package. These must be set before the
# SparkSession is created in the next cell.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
    'PYSPARK_SUBMIT_ARGS': '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell',
})

*  Создание **SparkSession**

In [4]:
# Start a Spark session (or reuse the one already running in this kernel);
# it picks up the PYSPARK_* environment variables set in the previous cell.
spark = (
    SparkSession.builder
    .getOrCreate()
)
spark

*  Чтение данных

In [8]:
# Posts: one DataFrame row per <row> element of the XML file (read via spark-xml).
postsData = (
    spark.read.format('xml')
    .option('rowTag', 'row')
    .option("timestampFormat", 'y/M/d H:m:s')
    .load('posts_sample.xml')
)
# Languages: only the `name` column is used later, so drop rows without a name
# rather than also discarding languages whose wikipedia_url happens to be empty.
languagesData = (
    spark.read.format('csv')
    .option('header', 'true')
    .option("inferSchema", True)
    .load('programming-languages.csv')
    .dropna(subset=['name'])
)

In [9]:
# Print the schema spark-xml inferred for the posts file (no explicit schema was given).
print("Posts Data Schema:")
postsData.printSchema()

Posts Data Schema:
root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)



In [10]:
# Print the schema of the languages CSV (column types inferred via inferSchema=True).
print("Languages Data Schema:")
languagesData.printSchema()

Languages Data Schema:
root
 |-- name: string (nullable = true)
 |-- wikipedia_url: string (nullable = true)



## **Задание**
Сформировать отчёт с информацией о 10 наиболее популярных языках программирования по итогам каждого года за период с 2010 по 2020 год. Популярность языка в течение года оценивается по количеству постов за этот год, в тегах которых указан данный язык. Отчёт будет отражать динамику изменения популярности языков программирования и представлять собой набор таблиц «топ-10» для каждого года.

Получившийся отчёт сохранить в формате Apache Parquet.


1. Фильтрация постов по дате

In [14]:
# Keep posts created in calendar years 2010-2020 inclusive. Comparing the
# extracted year (instead of a timestamp range ending at "2020-12-31", i.e.
# midnight) keeps posts made during the last day of 2020.
posts = postsData.filter(F.year("_CreationDate").between(2010, 2020))
posts.show(1, vertical=True)

-RECORD 0--------------------------------------
 _AcceptedAnswerId      | NULL                 
 _AnswerCount           | NULL                 
 _Body                  | <p>No. (And more ... 
 _ClosedDate            | NULL                 
 _CommentCount          | 6                    
 _CommunityOwnedDate    | NULL                 
 _CreationDate          | 2010-09-20 16:18:... 
 _FavoriteCount         | NULL                 
 _Id                    | 3753373              
 _LastActivityDate      | 2010-09-20 16:18:... 
 _LastEditDate          | NULL                 
 _LastEditorDisplayName | NULL                 
 _LastEditorUserId      | NULL                 
 _OwnerDisplayName      | NULL                 
 _OwnerUserId           | 10293                
 _ParentId              | 3753364              
 _PostTypeId            | 2                    
 _Score                 | 13                   
 _Tags                  | NULL                 
 _Title                 | NULL          

2. Извлечение названий языков

In [18]:
# Bring the language names to the driver as a plain Python list; only the
# `name` column is needed, so it is selected before collecting.
languages = [str(row["name"]) for row in languagesData.select("name").collect()]
languages[:5]

['A# .NET', 'A# (Axiom)', 'A-0 System', 'A+', 'A++']

3. Реализация функции для извлечения названия языка из тегов поста

In [20]:
def get_language_name(post, language_names=None):
  """Return ``(creation_date, language)`` for a single post.

  ``language`` is the first name in ``language_names`` (defaults to the global
  ``languages`` list) whose lower-cased form appears as a ``<tag>`` in the
  post's ``_Tags`` string, so each post is attributed to at most one language.
  It is ``None`` when the post has no tags or none of its tags is a known
  language.

  Args:
    post: a row exposing ``_Tags`` and ``_CreationDate`` attributes.
    language_names: optional iterable of language names to match against.
  """
  if language_names is None:
    language_names = languages
  raw_tags = post._Tags
  if not raw_tags:
    # Untagged posts (e.g. answers): str(None) would give "none" and could match.
    return (post._CreationDate, None)
  tags = str(raw_tags).lower()
  tag = next((l for l in language_names if f"<{l.lower()}>" in tags), None)
  return (post._CreationDate, tag)

4. Подсчёт постов по годам и языкам и отбор топ-10 языков для каждого года

In [21]:
# Map every post to (creation_date, language) and drop posts whose tags
# mention no known language.
posts_rdd = posts.rdd \
    .map(get_language_name) \
    .filter(lambda x: x[1] is not None)

# Count posts per (year, language) and collect ((year, language), count) pairs.
# Sorting by count descending and then by language name makes the ranking
# deterministic when counts tie; otherwise which tied languages make the top 10
# depends on partition order.
posts_rdd_group = posts_rdd \
    .map(lambda post: ((post[0].year, post[1]), 1)) \
    .reduceByKey(lambda x1, x2: x1 + x2) \
    .sortBy(lambda x: (-x[1], x[0][1])) \
    .collect()

# Report years from newest to oldest.
years = list(range(2010, 2021))[::-1]

# Take the 10 most frequent languages of each year (posts_rdd_group is already
# ordered by count, so the first 10 matches per year are the top 10).
top_by_years_df = []
for year in years:
    top_by_years_df.extend([post for post in posts_rdd_group if post[0][0] == year][:10])

template = Row('Year', 'Language', 'Count')

# An explicit schema keeps column types stable and lets createDataFrame succeed
# even when no post matched (schema inference fails on an empty list).
result_df = spark.createDataFrame(
    [template(*x, y) for x, y in top_by_years_df],
    schema="Year long, Language string, Count long",
)

result_df.show()

+----+----------+-----+
|Year|  Language|Count|
+----+----------+-----+
|2019|    Python|  162|
|2019|JavaScript|  131|
|2019|      Java|   95|
|2019|       PHP|   59|
|2019|         R|   36|
|2019|         C|   14|
|2019|      Dart|    9|
|2019|    MATLAB|    9|
|2019|        Go|    9|
|2019|      Bash|    8|
|2018|    Python|  214|
|2018|JavaScript|  196|
|2018|      Java|  145|
|2018|       PHP|   99|
|2018|         R|   63|
|2018|         C|   24|
|2018|     Scala|   22|
|2018|TypeScript|   21|
|2018|PowerShell|   13|
|2018|      Bash|   12|
+----+----------+-----+
only showing top 20 rows



5. Сохранение результатов в формате **Parquet**

In [22]:
# Overwrite any previous output so the notebook can be re-run end to end
# (the default save mode raises an error if the path already exists).
result_df.write.mode("overwrite").parquet("top_10_languages_by_years_2010-2020.parquet")