
Introduction

Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets.

At the present time, Pig's infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the Hadoop subproject). Pig's language layer currently consists of a textual language called Pig Latin, which has the following key properties:

Ease of programming. It is trivial to achieve parallel execution of simple, "embarrassingly parallel" data analysis tasks. Complex tasks comprised of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain.

Optimization opportunities. The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.

Extensibility. Users can create their own functions to do special-purpose processing.

Requirements

At least 4 AWS instances
Hadoop installed and running on the cluster (you must have already run start-dfs.sh, start-yarn.sh, and mr-jobhistory-daemon.sh; see the sketch of these commands below)
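
If these daemons are not already running, they can typically be started on the namenode with commands along the following lines (this is a sketch assuming Hadoop is already installed and configured, and that its sbin directory is on your PATH or referenced via $HADOOP_HOME/sbin):

namenode:~$ start-dfs.sh
namenode:~$ start-yarn.sh
namenode:~$ mr-jobhistory-daemon.sh start historyserver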

Install Pig

This installation process only needs to be executed on the Master/Namenode.

We will grab Pig version 0.14.0 and save it to a Downloads folder. Next we will install it into our /usr/local directory and rename the folder to simply ‘pig’.

namenode:~$ wget http://mirror.tcpdiag.net/apache/pig/pig-0.14.0/pig-0.14.0.tar.gz -P ~/Downloads
namenode:~$ sudo tar zxvf ~/Downloads/pig-*.tar.gz -C /usr/local
namenode:~$ sudo mv /usr/local/pig-* /usr/local/pig
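
As an optional sanity check, you can list the directory to confirm the extraction and rename succeeded:

namenode:~$ ls /usr/local/pig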

Set Environment Variables

We will next add the Pig environment variables to ~/.profile:

…
…
…
export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Source the file so the environment variables are in the current shell session.

namenode:~$ . ~/.profile
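
With the environment variables loaded, a quick way to confirm that Pig is on the PATH is to print its version, which should report 0.14.0:

namenode:~$ pig -version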

Bring up WebUIs

Let’s open the WebUI to view the Hadoop cluster and the jobs that are submitted to the cluster. This can be viewed at public_dns_of_namenode:8088. The UI should look like the following:

We should also view the job history UI to see previous MapReduce jobs that we’ve run on the cluster. This can be seen at public_dns_of_namenode:19888. It should look like the following:

The example shows jobs that have been run, but if this is your first time, there should be no Retired Jobs in the Job History WebUI. We will now run through an example of writing a Pig script to compute some aggregate functions on a sample dataset.

Pig Example Problem

We will now run through an example which will involve performing some cleaning of the data and then running a MapReduce job using the Pig framework.

First download the dataset from the Google Drive link to your local machine: price data

The data is a couple of days of tick-level Gold Futures data from the CME Group. Each row contains a timestamp, the last price at that time, and the number of contracts (or volume) traded at that price. The data is in the following format:

Schema:
<year><month><day> <hour><minute><seconds>; <price>; <volume>

Example:
20140602 22:34:12; 1250.5; 12  

Let’s now transfer this file over to your Hadoop cluster and load it onto HDFS into the folder ‘/user’.

local_machine:~$ scp -i <path_to_pem_file> <path_to_price_data_file> ubuntu@<public_dns_of_namenode>:~/

namenode:~$ hdfs dfs -mkdir /user
namenode:~$ hdfs dfs -copyFromLocal ~/price_data*.txt /user
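
You can verify that the file landed on HDFS by listing the /user folder:

namenode:~$ hdfs dfs -ls /user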

We would like to compute the average price and total volume traded at each 30 minute interval. To do this we will need to map each timestamp to a 30 minute time slot. Next we will perform an average on the price and a sum of the contracts traded in each 30 minute time slot.

Mapping each timestamp to a 30 minute interval requires some string manipulation and is difficult using only the standard Pig functions. Pig, however, lets users declare user defined functions (UDFs) in various languages. Pig 0.14.0 currently supports Java, Jython, Python, JavaScript, Ruby, and Groovy. For this example we will use Jython, which is essentially Python for the Java platform. Let’s create a file named string_manip.py and add the following to it:

namenode:~$ touch string_manip.py

@outputSchema("word:chararray")
def conv_to_30min(word):
    # extract the minute portion of the timestamp
    minute = word[11:13]
    # round the minute down to the nearest 30-minute boundary and zero out the
    # seconds (integer division here relies on Jython's Python 2 semantics)
    modified_word = word[:11] + str(int(minute)/30*30).zfill(2) + "00"
    return modified_word

Here we define a function conv_to_30min which takes in a date string, maps it to the start of its 30 minute time slot, and returns the modified string. This function will be applied to the time field of each row by importing this file into our Pig script, as we will do next.

Now we will create a Pig script named price_data.pig. Place the following into the file:

namenode:~$ touch price_data.pig

/* import the string_manip.py and assign it an alias str_func */
REGISTER 'string_manip.py' using jython as str_func;

/* load the data from HDFS using the specified schema */
price = LOAD '/user/price_data'
        USING PigStorage(';')
        AS (time:chararray, price:double, volume:int);

/* apply the function defined in str_func on the time field for each row */
price_modified = FOREACH price GENERATE str_func.conv_to_30min(time) AS time,
                                        price AS price,
                                        volume AS volume;

/* group the data based on their time */
grpd = GROUP price_modified BY (time);

/* compute the average price and total traded volume for each 30 minute time slot */
compressed = FOREACH grpd GENERATE group AS time,
                                   AVG(price_modified.price),
                                   SUM(price_modified.volume);

/* write the data out back onto HDFS under the specified folder */
STORE compressed INTO '/user/price_data_output_pig';

The Pig script can be run with the following command:

namenode:~$ pig price_data.pig
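
If you want to debug the script without tying up the cluster, Pig also has a local execution mode. Note that in local mode the paths in the LOAD and STORE statements are resolved against the local filesystem rather than HDFS, so they would need to point at a local copy of the data:

namenode:~$ pig -x local price_data.pig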

While the job is running you can go to public_dns_of_namenode:8088 and view the submitted job and its status. Once the job is finished, it will show up in the job history at public_dns_of_namenode:19888. The output for this dataset can be viewed by executing the following command and should look something like the following:

namenode:~$ hdfs dfs -cat /user/price_data_output_pig/part-r-00000

We can see here that the timestamps have been mapped to 30 minute windows and the price is now a floating point average. The volume values are larger because each one is the sum of all contracts traded within its time window.
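
If you would rather inspect the results on the local filesystem, the output part files can be merged and copied down from HDFS (the local filename here is just an example):

namenode:~$ hdfs dfs -getmerge /user/price_data_output_pig ~/price_data_output.txt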

Bonus: How do the run times compare between Pig and Hive? How about ease of programming?
