> # EXTRACT

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, lit, avg, when

In [2]:
spark = SparkSession.builder.appName("test").getOrCreate()
dfgni = spark.read.csv("GNI_data.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True, encoding="UTF-8")
dfgdp = spark.read.option("delimiter", ";").csv("GDP_data_new.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True,encoding="UTF-8")
dfedu = spark.read.csv("Global_Education.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True, encoding="UTF-8")
dfhomi = spark.read.option("delimiter", ",").csv("intentional_homicide.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True, encoding="UTF-8")

> # TRANSFORM

> Transform GNI

In [3]:
# Melakukan melt dengan stack
years = [1960,1961,1962,1963,1964,1965,1966,1967,
1968,1969,1970,1971,1972,1973,1974,1975,1976,1977,1978,1979,1980,1981,
         1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,
         1995,1996,1997,1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,
         2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021,2022,2023]

years = [str(x) for x in years]
print(years)

['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022', '2023']


In [4]:
melted_df_list = []
for year in years:
    melted_df = dfgni.select(
        'Country Name',
        'Country Code',
        col(year).alias('GNI'),
        lit(year).alias('Year')
    )
    
    melted_df_list.append(melted_df)
    final_dfgni = melted_df_list[0]
for melted_df in melted_df_list[1:]:
    final_dfgni = final_dfgni.union(melted_df)

final_dfgni.show()

+--------------------+------------+----+----+
|        Country Name|Country Code| GNI|Year|
+--------------------+------------+----+----+
|               Aruba|         ABW|NULL|1960|
|Africa Eastern an...|         AFE|NULL|1960|
|         Afghanistan|         AFG|NULL|1960|
|Africa Western an...|         AFW|NULL|1960|
|              Angola|         AGO|NULL|1960|
|             Albania|         ALB|NULL|1960|
|             Andorra|         AND|NULL|1960|
|          Arab World|         ARB|NULL|1960|
|United Arab Emirates|         ARE|NULL|1960|
|           Argentina|         ARG|NULL|1960|
|             Armenia|         ARM|NULL|1960|
|      American Samoa|         ASM|NULL|1960|
| Antigua and Barbuda|         ATG|NULL|1960|
|           Australia|         AUS|NULL|1960|
|             Austria|         AUT|NULL|1960|
|          Azerbaijan|         AZE|NULL|1960|
|             Burundi|         BDI|NULL|1960|
|             Belgium|         BEL|NULL|1960|
|               Benin|         BEN

In [5]:
# Rename semua kolom: lowercase dan menggunakan snakecase
final_dfgni = final_dfgni.toDF(*[col.lower().replace(" ", "_") for col in final_dfgni.columns])

In [6]:
final_dfgni = final_dfgni.withColumnRenamed("country_name","country")

In [7]:
final_dfgni = final_dfgni.withColumn("country_status", when(col("country").isin(["Afghanistan", "Burkina Faso", "Cameroon", "Africa", "Ethiopia", "Haiti", "Iraq", "Lebanon", "Mozambique", "Myanmar", "Niger", "Nigeria", "Somalia", "South Sudan", "syrian arab republic", "Ukraine"," Yemen"]),"Conflict Zone") . when(col("GNI") > 13846, "High Income") . when((col("GNI") > 1136) & (col("GNI") < 13845), "Middle Income") .otherwise("Low Income"))                                                                                                                                                                                                                  

In [8]:
final_dfgni.show()

+--------------------+------------+----+----+--------------+
|             country|country_code| gni|year|country_status|
+--------------------+------------+----+----+--------------+
|               Aruba|         ABW|NULL|1960|    Low Income|
|Africa Eastern an...|         AFE|NULL|1960|    Low Income|
|         Afghanistan|         AFG|NULL|1960| Conflict Zone|
|Africa Western an...|         AFW|NULL|1960|    Low Income|
|              Angola|         AGO|NULL|1960|    Low Income|
|             Albania|         ALB|NULL|1960|    Low Income|
|             Andorra|         AND|NULL|1960|    Low Income|
|          Arab World|         ARB|NULL|1960|    Low Income|
|United Arab Emirates|         ARE|NULL|1960|    Low Income|
|           Argentina|         ARG|NULL|1960|    Low Income|
|             Armenia|         ARM|NULL|1960|    Low Income|
|      American Samoa|         ASM|NULL|1960|    Low Income|
| Antigua and Barbuda|         ATG|NULL|1960|    Low Income|
|           Australia|  

> Transform GDP

In [9]:
melted_df_list = []
for year in years:
    melted_df = dfgdp.select(
        'Country Name',
        'Country Code',
        col(year).alias('GDP'),
        lit(year).alias('Year')
    )
    
    melted_df_list.append(melted_df)
    final_dfgdp = melted_df_list[0]
for melted_df in melted_df_list[1:]:
    final_dfgdp = final_dfgdp.union(melted_df)

final_dfgdp.show()

+--------------------+------------+----+----+
|        Country Name|Country Code| GDP|Year|
+--------------------+------------+----+----+
|               Aruba|         ABW|NULL|1960|
|Africa Eastern an...|         AFE|NULL|1960|
|         Afghanistan|         AFG|NULL|1960|
|Africa Western an...|         AFW|NULL|1960|
|              Angola|         AGO|NULL|1960|
|             Albania|         ALB|NULL|1960|
|             Andorra|         AND|NULL|1960|
|          Arab World|         ARB|NULL|1960|
|United Arab Emirates|         ARE|NULL|1960|
|           Argentina|         ARG|NULL|1960|
|             Armenia|         ARM|NULL|1960|
|      American Samoa|         ASM|NULL|1960|
| Antigua and Barbuda|         ATG|NULL|1960|
|           Australia|         AUS|NULL|1960|
|             Austria|         AUT|NULL|1960|
|          Azerbaijan|         AZE|NULL|1960|
|             Burundi|         BDI|NULL|1960|
|             Belgium|         BEL|NULL|1960|
|               Benin|         BEN

In [10]:
# Rename semua kolom: lowercase dan menggunakan snakecase
final_dfgdp = final_dfgdp.toDF(*[col.lower().replace(" ", "_") for col in final_dfgdp.columns])
final_dfgdp = final_dfgdp.withColumnRenamed("country_name","country")

In [11]:
final_dfgdp.printSchema()

root
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- gdp: string (nullable = true)
 |-- year: string (nullable = false)



> Transform Edu

In [12]:
dfedu.printSchema()

root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Pre0Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Pre0Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Female: integ

In [13]:
dfedu = dfedu.withColumn("Dropout_Rate", (col("OOSR_Upper_Secondary_Age_Male") + col("OOSR_Upper_Secondary_Age_Female")) / 2)

In [14]:
dfedu = dfedu.withColumn("Completion_Rate", (col("Completion_Rate_Upper_Secondary_Male") + col("Completion_Rate_Upper_Secondary_Female")) / 2)

In [15]:
dfedu.printSchema()

root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Pre0Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Pre0Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Female: integ

In [16]:
final_dfedu = dfedu.select(
        'Countries and areas',
        'Latitude',
        'Longitude',
        'Dropout_Rate',
        'Completion_Rate',
        'Unemployment_Rate'
    )

In [17]:
# Rename semua kolom: lowercase dan menggunakan snakecase
final_dfedu = final_dfedu.toDF(*[col.lower().replace(" ", "_") for col in final_dfedu.columns])
final_dfedu = final_dfedu.withColumnRenamed("countries_and_areas","country")
final_dfedu.printSchema()

root
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- dropout_rate: double (nullable = true)
 |-- completion_rate: double (nullable = true)
 |-- unemployment_rate: double (nullable = true)



> Transform Homi

In [18]:
dfhomi.printSchema()

root
 |-- Iso3_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Subregion: string (nullable = true)
 |-- Indicator: string (nullable = true)
 |-- Dimension: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Unit of measurement: string (nullable = true)
 |-- VALUE: double (nullable = true)
 |-- Source: string (nullable = true)



In [19]:
# Rename semua kolom: lowercase dan menggunakan snakecase
dfhomi = dfhomi.toDF(*[col.lower().replace(" ", "_") for col in dfhomi.columns])

In [20]:
# Filter untuk kategori "Counts"
dfcount = dfhomi.filter(col("unit_of_measurement") == "Counts").filter(col("sex") == "Total")

In [58]:
# Filter untuk kategori "Rate per 100000"
dfrate = dfhomi.filter(col("unit_of_measurement") == "Rate per 100,000 population").filter(col("sex") == "Total")

In [59]:
dfcount = dfcount.withColumnRenamed("value","homicide_number")

In [60]:
dfrate = dfrate.withColumnRenamed("value","rate_per_100k_population")

In [61]:
final_dfhomi = dfcount.join(dfrate, on=["country","region","subregion","year"], how="left")\
                 .select(
                    dfcount.country,
                    dfcount.region,
                    dfcount.subregion,
                    dfcount.year,
                    dfcount.homicide_number,
                    dfrate.rate_per_100k_population
                )

In [62]:
final_dfhomi.show()

+--------------------+--------+--------------------+----+----------------+------------------------+
|             country|  region|           subregion|year| homicide_number|rate_per_100k_population|
+--------------------+--------+--------------------+----+----------------+------------------------+
|               Aruba|Americas|Latin America and...|1990|             0.0|                     0.0|
|            Anguilla|Americas|Latin America and...|1990|             0.0|                     0.0|
|             Armenia|    Asia|        Western Asia|1990|           178.0|        5.01108138714052|
| Antigua and Barbuda|Americas|Latin America and...|1990|             1.0|        1.58562786899543|
|           Australia| Oceania|Australia and New...|1990|           374.0|        2.18377608517614|
|             Austria|  Europe|      Western Europe|1990|            89.0|        1.15891082955944|
|          Azerbaijan|    Asia|        Western Asia|1990|           329.0|        4.54980634281726|


In [None]:
> # Buat Dimension and Fact Table 

> DIM COUNTRY

Country,
Country_code,
Region,
Subregion,
Latitude,
Longitude,
country_status

In [31]:
dfgni.show()

+--------------------+------------+--------------------+--------------+----+----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|        Country Name|Country Code|      Indicator

In [43]:
dfedu3 = final_dfedu\
                .select(
                    final_dfedu.country,
                    final_dfedu.latitude, final_dfedu.longitude
                )

dfgni3 = final_dfgni.filter(col("year") == "2021").select(
            final_dfgni.country, final_dfgni.country_code,final_dfgni.country_status)

dfhomi3 = final_dfhomi.select(
            final_dfhomi.country,final_dfhomi.region, final_dfhomi.subregion)

In [44]:
dim_country = dfedu3.join(dfhomi3, on=["country"], how="left").join(dfgni3, on="country", how="left")

In [45]:
dim_country = dim_country.dropDuplicates()

In [46]:
dim_country.show(200)

+--------------------+----------+-----------+--------+--------------------+------------+--------------+
|             country|  latitude|  longitude|  region|           subregion|country_code|country_status|
+--------------------+----------+-----------+--------+--------------------+------------+--------------+
|            Paraguay| 23.442503|  58.443832|Americas|Latin America and...|         PRY| Middle Income|
|              Sweden| 60.128161|  18.643501|  Europe|     Northern Europe|         SWE|   High Income|
|            Kiribati| 1.8368976|157.3768317| Oceania|          Micronesia|         KIR| Middle Income|
|              Guyana|  4.860416|   58.93018|Americas|Latin America and...|         GUY| Middle Income|
|             Eritrea| 15.179384|  39.782334|  Africa|  Sub-Saharan Africa|         ERI|    Low Income|
|         Philippines| 12.879721| 121.774017|    Asia|  South-eastern Asia|         PHL| Middle Income|
|            Djibouti| 11.825138|  42.590275|  Africa|  Sub-Saha

> FACT EDUCATION

country/year (concat)(PK),
country (FK),
year,
completion_rate,
dropout_rate,
unemployment_rate

In [48]:
fact_education = final_dfedu.select(
                    final_dfedu.country,
                    final_dfedu.completion_rate, final_dfedu.dropout_rate,
                    final_dfedu.unemployment_rate
                    )

In [50]:
fact_education.show()

+-------------------+---------------+------------+-----------------+
|            country|completion_rate|dropout_rate|unemployment_rate|
+-------------------+---------------+------------+-----------------+
|        Afghanistan|           23.0|        56.5|            11.12|
|            Albania|           78.0|        18.0|            12.33|
|            Algeria|           29.5|         0.0|             11.7|
|            Andorra|            0.0|         0.0|              0.0|
|             Angola|           19.5|         0.0|             6.89|
|           Anguilla|            0.0|         0.0|              0.0|
|Antigua and Barbuda|            0.0|        13.0|              0.0|
|          Argentina|           49.5|        11.0|             9.79|
|            Armenia|           74.0|        10.0|            16.99|
|          Australia|            0.0|         8.0|             5.27|
|            Austria|            0.0|         9.5|             4.67|
|         Azerbaijan|            0

> FACT GDP

GDP_rate,
country (FK),
GDP_year

In [63]:
fact_gdp = final_dfgdp.select(
            final_dfgdp.country, final_dfgdp.year, final_dfgdp.gdp)

In [64]:
fact_gdp.show()

+--------------------+----+----+
|             country|year| gdp|
+--------------------+----+----+
|               Aruba|1960|NULL|
|Africa Eastern an...|1960|NULL|
|         Afghanistan|1960|NULL|
|Africa Western an...|1960|NULL|
|              Angola|1960|NULL|
|             Albania|1960|NULL|
|             Andorra|1960|NULL|
|          Arab World|1960|NULL|
|United Arab Emirates|1960|NULL|
|           Argentina|1960|NULL|
|             Armenia|1960|NULL|
|      American Samoa|1960|NULL|
| Antigua and Barbuda|1960|NULL|
|           Australia|1960|NULL|
|             Austria|1960|NULL|
|          Azerbaijan|1960|NULL|
|             Burundi|1960|NULL|
|             Belgium|1960|NULL|
|               Benin|1960|NULL|
|        Burkina Faso|1960|NULL|
+--------------------+----+----+
only showing top 20 rows



> FACT HOMICIDE

ambil dari final_dfhomi yg masih eror
country (FK)
year
homicide_rate
homicide_number

In [65]:
fact_homi = final_dfhomi.select(
            final_dfhomi.country, final_dfhomi.year, 
            final_dfhomi.homicide_number, final_dfhomi.rate_per_100k_population)

In [66]:
final_dfhomi.show(1000)

+--------------------+--------+--------------------+----+----------------+------------------------+
|             country|  region|           subregion|year| homicide_number|rate_per_100k_population|
+--------------------+--------+--------------------+----+----------------+------------------------+
|               Aruba|Americas|Latin America and...|1990|             0.0|                     0.0|
|            Anguilla|Americas|Latin America and...|1990|             0.0|                     0.0|
|             Armenia|    Asia|        Western Asia|1990|           178.0|        5.01108138714052|
| Antigua and Barbuda|Americas|Latin America and...|1990|             1.0|        1.58562786899543|
|           Australia| Oceania|Australia and New...|1990|           374.0|        2.18377608517614|
|             Austria|  Europe|      Western Europe|1990|            89.0|        1.15891082955944|
|          Azerbaijan|    Asia|        Western Asia|1990|           329.0|        4.54980634281726|


> # LOAD

In [None]:
dim_country.repartition(1).write.csv("dim_country.csv", header=True, mode="append")
fact_education.repartition(1).write.csv("fact_education.csv", header=True, mode="append")
fact_gdp.repartition(1).write.csv("fact_gdp", header=True, mode="append")
fact_homicide.repartition(1).write.csv(fact_homicide, header=True, mode="append")