# Required Installations of Spark & Java and Setup of Environment Variables.

In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()



0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
            Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
            Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Get:6 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [4 InRelease 88.7 kB/88.7 kB 100%] [Waiting for headers] [Waiting for header                                                                               Hit:7 http://ppa.launchpa

In [3]:
# Download the PostgreSQL JDBC driver (version 42.5.1) that will allow Spark to interact with Postgres.
# wget saves the jar into the current working directory.
# NOTE(review): presumably this is /content on Colab — confirm before relying on an absolute path to the jar.
!wget https://jdbc.postgresql.org/download/postgresql-42.5.1.jar

--2023-01-04 23:44:29--  https://jdbc.postgresql.org/download/postgresql-42.5.1.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046770 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.1.jar’


2023-01-04 23:44:31 (1.66 MB/s) - ‘postgresql-42.5.1.jar’ saved [1046770/1046770]



# Load Data from Amazon S3 into Spark DataFrames
## Get the Country latitude and longitude details with GDP

In [4]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this project and put the Postgres
# JDBC driver jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("FinalProject-WomenWellBeing")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.5.1.jar")
    .getOrCreate()
)


In [5]:
from pyspark import SparkFiles
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType

# Register the remote CSV with the Spark context so it can be located through SparkFiles.get().
url = "https://womenwellbeing.s3.us-west-1.amazonaws.com/world_country_and_usa_states_latitude_and_longitude_values.csv"
spark.sparkContext.addFile(url)

# Explicit schema for the country latitude/longitude file.
# NOTE(review): the nullable=False flags are presumably not enforced by the CSV reader — confirm.
schema = StructType([
  StructField("country_code", StringType(), False),
  StructField("latitude", DoubleType(), True),
  StructField("longitude", DoubleType(), True),
  StructField("country_name", StringType(), False),
  ])

# The explicit schema above is authoritative, so schema inference is not requested:
# asking for both was contradictory, and the inference option has no effect once a
# schema is supplied.
country_df = (
    spark.read
    .option("encoding", "UTF-8")
    .schema(schema)
    .csv(
        SparkFiles.get("world_country_and_usa_states_latitude_and_longitude_values.csv"),
        sep=",",
        header=True,
    )
)
country_df.show()


+------------+----------+-----------+--------------------+
|country_code|  latitude|  longitude|        country_name|
+------------+----------+-----------+--------------------+
|          AD| 42.546245|   1.601554|             Andorra|
|          AE| 23.424076|  53.847818|United Arab Emirates|
|          AF|  33.93911|  67.709953|         Afghanistan|
|          AG| 17.060816| -61.796428| Antigua and Barbuda|
|          AI| 18.220554| -63.068615|            Anguilla|
|          AL| 41.153332|  20.168331|             Albania|
|          AM| 40.069099|  45.038189|             Armenia|
|          AN| 12.226079| -69.060087|Netherlands Antilles|
|          AO|-11.202692|  17.873887|              Angola|
|          AQ|-75.250973|  -0.071389|          Antarctica|
|          AR|-38.416097| -63.616672|           Argentina|
|          AS|-14.270972|-170.132217|      American Samoa|
|          AT| 47.516231|  14.550072|             Austria|
|          AU|-25.274398| 133.775136|           Australi

### Load the dataframe with required columns and clean the data.

In [6]:
# Keep the four columns needed downstream, in the order used by the later cells.
country_clean_df = country_df.select("country_code", "country_name", "latitude", "longitude")
country_clean_df.summary().show()

# Remove rows containing any null value; the result is the CountryDemographics frame.
# Its summary is shown so it can be compared with the summary above.
CountryDemographics = country_clean_df.dropna()
CountryDemographics.summary().show()

+-------+------------+------------+------------------+------------------+
|summary|country_code|country_name|          latitude|         longitude|
+-------+------------+------------+------------------+------------------+
|  count|         245|         245|               244|               244|
|   mean|        null|        null|  16.2531093647541|13.294813586065576|
| stddev|        null|        null|27.031206079229964| 73.97647657765005|
|    min|          AD| Afghanistan|        -75.250973|       -177.156097|
|    25%|        null|        null|         -0.522778|        -42.604303|
|    50%|        null|        null|         16.742498|         17.873887|
|    75%|        null|        null|         38.963745|         48.516388|
|    max|          ZW|    Zimbabwe|         77.553604|        179.414413|
+-------+------------+------------+------------------+------------------+

+-------+------------+------------+------------------+------------------+
|summary|country_code|country_name|  

In [7]:
# Preview the country frame after rows containing nulls were dropped.
CountryDemographics.show()

