In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:6 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:7 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:11 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:12 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:13 https://developer.

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.3.1.jar

--2022-01-22 03:03:12--  https://jdbc.postgresql.org/download/postgresql-42.3.1.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1015689 (992K) [application/java-archive]
Saving to: ‘postgresql-42.3.1.jar’


2022-01-22 03:03:14 (1.70 MB/s) - ‘postgresql-42.3.1.jar’ saved [1015689/1015689]



In [3]:
# Build (or reuse) the SparkSession for this notebook, putting the downloaded
# Postgres JDBC jar on the driver classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tourism-db")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.3.1.jar")
    .getOrCreate()
)

Load data from Amazon S3 into Spark DataFrames

In [4]:
# Fetch clean_inbound_data_thousands.csv from S3 and read it into a DataFrame,
# treating the first row as the header and letting Spark infer column types.
from pyspark import SparkFiles

spark.sparkContext.addFile(
    'https://tourism-project.s3.us-east-2.amazonaws.com/clean_inbound_data_thousands.csv'
)
inbound_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(SparkFiles.get('clean_inbound_data_thousands.csv'))
)
inbound_df.show()

+--------------------+----+--------+--------+------+------+--------+---------------------+------+-----------+----------+--------------------+
|             Country|Year|Personal|Business| Total|Africa|Americas|East Asia and Pacific|Europe|Middle East|South Asia|Other not Classified|
+--------------------+----+--------+--------+------+------+--------+---------------------+------+-----------+----------+--------------------+
|      american samoa|1995|    28.0|     6.0|  34.0|   0.0|     0.0|                  0.0|   0.0|        0.0|       0.0|                 0.0|
|              angola|1995|     6.0|     3.0|   9.0|   2.0|     1.0|                  0.0|   6.0|        0.0|       0.0|                 0.0|
|            anguilla|1995|   104.8|     2.3| 107.1|   0.0|    35.0|                  0.0|   2.0|        0.0|       0.0|                 1.0|
| antigua and barbuda|1995|   186.0|     5.0| 191.0|   0.0|   124.0|                  0.0|  84.0|        0.0|       0.0|                 4.0|
|     

In [5]:
# Fetch clean_world_ind.csv from S3 and read it into a DataFrame,
# treating the first row as the header and letting Spark infer column types.
spark.sparkContext.addFile(
    'https://tourism-project.s3.us-east-2.amazonaws.com/clean_world_ind.csv'
)
world_ind_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(SparkFiles.get('clean_world_ind.csv'))
)
world_ind_df.show()

+----------+--------------------+---------------+----------------+-----------------+---------------------+--------------+----------------------+--------------------+------------------+-----------------+---------------+----------------+--------------+----------------+----------------+------+---------------+----------------+----+------+
|Birth Rate|             Country|            GDP|Health Exp % GDP|Health Exp/Capita|Infant Mortality Rate|Internet Usage|Life Expectancy Female|Life Expectancy Male|Mobile Phone Usage|Number of Records|Population 0-14|Population 15-64|Population 65+|Population Total|Population Urban|Region|Tourism Inbound|Tourism Outbound|Year|Header|
+----------+--------------------+---------------+----------------+-----------------+---------------------+--------------+----------------------+--------------------+------------------+-----------------+---------------+----------------+--------------+----------------+----------------+------+---------------+----------------+--

Create DataFrames to match tables

In [6]:
# Build the DataFrame for the inbound_tourism table by renaming every column
# of inbound_df to its snake_case name.
inbound_column_renames = [
    ("Country", "country"),
    ("Year", "year"),
    ("Personal", "personal"),
    ("Business", "business"),
    ("Total", "total"),
    ("Africa", "africa"),
    ("Americas", "americas"),
    ("East Asia and Pacific", "east_asia_and_pacific"),
    ("Europe", "europe"),
    ("Middle East", "middle_east"),
    ("South Asia", "south_asia"),
    ("Other not Classified", "others"),
]

# Start from the raw frame each time so re-running the cell gives the same result.
clean_inbound_df = inbound_df
for inbound_old_name, inbound_new_name in inbound_column_renames:
    clean_inbound_df = clean_inbound_df.withColumnRenamed(inbound_old_name, inbound_new_name)
clean_inbound_df.show()

