In [1]:
import sys
import os

folder_path = os.path.dirname(os.path.abspath(__file__)) if '__file__' in globals() else os.getcwd()

sys.path.append(os.path.abspath(os.path.join(folder_path, '..')))

from csv_analysis import *


## View full data before filtering

In [8]:
df = read_first_n_rows("../data/q2/merged_openalex_works_data.csv", nrows=15)
df

Unnamed: 0,work_id,publication_year,institution_id,country,country_code,city,geonames_city_id,author_position,author_id
0,https://openalex.org/W100000002,1988,https://openalex.org/I4210095140,United States,US,Durham,4464368,first,https://openalex.org/A5012874907
1,https://openalex.org/W100000002,1988,https://openalex.org/I4210095140,United States,US,Durham,4464368,last,https://openalex.org/A5006412880
2,https://openalex.org/W1000000251,1990,https://openalex.org/I1292966370,United Kingdom,GB,London,2643743,first,https://openalex.org/A5057387418
3,https://openalex.org/W1000000251,1990,https://openalex.org/I1292966370,United Kingdom,GB,London,2643743,last,https://openalex.org/A5035275962
4,https://openalex.org/W1000000589,2010,https://openalex.org/I7877124,Germany,DE,Berlin,2950159,first,https://openalex.org/A5091270897
5,https://openalex.org/W1000000589,2010,https://openalex.org/I7877124,Germany,DE,Berlin,2950159,last,https://openalex.org/A5071995676
6,https://openalex.org/W1000001125,2001,https://openalex.org/I231086526,Poland,PL,Siedlce,759412,first,https://openalex.org/A5060029945
7,https://openalex.org/W1000001192,1988,https://openalex.org/I1328594524,United States,US,Kansas City,4393217,first,https://openalex.org/A5072333347
8,https://openalex.org/W1000001192,1988,https://openalex.org/I4210128618,United States,US,Kansas City,4273837,first,https://openalex.org/A5072333347
9,https://openalex.org/W1000001555,2011,https://openalex.org/I2802849423,United States,US,Lubbock,5525577,first,https://openalex.org/A5050684082


## Creating a consolidated file of work and all states of work

This script uses Dask to lazily read and parallel-process a large CSV, grouping by `work_id` to extract the first `publication_year` and compile unique country lists.
It then writes the aggregated DataFrame to multiple CSV partitions in the specified output directory while displaying a progress bar via Dask Diagnostics.


In [None]:
import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def aggregate_works(
    input_path="../data/q2/merged_openalex_works_data.csv",
    output_dir="../data/collaboration",
    sep=",",
    country_sep=",",
    blocksize="64MB",
):
    """
    Produce one line per work_id:
    work_id, publication_year, countries (comma-separated, unique, unsorted)
    and save as multiple CSV parts with a progress bar.
    """

    # 1) Lazy read
    dtypes  = {
        "work_id": "string",
        "publication_year": "int32",
        "country": "string",
    }
    usecols = ["work_id", "publication_year", "country"]

    df = dd.read_csv(
        input_path,
        sep=sep,
        usecols=usecols,
        dtype=dtypes,
        blocksize=blocksize,
    )

    # 2) First year per work
    years = df.groupby("work_id")["publication_year"].first()

    # 3) Unique-countries array per work, then join with commas
    unique_countries = (
        df.groupby("work_id")["country"]
          .unique()                                 # → numpy array per group
          .map_partitions(
              lambda s: s.apply(lambda arr: country_sep.join(arr)),
              meta=("countries", "object"),
          )
    )

    # 4) Combine the two Series into one DataFrame
    aggregated = dd.concat([years, unique_countries], axis=1).reset_index()

    # 5) Write several CSV parts (one per partition) with progress bar
    os.makedirs(output_dir, exist_ok=True)
    out_pattern = os.path.join(output_dir, "works_year_countries-*.csv")

    print("🟢 Writing output parts (progress bar below)…")
    with ProgressBar():
        tasks = aggregated.to_csv(out_pattern, index=False)   # returns Delayed list
        dask.compute(tasks)

    print(f"✓ Done!  Files written to {output_dir}")

if __name__ == "__main__":
    aggregate_works()


🟢 Writing output parts (progress bar below)…


We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


