# Final Project CAP 4784

In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as F

In [3]:
from pyspark.sql.types import *

## Import growth_data.csv

In [4]:
df_data=spark.read.csv('growth_data.csv' \
                      ,header=True \
                      ,inferSchema=True)

In [5]:
# Verify data was imported.
df_data.count()

264

In [6]:
# Verify schema of df_data
df_data.printSchema()

root
 |-- Country_Name: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 19

## Import country_codes.csv

In [7]:
df_countries=spark.read.csv('country_codes.csv' \
                           ,header=True \
                           ,inferSchema=True)

In [8]:
# Verify country_codes.csv was imported
df_countries.count()

263

In [9]:
# Verify schema of df_countries
df_countries.printSchema()

root
 |-- Country_Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- SpecialNotes: string (nullable = true)
 |-- TableName: string (nullable = true)



## Join df_data and df_countries

In [10]:
df_combined=df_data.join(df_countries,on='Country_Code')

In [11]:
df_combined.printSchema()

root
 |-- Country_Code: string (nullable = true)
 |-- Country_Name: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 19

## Total number of records

In [12]:
df_combined.count()

263

## Using UDF to calculate the average growth per record

In [13]:
from pyspark.sql.functions import col

marksColumns = [col('1960'), col('1961'),col('1962'),col('1963'),col('1964'),col('1965'),col('1966'),col('1967') \
               ,col('1968'),col('1969'),col('1970'),col('1971'),col('1972'),col('1973'),col('1974'),col('1975') \
               ,col('1976'),col('1977'),col('1978'),col('1979'),col('1980'),col('1981'),col('1982'),col('1983') \
               ,col('1984'),col('1985'),col('1986'),col('1987'),col('1988'),col('1989'),col('1990'),col('1991') \
               ,col('1992'), col('1993'),col('1994'),col('1995'),col('1996'),col('1997') \
               ,col('1998'), col('1999'),col('2000'),col('2001'),col('2002'),col('2003'),col('2004'),col('2005') \
               ,col('2006'),col('2007'),col('2008'),col('2009'),col('2010'),col('2011'),col('2012'),col('2013') \
               ,col('2014'),col('2015'),col('2016'),col('2017'),col('2018')]

averageFunc = sum(x for x in marksColumns)/len(marksColumns)

# add Avg_Growth to dataframe
df_average=df_combined.withColumn('Avg_Growth', averageFunc)

## Create a new dataframe
    - Country_name
    - Country_Code,
    - Region,
    - IncomeGroup,
    - Avg_Growth

In [14]:
spark.catalog.dropTempView("df_temp")
df_average.createOrReplaceTempView('df_temp')
query='''
        select 
            Country_name,
            Country_code,
            region,
            IncomeGroup,
            Avg_Growth
        from df_temp '''

df_summary=spark.sql(query)

In [15]:
df_summary.show(df_summary.count())

+--------------------+------------+--------------------+-------------------+--------------------+
|        Country_name|Country_code|              region|        IncomeGroup|          Avg_Growth|
+--------------------+------------+--------------------+-------------------+--------------------+
|               Aruba|         ABW|Latin America & C...|        High income|  1.1874106581084072|
|         Afghanistan|         AFG|          South Asia|         Low income|   2.435532836257381|
|              Angola|         AGO|  Sub-Saharan Africa|Lower middle income|  2.9576449299854315|
|             Albania|         ALB|Europe & Central ...|Upper middle income|   1.030024042742419|
|             Andorra|         AND|Europe & Central ...|        High income|  3.0818893379473686|
|          Arab World|         ARB|                null|               null|                null|
|United Arab Emirates|         ARE|Middle East & Nor...|        High income|  7.9739998445704146|
|           Argentin

## Use the filter function to show the records that have a null average growth.

In [16]:
df_summary.filter(df_summary.Avg_Growth.isNull()).select(['Country_name']).show(df_summary.count())

+--------------------+
|        Country_name|
+--------------------+
|          Arab World|
|Central Europe an...|
|Caribbean small s...|
|East Asia & Pacif...|
|Early-demographic...|
| East Asia & Pacific|
|Europe & Central ...|
|Europe & Central ...|
|           Euro area|
|             Eritrea|
|      European Union|
|Fragile and confl...|
|         High income|
|Heavily indebted ...|
|           IBRD only|
|    IDA & IBRD total|
|           IDA total|
|           IDA blend|
|            IDA only|
|              Kuwait|
|Latin America & C...|
|Latin America & C...|
|Least developed c...|
|          Low income|
| Lower middle income|
| Low & middle income|
|Late-demographic ...|
|Middle East & Nor...|
|       Middle income|
|Middle East & Nor...|
|       North America|
|         New Zealand|
|        OECD members|
|  Other small states|
|Pre-demographic d...|
|  West Bank and Gaza|
|Pacific island sm...|
|Post-demographic ...|
|          South Asia|
|              Serbia|
|Sub-Sahara

In [17]:
df_summary.filter(df_summary.Avg_Growth.isNull()).count()

