# Real Time Streaming Process
本案例由python脚本产生网站日志，通过flume和kafka将数据传递给Spark Streaming，通过计算后写入HBase。  

1.设置好KafkaUtils.createStream所需参数  
2.日志的信息提取和转换，创建一个case class来存储每条信息  
3.数据统计并写入数据库  

注意：  
（1）逐条调用incrementColumnValue并非一个好的写入方式，应改用批量写入（注意：put会覆盖原值而不会累加，若需保留累加语义可改用批量Increment）  
（2）实际生产中可能更多用KafkaUtils.createDirectStream，但Spark代码的变化不大。

In [None]:
// Entry-point setup: validate the command line, create the streaming context
// and open the receiver-based Kafka stream.
// Expected arguments: <zkQuorum> <group> <topics> <numThreads>
//   zkQuorum   - ZooKeeper quorum string handed to KafkaUtils.createStream
//   group      - Kafka consumer group id
//   topics     - comma-separated list of topic names
//   numThreads - number of consumer threads to use for every topic
if (args.length != 4) {
  System.err.println("Usage: FlumePushWC <zkQuorum> <group> <topics> <numThreads>")
  System.exit(1)
}

val Array(zkQuorum, group, topics, numThreadsArg) = args

// Parse the thread count once, up front, so that a bad value produces a clear
// message instead of a NumberFormatException thrown while building the topic map.
val numThreads = scala.util.Try(numThreadsArg.trim.toInt).toOption.filter(_ > 0).getOrElse {
  System.err.println(s"numThreads must be a positive integer, got: $numThreadsArg")
  sys.exit(1)
}

// Master and application name are not set here (see the commented-out calls),
// so they have to be supplied from outside, e.g. at submit time.
val conf = new SparkConf() //.setMaster("local[2]").setAppName("streamingAPP")

// One micro-batch every 60 seconds.
val ssc = new StreamingContext(conf, Seconds(60))

// topicMap maps every topic name to the number of consumer threads for it.
// Blank entries (e.g. from a trailing comma) are dropped.
val topicMap = topics.split(",")
  .map(_.trim)
  .filter(_.nonEmpty)
  .map(topic => topic -> numThreads)
  .toMap

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
// Debug helper: word count over the raw messages.
//kafkaStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

// The second element of each Kafka record is the message payload: one
// tab-separated access-log line, for example
//143.132.29.124	2018-08-23 12:22:00	"GET /class/116.html HTTP/1.1"	200	http://www.sogou.com/web?query=Hadoop基础
// Fields: ip, timestamp, request line, status code, referrer.
//
// Each line is turned into a ClickLog. Only requests for course pages
// (URL starting with "/class") are kept; every other record ends up with
// courseId 0 and is filtered out.
// A line that cannot be parsed (too few fields, non-numeric status code or
// course id, unparsable timestamp, ...) is dropped instead of throwing, so a
// single bad record cannot fail the whole batch.
val cleanData = kafkaStream.map(_._2)
  .flatMap { line =>
    import scala.util.Try

    Try {
      val info = line.split("\t")

      // Request line looks like: "GET /class/116.html HTTP/1.1" -> take the path.
      val url = info(2).split(" ")(1)

      val courseId =
        if (url.startsWith("/class")) {
          // "/class/116.html" -> "116.html" -> 116
          val courseIdHTML = url.split("/")(2)
          courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
        } else {
          0
        }

      // ClickLog is the case class holding one cleaned record.
      ClickLog(info(0), DateUtils.parseToMinute(info(1)), courseId, info(3).toInt, info(4))
    }.toOption.toList
  }
  .filter(clicklog => clicklog.courseId != 0)

// cleanData.print()

// Per-course click counts.
// Build the row key "<first 8 chars of time>_<courseId>" with a count of 1,
// sum the counts per key within the batch, then hand every partition's
// results to CourseClickCoutDAO (defined in the next cell) as a list of
// CourseClickCount case-class instances.
val courseClicks = cleanData
  .map(log => (s"${log.time.substring(0, 8)}_${log.courseId}", 1))
  .reduceByKey((a, b) => a + b)

courseClicks.foreachRDD { batch =>
  batch.foreachPartition { records =>
    val buffer = ListBuffer.empty[CourseClickCount]
    for ((rowKey, clicks) <- records) {
      buffer += CourseClickCount(rowKey, clicks)
    }
    CourseClickCoutDAO.save(buffer)
  }
}

// Per-referrer-host click counts.
// The referrer looks like https://cn.bing.com/search?q=Spark. Collapsing
// "//" to "/" and splitting on "/" puts the host at index 1; referrers that
// yield fewer than three parts get an empty host and are discarded.
// Row key: "<first 8 chars of time>_<host>_<courseId>".
val searchClicks = cleanData
  .map { log =>
    val parts = log.ref.replaceAll("//", "/").split("/")
    val host = if (parts.length > 2) parts(1) else ""
    (host, log.courseId, log.time)
  }
  .filter { case (host, _, _) => host != "" }
  .map { case (host, courseId, time) => (s"${time.substring(0, 8)}_${host}_${courseId}", 1) }
  .reduceByKey((a, b) => a + b)

searchClicks.foreachRDD { batch =>
  batch.foreachPartition { records =>
    val buffer = ListBuffer.empty[CourseSearchClickCount]
    for ((rowKey, clicks) <- records) {
      buffer += CourseSearchClickCount(rowKey, clicks)
    }
    CourseSearchClickCoutDAO.save(buffer)
  }
}

