In [1]:
import os

import findspark

# Location of the Spark distribution. An existing SPARK_HOME environment
# variable takes precedence so the notebook can run on machines where Spark is
# not installed under /home/ubuntu; the original path is kept as the fallback.
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/ubuntu/spark-3.2.1-bin-hadoop2.7")

# findspark.init() has to run before pyspark is imported.
findspark.init(SPARK_HOME)
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, log, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

# Reuse an already-running session if there is one, otherwise start a new one
# registered under the application name "preparation".
spark = SparkSession.builder.appName("preparation").getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 21:16:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/23 21:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/23 21:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/05/23 21:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
24/05/23 21:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [2]:
# Load the raw education dataset: the first row is used as the header and the
# column types are inferred from the data.
raw_csv_path = 'global_education_data.csv'
df = spark.read.options(header=True, inferSchema=True).csv(raw_csv_path)

In [3]:
# Discard the proficiency and gross-enrollment columns.
unused_columns = [
    'Grade_2_3_Proficiency_Reading',
    'Grade_2_3_Proficiency_Math',
    'Primary_End_Proficiency_Reading',
    'Primary_End_Proficiency_Math',
    'Lower_Secondary_End_Proficiency_Reading',
    'Lower_Secondary_End_Proficiency_Math',
    'Gross_Primary_Education_Enrollment',
    'Gross_Tertiary_Education_Enrollment',
]
df = df.drop(*unused_columns)

# Report the shape and schema of what is left.
row_count, column_count = df.count(), len(df.columns)
print(f"{row_count} rows, {column_count} columns")

df.printSchema()

201 rows, 21 columns
root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Pre0Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Pre0Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_S

In [4]:
# Rename: replace the "Pre0Primary" spelling with "Preprimary" in both the
# male and the female out-of-school-rate column.
for gender_suffix in ("Male", "Female"):
    df = df.withColumnRenamed(
        f"OOSR_Pre0Primary_Age_{gender_suffix}",
        f"OOSR_Preprimary_Age_{gender_suffix}",
    )
df.printSchema()

root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Preprimary_Age_Male: integer (nullable = true)
 |-- OOSR_Preprimary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Female: intege

In [5]:
# Deriving: one 0/1 flag per education level. The flag is 1 when the male and
# female out-of-school rates of that level add up to more than 80, else 0.
for level in ("Preprimary", "Primary", "Lower_Secondary", "Upper_Secondary"):
    combined_rate = col(f"OOSR_{level}_Age_Male") + col(f"OOSR_{level}_Age_Female")
    df = df.withColumn(f"High_OOSR_{level}", when(combined_rate > 80, 1).otherwise(0))

df.printSchema()

root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Preprimary_Age_Male: integer (nullable = true)
 |-- OOSR_Preprimary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Female: intege

In [6]:
# Load region.csv (header row present, column types inferred) into a Spark
# DataFrame and preview it.
region_reader = spark.read.options(header=True, inferSchema=True)
region = region_reader.csv("region.csv")
region.show()

+-------------------+--------+
|Countries and areas|  Region|
+-------------------+--------+
|        Afghanistan|    Asia|
|            Albania|  Europe|
|            Algeria|  Africa|
|            Andorra|  Europe|
|             Angola|  Africa|
|           Anguilla|Americas|
|Antigua and Barbuda|Americas|
|          Argentina|Americas|
|            Armenia|    Asia|
|          Australia| Oceania|
|            Austria|  Europe|
|         Azerbaijan|    Asia|
|        The Bahamas|Americas|
|            Bahrain|    Asia|
|         Bangladesh|    Asia|
|           Barbados|Americas|
|            Belarus|  Europe|
|            Belgium|  Europe|
|             Belize|Americas|
|              Benin|  Africa|
+-------------------+--------+
only showing top 20 rows



In [7]:
# Schema of the long-format table: one row per country, age group and gender.
# Every field is declared nullable.
schema = (
    StructType()
    .add("Countries and areas", StringType(), True)
    .add("OOSR", IntegerType(), True)
    .add("Age_group", StringType(), True)
    .add("Gender", StringType(), True)
    .add("High_OOSR", IntegerType(), True)
)

# An empty DataFrame that carries this schema.
OOSR = spark.createDataFrame(data=[], schema=schema)

In [8]:
age_groups = ['Preprimary', 'Primary', 'Lower_Secondary', 'Upper_Secondary']
genders = ['Male', 'Female']

# A single-gender out-of-school rate above this value is flagged as high.
HIGH_OOSR_THRESHOLD = 50

# Rebuild the long table from an empty frame every time this cell runs, so that
# re-executing the cell does not union a second copy of every row onto OOSR.
OOSR = spark.createDataFrame([], schema)

for age_group in age_groups:
    for gender in genders:
        # Wide-format column holding the rate for this age group and gender.
        gender_col = f"OOSR_{age_group}_Age_{gender}"

        # Drop missing rates before projecting, while the source column is
        # still available under its own name.
        temp_df = df.filter(col(gender_col).isNotNull()).select(
            col("Countries and areas"),
            col(gender_col).alias("OOSR"),
            lit(age_group).alias("Age_group"),
            lit(gender).alias("Gender"),
            when(col(gender_col) > HIGH_OOSR_THRESHOLD, 1).otherwise(0).alias("High_OOSR")
        )

        # union() pairs columns by position, so the select above has to list
        # them in the same order as `schema`.
        OOSR = OOSR.union(temp_df)

