In [25]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark import SparkContext, SparkConf

In [26]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

In [27]:
sc = SparkSession.builder.appName("2lab").master("yarn").getOrCreate()

In [46]:
!hadoop fs -put * .

put: `2lab.ipynb': File exists
put: `hs_err_pid16302.log': File exists
put: `posts_sample.xml': File exists
put: `programming-languages.csv': File exists


In [28]:
prog_lang = sc.read.csv("2lab/programming-languages.csv")
posts_sample = sc.read.format("xml").options(rowTag="row").load('2lab/posts_sample.xml')

In [29]:
prog_lang.first()

Row(_c0='name', _c1='wikipedia_url')

In [30]:
posts_sample.first()

Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 20, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 8, 1, 2, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 5, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 5, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUserI

In [31]:
prog_lang_list = [str(x[0]) for x in prog_lang.collect()]
prog_lang_list[:7]

['name', 'A# .NET', 'A# (Axiom)', 'A-0 System', 'A+', 'A++', 'ABAP']

In [32]:
pl_by_year = {}

for year in range(2010, 2020):
    pl_by_year[year] = posts_sample.rdd \
        .filter(lambda x: x._Tags is not None and datetime(year=year, month=1, day=1) <= x._CreationDate <= datetime(year=year, month=12, day=31)) \
        .flatMap(lambda x: [(x._Id, language) for language in prog_lang_list if "<" + language.lower() + ">" in x._Tags.lower()]) \
        .keyBy(lambda x: x[1]) \
        .aggregateByKey(0, lambda x, y: x + 1, lambda x, y: x + y) \
        .sortBy(lambda x: x[1], ascending=False) \
        .toDF()

    pl_by_year[year] = pl_by_year[year] \
        .select(col("_1").alias("PL"), col("_2").alias(f"Mentioned_in_{year}")) \
        .limit(10)

    pl_by_year[year].show()

+-----------+-----------------+
|         PL|Mentioned_in_2010|
+-----------+-----------------+
|       Java|               52|
|        PHP|               46|
| JavaScript|               44|
|     Python|               26|
|Objective-C|               22|
|          C|               20|
|       Ruby|               12|
|     Delphi|                8|
|AppleScript|                3|
|          R|                3|
+-----------+-----------------+

+-----------+-----------------+
|         PL|Mentioned_in_2011|
+-----------+-----------------+
|        PHP|              102|
|       Java|               93|
| JavaScript|               83|
|     Python|               37|
|Objective-C|               34|
|          C|               24|
|       Ruby|               20|
|       Perl|                9|
|     Delphi|                8|
|       Bash|                7|
+-----------+-----------------+

+-----------+-----------------+
|         PL|Mentioned_in_2012|
+-----------+-----------------+
|     

In [33]:
for i in pl_by_year.keys():
    pl_by_year[i].write.format("parquet").save(f"2lab/pl_by_{i}")

In [34]:
!hadoop fs -get /user/cordanius/2lab ~/lab2

In [24]:
sc.stop()