In [None]:
%config InlineBackend.figure_format = 'retina'
import findspark
findspark.init()

import ast
import pandas as pd
import numpy as np
import os
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pyspark.sql.functions as F
import time
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.style as style
from slugify import slugify
import os
import yaml

In [None]:
# One Spark session for the whole notebook; the driver is given 4 GB of memory.
# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = SparkSession.builder.config("spark.driver.memory", "4g").getOrCreate()
# SQLContext takes a SparkContext as its first argument. Passing the session itself only
# worked through private attributes, so hand over the real context and reuse the session.
sqlContext = SQLContext(sc.sparkContext, sc)

In [None]:
# Country names recognised at the start of a raw affiliation/country string.
# Built once at module level as a frozenset, so the Spark UDF neither rebuilds the
# collection nor scans a list linearly for every row.
_KNOWN_COUNTRIES = frozenset([
    'Afghanistan', 'Albania', 'Algeria', 'American Samoa', 'Andorra', 'Angola', 'Anguilla',
    'Antarctica', 'Antigua And Barbuda', 'Argentina', 'Armenia', 'Aruba', 'Australia', 'Austria',
    'Azerbaijan', 'Bahamas', 'Bahrain', 'Bangladesh', 'Barbados', 'Belarus', 'Belgium', 'Belize',
    'Benin', 'Bermuda', 'Bhutan', 'Bolivia', 'Bosnia And Herzegovina', 'Botswana', 'Bouvet Island',
    'Brazil', 'British Indian Ocean Territory', 'Brunei Darussalam', 'Bulgaria', 'Burkina Faso',
    'Burundi', 'Cambodia', 'Cameroon', 'Canada', 'Cape Verde', 'Cayman Islands',
    'Central African Republic', 'Chad', 'Chile', 'China', 'Christmas Island',
    'Cocos (keeling) Islands', 'Colombia', 'Comoros', 'Congo',
    'Congo, The Democratic Republic Of The', 'Cook Islands', 'Costa Rica', "Cote D'ivoire",
    'Croatia', 'Cuba', 'Cyprus', 'Czech Republic', 'Denmark', 'Djibouti', 'Dominica',
    'Dominican Republic', 'East Timor', 'Ecuador', 'Egypt', 'El Salvador', 'Equatorial Guinea',
    'Eritrea', 'Estonia', 'Ethiopia', 'Falkland Islands (malvinas)', 'Faroe Islands', 'Fiji',
    'Finland', 'France', 'French Guiana', 'French Polynesia', 'French Southern Territories',
    'Gabon', 'Gambia', 'Georgia', 'Germany', 'Ghana', 'Gibraltar', 'Greece', 'Greenland',
    'Grenada', 'Guadeloupe', 'Guam', 'Guatemala', 'Guinea', 'Guinea-bissau', 'Guyana', 'Haiti',
    'Heard Island And Mcdonald Islands', 'Holy See (vatican City State)', 'Honduras', 'Hong Kong',
    'Hungary', 'Iceland', 'India', 'Indonesia', 'Iran, Islamic Republic Of', 'Iraq', 'Ireland',
    'Israel', 'Italy', 'Jamaica', 'Japan', 'Jordan', 'Kazakstan', 'Kenya', 'Kiribati',
    "Korea, Democratic People's Republic Of", 'Korea, Republic Of', 'Kosovo', 'Kuwait',
    'Kyrgyzstan', "Lao People's Democratic Republic", 'Latvia', 'Lebanon', 'Lesotho', 'Liberia',
    'Libyan Arab Jamahiriya', 'Liechtenstein', 'Lithuania', 'Luxembourg', 'Macau',
    'Macedonia, The Former Yugoslav Republic Of', 'Madagascar', 'Malawi', 'Malaysia', 'Maldives',
    'Mali', 'Malta', 'Marshall Islands', 'Martinique', 'Mauritania',
    'Mauritius', 'Mayotte', 'Mexico', 'Micronesia, Federated States Of', 'Moldova, Republic Of',
    'Monaco', 'Mongolia', 'Montserrat', 'Montenegro', 'Morocco', 'Mozambique', 'Myanmar',
    'Namibia', 'Nauru', 'Nepal', 'Netherlands', 'Netherlands Antilles', 'New Caledonia',
    'New Zealand', 'Nicaragua', 'Niger', 'Nigeria', 'Niue', 'Norfolk Island',
    'Northern Mariana Islands', 'Norway', 'Oman', 'Pakistan', 'Palau',
    'Palestinian Territory, Occupied', 'Panama', 'Papua New Guinea', 'Paraguay', 'Peru',
    'Philippines', 'Pitcairn', 'Poland', 'Portugal', 'Puerto Rico', 'Qatar', 'Reunion', 'Romania',
    'Russian Federation', 'Rwanda', 'Saint Helena', 'Saint Kitts And Nevis', 'Saint Lucia',
    'Saint Pierre And Miquelon', 'Saint Vincent And The Grenadines', 'Samoa', 'San Marino',
    'Sao Tome And Principe', 'Saudi Arabia', 'Senegal', 'Serbia', 'Seychelles', 'Sierra Leone',
    'Singapore', 'Slovakia', 'Slovenia', 'Solomon Islands', 'Somalia', 'South Africa',
    'South Georgia And The South Sandwich Islands', 'Spain', 'Sri Lanka', 'Sudan', 'Suriname',
    'Svalbard And Jan Mayen', 'Swaziland', 'Sweden', 'Switzerland', 'Syrian Arab Republic',
    'Taiwan, Province Of China', 'Tajikistan', 'Tanzania, United Republic Of', 'Thailand', 'Togo',
    'Tokelau', 'Tonga', 'Trinidad And Tobago', 'Tunisia', 'Turkey', 'Turkmenistan',
    'Turks And Caicos Islands', 'Tuvalu', 'Uganda', 'Ukraine', 'United Arab Emirates',
    'United Kingdom', 'United States', 'United States Minor Outlying Islands', 'Uruguay',
    'Uzbekistan', 'Vanuatu', 'Venezuela', 'Viet Nam', 'Virgin Islands, British',
    'Virgin Islands, U.s.', 'Wallis And Futuna', 'Western Sahara', 'Yemen', 'Zambia', 'Zimbabwe',
    'Moldova', 'South Korea',
])


