In [75]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import col
import os

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Put spark-xml (needed by spark.read.format('xml') below) on the classpath.
# PySpark reads PYSPARK_SUBMIT_ARGS only when it launches the JVM gateway, i.e. when
# the first SparkContext is created -- changing it after that has no effect.
os.environ.update(PYSPARK_SUBMIT_ARGS='--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell')

In [2]:
# Spark configuration: run on YARN under the application name "language_analysis".
conf = (
    SparkConf()
    .setMaster('yarn')
    .setAppName("language_analysis")
)

In [3]:
# Create -- or, when this cell is re-run, reuse -- the Spark entry points.
# Calling SparkContext(conf=conf) a second time in the same kernel raises ValueError
# (only one active SparkContext is allowed); getOrCreate returns the running one.
# NOTE: if a context is already running, most settings in `conf` are not re-applied.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [4]:
# Load the posts dump: every <row .../> element under <posts> becomes one DataFrame row.
posts_sample = (
    spark.read.format('xml')
    .options(rootTag='posts', rowTag='row')
    .load("posts_sample.xml")
)

In [6]:
# Load the programming-language list; first line is the header, column types are inferred.
wiki_languages = spark.read.csv(
    "programming-languages.csv",
    header=True,
    inferSchema=True,
)

In [7]:
# Show the schemas of both inputs (posts first, then the language list).
for frame in (posts_sample, wiki_languages):
    frame.printSchema()

root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)

root
 |-- name: string (nullable = true)
 |-- wikipedia_url: string (nullable

In [17]:
# Lower-cased language names, used by get_lang to decide whether a post's first
# tag is a programming language. Rows with a null name are skipped: calling
# .lower() on None would otherwise raise AttributeError.
lang_list = [
    row["name"].lower()
    for row in wiki_languages.select("name").collect()
    if row["name"] is not None
]

In [72]:
def get_year(date, start_year=2010, end_year=2020):
    """Return the calendar year of ``date`` as a string if it lies in the window.

    Wrapped as a Spark UDF (``get_year_udf``) and applied to ``_CreationDate``.

    Args:
        date: any object with an integer ``year`` attribute (e.g. ``datetime``),
            or ``None``.
        start_year: first accepted year (inclusive).
        end_year: end of the accepted range (exclusive).

    Returns:
        ``str(date.year)`` when ``start_year <= date.year < end_year``; otherwise
        the sentinel string ``"invalid"`` (also returned for ``None``).
    """
    # Spark does not guarantee that an upstream IS NOT NULL filter is evaluated
    # before a Python UDF, so the UDF itself must tolerate nulls.
    if date is None:
        return "invalid"
    return str(date.year) if start_year <= date.year < end_year else "invalid"

def get_lang(tags, languages=None):
    """Return the post's first tag if it names a known programming language.

    The parsing assumes tags are concatenated as ``<tag1><tag2>...``; only the
    first tag is inspected. Wrapped as a Spark UDF (``get_lang_udf``).

    Args:
        tags: the raw ``_Tags`` string of a post, or ``None``.
        languages: container of lower-case language names to accept; defaults to
            the notebook-level ``lang_list``. Pass a ``set`` for O(1) lookups.

    Returns:
        The first tag without its leading ``<`` when it is in ``languages``;
        otherwise the sentinel string ``"invalid"`` (also returned for ``None``).
    """
    # Spark may invoke a Python UDF before an upstream IS NOT NULL filter runs,
    # so nulls are handled here instead of being assumed filtered out.
    if tags is None:
        return "invalid"
    if languages is None:
        languages = lang_list
    # "<java><spring>" -> "<java" -> "java"
    first_tag = tags.split(">", 1)[0][1:]
    return first_tag if first_tag in languages else "invalid"

# Expose both helpers to Spark as UDFs that produce string columns.
get_year_udf, get_lang_udf = [
    udf(fn, returnType=StringType()) for fn in (get_year, get_lang)
]

In [84]:
# For every year, rank languages by the total views of posts whose first tag is
# that language, and keep the TOP_N most viewed.
TOP_N = 10

# Keep only the columns the ranking needs; rows without a date or tags cannot be labeled.
posts_filtered = (
    posts_sample
    .select("_CreationDate", "_Tags", "_ViewCount")
    .where("_CreationDate is not null and _Tags is not null")
)

# Label each post with its year and first-tag language. Anything the UDFs cannot
# map comes back as the sentinel "invalid" and is dropped.
posts_labeled = (
    posts_filtered
    .withColumn("year", get_year_udf(col("_CreationDate")))
    .withColumn("language", get_lang_udf(col("_Tags")))
    .where("year != 'invalid' and language != 'invalid'")
)

# Total views per (year, language); rename Spark's auto-generated "sum(_ViewCount)" once.
views_per_language = (
    posts_labeled
    .groupBy("year", "language")
    .sum("_ViewCount")
    .withColumnRenamed("sum(_ViewCount)", "views")
)

# Break ties on language name so the top-N cut (and final ordering) is deterministic.
window = Window.partitionBy("year").orderBy(col("views").desc(), col("language"))

posts_crop = (
    views_per_language
    .select("*", row_number().over(window).alias("row_number"))
    .filter(col("row_number") <= TOP_N)
    .drop("row_number")
    .orderBy("year", col("views").desc(), col("language"))
)

posts_crop.show(100)

+----+-----------+-------+
|year|   language|  views|
+----+-----------+-------+
|2010|        php|1189629|
|2010|       java| 562997|
|2010| javascript| 304994|
|2010|objective-c|  63442|
|2010|          c|  63041|
|2010|     python|  57979|
|2010|       ruby|  17145|
|2010|     delphi|  12769|
|2010|          r|   6709|
|2010|       bash|   4474|
|2011| javascript| 801545|
|2011|       java| 386984|
|2011|        php| 242932|
|2011|          c| 236802|
|2011|     python| 203180|
|2011|       bash|  57235|
|2011|objective-c|  51003|
|2011|       ruby|  29148|
|2011|          r|  14394|
|2011|     delphi|   4950|
|2012|       java| 659282|
|2012| javascript| 537693|
|2012|        php| 428025|
|2012|     python| 274297|
|2012|       ruby|  98356|
|2012|objective-c|  73303|
|2012|          c|  65995|
|2012|      scala|  24412|
|2012|    haskell|  23046|
|2012|          r|  15042|
|2013|       java|1036010|
|2013| javascript| 584908|
|2013|        php| 470181|
|2013|objective-c| 339795|
|

In [85]:
# Persist the ranking; overwrite mode replaces the output from any previous run.
posts_crop.write.parquet(
    "top_10_languages_per_year_between_2010_and_2020.parquet",
    mode='overwrite',
)

In [86]:
# Shut Spark down through the session: this stops the underlying SparkContext
# and also clears the session's active/default registrations.
spark.stop()