In [1]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
sc = SparkSession.builder.appName("L2").master("yarn").getOrCreate()

In [3]:
sc

In [None]:
!hadoop fs -put * /user/vonavic/

In [109]:
!hadoop fs -ls

Found 20 items
drwxr-xr-x   - vonavic vonavic          3 2023-12-24 11:59 .sparkStaging
-rwxr-xr-x   3 vonavic vonavic      16035 2023-12-23 18:34 Introduction_pyspark.ipynb
-rwxr-xr-x   3 vonavic vonavic      18356 2023-12-23 18:17 lr1data
drwxr-xr-x   - vonavic vonavic          1 2023-12-24 12:44 path
-rwxr-xr-x   3 vonavic vonavic   74162295 2023-12-24 10:54 posts_sample.xml
-rwxr-xr-x   3 vonavic vonavic      40269 2023-12-24 11:13 programming-languages.csv
drwxr-xr-x   - vonavic vonavic         95 2023-12-24 13:10 report.parquet
drwxr-xr-x   - vonavic vonavic         10 2023-12-24 13:12 report_dict_2010
drwxr-xr-x   - vonavic vonavic         11 2023-12-24 13:12 report_dict_2011
drwxr-xr-x   - vonavic vonavic         10 2023-12-24 13:11 report_dict_2012
drwxr-xr-x   - vonavic vonavic         10 2023-12-24 13:11 report_dict_2013
drwxr-xr-x   - vonavic vonavic          9 2023-12-24 13:11 report_dict_2014
drwxr-xr-x   - vonavic vonavic         11 2023-12-24 13:11 report_dict_2015
drwx

In [72]:
languages_df = sc.read.csv("programming-languages.csv", header=True, inferSchema=True)

In [73]:
from pyspark.sql.functions import lower
from pyspark.sql.functions import concat, lit
languages_df = languages_df.withColumn("name", concat(lit("<"), lower(col("name")), lit(">")))
languages_df.take(10)

[Row(name='<a# .net>', wikipedia_url='https://en.wikipedia.org/wiki/A_Sharp_(.NET)'),
 Row(name='<a# (axiom)>', wikipedia_url='https://en.wikipedia.org/wiki/A_Sharp_(Axiom)'),
 Row(name='<a-0 system>', wikipedia_url='https://en.wikipedia.org/wiki/A-0_System'),
 Row(name='<a+>', wikipedia_url='https://en.wikipedia.org/wiki/A%2B_(programming_language)'),
 Row(name='<a++>', wikipedia_url='https://en.wikipedia.org/wiki/A%2B%2B'),
 Row(name='<abap>', wikipedia_url='https://en.wikipedia.org/wiki/ABAP'),
 Row(name='<abc>', wikipedia_url='https://en.wikipedia.org/wiki/ABC_(programming_language)'),
 Row(name='<abc algol>', wikipedia_url='https://en.wikipedia.org/wiki/ABC_ALGOL'),
 Row(name='<abset>', wikipedia_url='https://en.wikipedia.org/wiki/ABSET'),
 Row(name='<absys>', wikipedia_url='https://en.wikipedia.org/wiki/ABSYS')]

In [74]:
posts_schema = StructType([
    StructField("_Id", StringType(), True),
    StructField("_CreationDate", StringType(), True),
    StructField("_Tags", StringType(), True)
])

posts_df = sc.read.format("xml") \
    .option("rowTag", "row") \
    .schema(posts_schema) \
    .load("posts_sample.xml")

In [75]:
# Разбивка Tags на языки программирования
posts_with_languages_df = posts_df.join(languages_df, col("_Tags").contains(col("name")))

In [76]:
posts_with_languages_df.count()

8204

In [77]:
posts_with_languages_df.take(10)

