## Docker related information

Image source: https://github.com/jupyter/docker-stacks (the `jupyter/all-spark-notebook` image used below comes from this project)

localnotebookpath='/Users/maheshgoud/Documents/Git_Modules/gh/sandbox/'

localdatapath='/Users/maheshgoud/Downloads/data/'

#allsparkpackagenames='org.apache.spark:spark-avro_2.11:2.4.0,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2'

#NOTE: both SPARK_OPTS and PYSPARK_SUBMIT_ARGS request the same Scala 2.11 / Spark 2.4 avro package so the Scala and PySpark kernels load a compatible build
docker run -it --rm -p 8890:8888 -v "$localnotebookpath":/home/jovyan -v "$localdatapath":/home/jovyan/data -e JUPYTER_ENABLE_LAB=yes -e SPARK_OPTS='--packages org.apache.spark:spark-avro_2.11:2.4.0,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2' \
-e PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell' jupyter/all-spark-notebook:7d427e7a4dde start-notebook.sh --NotebookApp.token=''


## Helpful code snippets

- https://www.programcreek.com/scala/org.apache.spark.SparkConf (code snippets)
- https://gist.github.com/eddies/f37d696567f15b33029277ee9084c4a0 (s3 access, packages related)

In [None]:
import scala.collection.Seq
import sys.process._

// import spark.implicits._
// import org.apache.spark.sql.Row

// Report the Spark, Scala and Python versions visible to this kernel.
println(s"Spark version: ${sc.version}")
println(s"Scala version: ${util.Properties.versionString}")
// `.!` runs the external command and yields its exit code. The explicit dot
// avoids the postfix-operator form, which needs scala.language.postfixOps and
// is easy to break when another expression follows on the next line.
"python --version".!

In [None]:
// Tinker with functional pattern + daily agg window function

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{col, when}
val F = org.apache.spark.sql.functions

// UDF returning how many distinct elements an array column holds.
def distinctElemCount = F.udf((elems: Seq[Any]) => elems.distinct.size)
//def distinctElemCount = F.udf( (a: Seq[Any]) => when(a.toSet.isEmpty(), null).otherwise(a.toSet.size) )

// Sample order events: one row per order, with the day it belongs to and the IP it came from.
val df1 = Seq(
      (1, 1, 11, "ord1", "a"),
      (2, 1, 22, "ord2", "a"),
      (5, 5, 13, "ord5", "a"),
      (6, 5, 14, "ord6", "b"),
      (3, 3, 33, "ord3", "b"),
      (3, 3, 33, "ord7", "b"),
      (4, 3, 44, "ord4", "b")).toDF("DATETIME", "DAY", "VAL", "ORD", "IP")
// show() prints the table itself and returns Unit, so wrapping it in print()
// only appended a stray "()" to the output.
df1.show()
// Per-IP range frame over DAY values from (current DAY + 1) to (current DAY + 4).
val w = Window.partitionBy(col("IP")).orderBy("DAY").rangeBetween(1, 4)

// Daily aggregates per (IP, DAY): summed VAL and the set of order ids seen that day.
val df2 = df1.groupBy(col("IP"), col("DAY"))
  .agg(F.sum(col("VAL")).alias("DAILY_VAL"), F.collect_set(col("ORD")).alias("DAILY_ORD"))
df2.show()

// compute collect set over daily collect set aggs
// Range frame over DAY values from (current DAY - 5) to the current DAY.
// NOTE(review): no partitionBy here, so Spark evaluates this window in a single partition.
val w2 = Window.orderBy("DAY").rangeBetween(-5, 0)

println("debug")
// z1 = union of the daily order sets inside the frame (may contain repeats after flatten),
// z2 = number of distinct orders in z1.
df2.withColumn("z1", F.flatten(F.collect_set(col("DAILY_ORD")).over(w2)))
  .withColumn("z2", distinctElemCount(col("z1")))
  .show(10, false)
// Same computation collapsed into a single column.
df2.withColumn("z1", distinctElemCount(F.flatten(F.collect_set(col("DAILY_ORD")).over(w2)))).show()

// Sum of the daily values over the forward-looking per-IP frame `w`.
val df3 = df2.withColumn("WINDOWED_VAL", F.sum(col("DAILY_VAL")).over(w))
df3.show()

// Back to one row per order: explode the daily order set and drop the array column.
val df4 = df3.withColumn("ORD", F.explode(col("DAILY_ORD"))).drop("DAILY_ORD")
df4.show()