In [9]:
# Sort key that ranks the age groups 1-4 in the order listed here instead of
# alphabetically.
age_group_order = None
for rank, label in enumerate(["Preprimary", "Primary", "Lower_Secondary", "Upper_Secondary"], start=1):
    is_label = col("Age_group") == label
    if age_group_order is None:
        age_group_order = when(is_label, rank)
    else:
        age_group_order = age_group_order.when(is_label, rank)

# Order by country first, then by the age-group rank.
OOSR = OOSR.orderBy("Countries and areas", age_group_order)

OOSR.show()



+-------------------+----+---------------+------+---------+
|Countries and areas|OOSR|      Age_group|Gender|High_OOSR|
+-------------------+----+---------------+------+---------+
|        Afghanistan|   0|     Preprimary|Female|        0|
|        Afghanistan|   0|     Preprimary|  Male|        0|
|        Afghanistan|   0|        Primary|  Male|        0|
|        Afghanistan|   0|        Primary|Female|        0|
|        Afghanistan|   0|Lower_Secondary|Female|        0|
|        Afghanistan|   0|Lower_Secondary|  Male|        0|
|        Afghanistan|  44|Upper_Secondary|  Male|        0|
|        Afghanistan|  69|Upper_Secondary|Female|        1|
|            Albania|   2|     Preprimary|Female|        0|
|            Albania|   4|     Preprimary|  Male|        0|
|            Albania|   6|        Primary|  Male|        0|
|            Albania|   3|        Primary|Female|        0|
|            Albania|   1|Lower_Secondary|Female|        0|
|            Albania|   6|Lower_Secondar



In [10]:
# Merge the data: attach each country's Region to both tables. A left join
# keeps every row of df / OOSR even when region.csv has no matching country.
# The guards make this cell safe to re-run: joining a second time would add a
# duplicate "Region" column to a frame that already has one.
if "Region" not in df.columns:
    df = df.join(region, on='Countries and areas', how='left')
if "Region" not in OOSR.columns:
    OOSR = OOSR.join(region, on='Countries and areas', how='left')

In [11]:
# Shape and schema of the wide table.
row_count, column_count = df.count(), len(df.columns)
print(f"{row_count} rows, {column_count} columns")
df.printSchema()

201 rows, 26 columns
root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Preprimary_Age_Male: integer (nullable = true)
 |-- OOSR_Preprimary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Sec

In [12]:
# Shape and schema of the long-format table.
row_count, column_count = OOSR.count(), len(OOSR.columns)
print(f"{row_count} rows, {column_count} columns")

OOSR.printSchema()

1608 rows, 6 columns
root
 |-- Countries and areas: string (nullable = true)
 |-- OOSR: integer (nullable = true)
 |-- Age_group: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- High_OOSR: integer (nullable = true)
 |-- Region: string (nullable = true)



In [13]:
# Convert the four 0/1 High_OOSR_* flag columns to booleans, in place.
for flag_name in (
    "High_OOSR_Preprimary",
    "High_OOSR_Primary",
    "High_OOSR_Lower_Secondary",
    "High_OOSR_Upper_Secondary",
):
    df = df.withColumn(flag_name, col(flag_name).cast(BooleanType()))

In [14]:
# Print the schema of df to inspect the current column names and types.
df.printSchema()

root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Preprimary_Age_Male: integer (nullable = true)
 |-- OOSR_Preprimary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- Completion_Rate_Primary_Male: integer (nullable = true)
 |-- Completion_Rate_Primary_Female: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Lower_Secondary_Female: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Male: integer (nullable = true)
 |-- Completion_Rate_Upper_Secondary_Female: intege

In [15]:
# Completion-rate columns, male and female, for each education level.
completion_rate_columns = [
    'Completion_Rate_Primary_Male',
    'Completion_Rate_Primary_Female',
    'Completion_Rate_Lower_Secondary_Male',
    'Completion_Rate_Lower_Secondary_Female',
    'Completion_Rate_Upper_Secondary_Male',
    'Completion_Rate_Upper_Secondary_Female',
]
# Remaining indicator columns that are removed as well.
other_indicator_columns = [
    'Youth_15_24_Literacy_Rate_Male',
    'Youth_15_24_Literacy_Rate_Female',
    'Birth_Rate',
    'Unemployment_Rate',
]

columns_to_drop = completion_rate_columns + other_indicator_columns
df = df.drop(*columns_to_drop)


In [16]:
# Shape and schema of the wide table.
row_count, column_count = df.count(), len(df.columns)
print(f"{row_count} rows, {column_count} columns")
df.printSchema()