[Row(_Id='3778222', _CreationDate='2010-09-23T12:13:59.443', _Tags='<java>', name='<java>', wikipedia_url='https://en.wikipedia.org/wiki/Java_(programming_language)'),
 Row(_Id='3798872', _CreationDate='2010-09-26T17:07:04.840', _Tags='<php><wordpress><memory>', name='<php>', wikipedia_url='https://en.wikipedia.org/wiki/PHP'),
 Row(_Id='3833655', _CreationDate='2010-09-30T18:27:56.320', _Tags='<ruby><rvm>', name='<ruby>', wikipedia_url='https://en.wikipedia.org/wiki/Ruby_(programming_language)'),
 Row(_Id='3838866', _CreationDate='2010-10-01T11:52:42.210', _Tags='<c><sizeof>', name='<c>', wikipedia_url='https://en.wikipedia.org/wiki/C_(programming_language)'),
 Row(_Id='3859099', _CreationDate='2010-10-04T21:05:50.150', _Tags='<php><sql>', name='<php>', wikipedia_url='https://en.wikipedia.org/wiki/PHP'),
 Row(_Id='3872977', _CreationDate='2010-10-06T13:31:25.900', _Tags='<python><sungridengine><qsub>', name='<python>', wikipedia_url='https://en.wikipedia.org/wiki/Python_(programming_la

In [83]:
from pyspark.sql.functions import to_date, year
posts_with_languages_df = posts_with_languages_df.withColumn("Year", year(to_date("_CreationDate", "yyyy-MM-dd'T'HH:mm:ss.SSS")))
posts_with_languages_df.take(10)

[Row(_Id='3778222', _CreationDate='2010-09-23T12:13:59.443', _Tags='<java>', name='<java>', wikipedia_url='https://en.wikipedia.org/wiki/Java_(programming_language)', Year=2010),
 Row(_Id='3798872', _CreationDate='2010-09-26T17:07:04.840', _Tags='<php><wordpress><memory>', name='<php>', wikipedia_url='https://en.wikipedia.org/wiki/PHP', Year=2010),
 Row(_Id='3833655', _CreationDate='2010-09-30T18:27:56.320', _Tags='<ruby><rvm>', name='<ruby>', wikipedia_url='https://en.wikipedia.org/wiki/Ruby_(programming_language)', Year=2010),
 Row(_Id='3838866', _CreationDate='2010-10-01T11:52:42.210', _Tags='<c><sizeof>', name='<c>', wikipedia_url='https://en.wikipedia.org/wiki/C_(programming_language)', Year=2010),
 Row(_Id='3859099', _CreationDate='2010-10-04T21:05:50.150', _Tags='<php><sql>', name='<php>', wikipedia_url='https://en.wikipedia.org/wiki/PHP', Year=2010),
 Row(_Id='3872977', _CreationDate='2010-10-06T13:31:25.900', _Tags='<python><sungridengine><qsub>', name='<python>', wikipedia_ur

In [88]:
# Группировка по годам и языкам
grouped_df = posts_with_languages_df.groupBy("Year", "name").count()
grouped_df

DataFrame[Year: int, name: string, count: bigint]

In [103]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_spec = Window.partitionBy("Year").orderBy(col("count").desc())

ranked_df = grouped_df.withColumn("rank", F.rank().over(window_spec))

In [104]:
top10_df = ranked_df.filter(col("rank") <= 10).orderBy(col("Year").desc(), col("rank")).filter(col("Year") >= 2010)
top10_df.take(20)

[Row(Year=2019, name='<python>', count=166, rank=1),
 Row(Year=2019, name='<javascript>', count=135, rank=2),
 Row(Year=2019, name='<java>', count=95, rank=3),
 Row(Year=2019, name='<php>', count=65, rank=4),
 Row(Year=2019, name='<r>', count=37, rank=5),
 Row(Year=2019, name='<typescript>', count=17, rank=6),
 Row(Year=2019, name='<c>', count=14, rank=7),
 Row(Year=2019, name='<bash>', count=11, rank=8),
 Row(Year=2019, name='<matlab>', count=9, rank=9),
 Row(Year=2019, name='<dart>', count=9, rank=9),
 Row(Year=2019, name='<go>', count=9, rank=9),
 Row(Year=2018, name='<python>', count=220, rank=1),
 Row(Year=2018, name='<javascript>', count=198, rank=2),
 Row(Year=2018, name='<java>', count=146, rank=3),
 Row(Year=2018, name='<php>', count=111, rank=4),
 Row(Year=2018, name='<r>', count=66, rank=5),
 Row(Year=2018, name='<typescript>', count=27, rank=6),
 Row(Year=2018, name='<c>', count=24, rank=7),
 Row(Year=2018, name='<scala>', count=23, rank=8),
 Row(Year=2018, name='<powershel

In [105]:
tables_dict = {}
for year in top10_df.select("Year").distinct().rdd.map(lambda x: x.Year).collect():
    tables_dict[year] = top10_df.filter(col("Year") == year)

# Вывод примера для первых 2 годов
for year, table in tables_dict.items():
    print(f"Year: {year}")
    table.show()

Year: 2018
+----+------------+-----+----+
|Year|        name|count|rank|
+----+------------+-----+----+
|2018|    <python>|  220|   1|
|2018|<javascript>|  198|   2|
|2018|      <java>|  146|   3|
|2018|       <php>|  111|   4|
|2018|         <r>|   66|   5|
|2018|<typescript>|   27|   6|
|2018|         <c>|   24|   7|
|2018|     <scala>|   23|   8|
|2018|<powershell>|   13|   9|
|2018|      <bash>|   12|  10|
+----+------------+-----+----+

Year: 2015
+----+-------------+-----+----+
|Year|         name|count|rank|
+----+-------------+-----+----+
|2015| <javascript>|  277|   1|
|2015|       <java>|  209|   2|
|2015|        <php>|  167|   3|
|2015|     <python>|  121|   4|
|2015|          <r>|   43|   5|
|2015|          <c>|   38|   6|
|2015|<objective-c>|   30|   7|
|2015|       <ruby>|   21|   8|
|2015|     <matlab>|   16|   9|
|2015|      <scala>|   15|  10|
+----+-------------+-----+----+

Year: 2013
+----+-------------+-----+----+
|Year|         name|count|rank|
+----+-------------

In [108]:
# Сохранение отчета в формате Parquet
top10_df.write.parquet("report.parquet", mode="overwrite")
for year, table in tables_dict.items():
    table.write.format("parquet").save(f"report_dict_{year}")