Merge branch 'arc_fixes'
Conflicts:
	src/org/commoncrawl/mapred/ec2/parser/CJS_EMR_Parse.java
Ahad Rana authored and committed on Aug 13, 2012
2 parents 921e68a + 9d9e636 commit 3d8b56a
Showing 9 changed files with 424 additions and 198 deletions.
34 changes: 25 additions & 9 deletions bin/launch_emr_parse_job.py
@@ -27,7 +27,7 @@ def usage():
sys.exit()

try:
-opts, args = getopt.getopt(sys.argv[1:],'',['awsKey=','awsSecret=','core-count=','spot-count=','spot-bid=','keypair='])
+opts, args = getopt.getopt(sys.argv[1:],'',['awsKey=','awsSecret=','core-count=','spot-count=','spot-bid=','keypair=','test'])
except:
usage()

@@ -39,7 +39,8 @@ def usage():
's3_bucket' : 'commoncrawl-emr',
'num_core' : 2,
'num_spot' : 0,
-'spot_bid_price' : None
+'spot_bid_price' : None,
+'test_mode' : False
}

for o, a in opts:
@@ -55,6 +56,8 @@ def usage():
params['keypair']=a
if o in ('--spot-bid'):
params['spot_bid_price']=a
+if o in ('--test'):
+params['test_mode']=True

required = ['aws_key','secret','keypair']

@@ -63,10 +66,18 @@ def usage():
print '\nERROR:%s is required' % pname
usage()

+for p, v in params.iteritems():
+print "param:" + `p`+ " value:" + `v`

conn = boto.connect_emr(params['aws_key'],params['secret'])

bootstrap_step1 = BootstrapAction("install_cc", "s3://commoncrawl-public/config64.sh",[params['aws_key'], params['secret']])
-bootstrap_step2 = BootstrapAction("configure_hadoop", "s3://elasticmapreduce/bootstrap-actions/configure-hadoop",["-m","mapred.tasktracker.map.tasks.maximum=8"])
+bootstrap_step2 = BootstrapAction("configure_hadoop", "s3://elasticmapreduce/bootstrap-actions/configure-hadoop",
+[
+"-m","mapred.tasktracker.map.tasks.maximum=6",
+"-m","mapred.child.java.opts=-XX:ErrorFile=/tmp/hs_err_${mapred.tip.id}.log -Xmx900m -XX:+UseParNewGC -XX:ParallelGCThreads=8 -XX:NewSize=100m -XX:+UseConcMarkSweepGC -XX:+UseTLAB -XX:+CMSIncrementalMode -XX:+CMSIncrementalPacing -XX:CMSIncrementalDutyCycleMin=0 -XX:CMSIncrementalDutyCycle=10"
+])
+bootstrap_step3 = BootstrapAction("configure_jobtrackerheap", "s3://elasticmapreduce/bootstrap-actions/configure-daemons",["--jobtracker-heap-size=12096"])

namenode_instance_group = InstanceGroup(1,"MASTER","c1.xlarge","ON_DEMAND","MASTER_GROUP")
core_instance_group = InstanceGroup(params['num_core'],"CORE","c1.xlarge","ON_DEMAND","CORE_GROUP")
@@ -84,27 +95,32 @@ def usage():

instance_groups=[namenode_instance_group,core_instance_group,spot_instance_group]

+args = []

+if params['test_mode'] == True:
+args.append('--testMode')

step = JarStep(
name="CCParseJob",
jar="s3://commoncrawl-public/commoncrawl-0.1.jar",
-main_class="org.commoncrawl.mapred.ec2.parser.EC2Launcer",
-action_on_failure="TERMINATE_JOB_FLOW")
+main_class="org.commoncrawl.mapred.ec2.parser.EC2Launcher",
+action_on_failure="TERMINATE_JOB_FLOW",
+step_args=args)

print instance_groups
sys.exit()

# instance_groups=[namenode_instance_group,core_instance_group,spot_instance_group],
jobid = conn.run_jobflow(
-name="testbootstrap",
-availability_zone="us-east-1a",
+name="EMR Parser JOB",
+availability_zone="us-east-1d",
log_uri="s3://" + params['s3_bucket'] + "/logs",
ec2_keyname=params['keypair'],
instance_groups=instance_groups,
keep_alive=True,
enable_debugging=True,
hadoop_version="0.20.205",
steps = [step],
-bootstrap_actions=[bootstrap_step1,bootstrap_step2],
+bootstrap_actions=[bootstrap_step1,bootstrap_step2,bootstrap_step3],
ami_version="2.0.4"
)

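Usage note: with the new --test option wired through, one plausible invocation of the updated launcher (key, secret, and keypair values are placeholders) is:

    python bin/launch_emr_parse_job.py --awsKey=<key> --awsSecret=<secret> --keypair=<keypair> --test

Passing --test sets params['test_mode'], which in turn appends --testMode to the JarStep arguments assembled above.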
File renamed without changes.
19 changes: 10 additions & 9 deletions src/org/commoncrawl/crawl/common/shared/Constants.java
@@ -26,15 +26,16 @@
public interface Constants {

/** arc file header **/
-public static final String ARCFileHeader_ParseSegmentId = "x_commoncrawl_ParseSegmentId";
-public static final String ARCFileHeader_OriginalURL = "x_commoncrawl_OriginalURL";
-public static final String ARCFileHeader_URLFP = "x_commoncrawl_URLFP";
-public static final String ARCFileHeader_HostFP = "x_commoncrawl_HostFP";
-public static final String ARCFileHeader_Signature = "x_commoncrawl_Signature";
-public static final String ARCFileHeader_CrawlNumber = "x_commoncrawl_CrawlNo";
-public static final String ARCFileHeader_CrawlerId = "x_commoncrawl_CrawlerId";
-public static final String ARCFileHeader_FetchTimeStamp = "x_commoncrawl_FetchTimestamp";
+public static final String ARCFileHeader_ParseSegmentId = "x-commoncrawl-ParseSegmentId";
+public static final String ARCFileHeader_OriginalURL = "x-commoncrawl-OriginalURL";
+public static final String ARCFileHeader_URLFP = "x-commoncrawl-URLFP";
+public static final String ARCFileHeader_HostFP = "x-commoncrawl-HostFP";
+public static final String ARCFileHeader_Signature = "x-commoncrawl-Signature";
+public static final String ARCFileHeader_CrawlNumber = "x-commoncrawl-CrawlNo";
+public static final String ARCFileHeader_CrawlerId = "x-commoncrawl-CrawlerId";
+public static final String ARCFileHeader_FetchTimeStamp = "x-commoncrawl-FetchTimestamp";
+public static final String ARCFileHeader_ContentTruncated = "x-commoncrawl-ContentTruncated";
-public static final String ARCFileHeader_SOURCE_IS_GZIPED = "x_commoncrawl_SourceIsGZIP";
+public static final String ARCFileHeader_SOURCE_IS_GZIPED = "x-commoncrawl-SourceIsGZIP";
public static final String ARCFileHeader_DetectedCharset = "x-commoncrawl-DetectedCharset";

}
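These renames switch the ARC metadata keys from underscore to dash delimiters, so any consumer that looks headers up by literal key string must change with them. A minimal sketch of a consumer-side lookup, assuming the headers have already been parsed into a plain Map (the Map and helper are illustrative, not a CommonCrawl API):

    import java.util.Map;
    import org.commoncrawl.crawl.common.shared.Constants;

    class HeaderLookupExample {
      // Hypothetical helper: resolves the fetch timestamp via the renamed
      // dash-delimited key ("x-commoncrawl-FetchTimestamp"); -1 when absent.
      static long fetchTimestamp(Map<String, String> arcHeaders) {
        String raw = arcHeaders.get(Constants.ARCFileHeader_FetchTimeStamp);
        return (raw != null) ? Long.parseLong(raw) : -1L;
      }
    }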
86 changes: 57 additions & 29 deletions src/org/commoncrawl/mapred/ec2/parser/EC2Launcher.java
@@ -22,15 +22,26 @@
import java.io.IOException;
import java.io.InputStream;

+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.commoncrawl.util.CCStringUtils;


/**
* EC2Launcher Task (spawned by EMR)
*
* @author rana
*
*/
+@SuppressWarnings("static-access")
public class EC2Launcher {

+public static final Log LOG = LogFactory.getLog(EC2Launcher.class);



static class InputStreamHandler extends Thread {
/**
* Stream being read
@@ -69,39 +80,56 @@ public void run() {
}
}



public static void main(String[] args) {
-System.out.println("Sleeping for 2 mins");
-try {
-Thread.sleep(10000);
-} catch (InterruptedException e1) {
-}
-System.out.println("Done Sleeping");

-ProcessBuilder pb = new ProcessBuilder(
-"./bin/ccAppRun.sh",
-"--consoleMode",
-"--heapSize",
-"4096",
-"--logdir",
-"/mnt/var/EC2TaskLogs",
-"org.commoncrawl.mapred.ec2.parser.EC2ParserTask",
-"start");
-pb.directory(new File("/home/hadoop/ccprod"));

-try {
-System.out.println("Starting Job");
-Process p = pb.start();
-new InputStreamHandler (p.getErrorStream());
-new InputStreamHandler (p.getInputStream());
+CommandLineParser parser = new GnuParser();

+try {
+System.out.println("Sleeping for 2 mins");
+try {
+Thread.sleep(10000);
+} catch (InterruptedException e1) {
+}
+System.out.println("Done Sleeping");

+ProcessBuilder pb = new ProcessBuilder(
+"./bin/ccAppRun.sh",
+"--consoleMode",
+"--heapSize",
+"4096",
+"--logdir",
+"/mnt/var/EC2TaskLogs",
+"org.commoncrawl.mapred.ec2.parser.EC2ParserTask",
+"start");

+for (String arg : args) {
+pb.command().add(arg);
+}

-System.out.println("Waiting for Job to Finish");
-p.waitFor();
-System.out.println("Job Finished");
-} catch (IOException e) {
-e.printStackTrace();
-} catch (InterruptedException e) {
-e.printStackTrace();
+pb.directory(new File("/home/hadoop/ccprod"));

+try {
+System.out.println("Starting Job");
+Process p = pb.start();
+new InputStreamHandler (p.getErrorStream());
+new InputStreamHandler (p.getInputStream());

+System.out.println("Waiting for Job to Finish");
+p.waitFor();
+System.out.println("Job Finished");
+System.exit(0);
+} catch (IOException e) {
+e.printStackTrace();
+} catch (InterruptedException e) {
+e.printStackTrace();
+}
+}
+catch (Exception e) {
+LOG.error(CCStringUtils.stringifyException(e));
+}
+System.exit(1);
}

}
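The launcher now forwards its own arguments (for example the --testMode flag added by the launch script) onto the ccAppRun.sh command line, so they reach the spawned EC2ParserTask process. How EC2ParserTask consumes the flag is not visible in this diff; the commons-cli imports above suggest option parsing, and a hedged sketch of the receiving side using GnuParser (the option name and handling are assumptions, not code from this commit) might look like:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;

    class TestModeArgsExample {
      @SuppressWarnings("static-access")
      public static void main(String[] args) throws Exception {
        // Hypothetical receiver-side parsing; the real EC2ParserTask
        // handling is not part of this commit's visible hunks.
        Options options = new Options();
        options.addOption(OptionBuilder.withLongOpt("testMode")
            .withDescription("run the parse job in test mode").create());
        CommandLine cmdLine = new GnuParser().parse(options, args);
        System.out.println("testMode=" + cmdLine.hasOption("testMode"));
      }
    }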
