In [2]:
%%capture
# Installs fastprogress (imported in the next cell) through conda; -y answers the
# confirmation prompt and %%capture silences the installer output.
# NOTE(review): `!conda` runs in a subshell and may not target the kernel's
# environment — confirm, or consider `%conda install` with a pinned version.
!conda install -y fastprogress

In [3]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pickle
from fastprogress.fastprogress import master_bar, progress_bar
from collections import Counter, defaultdict
import boto3, io, time, os
from pathlib import Path

import warnings
warnings.filterwarnings("ignore")

In [4]:
# Working directories on the local disk. Every path keeps its trailing slash
# because later cells append sub-paths by plain string formatting.
root_path = '/root/zhaosen_data/'
data_path = f"{root_path}data/"
visualizations_path = f"{root_path}visualizations/"
data_specific_path = f"{root_path}data_specific/"
visualizations_specific = f"{root_path}visualizations_specific/"

In [5]:
# Alternative single-month configuration (disabled):
# scrape_years = [2015]
# scrape_months = ["05"]

# Full window: years 2015..2022 and zero-padded month strings "01".."12".
scrape_years = list(range(2015, 2023))
scrape_months = [f"{mo:02d}" for mo in range(1, 13)]
# Filled by a later cell: (year, month) -> list of (bucket, key) pairs.
df_iter_dict = {}

# The very tall figure is required because of the high number of countries
# shown in the plot. Change it to a smaller figsize for other plots.
sns.set(
    rc={
        'figure.figsize': (25, 100),
    },
)

In [10]:
def return_all_files(bucket, parquet_folder_location):

    '''
    List every object stored under a key prefix of an s3 bucket.
    No filtering by extension happens here; callers decide which keys to use.
    Input:
        bucket: name of the s3 bucket
        parquet_folder_location: key prefix of the folder inside the bucket
    Output:
        files_list: List of (bucket_name, key) tuples, one per object found
    '''

    session = boto3.Session()
    bucket_resource = session.resource('s3').Bucket(bucket)

    files_list = []
    for s3_object in bucket_resource.objects.filter(Prefix = parquet_folder_location):
        files_list.append((s3_object.bucket_name, s3_object.key))
    return files_list

def read_parquet_file(bucket, file_key):

    '''
    Download one parquet object from s3 and load it into a dataframe.
    Input:
        bucket: s3 bucket name
        file_key: key of the parquet file inside the s3 bucket
    Output:
        df: Dataframe of the stored file contents
    '''

    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=file_key)
    # The object body is read completely into an in-memory buffer that is then
    # handed to pandas. pandas picks the parquet engine itself (no engine is
    # forced here, matching what this function has always done).
    file = io.BytesIO(obj['Body'].read())
    df = pd.read_parquet(file)
    return df

# Convert the themes and orgs into lists so that they can be grouped
def get_org_theme_list(row, extract = "organization"):
    '''
    Collect one field from every entry of a cell value.
    Input:
        row: sized iterable of subscriptable entries, or None
        extract: key looked up on each entry
    Output:
        list with entry[extract] for each entry, in order;
        an empty list when row is None or has length 0
    '''
    if row is not None and len(row) != 0:
        return [entry[extract] for entry in row]
    return []

# aggregate the dfs
def agg_dfs(new_df, old_df = None):
    '''
    Sum the per-row values of each (country, Predicted_Sector) group.
    Input:
        new_df: dataframe with columns country, Predicted_Sector,
                organizations, themes, tone and group_size
        old_df: optional earlier aggregate with the same columns; when given,
                its rows are stacked in front of new_df before grouping
    Output:
        dataframe with one row per (country, Predicted_Sector) and the four
        value columns reduced with 'sum'
        (presumably 'sum' concatenates list-valued cells — confirm for the
        pandas version in use)
    '''
    if old_df is None:
        combined = new_df
    else:
        combined = pd.concat([old_df, new_df])

    summed_columns = ['organizations', 'themes', 'tone', 'group_size']
    per_group = combined.groupby(['country', 'Predicted_Sector'])
    return per_group.agg({column: 'sum' for column in summed_columns}).reset_index()



def save_df_to_s3(df, file_location, year, month, file_format, bucket='sector-classification'):
    '''
    Serialize a dataframe and upload it to s3 as tones{year}{month}.{file_format}.
    Input:
        df: dataframe to upload (written without its index)
        file_location: key prefix (folder) inside the bucket, without trailing slash
        year, month: values interpolated into the file name
        file_format: "csv" or "parquet"
        bucket: destination s3 bucket name
    Output:
        None; prints whether the put_object call returned HTTP status 200
    Raises:
        AssertionError: if file_format is neither "csv" nor "parquet"
    '''
    # Validate first so an unsupported format fails before any AWS client is created.
    assert file_format in ("csv", "parquet"), "File format must be in {csv, parquet}"

    file_name = f"tones{year}{month}.{file_format}"
    file_key = file_location + "/" + file_name

    # csv is text and parquet is binary, so each needs its own buffer type.
    if file_format == "csv":
        with io.StringIO() as buffer:
            df.to_csv(buffer, index=False)
            body = buffer.getvalue()
    else:
        with io.BytesIO() as buffer:
            df.to_parquet(buffer, index=False)
            body = buffer.getvalue()

    s3_client = boto3.client('s3')
    response = s3_client.put_object(Bucket=bucket, Key=file_key, Body=body)

    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    success = "Successful" if status == 200 else "Unsuccessful"
    # Report the format that was actually uploaded (the label used to be a fixed "CSV").
    print(f"\n{file_format.upper()} | {success} S3 put_object response. Status - {status}\n")

