In [1]:
import psycopg2
import pandas as pd
import vaex
import ast 
import numpy as np
import pickle
import os
from joblib import Parallel, delayed
import os
import sys



In [None]:
# Project root used to locate the code directory.
# NOTE(review): 'base_dir' looks like a placeholder value — confirm it is
# replaced with the real project root before running.
base_dir = 'base_dir'
code_dir = os.path.join(base_dir, 'code')
# The search path is extended BEFORE the two imports below; presumably
# `const` and `params` live in code_dir, so this order must be kept.
sys.path.append(code_dir)
import const
import params

#### Read the OpenAlex data, clean it, and keep only the important columns

In [2]:
# Path to the merged OpenAlex Parquet file inside const.data_base_dir
file_path = os.path.join(const.data_base_dir, "OpenAlex_merged_data.parquet")

# Open the Parquet file as a Vaex dataframe and preview its first 5 rows
df = vaex.open(file_path)
df.head(5)

#,work_id,author_id,institution_id,publication_year,publication_date,cited_by_count,author_name,institution_name,country_code
0,https://openalex.org/W3126688306,https://openalex.org/A5021130645,https://openalex.org/I2799504677,2009,2009-12-31,0,Carbonne Pierre,"""Institut de l'Audiovisuel et des Télécommunicat...",FR
1,https://openalex.org/W3126688306,https://openalex.org/A5030528300,https://openalex.org/I2799504677,2009,2009-12-31,0,Thomas Hain,"""Institut de l'Audiovisuel et des Télécommunicat...",FR
2,https://openalex.org/W3126688306,https://openalex.org/A5038081513,https://openalex.org/I190778170,2009,2009-12-31,0,Gary Clemo,Ofcom,GB
3,https://openalex.org/W3126688432,https://openalex.org/A5082259378,https://openalex.org/I130701412,2020,2020-12-23,0,Віра Борщовецька,Chernivtsi National University,UA
4,https://openalex.org/W3126688432,https://openalex.org/A5045080228,https://openalex.org/I130701412,2020,2020-12-23,0,Марія Рубанець,Chernivtsi National University,UA


In [3]:
# Keep only rows whose 'publication_year' is before 2023
# (rows from 2023 onwards are filtered out)
author_data = df[(df['publication_year'] < 2023)]

# Select only the columns needed by the later cells
columns_to_keep = ['author_id','author_name', 'publication_year', 'institution_name' ,'country_code']
author_data_cleaned = author_data[columns_to_keep]


In [7]:
# Compute the shape of the cleaned dataframe: the row count comes from len(),
# the column count from the list of column names.
num_rows = len(author_data_cleaned)  # Number of rows
num_columns = len(author_data_cleaned.get_column_names())  # Number of columns

# Print the shape
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")


Number of rows: 343450875
Number of columns: 5


In [None]:
# Rows per exported chunk (1 million)
chunk_size = 1000000
authors_output_dir = const.author_dir
os.makedirs(authors_output_dir, exist_ok=True)

# Walk over `df` in consecutive slices of `chunk_size` rows and write every
# slice to its own Parquet file; file numbering starts at 1.
# NOTE(review): this slices `df` (the unfiltered frame), not
# `author_data_cleaned` — confirm that is intended.
total_rows = len(df)
for chunk_number, start in enumerate(range(0, total_rows, chunk_size), start=1):
    output_file = os.path.join(authors_output_dir, f"chunk_{chunk_number}.parquet")
    chunk = df[start:start + chunk_size]
    chunk.export_parquet(output_file)
    print(f"Saved chunk {chunk_number} to {output_file}")

In [16]:
# Preview the first 5 rows of the filtered, column-reduced author data
author_data_cleaned.head(5)

#### Read war data

In [16]:
# Load the merged war CSV from const.data_base_dir into a pandas DataFrame
merged_wars_data_file = os.path.join(const.data_base_dir, "merged_war_data_after_expand.csv")
war_data = pd.read_csv(merged_wars_data_file)

In [17]:
# Preview the first 5 rows of the war data
war_data.head(5)