println("------------------------------------------")
println("debug null")
// Name of the distinct-count column; used for both creating and rewriting it
// so the two steps cannot drift apart.
val tmpCol = "z2"
// Same sample as before, but the two day-1 rows for IP "a" have no order id.
val dfnull = Seq(
      (1, 1, 11, null, "a"),
      (2, 1, 22, null, "a"),
      (5, 5, 13, "ord5", "a"),
      (6, 5, 14, "ord6", "b"),
      (3, 3, 33, "ord3", "b"),
      (3, 3, 33, "ord7", "b"),
      (4, 3, 44, "ord4", "b")).toDF("DATETIME", "DAY", "VAL", "ORD", "IP")
val dfnull2 = dfnull.groupBy(col("IP"), col("DAY"))
  .agg(F.sum(col("VAL")).alias("DAILY_VAL"), F.collect_set(col("ORD")).alias("DAILY_ORD"))
// show() prints the table and returns Unit; the former print(...) wrapper only
// added a stray "()" to the output.
dfnull2.withColumn("z1", F.flatten(F.collect_set(col("DAILY_ORD")).over(w2)))
      .withColumn(tmpCol, distinctElemCount(col("z1")))
      //.withColumn("z2", when($"z2"===0, null).otherwise(col("z2")))
      // Report "no orders in the window" as NULL rather than as a count of 0.
      .withColumn(tmpCol, when(col(tmpCol)===0, null).otherwise(col(tmpCol)))
      .show(10, false)



In [None]:
// Rounding experiment. Options considered: the strictfp annotation, or BigDecimal
// (rejected here: no add operations, which caused a compilation failure).
// Using round for now.

// Attach two literal double columns to the sample frame.
val dfFloat = df1
  .withColumn("someFloat1", F.lit(66232.58783497249767))
  .withColumn("someFloat2", F.lit(22.187889174987158723497249767))

// Variant 1: round selected columns one by one.
val dfFloat2 = dfFloat
  .withColumn("someFloat1", F.round(F.col("someFloat1"), 3))
  .withColumn("DAY", F.round(F.col("DAY"), 3))

// Variant 2: round every column in a list by folding over the column names.
val floatCols: List[String] = List("someFloat1", "someFloat2", "DAY")

val dfFloat3 = floatCols.foldLeft(dfFloat) { (rounded, name) =>
  rounded.withColumn(name, F.round(F.col(name), 3))
}

// Print contents and schema of the raw frame and of both rounded variants, in that order.
Seq(dfFloat, dfFloat2, dfFloat3).foreach { frame =>
  frame.show()
  frame.printSchema()
}

In [None]:
// Tinker with safedivide function with null values

import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType, IntegerType}

// Two nullable integer columns: v1 is the numerator, v2 the denominator.
val schema = List(
  StructField("v1", IntegerType, nullable = true),
  StructField("v2", IntegerType, nullable = true)
)

// Covers every combination of interest: null numerator, null denominator,
// both null, and a zero denominator with and without a numerator.
val data = Seq(
  Row(null, 3),
  Row(1, 5),
  Row(2, 4),
  Row(3, null),
  Row(null, null),
  Row(null, 0),
  Row(1, 0)
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)
// show() prints the table and returns Unit; wrapping it in print() only added "()".
df.show()

/**
 * Divides `column1` by `column2`, yielding SQL NULL when the divisor is NULL or zero.
 *
 * A NULL numerator also yields NULL, through ordinary NULL propagation of `/`.
 * The previous version substituted empty strings for the missing values, which
 * made Spark coerce both operands to strings and back to numbers and only
 * produced NULL as a side effect of failing to parse "".
 *
 * @param column1 numerator column
 * @param column2 denominator column
 * @return the quotient column, NULL where the division is undefined
 */
def safeDivideColumns(column1: Column, column2: Column): Column = {
    when(column2.isNull || column2 === 0, lit(null)).otherwise(column1 / column2)
}

// v3 = safe quotient v1 / v2; v4 = v3 + 1, to see how NULL quotients propagate through arithmetic.
val df2 = df.withColumn("v3", safeDivideColumns(col("v1"), col("v2"))).withColumn("v4", col("v3")+1)
// show() prints the table and returns Unit; wrapping it in print() only added "()".
df2.show()


// val w1 = Window.orderBy("v3").rangeBetween(-10, 10)
// Row-based frame: up to 10 rows before and 10 rows after the current row, ordered by v3.
val w1 = Window.orderBy("v3").rowsBetween(-10, 10)
// v5 = windowed sum of v3 over that frame.
val df3 = df2.withColumn("v5", F.sum("v3").over(w1))
df3.show()

