# Hadoop Platform and Application Framework

### Spark Join
#### Week_4, Lesson_2
The original assignment targets Hadoop MapReduce; here the same assignment is solved with Spark instead.

In [1]:
// Report which Spark build and application this notebook kernel is attached to.
print(Seq(
    s"Spark Version: ${spark.version}",
    s"Spark App Name: ${spark.sparkContext.appName}"
).mkString("\n"))

Spark Version: 2.1.0
Spark App Name: Apache Toree

In [2]:
// DataFrame column/aggregate helpers (`sum` is the aggregate used by the DataFrame-API cells).
import org.apache.spark.sql.functions.{col, sum}

// Workaround for Toree: `import x.implicits._` requires `x` to be a stable
// identifier (a `val`), so the notebook's `spark` session is first re-bound to a val.
// NOTE(review): presumably Toree's own `spark` binding is not stable — confirm for
// the Toree version in use.
val sparkDummy = spark
// Implicit encoders and column syntax (`.as[...]`, `.map`, `.toDF`, `$"..."`,
// `'symbol` columns) used by the cells below.
import sparkDummy.implicits._

In [3]:
// Load every join2_gennum*.txt file as two CSV columns (show, count),
// converting the count column from text to int.
val join2Num = spark.read.
    format("csv").
    load("file:///home/ubuntu/UCSD/hadoop-platform-and-application-framework/join2_gennum*.txt").
    as[(String, String)].
    select(col("_c0").as("show"), col("_c1").cast("int").as("count"))

join2Num.show(5)

+-----------------+-----+
|             show|count|
+-----------------+-----+
|    Hourly_Sports|   21|
|      Hot_Talking|   44|
|   Almost_Cooking|   91|
|        Dumb_Show|  186|
|PostModern_Sports|  377|
+-----------------+-----+
only showing top 5 rows



In [4]:
// Print join2Num's column names, types and nullability.
join2Num.printSchema()

root
 |-- show: string (nullable = true)
 |-- count: integer (nullable = true)



In [5]:
// Load every join2_genchan*.txt file as two string CSV columns (show, channel).
val join2Chan = spark.read.
    format("csv").
    load("file:///home/ubuntu/UCSD/hadoop-platform-and-application-framework/join2_genchan*.txt").
    as[(String, String)].
    select(col("_c0").as("show"), col("_c1").as("channel"))

join2Chan.show(5)

+---------------+-------+
|           show|channel|
+---------------+-------+
|  Hourly_Sports|    DEF|
|    Hot_Cooking|    XYZ|
| Almost_Talking|    CAB|
|   Dumb_Talking|    MAN|
|PostModern_News|    BAT|
+---------------+-------+
only showing top 5 rows



In [6]:
// Print join2Chan's column names, types and nullability.
join2Chan.printSchema()

root
 |-- show: string (nullable = true)
 |-- channel: string (nullable = true)



In [7]:
// Expose both DataFrames under their own names so the spark.sql cells can query them.
Seq("join2Num" -> join2Num, "join2Chan" -> join2Chan).foreach { case (viewName, frame) =>
    frame.createOrReplaceTempView(viewName)
}

In [8]:
// Mark both inputs for caching: each is read again by several of the join/aggregate
// cells below. The notebook echoes the value of the last expression (join2Chan).
join2Num.cache()
join2Chan.cache()

[show: string, channel: string]

In [9]:
// Sum of `count` per show airing on channel ABC, largest first.
// `.stripMargin` is required: without it the `|` margin markers stay inside the
// query text that is handed to Spark's SQL parser.
spark.sql("""select join2Num.show as show, sum(join2Num.count) as count
    | from join2Num join join2Chan
    |   on join2Num.show = join2Chan.show
    | where join2Chan.channel = 'ABC'
    | group by join2Num.show
    | order by count desc""".stripMargin).
    show()

+----------------+------+
|            show| count|
+----------------+------+
|  Hourly_Talking|108163|
|    Dumb_Talking|103894|
|        Hot_Show| 54378|
|  Hourly_Cooking| 54208|
|       Dumb_Show| 53824|
|     Cold_Sports| 52005|
|     Baked_Games| 51604|
|       Loud_Show| 50820|
|PostModern_Games| 50644|
|    Surreal_News| 50420|
|       Hot_Games| 50228|
|     Almost_Show| 50202|
| PostModern_News| 50021|
|      Loud_Games| 49482|
|    Almost_Games| 49237|
|     Hourly_Show| 48283|
|       Cold_News| 47924|
|      Baked_News| 47211|
|  Surreal_Sports| 46834|
|     Almost_News| 46592|
+----------------+------+



In [10]:
// DataFrame-API equivalent of the SQL cell above: inner-join on show, keep only
// channel ABC, then sum `count` per show in descending order.
val joinCondition = (join2Num.col("show") === join2Chan.col("show")).
    and(join2Chan.col("channel") === "ABC")

