In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import requests
import os
import sys
import requests

In [3]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Create a Spark session
spark = SparkSession.builder.appName("PivotGDPData") \
                    .config("spark.network.timeout", "600s") \
                    .config("spark.executor.heartbeatInterval", "120s") \
                    .config("spark.jars", "C:\Spark\jars\postgresql-42.6.0.jar") \
                    .getOrCreate()

In [4]:
# Base URL for the World Bank API
base_url = "http://api.worldbank.org/v2/country"
indicator = "NY.GDP.MKTP.CD"
params = {"format": "json"}

# Get the list of all country codes
response = requests.get(base_url, params=params)
data = response.json()
country_list = data[1]


In [5]:
# Create a list to hold the data
gdp_data = []

# Iterate through each country and replace "BRA" with the country code
for country in country_list:
    country_code = country["id"]
    
    # Construct the API URL with the specific country code
    api_url = f"{base_url}/{country_code}/indicator/{indicator}"
   
    
    # Make the API request
    response = requests.get(api_url, params=params)
    data = response.json()
    
    # Extract GDP data if available
    if data[1]:
        entries = data[1]
        country_name = entries[0]["country"]["value"]
        for entry in entries: 
            gdp_value = entry["value"]
            if gdp_value is not None:
                gdp_value = round(entry["value"] / 10**8, 3)
            else: 
                gdp_value = 0.0

            Year = entry["date"]
            
            if(int(Year) > 2011): 
                gdp_data.append((Year, country_name, country_code, float(gdp_value)))
            else: 
                continue


In [6]:
# Create a DataFrame from the collected data
columns = ["Year", "CountryName", "CountryCode", "GDPValue in 10 million"]
df = spark.createDataFrame(gdp_data, columns)
df.show()

+----+--------------------+-----------+----------------------+
|Year|         CountryName|CountryCode|GDPValue in 10 million|
+----+--------------------+-----------+----------------------+
|2022|               Aruba|        ABW|                35.447|
|2021|               Aruba|        ABW|                31.032|
|2020|               Aruba|        ABW|                25.589|
|2019|               Aruba|        ABW|                33.958|
|2018|               Aruba|        ABW|                32.762|
|2017|               Aruba|        ABW|                30.924|
|2016|               Aruba|        ABW|                29.836|
|2015|               Aruba|        ABW|                29.629|
|2014|               Aruba|        ABW|                27.908|
|2013|               Aruba|        ABW|                27.278|
|2012|               Aruba|        ABW|                26.152|
|2022|Africa Eastern an...|        AFE|             11851.377|
|2021|Africa Eastern an...|        AFE|             108

In [7]:
# Pivot the DataFrame to have separate columns for GDP values for each year
gdp_df = df.groupBy("CountryName", "CountryCode") \
               .pivot("Year") \
               .agg({"GDPValue in 10 million": "first"})

In [8]:
indicator_id = "NY.GNP.PCAP.CD"
gni_data = []

for country in country_list:
    country_code = country["id"]
    
    # Construct the API URL with the specific country code
    api_url = f"{base_url}/{country_code}/indicator/{indicator_id}"
   
    
    # Make the API request
    response = requests.get(api_url, params=params)
    data = response.json()

    if data[1]:
        entries = data[1]
        country_name = entries[0]["country"]["value"]
        for entry in entries: 
            gni_value = entry["value"]
            if gni_value is not None:
                gni_value = entry["value"] 
            else: 
                gni_value = 0.0

            Year = entry["date"]
            
            if(int(Year) > 2011): 
                gni_data.append((Year, country_name, country_code, float(gni_value)))
            else: 
                continue


In [12]:
columns = ["Year", "CountryName", "CountryCode", "GNI"]
df = spark.createDataFrame(gni_data, columns)
gni_df = df.na.fill(value=0,subset=["GNI"])
gni_df.show()

