# SparkR setup instructions and examples.
    ### Questions on this doc: people

In [None]:
# Stop the SparkR session. Kept at the top of the notebook for easy access,
# because shutting Spark down when you are finished is really important.
sparkR.stop()

### Setup Procedures

In [None]:
# Load SparkR from the Spark installation that SPARK_HOME points to; the
# package lives under $SPARK_HOME/R/lib rather than the default R library path.
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))

# Create the SparkR session, connecting to Spark on Hadoop (YARN) from the
# edge node.
# Hive support (access to the Hive metadata warehouse) is requested in this
# same call: it is fixed when the session is first created, so a later
# sparkR.session(enableHiveSupport = TRUE) on a running session has no effect.
sparkR.session(
  appName = "Sparkr Demo",
  master = "yarn",
  enableHiveSupport = TRUE,
  sparkConfig = list(
    spark.driver.memory = "6g",
    spark.executor.memory = "26g",
    spark.driver.cores = "50"
  )
)

In [None]:
# Load packages, or install them first if needed. Installed packages go into
# the Miniconda R folder.
#install.packages('magrittr')  # installs a package
#library('dplyr')  # loads a package
#library('magrittr')

# List the packages and environments currently attached to the search path.
search()

### Point to Data

In [None]:
# Select every column from prd_sed_fnd.cal_date_dim_v with Spark SQL and keep
# the result in demo_data.
demo_data <- sql("
select *
from prd_sed_fnd.cal_date_dim_v
")

In [None]:
# str() tells you whether you have a SparkDataFrame and what types the variables are. You don't really need to do this much.
# str(demo_data)
# colnames() gives just the column names. You can copy and paste them into code, which is sometimes a nice way to avoid errors.
# colnames(demo_data)

In [None]:
# take() returns the first NUM rows of a SparkDataFrame as a local R data.frame.
# Fetch the rows once and reuse them, so the class() check does not pull the
# same rows from Spark a second time.
first_rows <- take(demo_data, 5)
first_rows
class(first_rows)

In [None]:
# Persist the SparkDataFrame if you will be using it over and over.
# "MEMORY_ONLY" keeps the data in memory on the Hadoop cluster.
persist(demo_data, "MEMORY_ONLY")

### Spark SQL

In [None]:
# Register the SparkDataFrame as a temporary view so that it can be queried by
# name in a Spark SQL statement.
# General form: createOrReplaceTempView(df, "df")
createOrReplaceTempView(demo_data, "demo_data")

In [None]:
# You can now use "demo_data" to call the SparkDataFrame in an SQL statement

In [None]:
# Query the "demo_data" view: keep the rows where acct_yr_i is 2016, ordered
# by greg_d.
yr_2016 <- SparkR::sql("
select * 
from demo_data
where acct_yr_i = 2016
order by greg_d
")

In [None]:
# yr_2016 is also a SparkDataFrame, which means it is distributed rather than
# held locally; class() confirms the type.
class(yr_2016)

### Bringing Data out of hadoop down to the edge node: collect()/take()

In [None]:
# WITH BIG DATA COMES BIG RESPONSIBILITY !!!!!
# Only collect() if you know the size of your data set, and that size will not crash the edge node!!!!
# collect() brings all of your data to the local edge node and converts it into an R data frame,
# so it is no longer a SparkDataFrame (i.e. no longer distributed).
SparkR::collect(yr_2016)

In [None]:
# Show the class of the collected result (a local R data frame rather than a
# SparkDataFrame). Note that this runs collect() on yr_2016 again.
class(SparkR::collect(yr_2016))

### Creating a Hive table from a SparkDataFrame

In [None]:
# Register yr_2016 as the temporary view "yr_2016_UP" so that it can be
# referenced by name in a Spark SQL statement.
createOrReplaceTempView(yr_2016, "yr_2016_UP")

In [None]:
# Create the table MI_TEST.YR_2016 from the "yr_2016_UP" view using common SQL
# syntax. IF NOT EXISTS means an existing table of that name is left as it is.
sql( 'CREATE TABLE IF NOT EXISTS MI_TEST.YR_2016 AS SELECT * FROM yr_2016_UP')

In [None]:
# Drop the MI_TEST.YR_2016 table in Hive.
# IF EXISTS keeps this cell safe to re-run: without it the statement fails
# when the table has already been dropped or was never created.
sql("DROP TABLE IF EXISTS MI_TEST.YR_2016")

### Join SparkDataFrames

In [None]:
# Inner join, written two ways: as a SQL statement and with SparkR's join().
# General form: join(df1, df2, df1$col1 == df2$col2) performs an inner join
# based on the expression.

# Date and year columns for 2016, registered as the temporary view "yr".
yr <- sql('SELECT GREG_D , ACCT_YR_I FROM demo_data WHERE ACCT_YR_I = 2016')
createOrReplaceTempView(yr, "yr")

# Date and month columns for 2016, registered as the temporary view "mth".
mth <- sql('SELECT GREG_D , ACCT_MO_I FROM demo_data WHERE ACCT_YR_I = 2016')
createOrReplaceTempView(mth, "mth")

# 1) Classic SQL: inner join of the two views on GREG_D.
classic_join <- sql('SELECT * 
                     FROM yr as A
                     INNER JOIN mth AS B
                     ON A.GREG_D = B.GREG_D
')

# 2) SparkR: join() with only a join expression creates an inner join.
#    Pass joinType = "left" (not joinExpr) to get a left join instead.
SparkR_join <- join(x = yr, y = mth, joinExpr = yr$GREG_D == mth$GREG_D)


In [None]:
# Check that the SQL join and the SparkR join() return the same data.
# Bring both results down to the edge node as local R data frames first.
sql_rows <- collect(classic_join)
api_rows <- collect(SparkR_join)

# Spark does not guarantee the row order of a join result, so sort each data
# frame on all of its columns and reset the row names before comparing.
# unname() stops column names from being read as arguments of order().
normalise_rows <- function(df) {
  df <- df[do.call(order, unname(as.list(df))), , drop = FALSE]
  rownames(df) <- NULL
  df
}

identical(normalise_rows(sql_rows), normalise_rows(api_rows))

In [None]:
# Clean up: drop the temporary views registered in this notebook, then release
# the persisted demo_data from memory.
dropTempView('demo_data')
dropTempView('yr_2016_UP')
dropTempView('yr')
dropTempView('mth')
unpersist(demo_data)

In [None]:
# Print the R version and the attached/loaded packages for this session.
sessionInfo()