// Bind the aggregated DataFrame itself; previously `.show` was chained onto the
// definition, so `joinedDF` held Unit instead of a reusable DataFrame.
val joinedDF = join2Num.join(join2Chan, joinCondition).
    groupBy(join2Num.col("show")).
    agg(sum('count) as 'count).
    sort($"count".desc)

joinedDF.show()

+----------------+------+
|            show| count|
+----------------+------+
|  Hourly_Talking|108163|
|    Dumb_Talking|103894|
|        Hot_Show| 54378|
|  Hourly_Cooking| 54208|
|       Dumb_Show| 53824|
|     Cold_Sports| 52005|
|     Baked_Games| 51604|
|       Loud_Show| 50820|
|PostModern_Games| 50644|
|    Surreal_News| 50420|
|       Hot_Games| 50228|
|     Almost_Show| 50202|
| PostModern_News| 50021|
|      Loud_Games| 49482|
|    Almost_Games| 49237|
|     Hourly_Show| 48283|
|       Cold_News| 47924|
|      Baked_News| 47211|
|  Surreal_Sports| 46834|
|     Almost_News| 46592|
+----------------+------+



### Spark Simple Join
#### Week_5, Lesson_2

In [11]:
// Load join1_FileA as two CSV columns (word, count1), converting count1 to int.
val simpleJoin1FileA = spark.read.
    format("csv").
    load("file:///home/ubuntu/UCSD/hadoop-platform-and-application-framework/join1_FileA.txt").
    as[(String, String)].
    select(col("_c0").as("word"), col("_c1").cast("int").as("count1"))

simpleJoin1FileA.show(5)

+------+------+
|  word|count1|
+------+------+
|  able|   991|
| about|    11|
|burger|    15|
| actor|    22|
+------+------+



In [12]:
// Print simpleJoin1FileA's column names, types and nullability.
simpleJoin1FileA.printSchema()

root
 |-- word: string (nullable = true)
 |-- count1: integer (nullable = true)



In [13]:
// Load join1_FileB. As the parsing below shows, each CSV row has
// _c0 = "<date> <word>" and _c1 = "<count>"; reshape it into (word, date, count2).
// NOTE(review): count2 is intentionally left as a string to keep the existing
// schema; cast it to int before doing any arithmetic on it.
val simpleJoin1FileB = spark.read.
    format("csv").
    load("file:///home/ubuntu/UCSD/hadoop-platform-and-application-framework/join1_FileB.txt").
    map { row =>
        // Read the CSV fields directly instead of re-parsing Row.toString ("[a,b]"),
        // which broke whenever a field itself contained a comma or a bracket.
        val dateAndWord = row.getString(0).split(" ")
        (dateAndWord(1), dateAndWord(0), row.getString(1))
    }.
    toDF("word", "date", "count2")

simpleJoin1FileB.show()

+------+------+------+
|  word|  date|count2|
+------+------+------+
|  able|Jan-01|     5|
| about|Feb-02|     3|
| about|Mar-03|     8|
|  able|Apr-04|    13|
| actor|Feb-22|     3|
|burger|Feb-23|     5|
|burger|Mar-08|     2|
|  able|Dec-15|   100|
+------+------+------+



In [14]:
// Print simpleJoin1FileB's column names, types and nullability.
simpleJoin1FileB.printSchema()

root
 |-- word: string (nullable = true)
 |-- date: string (nullable = true)
 |-- count2: string (nullable = true)



In [15]:
// Expose the two simple-join inputs to spark.sql as newA and newB.
Seq("newA" -> simpleJoin1FileA, "newB" -> simpleJoin1FileB).foreach { case (viewName, frame) =>
    frame.createOrReplaceTempView(viewName)
}

In [16]:
// Inner equi-join on `word`; both inputs' `word` columns are kept in the result.
val joinCondition = simpleJoin1FileA("word") === simpleJoin1FileB("word")

val fileB_joined_fileA = simpleJoin1FileB.join(simpleJoin1FileA, joinCondition, "inner")
fileB_joined_fileA.show()

+------+------+------+------+------+
|  word|  date|count2|  word|count1|
+------+------+------+------+------+
|  able|Jan-01|     5|  able|   991|
| about|Feb-02|     3| about|    11|
| about|Mar-03|     8| about|    11|
|  able|Apr-04|    13|  able|   991|
| actor|Feb-22|     3| actor|    22|
|burger|Feb-23|     5|burger|    15|
|burger|Mar-08|     2|burger|    15|
|  able|Dec-15|   100|  able|   991|
+------+------+------+------+------+



In [17]:
// SQL form of the join above, restricted to the word 'actor'.
spark.
    sql("""select * from newB join newA
    | where newA.word = newB.word and newA.word='actor'
    """.stripMargin).
    show()

+-----+------+------+-----+------+
| word|  date|count2| word|count1|
+-----+------+------+-----+------+
|actor|Feb-22|     3|actor|    22|
+-----+------+------+-----+------+



### Spark Advanced Join
#### Week_5, Lesson_3

In [18]:
// Sum of `count` per channel across all shows, largest first.
spark.
    sql("""select join2Chan.channel as channel, sum(count) as count 
    | from join2Num join join2Chan 
    | where join2Num.show = join2Chan.show
    | group by join2Chan.channel 
    | order by count desc
    """.stripMargin).
    show()

+-------+-------+
|channel|  count|
+-------+-------+
|    DEF|8032799|
|    MAN|6566187|
|    XYZ|5208016|
|    BAT|5099141|
|    CNO|3941177|
|    CAB|3940862|
|    BOB|2591062|
|    NOX|2583583|
|    ABC|1115974|
+-------+-------+



In [19]:
// DataFrame-API total of `count` for channel BAT: join on show, keep only BAT rows,
// group by channel (joinedDF holds the grouped, not yet aggregated, data).
val joinCondition =
    join2Num.col("show") === join2Chan.col("show") &&
    join2Chan.col("channel") === "BAT"

val joinedDF = join2Num.
    join(join2Chan, joinCondition).
    groupBy(join2Chan("channel"))

joinedDF.agg(sum($"count").as("count")).sort(col("count").desc).show()

+-------+-------+
|channel|  count|
+-------+-------+
|    BAT|5099141|
+-------+-------+