+----+--------------------+-----------+----------------+
|Year|         CountryName|CountryCode|             GNI|
+----+--------------------+-----------+----------------+
|2022|               Aruba|        ABW|         33410.0|
|2021|               Aruba|        ABW|         30180.0|
|2020|               Aruba|        ABW|         23340.0|
|2019|               Aruba|        ABW|         29450.0|
|2018|               Aruba|        ABW|         28910.0|
|2017|               Aruba|        ABW|         27960.0|
|2016|               Aruba|        ABW|         26200.0|
|2015|               Aruba|        ABW|         25320.0|
|2014|               Aruba|        ABW|         25520.0|
|2013|               Aruba|        ABW|         25500.0|
|2012|               Aruba|        ABW|         24440.0|
|2022|Africa Eastern an...|        AFE|1551.22636765834|
|2021|Africa Eastern an...|        AFE| 1468.7399071809|
|2020|Africa Eastern an...|        AFE|1394.37153646633|
|2019|Africa Eastern an...|    

In [10]:
gni_pivoted_df = gni_df.groupBy("CountryName", "CountryCode") \
               .pivot("Year") \
               .agg({"GNI": "first"})

# Show the pivoted DataFrame
gni_pivoted_df.show()

+--------------------+-----------+----------------+----------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|         CountryName|CountryCode|            2012|            2013|           2014|            2015|            2016|            2017|            2018|            2019|            2020|            2021|            2022|
+--------------------+-----------+----------------+----------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|              Bhutan|        BTN|          2330.0|          2350.0|         2400.0|          2470.0|          2600.0|          2820.0|          3070.0|          3360.0|          3030.0|          3290.0|             0.0|
|               Aruba|        ABW|         24440.0|         25500.0|        25520.0|         25320.0|         26200.

In [13]:
# Define the income group thresholds
low_income_threshold = 1045
lower_middle_income_threshold = 4095
upper_middle_income_threshold = 12695


# Create DataFrames for different income groups
low_income_df = gni_df.filter(gni_df["GNI"] <= low_income_threshold)
lower_middle_income_df = gni_df.filter((gni_df["GNI"] > low_income_threshold) & (gni_df["GNI"] <= lower_middle_income_threshold))
upper_middle_income_df = gni_df.filter((gni_df["GNI"] > lower_middle_income_threshold) & (gni_df["GNI"] <= upper_middle_income_threshold))
high_income_df = gni_df.filter(gni_df["GNI"] > upper_middle_income_threshold)

# Show the content of each DataFrame (for illustration purposes)
print("Low Income DataFrame:")

print("Lower Middle Income DataFrame:")
lower_middle_income_df.show()

print("Upper Middle Income DataFrame:")
upper_middle_income_df.show()

print("High Income DataFrame:")
high_income_df.show()

Low Income DataFrame:
Lower Middle Income DataFrame:
+----+--------------------+-----------+----------------+
|Year|         CountryName|CountryCode|             GNI|
+----+--------------------+-----------+----------------+
|2022|Africa Eastern an...|        AFE|1551.22636765834|
|2021|Africa Eastern an...|        AFE| 1468.7399071809|
|2020|Africa Eastern an...|        AFE|1394.37153646633|
|2019|Africa Eastern an...|        AFE|1503.60583375846|
|2018|Africa Eastern an...|        AFE|1466.42716824188|
|2017|Africa Eastern an...|        AFE| 1453.8870754489|
|2016|Africa Eastern an...|        AFE|1468.49955596585|
|2015|Africa Eastern an...|        AFE|1584.54598973533|
|2014|Africa Eastern an...|        AFE|1700.02530164649|
|2013|Africa Eastern an...|        AFE|1721.52058603829|
|2012|Africa Eastern an...|        AFE|1693.13397385954|
|2022|Africa Western an...|        AFW|1787.61377534841|
|2021|Africa Western an...|        AFW|1747.66336224696|
|2020|Africa Western an...|        

In [34]:
# Pivot the DataFrame
low_income_df = low_income_df.groupBy("CountryName", "CountryCode") \
                          .pivot("Year") \
                          .agg({"GNI": "first"})
                          

# Show the pivoted DataFrame before filling null values
low_income_df.show()


