In [1]:
%run header_data_treatment.ipynb

In [2]:
works_au_af = (spark.read.format('parquet').load('file:\\' + openalex_path + 'works_au_af.parquet')
               .filter(func.col('primary_topic').isNotNull())
              )

In [3]:
df_au_scanR = spark.read.format('parquet').load('file:\\' + scanR_path + 'authors.parquet')

In [3]:
set_french_authors = (works_au_af
                      .filter(func.col('publication_year').between(2000,2020))
                      .filter(func.col('country')=='FR')
                      .select('author_id').distinct()
                     )
#set_french_authors.count()

In [6]:
au_fields = (set_french_authors
                      .join(works_au_af, on = ['author_id'])
                      .filter( (func.col('publication_year')>=1900) )
             .groupBy('author_id', func.col('primary_topic.field.display_name').alias('field'))
             .agg(func.countDistinct(func.col('work_id')).alias('n_pub_field_au'))
             .withColumn('max_field', func.max(func.col('n_pub_field_au')).over(Window.partitionBy('author_id')))
             .filter(0.90*func.col('max_field') <= func.col('n_pub_field_au'))
             .withColumn('field', func.substring(func.lower(func.col('field')), 1, 4))
             .withColumn('main_field', func.concat_ws(',', func.array_sort(func.collect_set(func.col('field')).over(Window.partitionBy('author_id')))))
             .groupBy('author_id','main_field')
             .pivot('field')
             .agg(func.lit(1))
             .fillna(0)
            )

In [7]:
au_fields.printSchema()