def get_country_name(data):
    """Return the known country name that the given text starts with.

    The text is split into words and re-assembled one word at a time; the first
    (i.e. shortest) word prefix that is a known country name is returned. As a
    consequence 'Netherlands Antilles' resolves to 'Netherlands'. Matching is
    case-sensitive and exact.

    Args:
        data: raw country/affiliation string, or None.

    Returns:
        The matched country name, or None when `data` is empty/None or no word
        prefix is a known country.
    """
    prefix = ''
    # split() without an argument treats tabs/newlines/repeated blanks as separators too,
    # so such inputs match just like their single-space equivalents.
    for word in (data or '').split():
        prefix = '{} {}'.format(prefix, word).strip()
        if prefix in _KNOWN_COUNTRIES:
            return prefix
    return None

# Spark UDF that normalises the free-text `country` column to a known country name
# (NULL when nothing matches).
author_country = F.udf(get_country_name, StringType())

authors = sqlContext.read.parquet('./data/authors')
authors = authors.withColumn('country', author_country('country'))

# One row per (author, paper); keep only papers published after 2006.
collaborations = sqlContext.read.parquet('./data/collaborations')
collaborations = collaborations.withColumn('published', F.to_date(F.col('published')))
collaborations = collaborations.withColumn('year', F.year(F.col('published')))
collaborations = collaborations.withColumnRenamed('author_id', 'auth_id')
collaborations = collaborations.withColumn('auth_id', collaborations.auth_id.cast('bigint'))
collaborations = collaborations.filter(F.col('year') > 2006)
ego_alters = sqlContext.read.parquet('./data/ego_alters')
# Exclude articles which have errors
collaborations = collaborations.filter(~(collaborations.abs_id.isin([85032509284, 80053369953])))

# Compute authors citations based on downloaded papers
# Because not all articles for authors were scraped we need to base our analysis on grounded data, not missing
#
# De-duplicate on (author, paper) and use a plain sum. The previous F.sumDistinct('cited_by')
# summed the distinct citation *values*, so two papers with the same citation count were
# counted only once and CITATIONS was under-reported.
# NOTE(review): if one paper ever carries different `cited_by` values across duplicate rows,
# dropDuplicates keeps an arbitrary one of them.
authored_papers = collaborations.dropDuplicates(['auth_id', 'abs_id'])
authors = authors.join(authored_papers, authored_papers.auth_id == authors.id, 'INNER') \
        .groupby([authors.id]) \
        .agg(
            F.countDistinct('abs_id').alias('PAPERS'),
            F.first('cited_by_count').alias('CITATIONS_SCOPUS'),
            F.sum('cited_by').alias('CITATIONS'),
            F.first('cat').alias('cat'),
            F.first('university').alias('university'),
            F.first('city').alias('city'),
            F.first('country').alias('country'))

