In [13]:
from pyspark import SparkContext, SparkConf
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Row

from typing import NamedTuple
import pandas as pd
import os
import sys
import numpy as np
from os import environ
environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

### 1. Сформировать отчёт с информацией о 10 наиболее популярных языках программирования по итогам года за период с 2010 по 2020 годы. Отчёт будет отражать динамику изменения популярности языков программирования и представлять собой набор таблиц "топ-10" для каждого года.

In [6]:
posts_sample = spark.read.format('xml').option('rowTag', 'row').load("posts_sample.xml")

In [7]:
posts_sample.show(1, vertical=True)

-RECORD 0--------------------------------------
 _AcceptedAnswerId      | 7                    
 _AnswerCount           | 13                   
 _Body                  | <p>I want to use ... 
 _ClosedDate            | NULL                 
 _CommentCount          | 2                    
 _CommunityOwnedDate    | 2012-10-31 16:42:... 
 _CreationDate          | 2008-07-31 21:42:... 
 _FavoriteCount         | 48                   
 _Id                    | 4                    
 _LastActivityDate      | 2019-07-19 01:39:... 
 _LastEditDate          | 2019-07-19 01:39:... 
 _LastEditorDisplayName | Rich B               
 _LastEditorUserId      | 3641067              
 _OwnerDisplayName      | NULL                 
 _OwnerUserId           | 8                    
 _ParentId              | NULL                 
 _PostTypeId            | 1                    
 _Score                 | 630                  
 _Tags                  | <c#><floating-poi... 
 _Title                 | Convert Decima

In [9]:
posts_sample.count()

14428

In [16]:
dates = ("2010-01-01",  "2020-12-31")
posts_by_period = posts_sample.filter(F.col('_CreationDate').between(*dates))
posts_by_period.show(15)