In [11]:
# For every (year, month) inside the processed window, create the local output
# folders and remember which s3 objects belong to that month.
for year in scrape_years:
    curr_viz_path = f"{visualizations_path}{year}/"
    Path(curr_viz_path).mkdir(parents=True, exist_ok=True)

    for month in scrape_months:
        month_number = int(month)
        # Months before 2015-03 and after 2022-10 are not processed.
        before_window = year == 2015 and month_number < 3
        after_window = year == 2022 and month_number > 10
        if before_window or after_window:
            continue

        curr_df_path = f"{data_path}{year}/{month}/"
        Path(curr_df_path).mkdir(parents=True, exist_ok=True)

        files_list = return_all_files('sector-classification', f"Bert_Results_Africa_2015_2023/{year}/{month}")
        # Months without any object under the prefix are left out of the dict.
        if files_list:
            df_iter_dict[(year, month)] = files_list

In [12]:
# Build one tone table per (year, month) and upload it to s3 as csv.
# NOTE: no country whitelist is applied and nothing is written to local disk;
# the only output of this cell is the per-month csv uploaded by save_df_to_s3.
for key, value in df_iter_dict.items():

    year, month = key

    # Running aggregate over all files of the month; stays None until the first
    # valid parquet file has been processed.
    heatmap_df = None
    print(f"Fetching {len(value)} files for {month} {year}")
    t0 = time.time()
    for buck, curr_file in progress_bar(value):
        if not curr_file.endswith(".parquet"):
            print(f"{curr_file} not valid, skipping..")
            continue
        df = read_parquet_file(buck, curr_file)
        # .copy() makes the column subset an independent frame, so the column
        # assignments below never write into a slice of the loaded data.
        df = df[["organizations", "themes", "tone", "country", "Predicted_Sector"]].copy()
        # Each tone cell is subscripted with "tone" to get the scalar score.
        df["tone"] = df["tone"].apply(lambda x: x["tone"])
        df = df.astype({'tone' : 'float64'})
        df['organizations'] = df['organizations'].apply(get_org_theme_list)
        df['themes'] = df['themes'].apply(lambda row: get_org_theme_list(row, extract = "theme"))
        # Every input row counts once; summing this column gives the group size.
        df["group_size"] = 1
        # Reduce the file first, then fold it into the month's running aggregate.
        df = agg_dfs(df)
        heatmap_df = df if heatmap_df is None else agg_dfs(df, heatmap_df)
        del df
    print("Total time taken: ",round((time.time() - t0)/60, 3), "mins")

    # A month whose listing contains no .parquet key has nothing to aggregate;
    # skip it instead of failing on the None placeholder below.
    if heatmap_df is None:
        print(f"No parquet data for {month} {year}, skipping..")
        continue

    heatmap_df["average_tone"] = heatmap_df.apply(lambda row: round((row.tone / row.group_size),3), axis=1)
    # The next three columns are not part of the uploaded table (they are
    # dropped below); they only remain on heatmap_df itself.
    heatmap_df["country_name"] = 0
    heatmap_df["organizations"] = heatmap_df["organizations"].apply(lambda x: Counter(x))
    heatmap_df["themes"] = heatmap_df["themes"].apply(lambda x: Counter(x))

    grouped_df = heatmap_df.copy()

    tones_df = grouped_df.drop(['organizations', 'themes', 'tone', 'country_name'], axis=1)
    tones_df["Month"] = int(month)
    tones_df["Year"] = year

    # Pivot the 'Predicted_Sector' column into new columns based on 'group_size' and 'average_tone'
    pivot_df = tones_df.pivot_table(index=['country', 'Month', 'Year'], columns='Predicted_Sector', values=['group_size', 'average_tone'])

    # Flatten multi-level column index into "<sector> <measure>" names
    pivot_df.columns = [' '.join(col[::-1]).strip() for col in pivot_df.columns.values]

    # Add two new columns for the sum of group_size and the weighted average of average_tone.
    # The multiplication is positional (.values), so it assumes the average_tone
    # and group_size columns appear in the same sector order — TODO confirm.
    pivot_df['total'] = pivot_df.filter(regex='group_size').sum(axis=1)
    pivot_df['total_average'] = pivot_df.filter(regex='average_tone').multiply((pivot_df.filter(regex='group_size')).values).sum(axis=1) / pivot_df['total']

    pivot_df = pivot_df.reset_index()

    save_df_to_s3(df=pivot_df, file_location='Bert_Results_Africa_2015_2023/aggregated_results', year=year, month=month, file_format="csv")


Fetching 30 files for 05 2015


Total time taken:  3.332 mins

CSV | Successful S3 put_object response. Status - 200



In [34]:

# for year in scrape_years:
#     for month in scrape_months:
#         if year == 2015 and int(month) < 3:
#             continue
#         if year == 2022 and int(month) > 10:
#             continue

