In [54]:
import json
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

## Data loading from JSONs

In [None]:
def load_trade_analysis_json(file_name):
    """
    Load a trade analysis JSON file and return it as a dictionary.
    
    Parameters:
        file_name (str): The path to the JSON file.
        
    Returns:
        dict: The JSON data loaded as a dictionary.
    """
    try:
        with open(file_name, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return data
    except FileNotFoundError:
        print(f"Error: The file '{file_name}' was not found.")
        return None
    except json.JSONDecodeError as e:
        print(f"Error: Could not decode JSON from '{file_name}'. {e}")
        return None

# Example usage
# file_name = "path/to/your/json/file.json"
# trade_data = load_trade_analysis_json(file_name)
# if trade_data:
#     print(trade_data)


In [None]:

def load_all_trade_analysis_jsons(main_dir):
    """
    Load all trade analysis JSON files from a directory recursively.
    
    Parameters:
        main_dir (str): The main directory containing JSON files.
        
    Returns:
        list[dict]: A list of dictionaries, each representing a JSON file's data.
    """
    all_data = []
    for root, _, files in os.walk(main_dir):
        for file in files:
            if file.endswith('.json'):
                file_path = os.path.join(root, file)
                data = load_trade_analysis_json(file_path)
                if data:
                    all_data.append(data)
    return all_data

# Example usage
# main_directory = "path/to/main_dir"
# all_trade_data = load_all_trade_analysis_jsons(main_directory)
# print(f"Loaded {len(all_trade_data)} JSON files.")


In [None]:
# Check the number of reports generated is the same as the number of files

all_report_data = load_all_trade_analysis_jsons("../google_ai_responses")
len(all_report_data)


## Export balance information loading

In [None]:
# Extract export balances
def generate_export_balance_df(all_report_data):
    """
    Generate a pandas DataFrame with individual export balance values for each year 
    from all report data, ensuring consistency across reports.
    
    Parameters:
        all_report_data (list[dict]): List of trade analysis data dictionaries.
        
    Returns:
        pd.DataFrame: A DataFrame containing country, year, export value, and import value.
    """
    export_balance_records = []
    seen_years = {}  # To track values by (country, year) for consistency checks

    for report in all_report_data:
        country = report.get('country')
        export_balance = report.get('export_balance', {})
        years = export_balance.get('year', [])
        export_values = export_balance.get('export_value', [])
        import_values = export_balance.get('import_value', [])

        for year, export_value, import_value in zip(years, export_values, import_values):
            key = (country, year)
            if key in seen_years:
                # Check for consistency
                previous_export, previous_import = seen_years[key]
                if previous_export != export_value or previous_import != import_value:
                    # Remove multiline comments for warnings on mismatches
                    """
                    print(
                        f"Warning: Data mismatch for {country}, year {year}. "
                        f"Previous: Export={previous_export}, Import={previous_import}. "
                        f"New: Export={export_value}, Import={import_value}."
                    )
                    """
                # Keep the previous values
                export_value, import_value = previous_export, previous_import
            else:
                # Record the new values
                seen_years[key] = (export_value, import_value)

            export_balance_records.append({
                'Country': country,
                'Year': year,
                'Export Value (Billions)': export_value,
                'Import Value (Billions)': import_value
            })

    # Create a DataFrame from the records
    df = pd.DataFrame(export_balance_records)
    return df

export_balance_df = generate_export_balance_df(all_report_data)
export_balance_df.head()

# Storing dataframe to excel sheet
export_balance_df.to_csv(".\csv_data\export_balances.csv")


## Top ECCN analysis

In [None]:
# Store ECCN ranks
def generate_top_eccns_with_ranks(all_report_data, by="value"):
    """
    Create a pandas DataFrame with the top ECCNs by value or count for all reports, including their rank.

    Parameters:
        all_report_data (list[dict]): List of trade analysis data dictionaries.
        by (str): Criteria to sort ECCNs, either "value" or "count". Defaults to "value".

    Returns:
        pd.DataFrame: A DataFrame containing the top ECCNs for all reports with their details and rank.
    """
    valid_criteria = {"value": "top_eccn_by_value", "count": "top_eccn_by_count"}
    if by not in valid_criteria:
        raise ValueError(f"Invalid 'by' value: {by}. Must be one of {list(valid_criteria.keys())}.")

    top_eccn_data = []

    for report in all_report_data:
        country = report.get("country")
        year = report.get("report_year")
        eccn_section = report.get(valid_criteria[by], {})

        # Appending license cases
        eccn_codes = eccn_section.get(f"top_eccn_{by}_licensed_eccn_codes", [])
        eccn_descriptions = eccn_section.get(f"top_eccn_{by}_licensed_descriptions", [])
        for rank, (code, description) in enumerate(zip(eccn_codes, eccn_descriptions), start=1):
            top_eccn_data.append({
                "Country": country,
                "Year": year,
                "ECCN Code": code,
                "Description": description,
                "Rank": rank,
                "Ranking": "License"
            })

        # Appending license exception cases
        eccn_codes = eccn_section.get(f"top_eccn_{by}_exceptions_eccn_codes", [])
        eccn_descriptions = eccn_section.get(f"top_eccn_{by}_exceptions_descriptions", [])
        for rank, (code, description) in enumerate(zip(eccn_codes, eccn_descriptions), start=1):
            top_eccn_data.append({
                "Country": country,
                "Year": year,
                "ECCN Code": code,
                "Description": description,
                "Rank": rank,
                "Ranking": "Exceptions"
            })

        # Appending license exception cases
        eccn_codes = eccn_section.get(f"top_eccn_{by}_nlr_with_eccn_eccn_codes", [])
        eccn_descriptions = eccn_section.get(f"top_eccn_{by}_nlr_with_eccn_descriptions", [])
        for rank, (code, description) in enumerate(zip(eccn_codes, eccn_descriptions), start=1):
            top_eccn_data.append({
                "Country": country,
                "Year": year,
                "ECCN Code": code,
                "Description": description,
                "Rank": rank,
                "Ranking": "NLR with ECCN"
            })

    # Convert to DataFrame
    df = pd.DataFrame(top_eccn_data)
    return df

top_value_eccns_df = generate_top_eccns_with_ranks(all_report_data, by="value")
print(top_value_eccns_df.head())

top_count_eccns_df = generate_top_eccns_with_ranks(all_report_data, by="count")
print(top_count_eccns_df.head())

top_value_eccns_df.to_csv("csv_data\\top_value_eccns.csv")
top_count_eccns_df.to_csv("csv_data\\top_count_eccns.csv")


In [99]:
unique_eccns_value = top_value_eccns_df[['ECCN Code', 'Description']].drop_duplicates()
unique_eccns_count = top_count_eccns_df[['ECCN Code', 'Description']].drop_duplicates()

unique_eccns_value.to_csv("csv_data\\unique_eccns_value.csv")
unique_eccns_count.to_csv("csv_data\\unique_eccns_count.csv")

# Found these by reading through all of the unique ECCNs and picking out the ones related to space
space_related_eccns = ["9A515", "9A004", "9B515"]

unique_countries = top_value_eccns_df['Country'].drop_duplicates()
unique_countries.to_csv("csv_data\\unique_countries.csv")
unique_countries = [i for i in unique_countries]

# Countries represented in the data per year
unique_countries_2018 = top_value_eccns_df[top_value_eccns_df['Year'] == 2018]['Country'].drop_duplicates().reset_index().drop('index', axis=1)
unique_countries_2019 = top_value_eccns_df[top_value_eccns_df['Year'] == 2019]['Country'].drop_duplicates().reset_index().drop('index', axis=1)
unique_countries_2020 = top_value_eccns_df[top_value_eccns_df['Year'] == 2020]['Country'].drop_duplicates().reset_index().drop('index', axis=1)
unique_countries_2021 = top_value_eccns_df[top_value_eccns_df['Year'] == 2021]['Country'].drop_duplicates().reset_index().drop('index', axis=1)
unique_countries_2022 = top_value_eccns_df[top_value_eccns_df['Year'] == 2022]['Country'].drop_duplicates().reset_index().drop('index', axis=1)

unique_countries_2018 = unique_countries_2018[~unique_countries_2018['Country'].isin(['World', 'EU', 'Africa'])]
unique_countries_2019 = unique_countries_2019[~unique_countries_2019['Country'].isin(['World', 'EU', 'Africa'])]
unique_countries_2020 = unique_countries_2020[~unique_countries_2020['Country'].isin(['World', 'EU', 'Africa'])]
unique_countries_2021 = unique_countries_2021[~unique_countries_2021['Country'].isin(['World', 'EU', 'Africa'])]
unique_countries_2022 = unique_countries_2022[~unique_countries_2022['Country'].isin(['World', 'EU', 'Africa'])]

unique_country_dataframes = [
    unique_countries_2018,
    unique_countries_2019,
    unique_countries_2020,
    unique_countries_2021,
    unique_countries_2022
] 

unique_eccn_value_descriptions_dict = dict(zip(unique_eccns_value["ECCN Code"],unique_eccns_value["Description"]))
unique_eccn_count_descriptions_dict = dict(zip(unique_eccns_count["ECCN Code"],unique_eccns_count["Description"]))

# Manually fixing some of the more used descriptions. The reports show incomplete descriptions :)
unique_eccn_value_descriptions_dict["9A515"] = 'Spacecraft and related commodities'
unique_eccn_count_descriptions_dict["9A515"] = 'Spacecraft and related commodities'
unique_eccn_value_descriptions_dict["9B515"] = "Test, Inspection, and Production Equipment Specially Designed for Spacecraft"
unique_eccn_count_descriptions_dict["9B515"] = "Test, Inspection, and Production Equipment Specially Designed for Spacecraft"


## Evolution of rank for a specific ECCN and country

Given an ECCN and a country or list of countries, generates a plot showing the evolution of the ECCN along the countries rankings

In [None]:
from matplotlib.ticker import MaxNLocator

def plot_rank_evolution(df, descriptions_dictionary, eccn_code, rank_type, countries=None, exclude_countries=None):
    """
    Plots the rank evolution of a given ECCN code across years and rankings for specified or excluded countries.

    Args:
        df (pd.DataFrame): The input DataFrame with columns 'Country', 'ECCN Code', 'Year', 'Rank', 'Ranking'.
        eccn_code (str): The ECCN code to plot.
        countries (list of str or None): A list of countries to include in the plot. If None, plot all countries (except excluded).
        exclude_countries (list of str or None): A list of countries to exclude from the plot. If None, no countries are excluded.

    Returns:
        None (Displays the plot)
    """

    # 1. Filter the DataFrame for the specific ECCN Code
    df_filtered = df[df['ECCN Code'] == eccn_code].copy()

    # If there is no data for the specified ECCN code we return an error
    if df_filtered.empty:
        print(f"No data found for ECCN Code: {eccn_code}")
        return

    # 2. Filter by Countries to Exclude if Specified
    if exclude_countries:
        df_filtered = df_filtered[~df_filtered['Country'].isin(exclude_countries)]
    
    # 3. Filter by Countries if Specified
    if countries:
        df_filtered = df_filtered[df_filtered['Country'].isin(countries)]


    # If there is no data for the specified countries we return an error
    if df_filtered.empty:
        if countries:
            print(f"No data found for ECCN Code: {eccn_code} for countries: {countries}")
        elif exclude_countries:
            print(f"No data found for ECCN Code: {eccn_code} after excluding countries: {exclude_countries}")
        else:
            print(f"No data found for ECCN Code: {eccn_code}")
        return

    # 4. Create the plot
    plt.figure(figsize=(12, 8))  # Adjust figure size as needed

    # 5. Loop through each unique country in the filtered data
    for country in df_filtered['Country'].unique():
         country_data = df_filtered[df_filtered['Country'] == country].copy()

         # 6. Loop through each ranking for each country
         for ranking in country_data['Ranking'].unique():
             ranking_data = country_data[country_data['Ranking'] == ranking].copy()

             # If there is not data for the given ranking, we skip this iteration
             if ranking_data.empty:
                continue

             # Sort the data to plot it correctly
             ranking_data = ranking_data.sort_values(by=['Year'])

             # 7. Plot the line for the ranking
             plt.plot(ranking_data['Year'], ranking_data['Rank'], marker='o', linestyle='-', label=f"{country}, {ranking}")


    # 8. Customize the plot
    plt.xlabel("Year")
    plt.ylabel("Rank")
    plt.suptitle(f"Evolution of Rank {rank_type} for ECCN {eccn_code}", y=0.98, fontsize=18)
    plt.title(descriptions_dictionary[eccn_code], fontsize=12)
    plt.legend(title="Country, Ranking")
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.gca().yaxis.set_major_locator(MaxNLocator(integer=True))
    plt.gca().invert_yaxis()  # Invert y-axis to show higher rank at the top
    plt.xticks(sorted(df_filtered['Year'].unique()))  # Set xticks to show all years in the filtered df
    plt.tight_layout()  # Adjust layout for better readability
    
    # 9.Store the plot
    plt.savefig(f'./figures/eccn_rank_evolution_{rank_type}/country_{str(countries)}_eccn_{eccn_code}_ranking_evolution.png')
    plt.close()

for unique_country in unique_countries:
    for eccn in space_related_eccns:
        plot_rank_evolution(top_value_eccns_df, unique_eccn_value_descriptions_dict, eccn, "value", [unique_country])
        plot_rank_evolution(top_count_eccns_df, unique_eccn_count_descriptions_dict, eccn, "count", [unique_country])


### Ranking counts distribution
Given a year and ECCNs, provides the distribution of rankings for that ECCN across all countries where the data is available for that year. 

In [None]:
from matplotlib.ticker import MaxNLocator

def plot_eccn_stacked_ranking_distribution(df, year, eccn_code, rank_type):

    # Filter the data for the specific ECNNs
    # Also exlude World, EU and Africa
    df = df[(df['Year'] == year) & (df['ECCN Code'].isin(eccn_code))].copy()
    df.drop(df[df['Country'] == 'World'].index, inplace = True)
    df.drop(df[df['Country'] == 'EU'].index, inplace = True)
    df.drop(df[df['Country'] == 'Africa'].index, inplace = True)
    df = df.reset_index()

    # Get unique ranking values
    ranking_values = sorted(df['Ranking'].unique())

    # Get unique rank values (1 through 10)
    ranks = list(range(1, 11))

    # Initialize the dictionary
    rank_values = {ranking_value: [0] * len(ranks) for ranking_value in ranking_values}

    # Populate the dictionary using explicit loops
    for _, row in df.iterrows():
        rank = row['Rank']
        ranking = row['Ranking']
        if ranking in rank_values and rank in ranks: #Check to make sure value is on both sets
            rank_index = ranks.index(rank) #get index of rank
            rank_values[ranking][rank_index] += 1


    fig, ax = plt.subplots()
    width = 0.5
    bottom = np.zeros(10)

    for ranking_category, ranking_count in rank_values.items():
        p = ax.bar(ranks, ranking_count, width, label=ranking_category, bottom=bottom)
        bottom += ranking_count

    plt.xlabel("Rank")
    plt.ylabel("Number of Countries")
    plt.title(f"Distribution of {rank_type} rankings for space ECCNs in {year}", fontsize=16)
    plt.xticks(ranks)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_ylim([0, 4])
    plt.legend(title="Type of license", bbox_to_anchor=(1.05, 1), loc='upper right')
    plt.tight_layout()
    plt.savefig(f'./figures/eccn_rank_distribution_per_year_{rank_type}/rank_distribution_{rank_type}_{year}.png')
    plt.close()

# Here I use the functions to generate the distributions of the space related eccns accross the values
for i_year in range(2018, 2023):
    plot_eccn_stacked_ranking_distribution(top_value_eccns_df, i_year, space_related_eccns, "value")
    plot_eccn_stacked_ranking_distribution(top_count_eccns_df, i_year, space_related_eccns, "count")

### Count of countries where an ECCN is above the N rank as a function of time

Plots the number of countries where the ECCN is present in their top N ranking accross the years for which data is available

In [122]:
def plot_eccn_total_ranking_evolution(df, df_type, eccn_code, n_cuttoff):

    # Find number of countries represented in analysis
    unique_countries_per_year = np.array([i.size for i in unique_country_dataframes])
    max_countries_accounted_for = max(unique_countries_per_year)

    # Filter the data for the specific ECNNs
    # Also exlude World, EU and Africa
    df = df[df['ECCN Code'].isin(eccn_code)].copy()
    df.drop(df[df['Country'] == 'World'].index, inplace = True)
    df.drop(df[df['Country'] == 'EU'].index, inplace = True)
    df.drop(df[df['Country'] == 'Africa'].index, inplace = True)

    # Filter occurences where Rank is lower than N
    df = df[df["Rank"] < n_cuttoff]

    # Count unique occurences for each year and country
    df_2018 = df[df['Year'] == 2018]
    df_2019 = df[df['Year'] == 2019]
    df_2020 = df[df['Year'] == 2020]
    df_2021 = df[df['Year'] == 2021]
    df_2022 = df[df['Year'] == 2022] 

    top_countries_2018 = df_2018['Country'].drop_duplicates().to_numpy()
    present_2018 = df_2018['Country'].drop_duplicates().size
    top_countries_2019 = df_2019['Country'].drop_duplicates().to_numpy()
    present_2019 = df_2019['Country'].drop_duplicates().size
    top_countries_2020 = df_2020['Country'].drop_duplicates().to_numpy()
    present_2020 = df_2020['Country'].drop_duplicates().size
    top_countries_2021 = df_2021['Country'].drop_duplicates().to_numpy()
    present_2021 = df_2021['Country'].drop_duplicates().size
    top_countries_2022 = df_2022['Country'].drop_duplicates().to_numpy()
    present_2022 = df_2022['Country'].drop_duplicates().size
    present_values = np.array([present_2018, present_2019, present_2020, present_2021, present_2022])
    top_countries = [top_countries_2018, top_countries_2019, top_countries_2020, top_countries_2021, top_countries_2022]
    # Plotting

    years = (2018, 2019, 2020, 2021, 2022)
    year_counts = {
        "Present": present_values,
        "Not present": unique_countries_per_year - present_values,
        "No data": max_countries_accounted_for - unique_countries_per_year
    }
    
    fig, ax = plt.subplots()
    bottom = np.zeros(len(years))
    width = 0.5

    for category, year_count in year_counts.items():
        p = ax.bar(years, year_count, width, label=category, bottom=bottom)
        bottom += year_count

    plt.xlabel("Year")
    plt.ylabel("# of Countries")
    plt.title(f"# of countries where Space ECCNs are amongst\n their top {n_cuttoff} licensed export, by {df_type}.", fontsize=16)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    plt.legend(title="Is country:", bbox_to_anchor=(1.05, 1), loc='upper right')
    plt.savefig(f'./figures/number_of_space_eccn_countries_{df_type}.png')
    plt.close()

    return top_countries

print(plot_eccn_total_ranking_evolution(top_value_eccns_df, "value", space_related_eccns, 10))
print(plot_eccn_total_ranking_evolution(top_count_eccns_df, "count", space_related_eccns, 10))

[array(['France', 'Germany', 'India', 'Japan', 'Korea', 'Netherlands',
       'Taiwan', 'Uk'], dtype=object), array(['Brazil', 'Germany', 'Hong Kong', 'India', 'Japan', 'Russia',
       'United Kingdom'], dtype=object), array(['Germany', 'India', 'Japan', 'Korea', 'Russia'], dtype=object), array(['Australia', 'France', 'Germany', 'India', 'Japan', 'Korea',
       'Spain', 'United Kingdom'], dtype=object), array(['France', 'Germany', 'India', 'Indonesia', 'Japan', 'Netherlands',
       'Spain', 'Taiwan', 'United Kingdom'], dtype=object)]
[array(['Canada', 'France', 'India', 'Japan'], dtype=object), array(['India', 'Japan', 'Russia'], dtype=object), array(['Germany', 'Hong Kong', 'India', 'Japan', 'Russia'], dtype=object), array(['France', 'Germany', 'India', 'Japan', 'Mexico', 'Spain'],
      dtype=object), array(['Australia', 'France', 'India', 'Japan', 'Netherlands', 'Spain'],
      dtype=object)]


## License process data