## Chapter 15: Big Data Mining

The term “big data” applies to datasets that exceed the size our data programs can handle. For Excel, the limit might be the available memory; for MySQL, it might be the largest file the underlying operating system supports. The point is that most software applications have size limits, and big-data datasets exceed them.

Natural questions at this point include: where is all the big data coming from, how quickly does it arrive, and how much of it is there?

# Bring the Process to the Data

When programs retrieve data from a disk, the time required to get the data from the disk directly impacts the application’s performance. When you distribute files across a network, the network’s transmission speed will become the longest delay in the process.

Assume, for example, that you must read a 10 GB file to find each occurrence of the phrase “Data Mining.” On a local drive that reads 100 MB/s, a system can read the file in about 100 seconds. If you move the file to a distributed file system and pull it across the network, the network transfer time adds to that figure.

Developers refer to solutions that retrieve all the data for a file for processing in this way as “bringing the data to the process.” The time to perform such processing is dependent on the amount of data retrieved.

Hadoop changes this model to instead “bring the process to the data.” Rather than returning all the data and then processing it, Hadoop sends the processing to each node and lets each node perform the operation on its own portion, collecting and returning only the desired data. If you break the previous 10 GB file apart across 20 nodes, each node holds about 500 MB, and Hadoop can run the operations in parallel, reducing the processing time to roughly 5 to 10 seconds. If you distribute the file across more nodes, such as 100, you can distribute away much of the remaining processing time. As an example, estimates place the number of Hadoop nodes driving the Yahoo search index at over 10,000.
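The arithmetic behind those figures can be sketched in a few lines of Python. This is only a back-of-the-envelope estimate: it uses the chapter's round numbers, assumes the file splits evenly, and ignores job startup, coordination, and network overhead, which is why real runs land nearer the upper end of the quoted range. The function name `scan_seconds` is made up for this illustration.

```python
# Back-of-the-envelope read times using the chapter's round numbers.
FILE_SIZE_MB = 10_000      # 10 GB file, taking 1 GB = 1,000 MB
DISK_SPEED_MB_S = 100      # sequential read speed of one disk


def scan_seconds(nodes: int) -> float:
    """Seconds to scan the file when it is split evenly across `nodes` disks."""
    per_node_mb = FILE_SIZE_MB / nodes
    return per_node_mb / DISK_SPEED_MB_S


for nodes in (1, 20, 100):
    print(f"{nodes:>3} node(s): {scan_seconds(nodes):6.1f} s")
```

With one node the estimate is 100 seconds, with 20 nodes it is 5 seconds, and with 100 nodes it is 1 second.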

# Enter MapReduce

As you have learned, HDFS, the Hadoop Distributed File System, distributes large files across many nodes within a cluster. To perform big-data analytics, applications normally use a two-step process called MapReduce. 

During the first step, the application maps the data that it reads into key-value pairs. Assume, for example, you have stock-market data for technology stocks (date, symbol, open, low, high, and close) since the year 2000. Your goal is to find the date of each company’s highest closing stock price.

Our data of interest, in this case, are the date of each record and the closing price. The Map phase identifies the key-value pair of interest (date, closing price) and emits it for each record. The Reduce phase then examines those pairs to determine the desired result (the date and value of the highest closing price found).

To implement MapReduce processing, developers define two functions, one to perform the mapping and one to perform the reduction. Using Hadoop, developers can implement these functions using Java, C++, Python, and other languages. 
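As a rough illustration of the two functions, the sketch below runs both phases in memory on a single machine. The records are invented sample values in the (date, symbol, open, low, high, close) layout described above, and the names `map_record`, `reduce_group`, and `map_reduce` are hypothetical. The sketch keys each pair by symbol so that a single pass can answer the question for every company.

```python
from collections import defaultdict

# Invented sample records: (date, symbol, open, low, high, close).
RECORDS = [
    ("2020-01-02", "AAA", 10.0, 9.5, 11.0, 10.5),
    ("2020-01-03", "AAA", 10.5, 10.0, 12.5, 12.0),
    ("2020-01-02", "BBB", 20.0, 19.0, 21.0, 20.5),
    ("2020-01-03", "BBB", 20.5, 18.0, 20.5, 19.0),
]


def map_record(record):
    """Map step: turn one record into a (key, value) pair."""
    date, symbol, _open, _low, _high, close = record
    return symbol, (date, close)


def reduce_group(key, values):
    """Reduce step: keep the (date, close) pair with the highest close."""
    return max(values, key=lambda pair: pair[1])


def map_reduce(records):
    """Group mapped values by key, then reduce each group."""
    groups = defaultdict(list)
    for record in records:
        key, value = map_record(record)
        groups[key].append(value)
    return {key: reduce_group(key, values) for key, values in groups.items()}


result = map_reduce(RECORDS)
print(result)
```

A real framework performs the grouping step (often called the shuffle) across machines; here a dictionary stands in for it.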

# Looking at a Simple MapReduce Example

To better understand the MapReduce process, consider the following simple Python scripts that use the data file Stocks.csv.
The first script, Mapper.py, reads the file from standard input, searches for the MSFT symbol, and outputs the matching dates and closing prices. That script consists of the following code:

```python
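import sys

# Each input line is one CSV record: field 0 is the symbol,
# field 1 the date, and field 2 the closing price.
for line in sys.stdin:
    data = line.strip().split(',')
    if data[0] == 'MSFT':
        print(data[1] + ',' + data[2])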
```

To run the script, you can redirect the contents of the Stocks.csv file to the script, as shown below:

In [None]:
! python Mapper.py < Stocks.csv

As you can see, the Mapper returns the date and closing price for each MSFT record, but it does not determine the date of the highest price. That is the role of the Reduce function, which the script Reduce.py implements:

```python
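import sys

high = 0.0    # highest closing price seen so far
date = ''     # date on which that price occurred

for line in sys.stdin:
    data = line.strip().split(',')
    if len(data) < 2:
        continue  # skip blank or malformed lines
    close = float(data[1])

    if close > high:
        high = close
        date = data[0]

print(date + ',' + str(high))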
```

To combine the scripts and perform both the mapping and reducing operations on the dataset, use the following line:

```python
! python Mapper.py < Stocks.csv | python Reduce.py
```

As you can see, we first perform the mapping operation on the Stocks.csv file, which returns Microsoft's closing price for each date. We then send the results to the reduce script, which finds Microsoft's highest closing price and returns its date and price values.

In [None]:
#############################
# Chapter 15 / Deliverable 1
#############################

! python Mapper.py < Stocks.csv | python Reduce.py

As you may discover, piping the Mapper output into Reduce.py displays the date and value of Microsoft’s highest closing price.
Admittedly, this example is very simple. In a real-world deployment, Hadoop would pass the two functions to each node, which, in turn, would use them to process its own data.
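To get a feel for that per-node processing, the sketch below simulates it on one machine. Each inner list stands in for the block of rows stored on one node; the rows are invented values in the symbol, date, close layout that Mapper.py reads, and the names `mapper`, `reducer`, and `NODE_BLOCKS` are hypothetical. This is only an illustration of the idea, not how Hadoop itself schedules work.

```python
def mapper(lines, symbol="MSFT"):
    """Yield (date, close) for every line whose first field matches `symbol`."""
    for line in lines:
        data = line.strip().split(",")
        if data[0] == symbol:
            yield data[1], float(data[2])


def reducer(pairs):
    """Return the (date, close) pair with the highest close, or None if empty."""
    best = None
    for date, close in pairs:
        if best is None or close > best[1]:
            best = (date, close)
    return best


# Invented rows; each inner list plays the role of one node's local data.
NODE_BLOCKS = [
    ["MSFT,2020-01-02,10.0", "AAPL,2020-01-02,30.0"],
    ["MSFT,2020-01-03,12.5", "MSFT,2020-01-06,11.0"],
    ["AAPL,2020-01-03,31.0"],
]

# Every "node" runs map and reduce on its own block only.
partials = [reducer(mapper(block)) for block in NODE_BLOCKS]
partials = [p for p in partials if p is not None]

# Only the small partial results travel back to be reduced one final time.
overall = reducer(partials)
print(overall)
```

Note that the third block contributes nothing for MSFT, and only two small pairs, rather than every row, reach the final reduce.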

# Performing MapReduce Operations Using MongoDB

This section performs operations on a database that has been pre-built for you. The commands used to build this database can be found in the chapter15-database.js script file located in the chapter folder.

*Note: None of the code found in this section is valid in the Jupyter environment. You are to run the code in the MongoDB Server instance you prepared in "Getting Started with MongoDB."*

Like Hadoop, MongoDB also offers distributed data solutions through a mechanism called sharding. MongoDB can distribute data across nodes ("shards") and then access the data through a router that maps data requests to the correct shard. To provide support for big-data analytics, MongoDB provides the mapReduce command, which enables MapReduce aggregation operations over sharded collections. 
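The routing idea can be pictured with a small Python sketch. It is a toy model under stated assumptions, not MongoDB's actual routing logic: the shard names, the `Router` class, and the choice of an MD5 hash of the `Symbol` field as the routing rule are all made up for illustration.

```python
import hashlib

SHARDS = ["shard0", "shard1", "shard2"]  # hypothetical shard names


def shard_for(key: str) -> str:
    """Pick a shard from a stable hash of the key (illustrative rule only)."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]


class Router:
    """Toy router: sends each document or query to the shard that owns its key."""

    def __init__(self):
        self.storage = {name: [] for name in SHARDS}

    def insert(self, document):
        self.storage[shard_for(document["Symbol"])].append(document)

    def find(self, symbol):
        # Only the owning shard is consulted, not the whole cluster.
        shard = self.storage[shard_for(symbol)]
        return [doc for doc in shard if doc["Symbol"] == symbol]


router = Router()
router.insert({"Symbol": "MSFT", "Date": "2020-01-02", "Close": 10.0})
router.insert({"Symbol": "MSFT", "Date": "2020-01-03", "Close": 12.5})
router.insert({"Symbol": "AAPL", "Date": "2020-01-02", "Close": 30.0})
print(len(router.find("MSFT")))
```

Because the same key always hashes to the same shard, a request for one symbol touches a single node.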

First, connect to the pre-built Stocks database by issuing the following command in the MongoDB Shell:

In [None]:
use Stocks

Then, to perform a MapReduce operation, you must define two functions: map and reduce. Enter the following statements in the MongoDB Shell to create them:

In [None]:
var map = function() {
  emit(this.Symbol, { date: this.Date, close: this.Close });
};

var reduce = function(key, values) {
  // Track the highest close seen among the values for this key.
  var output = { date: "", close: 0 };

  for (var i = 0; i < values.length; i++) {
    if (values[i].close > output.close) {
      output.close = values[i].close;
      output.date = values[i].date;
    }
  }

  return output;
};

The map function provides the stock symbol, date, and closing price for each record. The reduce function receives a group of related records (same key) and determines and returns the highest stock closing price within the group. Note that the reduce function returns a result in the form emitted by the map function. That’s because the reduce function may be called with partial group data (such as some of the MSFT data) and will be called repeatedly until the aggregation is complete. 
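The reason the output shape must match the input shape can be checked with a small Python analogue of the reduce function. The values are invented, and `reduce_values` is a hypothetical stand-in for the JavaScript function, written only to show that reducing in stages gives the same answer as reducing everything at once.

```python
def reduce_values(key, values):
    """Python analogue of the reduce function: keep the highest close."""
    output = {"date": "", "close": 0}
    for value in values:
        if value["close"] > output["close"]:
            output = {"date": value["date"], "close": value["close"]}
    return output


# Invented values, in the same shape the map function emits.
VALUES = [
    {"date": "2020-01-02", "close": 10.0},
    {"date": "2020-01-03", "close": 12.5},
    {"date": "2020-01-06", "close": 11.0},
    {"date": "2020-01-07", "close": 9.0},
]

# One call over the whole group.
all_at_once = reduce_values("MSFT", VALUES)

# Two partial calls, then a final call over the partial results.
partial_a = reduce_values("MSFT", VALUES[:2])
partial_b = reduce_values("MSFT", VALUES[2:])
in_stages = reduce_values("MSFT", [partial_a, partial_b])

print(all_at_once == in_stages)
```

If the reduce function returned, say, only the price, the final call could not read a `date` from its inputs, and the staged result would break.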

To use these two functions to perform a MapReduce operation, call the collection's mapReduce method. Execute the following statement in the MongoDB Shell:

In [None]:
######################################
# Chapter 15 / Deliverable 2
######################################

db.Data.mapReduce(map, reduce, { out: { inline: 1}})