In [1]:
import pyspark
import os
import pandas as pd
import numpy as np

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
spark = SparkSession.builder.appName("L2").master("local[*]").getOrCreate()

In [3]:
pl = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("programming-languages.csv")

In [4]:
pl.createOrReplaceTempView("pl")

In [5]:
posts_sample = spark.read.format("xml").options(rowTag="row").load('posts_sample.xml')

In [6]:
posts_sample.createOrReplaceTempView("posts")

##### Для оценки "популярности" языков программирования возьмем показатель числа просмотров статей созданных за период с 2010 года. 
##### Периодов для оценки будет служить дата создания поста.

In [7]:
posts = spark.sql("""
    SELECT  posts.Tags, posts.Year, posts.ViewCount FROM (
        SELECT 
            REPLACE( substring_index(Tags, '>', 1),  '<', '') AS Tags,
            SUM(ViewCount) AS ViewCount,
            Year
        FROM(
            SELECT  
                CAST (_Tags as VARCHAR(1000) ) AS Tags, 
                CAST (_ViewCount as INT) AS ViewCount, 
                year( CAST (_CreationDate as Date) ) AS Year
            FROM posts
                WHERE _Tags IS NOT NULL  AND  year( CAST (_CreationDate as Date) ) >= 2010) AS A
            GROUP BY Tags, Year
            ORDER BY Year ) AS posts
        INNER JOIN ( SELECT LOWER(name) as name FROM pl ) as lang
            ON posts.Tags = lang.name
        """)

In [8]:
posts_all_tags = posts.rdd.map(lambda p: p.Tags).collect()
posts_ViewCount = posts.rdd.map(lambda p: p.ViewCount).collect()
posts_Year = posts.rdd.map(lambda p: p.Year).collect()

d = {'Tags': posts_all_tags, 'Year': posts_Year , 'ViewCount': posts_ViewCount}
posts_df = pd.DataFrame(data=d)
posts_df['Year'] = posts_df['Year'].astype(str)+' '

In [9]:
posts_df

Unnamed: 0,Tags,Year,ViewCount
0,java,2010,7782
1,coffeescript,2013,62
2,java,2015,473
3,java,2013,1862
4,javascript,2010,123290
...,...,...,...
5967,haskell,2015,327
5968,javascript,2016,35
5969,javascript,2019,104
5970,java,2018,76


In [10]:
posts_df = pd.pivot_table(posts_df, values='ViewCount', index=['Tags'],
                    columns=['Year'], aggfunc=np.sum, fill_value=0)

In [11]:
for i in range(0, 10):
    print("Результаты за " + str(posts_df.columns[i]) + "год")
    print( posts_df.iloc[:, i].nlargest(10) )
    print(".\n")

Результаты за 2010 год
Tags
php            1189629
java            562997
javascript      304994
objective-c      63442
c                63041
python           57979
ruby             17145
delphi           12769
r                 6709
bash              4474
Name: 2010 , dtype: int64
.

Результаты за 2011 год
Tags
javascript     801545
java           386984
php            242932
c              236802
python         203180
bash            57235
objective-c     51003
ruby            29148
r               14394
delphi           4950
Name: 2011 , dtype: int64
.

Результаты за 2012 год
Tags
java           659282
javascript     537693
php            428025
python         274297
ruby            98356
objective-c     73303
c               65995
scala           24412
haskell         23046
r               15042
Name: 2012 , dtype: int64
.

Результаты за 2013 год
Tags
java           1036010
javascript      584908
php             470181
objective-c     339795
python          160204
chef            

In [12]:
posts_df.to_parquet('posts_df.parquet.gzip', compression='gzip') 