In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
import os

# Ask PySpark to pull the spark-xml package when it launches the JVM; this has to
# happen before the SparkContext in the next cell is created.
os.environ.update(PYSPARK_SUBMIT_ARGS='--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell')

In [2]:
# Driver configuration: application name and YARN as the cluster manager.
conf = SparkConf().setAppName("6132_Shustanov_2").setMaster('yarn')
# SparkContext(conf=...) raises "Cannot run multiple SparkContexts at once" when
# this cell is re-run; getOrCreate reuses the active context instead, keeping the
# cell re-runnable. NOTE: an already-running context keeps its original conf.
context = SparkContext.getOrCreate(conf=conf)

In [3]:
# DataFrame / SQL entry point built on the SparkContext created above.
session = SparkSession(sparkContext=context)

In [4]:
# Load the posts dump through spark-xml: every <row> element becomes one
# DataFrame row and its attributes become `_`-prefixed columns (see schema below).
# NOTE(review): spark-xml documents `rootTag` as a write option; it is probably
# ignored on read — confirm before relying on it.
posts_sample = (
    session.read.format('xml')
    .option('rootTag', 'posts')
    .option('rowTag', 'row')
    .load("posts_sample.xml")
)
posts_sample.printSchema()

root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)



In [9]:
# Reference list of programming languages; only the `name` column is kept.
langs = (
    session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('programming-languages.csv')
    .select('name')
)
langs.printSchema()

root
 |-- name: string (nullable = true)



In [5]:
from pyspark.sql.functions import year

# Creation year, view count and raw tag string for posts created in 2010–2020
# (inclusive). Rows with a null in any of the three columns are dropped first.
posts_sample_n = (
    posts_sample
    .select(year('_CreationDate').alias('year'), '_ViewCount', '_Tags')
    .dropna()
    .filter('year BETWEEN 2010 AND 2020')
)
posts_sample_n.show()

+----+----------+--------------------+
|year|_ViewCount|               _Tags|
+----+----------+--------------------+
|2010|      3650|<c++><character-e...|
|2010|       617|<sharepoint><info...|
|2010|      1315|<iphone><app-stor...|
|2010|       973|<symfony1><schema...|
|2010|       132|              <java>|
|2010|       419|<visual-studio-20...|
|2010|       869|<cakephp><file-up...|
|2010|      1303|<git><cygwin><putty>|
|2010|       748|  <drupal><drupal-6>|
|2010|      1258|<php><wordpress><...|
|2010|     14972|<c#><winforms><da...|
|2010|       274|<c#><asp.net><exc...|
|2010|       804|    <sql><xml><blob>|
|2010|      6019|<.htaccess><codei...|
|2010|      5456|<wcf><web-service...|
|2010|       316|<mod-rewrite><apa...|
|2010|     15477|<sql><database><d...|
|2010|      9649|         <ruby><rvm>|
|2010|     20199|  <android><eclipse>|
|2010|       735|<iphone><uiimagev...|
+----+----------+--------------------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import col, expr, split

# Turn the raw tag string "<a><b>" into the array ["a", "b"]: drop the first and
# last character, then split on "><" (same as Python's s[1:-1].split('><')).
# Native column expressions replace the former RDD map, so rows are no longer
# serialized through Python workers and no schema inference is needed.
# `year` is cast to long because that is the type the old Python-int round-trip
# produced; downstream results and the saved parquet schema stay unchanged.
posts_sample_n = posts_sample_n.select(
    col('year').cast('long').alias('year'),
    col('_ViewCount').alias('view_count'),
    split(expr('substring(_Tags, 2, length(_Tags) - 2)'), '><').alias('tags'),
)

In [7]:
from pyspark.sql.functions import explode

# One row per (post, tag): flatten the tag array straight into a `lang` column.
posts_sample_n = posts_sample_n.select('year', 'view_count', explode('tags').alias('lang'))
posts_sample_n.show()