root
 |-- author_id: string (nullable = true)
 |-- main_field: string (nullable = false)
 |-- agri: integer (nullable = true)
 |-- arts: integer (nullable = true)
 |-- bioc: integer (nullable = true)
 |-- busi: integer (nullable = true)
 |-- chem: integer (nullable = true)
 |-- comp: integer (nullable = true)
 |-- deci: integer (nullable = true)
 |-- dent: integer (nullable = true)
 |-- eart: integer (nullable = true)
 |-- econ: integer (nullable = true)
 |-- ener: integer (nullable = true)
 |-- engi: integer (nullable = true)
 |-- envi: integer (nullable = true)
 |-- heal: integer (nullable = true)
 |-- immu: integer (nullable = true)
 |-- mate: integer (nullable = true)
 |-- math: integer (nullable = true)
 |-- medi: integer (nullable = true)
 |-- neur: integer (nullable = true)
 |-- nurs: integer (nullable = true)
 |-- phar: integer (nullable = true)
 |-- phys: integer (nullable = true)
 |-- psyc: integer (nullable = true)
 |-- soci: integer (nullable = true)
 |-- vete: integer (nul

In [8]:
au_level_char = (set_french_authors
                      .join(works_au_af, on = ['author_id'])
                      .filter( (func.col('publication_year')>=1900) )
              .groupBy('author_id','publication_year').agg(func.max((func.col('country')=='FR').cast('int')).alias('in_FR'),
                                                          func.max((func.col('country')!='FR').cast('int')).alias('abroad'))
              .groupBy('author_id').agg(func.min('publication_year').alias('entry_year'),
                                        func.max('publication_year').alias('last_year'),
                                        func.sum('in_FR').alias('all_y_in_FR'), func.sum('abroad').alias('years_abroad'))
                .filter(func.col('entry_year')!=func.col('last_year'))
                 .join(au_fields, on  = ['author_id'], how = 'left')
                )
au_level_char.cache()
au_level_char.show()

+-----------+----------+---------+-----------+------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  author_id|entry_year|last_year|all_y_in_FR|years_abroad|main_field|agri|arts|bioc|busi|chem|comp|deci|dent|eart|econ|ener|engi|envi|heal|immu|mate|math|medi|neur|nurs|phar|phys|psyc|soci|vete|
+-----------+----------+---------+-----------+------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|A5000016899|      1958|     2013|         10|           0|      phys|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|   0|
|A5000139008|      2003|     2024|          1|           9|      medi|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|
|A5000139201|      1

In [9]:
#au_level_char.count()

In [10]:
au_level_char = au_level_char.filter( (func.col('entry_year')<2020) & (func.col('entry_year')>1954))

In [11]:
#au_level_char.groupBy((func.floor(func.col('entry_year')/5)*5).alias('entry_year')).count().sort('entry_year').show(100)

In [12]:
au_level_char.filter(func.col('author_id')=="A5084984675").show()

+-----------+----------+---------+-----------+------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  author_id|entry_year|last_year|all_y_in_FR|years_abroad|main_field|agri|arts|bioc|busi|chem|comp|deci|dent|eart|econ|ener|engi|envi|heal|immu|mate|math|medi|neur|nurs|phar|phys|psyc|soci|vete|
+-----------+----------+---------+-----------+------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|A5084984675|      1985|     2024|         32|          37|      econ|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
+-----------+----------+---------+-----------+------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+



In [13]:
(au_level_char.groupBy((func.floor(func.col('entry_year')/5)*5)
                       .alias('entry_year'))
.agg(*[func.mean(col).alias(col) for col in ['last_year',"all_y_in_FR", "years_abroad"]]).sort('entry_year')).show(100)

+----------+------------------+------------------+------------------+
|entry_year|         last_year|       all_y_in_FR|      years_abroad|
+----------+------------------+------------------+------------------+
|      1955|2020.9420974615527| 7.518899388549194|15.921252547711692|
|      1960|2020.5591951333645|7.6612072999532055|16.796911558259243|
|      1965|2020.0168816969167| 8.223791821561338|15.552460091843429|
|      1970| 2019.605287089592| 8.753582740129826|13.778262096647747|
|      1975| 2019.767631447758| 7.916248790358054|13.272601976481628|
|      1980|2020.2382102006375| 7.533213013313332|12.451551659478717|
|      1985|2020.3056205059686| 7.161547792760325|10.982437017633305|
|      1990|2019.9456518778768|6.5820736988316035|  9.25651605523327|
|      1995|2018.5470787111549|5.4757666425431255| 7.192473455103209|
|      2000|2017.9909198641983| 4.309213054897188| 5.344917045672923|
|      2005|2018.6420646952972|3.6978548252263486| 3.709728381767431|
|      2010| 2019.82

In [14]:
(au_level_char
 .groupBy(((func.floor(func.col('all_y_in_FR')/(func.col('last_year')-func.col('entry_year')+1)*50)*2).alias('share_y_in_fr')))
 .count()
 .sort('share_y_in_fr')).show(100)

+-------------+-----+
|share_y_in_fr|count|
+-------------+-----+
|            0|21291|
|            2|92440|
|            4|79735|
|            6|52625|
|            8|38167|
|           10|40441|
|           12|29602|
|           14|29789|
|           16|27659|
|           18|13879|
|           20|30668|
|           22|16876|
|           24|28896|
|           26|12045|
|           28|14817|
|           30|13054|
|           32|35159|
|           34| 7445|
|           36|12599|
|           38| 6254|
|           40|19927|
|           42|11366|
|           44| 9878|
|           46| 7100|
|           48| 2332|
|           50|47744|
|           52| 6390|
|           54| 7856|
|           56| 9183|
|           58| 4379|
|           60|13592|
|           62| 7652|
|           64| 4451|
|           66|24156|
|           68| 4245|
|           70| 8283|
|           72| 4856|
|           74|11871|
|           76| 5112|
|           78| 2912|
|           80| 9277|
|           82| 6374|
|         

In [15]:
from itertools import product

In [16]:
df_inst = spark.read.format('parquet').load('file:\\' + openalex_path + 'institutions.parquet')

In [17]:
inst = (au_level_char
                  #.filter(func.col('author_id').isin(["A5084984675"]))
                  .join(works_au_af.withColumnRenamed('publication_year', 'year'), on = ['author_id'], how = 'inner')
        .select('inst_id','country')
        .distinct()
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      func.explode(func.col('associated_institutions')).alias('associated_institutions'))
              .filter(func.col('associated_institutions.relationship') =='parent')
              .select('inst_id', func.regexp_replace(func.col('associated_institutions.id'), 'https://openalex.org/', '').alias('parent_id'),
                      func.col('associated_institutions.display_name').alias('parent_name'),
                      func.col('associated_institutions.type').alias('parent_type'))
             , on = 'inst_id', how = 'left')
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      'display_name',func.col('type').alias('inst_type'))
              
             , on = 'inst_id', how = 'left')
       )