+------------+--------------------+----------+-----------+
|country_code|        country_name|  latitude|  longitude|
+------------+--------------------+----------+-----------+
|          AD|             Andorra| 42.546245|   1.601554|
|          AE|United Arab Emirates| 23.424076|  53.847818|
|          AF|         Afghanistan|  33.93911|  67.709953|
|          AG| Antigua and Barbuda| 17.060816| -61.796428|
|          AI|            Anguilla| 18.220554| -63.068615|
|          AL|             Albania| 41.153332|  20.168331|
|          AM|             Armenia| 40.069099|  45.038189|
|          AN|Netherlands Antilles| 12.226079| -69.060087|
|          AO|              Angola|-11.202692|  17.873887|
|          AQ|          Antarctica|-75.250973|  -0.071389|
|          AR|           Argentina|-38.416097| -63.616672|
|          AS|      American Samoa|-14.270972|-170.132217|
|          AT|             Austria| 47.516231|  14.550072|
|          AU|           Australia|-25.274398| 133.77513

### Load the Country GDP csv.
### Drop the unnecessary columns and drop the null values and store the clean data into a new dataframe.

In [8]:

from pyspark import SparkFiles
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType

# Register the GDP CSV with the Spark context so it can be located through SparkFiles.get().
url = "https://womenwellbeing.s3.us-west-1.amazonaws.com/Country_GDP.csv"
spark.sparkContext.addFile(url)

# Schema with the two identifying string columns followed by one nullable
# integer column per year from 2000 through 2021.
# NOTE(review): the read in the following cell relies on inferSchema and does
# not pass this schema to the reader — confirm whether it is still needed.
schema = StructType(
    [
        StructField("country_code", StringType(), False),
        StructField("country_name", StringType(), False),
    ]
    + [StructField(str(year), IntegerType(), True) for year in range(2000, 2022)]
)






In [9]:

# Read the GDP CSV that was registered with addFile, letting Spark infer the column types.
Country_GDP_df = spark.read.csv(
    SparkFiles.get("Country_GDP.csv"),
    sep=",",
    header=True,
    inferSchema=True,
    encoding="UTF-8",
)
Country_GDP_df.show()

# Last expression of the cell: displays the (column name, type) pairs.
Country_GDP_df.dtypes

+--------------------+------------+-----------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+-------------+-------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------

