In [None]:
#sc.stop()

In [1]:
import os

import pyspark.sql as sql
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# Pull the spark-xml package onto the classpath; pyspark reads this variable when it
# launches the JVM, so it has to be set before the SparkContext is created.
os.environ.update({'PYSPARK_SUBMIT_ARGS': '--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell'})

In [2]:
# Start (or reuse) the Spark context on YARN and obtain a SparkSession on top of it.
# getOrCreate makes this cell safe to re-run: constructing a second SparkContext(...)
# in the same kernel raises ValueError. Note that when a context already exists,
# it is reused as-is and `conf` is not re-applied.
conf = SparkConf().setAppName("Lab2").setMaster('yarn')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

KeyboardInterrupt: 

In [None]:
# Stack Overflow posts: each <row> element under the <posts> root becomes one record.
posts_sample = (
    spark.read.format('xml')
    .option('rootTag', 'posts')
    .option('rowTag', 'row')
    .load("posts_sample.xml")
)
posts_sample.printSchema()

# List of programming languages (CSV with a header row, column types inferred).
wiki_languages = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("programming-languages.csv")
)
wiki_languages.printSchema()

In [None]:
# Определение функций UDF
def get_year(creation_date):
    return str(creation_date.year) if creation_date.year >= 2010 and creation_date.year < 2020 else "invalid"

def get_lang(tags, lang_list):
    temp = tags.split(">")[0][1:]
    return temp if temp in lang_list else "invalid"

In [None]:
# Lower-cased language names from the Wikipedia list (assumes post tags are lower-case --
# get_lang compares them verbatim). Lower-casing runs in Spark, and rows with a null name
# are dropped, because calling .lower() on None in a Python lambda would fail the job.
lang_list = [row[0] for row in wiki_languages.select(F.lower(F.col("name"))).dropna().collect()]

In [None]:
# Keep only the columns the analysis needs, and drop rows without tags or a creation date
# so the Python UDFs never receive None. Year/language extraction is done with the
# DataFrame-API UDFs, so no UDF is called here (they are not registered for SQL use).
posts_crop = posts_sample\
    .select("_CreationDate", "_Tags", "_ViewCount")\
    .where(F.col("_Tags").isNotNull() & F.col("_CreationDate").isNotNull())

posts_crop.createOrReplaceTempView("posts_crop")

In [None]:
# Wrap the Python helpers as Spark UDFs returning strings.
# A frozenset turns get_lang's per-row `in` check from a linear scan over the
# language list into an O(1) hash lookup.
lang_set = frozenset(lang_list)
get_year_udf = F.udf(get_year, StringType())
get_lang_udf = F.udf(lambda tags: get_lang(tags, lang_set), StringType())

In [None]:
# Replace the raw timestamp with its year and the raw tag string with its language
# (either may become "invalid").
posts_crop = (
    posts_crop
    .withColumn("_CreationDate", get_year_udf(F.col("_CreationDate")))
    .withColumn("_Tags", get_lang_udf(F.col("_Tags")))
)

In [None]:
# Total views per (year, language), discarding rows the UDFs marked "invalid".
# Re-register the view first: a temp view is bound to the DataFrame it was created from,
# so reassigning the `posts_crop` variable does not update it, and the query would
# otherwise read the pre-UDF columns.
posts_crop.createOrReplaceTempView("posts_crop")

posts_crop = spark.sql("""
    SELECT _CreationDate, _Tags, SUM(_ViewCount) AS _ViewCount
    FROM posts_crop
    WHERE _Tags != 'invalid' AND _CreationDate != 'invalid'
    GROUP BY _CreationDate, _Tags
""")

In [None]:
# Top-10 languages per year by total views, using a SQL window function.
# The current (aggregated) DataFrame is registered first so the query does not read a stale
# view. Ranking and filtering happen in one statement, because a separate query over the
# view would not see the rank column. `_Tags` breaks ties so the result is deterministic.
posts_crop.createOrReplaceTempView("posts_crop")

posts_crop = spark.sql("""
    SELECT _CreationDate, _Tags, _ViewCount
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY _CreationDate
                                  ORDER BY _ViewCount DESC, _Tags) AS rank_in_year
        FROM posts_crop
    ) ranked
    WHERE rank_in_year <= 10
""")

In [None]:
# Top-10 languages per year (DataFrame API), then sort and rename for presentation.
# The aggregated view count column is named `_ViewCount`; `_Tags` breaks ties deterministically.
window = Window.partitionBy("_CreationDate")\
    .orderBy(F.col("_ViewCount").desc(), F.col("_Tags"))

# Drop any leftover rank column first so the newly added one cannot collide with it.
posts_crop = posts_crop.drop("row_number")\
    .select('*', F.row_number().over(window).alias('row_number'))\
    .filter(F.col('row_number') <= 10)\
    .drop("row_number")
posts_crop = posts_crop.orderBy("_CreationDate", F.col("_ViewCount").desc(), "_Tags")

# Rename columns
posts_crop = posts_crop.withColumnRenamed("_CreationDate", "year")\
    .withColumnRenamed("_Tags", "language")\
    .withColumnRenamed("_ViewCount", "views")

# Show results: 10 years x 10 languages = at most 100 rows
posts_crop.show(10*10)