After comparing the performance scores of the imputation methods, we decided to use linear regression.
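
As a minimal sketch of what linear-regression imputation means for a single yearly series, the plain-Python function below fits a least-squares line to the observed (year, value) pairs and evaluates it at the missing years. The function name `impute_linear` and the sample values are illustrative assumptions; the notebook itself uses scikit-learn's `LinearRegression` for this step.

```python
def impute_linear(years, values):
    """Fill None entries in `values` with a least-squares line fitted on the observed points."""
    observed = [(x, y) for x, y in zip(years, values) if y is not None]
    if len(observed) < 2:
        # A line needs at least two points; return the series unchanged.
        return list(values)
    n = len(observed)
    mean_x = sum(x for x, _ in observed) / n
    mean_y = sum(y for _, y in observed) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in observed)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in observed)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    # Keep observed values, predict only the missing ones.
    return [y if y is not None else intercept + slope * x for x, y in zip(years, values)]


years = [2004, 2005, 2006, 2007, 2008]
values = [10.0, None, 14.0, None, 18.0]  # made-up series lying on a straight line
filled = impute_linear(years, values)
```

Because the observed points here lie exactly on a line, the two missing years are filled with the values on that line; with real data the fitted line is only an approximation of the trend.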

Data preparation

(Note: I renamed the files before reading them, because some of the original names contain spaces, which makes them awkward to work with.)

In [1]:
import pandas as pd
import numpy as np

#first, we collect all the file names in a tuple, so we can loop through them later
files = ("Target4.1.xlsx", "Target4.2.xlsx", "Target4.3.xlsx", "Target4.4.xlsx", "Target4.5.xlsx", "Target4.6.xlsx", "Target4.c.xlsx");
df_union = pd.DataFrame();

#iterate over each file and do the transformation and aggregation
for file in files:
  #read the data from Data sheet in each file
  df = pd.read_excel(file, sheet_name="Data").copy();

  #remove the rows with more than 15 missing values, i.e. those with fewer than 3 valid yearly values
  temp = df.isnull().sum(axis=1);
  row_to_delete = temp[temp > 15].index;
  df_data = df.drop(labels=row_to_delete);

  #read the data from Country - Metadata sheet in each file
  df_meta = pd.read_excel(file, sheet_name="Country - Metadata")[['Code', "Income Group"]].copy();

  #as we can see from the excel file, some countries don't have an income group classification assigned to them
  #we deal with this case by filling the missing values with "Unknown income"
  df_meta["Income Group"] = df_meta["Income Group"].fillna("Unknown income");
  #print(df_meta);

  #rename the column Code to Country Code so that we can join these two dataframes later
  df_meta.rename(columns={"Code":"Country Code"}, inplace=True);
  df_new = pd.merge(df_data, df_meta, how="left", on="Country Code");
  #print(df_new);
  #now, we have a dataframe with data of each yearly indicator value and the corresponding income group level.


  #here, we extract the unique series names as the list of indicators, because we need to aggregate the data by Series Name later
  indicators = df_new['Series Name'].unique();
  #print(indicators)

  #iterate the indicator in each file
  for indicator in indicators:
    #filter the data under this indicator in this file
    df_sub = df_new[df_new['Series Name'] == indicator].copy();
    #print(df_sub);

    #manipulate the dataframe before imputation
    temp = df_sub.copy();

    #do the linear regression imputation, same as what has been done in part 2.
    #1.get the country codes into a list
    #2.drop the columns that are not needed for the linear regression
    #3.transpose the dataframe, so the columns become the index and the index becomes the columns
    #4.set the column names to the country codes extracted in step 1
    #5.set the index to 2003..2022; 2003 is a placeholder for the first row, which stores the series name,
    #  and 2022 is a placeholder for the last row, which stores the income group
    #6.fit a linear regression on each column (country) and predict its missing years
    #7.transpose back to the original format, so each country is a row with the series name, the yearly values and the income group as columns
    #8.append the result for each indicator of each file to the final dataframe.
    countryNames = temp['Country Code'].values.tolist();

    temp = temp.drop(['Series Code', 'Country Name', 'Country Code'], axis=1);

    temp_T = temp.T;
    temp_T.columns = countryNames;
    temp_T.index = list(range(2003,2023));
    temp_T.rename(index={2003:"Series Name"}, inplace=True);
    temp_T.rename(index={2022:"Income Group"}, inplace=True);
    #print(temp_T);

    #imputation
    from sklearn.linear_model import LinearRegression;
    regression = LinearRegression();

    temp_regr = temp_T.copy();
    #remove the series name row and income group row before doing linear regression
    lastRow = temp_regr.copy().iloc[-1:];
    firstRow = temp_regr.copy().iloc[:1];
    #print(firstRow);
    temp_regr = temp_regr.iloc[:-1];
    temp_regr = temp_regr.iloc[1: , :];
    #print(temp_regr);

    #iterate each column (country)
    for x in temp_regr.columns:
      temp = temp_regr[x].copy();
      X_test = temp[temp.isnull()==True];

      #check if there is NaN value
      if(len(X_test.index)>0):
        train = temp[temp.isnull()==False];
        regression.fit(np.array(train.index).reshape(-1,1), np.array(train.values).reshape(-1,1));
        y_predict = regression.predict(np.array(X_test.index).reshape(-1,1));
        temp[temp.isnull()==True] = y_predict;
        temp_regr[x] = temp;

    #put the series name row and the income group row back (DataFrame.append was removed in pandas 2.0, so use pd.concat)
    temp_regr = pd.concat([firstRow, temp_regr, lastRow]);
    #print(temp_regr.T)
    aggregation = temp_regr.T.copy();
    #print("-------------------------", file,": ", indicator, "--------------------------");
    #df_aggr = aggregation.groupby("Income Group").mean();
    #print(aggregation.groupby("Income Group").mean());
    df_union = pd.concat([df_union, temp_regr.T]);


#aggregate the data by using groupby(), so we can see the average value of each yearly indicator value in each income group level.
df_series = df_union.copy();
df_aggr = df_series.groupby(["Series Name","Income Group"]).mean();
print(df_aggr)


                                                                             2004  \
Series Name                                        Income Group                     
Adolescents out of school (% of lower secondary... High income           3.936632   
                                                   Low income           52.267323   
                                                   Lower middle income  24.090806   
                                                   Unknown income       20.590895   
                                                   Upper middle income   9.056628   
...                                                                           ...   
School enrollment, tertiary (gross), gender par... High income           1.255849   
                                                   Low income            0.433131   
                                                   Lower middle income   0.814056   
                                                   Unknown income

For the aggregation we use the mean: we want to summarise the yearly values of each indicator by income group, and the average shows which value of each indicator is typical, year by year, for countries classified as "High income", "Low income", "Lower middle income", "Upper middle income" and "Unknown income".
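
The grouping idea can be sketched without pandas: collect the values that share a (series name, income group) key and average them. The indicator names and numbers below are made up for illustration; only the grouping logic mirrors the `groupby(...).mean()` call used in the notebook.

```python
from collections import defaultdict
from statistics import mean

# (series name, income group, value for one year) - illustrative values only
rows = [
    ("Indicator A", "High income", 2.0),
    ("Indicator A", "High income", 4.0),
    ("Indicator A", "Low income", 30.0),
    ("Indicator A", "Low income", 50.0),
    ("Indicator B", "High income", 1.0),
]

# Collect the values belonging to each (series, income group) pair.
groups = defaultdict(list)
for series, income_group, value in rows:
    groups[(series, income_group)].append(value)

# One average per pair, which is what a two-key group-by mean produces.
group_means = {key: mean(vals) for key, vals in groups.items()}
```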

The following code installs PySpark in Google Colab, since we are working on the project from home.

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Let's import the libraries we will need
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

# configure the Spark UI port
conf = SparkConf().set("spark.ui.port", "4050")

# create the context and the session
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.


Data Transformation

Data transformation before classification

In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

#1.get all country codes into a list
#2.reset the index of the dataframe, because currently the index holds the country codes
#3.rename the original index column to Country
#4.iterate over each country using the list of country codes
#5.combine the dataframes of all countries into one
#print(df_union);
countries = df_union.index.to_list();
UnionDF = df_union.copy().reset_index();
UnionDF.rename(columns={"index":"Country"}, inplace=True);
#print(UnionDF);

FinalDF = pd.DataFrame();

for country in countries:
  #filter the data of this country
  TempDF = UnionDF[UnionDF['Country'] == country];
  DF = TempDF.copy().drop(['Country'], axis=1);
  Series = DF['Series Name'].values.tolist();
  #get the income value of this country
  IncomeLv = DF['Income Group'].values[0];
  #drop the income group column because it will affect our aggregation later on
  DF = DF.drop(['Income Group'], axis=1);
  #print(IncomeLv)
  #drop the series name column and make it as the index of the dataframe, so as to transform our dataframe.
  DF = DF.drop(['Series Name'], axis=1);
  DF.index = Series;
  AggrDF = DF.T;
  #reset index so we can use year to aggregate later on
  AggrDF = AggrDF.reset_index();
  AggrDF.rename(columns={"index":"Year"}, inplace=True);
  #add a column storing the income group of this country; because a country has only one income group,
  #we can simply set the whole column to the IncomeLv we got previously from the dataframe.
  AggrDF['Income Group'] = IncomeLv;
  #transformation done!
  #print(AggrDF);
  FinalDF = pd.concat([FinalDF, AggrDF]);

FinalDF = FinalDF.groupby(['Year', 'Income Group']).mean();
#fill the index under the same 'Year' group.
FinalDF = FinalDF.reset_index(level=1);
print(FinalDF);

             Income Group  Children out of school (% of primary school age)  \
Year                                                                          
2004          High income                                          2.942631   
2004           Low income                                         28.415644   
2004  Lower middle income                                         13.302011   
2004       Unknown income                                         12.870535   
2004  Upper middle income                                          3.891834   
...                   ...                                               ...   
2021          High income                                          2.353911   
2021           Low income                                         16.065125   
2021  Lower middle income                                          7.917651   
2021       Unknown income                                          7.960475   
2021  Upper middle income                           

Now, we have a dataframe:

**Year | indicator 1 | indicator 2 |.....| income group**

Each row, i.e. each combination of 'Year' and income group, is now a data point; each indicator is a feature and the income group is the label.
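
The layout described above can be sketched in plain Python as a reshaping from long records to one feature vector per (year, income group) pair. The record values and the names `indicator 1` and `indicator 2` are placeholders, not values taken from the data.

```python
# (year, income group, indicator, averaged value) - placeholder records
records = [
    (2004, "High income", "indicator 1", 2.9),
    (2004, "High income", "indicator 2", 1.2),
    (2004, "Low income", "indicator 1", 28.4),
    (2004, "Low income", "indicator 2", 0.4),
]

# Fix the feature order once so every vector uses the same column positions.
indicators = sorted({name for _, _, name, _ in records})

# Gather the indicator values of each (year, income group) pair.
wide = {}
for year, group, name, value in records:
    wide.setdefault((year, group), {})[name] = value

# One data point per pair: the features in a fixed order, plus the label.
dataset = [
    ([cols.get(name) for name in indicators], group)
    for (year, group), cols in wide.items()
]
```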

Normalization

Because the value ranges of the indicators differ widely, it is more suitable to bring them onto a comparable scale before classification.
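
A small sketch of the kind of scaling applied here, assuming the `withMean=False, withStd=True` setting used in the notebook's `StandardScaler`: each feature is divided by its sample standard deviation and is not centred. The helper name `scale_by_std`, the sample columns and the handling of a constant column are assumptions made for this illustration.

```python
from statistics import stdev


def scale_by_std(column):
    """Divide every value by the column's sample standard deviation (no centring)."""
    s = stdev(column)  # sample standard deviation, n - 1 in the denominator
    if s == 0:
        # Choice made for this sketch: a constant column carries no information.
        return [0.0 for _ in column]
    return [v / s for v in column]


small_range = [2.0, 4.0, 6.0]           # e.g. a ratio-like indicator
large_range = [2000.0, 4000.0, 6000.0]  # e.g. a count-like indicator

scaled_small = scale_by_std(small_range)
scaled_large = scale_by_std(large_range)
```

After scaling, both columns have unit standard deviation, so neither dominates the other purely because of its units.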

In [4]:
SparkDF = spark.createDataFrame(FinalDF);
#SparkDF.show();

#because the income group column is categorical (strings), we have to convert it into numeric label indices first.
indexer = StringIndexer(inputCol="Income Group", outputCol="labels")
indexed = indexer.fit(SparkDF).transform(SparkDF)
#indexed.show();

#transform the dataframe, and put all indicators value into features
sparkDF_features = SparkDF.drop("Income Group");
assembler = VectorAssembler(inputCols=sparkDF_features.columns, outputCol="features");
features = assembler.transform(indexed);

#features.show();

#scale each feature to unit standard deviation using StandardScaler (withMean=False, so the data is not centred)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=False, withStd=True);
model = scaler.fit(features);
scaled_features = model.transform(features);

scaled_features.select("scaled_features", "labels").show();

+--------------------+------+
|     scaled_features|labels|
+--------------------+------+
|[0.46544305882724...|   0.0|
|[4.49457153240428...|   1.0|
|[2.10401133886066...|   2.0|
|[2.03576376764696...|   3.0|
|[0.61558086259305...|   4.0|
|[0.46780086787616...|   0.0|
|[4.13743226910186...|   1.0|
|[2.13866655205636...|   2.0|
|[1.91617097266923...|   3.0|
|[0.67179241522083...|   4.0|
|[0.43117709049354...|   0.0|
|[3.78155943873275...|   1.0|
|[2.03486361711975...|   2.0|
|[1.84927254880942...|   3.0|
|[0.69524656764033...|   4.0|
|[0.35290865925151...|   0.0|
|[3.52824175177563...|   1.0|
|[1.81416833192246...|   2.0|
|[1.66888144668028...|   3.0|
|[0.63499351549132...|   4.0|
+--------------------+------+
only showing top 20 rows
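
The `labels` column above takes the values 0.0 to 4.0. A rough stand-in for the label indexing step is sketched below: distinct labels are ordered by descending frequency, ties are broken alphabetically, and each label gets its position as a float. This follows my understanding of the default ordering of `StringIndexer` in recent Spark versions; treat the tie-breaking rule, the helper name `index_labels` and the assumed 18 years of data as assumptions.

```python
from collections import Counter


def index_labels(labels):
    """Map each distinct label to a float index: most frequent first, ties alphabetical."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


groups = [
    "High income",
    "Low income",
    "Lower middle income",
    "Unknown income",
    "Upper middle income",
]
# Assumption: every group appears once per year, so all frequencies tie.
labels = groups * 18
mapping = index_labels(labels)
```

Under these assumptions the mapping is alphabetical, which is consistent with the label values shown in the table above.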



Split the data into training and test sets

In [5]:
#split data into training and testing under the ratio 8:2
train, test = scaled_features.randomSplit([0.8, 0.2],seed = 24)
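
A weighted random split assigns each row to a side at random, so the resulting sizes are only approximately 80/20. The sketch below illustrates this with Python's `random` module; it does not reproduce Spark's sampler, so the same seed will not give the same rows. The helper name `random_split` and the row count of 90 (18 years times 5 income groups) are assumptions.

```python
import random


def random_split(rows, train_weight=0.8, seed=24):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)  # seeded, so the split is reproducible
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test


rows = list(range(90))  # stand-in for the rows of the scaled dataframe
train_rows, test_rows = random_split(rows)
```

Every row ends up in exactly one of the two lists, and rerunning with the same seed gives the same split.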

Classification

Multilayer perceptron classifier

Because the multilayer perceptron classifier has several parameters to set, we try several parameter settings (different intermediate layers) and use cross-validation to find the best model for this classifier. After classification, we evaluate the classifier by calculating its error percentage on the test set.
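
To give a feel for how the intermediate layer changes the size of the network, the sketch below counts weights and biases for three candidate layer lists of the form [inputs, hidden units, classes]. It assumes 20 input features and 5 classes, as reported in the model summary printed by the notebook, and fully connected layers with one bias per unit; the helper name `mlp_parameter_count` is hypothetical.

```python
def mlp_parameter_count(layers):
    """Count weights plus biases of a fully connected network with the given layer sizes."""
    # Each pair of consecutive layers contributes n_in * n_out weights and n_out biases.
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))


num_features, num_classes = 20, 5
candidates = [[num_features, hidden, num_classes] for hidden in (15, 10, 5)]
counts = {tuple(layers): mlp_parameter_count(layers) for layers in candidates}
```

Here the counts come out as 395, 265 and 135 parameters for 15, 10 and 5 hidden units respectively, so the grid compares networks of noticeably different capacity.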

We do the same for the decision tree classifier: we try several parameter settings (different tree depths and different bin values) to find its best model.

(Note: because we are using 10-fold cross-validation here, the evaluation may take quite a long time to finish, around 5 minutes.)
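
The running time follows from the number of models that have to be fitted. The sketch below builds 10 folds over a hypothetical training set and counts the fits per parameter grid. It assigns folds by index modulo k purely for illustration (Spark's `CrossValidator` assigns rows to folds randomly), and the training-set size of 79 is an assumption.

```python
def k_fold_indices(n_rows, k=10):
    """Yield (train_idx, val_idx) pairs; fold i validates on rows whose index % k == i."""
    for fold in range(k):
        val_idx = [i for i in range(n_rows) if i % k == fold]
        train_idx = [i for i in range(n_rows) if i % k != fold]
        yield train_idx, val_idx


folds = list(k_fold_indices(n_rows=79, k=10))

# Every parameter combination is fitted once per fold.
mlp_fits = 3 * len(folds)       # 3 layer settings
tree_fits = 3 * 3 * len(folds)  # 3 depths x 3 bin settings
```

With these grids that is 30 fits for the multilayer perceptron and 90 for the decision tree, before the best setting is refitted on the full training set.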

In [6]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#set up evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="accuracy");

#print(len(sparkDF_features.columns));
mlpc = MultilayerPerceptronClassifier(labelCol="labels", featuresCol="scaled_features", maxIter=1000);

#we set 3 different sets of layer, to find the best parameter setting for Multilayer perceptron classifier
layer1 = [len(sparkDF_features.columns), 15, 5];
layer2 = [len(sparkDF_features.columns), 10, 5];
layer3 = [len(sparkDF_features.columns), 5, 5];

#set 3 different parameter sets with 3 different intermediate layer values.
paramGrid = ParamGridBuilder()\
  .addGrid(mlpc.layers, [layer1, layer2, layer3])\
  .build()

#here, I use numFolds=10, because we are asked to use 10-fold cross-validation
crossValidation = CrossValidator(estimator=mlpc, evaluator=evaluator, numFolds=10, estimatorParamMaps=paramGrid);

xValidation_model = crossValidation.fit(train);
prediction = xValidation_model.transform(test);

#mlpc_model = mlpc.fit(train);
#prediction = mlpc_model.transform(test);
prediction.select("prediction", "labels", "scaled_features").show();

print("The best Model of Multilayer perceptron classifier is: ", xValidation_model.bestModel);

#have a look at the error percentage
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g" % (1.0 - accuracy))

+----------+------+--------------------+
|prediction|labels|     scaled_features|
+----------+------+--------------------+
|       0.0|   0.0|[0.46544305882724...|
|       1.0|   1.0|[3.29985556008803...|
|       2.0|   2.0|[1.44690861039781...|
|       2.0|   2.0|[1.55148496547467...|
|       4.0|   4.0|[0.69524656764033...|
|       4.0|   4.0|[0.73231777016872...|
|       1.0|   1.0|[2.77038160530703...|
|       3.0|   2.0|[1.25592365971473...|
|       2.0|   2.0|[1.40212583438724...|
|       4.0|   4.0|[0.75784732458839...|
|       4.0|   4.0|[0.90550252470881...|
+----------+------+--------------------+

The best Model of Multilayer perceptron classifier is:  MultilayerPerceptronClassificationModel: uid=MultilayerPerceptronClassifier_6d92663177db, numLayers=3, numClasses=5, numFeatures=20
Test Error = 0.0909091
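
The reported error can be reproduced by hand from the prediction table above: the sketch copies the 11 prediction and label pairs from that output and computes one minus the accuracy.

```python
# Prediction and label columns copied from the output table above.
predictions = [0, 1, 2, 2, 4, 4, 1, 3, 2, 4, 4]
labels = [0, 1, 2, 2, 4, 4, 1, 2, 2, 4, 4]

# Accuracy is the share of rows whose prediction matches the label.
correct = sum(p == y for p, y in zip(predictions, labels))
accuracy = correct / len(labels)
test_error = 1.0 - accuracy

message = "Test Error = %g" % test_error
```

One of the 11 rows is misclassified (a label of 2 predicted as 3), which gives an error of 1/11.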


Decision Tree Classification

In [7]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

dt = DecisionTreeClassifier(labelCol="labels", featuresCol="scaled_features")

#set different parameter settings for decision tree classification
paramGrid = ParamGridBuilder() \
 .addGrid(dt.maxDepth, [10, 20, 30]) \
 .addGrid(dt.maxBins, [30, 40, 50]) \
 .build()

#declaring  crossvalidation object with k=10
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

prediction = cvModel.transform(test)
prediction.select("prediction", "labels", "scaled_features").show()

# best model
print("The best Model of Decision Tree classifier is: ", cvModel.bestModel);

#accuracy on test dataset
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g" % (1.0 - accuracy))

+----------+------+--------------------+
|prediction|labels|     scaled_features|
+----------+------+--------------------+
|       0.0|   0.0|[0.46544305882724...|
|       1.0|   1.0|[3.29985556008803...|
|       2.0|   2.0|[1.44690861039781...|
|       2.0|   2.0|[1.55148496547467...|
|       4.0|   4.0|[0.69524656764033...|
|       4.0|   4.0|[0.73231777016872...|
|       1.0|   1.0|[2.77038160530703...|
|       2.0|   2.0|[1.25592365971473...|
|       2.0|   2.0|[1.40212583438724...|
|       4.0|   4.0|[0.75784732458839...|
|       4.0|   4.0|[0.90550252470881...|
+----------+------+--------------------+

The best Model of Decision Tree classifier is:  DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c4e8b35128c6, depth=4, numNodes=11, numClasses=5, numFeatures=20
Test Error = 0


Naive Bayes classifier

(We do not tune any parameters for this classifier; it is fitted with its default settings.)

In [8]:
from pyspark.ml.classification import NaiveBayes

#because we are not tuning any parameters, we don't need to use cross-validation for this classifier
nb = NaiveBayes(labelCol="labels", featuresCol="scaled_features")
nb_model = nb.fit(train)
nb_prediction = nb_model.transform(test)
nb_prediction.select("prediction", "labels", "scaled_features").show()

accuracy = evaluator.evaluate(nb_prediction)
print("Test Error = %g" % (1.0 - accuracy))

+----------+------+--------------------+
|prediction|labels|     scaled_features|
+----------+------+--------------------+
|       0.0|   0.0|[0.46544305882724...|
|       1.0|   1.0|[3.29985556008803...|
|       2.0|   2.0|[1.44690861039781...|
|       2.0|   2.0|[1.55148496547467...|
|       4.0|   4.0|[0.69524656764033...|
|       4.0|   4.0|[0.73231777016872...|
|       1.0|   1.0|[2.77038160530703...|
|       2.0|   2.0|[1.25592365971473...|
|       2.0|   2.0|[1.40212583438724...|
|       4.0|   4.0|[0.75784732458839...|
|       4.0|   4.0|[0.90550252470881...|
+----------+------+--------------------+

Test Error = 0


Logistic Regression

In [9]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="labels", featuresCol="scaled_features")

lrModel = lr.fit(train)

lr_prediction = lrModel.transform(test)
lr_prediction.select("prediction", "labels", "scaled_features").show()

accuracy = evaluator.evaluate(lr_prediction)
print("Test Error = %g" % (1.0 - accuracy))

+----------+------+--------------------+
|prediction|labels|     scaled_features|
+----------+------+--------------------+
|       0.0|   0.0|[0.46544305882724...|
|       1.0|   1.0|[3.29985556008803...|
|       2.0|   2.0|[1.44690861039781...|
|       2.0|   2.0|[1.55148496547467...|
|       4.0|   4.0|[0.69524656764033...|
|       4.0|   4.0|[0.73231777016872...|
|       1.0|   1.0|[2.77038160530703...|
|       2.0|   2.0|[1.25592365971473...|
|       2.0|   2.0|[1.40212583438724...|
|       4.0|   4.0|[0.75784732458839...|
|       4.0|   4.0|[0.90550252470881...|
+----------+------+--------------------+

Test Error = 0


Therefore, apart from the multilayer perceptron classifier, which misclassified one of the 11 test rows (Test Error = 0.0909091), all the other classification algorithms (decision tree, Naive Bayes and logistic regression) performed very well on this task: each has a Test Error of 0, i.e. every test row was predicted correctly.

**Contribution:**

**Contributor:** 

Xiaotao Pang - 572683

Junyuan Tang - 553287

Yexin Li     - 593295

**Works:**

Data Preparation and Aggregation:    ------ Xiaotao Pang, Yexin Li

Data Transformation:  ----- Xiaotao Pang, Junyuan Tang

Data Normalization:  ----- Junyuan Tang

Multilayer perceptron Classification: ------ Xiaotao Pang

Decision Tree Classification:  ----- Junyuan Tang

Naive Bayes Classification:   ------ Yexin Li

Logistic Regression Classification:    ------ Yexin Li

Final Evaluation:    ------ Junyuan Tang

Justification and identification:  ------ Yexin Li 