In [2]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql

In [3]:
# Create (or reuse) the Spark session for this lab; the spark-xml package is
# requested through spark.jars.packages so the 'xml' data source is available.
spark = (
    SparkSession.builder
    .appName("Kusakina_LR2")
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.13.0")
    .getOrCreate()
)

In [4]:
# Echo the session object as the cell result to confirm it was created.
spark

In [5]:
# Read the posts dump with the XML data source: each 'row' element becomes one
# DataFrame row, and no explicit schema is supplied.
posts_sample = (
    spark.read.format('xml')
    .option('rootTag', 'posts')
    .option('rowTag', 'row')
    .load("posts_sample.xml")
)

In [6]:
# Print the column names and types of the loaded posts DataFrame.
posts_sample.printSchema()

root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)



In [49]:
from pyspark.sql.functions import col

# Keep only the columns the report needs: the two dates, the view count and the tags.
table = posts_sample.select(
    "_CreationDate",
    "_LastActivityDate",
    "_ViewCount",
    "_Tags",
)

# Rows whose _Tags is null are discarded; only tagged posts go further.
not_null = table.where(col("_Tags").isNotNull())
not_null.show()

# Optional check for nulls in the view count:
# not_null.filter(not_null._ViewCount.isNull()).show()

+--------------------+--------------------+----------+--------------------+
|       _CreationDate|   _LastActivityDate|_ViewCount|               _Tags|
+--------------------+--------------------+----------+--------------------+
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|<c#><floating-poi...|
|2008-08-01 03:08:...|2019-07-19 05:43:...|     18214|<html><css><inter...|
|2008-08-01 04:40:...|2019-06-26 19:25:...|    555183|<c#><.net><datetime>|
|2008-08-01 04:55:...|2019-05-26 06:31:...|    149445|<c#><datetime><ti...|
|2008-08-01 05:42:...|2019-05-14 20:02:...|    176405|<html><browser><t...|
|2008-08-01 05:59:...|2018-09-19 07:49:...|    123231|        <.net><math>|
|2010-09-22 14:33:...|2017-02-26 22:11:...|      3650|<c++><character-e...|
|2010-09-23 10:47:...|2010-11-23 16:14:...|       617|<sharepoint><info...|
|2010-09-23 12:53:...|2012-09-11 18:09:...|      1315|<iphone><app-stor...|
|2010-09-23 15:47:...|2012-04-17 02:54:...|       973|<symfony1><schema...|
|2010-09-23 

In [83]:

def mass_tag(stringa):
    """Split a tag string such as ``"<c#><.net>"`` into a list of tag names.

    The first and last characters (the outer ``<`` and ``>``) are stripped and
    the remainder is split on the ``"><"`` separator.

    Args:
        stringa: Tag string in ``<tag1><tag2>...`` form, or ``None``.

    Returns:
        A list of tag names; an empty list when there is nothing between the
        outer brackets; ``None`` when the input is ``None`` (so a null column
        value stays null instead of raising inside a UDF).
    """
    if stringa is None:
        return None
    inner = stringa[1:-1]
    if not inner:
        # "" or "<>" carries no tags; avoid returning a list with one empty name.
        return []
    # No debug print here: this function is called once per row when used as a UDF.
    return inner.split("><")

In [130]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType,ShortType, LongType

# Expose mass_tag to Spark as a UDF that yields array<string>.
mass_tag_udf = udf(mass_tag, returnType=ArrayType(StringType()))

# Replace the raw "_Tags" string column with the parsed "Tags" array column.
table2 = (
    not_null
    .withColumn("Tags", mass_tag_udf(not_null["_Tags"]))
    .drop("_Tags")
)
table2.show()