+--------------------+-----------+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|         CountryName|CountryCode| 2012|  2013| 2014| 2015| 2016| 2017| 2018| 2019| 2020| 2021| 2022|
+--------------------+-----------+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|              Bhutan|        BTN| null|  null| null| null| null| null| null| null| null| null|  0.0|
|     Channel Islands|        CHI|  0.0|   0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|             Andorra|        AND|  0.0|   0.0|  0.0|  0.0|  0.0|  0.0|  0.0| null| null| null|  0.0|
|             Burundi|        BDI|240.0| 240.0|250.0|250.0|250.0|250.0|240.0|230.0|220.0|220.0|240.0|
|         Afghanistan|        AFG|640.0| 670.0|640.0|600.0|560.0|530.0|520.0|520.0|490.0|380.0|  0.0|
|      American Samoa|        ASM|  0.0|   0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|          Bangladesh|        BGD|970.0|1030.0| null| null| null| null| null| null

In [35]:
# Pivot the DataFrame
lower_middle_income_df = lower_middle_income_df.groupBy("CountryName", "CountryCode") \
                          .pivot("Year") \
                          .agg({"GNI": "first"})
                          

# Show the pivoted DataFrame before filling null values
lower_middle_income_df.show()


+--------------------+-----------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|         CountryName|CountryCode|            2012|            2013|            2014|            2015|            2016|            2017|            2018|            2019|            2020|            2021|            2022|
+--------------------+-----------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|              Bhutan|        BTN|          2330.0|          2350.0|          2400.0|          2470.0|          2600.0|          2820.0|          3070.0|          3360.0|          3030.0|          3290.0|            null|
|          Azerbaijan|        AZE|            null|            null|            null|            null|          

In [37]:
upper_middle_income_df = upper_middle_income_df.groupBy("CountryName", "CountryCode") \
               .pivot("Year") \
               .agg({"GNI": "first"})

# Show the pivoted DataFrame
upper_middle_income_df.show()

+--------------------+-----------+----------------+----------------+---------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|         CountryName|CountryCode|            2012|            2013|           2014|           2015|            2016|            2017|            2018|            2019|            2020|            2021|            2022|
+--------------------+-----------+----------------+----------------+---------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|          Azerbaijan|        AZE|          6480.0|          7450.0|         7740.0|         6610.0|          4790.0|          4110.0|            null|          4510.0|          4480.0|          4910.0|          5660.0|
|Bosnia and Herzeg...|        BIH|          4900.0|          5160.0|         5170.0|         5130.0|          5060.0|   

In [38]:
high_income_df = high_income_df.groupBy("CountryName", "CountryCode") \
               .pivot("Year") \
               .agg({"GNI": "first"})

# Show the pivoted DataFrame
high_income_df.show()

+--------------------+-----------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|         CountryName|CountryCode|            2012|            2013|            2014|            2015|            2016|            2017|            2018|            2019|            2020|            2021|            2022|
+--------------------+-----------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|               Aruba|        ABW|         24440.0|         25500.0|         25520.0|         25320.0|         26200.0|         27960.0|         28910.0|         29450.0|         23340.0|         30180.0|         33410.0|
|        Bahamas, The|        BHS|         27170.0|         26210.0|         26520.0|         26760.0|         2

In [39]:
from pyspark.sql.functions import col, when

# List of columns starting from the third column (excluding 'CountryName' and 'CountryCode')
columns_to_fill = low_income_df.columns[2:]

# Replace null values with 0 in the specified columns
for col_name in columns_to_fill:
    low_income_df = low_income_df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
    lower_middle_income_df = lower_middle_income_df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
    upper_middle_income_df = upper_middle_income_df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
    high_income_df =  high_income_df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))



In [40]:
selected_column = "CountryName"
low_countries = low_income_df.select(selected_column).distinct()
low_countries.show()

+--------------------+
|         CountryName|
+--------------------+
|         Afghanistan|
|             Andorra|
|      American Samoa|
|             Burundi|
|        Burkina Faso|
|          Bangladesh|
|Central African R...|
|              Bhutan|
|     Channel Islands|
+--------------------+



In [23]:
selected_column = "CountryName"
lower_middle_countries = lower_middle_income_df.select(selected_column).distinct()
lower_middle_countries.show()

+--------------------+
|         CountryName|
+--------------------+
|              Angola|
|Africa Eastern an...|
|Africa Western an...|
|             Armenia|
|               Benin|
|          Azerbaijan|
|          Bangladesh|
|             Bolivia|
|              Bhutan|
+--------------------+



In [24]:
selected_column = "CountryName"
lower_countries = low_income_df.select(selected_column).distinct()
lower_countries.show()

