In [0]:
Large_file_path = "dbfs:/FileStore/tables/large.json.gz"

Journal_file_path = "dbfs:/FileStore/tables/journal_information.csv"

journal_df = spark.read.format("csv").option("header", "true").load(Journal_file_path)
large_df = spark.read.format("json").load(Large_file_path)

In [0]:
# Registering DataFrame as temporary SQL tables
large_table =large_df.createOrReplaceTempView("large_table")
journal_table= journal_df.createOrReplaceTempView("journal_table")

In [0]:
"""1. Programmatically confirm that all papers have unique IDs and output the number of papers in the file."""
def check_unique_ids_sql(large_table):

    # SQL query to count distinct CorpusId and total papers
    Distinct_corpusID_papers = spark.sql("""
    SELECT COUNT(DISTINCT CorpusId) AS unique_ids_count, COUNT(*) AS total_papers_count
    FROM large_table
    """)
    # Executing SQL query and collect the result
    result = Distinct_corpusID_papers.collect()[0]

    # Extracting unique IDs count and total papers count
    unique_ids_count = result['unique_ids_count']
    total_papers_count = result['total_papers_count']

    print("Total number of papers:", total_papers_count)
    print("Number of unique IDs:", unique_ids_count)

check_unique_ids_sql(large_table)


Total number of papers: 150000
Number of unique IDs: 150000


In [0]:
"""2. What is the average number of authors per paper?"""
def calculate_avg_authors_per_paper_sql(large_table):

    # SQL query to calculate the average number of authors per paper
    Avg_authors = spark.sql("""
    SELECT AVG(size(authors)) AS avg_authors_per_paper
    FROM large_table
    """)

    # to collect the result
    avg_authors_per_paper = Avg_authors.collect()[0]['avg_authors_per_paper']

    print("Average number of authors per paper:", avg_authors_per_paper)

calculate_avg_authors_per_paper_sql(large_table)


Average number of authors per paper: 2.81628


In [0]:
""" 3. How many different journals were the papers published in?"""
def count_distinct_journals_sql(large_table):

    # Write SQL query to filter out null values and empty strings in the "journal" column
    journal_filter = spark.sql("""
        SELECT COUNT(DISTINCT journal.name) AS distinct_journals_count
        FROM large_table
        WHERE journal.name IS NOT NULL AND journal.name != ''
    """)

    # to collect the result
    distinct_journals_count = journal_filter.collect()[0][0]
    print("Number of different journals:", distinct_journals_count)

count_distinct_journals_sql(large_table)

Number of different journals: 33916


In [0]:
"""4. Find the 5 authors with the highest number of publications. 
    Give their names along with the number of publications they contributed to."""
def top_authors_by_publication_count_sql(large_table):

    # SQL query to explode the authors array, group by author, and count the number of publications per author
    count_publication = spark.sql("""
    SELECT author.authorId, author.name, COUNT(*) AS publication_count
    FROM large_table
    LATERAL VIEW explode(authors) AS author
    GROUP BY author.authorId, author.name
    """)

    # selecting the top 5 authors
    top_10_authors = count_publication.orderBy(count_publication["publication_count"].desc()).limit(5)

    # Display the top 5 authors with the highest number of publications
    top_10_authors.show(truncate=False)


top_authors_by_publication_count_sql(large_table)


+----------+---------------+-----------------+
|authorId  |name           |publication_count|
+----------+---------------+-----------------+
|2149377746|B. Noble       |23               |
|90537224  |S. Sukhoruchkin|16               |
|88842366  |Z. Soroko      |16               |
|49898687  |M. Kumar       |15               |
|49611617  |M. Jain        |10               |
+----------+---------------+-----------------+



In [0]:
""" Created a function for joining of table for simplicity of further analysis """
def joined_tables (large_table, journal_table):
# Filtering out the header row in journal_table
    journal_table1 = spark.sql("""
        SELECT *
        FROM (
            SELECT *, row_number() OVER (ORDER BY NULL) as row_num FROM journal_table
        )
        WHERE row_num > 1
    """)

    # Filtering out null values in journal column and flatten large_table
    flattened_table = spark.sql("""
        SELECT *, journal.name AS journal_name FROM large_table
        WHERE journal IS NOT NULL AND journal.name IS NOT NULL
    """)

    flattened_table.createOrReplaceTempView("flattened_table")
    journal_table1.createOrReplaceTempView("journal_table1")

    # Joining flattened_table with journal_table1 based on 'Journal Name'
    joined_df = spark.sql("""
        SELECT s.*, j.* FROM flattened_table s JOIN journal_table1 j
        ON s.journal_name = j.`Journal Name`
    """)

    # Registering DataFrame as temporary SQL table
    joined_df.createOrReplaceTempView("joined_table")

joined_tables (large_table, journal_table)




In [0]:
"""5. Find the top 5 authors with the highest cummulative impact factor.
       Output both the author information and the cummulative impact factor."""

# Calculating cumulative impact factor for each author
author_impact_factor = spark.sql("""
    SELECT exploded_author.authorId AS authorId_exploded, 
            exploded_author.name AS author_name_exploded, 
            COALESCE(SUM(joined_table.IF), 0) AS cumulative_impact_factor
    FROM joined_table
    LATERAL VIEW explode(authors) exploded_table AS exploded_author
    WHERE joined_table.journal_name IS NOT NULL
    GROUP BY exploded_author.authorId, exploded_author.name
""")

author_impact_factor.createOrReplaceTempView("author_impact_factor")

# top 5 authors with the highest cumulative impact factor
top_authors_with_IF = spark.sql("""
    SELECT authorId_exploded AS authorId, 
            author_name_exploded AS author_name, 
            SUM(cumulative_impact_factor) AS cumulative_IF
    FROM author_impact_factor
    GROUP BY authorId_exploded, author_name_exploded
    ORDER BY cumulative_IF DESC
    LIMIT 5
""")

top_authors_with_IF.show()



+----------+------------+-------------+
|  authorId| author_name|cumulative_IF|
+----------+------------+-------------+
|2155504929|     Ying Li|       93.832|
| 144797099|    M. Viana|       92.238|
|   5152451|  L. Andrade|       92.238|
|  49900836|     H. Wood|       90.422|
|   7695437|A. M. Ruscio|       87.899|
+----------+------------+-------------+



In [0]:
"""6. You’d like some additional information about publication trends. 
        How many publications with impact factor > 1 were published in each of the years between 2010-2020?"""

# Filter publications with impact factor > 1 and publication year between 2010 and 2020
filtered_df = spark.sql("""
    SELECT *
    FROM joined_table
    WHERE CAST(IF AS FLOAT) > 1 AND year BETWEEN 2010 AND 2020
""")
# Register filtered_df as temporary SQL table
filtered_df.createOrReplaceTempView("filtered_table")

# Group by publication year and count the number of publications
publications_per_year = spark.sql("""
    SELECT year, COUNT(*) AS count
    FROM filtered_table
    GROUP BY year
    ORDER BY year
""")

publications_per_year.show()



+----+-----+
|year|count|
+----+-----+
|2010|  112|
|2011|  139|
|2012|  165|
|2013|  178|
|2014|  241|
|2015|  243|
|2016|  283|
|2017|  329|
|2018|  365|
|2019|  396|
|2020|  444|
+----+-----+

