Skip to content

Simple Examples of EasyML

callmevan edited this page Sep 4, 2017 · 9 revisions

序言

本文将通过两个简单的示例分别展示如何使用EasyML以单机或分布式的方式运行程序

本文资源汇总

分布式程序
单机程序
测试文本文件
程序源文件

一、搭建流程

1.1 上传data

首先点击 Upload Data 上传本次程序需要使用的输入文件 Alt text

面板说明

  1. 数据名称
  2. 所属目录 --- System Data为系统实例数据目录;Shared Data为用户共享目录,此目录下的数据将可以被所有用户使用。My Data为私人目录,此目录下数据仅用户本人可见。
  3. 数据类型 --- 共有JSON/CSV/TSV/General四种类型,除了JSON/CSV/TSV外的数据类型全部使用General类型。例如本次使用的WordCount_test.txt,类型选择是General
  4. 数据版本
  5. 上传按钮
  6. 数据描述 --- 可以为空

在填写完成后,点击Submit按钮开始上传Data 上传成功后你会看到这个提示 Alt text

之后就可以在目录下查看已上传的Data

Alt text

1.2.1 上传分布式program

程序功能简介

  • WordCount 词频统计,将一个文件中使用的词汇及其使用次数统计后输出

点击 Upload Program 上传程序 本次上传的是 WordCount.zip(无Parameter) Alt text

面板的详细说明请参照EasyML User's Guide

部分参数说明及源码展示

Type
  • Standalone代表以单机的方式执行,例如执行一个java或python程序;
  • Distributed代表以分布式的方式执行,如一个Spark的算法包;
  • ETL代表进行数据的读入和写出的模块。
Parameter
  • 注意到这里没有填写Parameter一栏,在此贴出WordCount(无Parameter)的源码
package word
/**
  * Created by Administrator on 2017/8/26.
  * scalaVersion := "2.11.8"
  */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("error")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName(args(2))
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))
    //output to file
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).saveAsTextFile(args(1))
    //output to  screen
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
    sc.stop()
  }
}
/**
  * 代码中arg(0) arg(1) arg(2)分别对应CMD中的 in out string
  */
  • Parameter实际上是参数的一个前缀,是为了方便理解参数的含义而设置的。如果要使用Parameter,那么需要对应的在程序中对其进行解析。例如:

Alt text

  • 这里给三个参数的前缀分别为input_pt output_pt appname。 相应的,需要在程序里进行解析。在此贴出WordCount(有Parameter)的源码
package word
/**
  * Created by Administrator on 2017/8/26.
  * scalaVersion := "2.11.8"
  */
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import org.apache.spark.SparkContext._

object WordCount {

  /** command line parameters */
  case class Params(input_pt: String = "",
                    output_pt: String = "",
                    appname: String = "")

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: <file>")
      System.exit(1)
    }

    val default_params = Params()
    val parser = new OptionParser[Params]("WordCount") {
      head("WordCount: Count words in documents.")
      opt[String]("input_pt")
        .required()
        .text("Input document file path")
        .action((x, c) => c.copy(input_pt = x))
      opt[String]("output_pt")
        .required()
        .text("Output document file path")
        .action((x, c) => c.copy(output_pt = x))
      opt[String]("appname")
        .required()
        .text("appname")
        .action((x, c) => c.copy(appname = x))
    }

    parser.parse(args, default_params).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }
  }

  def run(p:Params): Unit = {
    val conf = new SparkConf().setAppName(p.appname)
    val sc = new SparkContext(conf)
    val line = sc.textFile(p.input_pt)
    //output to file
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(p.output_pt)
    //output to  screen
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }

}
/**
  * 代码中p.input_pt p.output_pt p.appname分别对应CMD中的 in out string
  */
CMD
  • CMD及参数都配置完后,点击Generate,你就会看到一条完整的CMD Alt text
  • 可以看出,Parameter字段内容以--'Parameter'生成在了真正的参数之前。对应的type为in 解析为:{in:Value:"description"},代表输入文件 out解析为:{out:Value:"description'"},代表输出文件 string解析为:["description":string:default,"Default"],代表String类参数
  • 在最后程序执行时,它会被替换为spark-submit --class word.WordCount wordcount.jar --input_pt '文件名' --output_pt '文件名' --appname 'wordcount'

