In [44]:
import pyspark
from pyspark import SparkContext, SparkConf
from datetime import datetime

In [45]:
#это нужно для работы с файлами типа xml
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell'

In [46]:
sc = SparkSession.builder.appName("Project2").master("local[*]").getOrCreate()

In [47]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType

#Считываем названия языков программирования
prog_languages = sc.read.csv("/mnt/data/programming-languages.csv")

#Создаём из названий список, так удобнее
prog_lang_name=[]
for x in prog_languages.collect():
    prog_lang_name.append(str(x[0]))
prog_lang_name[0:10]

['name',
 'A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET']

In [48]:
#Считываем данные, которые нужно будет анализировать, выбираем row, который будет интепретироваться как строка в Spark
PSample = sc.read.format('xml').option('rowTag', 'row').load("/mnt/data/posts_sample.xml")

In [49]:
PSample.first()

Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUse

In [50]:
#переходим к rdd потому,что так мне удобнее и привычнее работать

PS_RDD=PSample.rdd
PS_RDD.take(1)

[Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUs

In [51]:
#Что бы найти самые популярные языки по годам, будем считать сколько раз они упоминались в тегах
#Для начала исключим года выходящие за установленные рамки ([2010-2020])
#Для этого пишем метод checking_the_date
def checking_the_date(row):
    created_date = row._CreationDate.year
    return 2010 >= created_date and created_date <= 2020

In [52]:
#Будем последовательно совершать все необходимые шаги
#Исключаем года выходящие за рамки
d2=PS_RDD.filter(lambda row: checking_the_date(row))

In [54]:
d2.take(3)

[Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUs

In [55]:
#метод для проверки тегов на наличие в них названия языка
def search_program_language(row):
    tags_lan=None
    for n in prog_lang_name:
        if ('<' + n.lower() + '>') in str(row._Tags).lower():
            tags_lan = n
            break
    if tags_lan is None:
        return (row[6], 'nothing')
    return (row[6], tags_lan)
    

In [56]:
#Отмечаем где есть и где нету упоминаний о названии языка в тегах 
d3=d2.map(search_program_language)

In [57]:
d3.take(5)

[(datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), 'nothing'),
 (datetime.datetime(2008, 7, 31, 22, 8, 8, 620000), 'nothing'),
 (datetime.datetime(2008, 7, 31, 22, 17, 57, 883000), 'nothing'),
 (datetime.datetime(2008, 7, 31, 23, 40, 59, 743000), 'nothing'),
 (datetime.datetime(2008, 7, 31, 23, 55, 37, 967000), 'nothing')]

In [58]:
#Отсеиваем те, где в тэгах не упоминается название языка
d4=d3.filter(lambda row: row[1]!='nothing')

In [59]:
d4.take(5)

[(datetime.datetime(2010, 9, 23, 12, 13, 59, 443000), 'Java'),
 (datetime.datetime(2010, 9, 26, 17, 7, 4, 840000), 'PHP'),
 (datetime.datetime(2010, 9, 30, 18, 27, 56, 320000), 'Ruby'),
 (datetime.datetime(2010, 10, 1, 11, 52, 42, 210000), 'C'),
 (datetime.datetime(2010, 10, 4, 21, 5, 50, 150000), 'PHP')]

In [60]:
#группируем по году и названию. А затем считаем количество упоминаний
d5=d4.keyBy(lambda row: (row[0].year,row[1])).aggregateByKey(
        0,
        lambda acc, value: acc + 1,
        lambda acc1, acc2: acc1 + acc2,
    )

In [61]:
d5.take(5)

[((2010, 'Python'), 25),
 ((2010, 'JavaScript'), 44),
 ((2010, 'R'), 3),
 ((2010, 'Delphi'), 7),
 ((2010, 'Perl'), 3)]

In [62]:
#Сортируем по убыванию количества упоминаний
d6=d5.sortBy(lambda row: row[1], ascending=False)

In [63]:
d6.take(10)

[((2010, 'Java'), 52),
 ((2010, 'JavaScript'), 44),
 ((2010, 'PHP'), 42),
 ((2009, 'Java'), 28),
 ((2010, 'Python'), 25),
 ((2010, 'Objective-C'), 23),
 ((2009, 'PHP'), 22),
 ((2009, 'Python'), 22),
 ((2010, 'C'), 20),
 ((2009, 'JavaScript'), 12)]

In [64]:
#Сортируем по годам
d7=d6.sortBy(lambda row: row[0][0], ascending=False)
d7.take(10)

[((2010, 'Java'), 52),
 ((2010, 'JavaScript'), 44),
 ((2010, 'PHP'), 42),
 ((2010, 'Python'), 25),
 ((2010, 'Objective-C'), 23),
 ((2010, 'C'), 20),
 ((2010, 'Ruby'), 11),
 ((2010, 'Delphi'), 7),
 ((2010, 'R'), 3),
 ((2010, 'Perl'), 3)]

In [65]:
#Теперь я хочу избавиться от сдвоенного ключа, и добавить столбец с удобной нумерацией,
#которая для каждого года начинается с начала
#Поэтому подготовлю данные и потом создам новый DataFrame
prname=[]
prname2=[]
prname3=[]
prname4=[]

tsr=100000
k=1

for x in d7.collect():
    prname.append(x[0])
    prname3.append(int(x[1]))

for x in prname:
    prname2.append([x[0],x[1]])
    
for x1 in range(0,len(prname)-1):
    t=prname3[x1]
    if(tsr<t):
        k=1
    prname4.append([prname2[x1][0],prname2[x1][1],t,int(k)])
    tsr=t
    k=k+1
prname4[0:10]

[[2010, 'Java', 52, 1],
 [2010, 'JavaScript', 44, 2],
 [2010, 'PHP', 42, 3],
 [2010, 'Python', 25, 4],
 [2010, 'Objective-C', 23, 5],
 [2010, 'C', 20, 6],
 [2010, 'Ruby', 11, 7],
 [2010, 'Delphi', 7, 8],
 [2010, 'R', 3, 9],
 [2010, 'Perl', 3, 10]]

In [66]:
#создаем новый DataFrame
from pyspark.sql import Row
R = Row('Year','Name','Count','Number')
new_DF=sc.createDataFrame([R(i,x,y,z) for i,x,y,z in (prname4)])


In [67]:
#Выводим чтобы полюбоваться на то, что уже есть
new_DF.show(40)

+----+------------+-----+------+
|Year|        Name|Count|Number|
+----+------------+-----+------+
|2010|        Java|   52|     1|
|2010|  JavaScript|   44|     2|
|2010|         PHP|   42|     3|
|2010|      Python|   25|     4|
|2010| Objective-C|   23|     5|
|2010|           C|   20|     6|
|2010|        Ruby|   11|     7|
|2010|      Delphi|    7|     8|
|2010|           R|    3|     9|
|2010|        Perl|    3|    10|
|2010| AppleScript|    3|    11|
|2010|        Bash|    3|    12|
|2010|     Haskell|    2|    13|
|2010|          F#|    2|    14|
|2010|  PowerShell|    1|    15|
|2010|       Mouse|    1|    16|
|2010|       XPath|    1|    17|
|2010|       OCaml|    1|    18|
|2010|ActionScript|    1|    19|
|2010|         ksh|    1|    20|
|2010|         Sed|    1|    21|
|2010|      MATLAB|    1|    22|
|2010|       BASIC|    1|    23|
|2010|          Go|    1|    24|
|2010|      Racket|    1|    25|
|2010|       dBase|    1|    26|
|2009|        Java|   28|     1|
|2009|    

In [68]:
#Записываем в .parquet, чтобы потом открыть но уже sql
new_DF.write.parquet("Lang.parquet")

In [69]:
#Считываем обратно
Table = sc.read.parquet("Lang.parquet")

In [70]:
#Выбираем топ 10 языков для каждого года
Table.createOrReplaceTempView("Table")
top_10 = sc.sql("SELECT Year,Name,Count FROM Table WHERE Number<=10 ")
top_10.show(top_10.count())

+----+-----------+-----+
|Year|       Name|Count|
+----+-----------+-----+
|2008|       Java|    4|
|2008|       Ruby|    3|
|2008| JavaScript|    2|
|2008|          C|    2|
|2008|     Groovy|    1|
|2008|        X++|    1|
|2008|         Io|    1|
|2008|        PHP|    1|
|2009|       Java|   28|
|2009|        PHP|   22|
|2009|     Python|   22|
|2009| JavaScript|   12|
|2009|     Delphi|    7|
|2009|       Ruby|    7|
|2009|          C|    6|
|2009|Objective-C|    5|
|2009|    Haskell|    4|
|2009|       Bash|    3|
|2010|       Ruby|   11|
|2010|     Delphi|    7|
|2010|          R|    3|
|2010|       Perl|    3|
|2010|       Java|   52|
|2010| JavaScript|   44|
|2010|        PHP|   42|
|2010|     Python|   25|
|2010|Objective-C|   23|
|2010|          C|   20|
+----+-----------+-----+



In [71]:
#И сохраняем отчёт в формате Apache Parquet.
top_10.write.parquet("top_10_lang.parquet")