# 13. Field of Study Top Entities   

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from CreateFunctions_mag import *
from datetime import date
import pyspark.sql.functions as sf

In [None]:
# MAG snapshot to read from (Azure Blob Storage, wasbs://) and the directory this notebook writes its CSVs to.
rootpath = 'wasbs://mag-2018-09-27@magtrainingsource.blob.core.windows.net/mag/'
outputDir = '/output/jiaxin/pyspark/'

# Conference-analytics settings; not referenced by the cells of this notebook shown here.
conferenceShortName, conferenceAnalyticsBaseDir = 'WWW', '/output/conferenceAnalytics/'

# U-SQL original: fieldOfStudyLevelToInclude SQL.ARRAY<int> = new SQL.ARRAY<int>{0, 1}
# (i.e. only field-of-study levels 0 and 1 are ranked).

In [None]:
# Reuse the running SparkContext if there is one (otherwise create it) and wrap it in a SparkSession.
sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

In [None]:
# Parameters of the analysis (ported from the U-SQL DECLAREs this notebook was translated from).
# Anchor year for the "10yr" and "5yr" time ranges.
thisYear = date.today().year
# Field-of-study levels whose top entities are computed (U-SQL: {0, 1}).
fieldOfStudyLevelToInclude = [0, 1]
# How many entities to keep per [Field Of Study] -> [Entity Type] -> [Time Range] for each ranking.
# NOTE(review): no value is given anywhere in this notebook; 50 is a placeholder - confirm.
topKEntities = 50


# Get all papers and transform PaperRank to a probability that can be summed:
# RankScore = exp(-Rank / 1000), so a lower (better) Rank yields a larger score.
Papers = Papers(rootpath, spark)

papers = Papers.selectExpr(
    "PaperId", "JournalId", "ConferenceSeriesId",
    "exp(Rank * -0.001) as RankScore",
    # Every later step refers to this column as EstimatedCitation.
    # NOTE(review): raw column name taken from the original select; confirm against CreateFunctions_mag.
    "EstimatedCitationCount as EstimatedCitation",
    "Year")


# Select target fields of study for ranking by field level.
FieldsOfStudy = FieldsOfStudy(rootpath, spark)

fosIds = (FieldsOfStudy
          .where(sf.col("Level").isin(fieldOfStudyLevelToInclude))
          .select("FieldOfStudyId"))


# Get all [Paper] -> [Field of Study] relationships along with paper/field details for later calculation.
PaperFieldsOfStudy = PaperFieldsOfStudy(rootpath, spark)

paperFos = (PaperFieldsOfStudy
            .join(fosIds, "FieldOfStudyId", 'inner')
            .join(papers, "PaperId", 'inner')
            .select("PaperId", "FieldOfStudyId", "JournalId", "ConferenceSeriesId",
                    "RankScore", "EstimatedCitation", "Year"))


def _toPaperEntityFos(df, entityIdCol, entityType):
    """Project df onto the common [Paper] -> [Entity] -> [Field of Study] layout.

    entityIdCol is renamed to EntityId and a constant EntityType label is added. The column
    order is fixed here because DataFrame.union matches columns by position, not by name.
    """
    return df.select("PaperId", "FieldOfStudyId",
                     sf.col(entityIdCol).alias("EntityId"),
                     sf.lit(entityType).alias("EntityType"),
                     "RankScore", "EstimatedCitation", "Year")


# Get all [Journal] -> [Paper] -> [Field of Study] relationships.
paperJournalFos = _toPaperEntityFos(
    paperFos.where(sf.col("JournalId").isNotNull()), "JournalId", "Journal")


# Get all [Conference] -> [Paper] -> [Field of Study] relationships.
paperConferenceFos = _toPaperEntityFos(
    paperFos.where(sf.col("ConferenceSeriesId").isNotNull()), "ConferenceSeriesId", "Conference")


# Get all [Paper] -> [Author] -> [Affiliation] relationships.
PaperAuthorAffiliations = PaperAuthorAffiliations(rootpath, spark)

paperAuthorAffiliationFos = (paperFos
                             .join(PaperAuthorAffiliations, "PaperId", 'inner')
                             .select("PaperId", "FieldOfStudyId", "AuthorId", "AffiliationId",
                                     "RankScore", "EstimatedCitation", "Year"))

# The same author (e.g. listed with several affiliations) or affiliation (several co-authors)
# presumably can occur on multiple rows for one paper; distinct() makes each paper count once
# per entity so RankScore/EstimatedCitation sums and the HIndex are not inflated.

# Get all [Paper] -> [Author] relationships.
paperAuthorFos = _toPaperEntityFos(paperAuthorAffiliationFos, "AuthorId", "Author").distinct()

# Get all [Paper] -> [Affiliation] relationships.
paperAffiliationFos = _toPaperEntityFos(
    paperAuthorAffiliationFos.where(sf.col("AffiliationId").isNotNull()),
    "AffiliationId", "Affiliation").distinct()


# Union all the relationships we got above to form [Paper] -> [Entity] -> [Field of Study].
paperEntityFos = (paperAffiliationFos
                  .union(paperAuthorFos)
                  .union(paperConferenceFos)
                  .union(paperJournalFos))


# Papers of each [Field of Study] -> [Entity], most cited first; CitationRank is the 1-based
# position in this order and feeds the HIndex calculation below.
citationRankWindow = (Window
                      .partitionBy("FieldOfStudyId", "EntityType", "EntityId")
                      .orderBy(sf.col("EstimatedCitation").desc()))


