In [0]:
# IMPORT ALL NECESSARY LIBRARIES
import pandas as pd # Dataframe
import json # Parsing json to object
import requests # Making HTTP get requests
from pyspark.sql import SparkSession

In [0]:
# Series codes of the indicators whose metadata this notebook looks up.
# Order matters: it determines the row order of the resulting reference table.
gender_indicators = [
    "FIN21.T.D.2017.1",
    "FIN21.T.D.2017.2",
    "FIN21.T.D.2017",
    "SG.GEN.PARL.ZS",
    "SG.GEN.MNST.ZS",
    "SE.SEC.ENRR.FE",
    "UIS.FGP.5T8.F600",
    "SL.TLF.CACT.FE.ZS",
    "SG.LAW.NODC.HR",
    "SG.OWN.LDAL.FE.ZS",
    "SG.OPN.BANK.EQ",
    "SG.CNT.SIGN.EQ",
    "SP.DYN.SMAM.FE",
    "SP.DYN.SMAM.MA",
    "SP.M15.2024.FE.ZS",
    "SP.M18.2024.FE.ZS",
    "SG.VAW.1549.ME.ZS",
    "SG.VAW.15PL.ME.ZS",
    "SG.VAW.1549.LT.ME.ZS",
    "SG.VAW.15PL.LT.ME.ZS",
    "SG.LEG.DVAW",
    "SH.STA.MMRT",
    "SH.STA.MMRT.NE",
    "SP.DYN.LE00.FE.IN",
    "SP.DYN.LE00.MA.IN",
    "SP.DYN.LE00.IN",
]

In [0]:
def generate_indicator_info(indicators=None, timeout=30):
    """Download the name and description of World Bank indicators.

    One GET request per indicator code is sent to the World Bank v2
    ``indicator`` endpoint; the ``name`` and ``sourceNote`` fields of the
    first record in the response are collected into a table.

    Parameters
    ----------
    indicators : iterable of str, optional
        Indicator series codes to look up. When omitted, the notebook-level
        ``gender_indicators`` list is used.
    timeout : float, optional
        Timeout in seconds handed to ``requests.get`` for every request.

    Returns
    -------
    pandas.DataFrame
        Columns ``Index`` (1-based position), ``Series`` (indicator code),
        ``Label`` and ``Description``; one row per indicator in input order.
        An empty input yields an empty frame with the same columns.

    Raises
    ------
    RuntimeError
        If a request does not return HTTP 200, or if the response body does
        not have the ``[<paging>, [<record>, ...]]`` shape this function
        reads from.
    """
    if indicators is None:
        indicators = gender_indicators

    columns = ["Index", "Series", "Label", "Description"]

    # Collect plain dicts and build the DataFrame once at the end instead of
    # enlarging it one row at a time.
    records = []

    for position, indicator in enumerate(indicators, start=1):
        url = f"https://api.worldbank.org/v2/indicator/{indicator}?format=json"
        response = requests.get(url, timeout=timeout)

        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to download indicator information {indicator} "
                f"(HTTP {response.status_code})"
            )

        data = json.loads(response.text)

        # The record is read from data[1][0]; anything that is not a list with
        # a non-empty second element (for instance an error document returned
        # with a 200 status) cannot be parsed, so fail with a clear message
        # rather than a bare IndexError/TypeError.
        if not isinstance(data, list) or len(data) < 2 or not data[1]:
            raise RuntimeError(
                f"Failed to download indicator information {indicator}: "
                f"unexpected response body {data!r}"
            )

        entry = data[1][0]
        records.append(
            {
                "Index": position,
                "Series": indicator,
                "Label": entry["name"],
                "Description": entry["sourceNote"],
            }
        )

    # Passing `columns` keeps the schema stable even when `records` is empty.
    return pd.DataFrame(records, columns=columns)


In [0]:
# Download the metadata for every configured indicator (one HTTP request per
# code) and show the first rows of the resulting table as the cell output.
df = generate_indicator_info()
df.head()

In [0]:
# READ THE AZURE SQL DB CREDENTIALS FROM DATABRICKS SECRETS

# Look up the SQL DB username and password through the Databricks secrets utility
# (presumably backed by Azure Key Vault — not verifiable from this notebook).
# NOTE(review): the secret scope is itself named "cirdb-username", the same as the
# username key, and is used for all three lookups — confirm this is intended.
sql_uname = dbutils.secrets.get(scope="cirdb-username", key="cirdb-username")
sql_pwd = dbutils.secrets.get(scope="cirdb-username", key="cirdb-pwd")

# Retrieve the JDBC URL for the SQL database from the same secret scope
jdbc_url = dbutils.secrets.get(scope="cirdb-username",key ="cirdb-jdbc-url")

# Connection properties handed to the JDBC writer together with the URL
connection_properties = {
    "user": sql_uname,
    "password": sql_pwd,
    # The JDBC driver required for connecting to the SQL database
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [0]:
# Obtain the SparkSession, the entry point to the Dataset and DataFrame API;
# getOrCreate() hands back an already-running session when one exists.
spark = SparkSession.builder.getOrCreate()

In [0]:
# WRITE DATA TO SQL DB
# Convert the pandas table into a Spark DataFrame, then replace the contents of
# the reference table over JDBC (mode="overwrite").
df_spark = spark.createDataFrame(df)
df_spark.write.jdbc(
    url=jdbc_url,
    table="dbricks_worldbank_gender_ref",
    mode="overwrite",
    properties=connection_properties,
)

In [0]:
def generate_markdown(df):
    """Render the indicator reference table as Markdown text.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with the columns ``Index``, ``Series``, ``Label`` and
        ``Description``; each row becomes one table row.

    Returns
    -------
    str
        A ``## Indicator Details`` heading followed by a four-column pipe
        table, terminated by a newline. With no rows, only the heading, the
        header row and the alignment row are returned.

    Notes
    -----
    Cell values are converted with ``str()``. Line breaks inside a value are
    replaced by a single space and ``|`` is escaped as ``\\|``, because either
    one would otherwise end the cell or the row early and corrupt the table.
    """
    column_names = ("Index", "Series", "Label", "Description")

    def _cell(value):
        # Flatten every newline flavour first, then escape the column delimiter.
        text = str(value)
        text = text.replace("\r\n", "\n").replace("\r", "\n").replace("\n", " ")
        return text.replace("|", "\\|")

    lines = [
        "## Indicator Details",
        "| Index | Series | Label | Description |",
        "| :---: | :--- | :--- | :--- |",
    ]

    for _, row in df.iterrows():
        cells = [_cell(row[name]) for name in column_names]
        lines.append("| " + " | ".join(cells) + " |")

    # Every line, including the last one, ends with a newline.
    return "\n".join(lines) + "\n"
    

In [0]:
# Build the Markdown rendering of the indicator table and print its raw text
# (printing shows the Markdown source itself rather than a rendered table).
indicator_info_markdown = generate_markdown(df)
print(indicator_info_markdown)