-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Closed
Description
Steps
Step 1: Create the Kinesis stream
import boto3
def create_kinesis_stream(stream_name, shard_count):
    """Create a Kinesis data stream and report the outcome.

    Args:
        stream_name: Name of the stream to create.
        shard_count: Number of shards to provision for the stream.

    Best-effort: errors are printed, not raised, so the surrounding
    walkthrough script keeps going.
    """
    try:
        # Initialize the Kinesis client (uses the default credential chain / region).
        kinesis_client = boto3.client('kinesis')
        # Create the Kinesis stream
        response = kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=shard_count
        )
        # create_stream has an empty response body; success shows up only
        # in the HTTP status of the response metadata.
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Kinesis stream '{stream_name}' created with {shard_count} shard(s)")
        else:
            print("Failed to create Kinesis stream")
    except Exception as e:
        # e.g. ResourceInUseException when the stream already exists.
        print(f"Error: {str(e)}")
def delete_kinesis_stream(stream_name):
    """Delete a Kinesis data stream and report the outcome.

    Args:
        stream_name: Name of the stream to delete.

    Best-effort: errors are printed, not raised, mirroring
    create_kinesis_stream above.
    """
    try:
        # Initialize the Kinesis client (uses the default credential chain / region).
        kinesis_client = boto3.client('kinesis')
        # Delete the Kinesis stream
        response = kinesis_client.delete_stream(
            StreamName=stream_name
        )
        # delete_stream has an empty response body; check the HTTP status instead.
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Kinesis stream '{stream_name}' deleted successfully")
        else:
            print("Failed to delete Kinesis stream")
    except Exception as e:
        # e.g. ResourceNotFoundException when the stream does not exist.
        print(f"Error: {str(e)}")
# Provision the input stream used by the rest of the walkthrough (1 shard).
create_kinesis_stream("stocks-stream", 1)
Step 2: Publish some dummy data into the input stream
import datetime
import json
import random
import boto3
# Name of the Kinesis stream created in Step 1.
STREAM_NAME = "stocks-stream"
def get_data():
    """Return one synthetic stock-tick record.

    Returns:
        dict with keys:
            'event_time': current local time as an ISO-8601 string,
            'ticker': one of five hard-coded symbols,
            'price': float in [0.0, 100.0), rounded to 2 decimals.
    """
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }
def generate(stream_name, kinesis_client, num_samples):
    """Publish num_samples synthetic stock records to a Kinesis stream.

    Args:
        stream_name: Target Kinesis stream name.
        kinesis_client: A boto3 'kinesis' client.
        num_samples: Number of records to produce.

    NOTE(review): every record uses the same hard-coded partition key,
    so all data lands on a single shard — fine for a demo, a hotspot
    in anything real.
    """
    for _ in range(num_samples):
        data = get_data()
        print(data)  # echo each record so progress is visible
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
if __name__ == '__main__':
    # Entry point: push a small batch of dummy ticks into the stream.
    num_samples = 10  # Change this to the desired number of samples
    generate(STREAM_NAME, boto3.client('kinesis'), num_samples)
Step 3: Download the JARs and upload them to S3
import boto3
import requests
def download_and_upload_to_s3(url, bucket_name, s3_key):
    """Download a file over HTTP and upload it to an S3 bucket.

    Args:
        url: HTTP(S) URL of the file to fetch.
        bucket_name: Destination S3 bucket.
        s3_key: Object key to store the file under.
    """
    # Download the JAR file; the timeout keeps the script from hanging
    # forever on a stalled connection.
    response = requests.get(url, timeout=60)
    if response.status_code == 200:
        jar_content = response.content
        # Upload to S3
        s3_client = boto3.client('s3')
        s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=jar_content)
        print(f"Uploaded {s3_key} to {bucket_name}")
    else:
        print(f"Failed to download {url}")
if __name__ == "__main__":
    # URLs of the JAR files needed by the Flink/Hudi job below.
    jar_urls = [
        "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.15-bundle/0.13.1/hudi-flink1.15-bundle-0.13.1.jar",
        "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.0/flink-s3-fs-hadoop-1.15.0.jar",
    ]
    # Destination bucket and the object keys (parallel to jar_urls).
    bucket_name = 'sample-backup-us-west-1'
    s3_keys = [
        'hudi-flink1.15-bundle-0.13.1.jar',
        'flink-s3-fs-hadoop-1.15.0.jar',
    ]
    # Pair each URL with its target key instead of indexing by position.
    for jar_url, s3_key in zip(jar_urls, s3_keys):
        download_and_upload_to_s3(jar_url, bucket_name, s3_key)
Flink SQL (enter each of the following paragraphs in a Zeppelin notebook)
%flink.ssql(type=update)
DROP TABLE IF EXISTS stocks_stream_table;
-- Source table backed by the Kinesis stream from Step 1. The watermark
-- tolerates 5 seconds of out-of-order event arrival.
CREATE TABLE stocks_stream_table (
uuid VARCHAR,
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = 'stocks-stream',
'aws.region' = 'us-west-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table_hudi;
-- Hudi sink table on S3, written as MERGE_ON_READ.
-- NOTE(review): uuid is the Hudi record key, but the Step 2 generator
-- never emits a 'uuid' field — confirm records actually carry one.
CREATE TABLE IF NOT EXISTS stock_table_hudi (
uuid VARCHAR PRIMARY KEY NOT ENFORCED,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'hudi',
'path' = 's3a://sample-backup-us-west-1/tmp/',
'table.type' = 'MERGE_ON_READ',
-- Embedded timeline server is disabled; metadata calls go straight to S3.
'hoodie.embed.timeline.server' = 'false'
);
%flink.ssql(type=update)
-- Continuous streaming insert from the Kinesis source into the Hudi sink.
-- NOTE(review): uuid is selected straight through from the JSON source,
-- which the Step 2 generator does not populate — verify, or synthesize
-- a key here.
INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time FROM stocks_stream_table;
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels

