From 55fcac59dc98ced717e92ab9f1f302dd155d3cbc Mon Sep 17 00:00:00 2001 From: SathishKumar Date: Fri, 26 Jul 2024 10:51:06 +0530 Subject: [PATCH] fixed toc issues --- ...21-05-30-platform-engineering-manifesto.md | 2 +- ...-03-30-kafka-client-performance-metrics.md | 100 +++++++------- ...3-Real-time-data-architecture-in-retail.md | 129 +++++++++--------- 3 files changed, 117 insertions(+), 114 deletions(-) diff --git a/_posts/2021-05-30-platform-engineering-manifesto.md b/_posts/2021-05-30-platform-engineering-manifesto.md index f3c5244046..f3452a96e2 100644 --- a/_posts/2021-05-30-platform-engineering-manifesto.md +++ b/_posts/2021-05-30-platform-engineering-manifesto.md @@ -13,7 +13,7 @@ ctas: description: "Have questions or need assistance? Our team is here to help" url: "/contact/" -teaser: Platform Eng ineering is dead. Long live platform engineering! +teaser: Platform Engineering is dead. Long live platform engineering! toc: true --- diff --git a/_posts/2023-03-30-kafka-client-performance-metrics.md b/_posts/2023-03-30-kafka-client-performance-metrics.md index 1489cf6125..dce1d69dd8 100644 --- a/_posts/2023-03-30-kafka-client-performance-metrics.md +++ b/_posts/2023-03-30-kafka-client-performance-metrics.md @@ -1,11 +1,12 @@ --- layout: post -title: "The Kafka benchmarking suite" -categories: [ Kafka, Kubernetes, Performance, Helm ] +title: "The Kafka benchmarking suite" +categories: [Kafka, Kubernetes, Performance, Helm] featured: false image: assets/blog-images/kafka-perf-suite/kafka-benchmark-metrics-featured.png teaser: "Distributed testing grid for Kafka on top of Kubernetes" authors: badri,p6 +toc: true --- There are numerous parameters we have to consider tweaking when benchmarking a Kafka cluster. Irrespective of these parameters, we optimize along the following dimensions: @@ -70,6 +71,7 @@ Kafka offers a set of performance testing tools for the producer, consumer and a You should have a Kafka cluster which needs to be benchmarked(Duh!). Arguably the fastest way to get your hands on a fairly production ready Kafka cluster is to sign up for a Confluent cloud account and spin up a new cluster. ## Benchmarking using kafka provided tools + For those of you who are new to benchmarking using Kafka perf-tools, here’s a brief recap. First, you create a new topic where you want to send your records. @@ -169,7 +171,7 @@ $ kafka-consumer-perf-test \ --consumer.config kafka.properties # <-- (4) --print-metrics \ --timeout=100000 # <-- (5) - ``` +``` 1. We use the same topic. 2. We have to specify the bootstrap server. @@ -177,7 +179,6 @@ $ kafka-consumer-perf-test \ 4. We refer to the same kafka.properties file. 5. The amount of time the consumer process waits before the broker returns records. - Here’s a sample output from a consumer perf test run. ``` @@ -205,6 +206,7 @@ consumer-fetch-manager-metrics:records-per-request-avg:{client-id=consumer-perf- consumer-fetch-manager-metrics:records-per-request-avg:{client-id=consumer-perf-consumer-24667-1} : 1000.000 kafka-metrics-count:count:{client-id=consumer-perf-consumer-24667-1} : 61.000 ``` + The typical way to run Kafka benchmarks is to take a set of parameters for the producer and consumer, do a set of sample runs with those parameters, and record the metrics we get. We repeat this loop until we get the desired numbers. This can be likened to an OODA (Observe Orient Decide Act) loop, where the mapping looks like this: - Observe - Look at the printed metrics for each run. 
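To make this loop concrete, here is a minimal shell sketch of one Act/Observe iteration: it sweeps two producer parameters and appends the summary line of each `kafka-producer-perf-test` run to a results file. It assumes the `perf-test` topic already exists, that a `kafka.properties` file in the current directory holds the client credentials, and that the bootstrap address and parameter values are placeholder examples to be replaced with your own.

```bash
#!/usr/bin/env bash
# Hypothetical parameter sweep: one kafka-producer-perf-test run per
# (batch.size, linger.ms) combination, keeping only the summary line of each run.
# Assumptions: the topic "perf-test" exists, kafka.properties holds the client
# credentials, and BOOTSTRAP is a placeholder for your cluster endpoint.
BOOTSTRAP="xxx-yyyy.us-west4.gcp.confluent.cloud:9092"

for batch in 1000 16384 65536; do
  for linger in 0 50 100; do
    echo "=== batch.size=$batch linger.ms=$linger ==="
    kafka-producer-perf-test \
      --topic perf-test \
      --num-records 100000 \
      --record-size 1024 \
      --throughput -1 \
      --producer-props bootstrap.servers="$BOOTSTRAP" acks=1 batch.size="$batch" linger.ms="$linger" compression.type=lz4 \
      --producer.config kafka.properties |
      tail -n 1 |
      sed "s/^/batch.size=$batch linger.ms=$linger => /" >> results.txt
  done
done
```

Each line in `results.txt` then becomes one data point for the Orient and Decide steps of the loop.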
@@ -236,40 +238,40 @@ spec: template: spec: initContainers: # <-------- (3) - - name: topics - image: confluentinc/cp-kafka:7.3.2 # <-------- (4) - command: - - /bin/sh - - -c - - | - kafka-topics \ - --if-not-exists \ - --topic mytopic \ - --create \ - --bootstrap-server xxx-yyyy.us-west4.gcp.confluent.cloud:9092 \ - --replication-factor 3 \ - --partitions 1 \ - --command-config /mnt/kafka.properties - volumeMounts: - - name: kafka-properties # <-------- (5) - mountPath: /mnt + - name: topics + image: confluentinc/cp-kafka:7.3.2 # <-------- (4) + command: + - /bin/sh + - -c + - | + kafka-topics \ + --if-not-exists \ + --topic mytopic \ + --create \ + --bootstrap-server xxx-yyyy.us-west4.gcp.confluent.cloud:9092 \ + --replication-factor 3 \ + --partitions 1 \ + --command-config /mnt/kafka.properties + volumeMounts: + - name: kafka-properties # <-------- (5) + mountPath: /mnt containers: - - name: producer - image: confluentinc/cp-kafka:7.3.2 - command: - - /bin/sh - - -c - - | - kafka-producer-perf-test \ # <-------- (6) - --topic perf-test \ - --num-records 10000 \ - --record-size 1024 \ - --throughput -1 \ - --producer-props acks=1 client.id=foo batch.size=1000 linger.ms=100 compression.type=lz4 \ - --producer.config /mnt/kafka.properties - volumeMounts: - - name: kafka-properties - mountPath: /mnt + - name: producer + image: confluentinc/cp-kafka:7.3.2 + command: + - /bin/sh + - -c + - | + kafka-producer-perf-test \ # <-------- (6) + --topic perf-test \ + --num-records 10000 \ + --record-size 1024 \ + --throughput -1 \ + --producer-props acks=1 client.id=foo batch.size=1000 linger.ms=100 compression.type=lz4 \ + --producer.config /mnt/kafka.properties + volumeMounts: + - name: kafka-properties + mountPath: /mnt volumes: - name: kafka-properties secret: @@ -323,12 +325,11 @@ global: evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. # scrape_timeout is set to the global default (10s). - # A scrape configuration containing exactly one endpoint to scrape: # Here it's Prometheus itself. scrape_configs: # The job name is added as a label `job=` to any timeseries scraped from this config. - - job_name: "jmx" # <---- (1) + - job_name: "jmx" # <---- (1) # metrics_path defaults to '/metrics' # scheme defaults to 'http'. @@ -336,14 +337,14 @@ scrape_configs: static_configs: - targets: ["localhost:7071"] # <---- (2) labels: - env: 'dev' # <---- (3) + env: "dev" # <---- (3) relabel_configs: - source_labels: [__address__] target_label: hostname - regex: '([^:]+)(:[0-9]+)?' - replacement: '${1}' + regex: "([^:]+)(:[0-9]+)?" + replacement: "${1}" remote_write: - - url: 'http://your-prometheus-url/api/v1/write' # <---- (4) + - url: "http://your-prometheus-url/api/v1/write" # <---- (4) ``` 1. Name of the prometheus Job. @@ -366,8 +367,8 @@ I’d argue that we didn’t gain much from this transition, except for the Prom Repeat-rinse steps 3 - 8 and infer performance based on deviation from the baseline. - ## Take 3 - Helm chart + We take the following artifacts, 1. The kafka.config secret @@ -378,6 +379,7 @@ We take the following artifacts, And templatize them, package them as a Helm chart. We model every iteration in our OODA loop as a new helm release with its own set of helm values. The Helm chart does the following: + 1. Run a distributed set of producer jobs(defined by the `producer.count` parameter) 2. 
run a distributed consumer - simplest is to have an array of consumers (and topics) defined so we don't get into managing concurrency; instead, we just create `consumer.count` number of consumer Jobs to paralelly consume from the topic. @@ -412,7 +414,7 @@ consumer: messagesCount: 1000 count: 1 timeout: 100000 - ``` +``` And here’s another set of values optimized for high durability. @@ -459,7 +461,7 @@ The Prometheus console can be viewed in the browser by issuing a port-forward co ```bash kubectl port-forward svc/prometheus-operated 9090:9090 - ``` +``` We can do something similar for the Grafana web console as well. @@ -506,7 +508,7 @@ The performance metrics Helm chart has provision to write openmetrics to any sys ```yaml prometheus: remote_write: - - url: "http://prom-stack-kube-prometheus-prometheus:9090/api/v1/write" + - url: "http://prom-stack-kube-prometheus-prometheus:9090/api/v1/write" ``` As a quick example, if you want to use New Relic to process the metrics, your configuration would look like this: @@ -514,8 +516,8 @@ As a quick example, if you want to use New Relic to process the metrics, your co ```yaml prometheus: remote_write: - - url: https://metric-api.newrelic.com/prometheus/v1/write?prometheus_server=kafka-perf-test - bearer_token: xxxxxxxxxxxxxxxxxxxxxxxxxxx + - url: https://metric-api.newrelic.com/prometheus/v1/write?prometheus_server=kafka-perf-test + bearer_token: xxxxxxxxxxxxxxxxxxxxxxxxxxx ``` The Helm charts and the associated code mentioned in this post can be found [here](https://github.com/Platformatory/kafka-performance-suite). diff --git a/_posts/2024-07-23-Real-time-data-architecture-in-retail.md b/_posts/2024-07-23-Real-time-data-architecture-in-retail.md index e07cc6c58a..f1f3bf966d 100644 --- a/_posts/2024-07-23-Real-time-data-architecture-in-retail.md +++ b/_posts/2024-07-23-Real-time-data-architecture-in-retail.md @@ -9,15 +9,16 @@ hidden: false teaser: Discover how real-time data architecture can revolutionize retail operations, from customer engagement to dynamic pricing and inventory management. toc: true --- + # Introduction -Retail industry is one of the fastest growing sector in terms of data and analytics. According to the recent study from [Fortune Business Insights](https://www.fortunebusinessinsights.com/industry-reports/retail-analytics-market-101273), the global retail analytics market size was valued at USD 7.56 billion in 2023 and is projected to grow to USD 31.08 billion by 2032, exhibiting a CAGR of 17.2%. +Retail industry is one of the fastest growing sector in terms of data and analytics. According to the recent study from [Fortune Business Insights](https://www.fortunebusinessinsights.com/industry-reports/retail-analytics-market-101273), the global retail analytics market size was valued at USD 7.56 billion in 2023 and is projected to grow to USD 31.08 billion by 2032, exhibiting a CAGR of 17.2%. -Although, a lot of money is pumped into analytics only a few selected companies like Walmart, Amazon etc. hold the majority of the market share. The reason being they are equipped to make many important decisions based on an ever-growing supply of *real-time* and historical data while most of their competitors still use very basic tools that are far better able at tracking where they’ve *been* than where they should be *going*. 
During the pandemic, [McKinsey estimates](https://www.mckinsey.com/industries/retail/our-insights/jumpstarting-value-creation-with-data-and-analytics-in-fashion-and-luxury), the 25 top-performing retailers — most of whom are digital leaders — were 83% more profitable than laggards and took home more than 90% of the sector’s gains in market capitalization.
+Although a lot of money is pumped into analytics, only a few companies such as Walmart and Amazon hold the majority of the market share. The reason is that they are equipped to make many important decisions based on an ever-growing supply of _real-time_ and historical data, while most of their competitors still use very basic tools that are far better at tracking where they’ve *been* than where they should be *going*. During the pandemic, [McKinsey estimates](https://www.mckinsey.com/industries/retail/our-insights/jumpstarting-value-creation-with-data-and-analytics-in-fashion-and-luxury), the 25 top-performing retailers — most of whom are digital leaders — were 83% more profitable than laggards and took home more than 90% of the sector’s gains in market capitalization.
 
 # Why Real time data is important in Retail?
 
-Retail customers are generally fickle, and their preferences can change in an instant. Each moment is a point in time when a customer interacts with a brand to get what they want immediately and in context. Real-time data allows retailers to respond swiftly to changes in customer behaviour, offering personalized experiences that can significantly enhance customer satisfaction. For example, Amazon’s recommendation engine, accounts for an impressive 35% of their total revenue. Amazon has achieved a substantial increase in ROI through customized homepage displays, advanced analysis of browsing and purchase history tied to the customer’s Amazon account, and strategic email marketing campaigns,
+Retail customers are generally fickle, and their preferences can change in an instant. Each moment is a point in time when a customer interacts with a brand to get what they want immediately and in context. Real-time data allows retailers to respond swiftly to changes in customer behaviour, offering personalized experiences that can significantly enhance customer satisfaction. For example, Amazon’s recommendation engine accounts for an impressive 35% of their total revenue. Amazon has achieved a substantial increase in ROI through customized homepage displays, advanced analysis of browsing and purchase history tied to the customer’s Amazon account, and strategic email marketing campaigns.
 
 ![RealTimeRetail1.png](../assets/blog-images/real_time_retail/RealTimeRetail1.png)
 
@@ -29,31 +30,31 @@ By leveraging real-time analytics, companies can optimize inventory management,
 
 Top 5 Foundational Capabilities for Managing Data in Retail Leading Up to Direct Impact on Customer Experience
 
-### 1. Inventory Visibility
+## 1. Inventory Visibility
 
 Achieving end-to-end supply chain visibility involves having real-time insights into the entire journey of products. According to CSCMP's [2023 State of Logistics Report](https://cscmp.org/CSCMP/Resources/Reports_and_Surveys/State_of_Logistics_Report/CSCMP/Educate/State_of_Logistics_Report.aspx?hkey=bdfd8da6-e34f-434c-b39c-d3219dd4a6a2), in 2022, the U.S. Business Logistics Cost went up as much as 19.6%, the largest increase to date, of which 52% was due to an increase in inventory carrying costs. 
In this economic climate, stocking shelves inadequately, letting down consumers and misjudging demand can lead to catastrophic consequences Inventory visibility is crucial for managing stock levels effectively and ensuring that products are available when customers need them. Logging in orders and sales data through various tools allows retailers to react quicker based on the data. This leads to optimized inventory levels, improved customer satisfaction, and reduced costs associated with excess inventory, stockouts, and rush shipments. -### 2. Omnichannel Integration +## 2. Omnichannel Integration Shoppers now have more flexibility and can chose different formats for both shopping and fulfilment. Some shoppers tend to go online in search of information, and then actually conclude the transaction in a physical store. Others might go to a store or a showroom to do research, and then they go online to make their purchase. Retailers today have to understand and enable this flexibility and seamless movement across channels Therefore, a seamless data integration across multiple operational channels, such as online stores, physical shops, and mobile apps is paramount. By synchronizing data across all channels, retailers can provide a consistent and unified shopping experience for customers. This integration helps in managing inventory, customer preferences, and sales data more effectively, leading to better customer experiences. -### 3. Dynamic Pricing +## 3. Dynamic Pricing Dynamic pricing makes sense for staying relevant in retail where competition is ever-growing because it allows retailers to adjust prices in real-time based on a variety of factors such as customer buying behaviour, seasonal trends, competitor pricing, and product performance. This flexibility ensures that prices remain competitive and appealing to customers, helping to maximize sales and profitability. In a market where consumer preferences and competitive landscapes can change rapidly, dynamic pricing helps retailers react swiftly to these changes, offering the right price at the right time to attract and retain customers. -### 4. Fulfillment Speed +## 4. Fulfillment Speed Fulfillment speed is a critical factor in customer satisfaction. Retailers can improve fulfillment speed through dynamic routing based on weather, traffic conditions, and automation. Real-time tracking of orders ensures that customers are informed about the status of their purchases, leading to a better shopping experience. Faster fulfillment also reduces the likelihood of abandoned carts and enhances customer loyalty. Accurately forecasting demand and supply enables retailers to manage inventory more efficiently, ensuring that popular products are stocked and ready for quick dispatch. Effective demand and supply forecasting also allows for better workforce planning and allocation of resources, leading to faster order processing and quicker delivery times. -### 5. Deep Personalization +## 5. Deep Personalization -Deep personalization is the ultimate goal in retail because it directly impacts customer satisfaction, loyalty, and ultimately, the bottom line. By providing highly personalized experiences, retailers can create deeper emotional connections with their customers, making them feel valued and understood. +Deep personalization is the ultimate goal in retail because it directly impacts customer satisfaction, loyalty, and ultimately, the bottom line. 
By providing highly personalized experiences, retailers can create deeper emotional connections with their customers, making them feel valued and understood. The ability to dynamically adjust content, offers, and recommendations based on real-time data ensures that customers receive the most relevant information at any given moment, which is critical for maintaining customer engagement. Utilizing sophisticated algorithms and machine learning models helps in accurately predicting customer preferences and behaviors, leading to more effective personalization strategies. @@ -63,23 +64,23 @@ The ability to dynamically adjust content, offers, and recommendations based on In today's fast-paced retail environment, leveraging real-time data architecture is crucial for staying ahead of the competition. Here are the key components of real-time data architecture in retail: -### Data Sources +## Data Sources -The first step in building a real-time data architecture is identifying and integrating various data sources. These can include point-of-sale (POS) systems, e-commerce platforms (clickstream etc), social media feeds, CDP etc. Capabilities to collect data across both physical and digital stores is an important requirement. +The first step in building a real-time data architecture is identifying and integrating various data sources. These can include point-of-sale (POS) systems, e-commerce platforms (clickstream etc), social media feeds, CDP etc. Capabilities to collect data across both physical and digital stores is an important requirement. -### Event Bus +## Event Bus A centralized event bus is essential for managing the flow of data between different systems and components. It serves as a backbone for real-time data pipelines, ensuring efficient data transfer and integration. **Apache Kafka** is an ideal choice for this purpose. It allows for high-throughput, low-latency data streaming and can handle large volumes of events in real-time. This capability is particularly critical for retailers who need to quickly respond to changes in customer behaviour, manage inventory in real-time, and provide dynamic pricing updates. Kafka also supports a wide range of connectors for external systems enabling effective data ingress and egress capabilities. -### Stream Processing Layer +## Stream Processing Layer Real-time data processing is crucial for extracting actionable insights. Technologies like Apache Flink, Kafka Streams, and Storm enable continuous data processing. Ingested data can be transformed, filtered and aggregated on the fly for downstream processing services. Data can be enriched with historical information through joins which might be relevant for further processing. -Additionally, Real time predictions can be captured for the processed data points and stored in a database for retrieval. Real-time predictions allow for instant decision-making based on the most recent data realising maximum data impact value. +Additionally, Real time predictions can be captured for the processed data points and stored in a database for retrieval. Real-time predictions allow for instant decision-making based on the most recent data realising maximum data impact value. -### Decision-making Microservices +## Decision-making Microservices Intelligent decision-making microservices are the brain of the real-time data architecture. These microservices use the processed data to make real-time decisions that can significantly impact retail operations. 
For example, a pricing microservice can dynamically adjust product prices based on current demand, competitor pricing, and inventory levels. Similarly, a recommendation microservice can provide personalized product suggestions to customers based on their browsing and purchase history. @@ -90,7 +91,7 @@ A decision-making microservice can range from a simple rule based service to dee - Ensemble models - Deep Learning models -### Data Analytics and Visualization +## Data Analytics and Visualization The final component is data analytics and visualization, which provides a centralized dashboard for analyzing customer behavior in real-time. This dashboard allows retailers to monitor key performance indicators (KPIs), track trends, and gain insights into customer preferences and market dynamics. Retailers can also use these tools to track customer engagement across various channels, allowing them to tailor marketing strategies and improve customer experiences. Real-time visualization tools help in making data-driven decisions quickly and effectively. @@ -98,30 +99,30 @@ The final component is data analytics and visualization, which provides a centra Let’s see an example on how to implement the mentioned Real time data architecture. [Link](https://github.com/Platformatory/real-time-retail) for the the Github repository. -### Objective: +## Objective: Based on the Shopify inventory changes and clickstream data dynamically predict the price and the sell through date for the available products in real time. -### Prerequisites: +## Prerequisites: - Shopify Store - Docker - ngrok -### Details: +## Details: - Kafka Broker will be used as the Event bus for the inventory and clickstream data - Kafka connect will be used to source the data from the data sources and get the predictions from the microservices - KSQL will be used to perform ETL on the source data for downstream predictions - Flask APIs will be used to expose endpoints to talk to prediction microservices -### Setup the infrastructure: +## Setup the infrastructure: -1. We will run all the required components as docker containers locally. Following is the `docker-compose.yaml` for the required services. +1. We will run all the required components as docker containers locally. Following is the `docker-compose.yaml` for the required services. 
```yaml --- -version: '2' +version: "2" services: broker: image: confluentinc/cp-kafka:7.5.0 @@ -132,8 +133,8 @@ services: - "9101:9101" environment: KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" #KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter #KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 @@ -142,15 +143,15 @@ services: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' - KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093" + KAFKA_LISTENERS: "PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092" + KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh - CLUSTER_ID: '41PmKs1mQiGJK0U_Ul45OA' + CLUSTER_ID: "41PmKs1mQiGJK0U_Ul45OA" connect: build: @@ -165,7 +166,7 @@ services: - broker - api environment: - CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' + CONNECT_BOOTSTRAP_SERVERS: "broker:29092" CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs @@ -184,8 +185,8 @@ services: CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR volumes: - - ./retail_clickstream_schema.avro:/home/appuser/retail_clickstream_schema.avro - + - ./retail_clickstream_schema.avro:/home/appuser/retail_clickstream_schema.avro + ksqldb-server: image: confluentinc/cp-ksqldb-server:7.5.0 hostname: ksqldb-server @@ -205,11 +206,11 @@ services: KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 - KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' - KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" + KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" # KSQL_KSQL_QUERIES_FILE: /home/appuser/ksql_queries.sql volumes: - - ./ksql_queries.sql:/home/appuser/ksql_queries.sql + - ./ksql_queries.sql:/home/appuser/ksql_queries.sql ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.5.0 @@ -239,23 +240,23 @@ export NGROK_PUBLIC_URL=`curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0] ```json { - "name": "$CONNECTOR_NAME", + "name": "$CONNECTOR_NAME", "config": { 
"connector.class": "com.platformatory.kafka.connect.ShopifyWebhookConnector", - "tasks.max":1, - "topic.default":"webhook", - "topic.header":"X-Shopify-Topic", - "topic.prefix":"plf_", + "tasks.max": 1, + "topic.default": "webhook", + "topic.header": "X-Shopify-Topic", + "topic.prefix": "plf_", "key.json.path": "$.id", "schema.infer": false, - "validator.class":"com.platformatory.kafka.connect.ShopifyRequestValidator", - "port":8000, - "shopify.access.token":"$SHOPIFY_ACCESS_TOKEN", - "shopify.webhook.create":true, - "shopify.store.name":"$SHOPIFY_STORE_NAME", - "shopify.webhook.topics":"products/update", - "shopify.apisecret":"$SHOPIFY_API_SECRET", - "shopify.connector.hostname":"$NGROK_PUBLIC_URL" + "validator.class": "com.platformatory.kafka.connect.ShopifyRequestValidator", + "port": 8000, + "shopify.access.token": "$SHOPIFY_ACCESS_TOKEN", + "shopify.webhook.create": true, + "shopify.store.name": "$SHOPIFY_STORE_NAME", + "shopify.webhook.topics": "products/update", + "shopify.apisecret": "$SHOPIFY_API_SECRET", + "shopify.connector.hostname": "$NGROK_PUBLIC_URL" } } ``` @@ -264,8 +265,8 @@ export NGROK_PUBLIC_URL=`curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0] ```bash { - "name": "$CONNECTOR_NAME", - "config": { + "name": "$CONNECTOR_NAME", + "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "tasks.max": "1", "kafka.topic": "shopify_clickstream", @@ -283,7 +284,7 @@ export NGROK_PUBLIC_URL=`curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0] 1. Load both the shopify inventory data and clickstream data as STREAMS in KSQL ```sql -CREATE OR REPLACE STREAM PRODUCTS_UPDATE_STREAM +CREATE OR REPLACE STREAM PRODUCTS_UPDATE_STREAM (PAYLOAD STRUCT< id BIGINT, created_at STRING, @@ -336,7 +337,7 @@ CREATE OR REPLACE STREAM PRODUCTS_UPDATES CREATE OR REPLACE TABLE CLICKSTREAM_ACTIVITY WITH (KAFKA_TOPIC='ksql_clickstream_activity', PARTITIONS=6, REPLICAS=1, KEY_FORMAT='JSON') AS -SELECT +SELECT product_variant_id product_variant_id, COUNT(activity) activity_count FROM CLICKSTREAM_STREAM @@ -382,12 +383,12 @@ def index(): return 'Index Page' @app.route('/user', methods=["POST"]) -def get_user(): +def get_user(): data = request.get_json() username = data['username'] password = data['password'] print({username, password }) - return {"username":username, password: password} + return {"username":username, password: password} def predict_sell_through(inventory_data): average_daily_sales = inventory_data['total_sales'] / inventory_data['total_days'] @@ -402,8 +403,8 @@ def show_post(store_name): inventory_data = request.json if not inventory_data: return jsonify({'error': 'No data provided'}), 400 - - + + sell_through_date, inventory_doh = predict_sell_through(inventory_data) print(sell_through_date, inventory_doh, inventory_data) return jsonify({ @@ -417,7 +418,7 @@ def show_post(store_name): def dynamic_pricing(): try: data = request.get_json()[0] - print(data) + print(data) if data["ACTIVITY_COUNT"]>3: data["dynamic_price"] = float(data["PRICE"])*1.1 else: @@ -434,7 +435,7 @@ def dynamic_pricing(): def sell_through_date(): try: data = request.get_json()[0] - print(data) + print(data) # date + random days (5-10) data["sell_through_date"] = datetime.now() + timedelta(days=randrange(10)) @@ -456,7 +457,7 @@ if __name__ == "__main__": ```json ## Dynamic pricing { - "name": "$CONNECTOR_NAME", + "name": "$CONNECTOR_NAME", "config": { "connector.class": "io.confluent.connect.http.HttpSinkConnector", "tasks.max":1, @@ -477,7 +478,7 @@ if __name__ == "__main__": 
## Sell through date { - "name": "$CONNECTOR_NAME", + "name": "$CONNECTOR_NAME", "config": { "connector.class": "io.confluent.connect.http.HttpSinkConnector", "tasks.max":1, @@ -501,4 +502,4 @@ In this example, we were able to demonstrate a sample real time data pipeline fo # Conclusion -The implementation of a real-time data architecture in retail is essential for gaining a competitive edge in today's fast-paced market. By leveraging real-time data, retailers can enhance customer satisfaction through dynamic routing, accurate demand forecasting, and deep personalization. As the retail landscape continues to evolve, investing in real-time data capabilities will be crucial for optimizing operations, improving customer experiences, and driving business growth. +The implementation of a real-time data architecture in retail is essential for gaining a competitive edge in today's fast-paced market. By leveraging real-time data, retailers can enhance customer satisfaction through dynamic routing, accurate demand forecasting, and deep personalization. As the retail landscape continues to evolve, investing in real-time data capabilities will be crucial for optimizing operations, improving customer experiences, and driving business growth.
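As a closing, practical note on the connector configurations shown above: they are plain JSON payloads for the Kafka Connect REST API, so registering them is a single HTTP call. Below is a minimal, hypothetical sketch assuming Connect is reachable on localhost:8083 (as in the docker-compose file), that the dynamic pricing config has been saved as `dynamic_pricing.json`, and that every `$VARIABLE` referenced in it is exported in the shell; the connector name is an arbitrary example.

```bash
# Hypothetical helper for registering one of the connector configs above with Kafka Connect.
# Assumptions: Connect's REST API is reachable on localhost:8083, the JSON config is saved
# as dynamic_pricing.json, and envsubst (from gettext) is installed.
export CONNECTOR_NAME=dynamic-pricing-sink

# Resolve the $VARIABLE placeholders, then create the connector.
envsubst < dynamic_pricing.json > /tmp/dynamic_pricing.resolved.json
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data @/tmp/dynamic_pricing.resolved.json \
  http://localhost:8083/connectors

# Verify that the connector and its task are RUNNING.
curl -s "http://localhost:8083/connectors/$CONNECTOR_NAME/status"
```

The same call works for the sell-through-date sink and the Shopify source configs, each saved to its own file.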