In [1]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import asc, desc, rank, col
from pyspark.sql.window import Window

In [2]:
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

spark_session = SparkSession.builder.appName("Lab2").getOrCreate()
sc = spark_session._sc
spark_session

In [3]:
progLangSchema = StructType([
    StructField("Language", StringType(), False),
    StructField("Url_language", StringType(), False),
])
dfProgLangs = spark_session.read.csv("programming-languages.csv", schema=progLangSchema)
dfProgLangs.head()

Row(Language='name', Url_language='wikipedia_url')

In [4]:
firstRow = dfProgLangs.rdd.first()
progLangsList = dfProgLangs.rdd\
    .filter(lambda x: x != firstRow)\
    .map(lambda x: x[0])\
    .collect()
progLangsList[:10]

['A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET',
 'ABSYS']

In [6]:
dfPostsSample = spark_session.read.format("xml").options(rowTag="row").load('posts_sample.xml')
print(dfPostsSample)
print("\n\n")
print(dfPostsSample.first())

DataFrame[_AcceptedAnswerId: bigint, _AnswerCount: bigint, _Body: string, _ClosedDate: timestamp, _CommentCount: bigint, _CommunityOwnedDate: timestamp, _CreationDate: timestamp, _FavoriteCount: bigint, _Id: bigint, _LastActivityDate: timestamp, _LastEditDate: timestamp, _LastEditorDisplayName: string, _LastEditorUserId: bigint, _OwnerDisplayName: string, _OwnerUserId: bigint, _ParentId: bigint, _PostTypeId: bigint, _Score: bigint, _Tags: string, _Title: string, _ViewCount: bigint]



Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't

In [7]:
def DefineLanguage(row):
    languageTag = None
    for language in progLangsList:
        if '<' + language.upper() + '>' in row._Tags.upper():
            languageTag = language
            break
    if languageTag is not None:
        return (row._Id, languageTag, row._CreationDate.year, row._ViewCount)

def IsDateInRange(row):
    leftBorder = datetime(year=2010, month=1, day=1)
    rightBorder = datetime(year=2020, month=12, day=31)
    return row._CreationDate >= leftBorder and row._CreationDate <= rightBorder

In [8]:
topLanguagesPerYear = dfPostsSample.rdd.filter(lambda row: row._Tags is not None and IsDateInRange(row))\
    .map(DefineLanguage)\
    .filter(lambda row: row is not None)\
    .keyBy(lambda row: (row[2], row[1]))\
    .map(lambda row: ((row[0][0], row[0][1]), row[1][3]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda row: (row[0][0], row[0][1], row[1]))\
    .toDF(('Year', 'Language', 'Count'))

topLanguagesPerYear.show()

+----+------------+-------+
|Year|    Language|  Count|
+----+------------+-------+
|2010|        Java| 548916|
|2010|         PHP|1178114|
|2010|        Ruby|  76001|
|2010|      Python|  49372|
|2010| Objective-C|  79490|
|2010|           R|  11087|
|2011|           C|  85002|
|2011|  JavaScript| 594941|
|2011|        Perl|   9236|
|2011|        Bash|  18351|
|2013|  JavaScript| 412170|
|2013|ActionScript|     30|
|2013|        Bash|  22416|
|2013|      Groovy|   1640|
|2013|           C|  13887|
|2014|         PHP| 167346|
|2014|        Java| 369513|
|2014| Objective-C|  44321|
|2014|        Ruby|  17655|
|2014|      Python|  92749|
+----+------------+-------+
only showing top 20 rows



In [9]:
window_spec = Window.partitionBy("Year").orderBy(topLanguagesPerYear["Count"].desc())
topLanguagesRanks = topLanguagesPerYear.withColumn("rank", rank().over(window_spec))
result = topLanguagesRanks.filter(topLanguagesRanks["rank"] <= 10).drop(col("rank"))
result.show()

+----+-----------+-------+
|Year|   Language|  Count|
+----+-----------+-------+
|2010|        PHP|1178114|
|2010|       Java| 548916|
|2010| JavaScript| 293421|
|2010|Objective-C|  79490|
|2010|       Ruby|  76001|
|2010|          C|  66587|
|2010|     MATLAB|  51865|
|2010|     Python|  49372|
|2010|AppleScript|  31320|
|2010|     Delphi|  11817|
|2011| JavaScript| 594941|
|2011|       Java| 196931|
|2011|     Python| 116678|
|2011|          C|  85002|
|2011|        PHP|  65104|
|2011|Objective-C|  45901|
|2011|       Bash|  18351|
|2011|          R|  12764|
|2011|       Ruby|  12647|
|2011|       Perl|   9236|
+----+-----------+-------+
only showing top 20 rows

