# Environment Test
This notebook tests the database environment, specifically the connection between the Spark environment and the Neo4j and Apache Kudu databases.

# Pre Test Setup

In [1]:
import random
import os
from os import listdir

import seaborn as sns

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F

from ipywidgets import interact, widgets

KUDU_MASTER = 'kudu-master-1:7051'

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1,org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ pyspark-shell'
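
The submit arguments are easier to review when the package coordinates are kept in a list. The following is a minimal sketch, not part of the original notebook: build_submit_args, PACKAGES and REPOSITORIES are hypothetical names, and the trailing pyspark-shell token is carried over from the cell above.

```python
def build_submit_args(packages, repositories=()):
    """Assemble a PYSPARK_SUBMIT_ARGS value from Maven coordinates."""
    parts = ["--packages", ",".join(packages)]
    if repositories:
        parts += ["--repositories", ",".join(repositories)]
    # Keep the trailing token used by the original cell.
    parts.append("pyspark-shell")
    return " ".join(parts)


# Coordinates copied from the cell above.
PACKAGES = [
    "org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3",
]
REPOSITORIES = ["https://repository.cloudera.com/artifactory/cloudera-repos/"]

submit_args = build_submit_args(PACKAGES, REPOSITORIES)
print(submit_args)
```

The resulting string can be assigned to os.environ['PYSPARK_SUBMIT_ARGS'] before the first SparkSession is created.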

In [3]:
spark = SparkSession.builder.config('spark.jars.packages', 'org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1,org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3').getOrCreate()
sc = SparkContext.getOrCreate()
sc.setLogLevel('OFF')

# Kudu Test

## Initial read from Kudu
Read from the testA table, which was created by the pre-test commands.
The result should be an empty DataFrame with the schema specified when the table was created.

In [10]:
table = spark.read.option('kudu.master', KUDU_MASTER).option('kudu.table', 'impala::default.testA').format('kudu').load()
table.createOrReplaceTempView('testA')
display(table)
table.show(truncate=False)

DataFrame[literal: string, nota: decimal(8,5), dados: decimal(8,5), idade: int, lorem: int, ipsum: string]

+-------+----+-----+-----+-----+-----+
|literal|nota|dados|idade|lorem|ipsum|
+-------+----+-----+-----+-----+-----+
+-------+----+-----+-----+-----+-----+
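
Rather than comparing the printed schema by eye, the check can be automated. This is a sketch under the assumption that the schema is passed in as a list of (name, type) pairs, which is the shape of table.dtypes in PySpark; schema_mismatches and EXPECTED_DTYPES are hypothetical names introduced here.

```python
# Expected schema, taken from the DataFrame repr printed above.
EXPECTED_DTYPES = [
    ("literal", "string"),
    ("nota", "decimal(8,5)"),
    ("dados", "decimal(8,5)"),
    ("idade", "int"),
    ("lorem", "int"),
    ("ipsum", "string"),
]


def schema_mismatches(actual, expected):
    """Return (column, expected_type, actual_type) for every disagreement."""
    actual_map = dict(actual)
    expected_map = dict(expected)
    problems = []
    # Columns that are missing or have the wrong type.
    for name, dtype in expected:
        if actual_map.get(name) != dtype:
            problems.append((name, dtype, actual_map.get(name)))
    # Columns that are present but were not expected.
    for name, dtype in actual:
        if name not in expected_map:
            problems.append((name, None, dtype))
    return problems


# In the notebook this would be called as:
#   schema_mismatches(table.dtypes, EXPECTED_DTYPES)
print(schema_mismatches(EXPECTED_DTYPES, EXPECTED_DTYPES))
```

An empty list means the table schema matches what the pre-test commands were expected to create.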



## Insertion of new rows
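The DataFrame df_kudu is created with two rows and named columns. Its numeric columns are cast to match the Kudu table schema, and the rows are then appended to the table.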

In [14]:
df_kudu = spark.createDataFrame(
 [("conseguimos?", 3.2, 2.0, 1000, 300, "sad"),("teste", 22.5,4.75, -500, -1000, "Jarles")],
 ["literal","nota", "dados","idade","lorem","ipsum"]
)
df_kudu = df_kudu.withColumn('nota', df_kudu.nota.cast(DecimalType(8, 5))) \
    .withColumn('dados', df_kudu.dados.cast(DecimalType(8, 5))) \
    .withColumn('idade', df_kudu.idade.cast(IntegerType())) \
    .withColumn('lorem', df_kudu.lorem.cast(IntegerType()))
df_kudu.write.option('kudu.master', KUDU_MASTER).option('kudu.table', 'impala::default.testA').mode('append').format('kudu').save()
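
The cast to DecimalType(8, 5) leaves only three digits for the integer part, so it may help to check values before writing them. The sketch below uses only the standard decimal module; fits_decimal is a hypothetical helper, and its rounding mode is Python's default, which may differ from Spark's in edge cases. With default (non-ANSI) settings, Spark typically turns a value that does not fit into null instead of raising an error.

```python
from decimal import Decimal


def fits_decimal(value, precision=8, scale=5):
    """True if value fits decimal(precision, scale) after rounding to scale."""
    quantum = Decimal(1).scaleb(-scale)          # 0.00001 for scale=5
    rounded = Decimal(str(value)).quantize(quantum)
    limit = Decimal(10) ** (precision - scale)   # 1000 for decimal(8,5)
    return abs(rounded) < limit


# Values from the nota and dados columns, plus one that is too large.
for candidate in (3.2, 22.5, 4.75, 1234.5):
    print(candidate, fits_decimal(candidate))
```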

## Reading Kudu again
This repeats the read from the initial step. Because new data was inserted, it should return the same schema, now with two rows of data.

In [21]:
table = spark.read.option('kudu.master', KUDU_MASTER).option('kudu.table', 'impala::default.testA').format('kudu').load()
table.createOrReplaceTempView('testA')
display(table)
table.show(truncate=False)

DataFrame[literal: string, nota: decimal(8,5), dados: decimal(8,5), idade: int, lorem: int, ipsum: string]

+------------+--------+-------+-----+-----+------+
|literal     |nota    |dados  |idade|lorem|ipsum |
+------------+--------+-------+-----+-----+------+
|conseguimos?|3.20000 |2.00000|1000 |300  |sad   |
|teste       |22.50000|4.75000|-500 |-1000|Jarles|
+------------+--------+-------+-----+-----+------+
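
The row check can also be scripted. This sketch assumes the rows are supplied as tuples, for example from table.collect(), whose Row objects behave like tuples; missing_rows and EXPECTED_ROWS are hypothetical names. The comparison ignores row order, since a distributed read does not guarantee it.

```python
from collections import Counter
from decimal import Decimal

# Expected contents, taken from the output printed above.
EXPECTED_ROWS = [
    ("conseguimos?", Decimal("3.20000"), Decimal("2.00000"), 1000, 300, "sad"),
    ("teste", Decimal("22.50000"), Decimal("4.75000"), -500, -1000, "Jarles"),
]


def missing_rows(actual, expected):
    """Return expected rows absent from actual, ignoring row order."""
    remaining = Counter(tuple(row) for row in actual)
    missing = []
    for row in expected:
        key = tuple(row)
        if remaining[key] > 0:
            remaining[key] -= 1   # consume one matching row
        else:
            missing.append(key)
    return missing


# In the notebook this would be called as:
#   missing_rows(table.collect(), EXPECTED_ROWS)
print(missing_rows(list(reversed(EXPECTED_ROWS)), EXPECTED_ROWS))
```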



# Neo4j Test

## Initial read from Neo4j
This step queries Neo4j for all nodes with the label "Person". It should return an empty DataFrame.

In [4]:
df = spark.read.format("org.neo4j.spark.DataSource")\
 .option("url", "bolt://neo4j:7687")\
 .option("authentication.type", "none")\
 .option("labels", "Person")\
 .load()
display(df)
df.show(truncate=False)

DataFrame[<id>: bigint, <labels>: array<string>]

+----+--------+
|<id>|<labels>|
+----+--------+
+----+--------+



## Writing into Neo4j
This step creates a new DataFrame with two rows, each of which becomes a node. The column names become the property names, id is used as the node key, and the nodes are written with the label "Person".

In [5]:
df = spark.createDataFrame(
 [(3, "Carlos"),(4, "Jarles")],
 ["id", "name"]
)
df.write.format("org.neo4j.spark.DataSource")\
 .option("url", "bolt://neo4j:7687")\
 .option("authentication.type", "none")\
 .option("labels", ":Person")\
 .option("node.keys", "id")\
 .mode("Overwrite")\
 .save()
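
The read and write cells in this section repeat the same connection options. A small helper, sketched here with the hypothetical names neo4j_options, NEO4J_URL and NEO4J_FORMAT, can collect them in one dictionary; the option keys and values are the ones already used in this notebook.

```python
NEO4J_URL = "bolt://neo4j:7687"
NEO4J_FORMAT = "org.neo4j.spark.DataSource"


def neo4j_options(labels, node_keys=None):
    """Build the connector options shared by the read and write cells."""
    opts = {
        "url": NEO4J_URL,
        "authentication.type": "none",
        "labels": labels,
    }
    if node_keys:
        # Several key properties are passed as a comma-separated string.
        opts["node.keys"] = ",".join(node_keys)
    return opts


# In the notebook the dictionaries would be used as:
#   spark.read.format(NEO4J_FORMAT).options(**neo4j_options("Person")).load()
#   df.write.format(NEO4J_FORMAT).options(**neo4j_options(":Person", ["id"])).mode("Overwrite").save()
print(neo4j_options(":Person", ["id"]))
```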

## Reading Neo4j again
This step runs the same query as the initial read and should return the nodes inserted in the previous cell.

In [6]:
df = spark.read.format("org.neo4j.spark.DataSource")\
 .option("url", "bolt://neo4j:7687")\
 .option("authentication.type", "none")\
 .option("labels", "Person")\
 .load()
display(df)
df.show(truncate=False)

DataFrame[<id>: bigint, <labels>: array<string>, name: string, id: bigint]

+----+--------+------+---+
|<id>|<labels>|name  |id |
+----+--------+------+---+
|0   |[Person]|Carlos|3  |
|1   |[Person]|Jarles|4  |
+----+--------+------+---+