For each author in the database, count the papers they contributed to, then keep the 1200 most productive authors from EU countries in the selected category.

In [None]:
# Countries whose authors qualify as egos (also iterated over by later cells).
countries = [
    'Austria', 'Belgium', 'Bulgaria', 'Croatia', 'Cyprus', 'Czech Republic',
    'Denmark', 'Estonia', 'Finland', 'France', 'Germany', 'Greece', 'Hungary',
    'Ireland', 'Italy', 'Latvia', 'Lithuania', 'Luxembourg', 'Malta', 'Netherlands',
    'Poland', 'Portugal', 'Romania', 'Slovakia', 'Slovenia', 'Spain', 'Sweden', 'United Kingdom',
]

# Subject category under study; also used to name the output files further down.
cat_name = 'MEDI'

# The pattern holds no SQL wildcards, so LIKE keeps only authors whose `cat` is exactly
# this one-element list literal.
category_literal = '["{}"]'.format(cat_name)

ranked_candidates = authors.orderBy(F.col('PAPERS').desc()) \
    .filter(F.col('country').isin(countries)) \
    .filter(F.col('cat').like(category_literal))

# Top 1200 authors by paper count, exposed with EGO_-prefixed column names.
most_productive = ranked_candidates.limit(1200).select(
    authors.cat.alias('cat'),
    authors.id.alias('EGO_ID'),
    authors.country.alias('EGO_COUNTRY'),
    authors.city.alias('EGO_CITY'),
    authors.university.alias('EGO_UNIVERSITY'),
    authors['PAPERS'].alias('EGO_PAPERS'),
    authors['CITATIONS'].alias('EGO_CITATIONS'),
)

Get the papers of the most productive authors in the selected domain.

In [None]:
# Match every ego with the (author, paper) rows it appears in.
ego_is_author = F.col('EGO_ID') == collaborations.auth_id

# One row per (ego, paper), carrying the ego's summary columns and the publication year.
most_productive_papers = most_productive.join(collaborations, ego_is_author, 'inner').select(
    'EGO_ID',
    'EGO_COUNTRY',
    'EGO_PAPERS',
    'EGO_CITATIONS',
    F.col('abs_id').alias('paper_id'),
    'year',
)

Gather all authors of these papers, i.e. the co-authors of each ego.

In [None]:
# Ego papers from 2007 onwards.
ego_papers_since_2007 = most_productive_papers.filter(F.col('year') >= 2007)
# Pair each ego paper with every author row of that same paper.
on_same_paper = most_productive_papers.paper_id == collaborations.abs_id

# One row per (ego, author of one of the ego's papers, paper); the ego itself is included.
most_productive_coauthors = ego_papers_since_2007.join(collaborations, on_same_paper, 'INNER').select(
    'EGO_ID',
    F.col('auth_id').alias('source_id'),
    F.col('abs_id').alias('paper_id2'),
)

#most_productive_coauthors = most_productive_coauthors.filter(F.col('EGO_ID') != F.col('source_id'))

Extract a list of all unique authors; we need it for the joins that follow. Because we only want connections between the egos' co-authors, the list should not contain co-authors of co-authors.

Collect the connections between these authors, then filter out the ones we are not interested in as well as duplicated connections.

In [None]:
# Pair every co-author row with all authors of the same paper.
shares_paper = most_productive_coauthors.paper_id2 == collaborations.abs_id
source_col = F.col('source_id')
target_col = F.col('target_id')

most_productive_conns = (
    most_productive_coauthors
    .join(collaborations, shares_paper, 'INNER')
    .select('source_id', F.col('auth_id').alias('target_id'), F.col('abs_id').alias('paper_id3'), 'year')
    # Drop self-pairs (an author matched with themselves).
    .filter(source_col != target_col)
    # Store each pair with the smaller id first so (a, b) and (b, a) become the same row ...
    .select(
        F.least(source_col, target_col).alias('source_id'),
        F.greatest(source_col, target_col).alias('target_id'),
        'paper_id3',
        'year',
    )
    # ... and then collapse the repeated rows.
    .dropDuplicates()
)