以上步骤都完成后,点击Submit按钮开始上传Program 上传成功后你会看到这个提示 Alt text

1.2.2 上传单机program

程序功能简介

  • FileSplit 文件内容分割,读取一个文件中的内容按照特定的比例分别存入两个文件

点击 Upload Program 上传程序,本次上传的是FileSplit.zip(无Parameter) Alt text 参数说明请参照1.2.1

源码展示

  • FileSplit(无Parameter)
package filesplit

import scala.util.Random
import scala.io.Source
import java.io._
/**
  * Created by Administrator on 2017/9/1.
  */
object filesplit {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: <input_file> <rate> <output_file1> <output_file2>")
      System.exit(1)
    }
    /**
      * Split a file into two partitions.
      *
      *  infile A file.
      *  rate the ratio in two partitions.
      */

    val infile = readLines(args(0)).toSeq
    val rate = args(1).toDouble
    val inf_len = infile.size
    val outf1_len = (inf_len * rate).toInt
    val outf2_len = infile.size - outf1_len
    val outf1 = new Array[String](outf1_len)
    val outf2 = new Array[String](outf2_len)

    var ind1 = 0
    while (ind1 < outf1_len) {
      outf1(ind1) = infile(ind1)
      ind1 += 1
    }

    var ind2 = 0
    while (ind2 < outf2_len) {
      outf2(ind2) = infile(outf1_len + ind2)
      val rand = Random.nextInt(outf1_len + ind2)
      if (rand < outf1_len) {
        swap(outf1, outf2, rand, ind2)
      }
      ind2 += 1
    }

    writeLines(args(2),outf1.toSeq)
    writeLines(args(3),outf2.toSeq)

  }
  def readLines(pt: String): Iterator[String] = {
    val rf = Source.fromFile(pt)
    rf.getLines()
  }

  def writeLines(res_pt: String, lines: TraversableOnce[String]) {
    val wf = new PrintWriter(res_pt)
    lines.foreach { l => wf.write(l + '\n') }
    wf.close()
  }

  /**
    * Swap two elements in different arrays.
    *
    * @param f1 array stored part of the file.
    * @param f2 array stored part of the file.
    * @param id1 element id of array-f1.
    * @param id2 element id of array-f2.
    */

  private def swap(f1: Array[String], f2: Array[String], id1: Int, id2: Int): Unit = {
    val str = f1(id1)
    f1(id1) = f2(id2)
    f2(id2) = str
  }

}
/**
  * 代码中arg(0) arg(1) arg(2) arg(3)分别对应CMD中的 in double out out
  */

CMD

  • CMD及参数都配置完后,点击Generate,你就会看到一条完整的CMD Alt text

以上步骤都完成后,点击Submit按钮开始上传Program 上传成功后你会看到这个提示 Alt text

之后就可以在目录下查看已上传的Program

Alt text

1.3 提交任务

首先点击Create Job新建一个空任务

WordCount

  • 将上传的Data wordtest和Program wordcount点击加入任务 wordtest作为wordcount的输入文件,将图标上代表输出的绿色点连接至wordcount图标上代表输入的蓝色点 Alt text

FileSplit

  • 将上传的Data wordtest和Program filesplit点击加入任务 wordtest作为filesplit的输入文件,将图标上代表输出的绿色点连接至filesplit图标上代表输入的蓝色点 Alt text

之后点击Submit,输入Job Name即可提交任务

至此,所有步骤已完成。如有疑问请加入微信群咨询 EML WeChat Group

1.4 结果展示

右键代表输出的绿色点,点击结果预览即可看到本次程序的运行结果 Alt text

WordCount

  • Alt text

FileSplit

  • Alt text
  • Alt text

二、注意事项

  1. 所执行的程序必须有明确的输入或输出,且在运行中无任何形式的交互 -- 例如按下回车或是输入y/n
  2. 打包时一定要包含所有的依赖包
  3. 如果选择将结果输出至屏幕,则所有信息无法复原且无法传递,因此推荐输出至文件
  4. 程序在上传前必须压缩为zip格式的压缩包,注意必须要保证程序的可执行包等包含在一个文件夹中,再对该文件夹进行压缩。具体目录结构可以参照本文给出的三个压缩包
  5. 如果程序需要访问外部内容,则必须先通过ETL化为内部文件方可使用(目前EML还未添加ETL功能)