inst.cache()
inst.show()

                      

+-----------+-------+-----------+--------------------+-----------+--------------------+----------+
|    inst_id|country|  parent_id|         parent_name|parent_type|        display_name| inst_type|
+-----------+-------+-----------+--------------------+-----------+--------------------+----------+
|I4210156707|     IT|I4210153126|Istituti di Ricov...| healthcare|Fondazione IRCCS ...|healthcare|
|I4210156707|     IT|I4210153126|Istituti di Ricov...| healthcare|Fondazione IRCCS ...|healthcare|
|I4210156707|     IT|I4210153126|Istituti di Ricov...| healthcare|Fondazione IRCCS ...|healthcare|
|I4210156707|     IT|I4210153126|Istituti di Ricov...| healthcare|Fondazione IRCCS ...|healthcare|
| I172901346|     RU|       NULL|                NULL|       NULL|St Petersburg Uni...| education|
| I172901346|     RU|       NULL|                NULL|       NULL|St Petersburg Uni...| education|
|I4210087978|     FR|       NULL|                NULL|       NULL|Centre Hospitalie...|healthcare|
|I42100879

In [18]:
inst.filter(func.col('inst_type')=='facility').filter(func.col('parent_id')=='I1294671590').count()

128

In [20]:
au_inst_period = (au_level_char
                  #.filter(func.col('author_id').isin(["A5084984675"]))
                  .join(works_au_af.withColumnRenamed('publication_year', 'year')
                        .drop('inst_type').filter(func.col('source_id').isNotNull())
                        , on = ['author_id'], how = 'inner')
                  .filter(func.col('inst_id').isNotNull())
                  .join(inst.drop('country').withColumnRenamed('type','inst_type'), on = ['inst_id'])
                .withColumn('parent_id', func.when(func.col('parent_id').isNull(), func.col('inst_id')).otherwise(func.col('parent_id')))
                .withColumn('parent_name', func.when(func.col('parent_name').isNull(), func.col('display_name')).otherwise(func.col('parent_name')))
                .withColumn('parent_type', func.when(func.col('parent_type').isNull(), func.col('inst_type')).otherwise(func.col('parent_type')))
                  .groupBy('author_id','inst_id','year')
                  .agg( *[func.first(col).alias(col) for col in ['entry_year', 'last_year', 'all_y_in_FR','years_abroad','country',
                                                                 'display_name', 'inst_type','parent_id','parent_name','parent_type'
                                                                ] + 
                         [col for col in au_fields.columns if col != 'author_id']],
                        func.countDistinct('work_id').alias('publications_raw'),
                        func.sum('citations').alias('citations_raw'),
                     #   func.mean('rank_by_field').alias('rank_by_field'),
                      *[func.sum( (func.col(type_col) == type_value).cast('int')).alias('nr_' + type_col + '_' + type_value + '_raw')
                        for type_col,type_value in 
                        [ [el1 , el2] for el1,el2 in product(['type'], ['article', 'book', 'book-series'] )] +
                        [ [el1 , el2] for el1,el2 in product(['language'], ['en', 'fr'])]
                          ] ,
                      *[func.sum(( (func.col(type_col) == type_value).cast('int'))*func.col('citations') ).alias('citations_' + type_col + '_' + type_value + '_raw')
                        for type_col,type_value in 
                        [ [el1 , el2] for el1,el2 in product(['type'], ['article', 'book', 'book-series'])] +
                        [ [el1 , el2] for el1,el2 in product(['language'], ['en', 'fr'])]
                          ] 

                      )
                  .withColumn('new_inst_period', func.when((func.lag('year').over(Window.partitionBy('author_id','inst_id').orderBy('year')).isNull())
                                                            | (func.lag('year').over(Window.partitionBy('author_id','inst_id').orderBy('year')) == func.col('year')-1
                                                               ), 0)
                                                      .otherwise(1))
                  .withColumn('period_inst',1+ func.sum(func.col('new_inst_period')).over(Window.partitionBy("author_id", "inst_id").orderBy('year')))    
                  .drop('new_inst_period')
                 )

