In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from MAG import MicrosoftAcademicGraph
import os
from sparkhpc import sparkjob
import findspark
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd

# Draw every figure in this notebook with matplotlib's 'ggplot' style.
plt.style.use('ggplot')

# Environment for the Spark/Java toolchain. All three values are set in one
# call, in the same order as before; they are absolute paths on the machine
# this notebook was written on and must be adapted when running elsewhere.
os.environ.update({
    "SPARK_LOCAL_DIRS": "/home/laal/MAG/TMP",
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64",
    "SPARK_HOME": "/home/laal/MAG/spark-3.0.2-bin-hadoop2.7",
})

In [2]:
def get_cluster_client(jobid, memory_per_executor=16000):
    """Attach to a sparkhpc-managed cluster job and return a SparkSession for it.

    Args:
        jobid: Id of the cluster job, forwarded to ``sparkjob.sparkjob``.
        memory_per_executor: Forwarded unchanged to ``sparkjob.sparkjob``
            (presumably megabytes per executor -- confirm against sparkhpc).

    Returns:
        The session produced by ``SparkSession.builder.getOrCreate()``,
        configured from the conf of the object ``start_spark`` returns.
    """
    # Extra Spark settings handed to start_spark on top of sparkhpc's own.
    extra_conf = {
        "spark.memory.offHeap.enabled": True,
        "spark.memory.offHeap.size": "2g",
        "spark.sql.adaptive.enabled": True,
        "spark.sql.adaptive.coalescePartitions.enabled": True,
        "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly": False,
        "spark.shuffle.io.retryWait": "60s",
        "spark.reducer.maxReqsInFlight": 5,
        "spark.executor.memoryOverhead": "2gb",
        "spark.driver.memory": "20g",
        "spark.sql.shuffle.partitions": 300,
    }

    cluster_job = sparkjob.sparkjob(jobid=jobid, memory_per_executor=memory_per_executor)
    # start_spark returns an object exposing getConf()
    # (presumably a SparkContext -- confirm against sparkhpc).
    started = cluster_job.start_spark(extra_conf=extra_conf)

    # Build the session from exactly the configuration the started job reports.
    session_builder = SparkSession.builder.config(conf=started.getConf())
    return session_builder.getOrCreate()


def get_node_client(executor_memory="24g"):
    """Create (or reuse) a SparkSession configured through the builder only.

    Unlike ``get_cluster_client`` this does not go through sparkhpc; every
    setting is applied directly on ``SparkSession.builder``.

    Args:
        executor_memory: Value for ``spark.executor.memory``.

    Returns:
        The session returned by ``getOrCreate()`` for the app "MAG app".
    """
    # (key, value) pairs applied to the builder in this exact order.
    settings = (
        ("spark.executor.memory", executor_memory),
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", 7),
        ("spark.memory.offHeap.enabled", True),
        ("spark.memory.offHeap.size", "2g"),
        ("spark.sql.adaptive.enabled", True),
        ("spark.sql.adaptive.coalescePartitions.enabled", True),
        ("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", False),
    )

    session_builder = SparkSession.builder
    for option_key, option_value in settings:
        session_builder = session_builder.config(option_key, option_value)

    return session_builder.appName("MAG app").getOrCreate()

In [3]:
# Attach to the running `sparkcluster` job (id 39509 in the job listing
# printed below). The id is specific to this session and has to be updated
# whenever the cluster job is resubmitted.
spark = get_cluster_client(jobid=39509, memory_per_executor=14000)

# Wrap the session in the MAG helper, pointing it at the local data folder.
mag = MicrosoftAcademicGraph(
    data_folderpath="/home/laal/MAG/DATA/",
    spark=spark,
)

['NAME STATE JOBID', 'jupyter RUNNING 39434', 'main_job RUNNING 39530', 'sparkcluster RUNNING 39509', '2.train.15 RUNNING 39149', 'hebbian_weights_submit RUNNING 39475', 'hebbian_weights_submit RUNNING 39428', 'hebbian_weights_submit RUNNING 39407', '2.train.18 RUNNING 39152', '2.train.17 RUNNING 39151', 'train-network-incv RUNNING 39529', 'lil_bobby RUNNING 39525', 'train-network-baseline RUNNING 39489', 'jupyter RUNNING 39514', 'transDe.1 RUNNING 39470', 'transDe.0 RUNNING 39471', 'transDeLW.1 RUNNING 39469', 'transDeLW.0 RUNNING 39468', 'piano RUNNING 39518', 'tmp_ct RUNNING 39401', '']


