# Kafka Producer 
----------------
Handles collection of data from the Alpha Vantage API.

Part A of the datafactory pipeline. Bad or missing data results in no new data being pushed to the consumer, and therefore no new data reaching our shared SQL database.

## Module Imports

In [0]:
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import requests
import json
import pandas as pd
import datetime
import numpy as np

## Client and Mount Point

In [0]:
# Service-principal credentials for the Azure Storage Account mount below.
# Both values come from the 'sfb_blob' Databricks secret scope instead of
# being written into the notebook.
clientSecret = dbutils.secrets.get(scope='sfb_blob', key='bowser')  # client secret
clientid = dbutils.secrets.get(scope='sfb_blob', key='yoshi')  # client (application) id

In [0]:
# Storage location this notebook reads from / writes to.
storageAccount = "gen10datafund2111"
storageContainer = "superfinancebros"
mount_point = "/mnt/superfinancebros/capstone"

# OAuth (service-principal) configuration for mounting the container via ABFS.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientid,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    # NOTE(review): the tenant id embedded in this endpoint is not a credential,
    # but it could be moved into the secret scope if it should not be published.
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Remount from scratch on every run. Only unmount when the mount point actually
# exists, rather than wrapping unmount in a bare `except: pass`, which would also
# hide KeyboardInterrupt and genuine unmount failures.
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"abfss://{storageContainer}@{storageAccount}.dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs,
)

## Error Handling Functions

In [0]:
# Used for error-handling
def error_cb(err):
    """Handle generic client-level errors reported by the Kafka client.

    Such errors are mostly informational: the client retries and recovers on
    its own, so normally nothing needs to be done. This notebook treats two
    codes as fatal: no broker reachable (``_ALL_BROKERS_DOWN``) and
    authentication failure (``_AUTHENTICATION``).

    Parameters:
        err (KafkaError): Error reported by the client.

    Raises:
        KafkaException: for the two fatal codes above. Anything raised from
            this callback is re-raised from the flush() or poll() call that
            triggered it.
    """
    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)


