In [1]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark import SparkContext, SparkConf

In [2]:
#для чтения xml
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

In [3]:
sc = SparkSession.builder.appName("lab2").master("yarn").getOrCreate()

In [4]:
!hadoop fs -put * .

put: `posts_sample.xml': File exists
put: `programming-languages.csv': File exists
put: `Untitled.ipynb': File exists


In [6]:
prog_lang = sc.read.csv("programming-languages.csv")
posts_sample = sc.read.format("xml").options(rowTag="row").load('posts_sample.xml')

In [7]:
prog_lang_list = [str(x[0]) for x in prog_lang.collect()]
prog_lang_list[:7]

['name', 'A# .NET', 'A# (Axiom)', 'A-0 System', 'A+', 'A++', 'ABAP']

In [8]:
def find_language(x):
    tag = None
    for language in prog_lang_list:
        if "<" + language.lower() + ">" in x._Tags.lower():
            tag = language
            break
    if tag is None:
        return None
    return (x._Id, tag)

def is_year_date(x, year):
    start = datetime(year=year, month=1, day=1)
    end = datetime(year=year, month=12, day=31)
    return x._CreationDate >= start and x._CreationDate <= end

In [9]:
pl_by_year = {}

for year in range(2010, 2020):
    pl_by_year[year] = posts_sample.rdd \
        .filter(lambda x: x._Tags is not None and is_year_date(x, year)) \
        .map(find_language) \
        .filter(lambda x: x is not None) \
        .keyBy(lambda x: x[1]) \
        .aggregateByKey(0, lambda x, y: x + 1, lambda x, y: x + y) \
        .sortBy(lambda x: x[1], ascending=False) \
        .toDF()

    pl_by_year[year] = pl_by_year[year] \
                        .select(col("_1").alias("PL"), col("_2").alias(f"Mentioned_in_{year}")) \
                        .limit(10)

    pl_by_year[year].show()

+-----------+-----------------+
|         PL|Mentioned_in_2010|
+-----------+-----------------+
|       Java|               52|
| JavaScript|               44|
|        PHP|               42|
|     Python|               25|
|Objective-C|               22|
|          C|               20|
|       Ruby|               11|
|     Delphi|                7|
|AppleScript|                3|
|          R|                3|
+-----------+-----------------+

+-----------+-----------------+
|         PL|Mentioned_in_2011|
+-----------+-----------------+
|        PHP|               97|
|       Java|               92|
| JavaScript|               82|
|     Python|               35|
|Objective-C|               33|
|          C|               24|
|       Ruby|               17|
|       Perl|                8|
|     Delphi|                8|
|       Bash|                7|
+-----------+-----------------+

+-----------+-----------------+
|         PL|Mentioned_in_2012|
+-----------+-----------------+
|     

In [None]:
for i in pl_by_year.keys():
    pl_by_year[i].write.format("parquet").save(f"lab2/pl_by_{i}")

In [None]:
!hadoop fs -get /user/diman63s/lab2 ~/lab2