53

## Use the where clause to show all the records that have a non-null average growth rate.

In [18]:
df_summary.where(df_summary.Avg_Growth.isNotNull()).select(['Country_name']).show(df_summary.count())

+--------------------+
|        Country_name|
+--------------------+
|               Aruba|
|         Afghanistan|
|              Angola|
|             Albania|
|             Andorra|
|United Arab Emirates|
|           Argentina|
|             Armenia|
|      American Samoa|
| Antigua and Barbuda|
|           Australia|
|             Austria|
|          Azerbaijan|
|             Burundi|
|             Belgium|
|               Benin|
|        Burkina Faso|
|          Bangladesh|
|            Bulgaria|
|             Bahrain|
|        Bahamas, The|
|Bosnia and Herzeg...|
|             Belarus|
|              Belize|
|             Bermuda|
|             Bolivia|
|              Brazil|
|            Barbados|
|   Brunei Darussalam|
|              Bhutan|
|            Botswana|
|Central African R...|
|              Canada|
|         Switzerland|
|     Channel Islands|
|               Chile|
|               China|
|       Cote d'Ivoire|
|            Cameroon|
|    Congo, Dem. Rep.|
|         C

In [19]:
df_summary.where(df_summary.Avg_Growth.isNotNull()).count()

210

## Aggregation

Remove data with null values in avg_growth before using aggregation

In [20]:
spark.catalog.dropTempView("df_temp")
df_summary.createOrReplaceTempView('df_temp')
query='''
        select 
            Country_name,
            Country_code,
            region,
            IncomeGroup,
            Avg_Growth
        from df_temp 
        where Avg_Growth IS NOT NULL'''

df_non_null_data=spark.sql(query)

In [21]:
df_non_null_data.groupBy('Region').agg(F.mean('Avg_Growth')).show(truncate=False)

+--------------------------+------------------+
|Region                    |avg(Avg_Growth)   |
+--------------------------+------------------+
|Latin America & Caribbean |1.6452694401063142|
|South Asia                |2.168074728144582 |
|Sub-Saharan Africa        |2.5458202108397052|
|Europe & Central Asia     |0.762773237720641 |
|Middle East & North Africa|3.2223036268827836|
|East Asia & Pacific       |1.8463231979647594|
|North America             |0.9849502783965404|
+--------------------------+------------------+



In [22]:
df_non_null_data.groupBy('IncomeGroup').agg(F.mean('Avg_Growth')).show(truncate=False)

+-------------------+------------------+
|IncomeGroup        |avg(Avg_Growth)   |
+-------------------+------------------+
|High income        |1.3999248336438435|
|Low income         |2.494096718841889 |
|Lower middle income|2.2367045075714262|
|Upper middle income|1.5818141110861337|
+-------------------+------------------+



In [23]:
#from pyspark.sql.functions import desc
df_non_null_data.groupBy('Region', 'IncomeGroup').agg(F.mean('Avg_Growth')).sort('Region').show(truncate=False)

+--------------------------+-------------------+------------------+
|Region                    |IncomeGroup        |avg(Avg_Growth)   |
+--------------------------+-------------------+------------------+
|East Asia & Pacific       |Lower middle income|2.0596108795335195|
|East Asia & Pacific       |Low income         |1.4061451310647322|
|East Asia & Pacific       |Upper middle income|1.596952527769122 |
|East Asia & Pacific       |High income        |1.8597519403366365|
|Europe & Central Asia     |Upper middle income|0.7788090060194871|
|Europe & Central Asia     |High income        |0.6576877232632173|
|Europe & Central Asia     |Lower middle income|1.2276650829817717|
|Europe & Central Asia     |Low income         |2.550833367118104 |
|Latin America & Caribbean |Low income         |1.8239932185851366|
|Latin America & Caribbean |Upper middle income|1.568635940496616 |
|Latin America & Caribbean |High income        |1.6196206675109663|
|Latin America & Caribbean |Lower middle income|

## Collect set

In [26]:
df_non_null_data.groupBy('Region').agg(F.collect_set('IncomeGroup')).show(truncate=False)

+--------------------------+-------------------------------------------------------------------+
|Region                    |collect_set(IncomeGroup)                                           |
+--------------------------+-------------------------------------------------------------------+
|Latin America & Caribbean |[Lower middle income, High income, Upper middle income, Low income]|
|South Asia                |[Lower middle income, Upper middle income, Low income]             |
|Sub-Saharan Africa        |[Lower middle income, Upper middle income, Low income]             |
|Europe & Central Asia     |[Lower middle income, High income, Upper middle income, Low income]|
|Middle East & North Africa|[Lower middle income, High income, Upper middle income, Low income]|
|East Asia & Pacific       |[Lower middle income, High income, Upper middle income, Low income]|
|North America             |[High income]                                                      |
+--------------------------+--

## Collect list

In [25]:
df_non_null_data.groupBy('Region').agg(F.collect_list('IncomeGroup')).show(truncate=False)

+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Region                    |collect_list(IncomeGroup)                         