Save a checkpoint with all connections between co-authors from the selected category.

In [None]:
# Attach the country of the source author (NULL when the author is not in `authors`).
most_productive_conns = most_productive_conns.join(authors, most_productive_conns.source_id == authors.id, 'LEFT') \
    .select('source_id', F.col('country').alias('source_country'),
            'target_id', 'paper_id3', 'year')

# Same for the target author.
most_productive_conns = most_productive_conns.join(authors, most_productive_conns.target_id == authors.id, 'LEFT') \
    .select('source_id', 'source_country',
            'target_id', F.col('country').alias('target_country'),
            'paper_id3', 'year')


# Filter out authors without country
most_productive_conns = most_productive_conns.filter(F.col('source_country').isNotNull())
most_productive_conns = most_productive_conns.filter(F.col('target_country').isNotNull())


# Checkpoint the edge list. 'overwrite' keeps the cell re-runnable: without an explicit
# save mode Spark refuses to write when the target path already exists.
most_productive_conns.write.mode('overwrite').parquet('./data/{}_authors_network'.format(cat_name))

Get the data about these authors and build the final aggregation.

In [None]:
# Reload the checkpointed author-to-author edge list.
most_productive_conns = sqlContext.read.parquet('./data/{}_authors_network'.format(cat_name))

# Aggregate author edges into (source country, target country, year) edges, keeping the
# distinct authors seen on each side and the number of author-level edges.
country_network = most_productive_conns \
    .groupby(['source_country', 'target_country', 'year']) \
    .agg(
        F.collect_set('source_id').alias('source_uniq'),
        F.collect_set('target_id').alias('target_uniq'),
        F.count('paper_id3').alias('edges')
    )

# Node ids are "<country><year>" with runs of whitespace/underscore/hyphen replaced by '-'.
country_network = country_network.withColumn('sourceid', F.regexp_replace(F.concat('source_country', 'year'), r'[\s_-]+', '-'))
country_network = country_network.withColumn('targetid', F.regexp_replace(F.concat('target_country', 'year'), r'[\s_-]+', '-'))
# Direction-independent key: sorting puts (A, B, year) and (B, A, year) in the same group.
country_network = country_network.withColumn('uniq_country_id', F.sort_array(F.array('source_country', 'target_country', 'year')))

country_conns = country_network.select('uniq_country_id', 'sourceid', 'targetid', 'source_country', 'target_country', 'year', 'edges')
# Merge both directions of a country pair; which direction's labels survive is whatever
# F.first happens to see.
country_conns = country_conns.groupby('uniq_country_id') \
    .agg(
        F.first('sourceid').alias('sourceid'),
        F.first('targetid').alias('targetid'),
        F.first('source_country').alias('source_country'),
        F.first('target_country').alias('target_country'),
        F.first('year').alias('year'),
        F.sum('edges').alias('edges')
    )

country_conns = country_conns.select('sourceid', 'targetid', 'source_country', 'target_country', 'year', 'edges')

# Country of interest (only these will be saved)
# The country columns hold the spellings produced by get_country_name, so the list also
# carries those variants; with only the short forms ('Czechia', 'Russia', 'Kazakhstan',
# 'Macedonia', 'Vatican City', 'Bosnia and Herzegovina') these countries never matched
# and were silently dropped from the export.
europe_countries = [
    'Albania', 'Andorra', 'Armenia', 'Austria', 'Azerbaijan', 'Belarus', 'Belgium',
    'Bosnia and Herzegovina', 'Bosnia And Herzegovina', 'Bulgaria', 'Croatia', 'Cyprus',
    'Czechia', 'Czech Republic', 'Denmark', 'Estonia', 'Finland', 'France', 'Georgia', 'Germany',
    'Greece', 'Hungary', 'Iceland', 'Ireland', 'Italy', 'Kazakhstan', 'Kazakstan', 'Kosovo',
    'Latvia', 'Liechtenstein', 'Lithuania', 'Luxembourg', 'Malta', 'Moldova',
    'Moldova, Republic Of', 'Monaco', 'Montenegro', 'Netherlands', 'Macedonia',
    'Macedonia, The Former Yugoslav Republic Of', 'Norway', 'Poland', 'Portugal', 'Romania',
    'Russia', 'Russian Federation', 'San Marino', 'Serbia', 'Slovakia', 'Slovenia', 'Spain',
    'Sweden', 'Switzerland', 'Turkey', 'Ukraine', 'United Kingdom', 'Vatican City',
    'Holy See (vatican City State)',
]
country_conns = country_conns.filter(F.col('source_country').isin(europe_countries))
country_conns = country_conns.filter(F.col('target_country').isin(europe_countries))