// // compute collect set over daily collect set aggs
// print(df2.withColumn("z1", F.flatten(F.collect_set(col("DAILY_ORD")).over(w2))).withColumn("z2", distinctElemCount(col("z1"))).show(10, false))
// print(df2.withColumn("z1", distinctElemCount(F.flatten(F.collect_set(col("DAILY_ORD")).over(w2)))).show())
// val df3 = df2.withColumn("WINDOWED_VAL", F.sum(col("DAILY_VAL")).over(w))



In [None]:
//// Tinker with null values based on dataframe col type, append df, foldleft df join, filter type check

// val schema = StructType(Array(
//     StructField("DATETIME", LongType, true),
//     StructField("DAY", LongType, true),
//     StructField("IP", StringType, true),
//     StructField("ORD", StringType, true),
//     StructField("VAL", LongType, true)))
     
// val s = Seq(
//       (1, 1, 11, "ord1", "a"),
//       (2, 1, 22, "ord2", "a"),
//       (5, 5, 13, "ord5", "a"),
//       (6, 5, 14, "ord6", "b"),
//       (3, 3, 33, "ord3", "b"),
//       (4, 3, 44, "ord4", "b")).toDF("DATETIME", "DAY", "VAL", "ORD", "IP")

//// null check
// df4.schema.fields(0).dataType
// df4.schema.fieldIndex("IP")

////foldleft df join
val df1 = Seq(
      (1, 1, 11, "ord1", "a"),
      (2, 1, 22, "ord2", "a"),
      (5, 5, 13, "ord5", "a"),
      (6, 5, 14, "ord6", "b"),
      (3, 3, 33, "ord3", "b"),
      (4, 3, 44, "ord4", "b")).toDF("DATETIME", "DAY", "VAL", "ORD", "IP")

df1.printSchema()
// Baseline: compare the integer DAY column against the string "3".
// show() prints the table and returns Unit, so it is called directly rather
// than inside print(), which only appended "()".
df1.filter(col("DAY").isNotNull && !col("DAY").isNaN && $"DAY"=!="3").show()
println("-----")
// Buggy prints below
// Kept on purpose as the demonstration: DAY is still an integer column here and
// is additionally compared with the empty string.
// NOTE(review): presumably "" cannot be cast to a number, making the predicate
// NULL for every row - confirm against the printed output.
df1.filter(col("DAY").isNotNull && !col("DAY").isNaN && $"DAY"=!="3" && $"DAY"=!="").show()
println("-----")
// Fixed bugs in above prints
// Casting DAY to a string first makes both comparisons string-to-string.
val df1_bugfix = df1.withColumn("DAY", col("DAY").cast(StringType))
df1_bugfix.filter(col("DAY").isNotNull && !col("DAY").isNaN && $"DAY"=!="3" && $"DAY"=!="").show()
println("-----")

// Left-hand side of the joins; unlike df1 it has an extra order ("ord33").
val df10 = Seq(
      (1, 1, 11, "ord1", "a"),
      (2, 1, 22, "ord2", "a"),
      (5, 5, 13, "ord5", "a"),
      (6, 5, 14, "ord6", "b"),
      (3, 3, 33, "ord3", "b"),
      (3, 3, 33, "ord33", "b"),
      (4, 3, 44, "ord4", "b")).toDF("DATETIME", "DAY", "VAL", "ORD", "IP")

// Three right-hand frames derived from df1: two with extra constant columns,
// one with the rows for ord1 and ord2 removed.
val df11 = df1.withColumn("C1", F.lit(1)).withColumn("C2", F.lit(2))
val df12 = df1.withColumn("C3", F.lit(3)).withColumn("C4", F.lit(4))
val df13 = df1.filter($"ORD" =!= "ord1" && $"ORD" =!= "ord2")
// show() prints the table and returns Unit; wrapping it in print() only added "()".
df13.show()

// Left-join every frame onto df10 by ORD, one after another.
// NOTE(review): the non-key columns (DATETIME, DAY, VAL, IP) exist on both
// sides of each join, so the result carries repeated column names.
val dfs = Seq(df11, df12, df13)
val fdf = dfs.foldLeft(df10)(
  (acc, right) => acc.join(right, Seq("ORD"), "left")
)
fdf.show()
// print(fdf.na.fill(1).show())
// fdf.agg(F.sum("C1")).show()
// df10.filter(col("IP").isNotNull && col("IP") =!= 1).show()

////filter type check
// df10.filter(col("IP").isNotNull && col("IP") =!= 2).show() //string
// df10.filter(col("DAY").isNotNull && col("DAY") =!= "11.0").show() //int

In [None]:
// Tinker with Timestamp

