In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [73.0 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/

In [None]:
# Download the PostgreSQL JDBC driver (version 42.2.16) into the current working
# directory. Spark needs this jar to talk to Postgres over JDBC; the SparkSession
# cell puts /content/postgresql-42.2.16.jar on the driver classpath.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-11-15 02:50:42--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-11-15 02:50:43 (1.22 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, with the downloaded Postgres JDBC jar
# on the driver classpath.
spark = (
    SparkSession.builder
    .appName("M20-Equipo_7-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load COVID-19 Mexico Data into Spark DataFrame

In [None]:
# Fetch the gzipped CSV from the S3 bucket and load it into a Spark DataFrame.
from pyspark import SparkFiles

url = "https://equipo-7.s3.amazonaws.com/211114COVID19MEXICO.csv.gz"

# Register the remote file with Spark, then read the local copy that
# SparkFiles resolves by file name. No schema inference is requested here.
spark.sparkContext.addFile(url)
df = spark.read.csv(
    SparkFiles.get("211114COVID19MEXICO.csv.gz"),
    sep=",",
    header=True,
)

### Create DataFrames to match tables

In [None]:
from pyspark.sql.functions import to_date

# Re-read the COVID-19 CSV as a DataFrame, this time decoding it as UTF-8 and
# asking Spark to infer column types. This replaces the earlier `df`.
spark.sparkContext.addFile(url)
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(
        SparkFiles.get("211114COVID19MEXICO.csv.gz"),
        sep=",",
        header=True,
        inferSchema=True,
    )
)
df.show()

+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+----------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+---------------------+------------------+-------------------+--------+-----------------+-----------+---+
|FECHA_ACTUALIZACION|ID_REGISTRO|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO|FECHA_SINTOMAS| FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_LAB|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI|
+-------------------+-----------+------+------+-------

In [None]:
# Print the DataFrame schema as a tree: each column's name, inferred type and nullability.
df.printSchema()

root
 |-- FECHA_ACTUALIZACION: string (nullable = true)
 |-- ID_REGISTRO: string (nullable = true)
 |-- ORIGEN: integer (nullable = true)
 |-- SECTOR: integer (nullable = true)
 |-- ENTIDAD_UM: integer (nullable = true)
 |-- SEXO: integer (nullable = true)
 |-- ENTIDAD_NAC: integer (nullable = true)
 |-- ENTIDAD_RES: integer (nullable = true)
 |-- MUNICIPIO_RES: integer (nullable = true)
 |-- TIPO_PACIENTE: integer (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- FECHA_DEF: string (nullable = true)
 |-- INTUBADO: integer (nullable = true)
 |-- NEUMONIA: integer (nullable = true)
 |-- EDAD: integer (nullable = true)
 |-- NACIONALIDAD: integer (nullable = true)
 |-- EMBARAZO: integer (nullable = true)
 |-- HABLA_LENGUA_INDIG: integer (nullable = true)
 |-- INDIGENA: integer (nullable = true)
 |-- DIABETES: integer (nullable = true)
 |-- EPOC: integer (nullable = true)
 |-- ASMA: integer (nullable = true)
 |-- INMUSUPR: inte

In [9]:
# Build the ID_REGISTRO_table DataFrame: the ID_REGISTRO column only,
# with duplicate IDs removed.
ID_REGISTRO_df = (
    df.select('ID_REGISTRO')
    .dropDuplicates(subset=['ID_REGISTRO'])
)
ID_REGISTRO_df.show()

+-----------+
|ID_REGISTRO|
+-----------+
|     00017b|
|     0001d2|
|     000341|
|     0004a0|
|     0004d2|
|     0004dc|
|     0004fd|
|     000684|
|     0007d2|
|     00086f|
|     000a1f|
|     000ac7|
|     000cbf|
|     000dc8|
|     0012bf|
|     001427|
|     00162a|
|     00185e|
|     0018e9|
|     00197e|
+-----------+
only showing top 20 rows



In [11]:
# Build the enfermedad_table DataFrame: the record ID plus the listed condition columns.
# NOTE(review): no de-duplication is applied here, although the previous comment
# mentioned dropping duplicates -- confirm which behaviour is intended.
enfermedades_columns = [
    'ID_REGISTRO',
    'NEUMONIA',
    'DIABETES',
    'EPOC',
    'ASMA',
    'INMUSUPR',
    'HIPERTENSION',
    'CARDIOVASCULAR',
    'OBESIDAD',
    'RENAL_CRONICA',
    'TABAQUISMO',
]
enfermedades_df = df.select(enfermedades_columns)
enfermedades_df.show()

+-----------+--------+--------+----+----+--------+------------+--------------+--------+-------------+----------+
|ID_REGISTRO|NEUMONIA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|
+-----------+--------+--------+----+----+--------+------------+--------------+--------+-------------+----------+
|     z2eace|       2|       2|   2|   2|       2|           2|             2|       1|            2|         2|
|     z38de4|       2|       2|   2|   2|       2|           2|             2|       2|            2|         2|
|     z579ac|       2|       2|   2|   2|       2|           2|             2|       2|            2|         2|
|     z2669f|       2|       2|   2|   1|       2|           2|             2|       2|            2|         1|
|     z54912|       2|       1|   2|   2|       2|           2|             2|       2|            2|         2|
|     z35a05|       2|       2|   2|   2|       2|           2|             2|       1|         

In [12]:
# Build the UCI_table DataFrame: each record ID paired with its UCI value.
uci_df = df.select('ID_REGISTRO', 'UCI')
uci_df.show()

+-----------+---+
|ID_REGISTRO|UCI|
+-----------+---+
|     z2eace|  2|
|     z38de4| 97|
|     z579ac| 97|
|     z2669f| 97|
|     z54912| 97|
|     z35a05| 97|
|     z3f33c| 97|
|     z552ac| 97|
|     z59bbc| 97|
|     z59345| 97|
|     zzf571| 97|
|     z12d63| 97|
|     z4887b| 97|
|     z13788| 97|
|     z3d8f0| 97|
|     z4e532| 97|
|     z4e838|  2|
|     z2b144| 97|
|     zz7202| 97|
|     z58ed3| 97|
+-----------+---+
only showing top 20 rows



### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [19]:
# Configure settings for RDS
import os
from getpass import getpass

# Write mode handed to DataFrameWriter.jdbc by the write cells below.
mode = "append"
jdbc_url="jdbc:postgresql://equipo-7.cyyy0i0rexuu.us-west-1.rds.amazonaws.com:5432/postgres"

# Credentials come from the environment instead of being stored in the notebook:
#   RDS_USER      -- database user (defaults to "postgres")
#   RDS_PASSWORD  -- database password; prompted for interactively when unset
# NOTE(review): the password that used to be hardcoded in this cell has been
# committed with the notebook and should be treated as leaked and rotated.
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [20]:
# Write ID_REGISTRO_df to the id_registro_table table in RDS, using the
# configured JDBC URL, write mode and connection properties.
# Elapsed time: about 16 min
ID_REGISTRO_df.write.jdbc(
    url=jdbc_url,
    table='id_registro_table',
    mode=mode,
    properties=config,
)

In [None]:
# Write enfermedades_df to its table in RDS.
# This cell previously referenced `products_df`, which is not defined anywhere in
# the visible notebook and would raise NameError on a fresh run; it now writes
# the conditions DataFrame that the notebook actually builds.
# NOTE(review): the table name is assumed to follow the same lowercase
# convention as 'id_registro_table' -- confirm against the RDS schema.
enfermedades_df.write.jdbc(
    url=jdbc_url,
    table='enfermedad_table',
    mode=mode,
    properties=config,
)

In [None]:
# Write uci_df to its table in RDS.
# This cell previously referenced `customers_df`, which is not defined anywhere in
# the visible notebook and would raise NameError on a fresh run; it now writes
# the UCI DataFrame that the notebook actually builds.
# NOTE(review): the table name is assumed to follow the same lowercase
# convention as 'id_registro_table' -- confirm against the RDS schema.
uci_df.write.jdbc(
    url=jdbc_url,
    table='uci_table',
    mode=mode,
    properties=config,
)

In [None]:
# Write vine_df to table in RDS
# 16 minutes
# NOTE(review): `vine_df` is not defined in the visible part of this notebook, so
# this cell raises NameError unless it is defined elsewhere. It looks like a
# leftover from a different dataset's template -- confirm whether it should be
# removed or pointed at one of the DataFrames built above.
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)