# mode='overwrite' keeps the export re-runnable (the default mode fails if the folder exists).
country_conns.write.csv("./data/country_networks/{}_conns".format(cat_name), mode='overwrite', compression=None, nullValue='N/A')

Collect the unique nodes from the edges, explode each node's array of unique authors to gather their paper and citation data, then export the result.

In [None]:
# Unique authors for each country from source column
country_source_nodes = country_network.groupby(['source_country', 'year']) \
    .agg(
        F.first('sourceid').alias('sourceid'),
        F.collect_list('source_uniq').alias('source_authors'),
    )
country_source_nodes = country_source_nodes.withColumn('source_authors', F.array_distinct(F.flatten('source_authors')))

# Unique authors for each country from target column
country_target_nodes = country_network.groupby(['target_country', 'year']) \
    .agg(
        F.first('targetid').alias('targetid'),
        F.collect_list('target_uniq').alias('target_authors'),
    )
country_target_nodes = country_target_nodes.withColumn('target_authors', F.array_distinct(F.flatten('target_authors')))


# Combine both sides of each country/year node into one de-duplicated author list.
# NOTE(review): with the LEFT join, a node that never appears as a target gets NULL for
# `t.year` and presumably for `uniq_authors` as well - confirm this is acceptable; the
# self-loop join further down only keeps nodes present on both sides anyway.
country_nodes = country_source_nodes.join(country_target_nodes.alias('t'), country_source_nodes.sourceid == country_target_nodes.targetid, 'LEFT') \
    .select(F.col('sourceid').alias('nodeid'), F.col('source_country').alias('country'), 't.year',
            F.array_distinct(F.flatten(F.array('source_authors', 'target_authors'))).alias('uniq_authors'))


# One row per (node, author), enriched with the author's paper and citation totals.
country_nodes = country_nodes.select('*', F.explode('uniq_authors').alias('auth_id'))
country_nodes = country_nodes.join(authors.alias('ad'), country_nodes.auth_id == authors.id, 'LEFT') \
    .select('nodeid', 'ad.country', 'year', 'auth_id', 'ad.PAPERS', 'ad.CITATIONS')

# Back to one row per node: author count plus cumulated papers/citations.
country_nodes = country_nodes.groupby(['nodeid']) \
    .agg(
        F.first('country').alias('country'),
        F.first('year').alias('year'),
        F.count('auth_id').alias('nodes'),
        F.sum('PAPERS').alias('cumulated_papers'),
        F.sum('CITATIONS').alias('cumulated_citations')
    )

# Attach the edge count of the node's self-loop, i.e. the network row whose source and
# target are both this node; nodes without such a row are dropped by the inner join.
country_nodes = country_nodes.join(country_network.alias('network'), (country_nodes.nodeid == country_network.sourceid) & (country_nodes.nodeid == country_network.targetid)) \
    .select('nodeid', 'country', 'network.year', 'nodes', 'cumulated_papers', 'cumulated_citations', 'network.edges')

# Country of interest (only these will be saved)
# The `country` column holds the spellings produced by get_country_name, so the list also
# carries those variants; with only the short forms ('Czechia', 'Russia', 'Kazakhstan',
# 'Macedonia', 'Vatican City', 'Bosnia and Herzegovina') these countries never matched
# and were silently dropped from the export.
europe_countries = [
    'Albania', 'Andorra', 'Armenia', 'Austria', 'Azerbaijan', 'Belarus', 'Belgium',
    'Bosnia and Herzegovina', 'Bosnia And Herzegovina', 'Bulgaria', 'Croatia', 'Cyprus',
    'Czechia', 'Czech Republic', 'Denmark', 'Estonia', 'Finland', 'France', 'Georgia', 'Germany',
    'Greece', 'Hungary', 'Iceland', 'Ireland', 'Italy', 'Kazakhstan', 'Kazakstan', 'Kosovo',
    'Latvia', 'Liechtenstein', 'Lithuania', 'Luxembourg', 'Malta', 'Moldova',
    'Moldova, Republic Of', 'Monaco', 'Montenegro', 'Netherlands', 'Macedonia',
    'Macedonia, The Former Yugoslav Republic Of', 'Norway', 'Poland', 'Portugal', 'Romania',
    'Russia', 'Russian Federation', 'San Marino', 'Serbia', 'Slovakia', 'Slovenia', 'Spain',
    'Sweden', 'Switzerland', 'Turkey', 'Ukraine', 'United Kingdom', 'Vatican City',
    'Holy See (vatican City State)',
]
country_nodes = country_nodes.filter(F.col('country').isin(europe_countries))