[################################        ] | 81% Completed | 2hr 50ms

In [3]:
df = read_first_n_rows("../data/collaboration/works_year_countries.csv", nrows=15)
df

Unnamed: 0,work_id,publication_year,countries
0,https://openalex.org/W143756818,1998,Germany
1,https://openalex.org/W1437651587,1973,Czechia
2,https://openalex.org/W143770321,2000,China
3,https://openalex.org/W1437735228,2003,United States
4,https://openalex.org/W143773981,2002,Belgium
5,https://openalex.org/W143775390,2001,Israel
6,https://openalex.org/W1437804358,2015,Australia
7,https://openalex.org/W143781885,2012,Canada
8,https://openalex.org/W1437876792,2015,United States
9,https://openalex.org/W143791697,2009,France


## Creating a file only of works that involve collaboration between several countries

In [4]:
import pandas as pd
from tqdm import tqdm

def filter_multiple_countries(file_path, output_path):
    # Load the data from the CSV file
    df = pd.read_csv(file_path)
    
    # Add a column to count the number of countries per row
    df['countries_count'] = df['countries'].apply(lambda x: len(x.split(',')))
    
    # Filter rows where there are more than one country
    filtered_df = df[df['countries_count'] > 1]
    
    # Drop the 'countries_count' column
    filtered_df = filtered_df.drop(columns=['countries_count'])
    
    # Save the filtered data to a new CSV file
    filtered_df.to_csv(output_path, index=False)
    
    print(f"File saved to {output_path}")

# Example usage
input_file = "../data/collaboration/works_year_countries.csv"
output_file = "../data/collaboration/works_multiple_countries.csv"

# Show progress using tqdm
tqdm.pandas(desc="Processing rows")
filter_multiple_countries(input_file, output_file)


File saved to ../data/collaboration/works_multiple_countries.csv


In [3]:
df = read_first_n_rows("../data/collaboration/works_multiple_countries.csv", nrows=15)
df

Unnamed: 0,work_id,publication_year,countries
0,https://openalex.org/W1438053537,2005,"United States,Georgia,Spain,Japan"
1,https://openalex.org/W143808035,2014,"United Kingdom,Ivory Coast,Tunisia"
2,https://openalex.org/W1438510599,2015,"United Kingdom,Israel,France"
3,https://openalex.org/W143872587,2013,"Singapore,China,Hong Kong"
4,https://openalex.org/W1438912788,2015,"Russia,United States,Italy"
5,https://openalex.org/W143981305,1995,"Germany,United States"
6,https://openalex.org/W143997697,1981,"Taiwan,China,Bulgaria"
7,https://openalex.org/W2051785559,2015,"United States,Brazil,United Kingdom"
8,https://openalex.org/W2051791908,2013,"Singapore,Taiwan"
9,https://openalex.org/W2051794810,2009,"United States,Georgia"


## Creating a file that contains all the works of collaborations of which Israel is one of the countries

In [15]:
import pandas as pd
from tqdm import tqdm

def filter_multiple_countries(file_path, output_path):
    # Load the data from the CSV file
    df = pd.read_csv(file_path)
    
    # Enable progress_apply on the DataFrame
    tqdm.pandas(desc="Checking for Israel")
    
    # Add a helper column to mark rows where one of the countries is Israel
    df['has_israel'] = df['countries'].progress_apply(
        lambda x: any(country.strip() == 'Israel' for country in x.split(','))
    )
    
    # Filter only the rows with Israel
    filtered_df = df[df['has_israel']]
    
    # Drop the helper column
    filtered_df = filtered_df.drop(columns=['has_israel'])
    
    # Save the filtered data to a new CSV file
    filtered_df.to_csv(output_path, index=False)
    
    print(f"File saved to {output_path}")

# Example usage
input_file  = "../data/collaboration/works_multiple_countries.csv"
output_file = "../data/collaboration/works_israel.csv"

# Process and filter
filter_multiple_countries(input_file, output_file)

Checking for Israel: 100%|██████████| 13305986/13305986 [00:19<00:00, 690678.57it/s]


File saved to ../data/collaboration/works_israel.csv


In [16]:
df = read_first_n_rows("../data/collaboration/works_israel.csv", nrows=15)
df

Unnamed: 0,work_id,publication_year,countries
0,https://openalex.org/W1438510599,2015,"United Kingdom,Israel,France"
1,https://openalex.org/W2052031909,2010,"Israel,Canada"
2,https://openalex.org/W2052156706,2013,"United States,Israel"
3,https://openalex.org/W2052285261,2013,"Israel,Germany"
4,https://openalex.org/W2052550778,1988,"United States,Israel"
5,https://openalex.org/W2052555612,2011,"United States,Israel"
6,https://openalex.org/W2052556251,1998,"United States,Israel"
7,https://openalex.org/W2052586778,2007,"Israel,United States"
8,https://openalex.org/W2052619683,1998,"Finland,Israel"
9,https://openalex.org/W2052783340,1999,"Israel,United States"
