Skip to content

Commit

Permalink
[pinpoint-apm#11047] Add multi-table script for inspectorStatAgent table
Browse files Browse the repository at this point in the history
  • Loading branch information
donghun-cho committed May 21, 2024
1 parent e561d31 commit a90af71
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# multi-table.sh

This script adds schemas, real-time tables, and offline tables to a Pinot cluster.

### Steps

1. **Edit Template Files**
If needed, modify the Kafka broker list in `streamConfigs:stream.kafka.broker.list` within `template_REALTIME.json`. You can also make additional changes to other template files as necessary.
2. **Run Script**
Run `multi-table.sh` to create configuration files and add tables to the Pinot cluster. The script takes `START_INDEX`, `END_INDEX`, and `CONTROLLER_ADDRESS` as arguments.

**Note**:
- If there is no Kafka topic available for a real-time table, the table will not be added to the Pinot cluster.
- The script generates configuration files by substituting `${NUM}` in the template files and sends HTTP requests to the Pinot cluster.


### Arguments
- `START_INDEX`: The start index for table creation. Must be less than or equal to `END_INDEX`.
- `END_INDEX`: The end index for table creation. Must be greater than or equal to `START_INDEX`.
- `CONTROLLER_ADDRESS`: The address of the Pinot cluster controller.

**Note**: Both `START_INDEX` and `END_INDEX` must be at most two digits (i.e., integers from 0 to 99).


## Example Commands
Command for adding 1 table (inspectorStatAgent00)
~~~
$ multi-table.sh 0 0 http://localhost:9000
~~~

Command for adding 3 tables (inspectorStatAgent01 to inspectorStatAgent03)
~~~
$ multi-table.sh 1 3 http://localhost:9000
~~~

Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/bin/bash

# multi-table.sh
# Generates numbered Pinot schema / real-time / offline table configuration
# files from template JSON files and registers them with a Pinot controller
# over HTTP.
# Usage: multi-table.sh <START_INDEX> <END_INDEX> [CONTROLLER_ADDRESS]

# Template files are looked up next to the working directory as
# "template_SCHEMA.json", "template_REALTIME.json", "template_OFFLINE.json".
TEMPLATE_FILE_PREFIX="template"
SCHEMA_SUFFIX="_SCHEMA.json"
REALTIME_SUFFIX="_REALTIME.json"
OFFLINE_SUFFIX="_OFFLINE.json"

# Positional arguments: inclusive table-number range (0-99) and the optional
# Pinot controller base URL (e.g. http://localhost:9000). When the controller
# address is omitted, config files are generated but no HTTP requests are sent.
START_INDEX=$1
END_INDEX=$2
CONTROLLER_ADDRESS=$3

# Print command-line help (output is identical to the original echo version).
usage() {
  printf '%s\n' "Usage: $0 <START_INDEX> <END_INDEX> [CONTROLLER_ADDRESS]"
  printf '%s\n' " START_INDEX and END_INDEX should be integers with less than three digits."
  printf '%s\n' " template files should be in the same directory."
  printf '%s\n' " Example: $0 0 4 http://localhost:9000"
}

# Require at least the two index arguments; the controller address is optional.
if (( $# < 2 )); then
  usage
  exit 1
fi

#######################################
# Validate START_INDEX/END_INDEX and build NUM_LIST, the zero-padded
# two-digit table numbers for the requested range (e.g. 00 01 02).
# Globals:  reads START_INDEX, END_INDEX; writes NUM_LIST (array)
# Outputs:  error messages to stderr
# Returns:  exits 1 on invalid input or an empty range
#######################################
function create_table_number_list() {
local i num
# Both bounds must be integers of at most two digits (0-99).
if ! [[ "$START_INDEX" =~ ^[0-9]{1,2}$ ]] || ! [[ "$END_INDEX" =~ ^[0-9]{1,2}$ ]]; then
echo "START_INDEX and END_INDEX must be integers with less than three digits." >&2
exit 1
fi

# Build the list with leading zeros; the 10# prefix forces base-10 so an
# input like "08" is not misread as invalid octal.
NUM_LIST=()
for (( i=10#$START_INDEX; i<=10#$END_INDEX; i++ )); do
num=$(printf "%02d" "$i")
NUM_LIST+=("$num")
done

# An empty range (START_INDEX > END_INDEX) is an error.
if [ ${#NUM_LIST[@]} -eq 0 ]; then
echo "NO argument created with START_INDEX:${START_INDEX} and END_INDEX:${END_INDEX}" >&2
exit 1
fi
}


#######################################
# Generate one numbered JSON config per entry in NUM_LIST by substituting
# every literal ${NUM} placeholder in the matching template file.
# Globals:   reads TEMPLATE_FILE_PREFIX, NUM_LIST
# Arguments: $1 - file suffix (e.g. _SCHEMA.json)
# Outputs:   progress to stdout, errors to stderr; writes <NUM><suffix> files
# Returns:   exits 1 when the template file is missing
#######################################
function generate_config() {
local JSON_SUFFIX=$1
local TEMPLATE_FILE="${TEMPLATE_FILE_PREFIX}${JSON_SUFFIX}"
local NUM JSON_FILE
if [ ! -f "${TEMPLATE_FILE}" ]; then
echo "No ${TEMPLATE_FILE} file provided." >&2
exit 1
fi
echo " Generating ${JSON_SUFFIX} configuration using template configuration file"

for NUM in "${NUM_LIST[@]}"; do
JSON_FILE="${NUM}${JSON_SUFFIX}"
# Create JSON file from the template; \${NUM} matches the literal
# placeholder text, not a shell expansion.
sed -e "s/\${NUM}/${NUM}/g" \
"${TEMPLATE_FILE}" > "${JSON_FILE}"
echo " Configuration file ${JSON_FILE} generated successfully."
done
}

#######################################
# POST each generated schema file to the Pinot controller.
# Globals:   reads CONTROLLER_ADDRESS, NUM_LIST
# Arguments: $1 - file suffix (e.g. _SCHEMA.json)
# Outputs:   per-file responses to stdout, missing-file warnings to stderr
# Returns:   0 (skips silently when no controller address was given)
#######################################
function add_schema_to_pinot() {
local JSON_SUFFIX=$1
local NUM JSON_FILE response
if [ -z "$CONTROLLER_ADDRESS" ]; then
# Fixed message: it previously said "table creation" (copy-paste slip).
echo "No Controller address provided. Skipping schema creation."
return 0
fi
echo " Adding schema to Pinot cluster."

for NUM in "${NUM_LIST[@]}"; do
JSON_FILE="${NUM}${JSON_SUFFIX}"
if [ ! -f "${JSON_FILE}" ]; then
echo "No ${JSON_FILE}" >&2
else
# POST the schema; override=true makes re-running the script idempotent.
# stderr is captured on purpose so curl diagnostics appear in the log.
response=$(
curl -X 'POST' \
"${CONTROLLER_ADDRESS}/schemas?override=true&force=false" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d @"${JSON_FILE}" \
2>&1
)
echo " Response for ${JSON_FILE}: ${response}"
fi
done
}

# POST each generated table config (<NUM><suffix>) to the Pinot controller.
# Skips all work when CONTROLLER_ADDRESS is empty; skips files that were
# never generated. Curl's stderr is folded into the logged response.
add_table_to_pinot() {
local JSON_SUFFIX=$1
if [[ -z "$CONTROLLER_ADDRESS" ]]; then
echo "No Controller address provided. Skipping table creation."
return 0
fi
echo " Adding table to Pinot cluster."

for NUM in "${NUM_LIST[@]}"; do
JSON_FILE="${NUM}${JSON_SUFFIX}"
if [[ -f "${JSON_FILE}" ]]; then
response=$(curl -X 'POST' \
"${CONTROLLER_ADDRESS}/tables" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d @"${JSON_FILE}" 2>&1)
echo " Response for ${JSON_FILE}: ${response}"
else
echo "No ${JSON_FILE}"
fi
done
}

# Main script: validate the index range, then generate and register schemas
# first (tables reference them by schemaName), followed by the real-time and
# offline table configs. All expansions are quoted per shell best practice.
create_table_number_list

echo 'schema'
generate_config "$SCHEMA_SUFFIX"
add_schema_to_pinot "$SCHEMA_SUFFIX"

echo 'realtime'
generate_config "$REALTIME_SUFFIX"
add_table_to_pinot "$REALTIME_SUFFIX"

echo 'offline'
generate_config "$OFFLINE_SUFFIX"
add_table_to_pinot "$OFFLINE_SUFFIX"
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"tableName": "inspectorStatAgent${NUM}",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeType": "MILLISECONDS",
"schemaName": "inspectorStatAgent${NUM}",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "14",
"replication": "3",
"timeColumnName": "eventTime",
"minimizeDataMovement": false
},
"tenants": {},
"tableIndexConfig": {
"sortedColumn": ["sortKey"],
"bloomFilterColumns": ["tenantId", "serviceName", "sortKey", "applicationName", "agentId", "metricName", "fieldName"],
"noDictionaryColumns": ["fieldValue", "eventTime"],
"loadMode": "MMAP",
"nullHandlingEnabled": true
},
"metadata": {
"customConfigs": {}
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"tableName": "inspectorStatAgent${NUM}",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "eventTime",
"timeType": "MILLISECONDS",
"schemaName": "inspectorStatAgent${NUM}",
"replicasPerPartition": "3",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7"
},
"tenants": {},
"tableIndexConfig": {
"sortedColumn": ["sortKey"],
"bloomFilterColumns": ["tenantId", "serviceName", "sortKey", "applicationName", "agentId", "metricName", "fieldName"],
"noDictionaryColumns": ["fieldValue", "eventTime"],
"segmentPartitionConfig": {
"columnPartitionMap": {
"sortKey": {
"functionName": "Murmur",
"numPartitions": 32
}
}
},
"loadMode": "MMAP",
"nullHandlingEnabled": true,
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "inspector-stat-agent-${NUM}",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "localhost:19092",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "64M",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
},
"task": {
"taskTypeConfigsMap": {
"RealtimeToOfflineSegmentsTask": {
"bucketTimePeriod": "4h",
"bufferTimePeriod": "12h",
"schedule": "0 0/10 * * * ?",
"maxNumRecordsPerSegment": "6000000"
}
}
},
"routing": {
"segmentPrunerTypes": [
"time",
"partition"
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"schemaName": "inspectorStatAgent${NUM}",
"dimensionFieldSpecs": [
{
"name": "tenantId",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "serviceName",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "sortKey",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "applicationName",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "agentId",
"dataType": "STRING"
},
{
"name": "metricName",
"dataType": "STRING"
},
{
"name": "fieldName",
"dataType": "STRING"
},
{
"name": "version",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "tags",
"dataType": "STRING",
"singleValueField": false,
"defaultNullValue": "null"
}
],
"metricFieldSpecs": [
{
"name": "fieldValue",
"dataType": "DOUBLE",
"defaultNullValue": -1.0
}
],
"dateTimeFieldSpecs": [
{
"name": "eventTime",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"tableType": "OFFLINE",
"segmentsConfig": {
"timeType": "MILLISECONDS",
"schemaName": "inspectorStatAgent",
"schemaName": "inspectorStatAgent00",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "14",
"replication": "3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"segmentsConfig": {
"timeColumnName": "eventTime",
"timeType": "MILLISECONDS",
"schemaName": "inspectorStatAgent",
"schemaName": "inspectorStatAgent00",
"replicasPerPartition": "3",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schemaName": "inspectorStatAgent",
"schemaName": "inspectorStatAgent00",
"dimensionFieldSpecs": [
{
"name": "tenantId",
Expand Down

0 comments on commit a90af71

Please sign in to comment.