## Iteration 4 code
### Daorui Wang dwan266 796617373

In [18]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HIV_Analysis').getOrCreate()
from pyspark.sql.functions import coalesce
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

### 1. Import data

In [7]:
deaths_data = spark.read.csv("no_of_deaths_by_country_clean.csv", header=True, inferSchema=True)
hiv_data = spark.read.csv("no_of_people_living_with_hiv_by_country_clean.csv", header=True, inferSchema=True)

# Rename the WHO Region column in the deaths_data dataset
deaths_data = deaths_data.withColumnRenamed("WHO Region", "WHO Region_deaths")

In [8]:
# Print the first five rows of the data so you can see the data structure
print("Deaths Data:")
print(deaths_data.head())

print("\nHIV Data:")
print(hiv_data.head())

Deaths Data:
Row(Country='Afghanistan', Year=2018, Count_deaths='500[200–610]', Count_median_deaths=500, Count_min_deaths=200, Count_max_deaths=610, WHO Region_deaths='Eastern Mediterranean')

HIV Data:
Row(Country='Afghanistan', Year=2018, Count_hiv='7200[4100–11000]', Count_median_hiv=7200, Count_min_hiv=4100, Count_max_hiv=11000, WHO Region='Eastern Mediterranean')


In [9]:
# View the data type 
print("\nDeaths Data Data Types:")
print(deaths_data.dtypes)

print("\nHIV Data Data Types:")
print(hiv_data.dtypes)


Deaths Data Data Types:
[('Country', 'string'), ('Year', 'int'), ('Count_deaths', 'string'), ('Count_median_deaths', 'int'), ('Count_min_deaths', 'int'), ('Count_max_deaths', 'int'), ('WHO Region_deaths', 'string')]

HIV Data Data Types:
[('Country', 'string'), ('Year', 'int'), ('Count_hiv', 'string'), ('Count_median_hiv', 'int'), ('Count_min_hiv', 'int'), ('Count_max_hiv', 'int'), ('WHO Region', 'string')]


### 2. Merge and select data

In [14]:
merged_data = deaths_data.join(hiv_data, ["Country", "Year"], "outer")

# Select the columns we need and select the WHO Region_deaths column in deaths_data as the WHO Region column of the merged dataset
merged_data = merged_data.select('Country', 'Year', 'Count_deaths', 'Count_median_deaths', 'Count_hiv', 'Count_median_hiv', 'WHO Region','WHO Region_deaths')

merged_data = merged_data.withColumn("WHO Region", coalesce(merged_data["WHO Region_deaths"], merged_data["WHO Region"]))

merged_data = merged_data.drop("WHO Region_deaths")

In [15]:
merged_data.show()

+-----------+----+-----------------+-------------------+--------------------+----------------+--------------------+
|    Country|Year|     Count_deaths|Count_median_deaths|           Count_hiv|Count_median_hiv|          WHO Region|
+-----------+----+-----------------+-------------------+--------------------+----------------+--------------------+
|Afghanistan|2000|     100[100–500]|                100|     1600[1000–3500]|            1600|Eastern Mediterra...|
|Afghanistan|2005|             null|               null|     2900[1700–5000]|            2900|Eastern Mediterra...|
|Afghanistan|2010|     500[200–500]|                500|     4200[2500–6200]|            4200|Eastern Mediterra...|
|Afghanistan|2018|     500[200–610]|                500|    7200[4100–11000]|            7200|Eastern Mediterra...|
|    Albania|2000|               na|               null|                  na|            null|              Europe|
|    Albania|2005|             null|               null|                

#### 2.1 Show the data quality

In [17]:
# View basic information of the data
print("Total rows:", merged_data.count())
print("Total columns:", len(merged_data.columns))
print("Column information:")
merged_data.printSchema()

# Check for missing values
print("Missing values for each column:")
merged_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in merged_data.columns]).show()

# View statistics
print("Statistical Information:")
merged_data.describe().show()

Total rows: 680
Total columns: 7
Column information:
root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Count_deaths: string (nullable = true)
 |-- Count_median_deaths: integer (nullable = true)
 |-- Count_hiv: string (nullable = true)
 |-- Count_median_hiv: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

Missing values for each column:
+-------+----+------------+-------------------+---------+----------------+----------+
|Country|Year|Count_deaths|Count_median_deaths|Count_hiv|Count_median_hiv|WHO Region|
+-------+----+------------+-------------------+---------+----------------+----------+
|      0|   0|         170|                280|        0|             127|         0|
+-------+----+------------+-------------------+---------+----------------+----------+

