In [157]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, coalesce, avg
# Obtain the SparkSession used by every cell below: getOrCreate() reuses an
# already-running session if one exists, otherwise it builds a new one.
spark = SparkSession.builder.getOrCreate()

In [139]:
########
# 2018 data
# import data -- the first line of the file is used as the header; no schema
# inference is requested, so every column is read as a string
df1 = spark.read.options(header='true').csv("2018.csv")
df1.printSchema()

########
# 2019 data
# import data (same options as the 2018 file)
df2 = spark.read.options(header='true').csv("2019.csv")
df2.printSchema()

# stack the two years into a single table (a row-wise union, not a join)
# unionByName matches columns by header name. Plain union() matches them by
# position and would silently put values under the wrong header if the two
# files ever listed their columns in a different order.
df = df1.unionByName(df2)
df.show()

root
 |-- Overall rank: string (nullable = true)
 |-- Country or region: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)

root
 |-- Overall rank: string (nullable = true)
 |-- Country or region: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)

+------------+--------------------+-----+--------------+--------------+-----------------------+---------------------

In [146]:
# (header as it appears in the CSV files, short snake_case replacement)
# Keeping each pair on one line makes it obvious which new name belongs to
# which original header; the two lists below are derived from it in order.
column_name_pairs = [
    ("Country or region", "country"),
    ("Score", "score"),
    ("GDP per capita", "gdp_per_capita"),
    ("Social support", "social_support"),
    ("Healthy life expectancy", "health"),
    ("Freedom to make life choices", "freedom"),
    ("Generosity", "generosity"),
    ("Perceptions of corruption", "corruption"),
]

old_column_names = [original for original, _ in column_name_pairs]

new_column_names = [renamed for _, renamed in column_name_pairs]

In [136]:
# clean data

# # test blank_as_null -- rewrite a value
# df1 = df1.withColumn("Country or region", when(col("Country or region") == "Finland", "").otherwise(col("Country or region")))
# df1.show(2)

def blank_as_null(x):
    """Return a Column expression for column ``x`` with empty strings nulled.

    Values of column ``x`` that are not equal to "" are kept as they are;
    everything else (including the empty string) becomes null.

    Source: https://stackoverflow.com/questions/33287886/replace-empty-strings-with-none-null-values-in-dataframe

    :param x: name of the column to clean
    :return: a Column expression, suitable for ``DataFrame.withColumn``
    """
    column = col(x)
    keep_non_blank = when(column != "", column)
    return keep_non_blank.otherwise(None)

# Turn empty strings into nulls in every column listed in old_column_names.
for column in old_column_names:
    df = df.withColumn(column, blank_as_null(column))

# remove rows where country is null
# na.drop() does not modify df in place; it returns a new DataFrame. The
# result has to be assigned back, otherwise the null-country rows stay in df.
df = df.na.drop(subset=["Country or region"])

# df1.show(2)

DataFrame[Overall rank: string, Country or region: string, Score: string, GDP per capita: string, Social support: string, Healthy life expectancy: string, Freedom to make life choices: string, Generosity: string, Perceptions of corruption: string]

In [151]:
# rename columns
# The two lists are matched by position, so they must be the same length;
# zip() would otherwise silently skip the unmatched tail.
assert len(old_column_names) == len(new_column_names), \
    "old_column_names and new_column_names must have the same length"

# Iterate over the (old, new) pairs directly instead of a hard-coded range(8),
# so the loop keeps working if columns are added to or removed from the lists.
for old_name, new_name in zip(old_column_names, new_column_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()

+------------+--------------------+-----+--------------+--------------+------+-------+----------+----------+
|Overall rank|             country|score|gdp_per_capita|social_support|health|freedom|generosity|corruption|
+------------+--------------------+-----+--------------+--------------+------+-------+----------+----------+
|           1|             Finland|7.632|         1.305|         1.592| 0.874|  0.681|     0.202|     0.393|
|           2|              Norway|7.594|         1.456|         1.582| 0.861|  0.686|     0.286|     0.340|
|           3|             Denmark|7.555|         1.351|         1.590| 0.868|  0.683|     0.284|     0.408|
|           4|             Iceland|7.495|         1.343|         1.644| 0.914|  0.677|     0.353|     0.138|
|           5|         Switzerland|7.487|         1.420|         1.549| 0.927|  0.660|     0.256|     0.357|
|           6|         Netherlands|7.441|         1.361|         1.488| 0.878|  0.638|     0.333|     0.295|
|           7|     

In [158]:
###############
# 
# combine and average

# Metrics to average per country; each result column is named "avg_<metric>".
metric_columns = [
    "score",
    "gdp_per_capita",
    "social_support",
    "health",
    "freedom",
    "generosity",
    "corruption",
]

# One avg(...) expression per metric, in the same order as the list above.
average_exprs = [avg(metric).alias("avg_" + metric) for metric in metric_columns]

# Collapse the rows of each country into a single row of averages.
grouped_df = df.groupBy(col('country')).agg(*average_exprs)

grouped_df.show()

+-----------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+
|    country|         avg_score| avg_gdp_per_capita|avg_social_support|         avg_health|        avg_freedom|     avg_generosity|      avg_corruption|
+-----------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+
|       Chad|            4.3255|              0.354|            0.8365|             0.1225|             0.1815|             0.1895|               0.069|
|     Russia| 5.728999999999999|              1.167|            1.4655|             0.6625|0.36650000000000005|0.07350000000000001|               0.028|
|   Paraguay|             5.712|              0.845|            1.4985|              0.696| 0.5275000000000001|              0.173|               0.077|
|      Yemen|3.3674999999999997|             0.3645|1.1179999999999999|           

In [161]:
# split and store in csv's
# Writes one CSV per averaged metric, each holding the country column plus
# that single metric, to csv_files/pre_<metric column name>.csv.
# NOTE: to_csv does not create the csv_files/ directory; it must already exist.
#
# The loop variable is deliberately not named `col`: that would overwrite the
# `col` function imported from pyspark.sql.functions and break any earlier
# cell that is re-run afterwards.
for metric in grouped_df.columns:
    if metric != "country":
        # Separate name so the cleaned `df` from the earlier cells is not overwritten.
        metric_df = grouped_df.select("country", metric)
        file_path = 'csv_files/pre_' + metric + '.csv'
        # Previously this wrote `economy`, a name that is not defined in the
        # visible cells; the frame selected just above is the one to export.
        metric_df.toPandas().to_csv(file_path)

In [None]:
# stop session
# Run this last: cells that use `spark` (or DataFrames built from it) are not
# expected to work after this call until a new session is created.
spark.stop()