201 rows, 16 columns
root
 |-- Countries and areas: string (nullable = true)
 |-- Latitude : double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- OOSR_Preprimary_Age_Male: integer (nullable = true)
 |-- OOSR_Preprimary_Age_Female: integer (nullable = true)
 |-- OOSR_Primary_Age_Male: integer (nullable = true)
 |-- OOSR_Primary_Age_Female: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Lower_Secondary_Age_Female: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Male: integer (nullable = true)
 |-- OOSR_Upper_Secondary_Age_Female: integer (nullable = true)
 |-- High_OOSR_Preprimary: boolean (nullable = false)
 |-- High_OOSR_Primary: boolean (nullable = false)
 |-- High_OOSR_Lower_Secondary: boolean (nullable = false)
 |-- High_OOSR_Upper_Secondary: boolean (nullable = false)
 |-- Region: string (nullable = true)



In [17]:
# Out-of-school-rate columns that get a natural-log transform.
columns_for_log = [
    'OOSR_Preprimary_Age_Male',
    'OOSR_Preprimary_Age_Female',
    'OOSR_Primary_Age_Male',
    'OOSR_Primary_Age_Female',
    'OOSR_Lower_Secondary_Age_Male',
    'OOSR_Lower_Secondary_Age_Female',
    'OOSR_Upper_Secondary_Age_Male',
    'OOSR_Upper_Secondary_Age_Female',
]

# Each transformed column keeps the name of its source column; df_log holds
# only these columns.
# NOTE(review): rates equal to 0 appear to come out as null in the displayed
# result — confirm that is intended, or consider log(x + 1) instead.
log_columns = [log(col(name)).alias(name) for name in columns_for_log]
df_log = df.select(*log_columns)

In [18]:
# The first 15 lines of output data, printed as a series of tables with at
# most `num_columns_per_output` columns each.
columns = df_log.columns
num_columns_per_output = 3

for start_idx in range(0, len(columns), num_columns_per_output):
    # Slicing past the end is safe: the last group simply has fewer columns.
    selected_columns = columns[start_idx:start_idx + num_columns_per_output]

    print(f"Displaying columns: {selected_columns}")
    df_log.select(selected_columns).show(15, truncate=False)

Displaying columns: ['OOSR_Preprimary_Age_Male', 'OOSR_Preprimary_Age_Female', 'OOSR_Primary_Age_Male']
+------------------------+--------------------------+---------------------+
|OOSR_Preprimary_Age_Male|OOSR_Preprimary_Age_Female|OOSR_Primary_Age_Male|
+------------------------+--------------------------+---------------------+
|null                    |null                      |null                 |
|1.3862943611198906      |0.6931471805599453        |1.791759469228055    |
|null                    |null                      |null                 |
|null                    |null                      |null                 |
|3.4339872044851463      |3.6635616461296463        |null                 |
|2.6390573296152584      |null                      |null                 |
|2.6390573296152584      |1.3862943611198906        |1.3862943611198906   |
|0.6931471805599453      |0.6931471805599453        |null                 |
|3.9512437185814275      |3.912023005428146         |2.19722

In [19]:
import os

# Output directories (relative to the working directory) for the two tables.
df_output_path = "df"
OOSR_output_path = "OOSR"

# Write each table as CSV with a header row, replacing any previous output.
# coalesce(1) reduces the frame to a single partition before the write.
for frame, target_dir in ((df, df_output_path), (OOSR, OOSR_output_path)):
    frame.coalesce(1).write.mode("overwrite").option("header", True).csv(target_dir)

def get_single_csv_path(output_path):
    """Return the path of the ``part-*.csv`` file inside ``output_path``.

    Args:
        output_path: Directory to search. Only its direct entries are checked.

    Returns:
        ``output_path`` joined with the name of the matching file. When several
        files match, the lexicographically smallest name is chosen so the result
        does not depend on the order in which the OS lists the directory.

    Raises:
        FileNotFoundError: If no entry of ``output_path`` starts with ``part-``
            and ends with ``.csv``. ``os.listdir`` raises the same exception
            type when the directory itself does not exist.
    """
    part_files = sorted(
        name for name in os.listdir(output_path)
        if name.startswith("part-") and name.endswith(".csv")
    )
    if not part_files:
        # Name the directory so the failing output is identifiable.
        raise FileNotFoundError(f"No part-*.csv file found in the directory: {output_path}")
    return os.path.join(output_path, part_files[0])

# Locate the part file of both outputs before touching anything, so a missing
# file is reported before any rename happens.
df_single_csv = get_single_csv_path(df_output_path)
OOSR_single_csv = get_single_csv_path(OOSR_output_path)

# (output directory, current part-file path, final file name)
exports = [
    (df_output_path, df_single_csv, "df.csv"),
    (OOSR_output_path, OOSR_single_csv, "OOSR.csv"),
]

# Give each part file its final name inside its own directory.
for out_dir, part_path, final_name in exports:
    os.rename(part_path, os.path.join(out_dir, final_name))

# Then delete every other entry of each output directory, leaving only the
# renamed CSV.
for out_dir, part_path, final_name in exports:
    leftovers = [entry for entry in os.listdir(out_dir) if entry != final_name]
    for entry in leftovers:
        os.remove(os.path.join(out_dir, entry))