Statistical Information:
+-------+-----------+----------------+--------------------+-------------------+--------------------+------------------+---------------+
|summary|    

### 3.Clean the data (Missing value)

#### 3.1 Populate values for Count_median_deaths column for 2005 for each country

In [19]:
# 1. Calculate the average of Count_median_deaths for the years 2000 and 2010 for each country
avg_window = Window.partitionBy("Country")
average_values = merged_data.withColumn("Avg_2000_2010", 
                                        F.when((F.col("Year") == 2000) | (F.col("Year") == 2010), F.col("Count_median_deaths"))
                                         .otherwise(None))\
                            .withColumn("Avg_2000_2010", F.avg("Avg_2000_2010").over(avg_window))

# 2. Fill the 2005 values for Count_median_deaths with the calculated average for each country
filled_data = average_values.withColumn("Count_median_deaths",
                                        F.when((F.col("Year") == 2005) & (F.col("Count_median_deaths").isNull()), F.col("Avg_2000_2010"))
                                         .otherwise(F.col("Count_median_deaths")))

# Drop the auxiliary column
filled_data = filled_data.drop("Avg_2000_2010")

# Display the filled data for 2005
filled_data.filter(filled_data['Year'] == 2005).select('Country', 'Year', 'Count_median_deaths').show()

+--------------------+----+-------------------+
|             Country|Year|Count_median_deaths|
+--------------------+----+-------------------+
|         Afghanistan|2005|              300.0|
|             Albania|2005|               null|
|             Algeria|2005|              150.0|
|              Angola|2005|             7400.0|
|           Argentina|2005|             1500.0|
|             Armenia|2005|              150.0|
|           Australia|2005|              500.0|
|             Austria|2005|               null|
|          Azerbaijan|2005|               null|
|             Bahamas|2005|              350.0|
|             Bahrain|2005|               null|
|          Bangladesh|2005|              300.0|
|            Barbados|2005|              100.0|
|             Belarus|2005|              150.0|
|             Belgium|2005|               null|
|              Belize|2005|              150.0|
|               Benin|2005|             2200.0|
|              Bhutan|2005|             

#### 3.2 Delete rows containing NaN values

In [20]:
# Drop rows with NaN values
filtered_data = filled_data.dropna()

# Display the result
filtered_data.show()

+-----------+----+-----------------+-------------------+--------------------+----------------+--------------------+
|    Country|Year|     Count_deaths|Count_median_deaths|           Count_hiv|Count_median_hiv|          WHO Region|
+-----------+----+-----------------+-------------------+--------------------+----------------+--------------------+
|Afghanistan|2000|     100[100–500]|              100.0|     1600[1000–3500]|            1600|Eastern Mediterra...|
|Afghanistan|2010|     500[200–500]|              500.0|     4200[2500–6200]|            4200|Eastern Mediterra...|
|Afghanistan|2018|     500[200–610]|              500.0|    7200[4100–11000]|            7200|Eastern Mediterra...|
|    Algeria|2000|     100[100–100]|              100.0|     1900[1700–2000]|            1900|              Africa|
|    Algeria|2010|     200[100–200]|              200.0|     7100[6600–7600]|            7100|              Africa|
|    Algeria|2018|     200[200–200]|              200.0|  16000[15000–17

### 3.3 Show the filtered data quality

In [21]:
# View basic information of the data
print("Total rows:", filtered_data.count())
print("Total columns:", len(filtered_data.columns))
print("Column information:")
filtered_data.printSchema()

# Check for missing values
print("Missing values for each column:")
filtered_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in filtered_data.columns]).show()

# View statistics
print("Statistical Information:")
filtered_data.describe().show()

Total rows: 397
Total columns: 7
Column information:
root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Count_deaths: string (nullable = true)
 |-- Count_median_deaths: double (nullable = true)
 |-- Count_hiv: string (nullable = true)
 |-- Count_median_hiv: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

Missing values for each column:
+-------+----+------------+-------------------+---------+----------------+----------+
|Country|Year|Count_deaths|Count_median_deaths|Count_hiv|Count_median_hiv|WHO Region|
+-------+----+------------+-------------------+---------+----------------+----------+
|      0|   0|           0|                  0|        0|               0|         0|
+-------+----+------------+-------------------+---------+----------------+----------+

Statistical Information:
+-------+-----------+------------------+--------------------+-------------------+--------------------+------------------+---------------+
|summary|   