In [1]:
pip install --upgrade pyspark



In [2]:
import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import subprocess

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell'
spark = SparkSession.builder.getOrCreate()
spark

In [3]:
subprocess.run(["wget", "https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml"], check=True)

postsData = spark.read.format('xml').option('rowTag', 'row').option("timestampFormat", 'y/M/d H:m:s').load('posts_sample.xml')

print(f"Elems: {postsData.count()}")
postsData.printSchema()
postsData.show()

dates = ("2008-01-01",  "2022-12-31")
posts_by_date = postsData.filter(F.col("_CreationDate").between(*dates))
posts_by_date.show()

Elems: 46006
root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _FavoriteCount: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)

+-----------------+------------+--------------------+-----------

In [4]:
subprocess.run(["wget", "https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv"], check=True)
languagesData = spark.read.format("csv").option("header", True).option("inferSchema", True).load("programming-languages.csv").dropna(how="all")

In [5]:
print(f"\nLanguages_count: {languagesData.count()}")
print(f"\nFirst_lang:")
languagesData.show(vertical=True)
languagesData.printSchema()


Languages_count: 700

First_lang:
-RECORD 0-----------------------------
 name          | A# .NET              
 wikipedia_url | https://en.wikipe... 
-RECORD 1-----------------------------
 name          | A# (Axiom)           
 wikipedia_url | https://en.wikipe... 
-RECORD 2-----------------------------
 name          | A-0 System           
 wikipedia_url | https://en.wikipe... 
-RECORD 3-----------------------------
 name          | A+                   
 wikipedia_url | https://en.wikipe... 
-RECORD 4-----------------------------
 name          | A++                  
 wikipedia_url | https://en.wikipe... 
-RECORD 5-----------------------------
 name          | ABAP                 
 wikipedia_url | https://en.wikipe... 
-RECORD 6-----------------------------
 name          | ABC                  
 wikipedia_url | https://en.wikipe... 
-RECORD 7-----------------------------
 name          | ABC ALGOL            
 wikipedia_url | https://en.wikipe... 
-RECORD 8--------------------

In [6]:
def includes_name(x, language_names):
    tag = next((name for name in language_names if f'<{name.lower()}>' in str(x._Tags).lower()), 'No')
    return (x[6], tag)

def process_posts(posts_by_date, languagesData):
    language_names = [str(x[0]) for x in languagesData.collect()]
    posts_by_date_rdd = posts_by_date.rdd.map(lambda x: includes_name(x, language_names)).filter(lambda x: x[1] != 'No')
    posts_by_date_rdd_group = posts_by_date_rdd.keyBy(lambda row: (row[0].year, row[1])) .aggregateByKey(0, lambda x, y: x + 1, lambda x1, x2: x1 + x2) .sortBy(lambda x: x[1], ascending=False).collect()
    return posts_by_date_rdd_group

def get_top_languages_by_years(posts_by_date_rdd_group):
    years = list(range(2022, 2009, -1))
    df_by_years = []
    for year in years:
        df_by_years.extend([row for row in posts_by_date_rdd_group if row[0][0] == year][:10])

    return df_by_years

def create_dataframe(df_by_years):
    row_template = Row('Year', 'Language', 'Count')
    return spark.createDataFrame([row_template(*x) for x in df_by_years])

def main(posts_by_date, languagesData):
    result_df = create_dataframe(get_top_languages_by_years(process_posts(posts_by_date, languagesData)))
    result_df.show()
    result_df.write.mode("overwrite").parquet("top_10_languages.parquet")

main(posts_by_date, languagesData)

+------------------+--------+
|              Year|Language|
+------------------+--------+
|    {2019, Python}|     162|
|{2019, JavaScript}|     131|
|      {2019, Java}|      95|
|       {2019, PHP}|      59|
|         {2019, R}|      36|
|         {2019, C}|      14|
|      {2019, Dart}|       9|
|    {2019, MATLAB}|       9|
|        {2019, Go}|       9|
|      {2019, Bash}|       8|
|    {2018, Python}|     214|
|{2018, JavaScript}|     196|
|      {2018, Java}|     145|
|       {2018, PHP}|      99|
|         {2018, R}|      63|
|         {2018, C}|      24|
|     {2018, Scala}|      22|
|{2018, TypeScript}|      21|
|{2018, PowerShell}|      13|
|      {2018, Bash}|      12|
+------------------+--------+
only showing top 20 rows