// Start the streaming computation, then block the driver until the context
// is stopped or fails.
ssc.start()
ssc.awaitTermination()

In [None]:
/** Helpers for converting log timestamps between text formats. */
object DateUtils {

  /** Input format of the timestamps found in the raw log lines. */
  val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /** Compact output format (digits only). */
  val TARGE_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  /**
   * Parses a "yyyy-MM-dd HH:mm:ss" timestamp.
   *
   * @param time timestamp text
   * @return milliseconds since the epoch
   */
  def getTime(time: String) = {
    val parsed = YYYYMMDDHHMMSS_FORMAT.parse(time)
    parsed.getTime
  }

  /**
   * Re-formats a "yyyy-MM-dd HH:mm:ss" timestamp as "yyyyMMddHHmmss".
   *
   * @param time timestamp text
   * @return the same instant in the compact format
   */
  def parseToMinute(time: String) = {
    val epochMillis = getTime(time)
    val asDate = new Date(epochMillis)
    TARGE_FORMAT.format(asDate)
  }
}

In [None]:
/**
 * Data-access object for the per-course click counters stored in HBase.
 *
 * Row key: "<day>_<courseId>"; the counter lives in column info:click_count.
 */
object CourseClickCoutDAO {
  private val tableName = "course_clickcount"
  private val columnfamily = "info"
  private val qualifer = "click_count"

  /**
   * Adds every entry's click count to the counter stored under its row key.
   *
   * @param list counts to add; an empty list is a no-op
   * @throws IllegalStateException if the HBase table could not be opened
   */
  def save(list: ListBuffer[CourseClickCount]): Unit = {
    // Nothing to write: do not open a table handle at all.
    if (list.nonEmpty) {
      val table = HBaseUtils.getInstance().getTable(tableName)
      // HBaseUtils.getTable returns null when opening the table failed.
      if (table == null) {
        throw new IllegalStateException(s"Unable to open HBase table $tableName")
      }

      try {
        for (elem <- list) {
          // One increment call per row; batching the increments would reduce round trips.
          table.incrementColumnValue(Bytes.toBytes(elem.day_course),
            Bytes.toBytes(columnfamily),
            Bytes.toBytes(qualifer),
            elem.click_count)
        }
      } finally {
        // Always release the table handle, even if an increment failed.
        table.close()
      }
    }
  }

  /**
   * Reads the counter stored under the given row key.
   *
   * @param day_couse row key, "<day>_<courseId>"
   * @return the stored count, or 0 when the cell does not exist
   * @throws IllegalStateException if the HBase table could not be opened
   */
  def queryByKeyRow(day_couse: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    if (table == null) {
      throw new IllegalStateException(s"Unable to open HBase table $tableName")
    }

    try {
      val get = new Get(Bytes.toBytes(day_couse))
      // Encode names with Bytes.toBytes, the same way save() does, rather than
      // with the platform-default charset of String.getBytes().
      val value = table.get(get).getValue(Bytes.toBytes(columnfamily), Bytes.toBytes(qualifer))

      Option(value).map(bytes => Bytes.toLong(bytes)).getOrElse(0L)
    } finally {
      table.close()
    }
  }

  /** Manual smoke test: writes three counters and prints them back. */
  def main(args: Array[String]): Unit = {

    val list = new ListBuffer[CourseClickCount]
    list.append(CourseClickCount("20171111_8", 8))
    list.append(CourseClickCount("20171111_9", 9))
    list.append(CourseClickCount("20171111_1", 100))

    save(list)
    println(queryByKeyRow("20171111_8") + ":" +
      queryByKeyRow("20171111_9") + ":" +
      queryByKeyRow("20171111_1"))
  }
}

In [None]:
/**
 * Template HBase helper implemented in Java.
 *
 * Lazily created singleton that holds the HBase configuration and hands out
 * table handles.
 */
public class HBaseUtils {

    HBaseAdmin admin = null;
    Configuration conf = null;

    /** Builds the configuration (ZooKeeper quorum and HBase root dir) and the admin client. */
    private HBaseUtils() {
        // NOTE(review): a plain Hadoop Configuration is used here; confirm whether
        // the HBase defaults need to be loaded as well.
        conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
        conf.set("hbase.rootdir", "hdfs://localhost:8020/hbase");

        try {
            admin = new HBaseAdmin(conf);
        } catch (IOException e) {
            // Best effort: admin stays null when the client cannot be created.
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    /**
     * Returns the single shared instance, creating it on first use.
     * The method is synchronized so that only one instance is ever created.
     */
    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * Opens a handle to the given HBase table.
     *
     * @param tableName name of the table
     * @return a new table handle, or null if it could not be opened (the
     *         exception is printed); the caller is responsible for closing it
     */
    public HTable getTable(String tableName) {
        HTable table = null;
        try {
            table = new HTable(conf, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Writes one cell value to HBase. A Put stores the given value as-is; it
     * does not add to a value that is already there.
     *
     * @param tableName    table to write to
     * @param rowkey       row key
     * @param columnfamily column family
     * @param column       column qualifier
     * @param value        value to store
     */
    private void put(String tableName, String rowkey, String columnfamily, String column, String value) {
        HTable table = getTable(tableName);
        if (table == null) {
            // getTable has already printed the failure; nothing to write to.
            return;
        }

        try {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.add(Bytes.toBytes(columnfamily), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the table handle opened above.
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}