In [21]:
au_inst_period = (au_inst_period
                  .select(*au_inst_period.columns,
                          (func.max(func.col('year')).over(Window.partitionBy('period_inst','inst_id','author_id'))).alias('last_y_inst_period'),
                          (func.min(func.col('year')).over(Window.partitionBy('period_inst','inst_id','author_id'))).alias('first_y_inst_period'),
                          func.size(func.collect_set('inst_id').over(Window.partitionBy('year','author_id'))).alias('n_inst_y'),
                         *[ (func.col(column)/func.size(func.collect_set('inst_id').over(Window.partitionBy('year','author_id'))))
                           .alias(column.replace('_raw',''))
                           for column in [column for column in au_inst_period.columns if '_raw' in column]]
                         )
                  .filter(func.col('year').between(2000,2020))
                 )
au_inst_period.printSchema()

root
 |-- author_id: string (nullable = true)
 |-- inst_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- entry_year: integer (nullable = true)
 |-- last_year: integer (nullable = true)
 |-- all_y_in_FR: long (nullable = true)
 |-- years_abroad: long (nullable = true)
 |-- country: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- inst_type: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- parent_name: string (nullable = true)
 |-- parent_type: string (nullable = true)
 |-- main_field: string (nullable = true)
 |-- agri: integer (nullable = true)
 |-- arts: integer (nullable = true)
 |-- bioc: integer (nullable = true)
 |-- busi: integer (nullable = true)
 |-- chem: integer (nullable = true)
 |-- comp: integer (nullable = true)
 |-- deci: integer (nullable = true)
 |-- dent: integer (nullable = true)
 |-- eart: integer (nullable = true)
 |-- econ: integer (nullable = true)
 |-- ener: integer (nullable = true)
 |-- engi:

In [22]:
save_path = scanR_path.replace("scanR", "panel_fr_res")
if not os.path.exists(save_path):
    os.mkdir(save_path)

In [23]:
au_inst_period.write.mode('overwrite').parquet('file:\\' + save_path + 'panel_non_sphericised.parquet')

In [24]:
spark.catalog.clearCache()

In [25]:
au_inst_period = spark.read.format('parquet').load('file:\\' + save_path + 'panel_non_sphericised.parquet')

In [26]:
au_inst_period.filter(func.col('parent_id')=='I1294671590').count()

791985

In [6]:
df_inst.printSchema()

root
 |-- associated_institutions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- country_code: string (nullable = true)
 |    |    |-- display_name: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- relationship: string (nullable = true)
 |    |    |-- ror: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- cited_by_count: long (nullable = true)
 |-- country_code: string (nullable = true)
 |-- counts_by_year: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cited_by_count: long (nullable = true)
 |    |    |-- oa_works_count: long (nullable = true)
 |    |    |-- works_count: long (nullable = true)
 |    |    |-- year: long (nullable = true)
 |-- created_date: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- display_name_acronyms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- display_name_altern

In [7]:
inst = (au_inst_period
        .select('inst_id','country')
        .distinct()
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      func.explode(func.col('associated_institutions')).alias('associated_institutions'))
              .filter(func.col('associated_institutions.relationship') =='parent')
              .select('inst_id', func.col("associated_institutions.id").alias('parent_id'),
                      func.col('associated_institutions.display_name').alias('parent_name'),
                      func.col('associated_institutions.type').alias('parent_type'))
             , on = 'inst_id', how = 'left')
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      'display_name','type')
              
             , on = 'inst_id', how = 'left')
       )
inst.cache()
inst.show()

                      

+-----------+-------+--------------------+--------------------+-----------+--------------------+----------+
|    inst_id|country|           parent_id|         parent_name|parent_type|        display_name|      type|
+-----------+-------+--------------------+--------------------+-----------+--------------------+----------+
| I165799507|     US|                NULL|                NULL|       NULL|Rensselaer Polyte...| education|
| I172901346|     RU|                NULL|                NULL|       NULL|St Petersburg Uni...| education|
| I193775966|     KR|                NULL|                NULL|       NULL|   Yonsei University| education|
|  I67900169|     KR|                NULL|                NULL|       NULL|Chung-Ang University| education|
|I4210102325|     FR|https://openalex....|Hôpitaux Universi...| healthcare|Hôpital Européen ...|healthcare|
| I119942576|     US|                NULL|                NULL|       NULL|University of the...| education|
|I4210087978|     FR|       