Unnamed: 0,Year,Country Code,Min_Year,Date,War Name,Combatants,Location,Countries Participants
0,1939,US,1939,1939–1945,World War II,Allied Powers vs. Axis Powers,Global,"United States, Soviet Union, United Kingdom, C..."
1,1939,SU,1939,1939–1945,World War II,Allied Powers vs. Axis Powers,Global,"United States, Soviet Union, United Kingdom, C..."
2,1939,GB,1939,1939–1945,World War II,Allied Powers vs. Axis Powers,Global,"United States, Soviet Union, United Kingdom, C..."
3,1939,CN,1939,1939–1945,World War II,Allied Powers vs. Axis Powers,Global,"United States, Soviet Union, United Kingdom, C..."
4,1939,FR,1939,1939–1945,World War II,Allied Powers vs. Axis Powers,Global,"United States, Soviet Union, United Kingdom, C..."


In [18]:
# Keep a single row per (Year, Country Code) pair; drop_duplicates retains the
# first occurrence of each pair.
war_data_unique = war_data.drop_duplicates(subset=["Year", "Country Code"])
# Build a lookup keyed by (Year, Country Code) tuples; each value is a dict of
# the remaining columns of that row.
war_lookup = war_data_unique.set_index(["Year", "Country Code"]).to_dict(orient="index")

#### Add the war information to the OpenAlex data

In [None]:
# Collect the distinct publication years, then report how many rows each year has
unique_years = author_data_cleaned.unique("publication_year")
print(unique_years)

for year in unique_years:
    print(f"Processing year: {year}")
    
    # Filter the data for the current year
    year_data = author_data_cleaned[author_data_cleaned["publication_year"] == year]
    # Row count for this year
    print(len(year_data))
    

In [9]:
# Define a function to process chunks
def process_chunk_pandas(chunk, war_lookup):
    """Annotate every row of a pandas chunk with war information.

    Each row is matched against ``war_lookup`` using the key
    ``(publication_year, country_code)``. Three columns are added to ``chunk``
    in place:

    * ``war_exist``   -- 1 when the key is present in ``war_lookup``, else 0.
    * ``year_in_war`` -- ``publication_year - entry["Min_Year"]`` for matched
      rows, else 0.
    * ``war_name``    -- ``entry["War Name"]`` for matched rows (falling back
      to ``"No War"`` when the entry has no such key), else ``"No War"``.

    The lookup is done in a single pass over the two key columns instead of
    three row-wise ``DataFrame.apply`` calls, which also makes the function
    work on an empty chunk.

    Args:
        chunk: pandas DataFrame with ``publication_year`` and ``country_code``
            columns. It is modified in place.
        war_lookup: dict mapping ``(year, country_code)`` tuples to dicts that
            carry ``"Min_Year"`` and ``"War Name"``.

    Returns:
        The same ``chunk`` object, with the three columns added.

    Raises:
        TypeError: if a matched entry has no ``"Min_Year"`` value (subtracting
            ``None`` from the year fails).
    """
    # Sentinel distinguishing "key absent" from any value stored in the lookup.
    missing = object()

    # One dictionary probe per row; keep the year next to the entry because it
    # is needed again for the year_in_war computation.
    row_keys = zip(chunk["publication_year"], chunk["country_code"])
    hits = [
        (year, war_lookup.get((year, country), missing))
        for year, country in row_keys
    ]

    print("Add war exist")
    chunk["war_exist"] = [0 if entry is missing else 1 for _, entry in hits]

    print("Add year in war")
    chunk["year_in_war"] = [
        0 if entry is missing else year - entry.get("Min_Year")
        for year, entry in hits
    ]

    print("Add war name")
    chunk["war_name"] = [
        "No War" if entry is missing else entry.get("War Name", "No War")
        for _, entry in hits
    ]

    return chunk