def acked(err, msg):
    """Delivery-report callback for produced messages.

    Meant to be passed as ``callback=`` to ``Producer.produce``; it runs once
    the delivery outcome of that message is known (served from poll()/flush()).

    Parameters:
        err (KafkaError or None): Delivery error, or None when delivery succeeded.
        msg (Message): The message whose delivery is being reported. Its
            topic/partition/offset are printed, because the object's default
            repr carries no useful information.
    """
    if err is not None:
        print(f"Failed to deliver message to {msg.topic()}: {err}")
    else:
        print(f"Message produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

## Producer - Setup

In [0]:
# Confluent Cloud credentials, read from the 'sfb_blob' Databricks secret scope.
# TODO: document the scope/key -> usage mapping in a shared reference document.
confluentApiKey = dbutils.secrets.get(scope='sfb_blob', key='mario')  # cluster API key
confluentSecret = dbutils.secrets.get(scope='sfb_blob', key='fawful')  # cluster API secret
confluentRegistryApiKey = dbutils.secrets.get(scope='sfb_blob', key='luigi')  # schema registry API key
confluentRegistrySecret = dbutils.secrets.get(scope='sfb_blob', key='peach')  # schema registry secret

In [0]:
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
confluentTopicName = "sfb-blob"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"

# Connection settings shared by the producer and the admin client.
# `group.id` and `auto.offset.reset` are consumer-only properties (ignored with
# a warning by producers/admin clients), so they are deliberately not set here.
_kafka_base_conf = {
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,
}

# Kafka Class Setup (each client gets its own copy of the config dict)
p = Producer(dict(_kafka_base_conf))
admin_client = AdminClient(dict(_kafka_base_conf))

# Ensure the topic exists: 1 partition, replication factor 3.
topic_list = [NewTopic(confluentTopicName, num_partitions=1, replication_factor=3)]

# create_topics() is asynchronous and returns one future per topic; wait on
# each so failures are reported instead of silently discarded.
for topic, future in admin_client.create_topics(topic_list).items():
    try:
        future.result()
        print(f"Topic created: {topic}")
    except KafkaException as e:
        # Re-running the notebook normally finds the topic already present.
        if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
            print(f"Topic already exists: {topic}")
        else:
            print(f"Failed to create topic {topic}: {e}")

## API Data Collection

### Stock Index Monthly Prices

In [0]:
# Calls the Alpha Vantage API and sends one Kafka message per sector ETF.
# Each JSON payload contains that ETF's monthly time series.

api_key = dbutils.secrets.get(scope = 'sfb_blob', key = 'waluigi')

ETF_sectors = ['VTI','VGT','VIS','VHT','VFH','VCR']

# Alpha Vantage typically reports invalid keys, bad symbols and rate limits as a
# normal (HTTP 200) JSON body under one of these keys, not as an HTTP error.
# Without an explicit check, that error text would be published as data.
_AV_ERROR_KEYS = ('Error Message', 'Note', 'Information')

# Pauses 12 seconds after each call to stay within 5 API calls per minute.
# The first failure (network error, HTTP error, or API error message) stops the
# loop, so no bad payload is pushed to the consumer.
try:
    for stock in ETF_sectors:
        url = f'https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol={stock}&outputsize=full&datatype=json&apikey={api_key}'
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        problem = next((data[key] for key in _AV_ERROR_KEYS if key in data), None)
        if problem is not None:
            raise ValueError(f"Alpha Vantage returned no data for {stock}: {problem}")
        p.produce(confluentTopicName, json.dumps(data), callback=acked)
        p.flush()
        sleep(12)
except Exception as e:
    print(e)

### Feature Data Collection for Stock Price Analysis

In [0]:
economic_indicators=['REAL_GDP','TREASURY_YIELD','CPI','INFLATION_EXPECTATION','CONSUMER_SENTIMENT','RETAIL_SALES','UNEMPLOYMENT']

# Extra query-string parameters for indicators that need a non-default interval.
# (The REAL_GDP URL previously read "interval=quarterlydatatype=json" - a missing
# '&' that meant a valid quarterly interval was never actually requested.)
_indicator_params = {
    'TREASURY_YIELD': 'interval=monthly&',
    'REAL_GDP': 'interval=quarterly&',
}

# Alpha Vantage typically reports rate limits / invalid keys as a normal JSON
# body under one of these keys instead of an HTTP error.
_AV_ERROR_KEYS = ('Error Message', 'Note', 'Information')

# One parsed JSON payload per successfully fetched economic indicator.
data_store = []


def data_storage(url):
    """Fetch one Alpha Vantage economic-indicator payload and append it to data_store.

    Sleeps 12 seconds after a successful request so at most 5 calls are made
    per minute.

    Parameters:
        url (str): Fully built Alpha Vantage query URL (including the API key).

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
        ValueError: if the API answered with an error / rate-limit message
            instead of data; nothing is appended in that case.
    """
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    payload = response.json()
    problem = next((payload[key] for key in _AV_ERROR_KEYS if key in payload), None)
    if problem is not None:
        raise ValueError(f"Alpha Vantage returned no data: {problem}")
    data_store.append(payload)
    sleep(12)


try:  # Stops at the first failure (e.g. daily API call limit reached)
    for indicator in economic_indicators:
        extra = _indicator_params.get(indicator, '')
        url = f'https://www.alphavantage.co/query?function={indicator}&{extra}datatype=json&apikey={api_key}'
        data_storage(url)
except Exception as e:
    print(e)

## Produce and Flush Economic Features

In [0]:
from pathlib import Path

# Persist each collected economic-feature payload to the mount and publish it
# to Kafka. Nothing is written or sent if the collection step gathered no data.
try:  # Stops at the first payload that cannot be written or produced
    if not data_store:
        print("No economic data collected; nothing to write or produce.")
    else:
        features_dir = Path(f"/dbfs{mount_point}/Economic_Features")
        features_dir.mkdir(parents=True, exist_ok=True)
        for feature in data_store:
            # Overwrite any preexisting file so it always holds the most recent
            # data; this keeps the files current as monthly values change.
            target = features_dir / f"{feature['name']}.json"
            pd.DataFrame(feature).to_json(str(target), orient='records')
            # Send the JSON object of this economic feature.
            p.produce(confluentTopicName, json.dumps(feature), callback=acked)
            p.flush()
except Exception as e:
    print(e)