[('Country Name', 'string'),
 ('Country Code', 'string'),
 ('Indicator Name', 'string'),
 ('Indicator Code', 'string'),
 ('1960', 'double'),
 ('1961', 'double'),
 ('1962', 'double'),
 ('1963', 'double'),
 ('1964', 'double'),
 ('1965', 'double'),
 ('1966', 'double'),
 ('1967', 'double'),
 ('1968', 'double'),
 ('1969', 'double'),
 ('1970', 'double'),
 ('1971', 'double'),
 ('1972', 'double'),
 ('1973', 'double'),
 ('1974', 'double'),
 ('1975', 'double'),
 ('1976', 'double'),
 ('1977', 'double'),
 ('1978', 'double'),
 ('1979', 'double'),
 ('1980', 'double'),
 ('1981', 'double'),
 ('1982', 'double'),
 ('1983', 'double'),
 ('1984', 'double'),
 ('1985', 'double'),
 ('1986', 'double'),
 ('1987', 'double'),
 ('1988', 'double'),
 ('1989', 'double'),
 ('1990', 'double'),
 ('1991', 'double'),
 ('1992', 'double'),
 ('1993', 'double'),
 ('1994', 'double'),
 ('1995', 'double'),
 ('1996', 'double'),
 ('1997', 'double'),
 ('1998', 'double'),
 ('1999', 'double'),
 ('2000', 'double'),
 ('2001', 'double')

In [10]:
# Remove the indicator columns and normalise the identifying column names.
# Columns are dropped by *name* rather than through Country_GDP_df["..."]:
# looking up a column that is no longer present raises an error, whereas
# dropping or renaming by name is a no-op when the column is absent. That keeps
# this cell safe to re-run even though it rebinds Country_GDP_df.
Country_GDP_df = (
    Country_GDP_df
    .drop("Indicator Name", "Indicator Code")
    .withColumnRenamed("Country Name", "country_name")
    .withColumnRenamed("Country Code", "country_code")
)
Country_GDP_df.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 19

In [11]:
# Only the last expression of a cell is displayed, so the row count taken before
# dropping incomplete rows is printed explicitly instead of being discarded.
print("Rows before dropna:", Country_GDP_df.count())

# Drop every row that has a null in any column.
# NOTE(review): this considers all year columns, while the next cell keeps only
# 2000-2021 — confirm that countries lacking earlier data should be excluded.
Country_GDP_df = Country_GDP_df.dropna()

# Row count after dropping rows with nulls (displayed as the cell output).
Country_GDP_df.count()


123

In [12]:
# Keep the identifying columns plus the yearly columns for 2000 through 2021
# (the past two decades).
Country_GDPdecade_df = Country_GDP_df.select(
    "country_name",
    "country_code",
    *[str(year) for year in range(2000, 2022)]
)

Country_GDPdecade_df.show(5)


+--------------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|        country_name|country_code|         2000|         2001|         2002|         2003|         2004|         2005|        2006|         2007|         2008|         2009|         2010|         2011|         2012|         2013|         2014|         2015|         2016|         2017|         2018|         2019|         2020|         2021|
+--------------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+

In [13]:
# Aggregate each country's GDP row-wise: average the yearly columns into one value.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# The first two columns are country_name and country_code; every remaining
# column is a year column that takes part in the average.
total_cols = len(Country_GDPdecade_df.columns) - 2
rowMean = sum([col(name) for name in Country_GDPdecade_df.columns[2:]]) / total_cols

# Keep only the country name alongside its mean GDP.
Country_GDP_mean_df = Country_GDPdecade_df.select("country_name", rowMean.alias("GDP"))
Country_GDP_mean_df.printSchema()


root
 |-- country_name: string (nullable = true)
 |-- GDP: double (nullable = true)



### Perform an inner join between the two dataframes on country_name.
### Avoid creating duplicate columns.

In [14]:
# Inner-join the country coordinates with the mean GDP on country_name.
# Passing the key by name keeps a single country_name column in the result.
CountryDemographics_clean_df = CountryDemographics.join(
    Country_GDP_mean_df,
    on="country_name",
    how="inner",
)
CountryDemographics_clean_df.printSchema()

CountryDemographics_clean_df.show()


root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- GDP: double (nullable = true)

+--------------------+------------+----------+-----------+--------------------+
|        country_name|country_code|  latitude|  longitude|                 GDP|
+--------------------+------------+----------+-----------+--------------------+
|             Austria|          AT| 47.516231|  14.550072|3.711230454545455E11|
|           Australia|          AU|-25.274398| 133.775136|         1.058007E12|
|          Bangladesh|          BD| 23.684994|  90.356331|1.645153556962272...|
|             Belgium|          BE| 50.503887|   4.469936|4.491997272727273E11|
|        Burkina Faso|          BF| 12.238333|  -1.561593|1.044751617809091E10|
|             Burundi|          BI| -3.373056|  29.918886|1.9224104425090907E9|
|               Benin|          BJ|   9.30769|   2.315834| 9.943231901

### Check the summary of all the dataframes created so far.

In [19]:
# Print summary statistics for each dataframe built so far, in creation order.
for frame in (Country_GDP_df, CountryDemographics, CountryDemographics_clean_df):
    frame.summary().show()

+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------

# Configure the RDS settings to write the cleaned dataframe into the CountryDemographics table.

In [22]:
# Configure settings for RDS
import os
from getpass import getpass

# "append" adds rows to the existing table, so re-running this cell inserts
# the same rows again.
mode = "append"
jdbc_url="jdbc:postgresql://database-1.c1bq6ytovfoa.us-west-2.rds.amazonaws.com:5432/postgres"

# Credentials come from the environment instead of being stored in the notebook.
# RDS_USER falls back to the previous default user name; when RDS_PASSWORD is
# not set, the password is requested interactively and never written to the file.
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

# Write CountryDemographics_clean_df to the countrydemographics table in RDS,
# showing its schema before the write and its contents afterwards.
CountryDemographics_clean_df.printSchema()

CountryDemographics_clean_df.write.jdbc(url=jdbc_url, table='countrydemographics', mode=mode, properties=config)
CountryDemographics_clean_df.show()

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- GDP: double (nullable = true)

+--------------------+------------+----------+-----------+--------------------+
|        country_name|country_code|  latitude|  longitude|                 GDP|
+--------------------+------------+----------+-----------+--------------------+
|             Austria|          AT| 47.516231|  14.550072|3.711230454545455E11|
|           Australia|          AU|-25.274398| 133.775136|         1.058007E12|
|          Bangladesh|          BD| 23.684994|  90.356331|1.645153556962272...|
|             Belgium|          BE| 50.503887|   4.469936|4.491997272727273E11|
|        Burkina Faso|          BF| 12.238333|  -1.561593|1.044751617809091E10|
|             Burundi|          BI| -3.373056|  29.918886|1.9224104425090907E9|
|               Benin|          BJ|   9.30769|   2.315834| 9.943231901