In [26]:
import os
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql as sql
from pyspark.sql.functions import explode
from pyspark.sql import Window
from pyspark.sql.functions import row_number

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell'

In [4]:
conf = SparkConf().setAppName("my_LR_2").setMaster('local[1]')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [13]:
!hadoop fs -ls

Found 9 items
drwxr-xr-x   - kuricyn kuricyn          1 2023-12-20 14:55 .sparkStaging
-rwxr-xr-x   3 kuricyn kuricyn      10824 2023-12-20 15:14 LR_1.ipynb
-rwxr-xr-x   3 kuricyn kuricyn      16004 2023-12-20 15:14 LR_2.ipynb
-rwxr-xr-x   3 kuricyn kuricyn      12237 2023-12-20 13:46 Untitled.ipynb
-rwxr-xr-x   3 kuricyn kuricyn   34603008 2023-12-20 15:14 posts_sample.xml
-rwxr-xr-x   3 kuricyn kuricyn      40269 2023-12-20 15:14 programming-languages.csv
drwxr-xr-x   - kuricyn kuricyn          0 2023-12-20 14:23 spark-warehouse
-rwxr-xr-x   3 kuricyn kuricyn       5647 2023-12-20 13:46 station.csv
-rwxr-xr-x   3 kuricyn kuricyn   80208848 2023-12-20 13:46 trip.csv


In [23]:
# Чтение данных из файла
posts_sample = spark.read.format('xml')\
.option('rootTag', 'posts')\
.option('rowTag', 'row')\
.load("posts_sample.xml")
# Чтение данных из файла
wiki_languages = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.csv("programming-languages.csv")

In [16]:
# Выбор определенных столбцов из DataFrame posts_sample
posts_sample_DF = posts_sample.select("_CreationDate", "_ViewCount", "_Tags")

In [33]:
# Фильтрация строк DataFrame posts_sample_DF
parsed_posts_sample = posts_sample_DF.filter("_Tags is not null").rdd.map(lambda x: (str(x[0]).split('-')[0], x[1], x[2][1:-1].split('><'))).toDF(["year", "views", "languages"])
# Извлечение года, количества просмотров и списка языков из каждой строки в диапазоне от 2010 до 2020 включительно
parsed_posts_sample = parsed_posts_sample.filter(col("year") >= 2010).filter(col("year") <= 2020)

In [21]:
parsed_posts_sample = parsed_posts_sample.withColumn("language", explode(parsed_posts_sample["languages"])).drop('languages')
parsed_posts_sample.show()

+----+-----+------------------+
|year|views|          language|
+----+-----+------------------+
|2010| 3650|               c++|
|2010| 3650|character-encoding|
|2010|  617|        sharepoint|
|2010|  617|          infopath|
|2010| 1315|            iphone|
|2010| 1315|         app-store|
|2010| 1315|   in-app-purchase|
|2010|  973|          symfony1|
|2010|  973|            schema|
|2010|  973|          doctrine|
|2010|  973|          fixtures|
|2010|  132|              java|
|2010|  419|visual-studio-2010|
|2010|  419|          stylecop|
|2010|  869|           cakephp|
|2010|  869|       file-upload|
|2010|  869|         swfupload|
|2010| 1303|               git|
|2010| 1303|            cygwin|
|2010| 1303|             putty|
+----+-----+------------------+
only showing top 20 rows



In [24]:
# Выбор всех имен языков из столбца "name" в DataFrame wiki_languages и приведение их к нижнему регистру
names = wiki_languages.select("name").rdd.map(lambda x: x[0].lower()).collect()
# Фильтрация DataFrame parsed_posts_sample: оставляем только записи, где язык содержится в списке names
parsed_posts_sample = parsed_posts_sample.filter(parsed_posts_sample.language.isin(names))
parsed_posts_sample.show()

