# Big Data Analytics
In this lab, we go through a quiz to strengthen our understanding of NoSQL and the Map/Reduce paradigm, and then practice how to:    
    - Process data in [JSON](https://en.wikipedia.org/wiki/JSON) format     
    - Design and write Map/Reduce programs

## NoSQL Quiz
1. Which of the following is true about NoSQL?    
    - (a) An alternative to SQL databases only for managing textual data
    - (b) A distributed version of relational databases
    - (c) A paradigm only for managing non-structured data
    - (d) A new data paradigm to manage large datasets - <b>True</b>
2. NoSQL is optimized for scaling up
    - (a) number of data rows
    - (b) number of data columns - <b>True</b>
3. Do NoSQL databases prohibit the use of SQL?
    - (a) Yes
    - (b) No  - <b>True</b>
4. Which of the following represents a column in NoSQL?
    - (a) Document
    - (b) Collection
    - (c) Database
    - (d) Field  - <b>True</b>
5. Is Hadoop a NoSQL program?
    - (a) Yes
    - (b) No - <b>True</b>
    - (c) Case by case
    - (d) Depends on the provider and version 

## Map/Reduce Quiz
1. Which of the following is true about the Map/Reduce programming model?
    - (a) It is used to write programs that process data as a stream
    - (b) It is used to write programs that process data in parallel  - <b>True</b>
    - (c) It only works within the Hadoop framework
    - (d) It only works with data stored as key-value pairs
2. Which of the following describes the map function?
    - (a) It maps a key-value pair to another key-value pair
    - (b) It finds the value corresponding to a key
    - (c) It manipulates the value of a key-value pair
    - (d) It processes data to create a list of key-value pairs  - <b>True</b>
3. Which of the following is true about output of the mappers?
    - (a) Each mapper must generate the same number of key-value pairs as its input had.    
    - (b) All mappers must generate the same number of key-value pairs    
    - (c) Each mapper can generate zero or more key-value pairs  - <b>True</b>
    - (d) Each mapper generates key-value pairs of the same type as its input
4. Which of the following describes the reduce function?
    - (a) It reduces values that have the same key
    - (b) It reduces the number of values having the same key
    - (c) It aggregates values having the same key into one value of the same type
    - (d) It produces a summary of the values having the same key  - <b>True</b>
5. Which of the following is true about input of the reducers?
    - (a) Each reducer takes as input a list of key-value pairs
    - (b) Each reducer takes as input a key and a list of values  - <b>True</b>
    - (c) Each reducer takes as input the output of a dedicated group of mappers
    - (d) Different reducers may have inputs with the same key(s)


## Process data in JSON format

We will use the [jsonlite](https://cran.r-project.org/web/packages/jsonlite/vignettes/json-aaquickstart.html) package to work with JSON objects.
### Example
Given a set of Twitter messages (i.e., <i>tweets</i>) stored in a file, where each line is a tweet in [JSON format](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object), extract the text content of the tweets.

In [4]:
#Install required packages

#jsonlite package: to read and write data in JSON format
#install.packages("jsonlite", repos = "https://cran.uni-muenster.de/")
library(jsonlite)

#data.table provides setDT(); plyr provides count()
library(data.table)
library(plyr)

In [6]:
#define the function to extract text content from tweet in JSON format
get_text <- function(jsonTweet){
    tweet <- fromJSON(jsonTweet)
    return(tweet['text'])
}
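
To see what `fromJSON` returns for a single tweet, here is a minimal sketch. The JSON string is a hypothetical, heavily trimmed tweet (real tweets carry many more fields); only the `text` and `user.screen_name` field names come from the tweet format used in this lab.

```r
library(jsonlite)

# Hypothetical, trimmed-down tweet used only for illustration
json_tweet <- '{"text": "Hello #UK", "user": {"screen_name": "example_user"}}'
tweet <- fromJSON(json_tweet)

# Single brackets keep the list wrapper; double brackets return the element itself
wrapped <- tweet["text"]
plain <- tweet[["text"]]
class(wrapped)
class(plain)

# Nested JSON objects are parsed into nested lists
screen_name <- tweet[["user"]][["screen_name"]]
screen_name
```

This is why `get_text` above returns a one-element list: it uses single brackets.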

In [7]:
#function to extract tweets' text content from JSON file
readTweets <- function(filename){
    tweets <- c()
    #open file
    conn <- file(filename, open="r")
    #set of tweets, one JSON object per line
    lines <- readLines(conn)
    #close file
    close(conn)
    #process each tweet in turn (seq_along avoids iterating over 1:0 when there are no lines)
    for (i in seq_along(lines)){
       tweets <- c(tweets, get_text(lines[i]))
    }
    return (tweets)
}

In [8]:
#test the function
#get list of tweets' text content
tweets <- readTweets("data/ua.txt")
tweets
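
As a possible variation on the loop-based reader, the sketch below reads all lines at once, skips blank lines, and returns a plain character vector. The helper name `read_tweets_vec` and the sample file contents are assumptions made for illustration; they are not part of the lab data.

```r
library(jsonlite)

# Hypothetical helper: extracts the "text" field of every non-blank line
read_tweets_vec <- function(filename) {
    lines <- readLines(filename, warn = FALSE)
    lines <- lines[nzchar(trimws(lines))]   # skip blank lines
    vapply(lines, function(l) {
        txt <- fromJSON(l)[["text"]]
        # tweets without a "text" field yield NA instead of being dropped
        if (is.null(txt)) NA_character_ else txt
    }, character(1), USE.NAMES = FALSE)
}

# Small self-made sample file, one JSON object per line
tmp <- tempfile(fileext = ".txt")
writeLines(c('{"text": "first #UK tweet"}', '', '{"text": "second tweet"}'), tmp)
sample_texts <- read_tweets_vec(tmp)
sample_texts
```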

### Exercise
Given a set of Twitter messages (i.e., tweets) stored in a file, where each line is a tweet in JSON format, count the number of tweets per user in the set.

In [20]:
#function to extract user's screen_name from tweet in JSON format
get_user <- function(jsonTweet){
    tweet <- fromJSON(jsonTweet)
    #the "user" field is a nested object, which fromJSON parses into a named list
    return(tweet[["user"]][["screen_name"]])
}
#function to extract users from JSON file
readUsers <- function(filename){
    users <- c()
    #open file
    conn <- file(filename,open="r")
    #set of tweets
    lines <-readLines(conn)
    #close file
    close(conn)
    #process each tweet in turn (seq_along avoids iterating over 1:0 when there are no lines)
    for (i in seq_along(lines)){
       users <- c(users, get_user(lines[i]))
    }
    
    #count number of tweets per user
    f <- as.data.frame(do.call(rbind, as.list(users)))
    f <- setDT(f, keep.rownames = TRUE)
    names(f) <- c("tweet_index", "user")
    f <- count(f, 'user')
    names(f) <- c("user", "num_tweets")
    return (f)
}
#test the function
users <- readUsers("data/ua.txt")
users


| user | num_tweets |
|---|---|
| `<fct>` | `<int>` |
| Andyx002 | 1 |
| Asimxo | 1 |
| boredinohio2 | 1 |
| cgnetwork | 1 |
| IndexJoker | 1 |
| JaneReid5 | 1 |
| Matthew_F944 | 1 |
| melman101 | 1 |
| MikePence2Cents | 2 |
| MooreOfKaitlyn | 1 |

## Write Map/Reduce programs

We will use the [DSL](https://cran.r-project.org/web/packages/DSL/index.html) package to simulate the Map/Reduce programming environment.

### Example
Given a large set of tweets stored in multiple files in JSON format, count the number of tweets containing each hashtag in the set.

Main steps:
1. Create a list of (directory, filename) pairs
2. Use a mapper to turn the (directory, filename) pairs into (key = filename, value = tweet's text content) pairs
3. Use another mapper to turn the (key = filename, value = tweet's text content) pairs into (key = hashtag, value = count) pairs
4. Use a reducer to sum up the counts of the same hashtags from (key = hashtag, value = count) pairs

<div style='float: center'>
  <img style='display' src="image/hashtagCount.jpg"></img>
</div>
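
Before turning to DSL, the map, group, and reduce phases above can be illustrated in base R on a few in-memory strings. This is only a sketch of the idea, not how DSL works internally; the tweet texts and the name `map_hashtags` are assumptions, and tweets without hashtags simply emit no pair here.

```r
# Hypothetical tweet texts standing in for the contents of the input files
texts <- c("Stay safe #London #UK", "Thoughts with #London", "no tags here")

# Map: each tweet emits one (key = hashtag, value = 1) pair per distinct hashtag
map_hashtags <- function(text) {
    words <- strsplit(text, "[[:space:]]+")[[1]]
    tags <- unique(grep("^#", words, value = TRUE))
    lapply(tags, function(tag) list(key = tag, value = 1L))
}
pairs <- unlist(lapply(texts, map_hashtags), recursive = FALSE)

# Group: collect the values that share a key
keys <- vapply(pairs, function(p) p$key, character(1))
values <- vapply(pairs, function(p) p$value, integer(1))
grouped <- split(values, keys)

# Reduce: sum the values of each key
hashtag_counts <- vapply(grouped, sum, integer(1))
hashtag_counts
```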

In [21]:
#Install required packages

#DSL package: to simulate the Map/Reduce environment
#install.packages("DSL", repos = "https://cran.uni-muenster.de/")

#import libraries
library(DSL)


In [22]:
## split into tweets: this function turns a (key = directory, value = filename) pair
## to (key = filename, value = tweet's text content) pairs
split_tweets <- function( keypair ){
    #re-use the readTweets function written above
    values <- readTweets(keypair$value)
    #produce one (key, value) pair per tweet, reusing the key of the input pair
    mapply( function(key, value) list( key = key, value = value), rep(keypair$key, length(values)), values,
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}

In [23]:
# this function extracts the hashtags in a tweet
# if the input tweet has no hashtag, "#NO_HASHTAG" is returned
get_hashtags <- function(s){
    # split on whitespace; unlike scan(textConnection(s), ""), this leaves no
    # open connection behind and does not treat quote characters specially
    words <- strsplit(s, "[[:space:]]+")[[1]]
    hashtags <- unique(grep("^[#]", words, value=TRUE))
    if(length(hashtags) == 0){
        hashtags <- c("#NO_HASHTAG")
    }
    return (hashtags)
}
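
Whitespace splitting keeps trailing punctuation attached to a hashtag and treats different capitalisations as different hashtags. If that is not wanted, one possible alternative is a regular-expression extractor. The name `extract_hashtags` and the choice to lower-case the result are assumptions for this sketch, not part of the lab solution.

```r
# Hypothetical alternative: match "#" followed by letters, digits or underscores
extract_hashtags <- function(s) {
    tags <- regmatches(s, gregexpr("#[[:alnum:]_]+", s))[[1]]
    # fold case so that "#London" and "#london" count as the same hashtag
    unique(tolower(tags))
}

example_tags <- extract_hashtags("On the #Bridge!!!! in #London. #london")
example_tags
```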

In [24]:
## this function turns a (key = filename, value = tweet's text content) pair
## to (key = hashtag, value = count) pairs
split_hashtags <- function( keypair ){
    keys <- get_hashtags(keypair$value)
    mapply( function(key, value) list( key = key, value = value), keys, rep(1L, length(keys)),
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}

In [25]:
#directory containing the input files
working_dir <- "./data"
## SIMULATING cluster environment
ds <- DStorage("LFS", tempdir(), chunksize = 1L)
#step 1. Create a list of (directory, filename) pairs
dl <- as.DList(working_dir, DStorage = ds)
#the 2 following lines show the intermediate results; comment them out to skip this
as.list(dl)
print("********************")


#step 2. Use a mapper to turn the (directory, filename) pairs into (key = filename, value = tweet's text content) pairs
dl <- DMap(dl, split_tweets)
#as.list(dl)
#print("********************")

#step 3. Use another mapper to turn (key = filename, value = tweet's text content) pairs to (key = hashtag, value = count) pairs
dl <- DMap(dl, split_hashtags)
#as.list(dl)
#print("********************")

#step 4. Use a reducer to sum up the counts of the same hashtags from (key = hashtag, value = count) pairs
dl <- DReduce(dl, sum )

#convert the counts to a data table
f <- as.data.frame(do.call(rbind, as.list(dl)))
f <- setDT(f, keep.rownames = TRUE)[]
names(f) <- c("hashtags", "num_tweets")
f

[1] "********************"


| hashtags | num_tweets |
|---|---|
| `<chr>` | `<int>` |
| #PARLIAMENT | 1 |
| #UK | 7 |
| #London.After | 1 |
| #2 | 1 |
| #Banislam | 1 |
| #Brus… | 1 |
| #Westminster | 8 |
| #LONDON | 1 |
| #BritainAttack | 1 |
| #Bridge!!!! | 1 |


### Exercise

<b>Design</b> a <i>Map/Reduce algorithm</i> and write a program (<b>optional!!!</b>) for each of the following tasks

1. Given a set of Twitter messages (i.e., tweets) stored in multiple files, where each line is a tweet in JSON format, count the number of tweets per day in the set
2. Given a large directed graph stored in multiple files, where each file is a set of edges with one edge per line, count the number of incoming and outgoing edges per node in the graph

Suggestion

Main steps in task 1:
1. Create a list of (directory, filename) pairs
2. Use a mapper to turn the (directory, filename) pairs into (key = filename, value = tweet's published date) pairs. This mapper reads the input file named in the input pair line by line; for each line, it extracts the published date of the tweet on that line and outputs a (key = filename, value = tweet's published date) pair.
3. Use another mapper to turn the (key = filename, value = tweet's published date) pairs into (key = published date, value = 1) pairs
4. Use a reducer to sum up the counts of the same published date from the (key = published date, value = 1) pairs
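
A base R sketch of the per-day count, without DSL, is given below. It assumes the published date is stored in the tweet's `created_at` field in the form `"Wed Mar 22 14:40:01 +0000 2017"`; the sample values and the helper name `to_day` are made up for illustration.

```r
# Hypothetical created_at values in the assumed timestamp format
created_at <- c("Wed Mar 22 14:40:01 +0000 2017",
                "Wed Mar 22 18:02:45 +0000 2017",
                "Thu Mar 23 09:15:00 +0000 2017")

# Map: turn each timestamp into a day key built from its year, month and day tokens
to_day <- function(ts) {
    parts <- strsplit(ts, " ")[[1]]
    paste(parts[6], parts[2], parts[3])
}
days <- vapply(created_at, to_day, character(1), USE.NAMES = FALSE)

# Reduce: each tweet contributes a value of 1, summed per day key
tweets_per_day <- tapply(rep(1L, length(days)), days, sum)
tweets_per_day
```

Splitting the string rather than parsing it with `strptime` keeps the sketch independent of the locale used for day and month names.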

Main steps in task 2:
1. Create a list of (directory, filename) pairs
2. Use a mapper to turn the (directory, filename) pairs into (key = node_id, value = {1, "out"}) and (key = node_id, value = {1, "in"}) pairs. This mapper reads the input file named in the input pair line by line; for each line, which is an edge (u, v), it outputs a (key = u, value = {1, "out"}) pair and a (key = v, value = {1, "in"}) pair.
3. Use a reducer to sum up, separately, the "out" and "in" counts of the (key = node_id, value = {1, "out"/"in"}) pairs
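
The degree count can be sketched in base R in the same spirit. The edge list below is hypothetical and assumes each line holds a source and a target node separated by whitespace; `map_edge` is an illustrative name.

```r
# Hypothetical edge list: one "source target" edge per line
edge_lines <- c("a b", "a c", "b c")

# Map: each edge (u, v) emits (key = u, value = "out") and (key = v, value = "in")
map_edge <- function(line) {
    nodes <- strsplit(line, "[[:space:]]+")[[1]]
    list(list(key = nodes[1], value = "out"),
         list(key = nodes[2], value = "in"))
}
pairs <- unlist(lapply(edge_lines, map_edge), recursive = FALSE)

keys <- vapply(pairs, function(p) p$key, character(1))
directions <- factor(vapply(pairs, function(p) p$value, character(1)),
                     levels = c("in", "out"))

# Reduce: count the pairs per node and per direction
degree <- table(node = keys, direction = directions)
degree
```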


## References

[jsonlite package](https://cran.r-project.org/web/packages/jsonlite/index.html)

[DSL package](https://cran.r-project.org/web/packages/DSL/index.html)

[Map/Reduce examples](https://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/)