+----+----------+------------------+
|year|view_count|              lang|
+----+----------+------------------+
|2010|      3650|               c++|
|2010|      3650|character-encoding|
|2010|       617|        sharepoint|
|2010|       617|          infopath|
|2010|      1315|            iphone|
|2010|      1315|         app-store|
|2010|      1315|   in-app-purchase|
|2010|       973|          symfony1|
|2010|       973|            schema|
|2010|       973|          doctrine|
|2010|       973|          fixtures|
|2010|       132|              java|
|2010|       419|visual-studio-2010|
|2010|       419|          stylecop|
|2010|       869|           cakephp|
|2010|       869|       file-upload|
|2010|       869|         swfupload|
|2010|      1303|               git|
|2010|      1303|            cygwin|
|2010|      1303|             putty|
+----+----------+------------------+
only showing top 20 rows



In [10]:
# NOTE: importing Spark's `sum` shadows Python's built-in sum for the rest of
# the notebook.
from pyspark.sql.functions import lower, sum

# Keep only tags that are programming-language names. Names are lower-cased to
# match tag spelling and de-duplicated, so a language listed twice (e.g. in two
# casings) cannot duplicate matching tag rows and inflate the view totals.
# An explicit equi-join replaces the former crossJoin + filter.
lang_names = langs.select(lower('name').alias('name')).distinct()
posts_sample_n = (
    posts_sample_n
    .join(lang_names, posts_sample_n['lang'] == lang_names['name'])
    .drop('name')
)
posts_sample_n.show()

+----+----------+-----------+
|year|view_count|       lang|
+----+----------+-----------+
|2010|       132|       java|
|2010|      1258|        php|
|2010|      9649|       ruby|
|2010|      2384|          c|
|2010|      1987|        php|
|2010|      3321|     python|
|2010|       128| javascript|
|2010|       477|applescript|
|2010|      1748|        php|
|2010|       998|        php|
|2010|      2095| javascript|
|2010|       447|        sed|
|2010|      6558|     python|
|2010|       214|       java|
|2010|       214|       ruby|
|2010|       852|objective-c|
|2010|       179| javascript|
|2010|      6709|          r|
|2010|        78|        php|
|2010|      1280| javascript|
+----+----------+-----------+
only showing top 20 rows



In [11]:
# Total views per (year, language); the aggregate column is named "sum(view_count)".
posts_sample_n = posts_sample_n.groupBy('year', 'lang').sum('view_count')

In [12]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Number of most-viewed languages kept for each year.
TOP_N = 10

# Rank languages within each year by total views ("sum(view_count)" from the
# aggregation step). `lang` is a secondary sort key so ties at the TOP_N cut-off
# are resolved identically on every run instead of arbitrarily by row_number.
window = Window.partitionBy('year').orderBy(col('sum(view_count)').desc(), col('lang').asc())
window_df = (
    posts_sample_n
    .withColumn('grade', row_number().over(window))
    .filter(col('grade') <= TOP_N)
    .drop('grade')
    .withColumnRenamed('sum(view_count)', 'view_count')
    .orderBy(col('year').asc(), col('view_count').desc(), col('lang').asc())
)
window_df.show()

+----+-----------+----------+
|year|       lang|view_count|
+----+-----------+----------+
|2010|        php|   1189629|
|2010|       java|    563211|
|2010| javascript|    316131|
|2010|objective-c|     97009|
|2010|       ruby|     76215|
|2010|          c|     66587|
|2010|     python|     60672|
|2010|     matlab|     51865|
|2010|applescript|     32305|
|2010|     delphi|     13065|
|2011| javascript|    809078|
|2011|       java|    389834|
|2011|        php|    246770|
|2011|          c|    238277|
|2011|objective-c|    218934|
|2011|     python|    203180|
|2011|       bash|     60805|
|2011|       ruby|     39223|
|2011|       perl|     28502|
|2011|     matlab|     18816|
+----+-----------+----------+
only showing top 20 rows



In [13]:
# Save the yearly top-language table as a single parquet part file
# (coalesce(1)), replacing any output left by an earlier run.
(
    window_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet("langs_rating.parquet")
)

In [14]:
!hadoop fs -get 'langs_rating.parquet' .

get: `langs_rating.parquet/_SUCCESS': File exists


In [15]:
# Shut down the Spark session (and its underlying SparkContext) at the end of the notebook.
session.stop()