In [19]:
def append_gender_and_macrank(mag, centrality_filename,
                              output_dir="/home/laal/MAG/DATA/NETWORKS"):
    """Join gender and MAG rank onto a centrality table and save it as TSV.

    Registers ``NETWORKS/<centrality_filename>.csv`` as a stream on ``mag``,
    joins it with the ``WosToMag`` and ``Authors`` tables, and writes the
    result to ``<output_dir>/<centrality_filename>Gendered.csv``.

    Args:
        mag: MicrosoftAcademicGraph-like helper exposing ``streams``,
            ``getDataframe`` and ``query_sql``.
        centrality_filename: Stream/table name of the centrality file, without
            the ``.csv`` extension. It is interpolated into the SQL text, so it
            must be a trusted identifier.
        output_dir: Directory that receives the output file. Defaults to the
            location that was previously hard-coded.

    Returns:
        None. The result is only written to disk.
    """
    # Describe the centrality file (relative path + typed columns) to `mag`.
    mag.streams[centrality_filename] = (
        'NETWORKS/{}.csv'.format(centrality_filename),
        ['AuthorId:long', 'PageRank:float', 'PageRank05:float',
         'InDegreeStrength:float', 'InDegree:float', 'OutDegreeStrength:float',
         'OutDegree:float'],
    )

    # The returned dataframes are not needed here. The calls are kept because
    # the SQL below refers to these tables by name -- presumably getDataframe
    # registers each one as a temp view (confirm against MAG.py).
    for table_name in (centrality_filename, 'WosToMag', 'Authors'):
        mag.getDataframe(table_name)

    # Gender is kept only when it is 0 or 1; anything else, including authors
    # without a WosToMag match (LEFT JOIN), becomes -1. The INNER JOIN drops
    # authors that are missing from the Authors table.
    query = """
        SELECT c.*, 
        CASE WHEN wtm.Gender IN (0, 1) THEN wtm.Gender ELSE -1 END as Gender,
        a.Rank
        FROM {} c
        LEFT JOIN WosToMag wtm ON c.AuthorId = wtm.MAG 
        INNER JOIN Authors a ON c.AuthorId = a.AuthorId
    """.format(centrality_filename)

    # toPandas() collects the whole result on the driver before writing.
    centrality_df = mag.query_sql(query).toPandas()

    # NOTE: the file keeps its historical '.csv' suffix but is tab-separated.
    output_path = os.path.join(output_dir, centrality_filename + 'Gendered.csv')
    centrality_df.to_csv(output_path, index=False, sep="\t")

In [20]:
# Economics 2010 network: write SimpleWeightEconomics2010CentralityGendered.csv
append_gender_and_macrank(
    mag,
    centrality_filename='SimpleWeightEconomics2010Centrality',
)

In [21]:
# Mathematics 2010 network: write SimpleWeightMathematics2010CentralityGendered.csv
append_gender_and_macrank(
    mag,
    centrality_filename='SimpleWeightMathematics2010Centrality',
)

In [22]:
# Psychology 2010 network: write SimpleWeightPsychology2010CentralityGendered.csv
append_gender_and_macrank(
    mag,
    centrality_filename='SimpleWeightPsychology2010Centrality',
)

In [23]:
# Chemistry 2010 network: write SimpleWeightChemistry2010CentralityGendered.csv
append_gender_and_macrank(
    mag,
    centrality_filename='SimpleWeightChemistry2010Centrality',
)

In [1]:
# Scratch check: drop the final ".txt" extension from a node-list path.
"/home/laal/MAG/DATA/NETWORKS/SimpleWeightPsychology2010Nodelist.txt".rpartition(".")[0]

'/home/laal/MAG/DATA/NETWORKS/SimpleWeightPsychology2010Nodelist'