+--------------------+----+--------+--------+------+------+--------+---------------------+------+-----------+----------+------+
|             country|year|personal|business| total|africa|americas|east_asia_and_pacific|europe|middle_east|south_asia|others|
+--------------------+----+--------+--------+------+------+--------+---------------------+------+-----------+----------+------+
|      american samoa|1995|    28.0|     6.0|  34.0|   0.0|     0.0|                  0.0|   0.0|        0.0|       0.0|   0.0|
|              angola|1995|     6.0|     3.0|   9.0|   2.0|     1.0|                  0.0|   6.0|        0.0|       0.0|   0.0|
|            anguilla|1995|   104.8|     2.3| 107.1|   0.0|    35.0|                  0.0|   2.0|        0.0|       0.0|   1.0|
| antigua and barbuda|1995|   186.0|     5.0| 191.0|   0.0|   124.0|                  0.0|  84.0|        0.0|       0.0|   4.0|
|             armenia|1995|    10.0|     2.0|  12.0|   0.0|     2.0|                  0.0|   8.0|       

In [7]:
# Build the DataFrame for the world_indicators table by renaming every column
# of world_ind_df to its snake_case name.
world_ind_column_renames = [
    ("Birth Rate", "birth_rate"),
    ("Country", "country"),
    ("GDP", "gdp"),
    ("Health Exp % GDP", "health_exp_percent_gdp"),
    ("Health Exp/Capita", "health_exp_per_capita"),
    ("Infant Mortality Rate", "infant_mortality_rate"),
    ("Internet Usage", "internet_usage"),
    ("Life Expectancy Female", "life_expectancy_female"),
    ("Life Expectancy Male", "life_expectancy_male"),
    ("Mobile Phone Usage", "mobile_phone_usage"),
    ("Number of Records", "no_of_records"),
    ("Population 0-14", "population_0_to_14"),
    ("Population 15-64", "population_15_to_64"),
    ("Population 65+", "population_65_plus"),
    ("Population Total", "population_total"),
    ("Population Urban", "population_urban"),
    ("Region", "region"),
    ("Tourism Inbound", "tourism_inbound"),
    ("Tourism Outbound", "tourism_outbound"),
    ("Year", "year"),
    ("Header", "header"),
]

# Start from the raw frame each time so re-running the cell gives the same result.
clean_world_ind_df = world_ind_df
for world_ind_old_name, world_ind_new_name in world_ind_column_renames:
    clean_world_ind_df = clean_world_ind_df.withColumnRenamed(world_ind_old_name, world_ind_new_name)
clean_world_ind_df.show()

+----------+--------------------+---------------+----------------------+---------------------+---------------------+--------------+----------------------+--------------------+------------------+-------------+------------------+-------------------+------------------+----------------+----------------+------+---------------+----------------+----+------+
|birth_rate|             country|            gdp|health_exp_percent_gdp|health_exp_per_capita|infant_mortality_rate|internet_usage|life_expectancy_female|life_expectancy_male|mobile_phone_usage|no_of_records|population_0_to_14|population_15_to_64|population_65_plus|population_total|population_urban|region|tourism_inbound|tourism_outbound|year|header|
+----------+--------------------+---------------+----------------------+---------------------+---------------------+--------------+----------------------+--------------------+------------------+-------------+------------------+-------------------+------------------+----------------+-----------

In [8]:
# Configure settings for RDS
import os

# Save mode handed to DataFrameWriter.jdbc by the write cells.
mode = "append"

# JDBC connection properties. The password is read from the RDS_PASSWORD
# environment variable so the real secret never has to be typed into (or
# committed with) the notebook. When the variable is unset, the redacted
# placeholder from the original notebook is kept as a fallback; set
# RDS_PASSWORD before running the write cells.
config = {"user": "postgres",
          "password": os.environ.get("RDS_PASSWORD", "********"),
          "driver": "org.postgresql.Driver"}

In [14]:
# Write clean_inbound_df to the inbound_tourism table in RDS, using the save
# mode held in `mode` and the connection properties held in `config`.
jdbc_url = "jdbc:postgresql://tourism.cwo7jzoryi7x.us-east-2.rds.amazonaws.com:5432/postgres"
clean_inbound_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='inbound_tourism',
    properties=config,
)

In [13]:
# Write clean_world_ind_df to the world_indicators table in RDS, using the save
# mode held in `mode` and the connection properties held in `config`.
jdbc_url = "jdbc:postgresql://tourism.cwo7jzoryi7x.us-east-2.rds.amazonaws.com:5432/postgres"
clean_world_ind_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='world_indicators',
    properties=config,
)