+--------------------+
|         CountryName|
+--------------------+
|         Afghanistan|
|             Andorra|
|      American Samoa|
|             Burundi|
|        Burkina Faso|
|          Bangladesh|
|Central African R...|
|              Bhutan|
|     Channel Islands|
+--------------------+



In [25]:
selected_column = "CountryName"
upper_middle_countries = upper_middle_income_df.select(selected_column).distinct()
upper_middle_countries.show()

+--------------------+
|         CountryName|
+--------------------+
|              Angola|
|           Argentina|
|             Albania|
|          Arab World|
|             Armenia|
|          Azerbaijan|
|Bosnia and Herzeg...|
|            Bulgaria|
|             Belarus|
|              Belize|
|              Brazil|
|            Botswana|
+--------------------+



In [26]:
selected_column = "CountryName"
high_income_countries = high_income_df.select(selected_column).distinct()
high_income_countries.show()

+--------------------+
|         CountryName|
+--------------------+
|               Aruba|
|           Argentina|
|             Andorra|
|United Arab Emirates|
| Antigua and Barbuda|
|           Australia|
|             Austria|
|             Belgium|
|        Bahamas, The|
|            Bulgaria|
|             Bahrain|
|              Brazil|
|             Bermuda|
|   Brunei Darussalam|
|            Barbados|
|               Chile|
|Central Europe an...|
|         Switzerland|
|              Canada|
+--------------------+



In [1]:
import matplotlib.pyplot as plt

# Assuming you have two DataFrames: gdp_df and gni_df, each containing the GDP and GNI data respectively
# Assuming the columns in the DataFrames are named similar to the year columns in your pivoted DataFrames

# List of columns representing years
year_columns = ['2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022']

# Extracting country names from the DataFrame high_income_countries
countries_to_plot = high_income_countries.select('CountryName').rdd.flatMap(lambda x: x).collect()

# Plotting both GDP and GNI for selected countries over different years
for country in countries_to_plot:
    # Get GDP and GNI values for the country from the respective DataFrames
    gdp_values = gdp_df.filter(gdp_df['CountryName'] == country).select(year_columns).collect()[0]
    gni_values = gni_df.filter(gni_df['CountryName'] == country).select(year_columns).collect()[0]

    # Plotting GDP and GNI values
    plt.plot(year_columns, gdp_values, label=f'{country} - GDP')
    plt.plot(year_columns, gni_values, label=f'{country} - GNI')

# Customize the plot
plt.xlabel('Year')
plt.ylabel('Value')
plt.title('GDP and GNI Comparison')
plt.legend()
plt.grid(True)
plt.xticks(year_columns)  # Set x-axis ticks to the years

# Show the plot
plt.tight_layout()
plt.show()


NameError: name 'high_income_countries' is not defined

In [None]:
jdbc_url = "jdbc:postgresql://localhost:59064/worldBank"

# Connection properties
properties = {
    "user": "",
    "password": "",
    "driver": ""
}

# Save DataFrame to Postgres
gdp_df.write.jdbc(url=jdbc_url, table="gdp", mode="overwrite", properties=properties)

gni_df.write.jdbc(url=jdbc_url, table="gni", mode="overwrite", properties=properties)

low_income_df.write.jdbc(url=jdbc_url, table="low_income", mode="overwrite", properties=properties)

lower_middle_income_df.write.jdbc(url=jdbc_url, table="low_middle_income", mode="overwrite", properties=properties)

upper_middle_income_df.write.jdbc(url=jdbc_url, table="upper_middle_income", mode="overwrite", properties=properties)

high_income_df.write.jdbc(url=jdbc_url, table="high_income", mode="overwrite", properties=properties)

lower_middle_countries.write.jdbc(url=jdbc_url, table="low_middle_income_countries", mode="overwrite", properties=properties)

low_countries.write.jdbc(url=jdbc_url, table="low_income_countries", mode="overwrite", properties=properties)

upper_middle_countries.write.jdbc(url=jdbc_url, table="upper_middle_income_countries", mode="overwrite", properties=properties)

high_income_countries.write.jdbc(url=jdbc_url, table="high_income_countries", mode="overwrite", properties=properties)
