# Import

In [0]:
import pandas as pd
import pyspark.sql.functions as sf
from pyspark.sql import SparkSession
from functions import *
from pyspark.sql.types import IntegerType


# Datainnhenting

### Her leses inn de fire datasett fra World Bank som skal brukes videre i analysen: 
- Offentlige utdanningsutgifter (% av BNP)
- Offentlige utgifter totalt (% av BNP)
- BNP per innbygger (USD)
- Forventet levealder ved fødsel (år)

Alle data er hentet som CSV-filer fra worldbank.org.

In [0]:
#  Offentlige utdanningsutgifter (% av BNP)
expenditure_education_data = pd.read_csv(
    '/Volumes/dev_truls/default/data_files/API_SE.XPD.TOTL.GD.ZS_DS2_en_csv_v2_391679.csv'
)

In [0]:
# Offentlige utgifter totalt (% av BNP)
expense_gpd_data = pd.read_csv('/Volumes/dev_truls/default/data_files/API_GC.XPN.TOTL.GD.ZS_DS2_en_csv_v2_86912.csv')

In [0]:
# BNP per innbygger (USD)
gpd_per_capita_data = pd.read_csv('/Volumes/dev_truls/default/data_files/API_NY.GDP.PCAP.CD_DS2_en_csv_v2_394227.csv')

In [0]:
# Forventet levealder ved fødsel (år)
life_expectancy_at_birth_data = pd.read_csv('/Volumes/dev_truls/default/data_files/API_SP.DYN.LE00.IN_DS2_en_csv_v2_393864.csv')

# Transformasjon

In [0]:
# Bruker melt for å endre strukturen fra wide format til long format for alle de fire indikator-dataene.
# Dette gjør det enklere å analysere, sammenligne og slå sammen indikatorene senere i prosessen.
expenditure_education_data = pd.melt(expenditure_education_data, id_vars=['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code'], var_name='Year', value_name='Value')

expense_gpd_data = pd.melt(expense_gpd_data, id_vars=['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code'], var_name='Year', value_name='Value')

gpd_per_capita_data = pd.melt(gpd_per_capita_data, id_vars=['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code'], var_name='Year', value_name='Value')

life_expectancy_at_birth_data = pd.melt(life_expectancy_at_birth_data, id_vars=['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code'], var_name='Year', value_name='Value')

In [0]:
# Konverter datsetettet fra pandas dataframe til spark dataframe for å kunne bruke Spark SQL og utnytte distributert databehandling.
spark = SparkSession.builder.getOrCreate()
expenditure_education_df = spark.createDataFrame(expenditure_education_data)
expense_gpd_df = spark.createDataFrame(expense_gpd_data)
gpd_per_capita_df = spark.createDataFrame(gpd_per_capita_data)
life_expectancy_at_birth_df = spark.createDataFrame(life_expectancy_at_birth_data)

In [0]:
# Bruker unionByName for å sammenstille alle 4 indikator-dataframene til en Spark dataframe for mer effektiv analysering og lagring etter melt().
# Selv om melt gir samme kolonnenavn og struktur, sikrer unionByName at DataFramene slås sammen korrekt basert på kolonnenmavn,
# og unnår feil dersom rekkefølgen på kolonnene skulle være ulik eller endres senere.
# Dette gir en mer robust og fremtidssikker løsning enn vanlig union

all_indicators_df = expenditure_education_df.unionByName(expense_gpd_df).unionByName(gpd_per_capita_df).unionByName(life_expectancy_at_birth_df)

In [0]:
# Filtrerer dataene automatisk til kun å inkludere de siste 15 årene basert på inneværende år.
# Dette reduserer antall rader ved å fjerne alle observasjoner eldre enn 15 år.
current_year = pd.Timestamp.now().year
all_indicators_df = all_indicators_df[all_indicators_df['Year'] >= str(current_year - 15)]

In [0]:
# Legger til en ny kolonne "Status" som angir om verdien for indikatoren er rapportert eller ikke.
# Hvis "Value" er null, settes status til "Not reported / Not collected", ellers til "Reported".
# Dette er for å skre at vi forstår hvorfor det er en null verdi i "Value" kolonnen.
all_indicators_df = all_indicators_df.withColumn(
    "Status",
    sf.when(sf.col("Value").isNull(), "Not reported / Not collected").otherwise("Reported")
)

In [0]:
# Gir kolonnen "Country Name" et enklere og mer konsist navn ved å endre det til "Country".
all_indicators_df = all_indicators_df.withColumnRenamed(
'Country Name', 'Country')

In [0]:
# Endrer datatype for kolonnen "Year" fra tekst (string) til heltall (integer) for å muliggjøre numeriske operasjoner og sortering.
all_indicators_df = all_indicators_df.withColumn('Year', all_indicators_df['Year'].cast(IntegerType()))

In [0]:
# Bruker rename_columns_to_snake_case funksjonen for å konverterer alle kolonnenavn i DataFrame til snake_case for å sikre konsistent og maskinvennlig navngivning.
all_indicators_df = rename_columns_to_snake_case(all_indicators_df)

In [0]:
# sorterer dataFrame etter "country", "indicator_code", og "year" i stigende rekkefølge.
# Dette gir bedre oversikt, gjør datasettet mer lesbarhet, og forenkler feilsøking og kontroll av data.
all_indicators_df = all_indicators_df.orderBy(['country', 'indicator_code', 'year'], ascending=[True, True, True])

# Lagring

In [0]:
# Eksporterer DataFrame både som Delta-tabell og CSV-fil til valgt katalog med navnet "world_bank_indicators".
# Dette gjør datasettet klart for både videre analyse i Databricks og deling med andre verktøy.
save_df_as_delta_and_csv(df=all_indicators_df, base_path='/Volumes/dev_truls/default/data_files/', name='world_bank_indicators', mode='overwrite')