# Project 3

## Report

### Terminal Commands

`cp ~/w205/course-content/12a/docker-compose.yml .`
Copying the docker-compose.yml file to this folder 


`docker-compose up -d`
Spinning up docker in the background


`docker-compose exec kafka kafka-topics --create --topic MessageHistory --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181`
Created MessageHistory topic to be used in this project. 


`docker-compose exec mids env FLASK_APP=/w205/project-3-Marcus-M1999/game_api.py flask run --host 0.0.0.0`
Spinning up the Flask server to receive the "GET" requests sent to it. Flask is a Python web framework; here it runs the app defined in game_api.py and listens on all network interfaces (`--host 0.0.0.0`) for incoming requests. 


`docker-compose exec mids kafkacat -C -b kafka:29092 -t MessageHistory -o beginning`
Starts Kafkacat consuming the MessageHistory topic from the beginning, so I can see what data is flowing through it. 

`sh ab.sh`
This command runs the ab.sh shell script below:



### ab.sh

#!/bin/bash
while true;
do
  docker-compose exec mids \
    ab -n 10 -H "Host: test_connection.game.com" \
      http://localhost:5000/
  docker-compose exec mids \
    ab -n 10 -H "Host: requester.game.com" \
      http://localhost:5000/request_group
  sleep 10;
  docker-compose exec mids \
    ab -n 10 -H "Host: acceptor.game.com" \
      http://localhost:5000/accept_member
  sleep 10;
done


This script contains an infinite while loop that generates test traffic with Apache Bench (ab), an HTTP benchmarking tool from the Apache project, against the routes defined in game_api.py. The `-n 10` option sends 10 requests per call, and `-H` adds a Host header of test_connection.game.com, requester.game.com, or acceptor.game.com. There is a 10 second pause after the batch of requests and another after the batch of acceptances. 
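To make the structure of each `ab` call explicit, here is a minimal Python sketch that only builds and prints the command lines used in ab.sh; it does not run them. The helper name `build_ab_command` and its parameters are hypothetical and are not part of the project files.

```python
import shlex


def build_ab_command(host, path, requests=10, base_url="http://localhost:5000"):
    """Return the argument list for one Apache Bench run inside the mids container.

    Hypothetical helper for illustration; it mirrors the calls in ab.sh.
    """
    return [
        "docker-compose", "exec", "mids",
        "ab",
        "-n", str(requests),       # total number of requests to send
        "-H", f"Host: {host}",     # extra header added to every request
        f"{base_url}{path}",       # route on the Flask server
    ]


commands = [
    build_ab_command("requester.game.com", "/request_group"),
    build_ab_command("acceptor.game.com", "/accept_member"),
]

for cmd in commands:
    # shlex.join quotes the header argument so the line can be pasted into a shell
    print(shlex.join(cmd))
```

Keeping the host and route as parameters makes it easy to add batches for the other routes in game_api.py, such as /decline_member or /message.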

### write_requestor_stream

#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


def requestor_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])


@udf('boolean')
def is_requestor_event(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'request_group':
        return True
    return False


def main():
    """main
    """
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport() \
        .getOrCreate()

    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "MessageHistory") \
        .load()

    requestors = raw_events \
        .filter(is_requestor_event(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          requestor_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

    spark.sql("drop table if exists requestors")
    sql_string = """
        create external table if not exists requestors (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/requestors'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    spark.sql(sql_string)

    sink = requestors \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_requestors") \
        .option("path", "/tmp/requestors") \
        .trigger(processingTime="10 seconds") \
        .start()

    sink.awaitTermination()


if __name__ == "__main__":
    main()


The requestor_event_schema() method defines the schema for the requestor event. I did not change it from the schema originally in the file, but it is possible to add other metadata fields, or nested fields within the JSON data. <br>
The is_requestor_event() method is a UDF that takes the event as a JSON string and returns True only when its event type is 'request_group', which decides whether the event belongs in this table or a different one. <br>
The main() method creates a Spark session with Hive support and defines raw_events, the stream of messages received by subscribing to the MessageHistory topic on the Kafka server. <br>
The requestors object filters raw_events down to the request_group events, casts the message value and timestamp to strings, and parses the JSON into columns so the data can be queried in Presto. <br>
The spark.sql("drop table if exists requestors") call drops the table definition if it already exists. This is done to avoid errors that could arise from a table left over from an earlier run. Because the table is created as an external table, dropping it should remove only the Hive metadata and leave the Parquet files under /tmp/requestors in place. <br>
The sql_string portion of the file contains the query that creates the requestors table as an external table stored in the Parquet format at /tmp/requestors in HDFS. <br>
The spark.sql(sql_string) call runs the SQL query, creating the new table. <br>
The sink object writes the streamed events to HDFS as Parquet files under the /tmp/requestors directory, with checkpoints kept under /tmp/checkpoints_for_requestors. <br>
The processingTime="10 seconds" trigger starts a new micro-batch every 10 seconds. The 10 seconds was kept from the original file, and was not changed since it was an appropriate interval while I was testing this pipeline. <br>
The sink.awaitTermination() call blocks so the stream keeps running until it is stopped, for example with a keyboard interrupt (CTRL + C). <br>
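The filter in the script indexes `event['event_type']` directly, so a message that is not valid JSON, or that lacks that key, would raise an error inside the stream. The following is a plain-Python sketch of a more defensive version of the same check, shown without Spark so it can be run on its own; the sample messages are made up for illustration.

```python
import json


def is_requestor_event(event_as_json):
    """Return True only for well-formed 'request_group' events."""
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # None, non-string input, or malformed JSON: treat as "not a match"
        return False
    # .get avoids a KeyError when the message has no event_type field
    return isinstance(event, dict) and event.get("event_type") == "request_group"


# Hypothetical messages, standing in for values read from the Kafka topic
sample_messages = [
    '{"event_type": "request_group", "Host": "requester.game.com"}',
    '{"event_type": "accept_member", "Host": "acceptor.game.com"}',
    '{"Host": "no-type.game.com"}',
    'not json',
]

requestors = [m for m in sample_messages if is_requestor_event(m)]
print(len(requestors))  # prints 1
```

In the Spark script the same function body could sit under the existing `@udf('boolean')` decorator; that wiring is an assumption and was not tested against the pipeline.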

### Terminal Commands

`docker-compose exec spark spark-submit /w205/project-3-Marcus-M1999/write_requestor_stream.py`
This command runs the Spark script write_requestor_stream.py, which streams the requestor events from Kafka into HDFS.

`docker-compose exec spark spark-submit /w205/project-3-Marcus-M1999/write_acceptor_stream.py`
This command runs a similar script, but for the acceptor events rather than the requestor events. 

`docker-compose exec cloudera hadoop fs -ls /tmp/`
This checks the cloudera server to make sure that the data for both requestors and acceptors is present and the scripts are working as intended. 

`docker-compose exec presto presto --server presto:8080 --catalog hive --schema default`
This command starts the Presto command-line client, which uses the Hive catalog to find the queryable tables in HDFS. 



### game_api.py

#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"


@app.route("/request_group")
def request_friend():
    add_event = {'event_type': 'request_group'}
    log_to_kafka('MessageHistory', add_event)
    return "Request Submitted!\n"

@app.route("/accept_member")
def accept_member():
    accept_event = {'event_type': 'accept_member'}
    log_to_kafka('MessageHistory', accept_event)
    return "Member Approved!\n"

@app.route("/decline_member")
def decline_member():
    decline_event = {'event_type': 'decline_member'}
    log_to_kafka('MessageHistory', decline_event)
    return "Member Declined!\n"

@app.route("/message")
def message():
    message_event = {'event_type': 'message'}
    log_to_kafka('MessageHistory', message_event)
    return "Message sent!\n"

This is the game API file, which defines the API requests that would be received from the game/platform. 
The routes in this file include a default event, request_group, and accept_member, among others.
Each method sends the event to the Kafka topic listed and returns a string to confirm that it executed correctly. 
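The `log_to_kafka` function merges the request headers into the event before sending it, which is why the stored events carry fields such as Accept, Host, and User-Agent next to event_type. Below is a minimal sketch of that merge-and-encode step without Flask or Kafka. The function name `build_payload` and the header values are hypothetical stand-ins for `request.headers`.

```python
import json


def build_payload(event, headers):
    """Merge request headers into the event and encode it as JSON bytes."""
    merged = dict(event)      # copy so the caller's dict is not mutated
    merged.update(headers)    # header names become top-level keys
    return json.dumps(merged).encode()


# Hypothetical headers, standing in for Flask's request.headers
fake_headers = {
    "Host": "requester.game.com",
    "User-Agent": "ApacheBench/2.3",
    "Accept": "*/*",
}

payload = build_payload({"event_type": "request_group"}, fake_headers)
decoded = json.loads(payload.decode())
print(sorted(decoded))  # prints ['Accept', 'Host', 'User-Agent', 'event_type']
```

The four keys in the decoded message match the four fields declared in requestor_event_schema().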

### docker-compose.yml

The docker-compose.yml file defines the containers Docker runs for this project: each service, its image, and the ports it needs. From top to bottom the services include:

##### 1.)  Zookeeper
Zookeeper is the coordination service that Kafka relies on to manage its brokers. The image line gives the image and version used for the container. The client port is the port Kafka uses to communicate with Zookeeper. The ports listed under "expose" are opened to the other containers (for additional connections in the future).

##### 2.) Kafka 
Kafka is the second service in the docker configuration file and adds several new keys, including depends_on. The depends_on keyword means that docker will first spin up Zookeeper and then Kafka, since Kafka relies on Zookeeper for coordination. Under the environment variables, `KAFKA_BROKER_ID` is set to 1, since it's the first and only broker in the cluster. (The broker ID is a unique identifier for each broker in the cluster.) `KAFKA_ZOOKEEPER_CONNECT` gives the address Kafka uses to connect to Zookeeper. `KAFKA_ADVERTISED_LISTENERS` lists the addresses that the broker (Kafka node) advertises to all producers and consumers of records in the pipeline. Here `kafka:29092` is the address that the Flask app, Kafkacat, and Spark use to reach the broker. `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR` sets the replication factor of Kafka's internal consumer-offsets topic, that is, how many brokers hold a copy of it. In this case, the replication factor is 1 since there is only 1 broker. 

##### 3.) Cloudera
Cloudera provides a distribution of Hadoop that bundles a variety of additional tools on top of Apache Hadoop (Hadoop itself is an Apache project). In this pipeline the cloudera container provides HDFS. 

##### 4.) Spark
Spark is the service used to process messages and relay them to the HDFS nodes run by Cloudera. `stdin_open` and `tty` keep an interactive session open, so the container keeps running when it is started in detached mode. The `volumes` entry maps the directory ~/w205/ to /w205, so Spark can read and write files there. The `environment` section lists the environment variables, including `HADOOP_NAMENODE`, which names the cluster that Spark sends the processed data to. 

##### 5.) Presto
Presto is a querying tool used to pull data from big data stores such as the Hadoop Distributed File System (HDFS). In this context Presto is connected to a Hive Thrift server, a server that Hive runs to track the available tables. This is how Presto can view the tables in HDFS. 

##### 6.) mids
mids is the final container in the docker-compose file. It holds tools such as Kafkacat (a tool used to produce and consume messages with Kafka), and it is where the Flask server and Apache Bench run in the commands above. 


### Queries

##### 1.) How many requests to join a group are there?
`SELECT COUNT(*) FROM requestors;`
This query shows the count of all requests to join a group. <br>
Output:


`
 _col0 
-------
    80 
(1 row)

`

##### 2.) How many of those requests were accepted?
`SELECT COUNT(*) FROM acceptor;`
This query is similar to the one above and shows the count of all acceptances to join a group. <br>
Output:

`
 _col0 
-------
   180 
(1 row)

`
<br>
Note that the acceptances are higher than the requests because the write_acceptor_stream.py script was started later and ran longer than the write_requestor_stream.py script. 
<br>
##### 3.) Who were the hosts for the requestor events?
`SELECT host, event_type FROM requestors;`
In the testing data there was only one host. In a real-world scenario the hosts would differ and could offer some insight into the players. <br>
Output:<br>
`        host        |  event_type   
--------------------+---------------
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
 requester.game.com | request_group 
`
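With more than one host in the data, a per-host count is easier to read than one row per event. The sketch below shows that grouping in plain Python on a few made-up rows shaped like the query output; the second host name is hypothetical and does not appear in the test data.

```python
from collections import Counter

# Hypothetical rows in the same (host, event_type) shape as the query output
rows = [
    ("requester.game.com", "request_group"),
    ("requester.game.com", "request_group"),
    ("guild-a.game.com", "request_group"),
]

# Count how many request events each host produced
requests_per_host = Counter(host for host, _ in rows)

for host, count in requests_per_host.most_common():
    print(host, count)
```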
