### Лабораторная 2. Формирование отчётов в Apache Spark <br>

#### Задание: <br>

> Сформировать отчёт с информацией о 10 наиболее популярных языках программирования по итогам года за период с 2010 по 2020 годы. Отчёт будет отражать динамику изменения популярности языков программирования и представлять собой набор таблиц "топ-10" для каждого года. <br>
Получившийся отчёт сохранить в формате Apache Parquet.
Для выполнения задания вы можете использовать любую комбинацию Spark API: RDD API, Dataset API, SQL API.






In [2]:
pip install pandas

Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
  Downloading pandas-1.3.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.5 MB)
[K     |████████████████████████████████| 11.5 MB 1.2 MB/s eta 0:00:01
Installing collected packages: pandas
Successfully installed pandas-1.3.5
Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark import SparkContext, SparkConf
from datetime import datetime
from pyspark.sql import SparkSession

from typing import NamedTuple
import pandas as pd
import os
import numpy as np

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell'

In [5]:
sc = SparkSession.builder.appName("Project2").master("local[*]").getOrCreate()
sc

In [6]:
posts_sample = sc.read.format('xml').option('rowTag', 'row').load("posts_sample.xml")

In [9]:
posts_sample.show(1)

+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+---+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+--------------------+--------------------+----------+
|_AcceptedAnswerId|_AnswerCount|               _Body|_ClosedDate|_CommentCount| _CommunityOwnedDate|       _CreationDate|_FavoriteCount|_Id|   _LastActivityDate|       _LastEditDate|_LastEditorDisplayName|_LastEditorUserId|_OwnerDisplayName|_OwnerUserId|_ParentId|_PostTypeId|_Score|               _Tags|              _Title|_ViewCount|
+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+---+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+--------------------+--------------------+-

In [12]:
rdd_posts_sample = posts_sample.rdd # Смена типизации 

In [13]:
# Ограничиваем выборку 2010 < X < 2020

posts_sample_period = rdd_posts_sample.filter(lambda row: (2009 < row._CreationDate.year) and (row._CreationDate.year < 2021))

In [16]:
posts_sample_period.take(1) # Пример выборки 

[Row(_AcceptedAnswerId=None, _AnswerCount=None, _Body='<p>No. (And more stuff to round this up to 15 characters...)</p>\n', _ClosedDate=None, _CommentCount=6, _CommunityOwnedDate=None, _CreationDate=datetime.datetime(2010, 9, 20, 20, 18, 14, 580000), _FavoriteCount=None, _Id=3753373, _LastActivityDate=datetime.datetime(2010, 9, 20, 20, 18, 14, 580000), _LastEditDate=None, _LastEditorDisplayName=None, _LastEditorUserId=None, _OwnerDisplayName=None, _OwnerUserId=10293, _ParentId=3753364, _PostTypeId=2, _Score=13, _Tags=None, _Title=None, _ViewCount=None)]

In [17]:
# Подтягиваем имена

languages_name = sc.read.csv("programming-languages.csv")

In [18]:
languages_name.take(10)

[Row(_c0='name', _c1='wikipedia_url'),
 Row(_c0='A# .NET', _c1='https://en.wikipedia.org/wiki/A_Sharp_(.NET)'),
 Row(_c0='A# (Axiom)', _c1='https://en.wikipedia.org/wiki/A_Sharp_(Axiom)'),
 Row(_c0='A-0 System', _c1='https://en.wikipedia.org/wiki/A-0_System'),
 Row(_c0='A+', _c1='https://en.wikipedia.org/wiki/A%2B_(programming_language)'),
 Row(_c0='A++', _c1='https://en.wikipedia.org/wiki/A%2B%2B'),
 Row(_c0='ABAP', _c1='https://en.wikipedia.org/wiki/ABAP'),
 Row(_c0='ABC', _c1='https://en.wikipedia.org/wiki/ABC_(programming_language)'),
 Row(_c0='ABC ALGOL', _c1='https://en.wikipedia.org/wiki/ABC_ALGOL'),
 Row(_c0='ABSET', _c1='https://en.wikipedia.org/wiki/ABSET')]

In [19]:
languages_name_header = languages_name.first()
languages_name_header

Row(_c0='name', _c1='wikipedia_url')

In [20]:
languages_name

DataFrame[_c0: string, _c1: string]

In [24]:
# Вытягиваем названия языков
names = []
for i in languages_name.collect():
    names.append(str(i[0]))
names[0:10]

['name',
 'A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET']

In [25]:
# Кол-во наименований 
count_langueges = len(names)-1 
count_langueges

700

In [26]:
# В нижний регистр
def includes_name(x):
    tag = None
    for name in names:
        n = '<' + name.lower() + '>'
        if n in str(x._Tags).lower():
            tag = name
            break
    if tag is None:
        tag = 'No'
            
    return (x[6], tag)

In [30]:
# Убираем там где "NO"
period_language = posts_sample_period.map(includes_name).filter(lambda x: x[1] != 'No')
period_language.take(5)

[(datetime.datetime(2010, 9, 23, 16, 13, 59, 443000), 'Java'),
 (datetime.datetime(2010, 9, 26, 21, 7, 4, 840000), 'PHP'),
 (datetime.datetime(2010, 9, 30, 22, 27, 56, 320000), 'Ruby'),
 (datetime.datetime(2010, 10, 1, 15, 52, 42, 210000), 'C'),
 (datetime.datetime(2010, 10, 5, 1, 5, 50, 150000), 'PHP')]

In [31]:
# Кол-во вхождений языка

period_language_group = period_language.keyBy(lambda row: (row[0].year,row[1])).aggregateByKey(0, lambda x, y: x + 1, lambda x1, x2: x1 + x2).sortBy(lambda x: x[1], ascending=False)

In [33]:
# Результаты группировки


period_language_group.take(10)

[((2016, 'JavaScript'), 272),
 ((2015, 'JavaScript'), 270),
 ((2017, 'JavaScript'), 244),
 ((2014, 'JavaScript'), 235),
 ((2014, 'Java'), 228),
 ((2018, 'Python'), 214),
 ((2015, 'Java'), 208),
 ((2017, 'Java'), 204),
 ((2013, 'JavaScript'), 196),
 ((2018, 'JavaScript'), 196)]

In [34]:
# список
list_period_language_group = []

for row in period_language_group.collect():
    list_row = []
    for i in row[0]:
        list_row.append(i)
    list_row.append(row[1])
    
    list_period_language_group.append([list_row[0], list_row[1], list_row[2]])

In [35]:
list_period_language_group[0:20]

[[2016, 'JavaScript', 272],
 [2015, 'JavaScript', 270],
 [2017, 'JavaScript', 244],
 [2014, 'JavaScript', 235],
 [2014, 'Java', 228],
 [2018, 'Python', 214],
 [2015, 'Java', 208],
 [2017, 'Java', 204],
 [2013, 'JavaScript', 196],
 [2018, 'JavaScript', 196],
 [2013, 'Java', 191],
 [2017, 'Python', 185],
 [2016, 'Java', 179],
 [2013, 'PHP', 173],
 [2019, 'Python', 162],
 [2014, 'PHP', 154],
 [2015, 'PHP', 147],
 [2018, 'Java', 145],
 [2016, 'Python', 141],
 [2012, 'PHP', 136]]

In [36]:
# топ 10 в в список
from pyspark.sql import Row
rows = Row('Year','Language','Count')
df=sc.createDataFrame([rows(x,y,z) for x,y,z in (list_period_language_group[0:100])])

In [37]:
df.show()

+----+----------+-----+
|Year|  Language|Count|
+----+----------+-----+
|2016|JavaScript|  272|
|2015|JavaScript|  270|
|2017|JavaScript|  244|
|2014|JavaScript|  235|
|2014|      Java|  228|
|2018|    Python|  214|
|2015|      Java|  208|
|2017|      Java|  204|
|2013|JavaScript|  196|
|2018|JavaScript|  196|
|2013|      Java|  191|
|2017|    Python|  185|
|2016|      Java|  179|
|2013|       PHP|  173|
|2019|    Python|  162|
|2014|       PHP|  154|
|2015|       PHP|  147|
|2018|      Java|  145|
|2016|    Python|  141|
|2012|       PHP|  136|
+----+----------+-----+
only showing top 20 rows



In [38]:
# Согласно задания, формируем список из топ- 10 лучших языков, за каждый год
perem_year = 2010
i = 0
count_ = 0
list_ = []
while perem_year <= 2020:
    for perem_ in list_period_language_group:
        if perem_year == perem_[0]:
            print(perem_[0], perem_[1], perem_[2])
            list_.append(perem_)
            count_ = count_ + 1
            if count_ == 10:
                count_ = 0 
                break
        i = i + 1
    perem_year += 1

2010 Java 52
2010 JavaScript 44
2010 PHP 42
2010 Python 25
2010 Objective-C 23
2010 C 20
2010 Ruby 11
2010 Delphi 7
2010 AppleScript 3
2010 R 3
2011 PHP 97
2011 Java 92
2011 JavaScript 82
2011 Python 35
2011 Objective-C 33
2011 C 24
2011 Ruby 17
2011 Perl 8
2011 Delphi 8
2011 Bash 7
2012 PHP 136
2012 JavaScript 129
2012 Java 124
2012 Python 65
2012 Objective-C 45
2012 C 27
2012 Ruby 25
2012 Bash 9
2012 R 9
2012 MATLAB 6
2013 JavaScript 196
2013 Java 191
2013 PHP 173
2013 Python 87
2013 Objective-C 40
2013 C 36
2013 Ruby 30
2013 R 25
2013 Bash 11
2013 Scala 10
2014 JavaScript 235
2014 Java 228
2014 PHP 154
2014 Python 103
2014 C 52
2014 Objective-C 49
2014 R 28
2014 Ruby 20
2014 MATLAB 16
2014 Bash 13
2015 JavaScript 270
2015 Java 208
2015 PHP 147
2015 Python 119
2015 R 43
2015 C 38
2015 Objective-C 30
2015 Ruby 21
2015 MATLAB 16
2015 Scala 13
2016 JavaScript 272
2016 Java 179
2016 Python 141
2016 PHP 126
2016 R 50
2016 C 32
2016 Ruby 21
2016 Bash 16
2016 Scala 16
2016 MATLAB 15
2017 Ja

In [41]:
rows = Row('Year','Language','Count')
top_ten_language = sc.createDataFrame([rows(x,y,z) for x,y,z in (list_[0:(len(list_))])])

In [45]:
# Результат:
top_ten_language.show(len(list_))

+----+-----------+-----+
|Year|   Language|Count|
+----+-----------+-----+
|2010|       Java|   52|
|2010| JavaScript|   44|
|2010|        PHP|   42|
|2010|     Python|   25|
|2010|Objective-C|   23|
|2010|          C|   20|
|2010|       Ruby|   11|
|2010|     Delphi|    7|
|2010|AppleScript|    3|
|2010|          R|    3|
|2011|        PHP|   97|
|2011|       Java|   92|
|2011| JavaScript|   82|
|2011|     Python|   35|
|2011|Objective-C|   33|
|2011|          C|   24|
|2011|       Ruby|   17|
|2011|       Perl|    8|
|2011|     Delphi|    8|
|2011|       Bash|    7|
|2012|        PHP|  136|
|2012| JavaScript|  129|
|2012|       Java|  124|
|2012|     Python|   65|
|2012|Objective-C|   45|
|2012|          C|   27|
|2012|       Ruby|   25|
|2012|       Bash|    9|
|2012|          R|    9|
|2012|     MATLAB|    6|
|2013| JavaScript|  196|
|2013|       Java|  191|
|2013|        PHP|  173|
|2013|     Python|   87|
|2013|Objective-C|   40|
|2013|          C|   36|
|2013|       Ruby|   30|


In [46]:
# Получившийся отчёт сохранить в формате Apache Parquet
top_ten_language.write.parquet("top_ten_language.parquet")