# DS8003 Management of Big Data and Tools Final Project Code Snippets and Screenshots
## Group 3:
- Derek Liu (501279266)
- Sazia Afreen (500995892)
- Kashish Prasad (501312850)
- Muhammad Zaka Shaheryar (500648718)

# Project Dataset: Life expectancy & Socio-Economic (world bank)

# Table of Contents:
1. Data Storage with HDFS
2. Data Processing and Data Description with Spark
3. Data Query with ElasticSearch (Q1-2)
4. Data Query with Spark (Q3)
5. Data Query with Hive (Q4-5)

# Part I: Data Storage with HDFS

## 1.1. Download the dataset from the Internet
Go to the dataset page on Kaggle ([link](https://www.kaggle.com/datasets/mjshri23/life-expectancy-and-socio-economic-world-bank)), download `life expectancy.csv`, and rename it to `life_expectancy.csv` so that the file name contains no spaces and is easier to handle on the command line.

## 1.2. On local filesystem:
1. Create a directory `project` under `/root`
```
mkdir /root/project
```
2. Upload the dataset via FileZilla:
    - In the right-hand pane (the Sandbox), navigate to `/root/project`
    - In the left-hand pane (your computer), locate `life_expectancy.csv` and drag it into `/root/project` on the right
3. The dataset should now be stored at `/root/project/life_expectancy.csv`.

## 1.3. On HDFS:
1. Create a directory `project` in HDFS:
```
hadoop fs -mkdir /user/root/project
```
2. Make sure that you're in the local `project` directory:
```
cd project
```
3. Upload `life_expectancy.csv` to HDFS:
```
hadoop fs -put life_expectancy.csv /user/root/project
```
4. Check the contents of the `/user/root/project` directory:
```
hadoop fs -ls /user/root/project
```
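
The HDFS steps above can also be scripted. The following is a minimal sketch that assumes the same paths as in this section. The helper name `build_hdfs_upload_commands` is hypothetical, and the `-p` (create parent directories) and `-f` (overwrite an existing file) flags are additions that the steps above do not use. The commands are only printed here, not executed.

```python
import shlex


def build_hdfs_upload_commands(local_file, hdfs_dir):
    """Return the hadoop fs commands from section 1.3 as argument lists."""
    return [
        # Create the target directory (assumption: -p so reruns do not fail)
        ["hadoop", "fs", "-mkdir", "-p", hdfs_dir],
        # Upload the dataset (assumption: -f to overwrite an earlier upload)
        ["hadoop", "fs", "-put", "-f", local_file, hdfs_dir],
        # List the directory to verify the upload
        ["hadoop", "fs", "-ls", hdfs_dir],
    ]


commands = build_hdfs_upload_commands(
    "/root/project/life_expectancy.csv", "/user/root/project"
)
for cmd in commands:
    print(" ".join(shlex.quote(part) for part in cmd))
```

On a machine with Hadoop installed, each list could be passed to `subprocess.run(cmd, check=True)` instead of being printed.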

### Snapshot of the virtual environment

In [None]:
from IPython.display import Image
Image(url="https://raw.githubusercontent.com/gingkwan/ds8003_f24/refs/heads/main/project/Part_1.2.1.png")

In [None]:
Image(url="https://raw.githubusercontent.com/gingkwan/ds8003_f24/refs/heads/main/project/Part_1.3.png")

# Part II: Data Processing and Data Description with Spark

## 2.1. Start `pyspark`
```
pyspark
```

## 2.2. Load Required Packages and Source Data
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean

spark = SparkSession.builder.appName("LifeExpectancyAnalysis").getOrCreate()

df = spark.read.csv("hdfs:///user/root/project/life_expectancy.csv", header=True, inferSchema=True)
```

## 2.3. Drop Unnecessary Columns
```
df_cleaned = df.drop("Corruption", "Country Code")
```

## 2.4. Impute Missing Values with Column Mean for Numerical Columns
```
# Use loop names other than `col` so the pyspark `col` function is not shadowed
numerical_columns = [name for name, dtype in df_cleaned.dtypes if dtype in ['int', 'double']]
imputations = {name: df_cleaned.select(mean(name)).collect()[0][0] for name in numerical_columns}
df_cleaned = df_cleaned.fillna(imputations)
```
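
To make the imputation logic easier to check outside Spark, here is a plain-Python sketch of the same idea: compute each column's mean while skipping missing values, then fill the gaps with that mean. The helper names and the toy rows are invented for illustration and are not taken from the dataset.

```python
def column_means(rows, columns):
    """Mean of each column, skipping None (Spark's mean() also skips nulls)."""
    means = {}
    for name in columns:
        values = [row[name] for row in rows if row[name] is not None]
        means[name] = sum(values) / len(values) if values else None
    return means


def fill_missing(rows, fill_values):
    """Replace None with the per-column fill value, similar to fillna(dict)."""
    return [
        {k: (fill_values.get(k, v) if v is None else v) for k, v in row.items()}
        for row in rows
    ]


# Toy rows, invented for illustration only
rows = [
    {"CO2": 10.0, "Sanitation": 50.0},
    {"CO2": None, "Sanitation": 70.0},
    {"CO2": 30.0, "Sanitation": None},
]
imputations = column_means(rows, ["CO2", "Sanitation"])
filled = fill_missing(rows, imputations)
print(imputations)  # {'CO2': 20.0, 'Sanitation': 60.0}
print(filled[1]["CO2"], filled[2]["Sanitation"])  # 20.0 60.0
```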

## 2.5. Cast Numerical Columns to Appropriate Data Type
```
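from pyspark.sql.functions import col

for col_name in numerical_columns:
    df_cleaned = df_cleaned.withColumn(col_name, col(col_name).cast("float"))
# Year is one of the numerical columns cast to float above; restore it to int
df_cleaned = df_cleaned.withColumn("Year", col("Year").cast("int"))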
```

## 2.6. Convert `IncomeGroup` to `integer`
```
# Import required packages
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Step 1: Define the mapping dictionary
income_group_mapping = {
    "Low income": 1,
    "Lower middle income": 2,
    "Upper middle income": 3,
    "High income": 4
}

# Step 2: Define a UDF to map the values
def map_income_group(value):
    return income_group_mapping.get(value, None)

# Register the UDF
map_income_group_udf = udf(map_income_group, IntegerType())

# Step 3: Apply the UDF to transform the Income Group column
df_cleaned = df_cleaned.withColumn("IncomeGroup", map_income_group_udf(col("IncomeGroup")))
```
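
Because the UDF wraps an ordinary Python function, the mapping can be checked without a Spark session. The sketch below repeats the dictionary from this section and applies it to a few sample labels. The label `"Not classified"` is a hypothetical value used only to show that anything missing from the dictionary becomes `None`, which Spark stores as null.

```python
income_group_mapping = {
    "Low income": 1,
    "Lower middle income": 2,
    "Upper middle income": 3,
    "High income": 4,
}


def map_income_group(value):
    # dict.get returns None for labels that are not in the mapping
    return income_group_mapping.get(value, None)


# "Not classified" is a hypothetical label used only for illustration
samples = ["High income", "Low income", "Not classified", None]
encoded = [map_income_group(s) for s in samples]
print(encoded)  # [4, 1, None, None]
```

The integer codes preserve the ordering of the income groups, so the encoded column can be sorted or compared numerically.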

## 2.7. Check the Cleaned DataFrame
```
df_cleaned.printSchema()
df_cleaned.show(5)
df_cleaned.describe().show()
```

## 2.8. Save the Cleaned DataFrame to HDFS
```
df_cleaned.write.csv("hdfs:///user/root/project/life_expectancy_cleaned.csv", header=True, mode="overwrite")
```

## Screenshots of the Environment

![image.png](attachment:image.png)

![image-2.png](attachment:image-2.png)

![image-3.png](attachment:image-3.png)

![image-4.png](attachment:image-4.png)

![image-5.png](attachment:image-5.png)

![image-6.png](attachment:image-6.png)

![image-7.png](attachment:image-7.png)

![image-8.png](attachment:image-8.png)

![image-9.png](attachment:image-9.png)

![image-10.png](attachment:image-10.png)

![image-11.png](attachment:image-11.png)

![image-12.png](attachment:image-12.png)

![image-13.png](attachment:image-13.png)

![image-14.png](attachment:image-14.png)

![image-15.png](attachment:image-15.png)

![image-16.png](attachment:image-16.png)

![image-17.png](attachment:image-17.png)

![image-18.png](attachment:image-18.png)

![image-19.png](attachment:image-19.png)

![image-20.png](attachment:image-20.png)

![image-21.png](attachment:image-21.png)

![image-22.png](attachment:image-22.png)

![image-23.png](attachment:image-23.png)

![image-24.png](attachment:image-24.png)

![image-25.png](attachment:image-25.png)

![image-26.png](attachment:image-26.png)

![image-27.png](attachment:image-27.png)

![image-28.png](attachment:image-28.png)

![image-29.png](attachment:image-29.png)

![image-30.png](attachment:image-30.png)

![image-31.png](attachment:image-31.png)

# Part III: ElasticSearch
## 3.1. Elastic Index Mappings
### Input Code:
```
{
  "properties": {
    "CO2": {"type": "float"},
    "Communicable": {"type": "float"},
    "Corruption": {"type": "float"},
    "Country Code": {"type": "keyword"},
    "Country Name": {"type": "keyword"},
    "Education Expenditure %": {"type": "float"},
    "Health Expenditure %": {"type": "float"},
    "IncomeGroup": {"type": "keyword"},
    "Injuries": {"type": "float"},
    "Life Expectancy World Bank": {"type": "float"},
    "NonCommunicable": {"type": "float"},
    "Prevelance of Undernourishment": {"type": "float"},
    "Region": {"type": "keyword"},
    "Sanitation": {"type": "float"},
    "Unemployment": {"type": "float"},
    "Year": {"type": "date", "format": "yyyy"}
  }
}
```
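
The fragment above is the `properties` object of an index mapping. As a sketch, assuming the index were created through the create-index API rather than through a file-upload UI, the fragment would sit under a top-level `mappings` key. Only three of the fields are repeated here, the helper name is hypothetical, and the index name `life_expectancy` is the one used by the queries in this part.

```python
import json

# Subset of the field definitions listed above
properties = {
    "Country Name": {"type": "keyword"},
    "Life Expectancy World Bank": {"type": "float"},
    "Year": {"type": "date", "format": "yyyy"},
}


def build_create_index_body(props):
    """Wrap a properties object in a create-index request body."""
    return {"mappings": {"properties": props}}


body = build_create_index_body(properties)
print("PUT /life_expectancy")
print(json.dumps(body, indent=2))
```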


## 3.2. Ingest Pipeline with Data Processors
### Input Code:
```
{
  "description": "Ingest pipeline created by text structure finder",
  "processors": [
    {"csv": {"field": "message", "target_fields": ["Country Name", "Country Code", "Region", "IncomeGroup", "Year", "Life Expectancy World Bank", "Prevelance of Undernourishment", "CO2", "Health Expenditure %", "Education Expenditure %", "Unemployment", "Corruption", "Sanitation", "Injuries", "Communicable", "NonCommunicable"], "ignore_missing": false}},
    {"rename": {"field": "CO2", "target_field": "co2_emissions", "ignore_missing": true}},
    {"rename": {"field": "Communicable", "target_field": "communicable", "ignore_missing": true}},
    {"rename": {"field": "Corruption", "target_field": "corruption", "ignore_missing": true}},
    {"rename": {"field": "Country Code", "target_field": "country_code", "ignore_missing": true}},
    {"rename": {"field": "Country Name", "target_field": "country", "ignore_missing": true}},
    {"rename": {"field": "Education Expenditure %", "target_field": "education_expenditure", "ignore_missing": true}},
    {"rename": {"field": "Health Expenditure %", "target_field": "healthcare_expenditure", "ignore_missing": true}},
    {"rename": {"field": "IncomeGroup", "target_field": "income_level", "ignore_missing": true}},
    {"rename": {"field": "Injuries", "target_field": "injuries", "ignore_missing": true}},
    {"rename": {"field": "Life Expectancy World Bank", "target_field": "life_expectancy", "ignore_missing": true}},
    {"rename": {"field": "NonCommunicable", "target_field": "non_communicable", "ignore_missing": true}},
    {"rename": {"field": "Prevelance of Undernourishment", "target_field": "undernourishment", "ignore_missing": true}},
    {"rename": {"field": "Region", "target_field": "region", "ignore_missing": true}},
    {"rename": {"field": "Sanitation", "target_field": "sanitation_level", "ignore_missing": true}},
    {"rename": {"field": "Unemployment", "target_field": "unemployment_rate", "ignore_missing": true}},
    {"convert": {"field": "co2_emissions", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "communicable", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "corruption", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "education_expenditure", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "healthcare_expenditure", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "injuries", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "life_expectancy", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "non_communicable", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "undernourishment", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "sanitation_level", "type": "float", "ignore_missing": true}},
    {"convert": {"field": "unemployment_rate", "type": "float", "ignore_missing": true}},
    {"date": {"field": "Year", "formats": ["yyyy"], "timezone": "UTC", "ignore_failure": true}},
    {"remove": {"field": "message"}}
  ]
}
```
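
To illustrate what the pipeline does to a single CSV line, the following plain-Python sketch imitates the `csv`, `rename`, and `convert` steps. It is not Elasticsearch code: the function `run_pipeline` and the sample row are invented for illustration, the `date` and `remove` steps are left out, and skipping empty columns is an assumption about how the `csv` processor behaves when no `empty_value` is configured.

```python
import csv

TARGET_FIELDS = [
    "Country Name", "Country Code", "Region", "IncomeGroup", "Year",
    "Life Expectancy World Bank", "Prevelance of Undernourishment", "CO2",
    "Health Expenditure %", "Education Expenditure %", "Unemployment",
    "Corruption", "Sanitation", "Injuries", "Communicable", "NonCommunicable",
]
RENAMES = {
    "CO2": "co2_emissions", "Communicable": "communicable",
    "Corruption": "corruption", "Country Code": "country_code",
    "Country Name": "country",
    "Education Expenditure %": "education_expenditure",
    "Health Expenditure %": "healthcare_expenditure",
    "IncomeGroup": "income_level", "Injuries": "injuries",
    "Life Expectancy World Bank": "life_expectancy",
    "NonCommunicable": "non_communicable",
    "Prevelance of Undernourishment": "undernourishment",
    "Region": "region", "Sanitation": "sanitation_level",
    "Unemployment": "unemployment_rate",
}
FLOAT_FIELDS = [
    "co2_emissions", "communicable", "corruption", "education_expenditure",
    "healthcare_expenditure", "injuries", "life_expectancy",
    "non_communicable", "undernourishment", "sanitation_level",
    "unemployment_rate",
]


def run_pipeline(message):
    values = next(csv.reader([message]))
    # csv step: name each column; empty columns are left unset (assumption)
    doc = {f: v for f, v in zip(TARGET_FIELDS, values) if v != ""}
    # rename steps: fields without a rename rule (Year) keep their name
    doc = {RENAMES.get(k, k): v for k, v in doc.items()}
    # convert steps: only fields that are present are converted
    for field in FLOAT_FIELDS:
        if field in doc:
            doc[field] = float(doc[field])
    return doc


# Made-up sample row, not taken from the dataset
sample = "Exampleland,EXL,South Asia,Low income,2001,60.5,,1000,5.0,,7.5,,40.0,100,200,300"
doc = run_pipeline(sample)
print(doc)
```

In this sketch the three empty columns (undernourishment, education expenditure, corruption) simply do not appear in the resulting document, which is the situation that the `missing` aggregations in the queries count.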
## Screenshots of the Environment

In [None]:
Image(url="https://raw.githubusercontent.com/gingkwan/ds8003_f24/refs/heads/main/project/Part_4.2.png")

## 3.3. Question 1
### Input Code:
```
POST /life_expectancy/_search
{
  "size": 0,
  "aggs": {
    "life_expectancy_over_time": {
      "date_histogram": {"field": "Year", "calendar_interval": "year"},
      "aggs": {
        "avg_life_expectancy": {"avg": {"field": "life_expectancy"}},
        "missing_life_expectancy": {"missing": {"field": "life_expectancy"}}
      }
    }
  }
}
```

### Output Snippet:
```
{
  "took": 1,
  "timed_out": false,
  "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0},
  "hits": {"total": {"value": 3306, "relation": "eq"}, "max_score": null, "hits": []},
  "aggregations": {
    "life_expectancy_over_time": {
      "buckets": [
        {"key_as_string": "2001", "key": 978307200000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 66.67023472669648}},
        {"key_as_string": "2002", "key": 1009843200000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 9}, "avg_life_expectancy": {"value": 66.96920750935872}},
        {"key_as_string": "2003", "key": 1041379200000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 67.19824749085961}},
        {"key_as_string": "2004", "key": 1072915200000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 67.57582997112739}},
        {"key_as_string": "2005", "key": 1104537600000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 9}, "avg_life_expectancy": {"value": 67.89232489845969}},
        {"key_as_string": "2006", "key": 1136073600000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 68.24670777669768}},
        {"key_as_string": "2007", "key": 1167609600000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 68.61899987662711}},
        {"key_as_string": "2008", "key": 1199145600000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 69.02661228179932}},
        {"key_as_string": "2009", "key": 1230768000000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 69.44180137355153}},
        {"key_as_string": "2010", "key": 1262304000000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 69.84225973268835}},
        {"key_as_string": "2011", "key": 1293840000000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 70.25856706572742}},
        {"key_as_string": "2012", "key": 1325376000000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 70.62610491310677}},
        {"key_as_string": "2013", "key": 1356998400000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 70.98988756319372}},
        {"key_as_string": "2014", "key": 1388534400000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 71.34372974023586}},
        {"key_as_string": "2015", "key": 1420070400000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 71.59502922616353}},
        {"key_as_string": "2016", "key": 1451606400000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 71.88926078052056}},
        {"key_as_string": "2017", "key": 1483228800000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 72.12473341313805}},
        {"key_as_string": "2018", "key": 1514764800000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 72.34848455103432}},
        {"key_as_string": "2019", "key": 1546300800000, "doc_count": 174, "missing_life_expectancy": {"doc_count": 10}, "avg_life_expectancy": {"value": 72.58911158398884}}
      ]
    }
  }
}

```
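
The response can be post-processed with a few lines of Python. This sketch copies the first and last buckets from the output above, converts the epoch-millisecond `key` to a year, and computes how many documents actually contributed to each average. The function name `summarize` is hypothetical.

```python
from datetime import datetime, timezone

# First and last buckets copied from the output snippet above
buckets = [
    {"key_as_string": "2001", "key": 978307200000, "doc_count": 174,
     "missing_life_expectancy": {"doc_count": 10},
     "avg_life_expectancy": {"value": 66.67023472669648}},
    {"key_as_string": "2019", "key": 1546300800000, "doc_count": 174,
     "missing_life_expectancy": {"doc_count": 10},
     "avg_life_expectancy": {"value": 72.58911158398884}},
]


def summarize(bucket):
    """Return (year, average, number of documents that had a value)."""
    year = datetime.fromtimestamp(bucket["key"] / 1000, tz=timezone.utc).year
    with_value = bucket["doc_count"] - bucket["missing_life_expectancy"]["doc_count"]
    return year, bucket["avg_life_expectancy"]["value"], with_value


first, last = summarize(buckets[0]), summarize(buckets[-1])
increase = last[1] - first[1]
print(first[0], last[0], round(increase, 2))  # 2001 2019 5.92
```

For these two buckets, 164 of the 174 documents per year carry a life expectancy value, and the average rises by about 5.92 years between 2001 and 2019.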

## Screenshots of the Environment

In [None]:
Image(url="https://raw.githubusercontent.com/gingkwan/ds8003_f24/refs/heads/main/project/Part_4.3.png")

## 3.4. Question 2
### Input Code:
```
POST /life_expectancy/_search
{
  "size": 0,
  "aggs": {
    "by_region": {
      "terms": {"field": "region.keyword", "size": 10, "order": {"avg_life_expectancy": "desc"}},
      "aggs": {
        "avg_life_expectancy": {"avg": {"field": "life_expectancy"}},
        "missing_life_expectancy": {"missing": {"field": "life_expectancy"}}
      }
    }
  }
}
```

### Output Snippet:
```
{
  "took": 3,
  "timed_out": false,
  "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0},
  "hits": {"total": {"value": 3306, "relation": "eq"}, "max_score": null, "hits": []},
  "aggregations": {
    "by_region": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {"key": "North America", "doc_count": 57, "avg_life_expectancy": {"value": 79.68649814003392}, "missing_life_expectancy": {"doc_count": 0}},
        {"key": "Europe & Central Asia", "doc_count": 893, "avg_life_expectancy": {"value": 76.18259820983741}, "missing_life_expectancy": {"doc_count": 57}},
        {"key": "Middle East & North Africa", "doc_count": 304, "avg_life_expectancy": {"value": 74.60599433748345}, "missing_life_expectancy": {"doc_count": 0}},
        {"key": "Latin America & Caribbean", "doc_count": 551, "avg_life_expectancy": {"value": 73.58897570955374}, "missing_life_expectancy": {"doc_count": 18}},
        {"key": "East Asia & Pacific", "doc_count": 513, "avg_life_expectancy": {"value": 72.04899799346924}, "missing_life_expectancy": {"doc_count": 113}},
        {"key": "South Asia", "doc_count": 152, "avg_life_expectancy": {"value": 68.52757898129914}, "missing_life_expectancy": {"doc_count": 0}},
        {"key": "Sub-Saharan Africa", "doc_count": 836, "avg_life_expectancy": {"value": 57.54266932364286}, "missing_life_expectancy": {"doc_count": 0}}
      ]
    }
  }
}
```
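
A quick consistency check on this output, sketched in Python: the region buckets should add up to the total number of hits, and the `missing` counts show how many documents were excluded from each average. The tuples are copied from the output above; the variable names are hypothetical.

```python
# (region, doc_count, missing count) copied from the output snippet above
region_buckets = [
    ("North America", 57, 0),
    ("Europe & Central Asia", 893, 57),
    ("Middle East & North Africa", 304, 0),
    ("Latin America & Caribbean", 551, 18),
    ("East Asia & Pacific", 513, 113),
    ("South Asia", 152, 0),
    ("Sub-Saharan Africa", 836, 0),
]
total_hits = 3306  # hits.total.value in the response

total_docs = sum(count for _, count, _ in region_buckets)
total_missing = sum(missing for _, _, missing in region_buckets)
# Documents that contributed to each regional average
with_value = {region: count - missing for region, count, missing in region_buckets}

print(total_docs == total_hits, total_missing)  # True 188
print(with_value["East Asia & Pacific"])  # 400
```

The average for East Asia & Pacific is therefore based on 400 of its 513 documents, while the averages for four of the regions use every document.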

## Screenshots of the Environment

In [None]:
Image(url="https://raw.githubusercontent.com/gingkwan/ds8003_f24/refs/heads/main/project/Part_4.4.png")

# Part IV: Data Query with Spark (Q3)

## Screenshots of the Environment

![image.png](attachment:image.png)

![image-2.png](attachment:image-2.png)

![image-3.png](attachment:image-3.png)

![image-4.png](attachment:image-4.png)

![image-5.png](attachment:image-5.png)

![image-6.png](attachment:image-6.png)


# Part V: Data Analysis with Hive

## 5.1. Start `hive`
```
hive
```

## 5.2. Create External Table in Hive
```
CREATE EXTERNAL TABLE cleaned_life_expectancy (
    country_name STRING,
    region STRING,
    income_group INT,
    year FLOAT,
    life_expectancy FLOAT,
    prevalence_of_undernourishment FLOAT,
    co2 FLOAT,
    health_expenditure FLOAT,
    education_expenditure FLOAT,
    unemployment FLOAT,
    sanitation FLOAT,
    injuries FLOAT,
    communicable FLOAT,
    non_communicable FLOAT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///user/root/project/life_expectancy_cleaned.csv'
TBLPROPERTIES ("skip.header.line.count"="1");
```
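
A delimited Hive table maps CSV fields to columns by position, not by name, so the column order in the table definition has to match the order in the cleaned file. The sketch below checks this in plain Python. It assumes that the original CSV column order is the one listed in `target_fields` in section 3.2 and that only `Corruption` and `Country Code` were dropped, as in section 2.3.

```python
# Original column order (assumption: same as target_fields in section 3.2)
original_columns = [
    "Country Name", "Country Code", "Region", "IncomeGroup", "Year",
    "Life Expectancy World Bank", "Prevelance of Undernourishment", "CO2",
    "Health Expenditure %", "Education Expenditure %", "Unemployment",
    "Corruption", "Sanitation", "Injuries", "Communicable", "NonCommunicable",
]
dropped = {"Corruption", "Country Code"}  # removed in section 2.3

# Column names from the CREATE EXTERNAL TABLE statement above
hive_columns = [
    "country_name", "region", "income_group", "year", "life_expectancy",
    "prevalence_of_undernourishment", "co2", "health_expenditure",
    "education_expenditure", "unemployment", "sanitation", "injuries",
    "communicable", "non_communicable",
]

cleaned_columns = [c for c in original_columns if c not in dropped]
pairs = list(zip(cleaned_columns, hive_columns))
same_length = len(cleaned_columns) == len(hive_columns)

print(same_length)
for csv_name, hive_name in pairs:
    print(f"{csv_name:32} -> {hive_name}")
```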

## 5.3. To display all rows instead of only the first 20, set the configuration parameters:
```
SET hive.cli.print.header=true;
SET hive.fetch.task.conversion=none;
```

## 5.4. Check cleaned data in Hive:
```
SELECT * FROM cleaned_life_expectancy LIMIT 10;
```

## 5.5. Hive query for Question 4:
```
SELECT year,
       AVG(life_expectancy) AS avg_life_expectancy,
       AVG(co2) AS avg_co2,
       AVG(sanitation) AS avg_sanitation
FROM cleaned_life_expectancy
GROUP BY year;
```

## 5.6. Hive query for Question 5:
```
SELECT year,
       region,
       AVG(life_expectancy) AS avg_life_expectancy,
       AVG(injuries) AS avg_injuries,
       AVG(communicable) AS avg_communicable,
       AVG(non_communicable) AS avg_non_communicable
FROM cleaned_life_expectancy
GROUP BY year, region
ORDER BY year, region;
```
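
For readers less familiar with SQL, the grouping logic of these queries can be sketched in plain Python: rows are bucketed by the grouping keys, and the average of each bucket is computed while skipping missing values, as SQL `AVG` does. The function name `group_avg` and the toy rows are invented for illustration and are not taken from the dataset.

```python
from collections import defaultdict


def group_avg(rows, keys, value):
    """Average `value` per combination of `keys`, skipping None values.

    Unlike SQL, a group whose values are all None is omitted here
    rather than returned with a NULL average.
    """
    groups = defaultdict(list)
    for row in rows:
        if row[value] is not None:
            groups[tuple(row[k] for k in keys)].append(row[value])
    # Sorting the keys mirrors ORDER BY year, region
    return {k: sum(v) / len(v) for k, v in sorted(groups.items())}


# Toy rows, invented for illustration only
rows = [
    {"year": 2002, "region": "A", "life_expectancy": 66.0},
    {"year": 2001, "region": "A", "life_expectancy": 60.0},
    {"year": 2001, "region": "A", "life_expectancy": 70.0},
    {"year": 2001, "region": "B", "life_expectancy": None},
    {"year": 2001, "region": "B", "life_expectancy": 80.0},
]
result = group_avg(rows, ["year", "region"], "life_expectancy")
print(result)  # {(2001, 'A'): 65.0, (2001, 'B'): 80.0, (2002, 'A'): 66.0}
```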

## Screenshots of the Environment:

![image.png](attachment:image.png)

![image-2.png](attachment:image-2.png)

![image-3.png](attachment:image-3.png)

![image-4.png](attachment:image-4.png)

![image-5.png](attachment:image-5.png)

![image-6.png](attachment:image-6.png)

![image-7.png](attachment:image-7.png)

![image-8.png](attachment:image-8.png)

![image-9.png](attachment:image-9.png)

![image-10.png](attachment:image-10.png)

![image-11.png](attachment:image-11.png)

![image-12.png](attachment:image-12.png)