In [0]:
!pip install findspark
!pip install datetime
!pip install confluent_kafka
!pip install time

In [0]:
def error_cb(err):
    """Generic client-error callback registered on the Kafka consumer.

    Every error is logged. Most client errors are informational, because the
    client retries on its own. Two errors are treated as fatal here and are
    escalated by raising ``KafkaException``:

    - no broker is reachable (``KafkaError._ALL_BROKERS_DOWN``);
    - authentication fails (``KafkaError._AUTHENTICATION``).

    An exception raised from this callback is re-raised from the ``poll()`` or
    ``flush()`` call that triggered it.

    ``KafkaError`` and ``KafkaException`` are looked up in the module globals at
    call time. They are imported in a later cell, which must run before the
    consumer is created.

    Args:
        err: the error object reported by the client.

    Raises:
        KafkaException: for either of the two fatal error codes above.
    """
    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)

In [0]:
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json


import os

# Confluent Cloud connection settings.
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
confluentTopicName = "group2_pipelinerun"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"

# Credentials come from the environment (or a secret scope) and are never
# committed. The key/secret previously hardcoded here should be treated as
# leaked and rotated. The registry credentials default to the cluster ones,
# which is what the notebook used before.
confluentApiKey = os.environ["CONFLUENT_API_KEY"]
confluentSecret = os.environ["CONFLUENT_API_SECRET"]
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_SECRET", confluentSecret)


# Kafka consumer setup.
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # A fresh group id on every run means no committed offsets exist, so
    # 'auto.offset.reset': 'earliest' makes each run read the topic from the start.
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
})

c.subscribe([confluentTopicName])

In [0]:

def consume_json_messages(consumer, poll_timeout=1.0):
    """Drain the JSON messages currently available to a subscribed consumer.

    Polling stops at either of these:

    - the first ``poll()`` that returns nothing within ``poll_timeout`` seconds
      (treated as "caught up");
    - the first message that carries an error.

    Each message value is decoded as UTF-8 JSON and must be a JSON object. The
    second element of ``msg.timestamp()`` is stored on it under the
    ``'timestamp'`` key. Messages that cannot be decoded are reported and
    skipped.

    Args:
        consumer: a ``confluent_kafka.Consumer`` that has already subscribed.
        poll_timeout: seconds to wait for each message before stopping.

    Returns:
        A list of dicts, one per successfully decoded message, in poll order.

    Raises:
        KafkaException: propagated from ``poll()``, e.g. when the client's error
            callback escalates a fatal error. The old code caught it with
            ``except Exception`` and kept polling, which could loop forever.
    """
    records = []
    while True:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            # NOTE(review): a brand-new consumer group may still be joining when
            # the first poll times out; if runs come back empty, raise poll_timeout.
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            break
        try:
            record = json.loads(msg.value().decode('utf-8'))
        except (AttributeError, ValueError) as e:
            # AttributeError: msg.value() is None (message without a value).
            # ValueError covers UnicodeDecodeError and json.JSONDecodeError.
            print("Skipping undecodable message: {}".format(e))
            continue
        if not isinstance(record, dict):
            print("Skipping non-object JSON message: {!r}".format(record))
            continue
        record['timestamp'] = msg.timestamp()[1]
        records.append(record)
    return records


kafkaListDictionaries = consume_json_messages(c)

# Print once after draining (previously this ran inside the loop and reprinted
# the whole list after every message).
for message in kafkaListDictionaries:
    print(message)

In [0]:
# Sanity check: number of records collected by the consumer loop above.
len(kafkaListDictionaries)

In [0]:
# Mount the ADLS Gen2 container at `mount_point` so later cells can write to it with plain paths.
import os

storageAccount = "gen10dbcdatalake"
storageContainer = "group2-capstone"
# The service-principal secret comes from the environment (or a secret scope)
# and is never committed. The value previously hardcoded here should be
# treated as leaked and rotated.
clientSecret = os.environ["AZURE_CLIENT_SECRET"]
clientid = "2ca50102-5717-4373-b796-39d06568588d"
tenantId = "d46b54b2-a652-420b-aa5a-2ef7f8fc706e"
mount_point = "/mnt/datalake"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientid,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(tenantId),
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Remount from scratch on every run. Unmount only if something is already
# mounted here, instead of a bare `except: pass` that hid every failure
# (including interrupts).
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source="abfss://{}@{}.dfs.core.windows.net/".format(storageContainer, storageAccount),
    mount_point=mount_point,
    extra_configs=configs,
)

In [0]:
# get date for file name
import findspark
from pyspark.sql import Row
from pyspark import SparkContext , SparkConf
import datetime
# Run date in the local time of the machine running the notebook, formatted
# YYYY-MM-DD; it names the output folder below.
now = datetime.datetime.now()
date = now.date().isoformat()
date

In [0]:
# Define an explicit schema and load the consumed records into a Spark DataFrame
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, LongType, DoubleType

# Explicit schema for the consumed records: the ticker fields, the `SP_*`
# fields, and the message `timestamp` added by the consumer loop. Every field
# is nullable except `timestamp`, which the loop always sets.
mySchema = StructType(
    [
        StructField(name, dataType, True)
        for name, dataType in (
            ("Date", StringType()),
            ("Close", FloatType()),
            ("Open", FloatType()),
            ("High", FloatType()),
            ("Low", FloatType()),
            ("Volume", DoubleType()),
            ("Perc", FloatType()),
            ("Ticker", StringType()),
            ("SP_Open", FloatType()),
            ("SP_Close", FloatType()),
            ("SP_High", FloatType()),
            ("SP_Low", FloatType()),
            ("SP_perc", FloatType()),
            ("SP_Ticker", StringType()),
        )
    ]
    + [StructField("timestamp", LongType(), False)]
)

# NOTE(review): createDataFrame verifies values against the schema by default.
# JSON integers (e.g. a whole-number Volume) may be rejected by Float/Double
# fields; confirm the producer always sends floats.
sparkDF = spark.createDataFrame(kafkaListDictionaries, schema=mySchema)

In [0]:
# Save the DataFrame under a per-day folder in the pipeline area of the mounted data lake.
# NOTE(review): Spark writes a *directory* named `data.csv` containing part files.
# With the default save mode, the write fails if that path already exists
# (e.g. a second run on the same day).
csvOutputPath = "/".join(["/mnt/datalake/pipeline", date, "data.csv"])
sparkDF.write.option('header', True).csv(csvOutputPath)