In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 57.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=dc6f8eb06c5378f54d9a68a91cce1a858de457b2ddb99fe826c65e0eecba45b4
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [3]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
sc = SparkSession.builder.appName("L2").master("local[*]").getOrCreate()

In [4]:
sc

In [5]:
programming_languages = sc.read.csv("/content/drive/MyDrive/Big_Data/programming-languages.csv")

In [6]:
# Создадим список из имен языков программирования
prog_languages_list = [str(x[0]) for x in programming_languages.collect()]

In [15]:
prog_languages_list[:10]

['name',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET',
 'ABSYS']

In [17]:
posts_sample = sc.read.format("xml").options(rowTag="row").load('/content/drive/MyDrive/Big_Data/posts_sample.xml')

In [19]:
posts_sample.take(1)

[Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUs

In [20]:
def language_detection(x):
  """
  Данная функция переводит весь текст в нижний регистр
  и ищет название языка программирования в каждой строке,
  если язык был найден, то создается кортеж, иначе None
  """
  tag = None
  for language in prog_languages_list:
    if "<" + language.lower() + ">" in x._Tags.lower():
      tag = language
      break
  if tag is None:
    return None
  return (x._Id, tag)


def check_date(x, year):
  """
  Данная функция была написана для фильтрации по датам,
  так как нас интересует период с 2010 год по 2019 год
  """
  start = datetime(year=year, month=1, day=1)
  end = datetime(year=year, month=12, day=31)
  CreationDate = x._CreationDate
  return CreationDate >= start and CreationDate <= end

In [21]:
"""
Данный кусок кода сначала убирает пустые значения и оставляет диапазон с 2010 по 2019 год,
далее мы находим язык программирования в каждой строке и убираем пустые значения, если не был 
найден, потом смотрим, сколько раз упоминался каждый язык программирования в каждом годе и сортируем по 
количеству повторений и в конце идет сортировка от большего к меньшему по количеству упоминаний.
"""

final_result = {}
for year in range(2010, 2020):
  final_result[year] = posts_sample.rdd\
      .filter(lambda x: x._Tags is not None and check_date(x, year))\
      .map(language_detection)\
      .filter(lambda x: x is not None)\
      .keyBy(lambda x: x[1])\
      .aggregateByKey(
          0,
          lambda x, y: x + 1,
          lambda x1, x2: x1 + x2,
      )\
      .sortBy(lambda x: x[1], ascending=False)\
      .toDF()
  final_result[year] = final_result[year].select(col("_1").alias("Programming_language"), 
                                                 col("_2").alias(f"Number_of_mentions_in_{year}")).limit(10)
  final_result[year].show()

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2010|
+--------------------+--------------------------+
|                Java|                        52|
|          JavaScript|                        44|
|                 PHP|                        42|
|              Python|                        25|
|         Objective-C|                        22|
|                   C|                        20|
|                Ruby|                        11|
|              Delphi|                         7|
|                   R|                         3|
|                Bash|                         3|
+--------------------+--------------------------+

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2011|
+--------------------+--------------------------+
|                 PHP|                        97|
|                Java|                        92|
|          JavaScript|                        82|

In [22]:
for year in final_result.keys():
    final_result[year].write.format("parquet").save(f"/content/drive/MyDrive/Big_Data/top_{year}")