In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import *

In [68]:
from pyspark.sql.functions import col, lit, coalesce, count, desc, asc, explode, concat_ws

In [4]:
spark = SparkSession.builder\
        .master("local[1]")\
        .appName('PySpark_Articles')\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/18 22:13:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
json_file = 'articles.json'
data = spark.read.json(json_file)

                                                                                

In [6]:
data.printSchema()

root
 |-- canonical_url: string (nullable = true)
 |-- collection_id: long (nullable = true)
 |-- comments_count: long (nullable = true)
 |-- cover_image: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- crossposted_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edited_at: string (nullable = true)
 |-- flare_tag: struct (nullable = true)
 |    |-- bg_color_hex: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- text_color_hex: string (nullable = true)
 |-- id: long (nullable = true)
 |-- language: string (nullable = true)
 |-- last_comment_at: string (nullable = true)
 |-- organization: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profile_image: string (nullable = true)
 |    |-- profile_image_90: string (nullable = true)
 |    |-- slug: string (nullable = true)
 |    |-- username: string (nullable = true)
 |-- path: string (nullable = true)
 |-- positive_reactions_count: long (

In [6]:
articles_raw = data.select('type_of', 'id', 'title', 'description', 'slug', 'url', 'comments_count', 
            'public_reactions_count', 'published_timestamp', 'language', 'subforem_id', 
            'positive_reactions_count', 'created_at', 'published_at', 'reading_time_minutes', 
            'tag_list', 'user.user_id', 'user.name', 'user.username'). \
        withColumn("published_at", col("published_at").cast('timestamp')). \
        withColumn("created_at", col("created_at").cast('timestamp'))

In [7]:
articles_raw.show(1, False)

+-------+-------+-----------+--------------------------------------------------------------------------------------------------------+----------------+-----------------------------------+--------------+----------------------+--------------------+--------+-----------+------------------------+-------------------+-------------------+--------------------+-----------------------------+-------+-----------+--------+
|type_of|id     |title      |description                                                                                             |slug            |url                                |comments_count|public_reactions_count|published_timestamp |language|subforem_id|positive_reactions_count|created_at         |published_at       |reading_time_minutes|tag_list                     |user_id|name       |username|
+-------+-------+-----------+--------------------------------------------------------------------------------------------------------+----------------+-----------------------

In [8]:
articles = data.select('type_of', 'id', 'title', 'slug', 'url', 'comments_count', 
            'public_reactions_count', 'published_timestamp', 
            'positive_reactions_count', 'created_at', 'published_at', 'reading_time_minutes', 
            'tag_list', 'user.user_id'). \
        withColumn("published_at", col("published_at").cast('timestamp')). \
        withColumn("created_at", col("created_at").cast('timestamp'))

In [9]:
articles.show(3)

+-------+-------+--------------------+--------------------+--------------------+--------------+----------------------+--------------------+------------------------+-------------------+-------------------+--------------------+--------------------+-------+
|type_of|     id|               title|                slug|                 url|comments_count|public_reactions_count| published_timestamp|positive_reactions_count|         created_at|       published_at|reading_time_minutes|            tag_list|user_id|
+-------+-------+--------------------+--------------------+--------------------+--------------+----------------------+--------------------+------------------------+-------------------+-------------------+--------------------+--------------------+-------+
|article|2597174|         Meme Monday|    meme-monday-114b|https://dev.to/be...|            43|                    35|2025-06-16T12:16:59Z|                      35|2025-06-16 16:17:03|2025-06-16 16:16:59|                   1|[discuss, 

In [10]:
#проверка на дубликаты articles
articles.distinct().count()

14998

In [11]:
#удаление дубликатов
articles = articles.dropDuplicates()

In [12]:
users = data.select('user.user_id', 'user.name', 'user.username', 'user.website_url')

In [13]:
users.show()

+-------+--------------------+---------------+--------------------+
|user_id|                name|       username|         website_url|
+-------+--------------------+---------------+--------------------+
|      1|         Ben Halpern|            ben|http://benhalpern...|
| 201004|               Eevis|eevajonnapanula| https://eevis.codes|
|   9688|       Aurélie Vache|   aurelievache|   http://scraly.com|
|   9597|         Nick Taylor|   nickytonline|https://OneTipAWe...|
| 192035|    Tilde A. Thurium|     annthurium|https://tildethur...|
| 156043|Rita {FlyNerd} Ly...|         ritaly|http://www.flyner...|
| 527439|      Christoph Görn|          goern|   https://b4mad.net|
|2711665|              Shayan|          shayy| https://userjot.com|
|  35233|      Burdette Lamar|  burdettelamar|                null|
|2924029|      davinceleecode| davinceleecode|                null|
|   8745|        Chris Jarvis|   jarvisscript|http://christophe...|
| 135503|       Erik Dietrich|       daedtech|ht

In [14]:
#проверка на дубликаты
users.distinct().count() == users.count()

False

In [15]:
users = users.dropDuplicates()

In [16]:
users.count()

6044

In [17]:
json_file_videos = 'videos.json'
data_videos = spark.read.json(json_file_videos)

In [18]:
data_videos.printSchema()

root
 |-- cloudinary_video_url: string (nullable = true)
 |-- id: long (nullable = true)
 |-- path: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type_of: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- video_duration_in_minutes: string (nullable = true)
 |-- video_source_url: string (nullable = true)



In [19]:
data_videos.show()

+--------------------+-------+--------------------+--------------------+-------------+-------------------+-------+-------------------------+--------------------+
|cloudinary_video_url|     id|                path|               title|      type_of|               user|user_id|video_duration_in_minutes|    video_source_url|
+--------------------+-------+--------------------+--------------------+-------------+-------------------+-------+-------------------------+--------------------+
|https://media2.de...|2594229|/kingsmen732/ambi...|Ambient lighting ...|video_article|      {B Mithilesh}|1631667|                    01:19|https://dw71fyauz...|
|https://media2.de...|2597387|/kingsmen732/bypa...|Bypass Netflix’s ...|video_article|      {B Mithilesh}|1631667|                    00:38|https://dw71fyauz...|
|https://media2.de...|2562728|/insightworks/bus...|Business Central ...|video_article|    {Insight Works}|3081293|                    10:39|https://dw71fyauz...|
|https://media2.de...|258892

In [20]:
videos = data_videos.select('id', 'title', 'type_of', 'user.name', 'user_id', 'video_duration_in_minutes', 'video_source_url')

In [21]:
videos.show()

+-------+--------------------+-------------+-----------------+-------+-------------------------+--------------------+
|     id|               title|      type_of|             name|user_id|video_duration_in_minutes|    video_source_url|
+-------+--------------------+-------------+-----------------+-------+-------------------------+--------------------+
|2594229|Ambient lighting ...|video_article|      B Mithilesh|1631667|                    01:19|https://dw71fyauz...|
|2597387|Bypass Netflix’s ...|video_article|      B Mithilesh|1631667|                    00:38|https://dw71fyauz...|
|2562728|Business Central ...|video_article|    Insight Works|3081293|                    10:39|https://dw71fyauz...|
|2588924|"Hello, world" 🖥...|video_article|           AI AGI|3185505|                    02:13|https://dw71fyauz...|
|2588138|Project of the We...|video_article|     Riyana Patel|2915485|                    02:00|https://dw71fyauz...|
|2579437|Are you preparing...|video_article|    Mohit Dec

In [22]:
#проверка на дубликаты
videos.count()

1711

In [23]:
videos.distinct().count()

1711

In [None]:
#Записываем в БД


In [42]:
jdbc_url = "jdbc:postgresql://localhost:5432/nataly"
connection_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [14]:
articles_raw.write.jdbc(
    url=jdbc_url,
    table="pp_articles.articles_raw",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)

25/06/17 00:02:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [128]:
articles.write.jdbc(
    url=jdbc_url,
    table="pp_articles.articles",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)

In [141]:
users.write.jdbc(
    url=jdbc_url,
    table="pp_articles.users",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)

In [25]:
videos.write.jdbc(
    url=jdbc_url,
    table="pp_articles.videos",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)

In [78]:
#замена нулевых значений на ноль в колонке title
#users_with_videos = users_with_videos.withColumn('title', coalesce('title', lit(0)))

In [24]:
#новый датафрейм с кол-вом видео по пользователям
users_with_videos = videos.groupBy('user_id').agg(count("title").alias("video_cnt"))

In [25]:
users_with_videos.show(5)

+-------+---------+
|user_id|video_cnt|
+-------+---------+
| 776627|        1|
|1182761|        1|
| 178146|        1|
|1410108|        1|
| 118274|        1|
+-------+---------+
only showing top 5 rows



In [26]:
#новый датафрейм с кол-вом статей по пользователям 
articles_by_users = articles.groupBy('user_id').agg(count("id").alias("articles_cnt"))

In [27]:
articles_by_users.show(5)

+-------+------------+
|user_id|articles_cnt|
+-------+------------+
| 436375|           6|
|  51056|           5|
| 659184|           2|
| 194630|          14|
| 307749|           1|
+-------+------------+
only showing top 5 rows



In [28]:
#джойним два этих датафрейма
top_users = articles_by_users.join(users_with_videos, on='user_id', how='left')

In [29]:
top_users.show(5)

+-------+------------+---------+
|user_id|articles_cnt|video_cnt|
+-------+------------+---------+
| 436375|           6|     null|
|  51056|           5|     null|
| 659184|           2|     null|
| 194630|          14|     null|
| 307749|           1|     null|
+-------+------------+---------+
only showing top 5 rows



In [30]:
top_users.count()

6044

In [31]:
#пользователи с видео
top_users.filter(col('video_cnt').isNotNull()).count()

124

In [32]:
top_users.filter(col('video_cnt').isNotNull()).show()

+-------+------------+---------+
|user_id|articles_cnt|video_cnt|
+-------+------------+---------+
| 358418|           3|        1|
|  83627|           1|        1|
|1109311|           1|        1|
| 364684|           2|        1|
|   2882|           2|       22|
|1106033|           1|        2|
| 284014|          10|        1|
| 192035|           1|        1|
| 114176|           1|        1|
| 378160|           1|        2|
| 215856|           5|        1|
|  11207|           6|        1|
|    264|         136|        2|
|1262818|           2|        1|
| 307576|           2|       90|
|1438636|           1|        1|
| 147310|           1|        1|
| 108782|           6|       21|
| 555587|           7|       18|
| 271838|          17|       32|
+-------+------------+---------+
only showing top 20 rows



In [33]:
#замена нулевых значений на ноль в колонке video_cnt
top_users = top_users.withColumn('video_cnt', coalesce('video_cnt', lit(0)))

In [36]:
top_users = top_users.join(users, on='user_id', how='left').select('user_id', 'username', 'articles_cnt', 'video_cnt')

In [40]:
#лидеры по кол-ву статей
top_users_by_articles = top_users.sort(desc("articles_cnt"))
top_users_by_articles.show()

+-------+------------------+------------+---------+
|user_id|          username|articles_cnt|video_cnt|
+-------+------------------+------------+---------+
|      3|   thepracticaldev|         593|        7|
|      1|               ben|         503|        1|
|  31047|             sloan|         300|        0|
|  38578|michaeltharrington|         278|        0|
|    264|              jess|         136|        2|
| 345658|           bekahhw|         115|        5|
|   9597|      nickytonline|         101|        9|
| 370165|     dailydevtips1|          97|        0|
| 342975|     graciegregory|          89|        0|
| 159737|             madza|          86|        0|
| 494502|       erinposting|          73|        0|
| 161327|     alvaromontoro|          59|        0|
| 968077|       rachelfazio|          47|        0|
|   9688|      aurelievache|          42|        0|
| 302741|    blackgirlbytes|          42|        0|
| 180171|        devencourt|          41|        0|
| 110884|   

In [43]:
#записываем в БД
top_users_by_articles.write.jdbc(
    url=jdbc_url,
    table="pp_articles.top_users_by_articles",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)


In [41]:
#лидеры по кол-ву видео
top_users_by_videos = top_users.sort(desc("video_cnt"))
top_users_by_videos.show()

+-------+-----------------+------------+---------+
|user_id|         username|articles_cnt|video_cnt|
+-------+-----------------+------------+---------+
| 231136|     cheukting_ho|           2|      118|
| 307576| eleftheriabatsou|           2|       90|
|1255335|jguerrero-voxel51|           3|       57|
| 271838|     waylonwalker|          17|       32|
| 397557|      mishmanners|          31|       32|
|  19970|        bdougieyo|          30|       31|
| 407879|           whykay|           9|       29|
|   2882|     vaidehijoshi|           2|       22|
| 108782|      andrewbrown|           6|       21|
|  33233|   realtoughcandy|           4|       20|
| 555587|          kgilpin|           7|       18|
| 175743|     coderarchive|           2|       17|
|   2416|          joelnet|           3|       16|
| 386069|        hammertoe|           3|       16|
|1222232|        gpiechnik|           1|       14|
|  45906|       satansdeer|           1|       12|
|  38442|         codebubb|    

In [44]:
#записываем в БД
top_users_by_videos.write.jdbc(
    url=jdbc_url,
    table="pp_articles.top_users_by_videos",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)

In [46]:
users.count()

6044

In [47]:
#отсортируем articles по кол-ву позитивных реакций (>= 1000)
articles.sort(desc("positive_reactions_count")).filter(col('positive_reactions_count') >= 1000).count()

61

In [48]:
#отсортируем articles по кол-ву позитивных реакций (топ-1000)
top_1000_articles = articles.sort(desc("positive_reactions_count")).limit(1000)

In [49]:
top_1000_articles.show(3)

+-------+------+--------------------+--------------------+--------------------+--------------+----------------------+--------------------+------------------------+-------------------+-------------------+--------------------+--------------------+-------+
|type_of|    id|               title|                slug|                 url|comments_count|public_reactions_count| published_timestamp|positive_reactions_count|         created_at|       published_at|reading_time_minutes|            tag_list|user_id|
+-------+------+--------------------+--------------------+--------------------+--------------+----------------------+--------------------+------------------------+-------------------+-------------------+--------------------+--------------------+-------+
|article|347446|35+ Free React te...|35-free-react-tem...|https://dev.to/da...|            71|                  4159|2020-06-02T10:44:43Z|                    4159|2020-06-01 17:51:59|2020-06-02 14:44:43|                  15|[react, webdev

In [50]:
#разворачиваем список тегов
explode_tags = top_1000_articles.select('id', 'title', 'positive_reactions_count', 'user_id', explode('tag_list').alias('tag'))
explode_tags.show(3)

+------+--------------------+------------------------+-------+----------+
|    id|               title|positive_reactions_count|user_id|       tag|
+------+--------------------+------------------------+-------+----------+
|347446|35+ Free React te...|                    4159| 327338|     react|
|347446|35+ Free React te...|                    4159| 327338|    webdev|
|347446|35+ Free React te...|                    4159| 327338|javascript|
+------+--------------------+------------------------+-------+----------+
only showing top 3 rows



In [51]:
#самые популярные теги
top_tags = explode_tags.groupBy('tag').agg((count("id")).alias('tag_cnt'))
top_tags.show(3)


+----------+-------+
|       tag|tag_cnt|
+----------+-------+
|     react|    169|
|    webdev|    524|
|javascript|    411|
+----------+-------+
only showing top 3 rows



In [52]:
top_tags_20 = top_tags.sort(desc('tag_cnt')).limit(20)
top_tags_20.show()

+------------+-------+
|         tag|tag_cnt|
+------------+-------+
|      webdev|    524|
|  javascript|    411|
|   beginners|    314|
|       react|    169|
|productivity|    169|
|         css|    143|
|      career|    128|
|    tutorial|    127|
| programming|    103|
|  codenewbie|     86|
|        html|     73|
|        node|     62|
|      github|     60|
|  opensource|     57|
|     showdev|     40|
|         git|     38|
|      devops|     36|
|     discuss|     31|
|  typescript|     30|
|      python|     25|
+------------+-------+



In [53]:
#записываем в БД
top_tags_20.write.jdbc(
    url=jdbc_url,
    table="pp_articles.top_tags_20",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)


In [73]:
top_tags.sort(desc('tag_cnt')).count()

392

In [82]:
#самые популярные пары тегов
#self-join таблицы explode_tags
explode_tags.createOrReplaceTempView("explode_tags")
popular_tags = spark.sql("""SELECT a.tag as tag_1, b.tag as tag_2
                       FROM explode_tags a, explode_tags b
                       WHERE a.id = b.id and a.tag != b.tag""")

In [83]:
popular_tags.show()

+----------+------------+
|     tag_1|       tag_2|
+----------+------------+
|     react|        html|
|     react|  javascript|
|     react|      webdev|
|    webdev|        html|
|    webdev|  javascript|
|    webdev|       react|
|javascript|        html|
|javascript|      webdev|
|javascript|       react|
|      html|  javascript|
|      html|      webdev|
|      html|       react|
|    github|productivity|
|    github|  javascript|
|    github|      webdev|
|    webdev|productivity|
|    webdev|  javascript|
|    webdev|      github|
|javascript|productivity|
|javascript|      webdev|
+----------+------------+
only showing top 20 rows



In [84]:
popular_tags = popular_tags.groupBy('tag_1', 'tag_2').count().sort(desc('count'))

In [88]:
popular_tags.show()

+------------+------------+-----+
|       tag_1|       tag_2|count|
+------------+------------+-----+
|      webdev|  javascript|  275|
|      webdev|   beginners|  182|
|       react|  javascript|  139|
|  javascript|   beginners|  132|
|       react|      webdev|  108|
|         css|      webdev|  100|
|      webdev|    tutorial|   80|
|      webdev|productivity|   76|
|      webdev| programming|   59|
|   beginners|      career|   57|
|   beginners|    tutorial|   56|
|      webdev|        html|   54|
|productivity|   beginners|   52|
|  javascript|        node|   50|
|         css|  javascript|   49|
|      career|      webdev|   48|
|   beginners|       react|   42|
|      career|productivity|   40|
|  codenewbie|   beginners|   35|
|  javascript|        html|   34|
+------------+------------+-----+
only showing top 20 rows



In [89]:
popular_tags = popular_tags.drop_duplicates(['count']).sort(desc('count'))

In [90]:
popular_tags = popular_tags.withColumn('tags_pair', concat_ws(' - ', col('tag_1'), col('tag_2')))

In [97]:
popular_tags_pair = popular_tags.select('tags_pair', 'count').limit(40)
popular_tags_pair.show()

+--------------------+-----+
|           tags_pair|count|
+--------------------+-----+
| webdev - javascript|  275|
|  webdev - beginners|  182|
|  react - javascript|  139|
|javascript - begi...|  132|
|      react - webdev|  108|
|        css - webdev|  100|
|   webdev - tutorial|   80|
|webdev - producti...|   76|
|webdev - programming|   59|
|  beginners - career|   57|
|beginners - tutorial|   56|
|       webdev - html|   54|
|productivity - be...|   52|
|   javascript - node|   50|
|    css - javascript|   49|
|     career - webdev|   48|
|   beginners - react|   42|
|career - producti...|   40|
|codenewbie - begi...|   35|
|   javascript - html|   34|
+--------------------+-----+
only showing top 20 rows



In [98]:
#записываем в БД
popular_tags_pair.write.jdbc(
    url=jdbc_url,
    table="pp_articles.popular_tags_pair",
    mode="overwrite", # или "append", "ignore", "error"
    properties=connection_properties
)


In [259]:
#самые популярные статьи по реакциям
articles.select('user_id', 'title', 'comments_count', 'public_reactions_count', 'reading_time_minutes', 'tag_list').sort(desc('public_reactions_count')).limit(50).show()

+-------+--------------------+--------------+----------------------+--------------------+--------------------+
|user_id|               title|comments_count|public_reactions_count|reading_time_minutes|            tag_list|
+-------+--------------------+--------------+----------------------+--------------------+--------------------+
| 327338|35+ Free React te...|            71|                  4159|                  15|[react, webdev, j...|
| 393931|Best GitHub Repos...|            56|                  2722|                   5|[github, webdev, ...|
| 497983|Complete Flexbox ...|            85|                  2646|                   6|[css, webdev, beg...|
| 327338|50+ free tools an...|            89|                  2519|                   8|[design, webdev, ...|
| 487739|10 Fun APIs to Us...|            71|                  2106|                   4|[javascript, webd...|
| 126437|The Complete Guid...|            78|                  2074|                  28|[web3, blockchain...|
|

In [263]:
#самые комментируемые статьи 
articles.select('user_id', 'title', 'comments_count', 'public_reactions_count', 'reading_time_minutes', 'tag_list').sort(desc('comments_count')).limit(50).show()

+-------+--------------------+--------------+----------------------+--------------------+--------------------+
|user_id|               title|comments_count|public_reactions_count|reading_time_minutes|            tag_list|
+-------+--------------------+--------------+----------------------+--------------------+--------------------+
|  31047|Welcome Thread - ...|           354|                    47|                   1|           [welcome]|
|  31047|Welcome Thread - ...|           343|                    56|                   1|           [welcome]|
|  31047|Welcome Thread - ...|           334|                    36|                   1|           [welcome]|
| 262904|Do You Still Use ...|           320|                   216|                   1|[discuss, git, te...|
|  31047|Welcome Thread - ...|           312|                    29|                   1|           [welcome]|
|  31047|Welcome Thread - ...|           307|                    33|                   1|           [welcome]|
|