## Pulling From AWS

In [1]:
import os

spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to cloud.r-project.o                                                                               Get:2 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
                                                                               Get:3 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [2 InRelease 47.5 kB/114 kB 42%] [3 InRelease 75.0 kB/114 kB 66%] [Waiting f                                                                               Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
0% [2 InRelease 47.5 kB/114 kB 42%] [3 InRelease 77.9 kB/114 kB 68%] [Waiting f                                                                               Get:5 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [2 InRelease 53.3

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.17.jar

--2023-03-16 22:34:27--  https://jdbc.postgresql.org/download/postgresql-42.2.17.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1004734 (981K) [application/java-archive]
Saving to: ‘postgresql-42.2.17.jar’


2023-03-16 22:34:28 (6.32 MB/s) - ‘postgresql-42.2.17.jar’ saved [1004734/1004734]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.17.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://casey-elizabeth-haley-kaleb-bucket.s3.us-east-2.amazonaws.com/Total+Emissions+Per+Country+.csv"
spark.sparkContext.addFile(url)
total_emissions_uncleaned_df = spark.read.csv(SparkFiles.get("Total+Emissions+Per+Country+.csv"), sep=",", header=True, inferSchema=True)

## Data Cleaning

In [5]:
total_emissions_uncleaned_df.show()

+-----------+--------------------+--------------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|       Area|                Item|       Emission_Type|      Unit|Year_2000|Year_2001|Year_2002|Year_2003|Year_2004|Year_2005|Year_2006|Year_2007|Year_2008|Year_2009|Year_2010|Year_2011|Year_2012|Year_2013|Year_2014|Year_2015|Year_2016|Year_2017|Year_2018|Year_2019|Year_2020|
+-----------+--------------------+--------------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|Afghanistan|       Crop Residues|Direct emissions ...|kilotonnes|     0.52|   0.5267|     0.82|   0.9988|   0.8225|   1.1821|   1.0277|   1.2426|   0.8869|    1.392|   

In [6]:
total_emissions_dropna_df = total_emissions_uncleaned_df.dropna()
total_emissions_dropna_df.show()

+-----------+--------------------+--------------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+
|       Area|                Item|       Emission_Type|      Unit|Year_2000|Year_2001|Year_2002|Year_2003|Year_2004|Year_2005|Year_2006|Year_2007|Year_2008|Year_2009|Year_2010|Year_2011|Year_2012|Year_2013|Year_2014| Year_2015| Year_2016| Year_2017| Year_2018| Year_2019| Year_2020|
+-----------+--------------------+--------------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+
|Afghanistan|       Crop Residues|Direct emissions ...|kilotonnes|     0.52|   0.5267|     0.82|   0.9988|   0.8225|   1.1821|   1.0277|   1.2426|   0.

In [7]:
total_emissions_cleaned_df = total_emissions_dropna_df.replace("kilotonnes", "kilotons")
total_emissions_cleaned_df.show()

+-----------+--------------------+--------------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+
|       Area|                Item|       Emission_Type|    Unit|Year_2000|Year_2001|Year_2002|Year_2003|Year_2004|Year_2005|Year_2006|Year_2007|Year_2008|Year_2009|Year_2010|Year_2011|Year_2012|Year_2013|Year_2014| Year_2015| Year_2016| Year_2017| Year_2018| Year_2019| Year_2020|
+-----------+--------------------+--------------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+
|Afghanistan|       Crop Residues|Direct emissions ...|kilotons|     0.52|   0.5267|     0.82|   0.9988|   0.8225|   1.1821|   1.0277|   1.2426|   0.8869|   

## Summary DataFrame

In [8]:
import pandas as pd

In [9]:
emissions_pandas_df = total_emissions_cleaned_df.toPandas()

In [10]:
emissions_pandas_df

Unnamed: 0,Area,Item,Emission_Type,Unit,Year_2000,Year_2001,Year_2002,Year_2003,Year_2004,Year_2005,...,Year_2011,Year_2012,Year_2013,Year_2014,Year_2015,Year_2016,Year_2017,Year_2018,Year_2019,Year_2020
0,Afghanistan,Crop Residues,Direct emissions (N2O),kilotons,0.5200,0.5267,0.8200,0.9988,0.8225,1.1821,...,1.0321,1.3726,1.4018,1.4584,1.2424,1.1940,1.0617,0.8988,1.2176,1.3170
1,Afghanistan,Crop Residues,Indirect emissions (N2O),kilotons,0.1170,0.1185,0.1845,0.2247,0.1851,0.2660,...,0.2322,0.3088,0.3154,0.3281,0.2795,0.2687,0.2389,0.2022,0.2740,0.2963
2,Afghanistan,Crop Residues,Emissions (N2O),kilotons,0.6370,0.6452,1.0045,1.2235,1.0075,1.4481,...,1.2643,1.6815,1.7173,1.7865,1.5220,1.4627,1.3005,1.1011,1.4916,1.6133
3,Afghanistan,Crop Residues,Emissions (CO2eq) from N2O (AR5),kilotons,168.8070,170.9884,266.1975,324.2195,266.9995,383.7498,...,335.0379,445.5958,455.0727,473.4174,403.3181,387.6130,344.6447,291.7838,395.2689,427.5284
4,Afghanistan,Crop Residues,Emissions (CO2eq) (AR5),kilotons,168.8070,170.9884,266.1975,324.2195,266.9995,383.7498,...,335.0379,445.5958,455.0727,473.4174,403.3181,387.6130,344.6447,291.7838,395.2689,427.5284
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
38426,Sweden,Drained organic soils (N2O),Emissions (N2O),kilotons,5.1092,5.1209,5.1355,5.1857,5.2963,5.4384,...,6.0675,6.1048,6.1144,6.1765,6.1881,6.2629,6.3629,6.7063,6.9126,6.9338
38427,Sweden,Drained organic soils (N2O),Emissions (N2O),kilotons,3.0553,3.0473,3.0351,3.0228,3.0142,3.0049,...,2.9036,2.8808,2.8557,2.8395,2.8303,2.8156,2.7989,2.7878,2.7743,2.7589
38428,Sweden,Drained organic soils (N2O),Emissions (CO2eq) from N2O (AR5),kilotons,1353.9379,1357.0467,1360.8961,1374.2134,1403.5229,1441.1851,...,1607.8907,1617.7663,1620.3241,1636.7712,1639.8453,1659.6792,1686.1590,1777.1796,1831.8376,1837.4575
38429,Sweden,Drained organic soils (N2O),Emissions (CO2eq) from N2O (AR5),kilotons,809.6447,807.5436,804.2915,801.0535,798.7694,796.3083,...,769.4588,763.4116,756.7598,752.4800,750.0283,746.1282,741.7152,738.7774,735.1846,731.1023


In [17]:
emissions_summary_df_pandas = emissions_pandas_df.describe()
emissions_summary_df_pandas.insert(0, 'Index', emissions_summary_df_pandas.index)
emissions_summary_df_pandas

Unnamed: 0,Index,Year_2000,Year_2001,Year_2002,Year_2003,Year_2004,Year_2005,Year_2006,Year_2007,Year_2008,...,Year_2011,Year_2012,Year_2013,Year_2014,Year_2015,Year_2016,Year_2017,Year_2018,Year_2019,Year_2020
count,count,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,...,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0,38431.0
mean,mean,8062.607,8008.313,8359.142,8604.091,9173.096,9405.77,9905.405,9963.879,10206.17,...,10817.92,11028.73,11115.63,11395.9,11438.08,11396.73,11582.84,11805.28,12014.21,11736.53
std,std,94335.29,95911.83,102249.0,112787.8,125286.7,136124.1,147844.9,155547.5,164262.8,...,196940.0,202080.8,205813.5,208439.8,207328.3,204391.3,208453.9,213464.4,218832.9,220359.9
min,min,-741005.1,-797183.1,-797183.1,-797183.1,-797183.1,-797183.1,-797183.1,-797183.1,-815675.4,...,-864864.9,-878111.4,-850909.2,-845165.0,-839071.6,-823942.5,-809924.2,-798290.0,-784981.6,-772937.8
25%,25%,0.0265,0.019,0.02015,0.0235,0.022,0.0244,0.0251,0.02885,0.0266,...,0.0281,0.031,0.02945,0.031,0.034,0.03365,0.03605,0.0338,0.03715,0.0306
50%,50%,6.916,6.1787,6.4345,6.9053,6.861,7.1789,7.3425,7.7468,7.6751,...,8.2455,8.4064,8.3874,8.4672,8.816,9.0291,9.1558,8.9465,9.3916,9.0875
75%,75%,377.9087,361.676,375.2926,395.0836,398.9841,420.5209,421.5623,438.7105,445.1113,...,462.9759,464.5124,475.9622,477.5167,491.385,504.4345,510.6735,511.1268,519.9407,505.8742
max,max,5383231.0,5484582.0,5925897.0,6731423.0,7577858.0,8375211.0,9128191.0,9683548.0,10217230.0,...,12503540.0,12796490.0,13095920.0,13251100.0,13138290.0,12988560.0,13236410.0,13526640.0,13894630.0,14091450.0


In [18]:
emissions_summary_df = spark.createDataFrame(emissions_summary_df_pandas)

In [19]:
emissions_summary_df.show()

+-----+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+--------------------+--------------------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|Index|         Year_2000|        Year_2001|         Year_2002|         Year_2003|         Year_2004|         Year_2005|         Year_2006|         Year_2007|        Year_2008|           Year_2009|           Year_2010|         Year_2011|         Year_2012|           Year_2013|         Year_2014|         Year_2015|         Year_2016|        Year_2017|         Year_2018|         Year_2019|         Year_2020|
+-----+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+----

In [15]:
from getpass import getpass
password = getpass('Enter database password')

mode = "append"
jdbc_url="jdbc:postgresql://group-4-database.c0mbmuajokyj.us-east-2.rds.amazonaws.com:5432/postgres"
config = {"user":"postgres",
          "password": password,
          "driver":"org.postgresql.Driver"}

Enter database password··········


In [16]:
total_emissions_cleaned_df.write.jdbc(url=jdbc_url, table='total_emissions', mode=mode, properties=config)

In [20]:
emissions_summary_df.write.jdbc(url=jdbc_url, table='emissions_summary', mode=mode, properties=config)