In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import col
import os

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Ask PySpark to pull in the spark-xml package when it launches; this is set
# before any SparkContext is created in this notebook, and the package backs
# the format('xml') reader used to load the posts.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.databricks:spark-xml_2.12:0.14.0 '
    'pyspark-shell'
)

In [2]:
# Spark configuration for this analysis: application name and cluster master.
conf = SparkConf()
conf.setAppName("language_analysis")
conf.setMaster('yarn')

In [3]:
# Obtain the SparkContext / SparkSession through the getOrCreate entry points
# so that re-running this cell reuses the live context instead of failing
# because a SparkContext already exists in this kernel.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [4]:
# Load the posts dump: every <row> element under the <posts> root becomes
# one DataFrame row.
posts_sample = (
    spark.read
    .format('xml')
    .option('rootTag', 'posts')
    .option('rowTag', 'row')
    .load("posts_sample.xml")
)

In [5]:
# Load the reference list of programming languages; the first CSV line is
# treated as the header and column types are inferred from the data.
wiki_languages = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("programming-languages.csv")
)

In [6]:
# Show the schema of both inputs: posts first, then the language list.
for loaded_frame in (posts_sample, wiki_languages):
    loaded_frame.printSchema()

root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)
 |-- _corrupt_record: string (nullable = true)

root
 |-- name: string (nullab

In [7]:
# Lower-cased language names gathered into a plain Python list on the driver;
# get_languages() checks tags for membership in this list.
lang_list = [
    record[0].lower()
    for record in wiki_languages.select("name").collect()
]

In [8]:
def get_year(date):
    """Return the year of ``date`` as a string, or "invalid" if out of range.

    Only years from 2010 up to and including 2019 are accepted.

    Args:
        date: An object exposing a ``year`` attribute (e.g. a datetime).

    Returns:
        The four-digit year as a string, or the sentinel "invalid".
    """
    year = date.year
    if 2010 <= year < 2020:
        return str(year)
    return "invalid"

def get_languages(tags):
    """Return the first tag of ``tags`` if it is a known language.

    The text before the first ">" is taken and its leading character is
    dropped unconditionally (assumes tags look like "<tag1><tag2>" — TODO
    confirm against the data). The result is looked up in the module-level
    ``lang_list``.

    NOTE(review): only the first tag is inspected, so a post whose language
    tag is not first yields "invalid" — confirm this is intended.

    Args:
        tags: The raw tag string of a post.

    Returns:
        The extracted tag when it appears in ``lang_list``, else "invalid".
    """
    first_tag = tags.partition(">")[0][1:]
    return first_tag if first_tag in lang_list else "invalid"

# Wrap the Python helpers as Spark UDFs that return strings so they can be
# applied to DataFrame columns.
get_year_udf = udf(f=get_year, returnType=StringType())
get_languages_udf = udf(f=get_languages, returnType=StringType())

In [9]:
# Keep only the columns needed for the ranking, and only rows where both the
# creation date and the tags are present (get_year / get_languages dereference
# their argument directly).
posts_crop = (
    posts_sample
    .select("_CreationDate", "_Tags", "_ViewCount")
    .where("_CreationDate is not null and _Tags is not null")
)

# Replace the raw values in place: creation date -> year string (or
# "invalid"), tags -> language name (or "invalid").
posts_crop = (
    posts_crop
    .withColumn("_CreationDate", get_year_udf(col("_CreationDate")))
    .withColumn("_Tags", get_languages_udf(col("_Tags")))
)

# Discard rows flagged "invalid" and total the views per (year, language).
posts_crop = (
    posts_crop
    .where("_Tags != 'invalid' and _CreationDate != 'invalid'")
    .groupBy("_CreationDate", "_Tags")
    .sum("_ViewCount")
)

# Within each year, order languages by total views, highest first.
window = (
    Window
    .partitionBy(col("_CreationDate"))
    .orderBy(col("sum(_ViewCount)").desc())
)

# Number the rows inside each year and keep the first ten; the helper column
# is dropped again once the cut has been made.
posts_crop = (
    posts_crop
    .withColumn("row_number", row_number().over(window))
    .filter(col("row_number") <= 10)
    .drop("row_number")
)

# Final presentation: sorted by year, then by views descending, with
# reader-friendly column names.
posts_crop = (
    posts_crop
    .orderBy("_CreationDate", col("sum(_ViewCount)").desc())
    .withColumnRenamed("_CreationDate", "year")
    .withColumnRenamed("_Tags", "language")
    .withColumnRenamed("sum(_ViewCount)", "views")
)

posts_crop.show(100)

+----+------------+-----+
|year|    language|views|
+----+------------+-----+
|2010|      python| 9879|
|2010|        ruby| 9649|
|2010|           r| 6709|
|2010|         php| 6274|
|2010|           c| 2645|
|2010|  javascript| 1587|
|2010| applescript|  477|
|2010|        java|  132|
|2011|  javascript|62543|
|2011|           c|59971|
|2011|        java|40204|
|2011|      python|33332|
|2011| objective-c|16941|
|2011|        bash|14716|
|2011|         php| 7285|
|2011|        ruby| 3581|
|2011|      delphi| 1722|
|2011|      scheme| 1422|
|2012|        ruby|42566|
|2012|        java|11728|
|2012|  javascript| 8461|
|2012|         php| 8148|
|2012|      python| 1595|
|2013|        java|30035|
|2013|  javascript|22551|
|2013|        perl|10147|
|2013|         php| 9714|
|2013|      python| 8058|
|2013|  powershell|  746|
|2013|           c|  528|
|2013|        ruby|  366|
|2013|       scala|  210|
|2013|coffeescript|   62|
|2014|         php|88804|
|2014|        java|62097|
|2014|  java

In [None]:
# Persist the ranking as Parquet, replacing any output from a previous run.
(
    posts_crop.write
    .mode('overwrite')
    .parquet("top_10_languages_per_year_between_2010_and_2020.parquet")
)

In [None]:
# Shut down the SparkContext now that the results have been written.
sc.stop()