In [None]:
inst = (au_inst_period
        .select('inst_id','country')
        .distinct()
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      func.explode(func.col('associated_institutions')).alias('associated_institutions'))
              .filter(func.col('associated_institutions.relationship') =='parent')
              .select('inst_id', func.col("associated_institutions.id").alias('parent_id'),
                      func.col('associated_institutions.display_name').alias('parent_name'),
                      func.col('associated_institutions.type').alias('parent_type'))
             , on = 'inst_id', how = 'left')
        .join(df_inst
              .select(func.regexp_replace(func.col('id'), 'https://openalex.org/', '').alias('inst_id'),
                      'display_name','type')
              
             , on = 'inst_id', how = 'left')
       )
inst.cache()
inst.show()

In [8]:
inst.groupBy(func.col('parent_id').isNotNull().cast('int').alias('has_parent'), (func.col('country')=='FR').cast('int').alias('fr')).count().show()

+----------+----+-----+
|has_parent|  fr|count|
+----------+----+-----+
|         1|   0|10456|
|         1|   1| 3961|
|         0|   0|45151|
|         0|   1| 1918|
|         0|NULL|   17|
+----------+----+-----+



In [9]:
(inst
 .filter( (func.col('country')=='FR')
         & (func.col('parent_id').isNotNull())
        )
).limit(20).collect()

[Row(inst_id='I4210102325', country='FR', parent_id='https://openalex.org/I4210120235', parent_name='Hôpitaux Universitaires Paris-Ouest', parent_type='healthcare', display_name='Hôpital Européen Georges-Pompidou', type='healthcare'),
 Row(inst_id='I4210157127', country='FR', parent_id='https://openalex.org/I4210145324', parent_name='Hôpitaux Universitaires de Strasbourg', parent_type='healthcare', display_name="Hôpital d'Hautepierre", type='healthcare'),
 Row(inst_id='I4210132604', country='FR', parent_id='https://openalex.org/I4210161065', parent_name='Laboratory of Medical Information Processing', parent_type='facility', display_name='Centre Hospitalier Régional Universitaire de Brest', type='healthcare'),
 Row(inst_id='I4210119916', country='FR', parent_id='https://openalex.org/I4647051', parent_name='University of Picardie Jules Verne', parent_type='education', display_name='Ecology and Dynamics of Human Influenced Systems', type='facility'),
 Row(inst_id='I4210119916', country='F

In [10]:
au_inst_period_winst = (au_inst_period
                  .join(inst.drop('country')
                        .withColumn('parent_id', func.regexp_replace(func.col('parent_id'), 'https://openalex.org/',''))
                        , on = 'inst_id', how = 'left')
                .withColumn('parent_id', func.when(func.col('parent_id').isNull(), func.col('inst_id')).otherwise(func.col('parent_id')))
                .withColumn('parent_name', func.when(func.col('parent_id').isNull(), func.col('display_name')).otherwise(func.col('parent_name')))
                .withColumn('parent_type', func.when(func.col('parent_id').isNull(), func.col('type')).otherwise(func.col('parent_type')))
                 )


In [11]:
au_inst_period_winst.show()

+-----------+-----------+----+----------+---------+-----------+------------+-------+----------------+-------------+------------------+-------------------+----------------+-----------------------+------------------+------------------+--------------------------+-----------------------+------------------------------+-------------------------+-------------------------+-----------+------------------+-------------------+--------+--------------------+-------------------+--------------------+------------+-------------------+--------------------+--------------+----------------------+-------------------+--------------------------+---------------------+---------------------+-----------+--------------------+-----------+--------------------+----------+
|    inst_id|  author_id|year|entry_year|last_year|all_y_in_FR|years_abroad|country|publications_raw|citations_raw|     rank_by_field|nr_type_article_raw|nr_type_book_raw|nr_type_book-series_raw|nr_language_en_raw|nr_language_fr_raw|citations_type_ar

In [12]:
au_inst_period_winst.write.mode('overwrite').parquet('file:\\' + save_path + 'panel_non_sphericised_inst.parquet')