# mode='overwrite' keeps the export re-runnable (the default mode fails if the folder exists).
country_nodes.write.csv("./data/country_networks/{}_nodes".format(cat_name), mode='overwrite', compression=None, nullValue='N/A')

Everything below this point should be rewritten and refactored.

Old code that could be simplified.

In [None]:
def country_network(country):
    """Write the node and edge CSV files of one country's network.

    Reads the per-country edge parquet, keeps the edges whose target also occurs as a
    source, labels every edge by whether its endpoints are egos, and writes
    `<slug>_nodes.csv` and `<slug>_edges.csv` under ./data/country_networks/.

    NOTE(review): this def rebinds the name `country_network`, which an earlier cell
    uses for a Spark DataFrame.
    NOTE(review): `to_build` is not defined anywhere in the visible notebook, and the
    aggregated `authors` frame built above does not appear to expose `cited_by_count`
    or `agg_citations` - confirm before running.

    Args:
        country: country name; its slug selects the parquet input and names the outputs.

    Returns:
        None. The results are the two CSV files.
    """
    country_slug = slugify(country)
    i_edges = sqlContext.read.parquet(os.path.abspath('./data/per_country/{}'.format(country_slug)))

    # Select only edges between networks of most cited authors in country
    source_row = i_edges.select(F.collect_set("source").alias("source")).first()
    nodes = set(source_row["source"])
    internal_edges = i_edges.filter(i_edges.target.isin(nodes))

    authors_details = authors.filter(authors.id.isin(nodes)).select(
        authors.id, authors.country, authors.cited_by_count, authors.agg_citations)
    edges = internal_edges.groupby([i_edges.source, i_edges.target]).agg(
        F.sum('weight').alias('weight'), F.first('type').alias('type'))

    # Relation label for every (source is ego, target is ego) combination.
    relation_labels = {
        (True, True): 'ego-ego',
        (True, False): 'ego-coauthor',
        (False, True): 'coauthor-ego',
        (False, False): 'coauthor-coauthor',
    }

    def relation_type(source, target, egos):
        # Ids are compared in their string form against the ego id collection.
        source_is_ego = bool(str(source) in egos)
        target_is_ego = bool(str(target) in egos)
        return relation_labels[(source_is_ego, target_is_ego)]

    ego_ids = to_build['id'].values
    relation_udf = F.udf(lambda s, t: relation_type(s, t, ego_ids), StringType())
    edges = edges.withColumn('relation', relation_udf(F.col('source'), F.col('target')))

    def ego_flag(author_id):
        return 'Yes' if str(author_id) in ego_ids else 'No'

    def origin_flag(author_country):
        return 'Intern' if author_country == country else 'Extern'

    authpan = authors_details.toPandas()
    authpan['ego'] = authpan['id'].map(ego_flag)
    authpan['type'] = authpan['country'].map(origin_flag)
    # Fall back to the aggregated citation count where cited_by_count is missing.
    authpan['cited_by_count'] = authpan['cited_by_count'].fillna(authpan.agg_citations).astype(int)
    authpan = authpan.drop(['agg_citations'], axis=1)

    nodes_path = './data/country_networks/{}_nodes.csv'.format(country_slug)
    edges_path = './data/country_networks/{}_edges.csv'.format(country_slug)

    authpan.to_csv(nodes_path, index=False)
    edges.toPandas().to_csv(edges_path, index=False)


