In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one already exists; otherwise build a new one.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Where the publications data set lives and which reader format to use for it.
file_location = "/FileStore/tables/large.json.gz"
file_type = "json"

# Reader settings that only matter for CSV input; other file types ignore them.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Load the publications into a DataFrame, handing all reader options over in one call.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)




path_journal_info = "/FileStore/tables/journal_information.csv"

# Read the journal metadata (first row is the header), give the two
# space-containing columns snake_case names, and discard rows that have
# no journal name.
journal_info = (
    spark.read.option('header', True)
    .csv(path_journal_info)
    .withColumnRenamed("Journal Name", "journal_name")
    .withColumnRenamed("Category & Journal Quartiles", "category_and_journal_quartiles")
    .dropna(subset=['journal_name'])
)

In [0]:
# Expose the publications DataFrame to Spark SQL under a view name;
# the queries for questions 1, 2 and 3 read from this view.
temp_table_name = "publications"
df.createOrReplaceTempView(temp_table_name)

In [0]:
#question 1

# Make sure the SQL view over the publications exists before querying it.
temp_table_name = "publications"
df.createOrReplaceTempView(temp_table_name)

# A single aggregate pass yields both the distinct-ID count and the row count.
distinct_papers_no = spark.sql("""
    SELECT 
        COUNT(DISTINCT corpusid) AS distinct_id_count,
        COUNT(*) AS total_count
    FROM publications
""")

# The aggregate returns exactly one row; read both counts out of it by name.
result_data = distinct_papers_no.first()
distinct_id_count = result_data['distinct_id_count']
total_count = result_data['total_count']

# Report the two counts.
print(f"Total number of papers: {total_count}")
print(f"Number of distinct IDs: {distinct_id_count}")

# The IDs are reported as unique exactly when the two counts agree.
print(
    "All papers have unique IDs."
    if distinct_id_count == total_count
    else "There are duplicate IDs in the dataset."
)

Total number of papers: 150000
Number of distinct IDs: 150000
All papers have unique IDs.


In [0]:
#question 2

# Register the publications view so the query below can reference it by name.
temp_table_name = "publications"
df.createOrReplaceTempView(temp_table_name)

# Average the per-paper author count over every paper.
# Spark's size() yields -1 for a NULL array unless spark.sql.legacy.sizeOfNull
# is false or ANSI mode is enabled, so a bare AVG(size(authors)) is dragged
# down by any paper whose authors array is NULL and its value depends on the
# session configuration. A paper without an authors array is counted as
# having 0 authors, which makes the result configuration-independent.
avg_authors = spark.sql("""
    SELECT AVG(CASE WHEN authors IS NULL THEN 0 ELSE size(authors) END) AS average_num_authors
    FROM publications
""")

# The aggregate yields one row with a single column.
avg_authors_result = avg_authors.collect()[0][0]
print(f"Average number of authors per each paper: {avg_authors_result}")

Average number of authors per each paper: 2.81628


In [0]:
#question 3

# Make sure the publications view is available to Spark SQL.
temp_table_name = "publications"
df.createOrReplaceTempView(temp_table_name)

# Count the distinct journal names, skipping papers whose journal name is
# NULL or is empty once trimmed.
journal_count = spark.sql("""
    SELECT COUNT(DISTINCT journal.name) AS different_journals_count
    FROM publications
    WHERE journal.name IS NOT NULL AND TRIM(journal.name) != ''
""")

# Single-row, single-column result: take its only value.
journal_count_result = journal_count.first()[0]

print(f"Total number of unique journals: {journal_count_result}")

Total number of unique journals: 33916


In [0]:
#question 4

# Produce one row per (paper, author) pair so publications can be counted per author,
# and make that table queryable from Spark SQL.
exploded_df = df.select("corpusid", explode("authors").alias("author"))
exploded_df.createOrReplaceTempView("exploded_publications")

# The five author entries that appear on the largest number of papers.
top_authors_result = spark.sql("""
    SELECT author, COUNT(*) AS publication_count
    FROM exploded_publications
    GROUP BY author
    ORDER BY publication_count DESC
    LIMIT 5
""")

# Render the result table.
top_authors_result.display()

author,publication_count
"List(2149377746, B. Noble)",23
"List(90537224, S. Sukhoruchkin)",16
"List(88842366, Z. Soroko)",16
"List(49898687, M. Kumar)",15
"List(49611617, M. Jain)",10


In [0]:
#question 5

# One row per (journal name, author entry) pair taken from the publications.
selected_columns = df.selectExpr("journal.name as journal_name", "explode(authors) as authors")

# Register both inputs of the join as SQL views.
journal_info.createOrReplaceTempView("journals")
selected_columns.createOrReplaceTempView("papers")

# Sum the journal table's IF column over each author entry's papers and keep
# the five largest totals. The LEFT JOIN keeps papers whose journal name has
# no match in the journal table; those rows carry a NULL IF.
top_authors_if_result = spark.sql("""
    SELECT authors AS author_name, SUM(IF) AS cumulative_impact_factor
    FROM papers
    LEFT JOIN journals ON papers.journal_name = journals.journal_name
    GROUP BY authors
    ORDER BY cumulative_impact_factor DESC
    LIMIT 5
""")

# Render the result table.
top_authors_if_result.display()

author_name,cumulative_impact_factor
"List(2155504929, Ying Li)",93.832
"List(144797099, M. Viana)",92.238
"List(5152451, L. Andrade)",92.238
"List(49900836, H. Wood)",90.422
"List(7695437, A. M. Ruscio)",87.899


In [0]:
# question 6

# Only the journal name and the publication year are needed for the yearly trend.
selected_columns_for_q6 = df.selectExpr("journal.name as journal_name", "year")

# Register both inputs of the join as SQL views.
journal_info.createOrReplaceTempView("journals")
selected_columns_for_q6.createOrReplaceTempView("papers_trends")

# Papers per year, 2010 through 2020 inclusive, restricted to journals whose
# IF is at least 1, newest year first. Because the WHERE clause filters on IF,
# papers without a matching journal row are dropped despite the LEFT JOIN.
publications_count_result = spark.sql("""
    SELECT year, COUNT(*) AS publications_count
    FROM papers_trends
    LEFT JOIN journals ON papers_trends.journal_name = journals.journal_name
    WHERE IF >= 1 AND year BETWEEN 2010 AND 2020
    GROUP BY year
    ORDER BY year DESC
""")

# Render the result table.
publications_count_result.display()

year,publications_count
2020,444
2019,396
2018,365
2017,329
2016,283
2015,244
2014,242
2013,178
2012,165
2011,139