def _withCitationRank(df, timeRange):
    """Append CitationRank (row number within citationRankWindow) and a constant TimeRange label."""
    return df.select("*",
                     sf.row_number().over(citationRankWindow).alias("CitationRank"),
                     sf.lit(timeRange).alias("TimeRange"))


# Calculate all-time, 10yr and 5yr ranks by Estimated Citations for the HIndex calculation.
paperEntityFosAll = _withCitationRank(paperEntityFos, "All")
paperEntityFos10yr = _withCitationRank(paperEntityFos.where(sf.col("Year") >= thisYear - 10), "10yr")
paperEntityFos5yr = _withCitationRank(paperEntityFos.where(sf.col("Year") >= thisYear - 5), "5yr")


# Union all time range results into a single table.
paperEntityFosTimeRange = paperEntityFos5yr.union(paperEntityFos10yr).union(paperEntityFosAll)


# Calculate total RankScore, EstimatedCitation and HIndex for each
# [Field Of Study] -> [Entity] -> [TimeRange]. With papers ordered by citations, the h-index is
# the largest CitationRank h whose paper still has at least h citations (0 if there is none).
entityFosStats = (paperEntityFosTimeRange
                  .groupBy("FieldOfStudyId", "EntityId", "EntityType", "TimeRange")
                  .agg(sf.sum("RankScore").alias("RankScore"),
                       sf.sum("EstimatedCitation").alias("EstimatedCitation"),
                       sf.max(sf.when(sf.col("EstimatedCitation") >= sf.col("CitationRank"),
                                      sf.col("CitationRank"))
                              .otherwise(0)).alias("HIndex")))


def _rankWithin(orderCol):
    """RANK() of orderCol, descending (1 = best), within each field / entity type / time range."""
    return sf.rank().over(Window
                          .partitionBy("FieldOfStudyId", "EntityType", "TimeRange")
                          .orderBy(sf.col(orderCol).desc()))


# Rank each [Field Of Study] -> [Entity] -> [TimeRange] by total RankScore, EstimatedCitation and HIndex.
entityFosStats = entityFosStats.select("*",
                                       _rankWithin("RankScore").alias("RankScore_Rank"),
                                       _rankWithin("EstimatedCitation").alias("EstimatedCitation_Rank"),
                                       _rankWithin("HIndex").alias("HIndex_Rank"))


# Keep only the top k entities for each field of study under any of the three rankings, and map
# the summed RankScore back onto the Rank scale: the inverse of RankScore = exp(-Rank / 1000)
# is Rank = -1000 * ln(RankScore).
entityFosStats = (entityFosStats
                  .where((sf.col("RankScore_Rank") <= topKEntities) |
                         (sf.col("EstimatedCitation_Rank") <= topKEntities) |
                         (sf.col("HIndex_Rank") <= topKEntities))
                  .select("FieldOfStudyId", "EntityId", "EntityType", "TimeRange",
                          (sf.log(sf.col("RankScore")) * -1000).cast("long").alias("Rank"),
                          "EstimatedCitation", "HIndex"))

entityFosStats.write.csv(outputDir + "entityFosStats.csv", mode='overwrite', header='true')

In [None]:
# Export an id -> name lookup table covering every field of study, for use in visuals.
fieldOfStudyDetails = FieldsOfStudy.select("FieldOfStudyId", sf.col("NormalizedName").alias("Name"))

fieldOfStudyDetails.write.mode('overwrite').option('header', 'true').csv(outputDir + "fieldOfStudyDetails.csv")

In [None]:
# Get all [EntityId, Name] for easier lookup in visuals.
# Each entity table is projected onto the same (EntityId, Name) layout so the positional
# DataFrame.union lines the columns up (equivalent of the U-SQL UNION ALL).
# NOTE(review): assumes CreateFunctions_mag provides Journals, ConferenceSeries, Affiliations and
# Authors loaders taking (rootpath, spark) like Papers/FieldsOfStudy do - confirm.
entityDetails = (Journals(rootpath, spark)
                 .selectExpr("JournalId as EntityId", "DisplayName as Name")
                 .union(ConferenceSeries(rootpath, spark)
                        .selectExpr("ConferenceSeriesId as EntityId", "NormalizedName as Name"))
                 .union(Affiliations(rootpath, spark)
                        .selectExpr("AffiliationId as EntityId", "DisplayName as Name"))
                 .union(Authors(rootpath, spark)
                        .selectExpr("AuthorId as EntityId", "DisplayName as Name")))


# Get all distinct EntityIds that survived the top-K filter.
distinctEntityIds = entityFosStats.select("EntityId").distinct()


# Only output [EntityId, Name] for top K entities.
entityDetails = (entityDetails
                 .join(distinctEntityIds, "EntityId", 'inner')
                 .select("EntityId", "Name"))

entityDetails.write.csv(outputDir + "entityDetails.csv", mode='overwrite', header='true')

In [None]:
# Export every field of study as [FieldOfStudyId, Name] for lookups in visuals. This is the same
# data as the fieldOfStudyDetails export, written under a different file name.
fieldsOfStudy = (FieldsOfStudy
                 .select("FieldOfStudyId", "NormalizedName")
                 .withColumnRenamed("NormalizedName", "Name"))

fieldsOfStudy.write.mode('overwrite').option('header', 'true').csv(outputDir + "fieldsOfStudy.csv")

In [None]:
# Shut down the notebook's SparkContext. `spark` was constructed on top of `sc`, so it cannot be used after this.
sc.stop()