# Final Project: ETL Process for Close Approach JSON Data

In [3]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 15.9 kB] [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to                                                                                Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 15.9 kB] [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to                                              

In [4]:
# Download the Postgres JDBC driver jar that will allow Spark to interact with Postgres.
# The SparkSession config expects the jar at /content/postgresql-42.2.16.jar, so this
# assumes the notebook's working directory is /content - TODO confirm outside Colab.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-01-13 19:50:24--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-01-13 19:50:25 (1.68 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [5]:
# Start Spark session
from pyspark.sql import SparkSession

# the extra class path entry puts the downloaded Postgres JDBC driver jar on the driver
spark = (
  SparkSession.builder
  .appName("Neo_Json")
  .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
  .getOrCreate()
)

In [6]:
# Import pySpark libraries
from pyspark import SparkFiles
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [7]:
def process_cad_data(json_filename, url_endpoint):
  """
  Build the dataframe that gets loaded into a postgres table from a cad json file

  params:
    json_filename: name of the json file from AWS S3; used to look up the
      copy Spark downloaded through SparkFiles
    url_endpoint: url of the S3 file that Spark downloads and reads

  returns:
    dataframe with one row per element of the json "data" array, where `cd`
    is parsed to a timestamp and dist, dist_min, dist_max, v_rel, v_inf and h
    are cast to decimal(24,16); the remaining columns are left as read
  """

  # positional layout of each record inside the json "data" array
  field_names = ['des', 'orbit_id', 'jd', 'cd', 'dist', 'dist_min',
                 'dist_max', 'v_rel', 'v_inf', 't_sigma_f', 'h']
  # columns that are converted to fixed precision decimals
  decimal_fields = ['dist', 'dist_min', 'dist_max', 'v_rel', 'v_inf', 'h']

  # let Spark fetch the file, then resolve where the local copy was placed
  spark.sparkContext.addFile(url_endpoint)
  local_json_path = SparkFiles.get(json_filename)
  raw_df = spark.read.json(local_json_path, multiLine=True)

  # one row per element of the "data" array
  records_df = raw_df.select(F.explode("data").alias('data'))

  # spread every record array into named columns
  tabular_df = records_df.select(
    *[records_df['data'].getItem(position).alias(field)
      for position, field in enumerate(field_names)]
  )

  # parse the date column, then cast the numeric columns in place
  cad_final_df = tabular_df.withColumn(
    "cd", F.to_timestamp(F.col("cd"), 'yyyy-MMM-dd HH:mm'))
  decimal_type = T.DecimalType(precision=24, scale=16)
  for field in decimal_fields:
    cad_final_df = cad_final_df.withColumn(field, F.col(field).cast(decimal_type))

  return cad_final_df

In [8]:
# import getpass module
from getpass import getpass

In [9]:
def load_cad_data_aws_rds(df, mode, table_name):
  """
  Load data in dataframe arg df into aws rds neo database

  The database password is prompted for on every call, so it is never
  stored in the notebook.

  args:
    df: dataframe containing source data to load into database
    mode: write mode passed to the Spark JDBC writer ie. append, overwrite
    table_name: name of table in database to load data into
  """

  password = getpass('Enter database password')

  # Configure settings for RDS
  jdbc_url="jdbc:postgresql://neo-db.ctohlxwhjvlb.us-east-1.rds.amazonaws.com:5432/neo"
  config = {"user":"postgres", 
            "password": password, 
            "driver":"org.postgresql.Driver"}

  # Use the caller's mode as given. It was previously reassigned to 'overwrite'
  # here, which silently turned an append into a full table replacement.
  df.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)

### Current Year and Future CAD Data

In [10]:
# S3 file name and its url; the file name is also what process_cad_data
# passes to SparkFiles to find the downloaded copy
json_filename = "cad.json"
url_endpoint = f"https://ucb-neo-project.s3.us-east-2.amazonaws.com/json/{json_filename}"
# target table and write mode handed to load_cad_data_aws_rds
table_name = 'public.cad'
mode = 'overwrite'

In [11]:
# call function to get dataframe for loading postgres table
# (downloads the json file named in the config cell and casts its columns)
cad_final_df = process_cad_data(json_filename, url_endpoint)

In [12]:
# print a preview of the transformed rows to check the parsed timestamp and decimal values
cad_final_df.show()

+---------+--------+-----------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+---------+-------------------+
|      des|orbit_id|               jd|                 cd|              dist|          dist_min|          dist_max|              v_rel|              v_inf|t_sigma_f|                  h|
+---------+--------+-----------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+---------+-------------------+
|   363505|     262|2459580.559075860|2022-01-01 01:25:00|0.1576976399720940|0.1576968984424920|0.1576983815020980|12.5185308744250000|12.5171811123797000|  < 00:01|18.2900000000000000|
| 2022 AJ1|       2|2459580.875887923|2022-01-01 09:01:00|0.0051506713103762|0.0051094551147137|0.0051918821699614|11.1602139720087000|11.1137645065506000|  < 00:01|27.8370000000000000|
| 2018 VB7|       7|2459580.943369207|2022-01-01 10:38:00|0.1576805784

In [13]:
# confirm the column types produced by process_cad_data before writing to postgres
cad_final_df.printSchema()

root
 |-- des: string (nullable = true)
 |-- orbit_id: string (nullable = true)
 |-- jd: string (nullable = true)
 |-- cd: timestamp (nullable = true)
 |-- dist: decimal(24,16) (nullable = true)
 |-- dist_min: decimal(24,16) (nullable = true)
 |-- dist_max: decimal(24,16) (nullable = true)
 |-- v_rel: decimal(24,16) (nullable = true)
 |-- v_inf: decimal(24,16) (nullable = true)
 |-- t_sigma_f: string (nullable = true)
 |-- h: decimal(24,16) (nullable = true)



In [14]:
# Call function to load dataframe into AWS RDS Postgres database table
# (prompts for the database password)
load_cad_data_aws_rds(cad_final_df, mode, table_name)

Enter database password··········


### Historical CAD Data

In [15]:
# Rebinds the same configuration names used for the current-year load,
# now pointing at the historical file and its own table
json_filename = "cad_history.json"
url_endpoint = f"https://ucb-neo-project.s3.us-east-2.amazonaws.com/json/{json_filename}"
# target table and write mode handed to load_cad_data_aws_rds
table_name = 'public.cad_history'
mode = 'overwrite'

In [16]:
# call function to get dataframe for loading postgres table
# (same transformation as the current-year data, applied to the historical file)
cad_history_df = process_cad_data(json_filename, url_endpoint)

In [17]:
# Call function to load dataframe into AWS RDS Postgres database table
# (prompts for the database password)
load_cad_data_aws_rds(cad_history_df, mode, table_name)

Enter database password··········