def country_network_authors(country):
    """Return a pandas frame describing the authors of one country's network.

    The authors are the distinct `source` ids found in the per-country edge parquet.

    NOTE(review): the aggregated `authors` frame built above does not appear to expose
    `cited_by_count` or `agg_citations` (it has CITATIONS_SCOPUS and CITATIONS) -
    confirm which columns are meant before running.

    Args:
        country: country name; its slug selects the parquet input.

    Returns:
        pandas.DataFrame with the selected author columns plus `type`
        ('Intern' when the author's country equals `country`, otherwise 'Extern');
        `cited_by_count` is filled from `agg_citations` where missing and cast to int.
    """
    i_edges = sqlContext.read.parquet(os.path.abspath('./data/per_country/{}'.format(slugify(country))))
    # Authors of interest: everyone occurring as an edge source.
    # (The former `data = i_edges.filter(...)` local was never used and has been removed.)
    nodes = set(i_edges.select(F.collect_set("source").alias("source")).first()["source"])

    authors_details = authors.filter(authors.id.isin(nodes)).select(authors.id, authors.country, authors.cited_by_count, authors.agg_citations)
    authpan = authors_details.toPandas()
    authpan['type'] = authpan['country'].map(lambda x: 'Intern' if x == country else 'Extern')
    authpan['cited_by_count'] = authpan['cited_by_count'].fillna(authpan.agg_citations).astype(int)

    return authpan

def country_network_edges(country):
    """Count the edges of one country's network per edge type.

    Args:
        country: country name; its slug selects the per-country parquet input.

    Returns:
        Spark DataFrame with columns `type` and `count`, one row per edge type, counted
        over distinct (source, target) pairs whose target also occurs as a source.
    """
    parquet_path = os.path.abspath('./data/per_country/{}'.format(slugify(country)))
    i_edges = sqlContext.read.parquet(parquet_path)

    # Select only edges between networks of most cited authors in country
    source_row = i_edges.select(F.collect_set("source").alias("source")).first()
    known_sources = set(source_row["source"])

    # One row per (source, target) pair, with summed weight and a representative type.
    pair_edges = (
        i_edges
        .filter(i_edges.target.isin(known_sources))
        .groupby([i_edges.source, i_edges.target])
        .agg(F.sum('weight').alias('weight'), F.first('type').alias('type'))
    )

    return pair_edges.groupby(pair_edges.type).agg(F.count('type').alias('count'))
    
    
def country_per_year_articles(country):
    """Count, per author and year, the papers of the authors in one country's network.

    The authors are the distinct `source` ids found in the per-country edge parquet.

    Args:
        country: country name; its slug selects the parquet input.

    Returns:
        Spark DataFrame with columns `id`, `year` and `count` (rows of `collaborations`
        per author and year).
    """
    i_edges = sqlContext.read.parquet(os.path.abspath('./data/per_country/{}'.format(slugify(country))))
    # Authors of interest: everyone occurring as an edge source.
    # (The former `data = i_edges.filter(...)` local was never used and has been removed.)
    nodes = set(i_edges.select(F.collect_set("source").alias("source")).first()["source"])
    a_details = authors.filter(authors.id.isin(nodes)).select(authors.id, authors.country)
    author_articles = a_details.join(collaborations, a_details.id == collaborations.auth_id, 'inner') \
        .groupby([a_details.id, collaborations.year]).agg(F.count('abs_id').alias('count'))

    return author_articles
    
    

In [None]:
# Build the node/edge CSV files for every country listed in `countries`.
# Single-country run, e.g.: country_network('Poland')
# NOTE(review): `country_network` must be the function defined above here, not the
# DataFrame of the same name created in an earlier cell.
for country in countries:
    country_network(country)

In [None]:
# Edge-type counts per country, one row per country.
# Rows are collected in a list and the frame is built once: DataFrame.append was removed
# in pandas 2.0 and re-copied the whole frame on every iteration.
EDGE_TYPES = ['domestic', 'nondomestic', 'intradomestic']

edge_type_rows = []
for cny in countries:
    counts_by_type = country_network_edges(cny).toPandas().set_index('type')['count'].to_dict()

    row = {'Country': cny}
    for edge_type in EDGE_TYPES:
        # A country with no edge of a given type gets 0 instead of raising KeyError.
        row[edge_type] = int(counts_by_type.get(edge_type, 0))
    edge_type_rows.append(row)

country_edges = pd.DataFrame(edge_type_rows, columns=['Country'] + EDGE_TYPES)

country_edges

In [None]:
# Citation counts of every network author, labelled with the country whose network
# they belong to (this deliberately overwrites the author's own `country` value).
# Frames are collected and concatenated once; DataFrame.append was removed in pandas 2.0.
country_frames = []

for cty in countries:
    nodes = country_network_authors(cty)
    nodes['country'] = cty
    country_frames.append(nodes[['cited_by_count', 'country']])

