In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [2]:
import os
# Export the locations of the unpacked Spark distribution and the Java 8
# runtime through the process environment.
os.environ.update({
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
})

In [3]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
# Make the Spark installation importable from this kernel.
# NOTE(review): presumably resolved through the SPARK_HOME variable set in an
# earlier cell — confirm against the findspark documentation.
findspark.init()

In [5]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 204.7/204.7 MB 2.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 198.6/198.6 kB 22.4 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=acded7e5a3830ded24804c22089236fe4eb2959254d9ef9f723f266523b7ab76
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

<h1><center>Инициализация</center></h1>

In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType, ArrayType, StringType
from pyspark.sql.functions import udf, explode, rank, col, max, sum, desc, countDistinct
import re
from typing import List
import pyspark.sql as sql

In [7]:
# Create (or reuse) the Spark session for this notebook and request the
# spark-xml package via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("L2_reports_with_apache_spark")
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.13.0")
    .getOrCreate()
)

<h1><center>Загрузка данных</center></h1>

In [8]:
import os
# Locations of the two input files: the programming-languages CSV and the
# posts XML sample.
prog_path, posts_path = (
    '/content/programming-languages.csv',
    '/content/posts_sample.xml',
)

In [9]:
# Load the posts XML through the "xml" data source with rowTag set to "row".
posts = (
    spark.read
    .format('xml')
    .option('rowTag', 'row')
    .load(posts_path)
)

In [10]:
# Load the programming-languages CSV with a header row and inferred column types.
# "timestampFormat" is the CSV reader option that controls timestamp parsing;
# the previous key "DateTimeFormat" is not a recognised option, so the pattern
# was silently ignored.
program = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("timestampFormat", 'M/d/y H:m')
    .csv(prog_path)
)

<h1><center>Задание</center></h1>

In [17]:
def get_tags(tags_string):
    """Extract tag names from a string of angle-bracketed tags.

    Args:
        tags_string: Text such as ``"<c#><.net>"``, or ``None``.

    Returns:
        A list with the text found between each ``<`` and the next ``>``,
        in order of appearance; an empty list when the input is ``None``
        or contains no such groups.
    """
    return [] if tags_string is None else re.findall(r'<(.+?)>', tags_string)

def get_year(date_and_time):
    """Return the year of a date-like value, or ``None`` for missing input.

    Args:
        date_and_time: An object exposing a ``year`` attribute (for example
            ``datetime.datetime``), or ``None``.

    Returns:
        The ``year`` attribute of the input, or ``None`` when the input is
        ``None``.
    """
    # A null column value reaches a Python UDF as None; pass it through
    # instead of failing with AttributeError.
    if date_and_time is None:
        return None
    return date_and_time.year

# Wrap the plain-Python helpers as Spark UDFs with explicit return types.
get_tags_udf = udf(get_tags, ArrayType(StringType()))
get_year_udf = udf(get_year, IntegerType())

# Derive the tag list and the last-activity year, then keep only the three
# columns used afterwards (the view count is renamed to "views").
posts_data_simplified = (
    posts
    .withColumn("tags", get_tags_udf(posts["_Tags"]))
    .withColumn("year", get_year_udf(posts["_LastActivityDate"]))
    .select(col("tags"), col("year"), col("_ViewCount").alias("views"))
)

# Preview the first ten rows, numbered from 1.
first_rows = posts_data_simplified.head(10)
for position, record in enumerate(first_rows, start=1):
    print(position, record)

1 Row(tags=['c#', 'floating-point', 'type-conversion', 'double', 'decimal'], year=2019, views=42817)
2 Row(tags=['html', 'css', 'internet-explorer-7'], year=2019, views=18214)
3 Row(tags=[], year=2017, views=None)
4 Row(tags=['c#', '.net', 'datetime'], year=2019, views=555183)
5 Row(tags=['c#', 'datetime', 'time', 'datediff', 'relative-time-span'], year=2019, views=149445)
6 Row(tags=[], year=2018, views=None)
7 Row(tags=['html', 'browser', 'timezone', 'user-agent', 'timezone-offset'], year=2019, views=176405)
8 Row(tags=['.net', 'math'], year=2018, views=123231)
9 Row(tags=[], year=2010, views=None)
10 Row(tags=[], year=2010, views=None)


In [18]:
# Flatten the tag arrays to one row per (year, tag, views), sum the views of
# each tag within a last-activity year, and order the result by year and then
# by descending total views.
posts_data_sorted = (
    posts_data_simplified
    .select("year", explode("tags").alias("tag"), "views")
    .groupBy("year", "tag")
    .agg(sum("views").alias("total_views"))
    .orderBy("year", desc("total_views"))
)
posts_data_sorted.show()

+----+--------------------+-----------+
|year|                 tag|total_views|
+----+--------------------+-----------+
|2008|                  c#|      25401|
|2008|                .net|      24321|
|2008|            database|      19682|
|2008|               local|      19682|
|2008|                java|      11532|
|2008|         inheritance|      10971|
|2008|           variables|       7700|
|2008|       accessibility|       7700|
|2008|               excel|       6540|
|2008|          automation|       6540|
|2008|           interface|       3271|
|2008|       configuration|       2927|
|2008|dependency-injection|       2927|
|2008|      castle-windsor|       2927|
|2008|               linux|       2393|
|2008|       ruby-on-rails|       1843|
|2008|                ruby|       1843|
|2008|  visual-studio-2008|       1432|
|2008|  visual-studio-2005|       1432|
|2008|            .net-3.0|       1432|
+----+--------------------+-----------+
only showing top 20 rows



In [19]:
# Window specification: partition by year, order by total_views descending.
window = Window.partitionBy("year").orderBy(posts_data_sorted["total_views"].desc())
# Add a "rank" column computed over that window and keep ranks 1 to 5 per year.
ranked_df = posts_data_sorted.withColumn("rank", rank().over(window))
result_df = ranked_df.filter(ranked_df["rank"] <= 5)
result_df = result_df.select("year", "tag", "total_views")
posts_data_sorted_result = result_df.orderBy("year", desc("total_views"))
posts_data_sorted_result.show()
# Overwrite mode keeps the cell re-runnable: the default save mode raises an
# error when the target path already exists.
posts_data_sorted_result.write.mode("overwrite").parquet("posts_data_sorted_result.parquet")

+----+--------------------+-----------+
|year|                 tag|total_views|
+----+--------------------+-----------+
|2008|                  c#|      25401|
|2008|                .net|      24321|
|2008|            database|      19682|
|2008|               local|      19682|
|2008|                java|      11532|
|2009|                  c#|      73661|
|2009|                .net|      39167|
|2009|              python|      32219|
|2009|                 c++|      29381|
|2009|            winforms|      25670|
|2010|                  c#|     128597|
|2010|              arrays|      80868|
|2010|                java|      53333|
|2010|multidimensional-...|      51865|
|2010|              matlab|      51865|
|2011|                  c#|     238076|
|2011|                java|     121315|
|2011|                .net|     120734|
|2011|                 css|     119302|
|2011|             android|     107283|
+----+--------------------+-----------+
only showing top 20 rows



In [20]:
import shutil
directory_path = "posts_data_sorted_result.parquet"
# Delete the output directory only when it exists, so re-running this cell
# (or running it before the write cell) does not raise FileNotFoundError.
if os.path.isdir(directory_path):
    shutil.rmtree(directory_path)