<img src="images/aiven-logo.png" alt="Aiven logo" style="width:200px;height:100px;">


<h1><font color="purple">Lab Description</font></h1>
The purpose of this lab is to walk a user through building a data pipeline using the Aiven platform.
<p>The following services will be created and deployed on the Aiven platform using the Aiven console:
<ul>
<li><b>Kafka</b> - event streaming platform</li>
<li><b>Flink</b> - stream and batch processing</li>
<li><b>M3</b> - storage for real-time metrics with long retention</li>
<li><b>Grafana</b> - data analysis and visualization</li>
</ul>
<p><h3> Lab format</h3>
<br>The lab is broken into 4 sections. Section 1 uses both the Aiven console and a Jupyter notebook, while sections 2, 3 and 4 are conducted exclusively within the Aiven console. Complete the sections in order to end up with a complete pipeline with observability visualizations.  
<img src="images/application-flow.png" alt="Application Pipeline" style="width:6000px;height:100px;">  
<br> All services are part of the Aiven platform.  
<p><b><font color="darkblue">Requirements:</font></b>
<br><font color="purple">To complete this lab you will need:
    <ul>
<li>An Aiven account (see <a href="https://aiven.io/community/forum/t/creating-an-aiven-account/687">“Creating an Aiven account”</a>)</li>
<li>Access to a Linux terminal window (or macOS terminal)</li>
<li>A system running JupyterLab. The code examples below will be copied and pasted into the notebook you create.</li>
<li>A basic understanding of Python</li>
</ul></font>
   
    



<hr>
<h1>Step 1: Aiven Kafka </h1>
<p><b>Section Description</b>:
<br>In this section we will create a Kafka service in Aiven and write a piece of code that produces valid JSON data to a topic.
The topic will be named <i>financial_transactions</i>.
The message key will be a JSON object containing a random id (a UUID), and the message payload will be a JSON object representing a mock <i>financial transaction</i> event. Each event includes a UTC timestamp string in the form <i>YYYY-MM-DD HH:MM:SS</i> (an ISO 8601 date and time, written with a space instead of the <i>T</i> separator).
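As a minimal sketch of the message format just described, the standard library alone is enough to build and serialize one key/payload pair. The helper build_message and the constant TIMESTAMP_FORMAT are hypothetical names used only for illustration; the lab's actual producer in Step 1.3 uses kafka-python serializers instead.

```python
import json
import uuid
from datetime import datetime, timezone

# Timestamp layout used by the lab's producer (date and time separated by a space)
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def build_message(account_number: str, account_holder: str, amount: float) -> tuple:
    """Hypothetical helper: return (key_bytes, value_bytes) for one mock transaction."""
    event_id = str(uuid.uuid4())
    key = {'id': event_id}  # key: JSON object carrying only the random id
    value = {               # payload: the full mock transaction event
        'id': event_id,
        'payment_timestamp': datetime.now(timezone.utc).strftime(TIMESTAMP_FORMAT),
        'account_number': account_number,
        'account_holder': account_holder,
        'payment_amount': round(amount, 2),
    }
    # Kafka stores raw bytes, so both parts are JSON-encoded and then UTF-8 encoded
    return json.dumps(key).encode('utf-8'), json.dumps(value).encode('utf-8')


key_bytes, value_bytes = build_message('0708853976', 'Pamela Hunter', 406.62)
print(key_bytes.decode('utf-8'))
print(value_bytes.decode('utf-8'))
```

The key repeats the event id so that every message about the same payment would land on the same partition, while the payload carries the full event.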

<hr>
<h3>Step 1.1: Create a Kafka Service on Aiven</h3>

<br>1. Sign in to your Aiven account or create a new one if you haven't already.
<br>2. In the Aiven console, go to the "Create new service" section.
<br>3. Select "Apache Kafka" as the service type.
<br>4. Choose your preferred cloud provider, region, and plan.
<br>5. Configure the settings such as service name, project, and authentication method (e.g., username/password or certificate).
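<br>6. Review and confirm your configuration, then click "Create service."
<br>7. Once the service is running, download the CA certificate (<i>ca.pem</i>), access certificate (<i>service.cert</i>) and access key (<i>service.key</i>) from the service's Overview page into the directory where your notebook runs, and note the Service URI (host:port). The producer script in Step 1.3 uses these files and this address to connect over SSL.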
<hr>

<h3>Step 1.2: Create a Kafka topic in the cluster</h3>
<b>Section Description</b>:


We will create a topic within our Aiven Kafka cluster and then use a Python script to produce data to it. 
<br>1. To create the topic, open your project and click the Kafka service you just created.
<br>2. You should now be on the Kafka service overview page (the path should be similar to: My Organization / carlsongould-2cdd / sa-kafka-homework / Overview).
<br>3. On the left side of the overview page, click “Topics”.
<br>4. Enter ‘financial_transactions’ as the topic name.
<br>5. Click ‘Create topic’.
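Kafka itself restricts what a topic name may contain: at most 249 characters drawn from ASCII letters, digits, '.', '_' and '-', and the names '.' and '..' are not allowed. As a small sketch, a proposed name can be checked against those rules before it is entered in step 4. validate_topic_name is a hypothetical helper, not part of the Aiven console or of kafka-python.

```python
import re

# Kafka topic naming rules: 1 to 249 characters from [a-zA-Z0-9._-], and not "." or ".."
LEGAL_TOPIC_CHARS = re.compile(r'[a-zA-Z0-9._-]+')
MAX_TOPIC_LENGTH = 249


def validate_topic_name(name: str) -> list:
    """Return a list of rule violations for a proposed topic name (empty list = valid)."""
    if not name:
        return ['name is empty']
    problems = []
    if name in ('.', '..'):
        problems.append('"." and ".." are not allowed')
    if len(name) > MAX_TOPIC_LENGTH:
        problems.append(f'longer than {MAX_TOPIC_LENGTH} characters')
    if not LEGAL_TOPIC_CHARS.fullmatch(name):
        problems.append('only ASCII letters, digits, ".", "_" and "-" are allowed')
    return problems


print(validate_topic_name('financial_transactions'))  # [] means the name is acceptable
```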





<hr>
<h3>Step 1.3: Create a Python producer script for the events</h3>
<b>Section Description</b>:
<br>The script we are creating will generate events and send them to the topic in our Kafka service.  
<br>Each event message contains the following fields in JSON format: <i>id</i>, <i>payment_timestamp</i>, <i>account_number</i>, <i>account_holder</i> and <i>payment_amount</i>.


<p>The following dependencies will be installed within the Jupyter notebook for this lab:
<lu>
    <li>kafka-python</li>
    <li>aiven-client</li>
        <li>faker (to create fake data)</li>

<p>Please note that the Python script below can either be saved locally as a file or placed within the notebook. It can be run repeatedly; each run sends one event to the topic.
     <hr>

In [1]:
# Install dependencies
!pip install kafka-python



In [2]:
!pip install aiven-client



In [3]:
!pip install faker



In [10]:
import json
import random
import string
import uuid
from datetime import datetime, timezone

from faker import Faker
from kafka import KafkaProducer

fake = Faker()
topic_name = 'financial_transactions'

# Connection details come from the Kafka service Overview page in the Aiven console.
# ca.pem, service.cert and service.key must be downloaded into the notebook's working directory.
producer = KafkaProducer(
    bootstrap_servers='sa-kafka-homework-carlsongould-2cdd.l.aivencloud.com:10358',
    security_protocol="SSL",
    ssl_cafile="./ca.pem",
    ssl_certfile="./service.cert",
    ssl_keyfile="./service.key",
    # Serialize both the key and the value as JSON, then encode to bytes
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii'),
)

##########################################
# Create fake "financial" data
##########################################
def generate_random_id():
    """Random UUID4 string, used as the payment id."""
    return str(uuid.uuid4())

def generate_random_account():
    """Random 10-digit account number."""
    return ''.join(random.choices(string.digits, k=10))

def generate_random_amount():
    """Random payment amount between 100 and 500, rounded to 2 decimals."""
    return round(random.uniform(100, 500), 2)

def generate_random_timestamp():
    """Current UTC time formatted as 'YYYY-MM-DD HH:MM:SS' (not random, despite the name)."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


def generate_event():
    event_id = generate_random_id()                   # payment id
    event_timestamp = generate_random_timestamp()     # payment timestamp
    event_account_number = generate_random_account()  # payee's payment account
    event_account_holder = fake.name()                # payee's name
    event_amount = generate_random_amount()           # payment amount

    event_data = {
        'id': event_id,
        'payment_timestamp': event_timestamp,
        'account_number': event_account_number,
        'account_holder': event_account_holder,
        'payment_amount': event_amount,
    }
    return event_data

##############
# Delivery callbacks (kafka-python reports results through the future returned by send())
##############
def on_send_success(record_metadata):
    print(f"Message delivered to topic {record_metadata.topic} "
          f"(partition {record_metadata.partition}, offset {record_metadata.offset})")

def on_send_error(err):
    print(f"Message delivery failed: {err}")

##############
# Send event with key and value
##############
def produce_event():
    event = generate_event()
    key = {'id': event['id']}
    value = event
    print('Sending Event Key: ', key, '\nSending Event payload:', event)
    future = producer.send(topic_name, key=key, value=value)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    producer.flush()  # block until all buffered messages have been sent
    return value

if __name__ == '__main__':
    produce_event()



Sending Event Key:  {'id': '2895cb99-4923-4746-8a06-da2643c3e7fd'} 
Sending Event payload: {'id': '2895cb99-4923-4746-8a06-da2643c3e7fd', 'payment_timestamp': '2024-04-29 13:36:36', 'account_number': '0708853976', 'account_holder': 'Pamela Hunter', 'payment_amount': 406.62}
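Each run of the script above sends a single event. To build up a steady stream, for example so that the Flink jobs in Step 2 see timestamps with both odd and even seconds, the call can be repeated in a loop. produce_repeatedly is a hypothetical helper sketched here under that assumption; it accepts any zero-argument function, so in the notebook you would pass produce_event.

```python
import time
from typing import Any, Callable, List


def produce_repeatedly(produce: Callable[[], Any], count: int, interval_s: float = 1.0) -> List[Any]:
    """Call `produce` `count` times, sleeping `interval_s` seconds between calls.

    Returns whatever each call returned (e.g. the event payloads) for later inspection.
    """
    results = []
    for i in range(count):
        results.append(produce())
        if i < count - 1:
            # The producer's timestamps have 1-second resolution, so a pause of about
            # a second spreads events across different (odd and even) seconds
            time.sleep(interval_s)
    return results
```

In the notebook, a call such as produce_repeatedly(produce_event, count=20, interval_s=1.0) would send 20 events roughly one second apart. Taking the producing function as a parameter keeps the loop independent of the Kafka connection.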


<hr><h3>Step 1.4: Validating messages in Kafka</h3>
<b>Section Description</b>:
<br>To validate that Kafka is receiving the data, we will fetch the messages from the topic within the Aiven console.
From the Kafka service overview:
</lu> <li>Click Topics => financial_transactions => Fetch Messages.</li>
<li>Change “FORMAT” to JSON.</li>
<li>Change “TIMEOUT (S)” to 10 and click ‘FETCH MESSAGES’.</li>
<p>The fetched messages will be displayed, similar to the following:
    <img src="images/kafka.png" alt="kafka queue" style="width:850px;height:300px;">
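The console check above is manual. If you want to check a payload programmatically, for example one copied from the Fetch Messages view or read back with a consumer, the sketch below compares it with the fields the Step 1.3 producer writes. check_payload and EXPECTED_FIELDS are hypothetical names, and the expected types assume the producer's current output (payment_amount as a JSON number, everything else as strings).

```python
import json
from datetime import datetime

# Fields (and JSON-decoded Python types) written by the Step 1.3 producer
EXPECTED_FIELDS = {
    'id': str,
    'payment_timestamp': str,
    'account_number': str,
    'account_holder': str,
    'payment_amount': float,
}
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def check_payload(raw: str) -> list:
    """Return a list of problems in one JSON payload; an empty list means it looks valid."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f'not valid JSON: {exc}']
    if not isinstance(event, dict):
        return ['payload is not a JSON object']

    problems = []
    for field, expected_type in EXPECTED_FIELDS.items():
        if field not in event:
            problems.append(f'missing field: {field}')
        elif not isinstance(event[field], expected_type):
            problems.append(f'{field} should be {expected_type.__name__}')

    # The timestamp must also parse in the producer's 'YYYY-MM-DD HH:MM:SS' layout
    timestamp = event.get('payment_timestamp')
    if isinstance(timestamp, str):
        try:
            datetime.strptime(timestamp, TIMESTAMP_FORMAT)
        except ValueError:
            problems.append('payment_timestamp is not in YYYY-MM-DD HH:MM:SS format')
    return problems
```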


<hr>
<h1>Step 2: Aiven Flink </h1>
<b>Section Description:</b>
<p>Once data is flowing into the Kafka topic, we will integrate the Kafka service with Aiven Flink as part of this step. <b>Note</b>: This section is conducted within the Aiven console using the following queries. The queries are specific to the applications being created.

<p>We will use Flink to filter the data into 2 new Kafka topics.

<p>Specifically, the financial_transactions data is split into odd and even topics based on whether the seconds value of each transaction's timestamp is odd or even, and each stream is written to its own sink Kafka topic.

<p>The goal is to create 2 Flink jobs that split the data, demonstrating Flink's filtering capabilities.
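The split rule itself is simple. The sketch below expresses it in plain Python only to make the predicate concrete; it is not Flink code, and the sink topic names are hypothetical placeholders for the two topics you create. It assumes the producer's YYYY-MM-DD HH:MM:SS timestamp layout.

```python
from datetime import datetime

# Hypothetical sink topic names; substitute the topics you create for the Flink jobs
ODD_TOPIC = 'financial_transactions_odd'
EVEN_TOPIC = 'financial_transactions_even'


def route_by_second(payment_timestamp: str) -> str:
    """Return the sink topic for an event based on the parity of its seconds value."""
    seconds = datetime.strptime(payment_timestamp, '%Y-%m-%d %H:%M:%S').second
    return EVEN_TOPIC if seconds % 2 == 0 else ODD_TOPIC


print(route_by_second('2024-04-29 13:36:36'))  # 36 seconds -> financial_transactions_even
```

Each Flink job applies one side of this condition as a filter, so every event reaches exactly one of the two sink topics.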

<hr>

<h3>Step 2.1: Aiven Flink application source query </h3>
<b>Section Description:</b>
<p>The following query creates the source table in Flink that is used by both applications. 

<hr>
<h3>Step 2.2: Aiven Flink application queries odd only </h3>
<b>Section Description:</b>
<br>The following query creates the source table in Flink that is used by the odd application only. 


<hr>
<h3>Step 2.3: Aiven Flink application queries odd </h3>
<b>Section Description:</b>
<p>The following query transforms the records whose timestamp has an odd seconds value and moves them to the sink. 

<hr><h3>Step 2.4: Aiven Flink application queries even only </h3>
<b>Section Description:</b>
<p>The following query creates the source table in Flink that is used by the even application only. 

<hr>
<h3>Step 2.5: Aiven Flink application queries even </h3>
<b>Section Description:</b>
<p>The following query transforms the records whose timestamp has an even seconds value and moves them to the sink. 

<hr>
<h1>Step 3: Aiven M3db </h1>
<b>Section Description:</b>
<p>A walkthrough will be conducted as part of this step. Customers will be shown how to integrate the Kafka service with M3DB.
M3DB will be used to capture and retain historical performance metrics from our Kafka service.

<hr>
<h1>Step 4: Aiven Grafana </h1>
<b>Section Description:</b>
<p>A walkthrough will be conducted as part of this step. Customers will be shown how to integrate the M3DB service with Grafana for visualizations.

<hr>