+----+-----+-----------+
|year|views|   language|
+----+-----+-----------+
|2010|  132|       java|
|2010| 1258|        php|
|2010| 9649|       ruby|
|2010| 2384|          c|
|2010| 1987|        php|
|2010| 3321|     python|
|2010|  128| javascript|
|2010|  477|applescript|
|2010| 1748|        php|
|2010|  998|        php|
|2010| 2095| javascript|
|2010|  447|        sed|
|2010| 6558|     python|
|2010|  214|       java|
|2010|  214|       ruby|
|2010|  852|objective-c|
|2010|  179| javascript|
|2010| 6709|          r|
|2010|   78|        php|
|2010| 1280| javascript|
+----+-----+-----------+
only showing top 20 rows



In [25]:
# Группировка DataFrame parsed_posts_sample по столбцам "year" и "language" с агрегацией суммы просмотров
parsed_posts_sample = parsed_posts_sample.groupBy("year", "language").agg({"views": "sum"})
parsed_posts_sample.show()

+----+----------+----------+
|year|  language|sum(views)|
+----+----------+----------+
|2013|    erlang|       864|
|2017|typescript|     19078|
|2017|       sed|        51|
|2013|javascript|    245957|
|2013|        f#|       228|
|2012|powershell|     13711|
|2019|       php|      2032|
|2017|   haskell|      2132|
|2013|autohotkey|       790|
|2019|     xpath|        27|
|2015|    racket|       762|
|2017|        go|       970|
|2018|     perl6|       109|
|2012|        f#|      1222|
|2018|    python|     38826|
|2017|    prolog|        76|
|2014|       php|    142858|
|2018|       x++|        78|
|2014|       lua|       604|
|2019|   haskell|        85|
+----+----------+----------+
only showing top 20 rows



In [29]:
window = Window.partitionBy(parsed_posts_sample['year']).orderBy(parsed_posts_sample['sum(views)'].desc())
# Применение аналитической функции row_number() для упорядочивания и нумерации строк в каждой группе по убыванию суммы просмотров
parsed_posts_sample_top10 = parsed_posts_sample.select('*', row_number().over(window).alias('row_number')).filter(col('row_number') <= 10)
# Дополнительная сортировка и удаление лишнего столбца
parsed_posts_sample_top10 = parsed_posts_sample_top10.orderBy(col("year").asc(), col("sum(views)").desc()).drop("rank")
# Переименование столбца "sum(views)" в "views"
parsed_posts_sample_top10 = parsed_posts_sample_top10.withColumnRenamed("sum(views)", "views")
# Запись результата в формате Parquet
parsed_posts_sample_top10.write.mode('overwrite').parquet("top_10_languages_per_year_between_2010_and_2020.parquet")

In [30]:
!hadoop fs -ls

Found 10 items
drwxr-xr-x   - kuricyn kuricyn          1 2023-12-20 14:55 .sparkStaging
-rwxr-xr-x   3 kuricyn kuricyn      10824 2023-12-20 15:14 LR_1.ipynb
-rwxr-xr-x   3 kuricyn kuricyn      16004 2023-12-20 15:14 LR_2.ipynb
-rwxr-xr-x   3 kuricyn kuricyn      12237 2023-12-20 13:46 Untitled.ipynb
-rwxr-xr-x   3 kuricyn kuricyn   34603008 2023-12-20 15:14 posts_sample.xml
-rwxr-xr-x   3 kuricyn kuricyn      40269 2023-12-20 15:14 programming-languages.csv
drwxr-xr-x   - kuricyn kuricyn          0 2023-12-20 14:23 spark-warehouse
-rwxr-xr-x   3 kuricyn kuricyn       5647 2023-12-20 13:46 station.csv
drwxr-xr-x   - kuricyn kuricyn        101 2023-12-20 15:30 top_10_languages_per_year_between_2010_and_2020.parquet
-rwxr-xr-x   3 kuricyn kuricyn   80208848 2023-12-20 13:46 trip.csv


In [32]:
# загрузка с сервера
!hadoop fs -get top_10_languages_per_year_between_2010_and_2020.parquet .

In [None]:
sc.stop()# отключаемся