+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+-----+------+----------+
|_AcceptedAnswerId|_AnswerCount|               _Body|_ClosedDate|_CommentCount| _CommunityOwnedDate|       _CreationDate|_FavoriteCount|    _Id|   _LastActivityDate|       _LastEditDate|_LastEditorDisplayName|_LastEditorUserId|_OwnerDisplayName|_OwnerUserId|_ParentId|_PostTypeId|_Score|_Tags|_Title|_ViewCount|
+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+-----+------+----------+
|             NULL|        NULL|<p>No. (And more ...|       NULL

In [18]:
programming_languages = spark.read.format('csv').option('header', 'true').option("inferSchema", True).load("programming-languages.csv").dropna()

In [19]:
programming_languages.show(1, vertical=True)

-RECORD 0-----------------------------
 name          | A# .NET              
 wikipedia_url | https://en.wikipe... 
only showing top 1 row



In [20]:
programming_languages.count()

699

In [22]:
languages = [language.name for language in programming_languages.collect()]
print(languages)

['A# .NET', 'A# (Axiom)', 'A-0 System', 'A+', 'A++', 'ABAP', 'ABC', 'ABC ALGOL', 'ABSET', 'ABSYS', 'ACC', 'Accent', 'Ace DASL', 'ACL2', 'ACT-III', 'Action!', 'ActionScript', 'Ada', 'Adenine', 'Agda', 'Agilent VEE', 'Agora', 'AIMMS', 'Alef', 'ALF', 'ALGOL 58', 'ALGOL 60', 'ALGOL 68', 'ALGOL W', 'Alice', 'Alma-0', 'AmbientTalk', 'Amiga E', 'AMOS', 'AMPL', 'Apex (Salesforce.com)', 'APL', "App Inventor for Android's visual block language", 'AppleScript', 'Arc', 'ARexx', 'Argus', 'AspectJ', 'Assembly language', 'ATS', 'Ateji PX', 'AutoHotkey', 'Autocoder', 'AutoIt', 'AutoLISP / Visual LISP', 'Averest', 'AWK', 'Axum', 'B', 'Babbage', 'Bash', 'BASIC', 'bc', 'BCPL', 'BeanShell', 'Batch (Windows/Dos)', 'Bertrand', 'BETA', 'Bigwig', 'Bistro', 'BitC', 'BLISS', 'Blockly', 'BlooP', 'Blue', 'Boo', 'Boomerang', 'Bourne shell (including', 'bash and', 'ksh )', 'BREW', 'BPEL', 'C', 'C--', 'C++ – ISO/IEC 14882', 'C# – ISO/IEC 23270', 'C/AL', 'Caché ObjectScript', 'C Shell', 'Caml', 'Cayenne', 'CDuce', 'C

In [39]:
def is_tag_in_post(post):
  tag = None
  for language in languages:
    language_tag = '<' + language.lower() + '>'
    if language_tag in str(post._Tags).lower():
      tag = language
      break
  return (post._CreationDate, tag)

In [40]:
post_tag_date = posts_by_period.rdd.map(is_tag_in_post).filter(lambda x: x[1] != None)

In [43]:
posts_language_year = (
    post_tag_date
    .keyBy(lambda row: (row[0].year, row[1]))
    .aggregateByKey(0, lambda x, y: x + 1, lambda x1, x2: x1 + x2)
    .sortBy(lambda x: x[1], ascending=False)
    .collect())

In [44]:
posts_language_year

[((2016, 'JavaScript'), 272),
 ((2015, 'JavaScript'), 270),
 ((2017, 'JavaScript'), 244),
 ((2014, 'JavaScript'), 235),
 ((2014, 'Java'), 228),
 ((2018, 'Python'), 214),
 ((2015, 'Java'), 208),
 ((2017, 'Java'), 204),
 ((2013, 'JavaScript'), 196),
 ((2018, 'JavaScript'), 196),
 ((2013, 'Java'), 191),
 ((2017, 'Python'), 185),
 ((2016, 'Java'), 179),
 ((2013, 'PHP'), 173),
 ((2019, 'Python'), 162),
 ((2014, 'PHP'), 154),
 ((2015, 'PHP'), 147),
 ((2018, 'Java'), 145),
 ((2016, 'Python'), 141),
 ((2012, 'PHP'), 136),
 ((2019, 'JavaScript'), 131),
 ((2012, 'JavaScript'), 129),
 ((2016, 'PHP'), 126),
 ((2012, 'Java'), 124),
 ((2017, 'PHP'), 122),
 ((2015, 'Python'), 119),
 ((2014, 'Python'), 103),
 ((2018, 'PHP'), 99),
 ((2011, 'PHP'), 97),
 ((2019, 'Java'), 95),
 ((2011, 'Java'), 92),
 ((2013, 'Python'), 87),
 ((2011, 'JavaScript'), 82),
 ((2012, 'Python'), 65),
 ((2018, 'R'), 63),
 ((2019, 'PHP'), 59),
 ((2017, 'R'), 53),
 ((2014, 'C'), 52),
 ((2010, 'Java'), 52),
 ((2016, 'R'), 50),
 ((2

In [53]:
language_by_period = []
years = [year for year in range(2010, 2021)]

for year in years[::-1]:
  language_by_period.extend([row for row in posts_language_year if row[0][0] == year][:10])



In [54]:
language_by_period

[((2019, 'Python'), 162),
 ((2019, 'JavaScript'), 131),
 ((2019, 'Java'), 95),
 ((2019, 'PHP'), 59),
 ((2019, 'R'), 36),
 ((2019, 'C'), 14),
 ((2019, 'Go'), 9),
 ((2019, 'Dart'), 9),
 ((2019, 'MATLAB'), 9),
 ((2019, 'Ruby'), 8),
 ((2018, 'Python'), 214),
 ((2018, 'JavaScript'), 196),
 ((2018, 'Java'), 145),
 ((2018, 'PHP'), 99),
 ((2018, 'R'), 63),
 ((2018, 'C'), 24),
 ((2018, 'Scala'), 22),
 ((2018, 'TypeScript'), 21),
 ((2018, 'PowerShell'), 13),
 ((2018, 'Bash'), 12),
 ((2017, 'JavaScript'), 244),
 ((2017, 'Java'), 204),
 ((2017, 'Python'), 185),
 ((2017, 'PHP'), 122),
 ((2017, 'R'), 53),
 ((2017, 'C'), 24),
 ((2017, 'Objective-C'), 19),
 ((2017, 'Ruby'), 16),
 ((2017, 'PowerShell'), 14),
 ((2017, 'TypeScript'), 14),
 ((2016, 'JavaScript'), 272),
 ((2016, 'Java'), 179),
 ((2016, 'Python'), 141),
 ((2016, 'PHP'), 126),
 ((2016, 'R'), 50),
 ((2016, 'C'), 32),
 ((2016, 'Ruby'), 21),
 ((2016, 'Scala'), 16),
 ((2016, 'Bash'), 16),
 ((2016, 'MATLAB'), 15),
 ((2015, 'JavaScript'), 270),
 (

In [57]:
from pyspark.sql import Row
row = Row('Year','Language','Count')
df=spark.createDataFrame([row(*x, y) for x, y in language_by_period])
df.show()

+----+----------+-----+
|Year|  Language|Count|
+----+----------+-----+
|2019|    Python|  162|
|2019|JavaScript|  131|
|2019|      Java|   95|
|2019|       PHP|   59|
|2019|         R|   36|
|2019|         C|   14|
|2019|        Go|    9|
|2019|      Dart|    9|
|2019|    MATLAB|    9|
|2019|      Ruby|    8|
|2018|    Python|  214|
|2018|JavaScript|  196|
|2018|      Java|  145|
|2018|       PHP|   99|
|2018|         R|   63|
|2018|         C|   24|
|2018|     Scala|   22|
|2018|TypeScript|   21|
|2018|PowerShell|   13|
|2018|      Bash|   12|
+----+----------+-----+
only showing top 20 rows



### 2.Получившийся отчёт сохранить в формате Apache Parquet.

In [60]:
from google.colab import drive
drive.mount('/content/drive')

# Получившийся отчёт сохранить в формате Apache Parquet
df.write.parquet("/Colab Notebooks/top_languages.parquet")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
