# Project 3

### Paco, Dera, Natasha

## Pipeline Workflow

0. Spin up the Docker containers defined in the docker-compose YML file
    * Command: docker-compose up -d
    * Output: 
        Starting pacomiguelagv_zookeeper_1 ... done
        Starting pacomiguelagv_mids_1      ... done
        Starting pacomiguelagv_presto_1    ... done
        Starting pacomiguelagv_cloudera_1  ... done
        Starting pacomiguelagv_spark_1     ... done
        Starting pacomiguelagv_kafka_1     ... done



1. Create the Kafka topic "events", print its description, and run game_api.py
    * Script: sh step1_spinup.sh
    * Commands: 
        echo "Creating topic events..."
        docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
        echo "Topic events description:"
        docker-compose exec kafka kafka-topics --describe --topic events --zookeeper zookeeper:32181
        echo "Spinning up game game_api.py..."
        docker-compose exec mids env FLASK_APP=/w205/w205_project3_Aguirre_Flowers_Mojekwu/pacomiguelagv/game_api.py flask run --host 0.0.0.0
    * Output:
        Creating topic events...
        Topic events description:
        Topic: events   PartitionCount: 1       ReplicationFactor: 1    Configs: 
                Topic: events   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Spinning up game game_api.py...
         * Serving Flask app "game_api"
         * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
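
As a rough illustration of what a route handler in game_api.py might assemble before writing to the "events" topic, the sketch below merges request headers with event-specific fields and serializes the result as JSON. The helper name `build_event` and the way it merges fields are assumptions, not taken from game_api.py; the field names mirror the sample consumer message in step 3.

```python
import json


def build_event(event_type, headers, **fields):
    """Merge request headers and event-specific fields into one JSON message.

    Hypothetical helper: game_api.py may structure this differently.
    """
    event = {"event_type": event_type}
    event.update(headers)  # e.g. Host, User-Agent, Accept
    event.update(fields)   # e.g. id, health
    # Kafka messages are bytes, so encode the JSON string.
    return json.dumps(event).encode("utf-8")


message = build_event(
    "purchase_health",
    {"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"},
    id=1,
    health=10,
)
print(message.decode("utf-8"))
```

In a real handler the returned bytes would be handed to a Kafka producer; that part is omitted here because it depends on the client library the project uses.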

2. Launch a Kafka consumer for the topic "events"
    * Script: step2_consumer.sh
    * Commands: 
        echo "Launching Kafka Consumer..."
        docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
    * Output: (none until messages are produced, see step 3)

3. Produce random events using Apache Bench every 3 seconds
    * Script: step3_producemessages.sh
    * Commands:
        watch -n 3 ./benchevents.sh
        (see benchevents.sh for details)
    * Output: 
        This is ApacheBench, Version 2.3 <$Revision: 1706008 $>
        Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
        Licensed to The Apache Software Foundation, http://www.apache.org/
        Benchmarking localhost (be patient).....done
        Server Software:        Werkzeug/0.14.1
        Server Hostname:        localhost
        Server Port:            5000
        Document Path:          /purchase_a_sword?id=3
        Document Length:        51 bytes
        Concurrency Level:      1
        Time taken for tests:   0.013 seconds
        Complete requests:      1
        Failed requests:        0
        Total transferred:      206 bytes
        HTML transferred:       51 bytes
        Requests per second:    77.51 [#/sec] (mean)
        Time per request:       12.901 [ms] (mean)
        Time per request:       12.901 [ms] (mean, across all concurrent requests) 
    
    * Output (in Kafka consumer terminal window):
        {"event_type": "purchase_health", "success": true, "Price": 20, "Accept": "*/*", "User-Agent": "ApacheBench/2.3", "Host": "user1.comcast.com", "health": 10, "id": 1, "size": "Medium Heart"}
        (etc)
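
The random-event step can be pictured with a short sketch that picks a host and endpoint at random and builds the matching ApacheBench command. This is only an assumption about what benchevents.sh does: the host and endpoint lists are taken from the sample output in this README, while the id range and the function name `random_ab_command` are hypothetical.

```python
import random
import shlex

# Hosts and endpoints seen in the sample output of this README;
# the real benchevents.sh may use a different set.
HOSTS = ["user1.comcast.com", "user3.att.com"]
ENDPOINTS = ["purchase_a_sword", "purchase_health", "join_guild"]


def random_ab_command(rng, base_url="http://localhost:5000"):
    """Return an ApacheBench argument list for one random event."""
    host = rng.choice(HOSTS)
    endpoint = rng.choice(ENDPOINTS)
    event_id = rng.randint(0, 3)  # assumed id range
    url = f"{base_url}/{endpoint}?id={event_id}"
    # -n 1: send a single request; -H: add a custom Host header.
    return ["ab", "-n", "1", "-H", f"Host: {host}", url]


command = random_ab_command(random.Random(0))
print(shlex.join(command))
```

Passing the argument list to `subprocess.run` inside the mids container would send the request; printing it first makes it easy to check what would be sent.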

4. Use Spark to read in stream from Kafka, transform messages, create tables, and land them in HDFS
    * Script: step4_write_hdfs_tables.sh
    * Commands: 
        echo "Launching write_hive_tables.py"
        docker-compose exec spark spark-submit /w205/w205_project3_Aguirre_Flowers_Mojekwu/pacomiguelagv/write_hive_tables.py
        (see write_hive_tables.py for details)
    * Output: 
     
        21/04/11 03:07:54 INFO StreamExecution: Streaming query made progress: {
          "id" : "0107bd3c-002a-4b37-92b6-ceb5a12dd23c",
          "runId" : "5a485a23-8398-4be7-b11a-ba281b0995f7",
          "name" : null,
          "timestamp" : "2021-04-11T03:07:54.000Z",
          "numInputRows" : 0,
          "inputRowsPerSecond" : 0.0,
          "processedRowsPerSecond" : 0.0,
          "durationMs" : {
            "getOffset" : 2,
            "triggerExecution" : 2
          },
          "stateOperators" : [ ],
          "sources" : [ {
            "description" : "KafkaSource[Subscribe[events]]",
            "startOffset" : {
              "events" : {
                "0" : 297
              }
            },
            "endOffset" : {
              "events" : {
                "0" : 297
              }
            },
            "numInputRows" : 0,
            "inputRowsPerSecond" : 0.0,
            "processedRowsPerSecond" : 0.0
          } ],
          "sink" : {
            "description" : "FileSink[/tmp/purchase_a_sword]"
          }
        }
        (etc)

    * Check for presence of files: docker-compose exec cloudera hadoop fs -ls /tmp/
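
The progress report above can be read mechanically: when `startOffset` equals `endOffset` for a partition, the batch covered no new Kafka messages, which matches `numInputRows` being 0. The sketch below computes that difference from a trimmed copy of the report. The function name `offsets_covered` is hypothetical, and the code assumes both offsets are present (it does not handle a report whose `startOffset` is null).

```python
import json


def offsets_covered(progress):
    """Return, per (topic, partition), how many offsets the batch covered."""
    covered = {}
    for source in progress["sources"]:
        start = source["startOffset"]
        end = source["endOffset"]
        for topic, partitions in end.items():
            for partition, end_offset in partitions.items():
                covered[(topic, partition)] = end_offset - start[topic][partition]
    return covered


# Trimmed copy of the progress report shown above.
progress = json.loads("""
{
  "numInputRows": 0,
  "sources": [{
    "description": "KafkaSource[Subscribe[events]]",
    "startOffset": {"events": {"0": 297}},
    "endOffset": {"events": {"0": 297}}
  }],
  "sink": {"description": "FileSink[/tmp/purchase_a_sword]"}
}
""")

print(offsets_covered(progress))  # {('events', '0'): 0}
```

A non-zero value here would indicate that the batch picked up new events from the producer in step 3.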

5. Load the Parquet files into DataFrames and send them to Hive
    * Script: step5_make_hive_files.sh
    * Commands: 
        echo "Launching Hive_spark.py"
        watch -n 10 ./hdfs_to_hive_for_presto.sh
    * Output: 
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        21/04/11 03:08:18 INFO SparkContext: Running Spark version 2.2.0
        21/04/11 03:08:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        21/04/11 03:08:22 INFO SparkContext: Submitted application: ExtractEventsJob
        (etc)
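
One common way to expose Parquet files in HDFS to Presto is to declare a Hive external table over their directory. The sketch below only builds such a DDL string; whether hdfs_to_hive_for_presto.sh takes this route is an assumption, and the helper name `external_table_ddl`, the column list, and the column types are all hypothetical. The location comes from the FileSink path in the step 4 output.

```python
def external_table_ddl(table, columns, location):
    """Build a Hive DDL statement for an external Parquet-backed table."""
    column_sql = ", ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({column_sql}) "
        f"STORED AS PARQUET LOCATION '{location}'"
    )


ddl = external_table_ddl(
    "default.purchase_a_sword",
    # Assumed columns and types; check the actual Parquet schema.
    [("event_type", "string"), ("price", "int"), ("speed", "int")],
    "/tmp/purchase_a_sword",
)
print(ddl)
```

The resulting statement could be executed through a Hive-enabled Spark session; the actual column names and types should be read from the Parquet schema rather than hard-coded.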

6. Query with Presto

    * Script: step6_launch_presto.py 
    * Command: docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
    * Output: NA, see queries below

## Queries 

* Examine a full table to see layout
    * Query: select * from join_guild;
    * Output:

 accept |       host        |   user-agent    | event_type | guild_name 
--------+-------------------+-----------------+------------+------------
 */*    | user3.att.com     | ApacheBench/2.3 | join_guild | Gnomes     
 */*    | user3.att.com     | ApacheBench/2.3 | join_guild | Gnomes        
 */*    | user3.att.com     | ApacheBench/2.3 | join_guild | Horde      
 */*    | user3.att.com     | ApacheBench/2.3 | join_guild | Horde   

* Count number of IDs present for each guild
    * Query: select guild_name, count(id) as num_ids from join_guild group by guild_name;

 guild_name | num_ids 
------------+---------
 Gods       |      24 
 Gnomes     |      27 
 Horde      |      25

* Get average sword speed by purchase price
    * Query: select price, avg(speed) as avg_speed from purchase_a_sword group by price;

 price | avg_speed 
-------+-----------
   100 |       4.0 
    50 |       1.0 
    70 |       2.0 
    20 |       5.0 

* Get average health by id
    * Query: select id, avg(health) as avg_health from purchase_health group by id;

 id | avg_health 
----+------------
  2 |       20.0 
  1 |       10.0 
  0 |        5.0 
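
For readers less familiar with SQL aggregation, the grouped-average queries above compute the equivalent of the following plain-Python sketch. The helper name `average_by` is hypothetical and the rows are illustrative only; they are not the contents of the purchase_health table.

```python
from collections import defaultdict


def average_by(rows, key, value):
    """Group rows by `key` and average `value`, like GROUP BY with avg()."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for row in rows:
        bucket = totals[row[key]]
        bucket[0] += row[value]
        bucket[1] += 1
    return {k: total / count for k, (total, count) in totals.items()}


# Illustrative rows only; not the actual contents of purchase_health.
rows = [
    {"id": 1, "health": 10},
    {"id": 1, "health": 10},
    {"id": 2, "health": 20},
]
print(average_by(rows, "id", "health"))  # {1: 10.0, 2: 20.0}
```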