In [17]:
def main(task_id):
    """Annotate one year of OpenAlex author rows with war data and pickle it.

    Steps:
      1. Load the merged war CSV, keep one row per (Year, Country Code) and
         build a lookup from that pair to the war's ``Min_Year`` and
         ``War Name``.
      2. Open the merged OpenAlex Parquet file, keep rows published before
         2023 and only the columns needed here.
      3. Fetch the rows for ``task_id`` through ``params.get_year_df``
         (presumably the rows of the year ``params.years_dict[task_id]`` --
         confirm against ``params``), convert them to pandas and add the war
         columns with ``process_chunk_pandas``.
      4. Drop rows containing NaN and write the result to
         ``processed_chunks_combine_war_openalex/year_<year>.pkl`` under
         ``const.data_base_dir``.

    Args:
        task_id: key into ``params.years_dict`` selecting the year to process.
    """
    print("Start load and processing war data")
    merged_wars_data_file = os.path.join(const.data_base_dir, "merged_war_data_after_expand.csv")
    war_data = pd.read_csv(merged_wars_data_file)
    war_data_unique = war_data.drop_duplicates(subset=["Year", "Country Code"])
    columns_to_drop = ["Date", "Combatants", "Location", "Countries Participants"]
    war_data_unique = war_data_unique.drop(columns=columns_to_drop)

    # Create war lookup dictionary keyed by (Year, Country Code).
    # "Min_Year" must come from the Min_Year column: storing the row's own
    # Year here would make `publication_year - Min_Year` always 0 for every
    # row that matches a war.
    war_lookup = {
        (year, country): {"Min_Year": min_year, "War Name": war_name}
        for year, country, min_year, war_name in zip(
            war_data_unique["Year"],
            war_data_unique["Country Code"],
            war_data_unique["Min_Year"],
            war_data_unique["War Name"],
        )
    }
    print("Finish load and processing war data")

    print("Start load and processing author data")
    # Load author data
    author_file_path = os.path.join(const.data_base_dir, "OpenAlex_merged_data.parquet")
    df = vaex.open(author_file_path, transform=True)
    author_data_cleaned = df[df["publication_year"] < 2023]
    columns_to_keep = ['author_name', 'publication_year', 'institution_name', 'country_code']
    author_data_cleaned = author_data_cleaned[columns_to_keep]

    print("Finish load and processing author data")

    # Process data for the specific year
    year = params.years_dict[task_id]
    print(f"Processing year {year}")
    chunk = params.get_year_df(author_data_cleaned, task_id)  # Get chunk for the specific year

    # Materialise the selection as a pandas DataFrame for row-level processing
    chunk = chunk.to_pandas_df()
    # Process the chunk
    processed_chunk = process_chunk_pandas(chunk, war_lookup)

    # Drop rows that contain NaN in any column
    processed_chunk = processed_chunk.dropna()
    num_rows = len(processed_chunk)  # Number of rows
    # Print the row count
    print(f"Number of rows: {num_rows}")

    # Directory to save the processed per-year files
    output_dir = os.path.join(const.data_base_dir,"processed_chunks_combine_war_openalex")
    os.makedirs(output_dir, exist_ok=True)

    # Save the processed chunk as a pickle file
    output_file = os.path.join(output_dir, f"year_{year}.pkl")
    processed_chunk.to_pickle(output_file)

    print(f"Finished processing year {year} and saved to {output_file}")

if __name__ == "__main__":
    # The task id is read from the first command-line argument. To process
    # several years in one session, loop over the ids instead, e.g.
    # `for task_id in range(1,73):`.
    task_id = int(sys.argv[1])
    main(task_id)


Start load and processing war data
Finish load and processing war data
Start load and processing author data
Finish load and processing author data
Processing year 1951
Add war exist
Add year in war
Add war name
Number of rows: 66775
Finished processing year 1951 and saved to /home/reutme/Big_data/final_project/data/processed_chunks_combine_war_openalex/year_1951.pkl
Start load and processing war data
Finish load and processing war data
Start load and processing author data
Finish load and processing author data
Processing year 1952
Add war exist
Add year in war
Add war name
Number of rows: 70577
Finished processing year 1952 and saved to /home/reutme/Big_data/final_project/data/processed_chunks_combine_war_openalex/year_1952.pkl
Start load and processing war data
Finish load and processing war data
Start load and processing author data
Finish load and processing author data
Processing year 1953
Add war exist
Add year in war
Add war name
Number of rows: 76799
Finished processing year 1