# Oetker Streaming Challenge

## Instructions

1. Run the following command in the KSQL prompt:
    
    ```RUN SCRIPT 'confluentConsumer.ksql';```
    

2. To view the results of the windowed unique-user aggregation in KSQL, type the following:

    ```SELECT ROWKEY, UNIQUE_USERS FROM UNIQUE_USERS;```
    To stop, type `Ctrl-C`.
    
    
3. View the analysis in the [Control Center](http://localhost:9021/).


4. Navigate to Development > KSQL > Query Editor.


5. To view the results of the non-windowed country counts, type the following:

    ```SELECT ROWTIME, COUNTRY, COUNTRYCOUNT FROM COUNTRY_AGG;```


6. Run the script under the Producer heading below to begin the Kafka stream.


7. Once the stream has ended, stop both queries and exit KSQL with the following command:

    ```exit```


8. Stop the services by running the following script:
    
    ```sudo ~/Documents/Oetker/./confluentStop```
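
To check that the services are reachable before working through the steps above, a small pre-flight script can help. The sketch below is a hypothetical helper, not part of this repository; it assumes the broker listens on `localhost:9092` (the address used in the Producer section) and the Control Center on `localhost:9021` (the link in step 3).

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports taken from this README; adjust them if your setup differs.
    for name, port in [("Kafka broker", 9092), ("Control Center", 9021)]:
        status = "up" if port_open("localhost", port) else "not reachable"
        print(f"{name} (localhost:{port}): {status}")
```

An open port only shows that something is listening; it does not confirm that the service behind it is healthy.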
    


## File Explanations

### confluentStart

    * Install Environment Dependencies
    * Install Confluent Control Center
    * Start Confluent Services
    * Install the confluent-kafka package
    * Create Kafka Topic: `Oetker`
    * Start KSQL Client
    
### confluentConsumer

    * USER:                Stream Created from Kafka Topic 
    * USER_INFO:           Stream Created from USER Topic
    * UNIQUE_USERS:        Table Created from USER_INFO
    * COUNTRY_AGG:         Table Created from USER Topic
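
For intuition, the two tables can be mimicked in plain Python. This is only a sketch of the general technique (distinct counts per tumbling window, and a running count per key), not the contents of `confluentConsumer.ksql`. The field names `id`, `country`, and `ts`, the tumbling window type, and the 60-second window size are all assumptions made for illustration.

```python
from collections import Counter, defaultdict

WINDOW_SECONDS = 60  # assumed window size, not taken from the KSQL script


def unique_users_per_window(events, window=WINDOW_SECONDS):
    """Count distinct user ids in each tumbling window, keyed by window start."""
    users = defaultdict(set)
    for e in events:
        start = e["ts"] - e["ts"] % window  # floor the timestamp to its window start
        users[start].add(e["id"])
    return {start: len(ids) for start, ids in sorted(users.items())}


def country_counts(events):
    """Non-windowed count of events per country."""
    return Counter(e["country"] for e in events)


# Hypothetical events; timestamps are in seconds.
events = [
    {"id": 1, "country": "DE", "ts": 5},
    {"id": 1, "country": "DE", "ts": 30},
    {"id": 2, "country": "FR", "ts": 59},
    {"id": 2, "country": "FR", "ts": 61},
]
print(unique_users_per_window(events))  # {0: 2, 60: 1}
print(country_counts(events))
```

User 2 appears in both windows and is counted once in each, which is the difference between a windowed distinct count and a global one.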

## Confluent Justifications/Limitations

### Justifications

    * Easy integration with Kafka streaming
    * UI for monitoring stream performance and visualizing stream analytics
    * KSQL integrates well with JSON Kafka streams
    * Packaged Kafka, ZooKeeper, and KSQL solution

### Limitations

    * KSQL does not allow aggregations over already-aggregated table data.
    * Cannot select MAX or MIN of COUNT(*) per country
    * Limited functionality in syncing queries with data streams
    * Still in development/beta phase
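
One possible workaround for the missing `MAX`/`MIN` over `COUNT(*)` is to do the second aggregation on the client. The sketch below assumes the rows of `COUNTRY_AGG` have already been read into `(country, count)` pairs, with later rows superseding earlier ones for the same country. How the rows are fetched is left open, and `country_extremes` is a hypothetical helper.

```python
def country_extremes(rows):
    """Return ((max_country, max_count), (min_country, min_count)).

    rows is an iterable of (country, count) updates. A table emits a new
    row each time a count changes, so only the latest count per country
    is kept before taking the extremes. Returns None for empty input.
    """
    latest = {}
    for country, count in rows:
        latest[country] = count
    if not latest:
        return None
    top = max(latest.items(), key=lambda kv: kv[1])
    bottom = min(latest.items(), key=lambda kv: kv[1])
    return top, bottom


# Hypothetical update stream: DE ends at 3, FR at 1, US at 2.
updates = [("DE", 1), ("FR", 1), ("DE", 2), ("US", 1), ("US", 2), ("DE", 3)]
print(country_extremes(updates))  # (('DE', 3), ('FR', 1))
```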
    

## Producer

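    from confluent_kafka import Producer
    import json
    import time


    def openSource():
        """Load the mock records; return None if the file is missing or invalid."""
        try:
            with open('Data/MOCK_DATA.json', 'r') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            return None


    # Load Confluent's monitoring interceptor so Control Center can track this producer.
    producer = Producer({
        'bootstrap.servers': 'localhost:9092',
        'plugin.library.paths': 'monitoring-interceptor',
    })


    def Produce():
        mock = openSource()
        if not mock:
            print('Invalid stream file')
            return

        for line in mock:
            # Serve delivery callbacks from earlier produce() calls.
            producer.poll(0)
            producer.produce('oetker', json.dumps(line).encode('utf-8'))
            # Throttle to roughly 10 messages per second.
            time.sleep(0.1)
        # Block until every queued message has been delivered.
        producer.flush()


    Produce()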
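
The loop above calls `producer.poll(0)` but registers no delivery callback, so failed sends go unnoticed. A minimal sketch of a delivery report follows. `DeliveryStats` is a hypothetical helper, and it relies only on the `(err, msg)` callback signature that `confluent_kafka` uses for `produce(..., callback=...)`.

```python
class DeliveryStats:
    """Callable delivery report that tallies delivered and failed messages."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg):
        # confluent_kafka invokes this from poll() or flush(), once per message.
        if err is not None:
            self.failed += 1
            print(f"Delivery failed: {err}")
        else:
            self.delivered += 1


stats = DeliveryStats()

# Wiring it into the producer loop (requires a running broker);
# `payload` stands for the encoded JSON record:
#   producer.produce('oetker', payload, callback=stats)
#   producer.poll(0)   # triggers stats(err, msg) for completed sends
#   producer.flush()
#   print(stats.delivered, stats.failed)
```

Comparing `stats.delivered` with the number of records in the source file after `flush()` gives a quick check that the whole stream reached the broker.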