+--------------------+--------------------+----------+--------------------+
|       _CreationDate|   _LastActivityDate|_ViewCount|                Tags|
+--------------------+--------------------+----------+--------------------+
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|[c#, floating-poi...|
|2008-08-01 03:08:...|2019-07-19 05:43:...|     18214|[html, css, inter...|
|2008-08-01 04:40:...|2019-06-26 19:25:...|    555183|[c#, .net, datetime]|
|2008-08-01 04:55:...|2019-05-26 06:31:...|    149445|[c#, datetime, ti...|
|2008-08-01 05:42:...|2019-05-14 20:02:...|    176405|[html, browser, t...|
|2008-08-01 05:59:...|2018-09-19 07:49:...|    123231|        [.net, math]|
|2010-09-22 14:33:...|2017-02-26 22:11:...|      3650|[c++, character-e...|
|2010-09-23 10:47:...|2010-11-23 16:14:...|       617|[sharepoint, info...|
|2010-09-23 12:53:...|2012-09-11 18:09:...|      1315|[iphone, app-stor...|
|2010-09-23 15:47:...|2012-04-17 02:54:...|       973|[symfony1, schema...|
|2010-09-23 

In [104]:
from pyspark.sql.functions import explode

print(table2)

# One output row per (post, tag): the Tags array is exploded into a "Tag" column.
alone_tags = table2.select(
    "_CreationDate",
    "_LastActivityDate",
    "_ViewCount",
    explode("Tags").alias("Tag"),
)
alone_tags.show()

DataFrame[_CreationDate: timestamp, _LastActivityDate: timestamp, _ViewCount: bigint, Tags: array<string>]
+--------------------+--------------------+----------+-------------------+
|       _CreationDate|   _LastActivityDate|_ViewCount|                Tag|
+--------------------+--------------------+----------+-------------------+
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|                 c#|
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|     floating-point|
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|    type-conversion|
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|             double|
|2008-08-01 02:42:...|2019-07-19 05:39:...|     42817|            decimal|
|2008-08-01 03:08:...|2019-07-19 05:43:...|     18214|               html|
|2008-08-01 03:08:...|2019-07-19 05:43:...|     18214|                css|
|2008-08-01 03:08:...|2019-07-19 05:43:...|     18214|internet-explorer-7|
|2008-08-01 04:40:...|2019-06-26 19:25:...|    555183|              

In [116]:
from pyspark.sql.functions import lower

# Load the language list (CSV with a header row, column types inferred).
languages = spark.read.csv(
    "programming-languages.csv",
    header=True,
    inferSchema=True,
)

# Drop the URL column and lower-case the names.
languages = languages.drop("wikipedia_url").withColumn("name", lower("name"))
languages.show()

# Rename to "Tag" so the column name matches the exploded tag column used in the join.
languages = languages.withColumnRenamed("name", "Tag")
languages.show()

+------------+
|        name|
+------------+
|     a# .net|
|  a# (axiom)|
|  a-0 system|
|          a+|
|         a++|
|        abap|
|         abc|
|   abc algol|
|       abset|
|       absys|
|         acc|
|      accent|
|    ace dasl|
|        acl2|
|     act-iii|
|     action!|
|actionscript|
|         ada|
|     adenine|
|        agda|
+------------+
only showing top 20 rows

+------------+
|         Tag|
+------------+
|     a# .net|
|  a# (axiom)|
|  a-0 system|
|          a+|
|         a++|
|        abap|
|         abc|
|   abc algol|
|       abset|
|       absys|
|         acc|
|      accent|
|    ace dasl|
|        acl2|
|     act-iii|
|     action!|
|actionscript|
|         ada|
|     adenine|
|        agda|
+------------+
only showing top 20 rows



In [118]:
# Inner join on "Tag": only tags that equal a lower-cased language name survive.
res = alone_tags.join(languages, on="Tag", how="inner")
res.show()

+-----------+--------------------+--------------------+----------+
|        Tag|       _CreationDate|   _LastActivityDate|_ViewCount|
+-----------+--------------------+--------------------+----------+
|       java|2010-09-23 16:13:...|2010-09-23 17:52:...|       132|
|        php|2010-09-26 21:07:...|2010-09-27 18:34:...|      1258|
|       ruby|2010-09-30 22:27:...|2015-05-18 02:06:...|      9649|
|          c|2010-10-01 15:52:...|2010-10-01 16:16:...|      2384|
|        php|2010-10-05 01:05:...|2015-08-02 03:36:...|      1987|
|     python|2010-10-06 17:31:...|2010-10-07 14:36:...|      3321|
| javascript|2010-10-08 00:53:...|2010-10-08 13:43:...|       128|
|applescript|2010-10-08 03:56:...|2010-10-08 22:34:...|       477|
|        php|2010-10-08 18:44:...|2010-10-12 21:48:...|      1748|
|        php|2010-10-11 11:54:...|2010-11-13 01:22:...|       998|
| javascript|2010-10-12 16:19:...|2013-11-14 15:31:...|      2095|
|        sed|2010-10-12 19:44:...|2010-10-12 20:02:...|       

In [126]:
def mass_years(stringa, stringa2):
    """Return every calendar year from the first date's year to the second's, inclusive.

    Both arguments are converted with ``str()`` and the year is taken as the
    text before the first ``"-"``, so date/time strings like
    ``"2008-08-01 05:42:"`` and objects whose ``str()`` starts with
    ``YYYY-`` (for example ``datetime``) are accepted.

    Args:
        stringa: Start date (string or object with a ``YYYY-...`` string form), or ``None``.
        stringa2: End date in the same form, or ``None``.

    Returns:
        A list of ints ``[start_year, ..., end_year]``; an empty list when the
        end year precedes the start year; ``None`` when either input is
        ``None`` (``str(None)`` would otherwise fail in ``int("None")``).
    """
    if stringa is None or stringa2 is None:
        return None
    first_year = int(str(stringa).split("-")[0])
    last_year = int(str(stringa2).split("-")[0])
    return list(range(first_year, last_year + 1))

In [127]:
# Quick check of mass_years on plain strings before wrapping it as a UDF.
print(mass_years("2008-08-01 05:42:", "2019-05-14 20:02:"))

[2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019]


In [129]:
# Expose mass_years to Spark as a UDF that yields array<smallint>.
mass_years_udf = udf(mass_years, returnType=ArrayType(ShortType()))

# Add the list of years each post spans, then drop the two date columns.
table3 = (
    res
    .withColumn("Years", mass_years_udf(res["_CreationDate"], res["_LastActivityDate"]))
    .drop("_LastActivityDate", "_CreationDate")
)
table3.show()

+-----------+----------+--------------------+
|        Tag|_ViewCount|               Years|
+-----------+----------+--------------------+
|       java|       132|              [2010]|
|        php|      1258|              [2010]|
|       ruby|      9649|[2010, 2011, 2012...|
|          c|      2384|              [2010]|
|        php|      1987|[2010, 2011, 2012...|
|     python|      3321|              [2010]|
| javascript|       128|              [2010]|
|applescript|       477|              [2010]|
|        php|      1748|              [2010]|
|        php|       998|              [2010]|
| javascript|      2095|[2010, 2011, 2012...|
|        sed|       447|              [2010]|
|     python|      6558|[2010, 2011, 2012...|
|       java|       214|[2010, 2011, 2012...|
|       ruby|       214|[2010, 2011, 2012...|
|objective-c|       852|[2010, 2011, 2012...|
| javascript|       179|              [2010]|
|          r|      6709|              [2010]|
|        php|        78|          

In [136]:
def count_views(views, years):
    """Spread a total view count evenly over a list of years.

    Args:
        views: Total number of views, or ``None``.
        years: Sequence of years the views are attributed to, or ``None``.

    Returns:
        ``views`` divided by the number of years, truncated to an int;
        ``None`` when ``views`` is ``None`` or ``years`` is ``None``/empty
        (avoids ``TypeError`` and ``ZeroDivisionError`` on such rows).
    """
    if views is None or not years:
        return None
    return int(views / len(years))

In [145]:
# Expose count_views to Spark as a UDF that yields bigint.
count_views_udf = udf(count_views, returnType=LongType())

# "View" holds the per-year share of each post's total view count.
res2 = table3.withColumn(
    "View",
    count_views_udf(table3["_ViewCount"], table3["Years"]),
)
res2.show()

+-----------+----------+--------------------+----+
|        Tag|_ViewCount|               Years|View|
+-----------+----------+--------------------+----+
|       java|       132|              [2010]| 132|
|        php|      1258|              [2010]|1258|
|       ruby|      9649|[2010, 2011, 2012...|1608|
|          c|      2384|              [2010]|2384|
|        php|      1987|[2010, 2011, 2012...| 331|
|     python|      3321|              [2010]|3321|
| javascript|       128|              [2010]| 128|
|applescript|       477|              [2010]| 477|
|        php|      1748|              [2010]|1748|
|        php|       998|              [2010]| 998|
| javascript|      2095|[2010, 2011, 2012...| 523|
|        sed|       447|              [2010]| 447|
|     python|      6558|[2010, 2011, 2012...|1093|
|       java|       214|[2010, 2011, 2012...|  35|
|       ruby|       214|[2010, 2011, 2012...|  35|
|objective-c|       852|[2010, 2011, 2012...| 213|
| javascript|       179|       

In [146]:
# One row per (tag, year): the Years array is exploded into a "Year" column.
res3 = res2.select("Tag", "View", explode("Years").alias("Year"))
res3.show()
print(res3)

+-----------+----+----+
|        Tag|View|Year|
+-----------+----+----+
|       java| 132|2010|
|        php|1258|2010|
|       ruby|1608|2010|
|       ruby|1608|2011|
|       ruby|1608|2012|
|       ruby|1608|2013|
|       ruby|1608|2014|
|       ruby|1608|2015|
|          c|2384|2010|
|        php| 331|2010|
|        php| 331|2011|
|        php| 331|2012|
|        php| 331|2013|
|        php| 331|2014|
|        php| 331|2015|
|     python|3321|2010|
| javascript| 128|2010|
|applescript| 477|2010|
|        php|1748|2010|
|        php| 998|2010|
+-----------+----+----+
only showing top 20 rows

DataFrame[Tag: string, View: bigint, Year: smallint]


In [157]:
# Total the per-year view shares for every (Year, Tag) pair; the aggregate
# column is named "sum(View)".
final_res = res3.groupBy("Year", "Tag").sum("View")
print(final_res)

# Latest year first, and within a year the most viewed tag first.
sorted_res = final_res.orderBy("Year", "sum(View)", ascending=False)
sorted_res.show()

DataFrame[Year: smallint, Tag: string, sum(View): bigint]
+----+-----------+---------+
|Year|        Tag|sum(View)|
+----+-----------+---------+
|2019|     python|   373745|
|2019|        php|   168545|
|2019| javascript|   142609|
|2019|       java|    95995|
|2019|objective-c|    58395|
|2019|          r|     9687|
|2019| typescript|     8664|
|2019|         io|     6156|
|2019|       dart|     5625|
|2019|     kotlin|     5124|
|2019|       ruby|     4701|
|2019|       bash|     4137|
|2019|          c|     2691|
|2019|         go|     2559|
|2019| powershell|     2544|
|2019|       rust|     2271|
|2019|     scheme|     1859|
|2019|      scala|     1751|
|2019|    haskell|     1406|
|2019|     delphi|     1333|
+----+-----------+---------+
only showing top 20 rows



In [160]:
# Give the aggregate column a plain name for the written report.
sorted_res = sorted_res.withColumnRenamed("sum(View)", "Sum_View")

# Years covered by the report: 2010 through 2020 inclusive.
mass = list(range(2010, 2021))
for year in mass:
    # First ten rows of the already-sorted frame for this year.
    year_res = sorted_res.where(sorted_res["Year"] == year).limit(10)
    year_res.show()
    tr_year = str(year)
    # One parquet output per year; an existing output is replaced.
    year_res.write.mode('overwrite').parquet(f"popular_languages_per_year_{tr_year}")
    

+----+-----------+--------+
|Year|        Tag|Sum_View|
+----+-----------+--------+
|2010|     python|  341402|
|2010|        php|  179828|
|2010|       java|  155905|
|2010| javascript|  121002|
|2010|         id|   61763|
|2010|     matlab|   58030|
|2010|objective-c|   53371|
|2010|       ruby|   37114|
|2010|          c|   35026|
|2010|     delphi|   12908|
+----+-----------+--------+

+----+-----------+--------+
|Year|        Tag|Sum_View|
+----+-----------+--------+
|2011|     python|  426149|
|2011|       java|  281158|
|2011| javascript|  259676|
|2011|        php|  241105|
|2011|          c|  113003|
|2011|objective-c|   84264|
|2011|         id|   61763|
|2011|       ruby|   42712|
|2011|       bash|   20492|
|2011|     matlab|   15709|
+----+-----------+--------+

+----+-----------+--------+
|Year|        Tag|Sum_View|
+----+-----------+--------+
|2012|     python|  564428|
|2012|        php|  498672|
|2012| javascript|  475611|
|2012|       java|  407204|
|2012|objective-c|