//2018-12-01 02:00:00.000|2018-11-16|

import org.apache.spark.sql.types.{StructType, StructField, StringType, FloatType, IntegerType, TimestampType, DoubleType, LongType}
import org.apache.spark.sql.functions.{col, lit, when}

// Schema for the createDataFrame variant that is kept commented out below.
val schema = StructType(Seq(
    StructField("DATETIME", TimestampType, true),
    StructField("DAY", TimestampType, true),
    StructField("VAL", LongType, true)
))

// Timestamps start out as strings; the ("0", "0", 333) row is deliberately not a valid date.
val someData = Seq(
      ("2018-12-01 02:00:00.000", "2018-12-01", 11),
      ("2019-01-01 02:00:00.000", "2019-01-01", 22),
      ("2019-02-01 02:00:00.000", "2019-02-01", 13),
      ("2019-03-01 02:00:00.000", "2019-03-01", 14),
      ("2019-04-01 02:00:00.000", "2019-04-01", 33),
      ("0", "0", 333),
      ("2019-05-01 02:00:00.000", "2019-05-01", 44))

// Convert both string columns to timestamps, DATETIME first and then DAY.
val df = Seq("DATETIME", "DAY").foldLeft(someData.toDF("DATETIME", "DAY", "VAL")) {
  (frame, name) => frame.withColumn(name, F.to_timestamp(F.col(name)))
}

// val someDF = spark.createDataFrame(
//   spark.sparkContext.parallelize(someData),
//   StructType(schema)
// )

// Cut-off instant used by both examples below.
val dt = F.lit("2019-02-02").cast("timestamp")
// Keep only the rows strictly after the cut-off.
df.filter(F.col("DATETIME").gt(dt)).show()
// Overwrite VAL with 1 for rows after the cut-off, leave it untouched otherwise.
df.withColumn("VAL", when(F.col("DATETIME").gt(dt), 1).otherwise(F.col("VAL"))).show()
// df.repartition(100,F.col("DATETIME")).persist().count()


In [None]:
//Declare some dummy dataframe

// Small (id, words) sample frame; note that "NA" is an ordinary string value here.
val df = List(
      1 -> "foo",
      2 -> "barrio",
      3 -> "gitten",
      33 -> "NA",
      4 -> "baa"
).toDF("id", "words")

// val someData = Seq(
//   Row(8, "bat"),
//   Row(64, "mouse"),
//   Row(-27, "horse")
// )

// val someSchema = List(
//   StructField("number", IntegerType, true),
//   StructField("word", StringType, true)
// )

// val someDF = spark.createDataFrame(
//   spark.sparkContext.parallelize(someData),
//   StructType(someSchema)
// )

// // dictionary Set of words to check 
// val d = Set("foo","bar","baa")

// println(s"Num of partitions:${df.rdd.getNumPartitions},  All partition sizes:[${df.rdd.glom().map(_.length).collect().mkString(",")}]")

df.show()
// Same frame with null string values replaced by the empty string.
df.na.fill("").show()

// Dictionary of accepted words. It is defined here because the two membership
// checks below reference `d`, which otherwise has no live definition in this cell.
val d = Set("foo", "bar", "baa")

// Flag each row with whether its word is in the dictionary...
df.withColumn("isValid", $"words".isInCollection(d)).show()
// ...and keep only the rows whose word is in the dictionary.
df.filter($"words".isInCollection(d)).show()

In [None]:
//Read avro data from s3
// https://github.com/julianpeeters/avro-spark-examples/blob/master/src/main/scala/AvroSparkScala.scala

// import org.apache.avro.generic.GenericRecord
// import org.apache.avro.mapred.AvroKey
// import org.apache.avro.mapreduce.AvroKeyInputFormat
// import org.apache.hadoop.io.NullWritable
// val rdd = sc.newAPIHadoopFile(local_path,
//   classOf[AvroKeyInputFormat[GenericRecord]],
//   classOf[AvroKey[GenericRecord]],
//   classOf[NullWritable]
// )
// rdd.collect()

// s3a://key:secret@
// Avro file to load; currently a path relative to the notebook's working directory.
val local_path = "./data/part.avro"

// Hadoop configuration of the running context; both S3A credential keys are set to empty strings.
val conf = sc.hadoopConfiguration
Seq("fs.s3a.access.key", "fs.s3a.secret.key").foreach(key => conf.set(key, ""))

// Load the file through the avro data source and pull every row back to the driver.
val df = {
  val avroReader = spark.read.format("com.databricks.spark.avro")
  avroReader.load(local_path)
}
df.collect()