if country_frames:
    country_data = pd.concat(country_frames)
else:
    country_data = pd.DataFrame(columns=['cited_by_count', 'country'])

# The former `country_data.rename({'cited_by_count': 'Citations'}, inPlace=True)` was removed:
# `inPlace` is not a valid keyword (TypeError), and the plotting cell below reads the
# `cited_by_count` column and sets the 'Citations' axis label itself.

In [None]:
# Ridge plot: one citation-density row per country, saved as a PNG.
# Only authors with fewer than 100000 citations are plotted.
filtered = country_data[country_data.cited_by_count < 100000]
pal = sns.cubehelix_palette(1, rot=1, light=.1)
g = sns.FacetGrid(filtered, row="country", hue="country", aspect=15, height=1, palette=pal)

# Draw the densities in a few steps: filled curve, white outline on top, then a baseline.
# NOTE(review): the `shade` and `bw` keywords are presumably deprecated in newer seaborn
# releases - confirm against the installed version.
g.map(sns.kdeplot, "cited_by_count", clip_on=False, shade=True, alpha=1, lw=1.5, bw=.2)
g.map(sns.kdeplot, "cited_by_count", clip_on=False, color="w", lw=2, bw=.2)
g.map(plt.axhline, y=0, lw=2, clip_on=False)

# Define and use a simple function to label the plot in axes coordinates
def label(x, color, label):
    """Write `label` in bold, in `color`, just left of the current axes; `x` is unused."""
    ax = plt.gca()
    ax.margins(x=0, y=0) 
    ax.text(-0.02, .2, label, fontweight="bold", color=color,
            ha="right", va="center", transform=ax.transAxes)
    
g.map(label, "cited_by_count")

# Set the subplots to overlap
g.fig.subplots_adjust(hspace=-.20)

# Remove axes details that don't play well with overlap
g.set_titles("")
g.set_xlabels('Citations')
g.set(yticks=[])
g.despine(bottom=True, left=True)
# Write the figure next to the notebook.
g.savefig("./phys_citations_densities.png", format='png', dpi=300)

In [None]:
# Persist the per-country edge-type counts held in `country_edges` (no index column).
country_edges.to_csv('./phys_country_connections_type.csv',index=False)

In [None]:
# Papers per author and year for every country's network, stacked into one frame.
# Frames are collected and concatenated once; DataFrame.append was removed in pandas 2.0.
PER_YEAR_COLUMNS = ['country', 'id', 'year', 'count']

per_year_frames = []
for cty in countries:
    nodes = country_per_year_articles(cty).toPandas()
    nodes['country'] = cty
    per_year_frames.append(nodes[PER_YEAR_COLUMNS])

if per_year_frames:
    country_per_year = pd.concat(per_year_frames)
else:
    country_per_year = pd.DataFrame(columns=PER_YEAR_COLUMNS)


In [None]:
# In-memory alternative to re-reading the file:
# country_per_year['year'] = country_per_year['year'].fillna(0).astype(int)
# country_per_year['count'] = country_per_year['count'].astype(int)

# Load the per-year article counts from disk and normalise both numeric columns to int;
# a missing year becomes 0.
# NOTE(review): this replaces the `country_per_year` computed in the previous cell, and
# nothing visible in this notebook writes the CSV - confirm where the file comes from.
country_per_year = pd.read_csv('./phys_country_articles.csv').assign(
    year=lambda frame: frame['year'].fillna(0).astype(int),
    count=lambda frame: frame['count'].astype(int),
)

In [None]:

# to_save = country_per_year[country_per_year['year'] > 2006]
# to_save = to_save[to_save['year'] < 2018]


# to_save['year'] = to_save['year'].astype(str)
# to_save['year'] = to_save['year'].str.replace('.0', '')


# to_show = country_per_year[country_per_year.country == 'Polonia']

plt.style.use('ggplot')

# Tall figure with 20pt fonts for the general text, the axes title and the axes labels.
boxplot_rc = {'figure.figsize': (16, 20)}
boxplot_rc.update({rc_key: 20 for rc_key in ('font.size', 'axes.titlesize', 'axes.labelsize')})
sns.set(rc=boxplot_rc)


# to_show = to_show.groupby('year').agg({ 'count': 'sum' })


# Distribution of the per-author, per-year paper counts, one horizontal box per country.
sns.boxplot(data=country_per_year, y="country", x="count")