**Objective**: to learn the tools of PySpark (its SQL module) by analyzing the structure and data of a large dataset.

**Data:** World Development Indicators, The World Bank ([source](https://datacatalog.worldbank.org/dataset/world-development-indicators)).

## 1. Setup

### 1.1. Installing dependencies


In [71]:
# Install pyspark to use Spark
!pip install -q pyspark

In [72]:
# Mount Google Drive 
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 1.2. Importing the required modules

In [73]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import DecimalType

In [74]:
# Create a SparkSession
spark = (SparkSession.builder
                     .master('local') 
                     .appName("WDI Analysis") 
                     .getOrCreate())

## 2. Load and view the structure of files with data

For the analysis, we will use one main data file ("WDIData") and combine it with some columns from two other files ("WDICountry" and "WDISeries"), which add details about countries and indicators, respectively. 


Let's create a function that will output the size and schema of DataFrames, and consider each of the files separately.

In [75]:
def show_size_shema(df):
    """Show the size and schema of a DataFrame."""
    print("Rows: {0}, columns: {1}".format(df.count(), len(df.columns)))
    df.printSchema()

> *WDI is an acronym for World Development Indicators.*


### 2.1. The "WDIData" file

In [76]:
# Create a WDI DataFrame
wdi_path = '/content/drive/MyDrive/Colab_Notebooks/WDI_Project/WDI_csv/WDIData.csv'
df_wdi = spark.read.csv(wdi_path, inferSchema=True, header=True)

In [77]:
show_size_shema(df_wdi)

Rows: 380160, columns: 66
root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = tru

In [78]:
# Show the first 5 rows (only the first 7 and the last 5 columns) in tabular form
df_wdi.select(df_wdi.columns[:7] + df_wdi.columns[-5:]).show(5)

+------------+------------+--------------------+-----------------+----+----+----+----------------+----------------+----+----+----+
|Country Name|Country Code|      Indicator Name|   Indicator Code|1960|1961|1962|            2017|            2018|2019|2020|_c65|
+------------+------------+--------------------+-----------------+----+----+----+----------------+----------------+----+----+----+
|  Arab World|         ARB|Access to clean f...|   EG.CFT.ACCS.ZS|null|null|null|            null|            null|null|null|null|
|  Arab World|         ARB|Access to electri...|   EG.ELC.ACCS.ZS|null|null|null|90.2836375587196| 89.286856223316|null|null|null|
|  Arab World|         ARB|Access to electri...|EG.ELC.ACCS.RU.ZS|null|null|null|81.1021335972971|79.2481000553718|null|null|null|
|  Arab World|         ARB|Access to electri...|EG.ELC.ACCS.UR.ZS|null|null|null|97.4679148475457|97.0639592990141|null|null|null|
|  Arab World|         ARB|Account ownership...|   FX.OWN.TOTL.ZS|null|null|null| 3

We can see that our first (main) DataFrame contains the values of indicators by country and year, and that the "Country Name" column contains not only the names of countries but also more general values, such as "Arab World". For simplicity, we will call all of them "countries" from now on, after the name of the column.

Let's see how many different countries and indicators there are in the DataFrame.

In [79]:
df_wdi.select('Country Code').distinct().count()

264

According to the overview of the dataset on the source site, it covers 217 economies, which means that 47 of the 264 country codes (264 - 217) are not individual countries but aggregates or other groupings.

In [80]:
df_wdi.select('Indicator Code').distinct().count()

1440

### 2.2. The "WDICountry" file

In [81]:
# Create a DataFrame of countries
country_path = '/content/drive/MyDrive/Colab_Notebooks/WDI_Project/WDI_csv/WDICountry.csv'
df_country = spark.read.csv(
    country_path, inferSchema=True, header=True, multiLine=True)

In [82]:
show_size_shema(df_country)

Rows: 263, columns: 31
root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: integer (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nu

We can see that this DataFrame (file) contains various clarifying information about countries.

Let's check how many countries are indicated in it.

In [83]:
df_country.select('Country Code').distinct().count()

263

This number is also greater than 217, which means that, as in the WDI DataFrame, it describes not only individual countries but also groups of them.

In [84]:
# Show the last 7 rows of the DataFrame as a list of Rows
df_country.select('Country Code', 'Short Name').tail(7)

[Row(Country Code='WLD', Short Name='World'),
 Row(Country Code='WSM', Short Name='Samoa'),
 Row(Country Code='XKX', Short Name='Kosovo'),
 Row(Country Code='YEM', Short Name='Yemen'),
 Row(Country Code='ZAF', Short Name='South Africa'),
 Row(Country Code='ZMB', Short Name='Zambia'),
 Row(Country Code='ZWE', Short Name='Zimbabwe')]

Even this short list shows that the dataset also contains aggregates that unite several countries, such as "World".

### 2.3. The "WDISeries" file

In [85]:
# Create a DataFrame of indicators (series)
series_path = '/content/drive/MyDrive/Colab_Notebooks/WDI_Project/WDI_csv/WDISeries.csv'
df_series = spark.read.csv(
    series_path, inferSchema=True, header=True, multiLine=True, escape="\"")

In [86]:
show_size_shema(df_series)

Rows: 1440, columns: 21
root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- _c20

We can see that this DataFrame (file) contains various clarifying information about indicators.

In [87]:
df_series.select('Series Code').distinct().count()

1440

The number of unique indicators is the same as the number in the WDI DataFrame.

In [88]:
# Show the first 5 rows of the DataFrame without truncation 
df_series.select('Indicator Name', 'Topic').show(5, False)

+-------------------------------------------------------------+------------------------------------+
|Indicator Name                                               |Topic                               |
+-------------------------------------------------------------+------------------------------------+
|Agricultural machinery, tractors                             |Environment: Agricultural production|
|Fertilizer consumption (% of fertilizer production)          |Environment: Agricultural production|
|Fertilizer consumption (kilograms per hectare of arable land)|Environment: Agricultural production|
|Agricultural land (sq. km)                                   |Environment: Land use               |
|Agricultural land (% of land area)                           |Environment: Land use               |
+-------------------------------------------------------------+------------------------------------+
only showing top 5 rows



### 2.4. Preparing to merge the DataFrames

To clarify the available data on countries and indicators in the WDI DataFrame, we will add additional information from the "Region" and "Income Group" columns of the DataFrame of countries and the "Topic" column of the DataFrame of indicators. 

Let's see what data is contained in these selected columns.


In [89]:
df_country.select('Region', 'Income Group').show(7, False)

+--------------------------+-------------------+
|Region                    |Income Group       |
+--------------------------+-------------------+
|Latin America & Caribbean |High income        |
|South Asia                |Low income         |
|Sub-Saharan Africa        |Lower middle income|
|Europe & Central Asia     |Upper middle income|
|Europe & Central Asia     |High income        |
|null                      |null               |
|Middle East & North Africa|High income        |
+--------------------------+-------------------+
only showing top 7 rows



In [90]:
df_series.select('Topic').limit(5).collect()

[Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Land use'),
 Row(Topic='Environment: Land use')]

The "Region" and "Income Group" columns contain the region and the income group to which each country belongs, and the "Topic" column contains the topic to which each indicator belongs.

As we saw earlier, the WDI DataFrame and the DataFrame of countries contain different numbers of countries. First, let's find the mismatched value. To compare the data, we will use the "Country Code" column, which is present in both DataFrames.

In [91]:
# Count the country codes present in both DataFrames
# (intersect() removes duplicates)
df_wdi.select('Country Code').intersect(
    df_country.select('Country Code')).count()

263

In [92]:
# Show the country codes that are in the WDI DataFrame but not in the DataFrame
# of countries (exceptAll() keeps duplicates, hence distinct() first)
(df_wdi.select('Country Code').distinct()
    .exceptAll(df_country.select('Country Code')).show())

+------------+
|Country Code|
+------------+
|         INX|
+------------+



In [93]:
# See what the other columns contain for this code
df_wdi.where(F.col('Country Code') == 'INX').select(df_wdi.columns[:7]).show(5)

+--------------+------------+--------------------+-----------------+----+----+----+
|  Country Name|Country Code|      Indicator Name|   Indicator Code|1960|1961|1962|
+--------------+------------+--------------------+-----------------+----+----+----+
|Not classified|         INX|Access to clean f...|   EG.CFT.ACCS.ZS|null|null|null|
|Not classified|         INX|Access to electri...|   EG.ELC.ACCS.ZS|null|null|null|
|Not classified|         INX|Access to electri...|EG.ELC.ACCS.RU.ZS|null|null|null|
|Not classified|         INX|Access to electri...|EG.ELC.ACCS.UR.ZS|null|null|null|
|Not classified|         INX|Account ownership...|   FX.OWN.TOTL.ZS|null|null|null|
+--------------+------------+--------------------+-----------------+----+----+----+
only showing top 5 rows



In [94]:
# Count the 'INX' rows that have at least one value in the yearly columns
# (rows in which all yearly columns are null are dropped)
(df_wdi.where(F.col('Country Code') == 'INX')
       .dropna('all', subset=df_wdi.columns[4:])
       .count())

0

The extra country code in the WDI DataFrame, "INX", has no indicator values, so all rows with this code can be deleted.
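`dropna('all', subset=...)` (the first argument is `how`) removes a row only when every listed column is null. A count of zero after the call therefore means that every "INX" row is empty in every year. The rule can be mimicked in plain Python as a sketch. The rows and the year subset below are made up:

```python
def drop_all_null(rows, subset):
    """Mimic dropna(how='all', subset=...): keep a row unless every
    column in `subset` is None."""
    return [row for row in rows
            if any(row.get(col) is not None for col in subset)]

# Hypothetical rows and yearly columns, not taken from the dataset
years = ['1960', '1961', '1962']
rows = [
    {'Country Code': 'INX', '1960': None, '1961': None, '1962': None},
    {'Country Code': 'WLD', '1960': None, '1961': 42.0, '1962': None},
]

kept = drop_all_null(rows, years)
print([row['Country Code'] for row in kept])  # ['WLD']
```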

Let's check whether the "Region", "Income Group" and "Topic" columns have missing values, and why.

In [95]:
# Count the number of missing values
for ncols in ['Region', 'Income Group']:
    print(f"Missing values in '{ncols}': ", df_country.where(df_country[ncols].isNull()).count())
print("Missing values in 'Topic': ", df_series.where(df_series['Topic'].isNull()).count())

Missing values in 'Region':  46
Missing values in 'Income Group':  46
Missing values in 'Topic':  13


In [96]:
# See how many countries do not have values in the "Region" 
# and "Income Group" columns
(df_country
    .filter(df_country['Income Group'].isNull() | df_country['Region'].isNull())
    .select('Country Code')
    .distinct()
    .count())

46

In [97]:
# Show the names of these countries
(df_country
    .filter(df_country['Region'].isNull())
    .select('Short Name', 'Country Code')
    .distinct()
    .collect())

[Row(Short Name='Euro area', Country Code='EMU'),
 Row(Short Name='Low income', Country Code='LIC'),
 Row(Short Name='Middle East & North Africa', Country Code='MEA'),
 Row(Short Name='Least developed countries: UN classification', Country Code='LDC'),
 Row(Short Name='Middle East & North Africa (excluding high income)', Country Code='MNA'),
 Row(Short Name='Middle East & North Africa (IDA & IBRD)', Country Code='TMN'),
 Row(Short Name='Europe & Central Asia (excluding high income)', Country Code='ECA'),
 Row(Short Name='East Asia & Pacific (IDA & IBRD)', Country Code='TEA'),
 Row(Short Name='IDA total', Country Code='IDA'),
 Row(Short Name='Middle income', Country Code='MIC'),
 Row(Short Name='Arab World', Country Code='ARB'),
 Row(Short Name='Latin America & Caribbean (excluding high income)', Country Code='LAC'),
 Row(Short Name='Sub-Saharan Africa (excluding high income)', Country Code='SSA'),
 Row(Short Name='Pacific island small states', Country Code='PSS'),
 Row(Short Name='IBRD

As we assumed earlier, the "Country Code" column of the DataFrame of countries, and therefore the "Country Name" column of the WDI DataFrame, includes not only individual countries but also regions, income groups, and other aggregates.

Since we are adding the regions and income groups to the dataset as columns, we can delete the rows of these aggregates. The exception is "World". We keep it by filling its missing "Region" and "Income Group" values with the word "World".


In [98]:
# Append a copy of the "World" row with its missing values filled in
df_country_w = (df_country
                .unionByName(df_country
                                .filter(df_country['Country Code'] == 'WLD')
                                .fillna({'Region': 'World', 
                                         'Income Group': 'World'})))

In [99]:
(df_country_w
    .filter(df_country_w['Country Code'] == 'WLD')
    .select('Country Code', 'Region')
    .show())

+------------+------+
|Country Code|Region|
+------------+------+
|         WLD|  null|
|         WLD| World|
+------------+------+
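The output shows two "WLD" rows. The original one, with nulls, is removed later by `dropna(subset=['Region', 'Income Group'])` in the join, along with the other aggregates. The whole trick is sketched below in plain Python: append a filled copy of the "World" row, then drop any rows that still lack a region or an income group. The "XXX" row is hypothetical:

```python
FILL_COLS = ('Region', 'Income Group')

countries = [
    {'Country Code': 'WLD', 'Region': None, 'Income Group': None},
    {'Country Code': 'EMU', 'Region': None, 'Income Group': None},
    # Hypothetical country row with both values present
    {'Country Code': 'XXX', 'Region': 'Europe & Central Asia',
     'Income Group': 'High income'},
]

# Like filter(... == 'WLD').fillna({...}): a copy of the WLD row with 'World'
world_filled = [
    {key: ('World' if key in FILL_COLS and value is None else value)
     for key, value in row.items()}
    for row in countries if row['Country Code'] == 'WLD'
]

# Like unionByName(): the original WLD row is still there, with nulls
combined = countries + world_filled

# Like dropna(subset=[...]): only rows with both values survive
kept = [row for row in combined
        if all(row[col] is not None for col in FILL_COLS)]

print([row['Country Code'] for row in kept])  # ['XXX', 'WLD']
```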



We will join the WDI DataFrame and the DataFrame of indicators by matching the "Indicator Code" and "Series Code" columns. Let's check that they contain the same values.


In [100]:
(df_wdi.select('Indicator Code')
    .distinct()
    .intersect(df_series.select('Series Code'))
    .count())

1440

The WDI DataFrame has a column named "_c65", a name that means nothing to us. Let's see whether it contains any values at all.



In [101]:
df_wdi.filter(df_wdi['_c65'].isNotNull()).count()

0

This column contains no values. It is just an empty, unnamed column, so we can delete it.
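One probable cause of such a column is a trailing comma at the end of each CSV line. This is an assumption and has not been checked against the file. If it holds, the header ends with an empty field. Spark names a column whose header is empty `_c` followed by the column's index (here 65, i.e. the 66th column). Python's standard `csv` module shows the extra empty field on a made-up sample:

```python
import csv
import io

# Made-up CSV whose lines end with a trailing comma
sample = 'Country Code,1960,\nARB,,\n'

header, row = csv.reader(io.StringIO(sample))
print(header)  # ['Country Code', '1960', '']
print(row)     # ['ARB', '', '']

# The empty last header field would become "_c2" in Spark (0-based index)
empty_idx = [i for i, name in enumerate(header) if name == '']
print(empty_idx)  # [2]
```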

### 2.5. Combining the DataFrames

After reviewing the columns we need and matching the data in the join columns, we will combine everything together. We drop the rows of the aggregates that have no region or income group, drop the empty "_c65" column, and fill in the missing values in the "Topic" column with the word "Unknown". The rows of the WDI DataFrame with the "INX" country code will not be included in the new DataFrame, because an inner join is used.
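The first join is an inner join, which is the default for `join()`. WDI rows whose country code has no match, such as "INX", therefore disappear. The second is a left join, so every WDI row is kept. A null "Topic" is then replaced by "Unknown"; 13 indicators have no topic in the file of indicators. Here is a plain-Python sketch of these rules, using hypothetical indicator codes:

```python
# Hypothetical lookup tables standing in for the two smaller DataFrames
country_info = {'WLD': {'Region': 'World', 'Income Group': 'World'}}
topics = {'IND.A': 'Topic A', 'IND.B': None}  # IND.B has no topic

wdi = [
    {'Country Code': 'WLD', 'Indicator Code': 'IND.A'},
    {'Country Code': 'WLD', 'Indicator Code': 'IND.B'},
    {'Country Code': 'INX', 'Indicator Code': 'IND.A'},
]

joined = []
for row in wdi:
    info = country_info.get(row['Country Code'])
    if info is None:                           # inner join: drop unmatched rows
        continue
    topic = topics.get(row['Indicator Code'])  # left join: keep the row, topic may be None
    joined.append({**row, **info,
                   'Topic': topic if topic is not None else 'Unknown'})  # fillna

for row in joined:
    print(row['Country Code'], row['Indicator Code'], row['Topic'])
# WLD IND.A Topic A
# WLD IND.B Unknown
```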

In [102]:
# Select columns in a specific order (the empty "_c65" column is excluded)
cols_ord = (['Country Code', 'Country Name', 'Region', 'Income Group', 'Topic'] 
            + df_wdi.columns[2:-1])

# Join all the selected DataFrames together
wdi_csdf = (df_wdi
            .join(df_country_w
                  .dropna(subset=['Region', 'Income Group'])
                  .select('Country Code', 'Region', 'Income Group'), 
                  'Country Code')
            .join(df_series.select('Series Code', 'Topic'),  
                  df_wdi['Indicator Code'] == df_series['Series Code'], 
                  'left')
            .fillna({'Topic': 'Unknown'})
            .select(cols_ord))

Let's check whether the merge was performed correctly by calculating the final number of rows and columns.

In [103]:
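# Calculate the expected number of rows: all rows except "INX", minus one row
# per indicator for every aggregate without a region, except "World"
rows_df = (df_wdi.where(df_wdi['Country Code'] != 'INX').count() 
           - (df_country.where(df_country['Region'].isNull()).count() - 1)
           * df_series.select('Series Code').count())

# Expected columns: the original 66, minus "_c65", plus "Region",
# "Income Group" and "Topic"
print("The correct number of rows: {0}, columns: {1}".format(
    rows_df, len(df_wdi.columns) + 2))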
print("The resulting number of rows: {0}, columns: {1}".format(
    wdi_csdf.count(), len(wdi_csdf.columns)))

The correct number of rows: 313920, columns: 68
The resulting number of rows: 313920, columns: 68


The number of rows and columns in the DataFrame matches the calculations, which means that the merge was performed as intended.
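The expected figures can also be checked by hand, using only the counts reported earlier in the notebook:

```python
# Figures reported earlier in the notebook
total_rows = 380160      # rows in WDIData
n_codes = 264            # distinct country codes in WDIData
n_indicators = 1440      # distinct indicator codes
null_region = 46         # codes without a region in WDICountry

rows_per_code = total_rows // n_codes               # one row per indicator
aggregates_dropped = null_region - 1                # all except "World"
expected_rows = (total_rows - rows_per_code         # minus the "INX" rows
                 - aggregates_dropped * n_indicators)
expected_cols = 66 - 1 + 3   # drop "_c65"; add Region, Income Group, Topic

print(rows_per_code, expected_rows, expected_cols)  # 1440 313920 68
print(expected_rows // n_indicators)                # 218 countries remain
```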

## 3. Analysis

### 3.1. Overview of the joined DataFrame

Let's look at what values our new DataFrame stores, how many years of indicator data it covers, and how many values are missing.

In [104]:
# Display a sampled subset
wdi_csdf.sample(0.2).show(10)

+------------+------------+------+------------+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------

In [105]:
# Divide the columns into numeric and string columns
years_cols = [col[0] for col in wdi_csdf.dtypes if col[1] == 'double']
name_cols = [col[0] for col in wdi_csdf.dtypes if col[1] == 'string']

In [106]:
print("Number of years in the dataset: {}".format(len(years_cols)))

Number of years in the dataset: 61


In [107]:
# Count the non-null values in each yearly column
wdi_csdf.select(years_cols).summary('count').show()

+-------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-----+
|summary| 1960| 1961| 1962| 1963| 1964| 1965| 1966| 1967| 1968| 1969| 1970| 1971| 1972| 1973| 1974| 1975| 1976| 1977| 1978| 1979| 1980| 1981| 1982| 1983| 1984| 1985| 1986| 1987| 1988| 1989|  1990|  1991|  1992|  1993|  1994|  1995|  1996|  1997|  1998|  1999|  2000|  2001|  2002|  2003|  2004|  2005|  2006|  2007|  2008|  2009|  2010|  2011|  2012|  2013|  2014|  2015|  2016|  2017|  2018| 2019| 2020|
+-------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-

As we can see, every year has at least one non-null value.

In [108]:
# Count the non-null values in the string columns
wdi_csdf.select(name_cols).summary('count').show()

+-------+------------+------------+------+------------+------+--------------+--------------+
|summary|Country Code|Country Name|Region|Income Group| Topic|Indicator Name|Indicator Code|
+-------+------------+------------+------+------------+------+--------------+--------------+
|  count|      313920|      313920|313920|      313920|313920|        313920|        313920|
+-------+------------+------------+------+------------+------+--------------+--------------+



The count is the same in all of these columns and equals the total number of rows (313920), so they have no missing values.

### 3.2. The "Region", "Income Group" and "Topic" columns

Let's find out how many regions and income groups the countries are divided into, what they are, and how many countries each of them contains.

In [109]:
wdi_csdf.select('Region').distinct().collect()

[Row(Region='South Asia'),
 Row(Region='World'),
 Row(Region='Sub-Saharan Africa'),
 Row(Region='Europe & Central Asia'),
 Row(Region='North America'),
 Row(Region='East Asia & Pacific'),
 Row(Region='Middle East & North Africa'),
 Row(Region='Latin America & Caribbean')]

In [110]:
(wdi_csdf.select('Country Code', 'Region').distinct()
         .groupBy('Region').count().orderBy('count')
         .show(truncate=False))

+--------------------------+-----+
|Region                    |count|
+--------------------------+-----+
|World                     |1    |
|North America             |3    |
|South Asia                |8    |
|Middle East & North Africa|21   |
|East Asia & Pacific       |37   |
|Latin America & Caribbean |42   |
|Sub-Saharan Africa        |48   |
|Europe & Central Asia     |58   |
+--------------------------+-----+



In [111]:
wdi_csdf.select('Income Group').distinct().collect()

[Row(Income Group='Lower middle income'),
 Row(Income Group='World'),
 Row(Income Group='High income'),
 Row(Income Group='Upper middle income'),
 Row(Income Group='Low income')]

In [112]:
(wdi_csdf.filter(wdi_csdf['Income Group'] != 'World')
         .select('Country Code', 'Income Group').distinct()
         .groupBy('Income Group').count()
         .orderBy(F.desc('Income Group')).show())

+-------------------+-----+
|       Income Group|count|
+-------------------+-----+
|Upper middle income|   56|
|Lower middle income|   50|
|         Low income|   29|
|        High income|   82|
+-------------------+-----+
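As a quick consistency check, the region counts (including "World") should add up to the 218 countries left after the merge. The income-group counts (without "World") should add up to the 217 economies mentioned on the source site. The figures below are copied from the two outputs above:

```python
region_counts = {
    'World': 1, 'North America': 3, 'South Asia': 8,
    'Middle East & North Africa': 21, 'East Asia & Pacific': 37,
    'Latin America & Caribbean': 42, 'Sub-Saharan Africa': 48,
    'Europe & Central Asia': 58,
}
income_counts = {
    'Upper middle income': 56, 'Lower middle income': 50,
    'Low income': 29, 'High income': 82,
}

print(sum(region_counts.values()))  # 218
print(sum(income_counts.values()))  # 217
```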



Let's also find out how many indicator topics the DataFrame contains and how many indicators belong to each of them.

In [113]:
wdi_csdf.select('Topic').distinct().count()

90

In [114]:
(wdi_csdf.select('Indicator Code', 'Topic').distinct()
         .groupBy('Topic').count().orderBy('Topic')
         .show(10, False))

+--------------------------------------------------------------------------------------+-----+
|Topic                                                                                 |count|
+--------------------------------------------------------------------------------------+-----+
|Economic Policy & Debt: Balance of payments: Capital & financial account              |11   |
|Economic Policy & Debt: Balance of payments: Current account: Balances                |4    |
|Economic Policy & Debt: Balance of payments: Current account: Goods, services & income|22   |
|Economic Policy & Debt: Balance of payments: Current account: Transfers               |7    |
|Economic Policy & Debt: Balance of payments: Reserves & other items                   |6    |
|Economic Policy & Debt: External debt: Debt outstanding                               |10   |
|Economic Policy & Debt: External debt: Debt ratios & other items                      |11   |
|Economic Policy & Debt: External debt: Debt servi

### 3.3. Indicators and their topics

Next, we will analyze the indicators and their topics in more detail. Namely, we will look at which of them have no entries or very few entries by year, by country, and by year and country at the same time. 

Since we will often use the three main columns, we will create a list of them. The analysis also uses aggregate functions whose names end up in the names of the resulting columns (for example, "count(1960)"), so we will create a function that renames these columns for convenience.


In [115]:
# Create a list of columns to analyze
ind_nc_topic = ['Indicator Name', 'Indicator Code', 'Topic']

In [116]:
def rename_agg_cols(df, cols_rename, chars, new_form=None):
    """Return a new DataFrame with renamed aggregate columns.

    Parameters:
    df -- a PySpark DataFrame
    cols_rename -- names of columns to rename
    chars -- a string specifying the set of characters to be removed
    new_form -- a new format of column names
    """
    for old_name in cols_rename:
    
        if new_form:
            new_name = new_form.format(old_name.strip(chars)) 
        else:
            new_name = old_name.strip(chars)

        df = df.withColumnRenamed(old_name, new_name)

    return df
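`str.strip(chars)` removes any combination of the given characters from both ends of a string. It does not remove a fixed prefix or suffix. With `chars='count()'` it turns "count(1960)" into "1960", which is all the notebook needs. However, a name that begins or ends with one of those letters would be cut too short. For comparison, below is a sketch of a hypothetical, stricter helper based on a regular expression. The rest of the notebook does not use it:

```python
import re

# strip() treats its argument as a set of characters
print('count(1960)'.strip('count()'))   # 1960
print('count(total)'.strip('count()'))  # al  (the "tot" of "total" is stripped too)

def agg_inner_name(name, func='count'):
    """Hypothetical helper: return the text inside 'func(...)',
    or the name unchanged if it does not have that form."""
    match = re.fullmatch(rf'{re.escape(func)}\((.*)\)', name)
    return match.group(1) if match else name

print(agg_inner_name('count(1960)'))   # 1960
print(agg_inner_name('count(total)'))  # total
print(agg_inner_name('Topic'))         # Topic
```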


To get started, let's see how many times each indicator appears in the dataset.

In [117]:
wdi_csdf.groupBy('Indicator Code').count().select('count').distinct().show()

+-----+
|count|
+-----+
|  218|
+-----+



Each indicator appears 218 times, i.e. once for each country (217 economies plus "World"). This means that, for each indicator, we can count the countries that have data on it and see whether such records are few (or absent) or many.

First, for each indicator we will count the countries in which it has at least one entry across all years. We will also count the indicators for each country in the same way. This will show which indicators are represented in most of the countries and which in only a few.

In [118]:
# Count the number of countries with data for each indicator
# (the specific year does not matter)
country_ind = (wdi_csdf.dropna('all', subset=years_cols)
                       .groupBy(ind_nc_topic)
                       .count())

country_ind.select('Indicator Name', 'count').show(7, False)

+--------------------------------------------------------------+-----+
|Indicator Name                                                |count|
+--------------------------------------------------------------+-----+
|Depth of credit information index (0=low to 8=high)           |191  |
|Manufactures exports (% of merchandise exports)               |199  |
|People practicing open defecation (% of population)           |214  |
|Personal remittances, received (% of GDP)                     |194  |
|Electricity production from coal sources (% of total)         |143  |
|GNI growth (annual %)                                         |169  |
|Households and NPISHs final consumption expenditure (% of GDP)|194  |
+--------------------------------------------------------------+-----+
only showing top 7 rows



In [119]:
# Count the number of indicators with data for each country
# (the specific indicator does not matter)
(wdi_csdf.dropna('all', subset=years_cols)
         .groupBy('Country Name').count()
         .select('Country Name', 'count')
         .limit(5).collect())

[Row(Country Name='Chad', count=1212),
 Row(Country Name='World', count=948),
 Row(Country Name='Congo, Dem. Rep.', count=1298),
 Row(Country Name='Cabo Verde', count=1216),
 Row(Country Name='Kiribati', count=917)]

Next, we will focus on the indicators for which data is available in a large number of countries, for example, at least 200.

In [120]:
# Set the lower and upper bounds (inclusive) for selecting indicators
sel_bounds = (200, 220)

In [121]:
# Find the indicators with data in the selected range of countries
country_ind_sel = country_ind.where(F.col('count').between(*sel_bounds))
country_ind_sel.count()

227

In [122]:
# View which topics these indicators relate to
country_ind_sel.select('Topic').distinct().limit(5).collect()

[Row(Topic='Education: Efficiency'),
 Row(Topic='Social Protection & Labor: Labor force structure'),
 Row(Topic='Environment: Density & urbanization'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Economic Policy & Debt: National accounts: Local currency at current prices: Expenditure on GDP')]

In [123]:
country_ind_sel.select('Topic').distinct().count()

44

Now we will break the previous counts down by year to see for which indicators, and in which specific years, we have data. 

In [124]:
# Count the number of countries with data for each indicator in each year
agg_op = {year: 'count' for year in years_cols}
year_ind_agg = (wdi_csdf.groupBy(ind_nc_topic)
                        .agg(agg_op)
                        .orderBy('Indicator Name'))

# Use our defined function to rename aggregate columns
year_ind = rename_agg_cols(year_ind_agg.select(ind_nc_topic
                                               + sorted(year_ind_agg.columns[3:])), 
                           year_ind_agg.columns[3:], 'count()')

year_ind.show(10)

+--------------------+-----------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      Indicator Name|   Indicator Code|               Topic|1960|1961|1962|1963|1964|1965|1966|1967|1968|1969|1970|1971|1972|1973|1974|1975|1976|1977|1978|1979|1980|1981|1982|1983|1984|1985|1986|1987|1988|1989|1990|1991|1992|1993|1994|1995|1996|1997|1998|1999|2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|2019|2020|
+--------------------+-----------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
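Passing a dictionary such as `{'1960': 'count', ...}` to `agg()` applies `count` to each yearly column separately. `count` on a column skips nulls, and the resulting columns are named like "count(1960)", which is why they are renamed. Here is a plain-Python sketch of the same per-indicator, per-year counting on made-up values:

```python
from collections import defaultdict

years = ['1960', '1961']  # hypothetical subset of yearly columns
rows = [
    {'Indicator Code': 'IND.A', '1960': None, '1961': 1.5},
    {'Indicator Code': 'IND.A', '1960': 2.0, '1961': 3.0},
    {'Indicator Code': 'IND.B', '1960': None, '1961': None},
]

# Like groupBy('Indicator Code').agg({year: 'count' for year in years})
counts = defaultdict(lambda: {year: 0 for year in years})
for row in rows:
    group = counts[row['Indicator Code']]  # creates the group even if all values are null
    for year in years:
        if row[year] is not None:          # count(column) ignores nulls
            group[year] += 1

print(dict(counts))
# {'IND.A': {'1960': 1, '1961': 2}, 'IND.B': {'1960': 0, '1961': 0}}
```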

The table above shows that some indicators have no values at all in many years. We can create a new DataFrame that collects, in a single list, all the years that meet a given condition, and counts them. We can then select exactly the indicators we need from this DataFrame. 

First, we define a function that will help us find the right years.

In [125]:
def collect_cols(df, cols, new_cols, bounds=None, exp_isnull=None):
    """Collect into a new column the names of all columns whose values
    match the condition, and count the number of such columns.

    Parameters:
    df -- a PySpark DataFrame
    cols -- columns whose values will be compared with the condition
    new_cols -- names for the two new columns
    bounds -- (lower, upper) values for the PySpark Column method "between()",
    which includes both bounds (default None)
    exp_isnull -- if True, select columns with null values ("isNull()");
    otherwise select columns with non-null values ("isNotNull()")
    (default None). Ignored when bounds is not None

    Return a new DataFrame with the two new columns.
    """
    # For each column, F.when() yields the column name if the condition holds, else null
    if bounds is not None:
        cond = [F.when(df[col].between(*bounds), F.lit(col)) for col in cols]
    elif exp_isnull:
        cond = [F.when(df[col].isNull(), F.lit(col)) for col in cols]
    else:
        cond = [F.when(df[col].isNotNull(), F.lit(col)) for col in cols]

    # Sort the names, drop the nulls (they are not among the column names),
    # then count the names that remain
    new_df = (df.withColumn(new_cols[0], F.array_sort(F.array(cond)))
                .withColumn(new_cols[0],
                            F.array_intersect(new_cols[0],
                                              F.array([F.lit(col) for col in cols])))
                .withColumn(new_cols[1], F.size(new_cols[0])))

    return new_df
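For a single row, `collect_cols` keeps the sorted names of the columns whose values pass the test and counts them. `between()` includes both bounds, and a null never passes it. The plain-Python sketch below mirrors that per-row logic. It assumes a row is a dict with `None` for missing values; `collect_row` and the sample values are hypothetical.

```python
def collect_row(row, cols, bounds=None, exp_isnull=None):
    """Return (sorted names of matching columns, their count) for one row."""

    def matches(value):
        if bounds is not None:
            # Inclusive on both ends, like Column.between(); null never matches
            return value is not None and bounds[0] <= value <= bounds[1]
        if exp_isnull:
            return value is None
        return value is not None

    names = sorted(col for col in cols if matches(row[col]))
    return names, len(names)


row = {'1960': None, '1961': 40, '1962': 120, '1963': 75}
years = ['1960', '1961', '1962', '1963']
print(collect_row(row, years, bounds=(50, 150)))  # (['1962', '1963'], 2)
print(collect_row(row, years))                    # (['1961', '1962', '1963'], 3)
print(collect_row(row, years, exp_isnull=True))   # (['1960'], 1)
```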

In [126]:
# For each indicator, find the years in which the number of countries
# with data falls within sel_bounds
year_country_ind = collect_cols(year_ind, years_cols,
                                ['Name_years', 'Num_years'],
                                bounds=sel_bounds).drop(*years_cols)

year_country_ind.show(5)

+--------------------+-----------------+--------------------+--------------------+---------+
|      Indicator Name|   Indicator Code|               Topic|          Name_years|Num_years|
+--------------------+-----------------+--------------------+--------------------+---------+
|ARI treatment (% ...|   SH.STA.ARIC.ZS|Health: Disease p...|                  []|        0|
|Access to clean f...|   EG.CFT.ACCS.ZS|Environment: Ener...|                  []|        0|
|Access to electri...|   EG.ELC.ACCS.ZS|Environment: Ener...|[2005, 2006, 2007...|       14|
|Access to electri...|EG.ELC.ACCS.RU.ZS|Environment: Ener...|[2008, 2009, 2010...|       11|
|Access to electri...|EG.ELC.ACCS.UR.ZS|Environment: Ener...|[2005, 2006, 2007...|       14|
+--------------------+-----------------+--------------------+--------------------+---------+
only showing top 5 rows



In [127]:
# Keep the indicators that have at least 50 such years
year_country_ind_sel = year_country_ind.filter(year_country_ind['Num_years'] >= 50)

# Show the names and codes of these indicators
year_country_ind_sel.select(ind_nc_topic[:2]).show(7, False)

+---------------------------------------------------+--------------+
|Indicator Name                                     |Indicator Code|
+---------------------------------------------------+--------------+
|Land area (sq. km)                                 |AG.LND.TOTL.K2|
|Population density (people per sq. km of land area)|EN.POP.DNST   |
|Population growth (annual %)                       |SP.POP.GROW   |
|Population, total                                  |SP.POP.TOTL   |
|Primary education, duration (years)                |SE.PRM.DURS   |
|Primary school starting age (years)                |SE.PRM.AGES   |
|Rural population                                   |SP.RUR.TOTL   |
+---------------------------------------------------+--------------+
only showing top 7 rows



The indicators above are the ones with the most complete coverage over time.

In the same way, we can find out which topics these indicators belong to.

In [128]:
year_country_ind_sel.select('Topic').distinct().limit(5).collect()

[Row(Topic='Environment: Density & urbanization'),
 Row(Topic='Environment: Land use'),
 Row(Topic='Health: Population: Structure'),
 Row(Topic='Education: Outcomes'),
 Row(Topic='Health: Population: Dynamics')]

In [129]:
year_country_ind_sel.select('Topic').distinct().count()

6

So far, we have looked at the indicators without distinguishing between countries. In the same way, we can find the specific years in which each indicator has data for each country.

In [130]:
# For each country and indicator, find the years with non-null values
wdi_csdf_years = collect_cols(wdi_csdf, years_cols, 
                              ['Name_years_c', 'Num_years_c']).drop(*years_cols)

wdi_csdf_years.select('Country Name', 'Indicator Name',
                      'Name_years_c', 'Num_years_c').show(5)

+------------+--------------------+--------------------+-----------+
|Country Name|      Indicator Name|        Name_years_c|Num_years_c|
+------------+--------------------+--------------------+-----------+
|       World|Access to clean f...|[2000, 2001, 2002...|         17|
|       World|Access to electri...|[1998, 1999, 2000...|         21|
|       World|Access to electri...|[2000, 2001, 2002...|         19|
|       World|Access to electri...|[1993, 1994, 1995...|         26|
|       World|Account ownership...|  [2011, 2014, 2017]|          3|
+------------+--------------------+--------------------+-----------+
only showing top 5 rows



### 3.4. Overview of some PySpark tools

To see a few more built-in functions of the PySpark SQL module in action, we will run a small analysis of how two demographic indicators, "Population, total" (SP.POP.TOTL) and "Population density (people per sq. km of land area)" (EN.POP.DNST), changed over five years (2015 to 2019) in each income group and in the world as a whole.

We rename the columns whose names contain spaces so that they can be used in SQL queries without being quoted in backticks (`). Then we register the DataFrame as a local temporary view.
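To illustrate the quoting issue, the sketch below uses Python's built-in SQLite as a stand-in for Spark SQL. SQLite accepts backtick-quoted identifiers, as Spark SQL does, but it is only an analogy: the table, columns and values are illustrative (the population figures are taken from the output further below), and `RENAME COLUMN` assumes SQLite 3.25 or later.

```python
import sqlite3

# SQLite stands in for Spark SQL here; both accept backtick-quoted identifiers
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Population (`Income Group` TEXT, YR2015 INTEGER)')
conn.executemany('INSERT INTO Population VALUES (?, ?)',
                 [('Low income', 599891723), ('High income', 1188790808)])

# The space in the column name means it must be quoted in every query
quoted = conn.execute(
    'SELECT `Income Group`, YR2015 FROM Population ORDER BY `Income Group`'
).fetchall()

# After renaming the column, no quoting is needed
conn.execute('ALTER TABLE Population RENAME COLUMN `Income Group` TO IncomeGroup')
plain = conn.execute(
    'SELECT IncomeGroup, YR2015 FROM Population ORDER BY IncomeGroup'
).fetchall()
conn.close()

print(quoted == plain)  # True
```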

In [131]:
# Cast the 2015 to 2019 columns to DecimalType() and sum them by income group
pop_df1 = (wdi_csdf
          .filter(wdi_csdf['Indicator Code'].isin(['SP.POP.TOTL', 
                                                   'EN.POP.DNST']))
          .select('Income Group', 'Indicator Name', 
                  *[wdi_csdf[cname].astype(DecimalType()) 
                    for cname in wdi_csdf.columns[-6:-1]])
          .groupBy('Income Group', 'Indicator Name').sum()
          .withColumnRenamed('Income Group', 'IncomeGroup')
          .withColumnRenamed('Indicator Name', 'IndicatorName'))

pop_df = rename_agg_cols(pop_df1, pop_df1.schema.names[2:], 'sum()', 'YR{}')

# Create a local temporary view with this DataFrame
pop_df.createOrReplaceTempView('Population')
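`DecimalType()` without arguments means precision 10 and scale 0. The cast therefore drops the fractional part of each value, which is why the density sums in the outputs below are whole numbers. Python's `decimal` module can illustrate the effect. The sketch assumes that Spark rounds half-up when reducing the scale and returns null on overflow (its default, non-ANSI behaviour); both are worth checking against your Spark version. `to_decimal_10_0` is a hypothetical helper.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_decimal_10_0(value):
    """Mimic a cast to DECIMAL(10, 0): round to an integer (half-up assumed)
    and return None when the result needs more than 10 digits."""
    if value is None:
        return None
    d = Decimal(str(value))
    if d.adjusted() >= 10:  # 11 or more integer digits can never fit
        return None
    rounded = d.quantize(Decimal('1'), rounding=ROUND_HALF_UP)
    if len(rounded.as_tuple().digits) > 10:  # e.g. 9999999999.6 rounds up to 11 digits
        return None
    return rounded


print(to_decimal_10_0(58.49))  # 58
print(to_decimal_10_0(58.5))   # 59
```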

Let's run a few simple SQL queries against this view.

In [132]:
query1 = "SELECT * FROM Population ORDER BY IncomeGroup, IndicatorName"
spark.sql(query1).show(5)

+-------------------+--------------------+----------+----------+----------+----------+----------+
|        IncomeGroup|       IndicatorName|    YR2015|    YR2016|    YR2017|    YR2018|    YR2019|
+-------------------+--------------------+----------+----------+----------+----------+----------+
|        High income|Population densit...|     76299|     77209|     77947|     57117|      null|
|        High income|   Population, total|1188790808|1195543881|1201478568|1206941436|1212261807|
|         Low income|Population densit...|      3027|      3095|      3169|      3246|      null|
|         Low income|   Population, total| 599891723| 615404283| 631383801| 647874801| 664957848|
|Lower middle income|Population densit...|      7195|      7307|      7418|      7530|        94|
+-------------------+--------------------+----------+----------+----------+----------+----------+
only showing top 5 rows



In [133]:
query2 = "SELECT * FROM Population WHERE IncomeGroup != 'World' ORDER BY IncomeGroup, IndicatorName"
spark.sql(query2).show(5)

+-------------------+--------------------+----------+----------+----------+----------+----------+
|        IncomeGroup|       IndicatorName|    YR2015|    YR2016|    YR2017|    YR2018|    YR2019|
+-------------------+--------------------+----------+----------+----------+----------+----------+
|        High income|Population densit...|     76299|     77209|     77947|     57117|      null|
|        High income|   Population, total|1188790808|1195543881|1201478568|1206941436|1212261807|
|         Low income|Population densit...|      3027|      3095|      3169|      3246|      null|
|         Low income|   Population, total| 599891723| 615404283| 631383801| 647874801| 664957848|
|Lower middle income|Population densit...|      7195|      7307|      7418|      7530|        94|
+-------------------+--------------------+----------+----------+----------+----------+----------+
only showing top 5 rows



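Next, we use a window function to rank the income groups within each indicator by their values.

In [134]:
# Rank the income groups within each indicator in ascending order of their
# values (YR2015 first; later years only break ties)
pop_window = Window.partitionBy('IndicatorName').orderBy(pop_df.columns[2:])

pop_df.withColumn('Rank', F.dense_rank().over(pop_window)).show()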

+-------------------+--------------------+----------+----------+----------+----------+----------+----+
|        IncomeGroup|       IndicatorName|    YR2015|    YR2016|    YR2017|    YR2018|    YR2019|Rank|
+-------------------+--------------------+----------+----------+----------+----------+----------+----+
|              World|Population densit...|        58|        58|        59|        60|      null|   1|
|         Low income|Population densit...|      3027|      3095|      3169|      3246|      null|   2|
|Lower middle income|Population densit...|      7195|      7307|      7418|      7530|        94|   3|
|Upper middle income|Population densit...|      7437|      7558|      7678|      7619|        65|   4|
|        High income|Population densit...|     76299|     77209|     77947|     57117|      null|   5|
|         Low income|   Population, total| 599891723| 615404283| 631383801| 647874801| 664957848|   1|
|        High income|   Population, total|1188790808|1195543881|120147856
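`dense_rank()` numbers the rows of each partition in the window's order. Tied rows share a rank and no gaps are left: ties give 1, 1, 2, whereas `rank()` would give 1, 1, 3. Below is a plain-Python sketch of that rule. It assumes rows are dicts ordered by a single key with no nulls, whereas the window above orders by all five year columns. `dense_rank_by` is a hypothetical helper, the indicator labels are shortened, and the values come from the YR2015 column above.

```python
from itertools import groupby


def dense_rank_by(rows, partition_key, order_key):
    """Return (row, rank) pairs, ranking rows within each partition by
    order_key ascending; ties share a rank and ranks have no gaps."""
    result = []
    by_partition = sorted(rows, key=lambda r: r[partition_key])
    for _, part in groupby(by_partition, key=lambda r: r[partition_key]):
        rank, previous = 0, object()  # sentinel differs from every value
        for row in sorted(part, key=lambda r: r[order_key]):
            if row[order_key] != previous:  # a new distinct value raises the rank by one
                rank += 1
                previous = row[order_key]
            result.append((row, rank))
    return result


rows = [
    {'IncomeGroup': 'High income', 'Indicator': 'density', 'YR2015': 76299},
    {'IncomeGroup': 'World', 'Indicator': 'density', 'YR2015': 58},
    {'IncomeGroup': 'Low income', 'Indicator': 'density', 'YR2015': 3027},
    {'IncomeGroup': 'Low income', 'Indicator': 'total', 'YR2015': 599891723},
    {'IncomeGroup': 'High income', 'Indicator': 'total', 'YR2015': 1188790808},
]
for row, rank in dense_rank_by(rows, 'Indicator', 'YR2015'):
    print(row['Indicator'], row['IncomeGroup'], rank)
```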

Next, we write a function that transposes the DataFrame so that the indicators become columns and each row corresponds to an income group and a year.

In [135]:
def transpose_cols(df, show_cols, unpivot_cols=None, new_names=('Key', 'Val'),
                   pivot_col=None, group_cols=('Key', 'Val')):
    """Return a transposed DataFrame, or GroupedData if a pivot is used.

    Parameters:
    df -- a PySpark DataFrame
    show_cols -- columns kept alongside the new columns
    (must include pivot_col when a pivot follows an unpivot)
    unpivot_cols -- columns that will be unpivoted
    new_names -- names for the two new columns created by unpivoting
    pivot_col -- a column whose values will become new columns
    (no aggregation is applied; call one on the returned GroupedData)
    group_cols -- a tuple of columns for grouping
    """
    transp_df = df
    if unpivot_cols:
        # Build a one-entry map {column name: value} per unpivoted column,
        # explode the array of maps into rows, then explode each map
        # into a pair of (key, value) columns
        transp_df = (df.select('*', F.explode_outer(F.array(
            [F.create_map(F.lit(name), F.col(name))
             for name in unpivot_cols])).alias('New_cols'))
                       .select(*show_cols,
                               F.explode_outer('New_cols').alias(*new_names)))

    if pivot_col:
        # Values of pivot_col become column names; aggregation is left to the caller
        transp_df = transp_df.groupBy(*group_cols).pivot(pivot_col)

    return transp_df
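The unpivot step turns every year column into a `(Years, Vals)` row. The pivot step then turns each distinct `IndicatorName` into a column. The plain-Python sketch below shows the same reshaping. It assumes each row is a dict and keeps the largest non-null value when several rows fall into one cell, as the `max()` call that follows does. `unpivot` and `pivot` are hypothetical helpers, and the indicator labels are shortened.

```python
def unpivot(rows, keep_cols, value_cols, names=('Key', 'Val')):
    """Turn each value column of each row into its own (name, value) row."""
    key_name, val_name = names
    return [
        {**{c: row[c] for c in keep_cols}, key_name: col, val_name: row[col]}
        for row in rows
        for col in value_cols
    ]


def pivot(rows, group_cols, pivot_col, value_col):
    """Group by group_cols and spread pivot_col values into columns,
    keeping the max non-null value per cell (like .pivot(...).max())."""
    out = {}
    for row in rows:
        group = tuple(row[c] for c in group_cols)
        cell = out.setdefault(group, dict(zip(group_cols, group)))
        value, current = row[value_col], cell.get(row[pivot_col])
        if current is None or (value is not None and value > current):
            cell[row[pivot_col]] = value
    return list(out.values())


wide = [
    {'IncomeGroup': 'High income', 'IndicatorName': 'Population, total',
     'YR2015': 1188790808, 'YR2016': 1195543881},
    {'IncomeGroup': 'High income', 'IndicatorName': 'Population density',
     'YR2015': 76299, 'YR2016': 77209},
]
long_rows = unpivot(wide, ['IncomeGroup', 'IndicatorName'], ['YR2015', 'YR2016'],
                    names=('Years', 'Vals'))
for row in pivot(long_rows, ['IncomeGroup', 'Years'], 'IndicatorName', 'Vals'):
    print(row)
```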

In [136]:
# Unpivot the year columns, then pivot on the indicator names,
# so that the indicators become columns and the years become rows
tr_pop_df = transpose_cols(pop_df, pop_df.columns[:2], 
                           unpivot_cols=pop_df.columns[2:], 
                           new_names=('Years', 'Vals'), 
                           pivot_col='IndicatorName', 
                           group_cols=('IncomeGroup', 'Years'))

tr_pop_df.max().orderBy('IncomeGroup', 'Years').show()

+-------------------+------+---------------------------------------------------+-----------------+
|        IncomeGroup| Years|Population density (people per sq. km of land area)|Population, total|
+-------------------+------+---------------------------------------------------+-----------------+
|        High income|YR2015|                                              76299|       1188790808|
|        High income|YR2016|                                              77209|       1195543881|
|        High income|YR2017|                                              77947|       1201478568|
|        High income|YR2018|                                              57117|       1206941436|
|        High income|YR2019|                                               null|       1212261807|
|         Low income|YR2015|                                               3027|        599891723|
|         Low income|YR2016|                                               3095|        615404283|
|         

## 4. Saving results

Finally, we save the DataFrames we need to files so that we can use them later. With `mode='ignore'`, Spark silently skips a write if the output path already exists.

In [137]:
path_to_save = '/content/drive/MyDrive/Colab_Notebooks/WDI_Project/New_df/'

wdi_csdf.write.csv(path_to_save + 'Wdi_csdf.csv', header=True, mode='ignore')
country_ind.write.csv(path_to_save + 'Country_ind.csv', header=True, mode='ignore')
year_ind.write.csv(path_to_save + 'Year_ind.csv', header=True, mode='ignore')

# Save to JSON, because CSV does not support the array<string> data type
year_country_ind.write.json(path_to_save + 'year_country_ind.json', mode='ignore')
wdi_csdf_years.write.json(path_to_save + 'wdi_csdf_years.json', mode='ignore')