In [6]:
# pip install binance-connector
# pyspark==3.4.1
# pydeequ==1.0.1
# python-dotenv==1.0.0
# nltk==3.8.1
# apache-flink==1.17.1
# kafka-python==2.0.2
# pip install minio==7.1.16
# pip install deltalake==0.10.2
# pip install delta-spark==3.1.0

In [7]:
API_KEY = open("/home/longnv95/Coding/MLOPs/api_binance.txt", "r").read().strip()

In [8]:


# Method 1: Using python-dotenv (most common approach)
from dotenv import load_dotenv
import os

# Load .env file
load_dotenv()  # By default looks for .env in current directory
# Or specify a path: load_dotenv('/path/to/your/.env')

# Access environment variables
db_password = os.getenv('POSTGRES_PASSWORD')
minio_key = os.getenv('MINIO_ACCESS_KEY')
fernet_key = os.getenv('AIRFLOW__CORE__FERNET_KEY')

print(f"DB Password: {db_password}")
print(f"MinIO Key: {minio_key}")

DB Password: airflow
MinIO Key: None


# Load API Key and run_date

In [9]:
from binance.spot import Spot
import datetime
import pandas as pd
import numpy as np

# Binance API Key (Public API, no need for secret key)
client = Spot(api_key=API_KEY)  # No secret key needed for public market data

# Define the time range for historical data (e.g. yesterday at noon)
run_date_str = "2025-02-26"

run_date = datetime.datetime.strptime(run_date_str, "%Y-%m-%d")
run_date = datetime.datetime.combine(run_date, datetime.time(0, 0))
start_time_ms = int(run_date.timestamp() * 1000)
print(run_date, start_time_ms)

end_date = run_date + datetime.timedelta(hours=23)
end_time_ms = int(end_date.timestamp() * 1000)
print(end_date, end_time_ms)

# For a 24-hour period at 30-minute intervals, we expect 48 candles (24 hours / 0.5 hour)
limit = 48

2025-02-26 00:00:00 1740502800000
2025-02-26 23:00:00 1740585600000


# Get the result from Binance

In [10]:
# Retrieve exchange info to get a list of all trading symbols
exchange_info = client.exchange_info()
symbols = [s['symbol'] for s in exchange_info['symbols'] if s['symbol'].lower().endswith('usdt')]

len(symbols)

# List to collect aggregated data for each symbol
data = []

for symbol in symbols[0:2]:
    print(f"Processing {symbol}")
    try:
        # Fetch klines data for the given symbol and day
        klines = client.klines(symbol=symbol, interval="1h", 
                                startTime=start_time_ms, 
                                endTime=end_time_ms,
                                limit=limit)
        if not klines:
            print(f"No data for {symbol}")
            continue
        
        for candle in klines:
            # Calculate aggregated metrics from the klines
            lastPrice = float(candle[4])  # Last candle's close price
            highPrice = float(candle[2])  # Highest high price across candles
            lowPrice = float(candle[3])  # Lowest low price across candles
            quoteVolume = float(candle[7]) # Sum of quote asset volumes
            count = int(candle[8])           # Total number of trades
        
            # Use the provided run date and extract hour from the close time of the last candle
            close_time_ms = int(candle[6])
            close_time = datetime.datetime.fromtimestamp(close_time_ms / 1000)
            hour = close_time.hour

            # Append the aggregated data to our list
            data.append({
                "symbols": symbol,
                "lastPrice": lastPrice,
                "highPrice": highPrice,
                "lowPrice": lowPrice,
                "quoteVolume": quoteVolume,
                "count": count,
                "date": close_time.strftime("%Y-%m-%d"),
                "hour": close_time.hour
            })
    except Exception as e:
        print(f"Error processing {symbol}: {e}")

    # Create the DataFrame with the desired columns
    df = pd.DataFrame(data)

Processing BTCUSDT
Processing ETHUSDT


In [11]:
import sys
from binance.spot import Spot
import datetime
import pandas as pd
import numpy as np

API_KEY = open("/home/longnv95/Coding/MLOPs/api_binance.txt", "r").read().strip()
RUN_DATE_STR = "2025-03-04"

client = Spot(api_key=API_KEY)

def get_data_binance(symbol, start_date_str):
    start_date = datetime.datetime.strptime(start_date_str, "%Y-%m-%d") 
    end_date = start_date + datetime.timedelta(hours=23)

    start_time_ms = int(start_date.timestamp() * 1000)
    end_time_ms = int(end_date.timestamp() * 1000)
    
    data = client.klines(symbol=symbol, interval="1h", startTime=start_time_ms, endTime=end_time_ms)
    df = pd.DataFrame(data, columns=["timestamp", "open", "high", 
                                     "low", "close", "volume", "close_time", 
                                     "quote_av", "trades", "tb_base_av", "tb_quote_av", "ignore"])

    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") 
    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
    df["symbol"] = symbol
    df["hour"] = df['close_time'].apply(lambda x: x.hour)
    return df


In [12]:
exchange_info = client.exchange_info()
symbols = [s['symbol'] for s in exchange_info['symbols'] if s['symbol'].lower().endswith('usdt')][:2]
test = get_data_binance('BTCUSDT', RUN_DATE_STR)

KeyboardInterrupt: 

In [None]:
df.to_csv(f"./data_daily/spot_{run_date_str.replace('-','')}.csv", index=False)

# Check Limit API

In [None]:
from binance.spot import Spot
import requests
import json

client = Spot(api_key=API_KEY)

try:
    # Make a  API call
    # Use the requests library to get headers
    symbol = "BTCUSDT"
    interval = "1m"  # 1-minute klines
    limit = 10  # Number of klines to retrieve (adjust as needed)

    url = "https://api.binance.com/api/v3/klines"  # Binance Kline API endpoint
    params = {'symbol': symbol, 'interval': interval, 'limit': limit}
    headers = {'X-MBX-APIKEY': API_KEY}  # Include the API key in the header

    response = requests.get(url, params=params, headers=headers)
    response.raise_for_status()  # Raise an exception for bad status codes


    # Access the headers from the response object
    headers = response.headers

    # Extract rate limit information
    # These headers are the key to understanding your limits
    order_count = headers.get('X-MBX-ORDER-COUNT-1M')
    used_weight = headers.get('X-MBX-USED-WEIGHT-1M')
    order_weight = headers.get('X-MBX-ORDER-WEIGHT-1M')

    print(f"Orders Placed (Last Minute): {order_count}")
    print(f"Request Weights Used (Last Minute): {used_weight}")
    print(f"Order Weight Used (Last Minute): {order_weight}")

except requests.exceptions.RequestException as e:
    print(f"Request Error: {e}")
except Exception as e:
    print(f"Error checking rate limits: {e}")

Orders Placed (Last Minute): None
Request Weights Used (Last Minute): 6
Order Weight Used (Last Minute): None


# New Data Source

In [None]:
RUN_DATE_STR = "2025-03-04"

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def generate_pseudo_data(RUN_DATE_STR, num_rows=10000, num_users=1000):
    """Generates a Pandas DataFrame with pseudo transaction data."""

    # User IDs
    user_ids = [f"user_{i:06d}" for i in np.random.choice(num_users, size=num_rows)]

    # Transaction IDs
    transaction_ids = np.random.randint(10**11, 10**12 - 1, size=num_rows)

    # Source
    sources = np.random.choice(["Current Account", "Credit Card", "Debit Card"], 
                               size=num_rows, p=[0.6, 0.3, 0.1])

    # Amounts
    amounts = np.random.uniform(1.0, 1000.0, size=num_rows)

    # Vendors
    vendors = ["Online Shopping", "Hospital", "Sport", "Grocery", "Restaurant",
               "Travel", "Entertainment", "Electronics", "Home Improvement",
               "Clothing", "Education", "Sending Out", "Utilities", "Other"]
    vendor_probabilities = np.random.dirichlet(np.ones(len(vendors)))
    vendor_choices = np.random.choice(vendors, size=num_rows, p=vendor_probabilities)

    # Times
    start_date = datetime.strptime(RUN_DATE_STR, "%Y-%m-%d")
    time_deltas = [timedelta(seconds=np.random.randint(0, 31536000)) for _ in range(num_rows)]  # Up to 1 year
    times = [(start_date + delta).strftime('%Y-%m-%d %H:%M:%S') for delta in time_deltas]

    # Create DataFrame
    df = pd.DataFrame({
        "User ID": user_ids,
        "Transaction ID": transaction_ids,
        "Amount": amounts,
        "Vendor": vendor_choices,
        "Sources": sources,
        "Time": times
    })

    return df

# Generate and print the data
pseudo_df = generate_pseudo_data(RUN_DATE_STR, num_rows=10000, num_users=100)
print(pseudo_df.head())

       User ID  Transaction ID      Amount      Vendor          Sources  \
0  user_000069    294063303192  579.803149   Education  Current Account   
1  user_000000    413456285542  128.222111  Restaurant  Current Account   
2  user_000023    168523878031  508.797821   Education      Credit Card   
3  user_000095    160894934119  381.033603      Travel       Debit Card   
4  user_000043    327308630118  611.396373   Education  Current Account   

                  Time  
0  2025-03-09 06:55:22  
1  2025-09-03 22:19:31  
2  2025-05-29 14:11:48  
3  2025-03-20 23:05:23  
4  2025-03-29 20:35:56  


In [None]:
import os
os.getcwd()+'/data_daily/'

'/home/longnv95/Coding/MLOPs/final_project/data_daily/'

In [None]:
pd.read_parquet('/home/longnv95/Coding/MLOPs/final_project/data_daily/transaction_2024-12-31.parquet').head()

Unnamed: 0,User ID,Transaction ID,Amount,Vendor,Sources,Time
0,user_000028,502338847571,244.895708,Home Improvement,Debit Card,2025-09-29 22:19:21
1,user_000727,479508446004,466.927997,Restaurant,Current Account,2025-03-21 22:14:40
2,user_000547,113555903812,971.725657,Home Improvement,Current Account,2025-07-11 22:30:57
3,user_000028,864048690465,417.96338,Travel,Credit Card,2025-01-30 06:28:11
4,user_000323,917408956397,555.758447,Sending Out,Credit Card,2025-04-05 20:19:02


In [None]:
RUN_DATE_STR = "2025-03-04"

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def generate_user_data(RUN_DATE_STR, num_rows=10000):
    """Generates a Pandas DataFrame with pseudo transaction data."""

    # User IDs
    user_ids = [f"user_{i:06d}" for i in np.random.choice(num_rows, size=num_rows)]

    # Transaction IDs
    age = np.random.randint(22, 90, size=num_rows)

    # Source
    gender = np.random.choice(["M", "F", "Other"], 
                               size=num_rows, p=[0.5, 0.4, 0.1])

    # Amounts
    occupation = np.random.choice(["M", "F", "Other"], 
                               size=num_rows, p=[0.5, 0.4, 0.1])

    # location
    location = np.random.choice(["City A", "City B", "City C", "City D", "City E", 
                                 "City F", "City G", "City H", "Unknown"], 
                               size=num_rows, p=[0.1,0.12,0.08,0.1,0.15,0.05,0.1,0.1,0.2])
    
    # Day start
    start = datetime(2020, 1, 1)
    end = datetime(2025, 3, 1)
    
    # Calculate total days between start and end
    days_between = (end - start).days
    
    # Generate random days and add to start date
    random_days = np.random.randint(0, days_between, size=num_rows)
    day_start = [(start + timedelta(days=int(x))).strftime('%Y-%m-%d') for x in random_days]

    # Create DataFrame
    df = pd.DataFrame({
        "user_id": user_ids,
        "gender": gender,
        "age": age,
        "location": location,
        "occupation": occupation,
        "day_start": day_start,
        
    })

    return df

# Generate and print the data
pseudo_df = generate_user_data(RUN_DATE_STR, num_rows=5000)
print(pseudo_df.head())

       user_id gender location occupation   day_start
0  user_002562      M   City D      Other  2022-10-18
1  user_002095      F   City B          F  2025-01-15
2  user_003486      M   City C          F  2021-01-24
3  user_004358      F   City A          F  2024-04-21
4  user_003419      F   City B          M  2022-06-25


In [None]:
import os
from dotenv import load_dotenv
dotenv_path = os.path.join("./scripts/", '.env')
load_dotenv(dotenv_path)


True

# Spark 

In [None]:
import findspark
findspark.init()
print(findspark.find())

/home/longnv95/Applications/Miniconda/envs/ame/lib/python3.8/site-packages/pyspark


In [None]:

# Import PySpark
from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Data
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

# Columns
columns = ["language","users_count"]

# Create DataFrame
df = spark.createDataFrame(data).toDF(*columns)

# Print DataFrame
df.show()

I0000 00:00:1741367937.707676 1093992 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers


25/03/07 17:19:05 WARN Utils: Your hostname, LongNV resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/07 17:19:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/07 17:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [None]:
df = df.withColumn("subtract", df["users_count"] - 1000)
df.show()

+--------+-----------+--------+
|language|users_count|subtract|
+--------+-----------+--------+
|    Java|      20000| 19000.0|
|  Python|     100000| 99000.0|
|   Scala|       3000|  2000.0|
+--------+-----------+--------+



# Ingest data to Kafka

In [None]:
# import pandas as pd
# import json
# from kafka import KafkaProducer

# # Example DataFrame (imagine this is populated with Binance data)
# data = {
#     "symbol": ["BTCUSDT", "ETHUSDT"],
#     "lastPrice": [23000.0, 1800.0],
#     "highPrice": [23200.0, 1820.0],
#     "lowPrice": [22900.0, 1785.0],
#     "quoteVolume": [500000.0, 300000.0],
#     "count": [1500, 900],
#     "date": ["2025-02-26", "2025-02-26"],
#     "hour": [0, 0]
# }
# df = pd.DataFrame(data)

# # Initialize the Kafka Producer (adjust bootstrap_servers as needed)
# producer = KafkaProducer(
#     bootstrap_servers=['localhost:9092'],
#     value_serializer=lambda v: json.dumps(v).encode('utf-8')
# )

# topic = "crypto_data"

# # Push each DataFrame row to Kafka as a JSON message
# for _, row in df.iterrows():
#     # Convert the row to a dictionary
#     message = row.to_dict()
#     producer.send(topic, message)
#     print("Sent message:", message)

# # Flush to ensure all messages are delivered
# producer.flush()

Sent message: {'symbol': 'BTCUSDT', 'lastPrice': 23000.0, 'highPrice': 23200.0, 'lowPrice': 22900.0, 'quoteVolume': 500000.0, 'count': 1500, 'date': '2025-02-26', 'hour': 0}
Sent message: {'symbol': 'ETHUSDT', 'lastPrice': 1800.0, 'highPrice': 1820.0, 'lowPrice': 1785.0, 'quoteVolume': 300000.0, 'count': 900, 'date': '2025-02-26', 'hour': 0}


In [None]:
# Create retention policy with Kafka 
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
import json

# Step 1: Create a Kafka Admin Client to manage topics
admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],  # Adjust as needed
    client_id='admin'
)

# Define the topic name
topic = "crypto_data"

# Define retention period for 2 days in milliseconds (2 days = 2 * 24 * 60 * 60 * 1000)
retention_ms = str(2 * 24 * 60 * 60 * 1000)  # "172800000"

# Create a new topic with the specified retention policy
new_topic = NewTopic(
    name=topic,
    num_partitions=1, # Modify this if you have multiple nodes
    replication_factor=1, # Modify this if you have multiple nodes
    topic_configs={"retention.ms": retention_ms}
)

# Try to create the topic (if it already exists, you'll get an exception)
try:
    admin_client.create_topics(new_topics=[new_topic], validate_only=False)
    print(f"Topic '{topic}' created with retention.ms set to {retention_ms} (2 days)")
except Exception as e:
    print(f"Topic creation issue (may already exist): {e}")

# Step 2: Create a Kafka Producer to send messages
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Adjust with your broker addresses
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize messages as JSON
)

# Example data to send (this could be a row from your DataFrame, for instance)
data = {
    "symbols": "BTCUSDT",
    "lastPrice": 20000.0,
    "highPrice": 21000.0,
    "lowPrice": 19500.0,
    "quoteVolume": 500000.0,
    "count": 1500,
    "date": "2025-02-26",
    "hour": 14
}

# Send the data to the Kafka topic
producer.send(topic, data)
producer.flush()  # Ensure all buffered messages are sent

print("Data inserted into Kafka topic successfully!")


Topic 'crypto_data' created with retention.ms set to 172800000 (2 days)
Data inserted into Kafka topic successfully!


## Delete kafka topic

In [None]:
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError

def delete_topic(bootstrap_servers='localhost:9092', topic_name='topic_name'):
    """
    Delete a Kafka topic
    Args:
        bootstrap_servers: Kafka bootstrap servers (default: 'localhost:9092')
        topic_name: Name of topic to delete (default: 'topic_name')
    """
    try:
        # Create admin client
        admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers
        )
        
        # Delete topic
        admin_client.delete_topics(topics=[topic_name])
        print(f"Successfully deleted topic: {topic_name}")
        
    except UnknownTopicOrPartitionError:
        print(f"Topic {topic_name} does not exist")
    except Exception as e:
        print(f"Failed to delete topic {topic_name}. Error: {e}")
    finally:
        admin_client.close()

# Usage example
# if __name__ == "__main__":
    # delete_topic(topic_name="crypto_data")

Successfully deleted topic: crypto_data


## Test with Binace Data

In [None]:
import pandas as pd
import numpy as np
df_test = pd.read_csv("./data_daily/binance_data_2024-03-04.csv")

In [None]:
from typing import Dict, Any
import logging
import json
from kafka import KafkaProducer 
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def send_message_batch(producer, topic_name: str, messages: list, batch_size: int = 100):
    """Send a batch of messages to Kafka"""
    futures = []
    
    try:
        for i in range(0, len(messages), batch_size):
            batch = messages[i:i + batch_size]
            
            # Send batch of messages
            for message in batch:
                future = producer.send(topic_name, message)
                futures.append(future)
            
            # Wait for current batch to complete
            producer.flush()
            
            # Check for any errors in the batch
            for future in futures:
                try:
                    record_metadata = future.get(timeout=10)
                    logger.debug(f"Message sent to {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")
                except Exception as e:
                    logger.error(f"Failed to send message: {e}")
            
            logger.info(f"Processed batch of {len(batch)} messages")
            futures = []  # Clear futures for next batch
            
    except Exception as e:
        logger.error(f"Error in batch processing: {e}")
        raise

# Modified ingest function
def ingest_data_to_kafka(df, topic_name: str, bootstrap_servers='localhost:9092', batch_size: int = 100):
    """
    Ingest DataFrame to Kafka with batching and error handling
    Args:
        df: DataFrame to ingest
        topic_name: Kafka topic name
        bootstrap_servers: Kafka bootstrap servers
        batch_size: Number of messages per batch
    """
    # Create a Kafka Admin Client to manage topics
    admin_client = KafkaAdminClient(
        bootstrap_servers=['localhost:9092'],  # Adjust as needed
        client_id='admin'
    )

    # Define retention period for 2 days in milliseconds (2 days = 2 * 24 * 60 * 60 * 1000)
    retention_ms = str(2 * 24 * 60 * 60 * 1000)  # "172800000"

    # Create a new topic with the specified retention policy
    new_topic = NewTopic(
        name=topic_name,
        num_partitions=1, # Modify this if you have multiple nodes
        replication_factor=1, # Modify this if you have multiple nodes
        topic_configs={"retention.ms": retention_ms}
    )

    # Try to create the topic (if it already exists, you'll get an exception)
    try:
        admin_client.create_topics(new_topics=[new_topic], validate_only=False)
        print(f"Topic '{topic}' created with retention.ms set to {retention_ms} (2 days)")
    except Exception as e:
        print(f"Topic creation issue (may already exist): {e}")

    # Ingest data to Kafka
    producer = KafkaProducer(
        bootstrap_servers=[bootstrap_servers],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        acks='all',
        retries=3,
        batch_size=16384,
        linger_ms=100,
        compression_type='gzip'  # Add compression
    )

    try:
        # Convert DataFrame to list of dictionaries more efficiently
        messages = df.to_dict('records')
        
        # Send messages in batches
        send_message_batch(producer, topic_name, messages, batch_size)
        
        logger.info(f"Successfully sent {len(messages)} messages to topic {topic_name}")
        
    except Exception as e:
        logger.error(f"Failed to ingest data: {e}")
        raise
    finally:
        producer.close()

# Usage example:
ingest_data_to_kafka(df_test, "crypto_data", batch_size=100)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. 
INFO:__main__:Processed batch of 100 messages
INFO:__mai

# Process Data with Flink (Not Use)

In [None]:
import pandas as pd
df_test = pd.read_csv("./data_daily/binance_data_2024-03-04.csv")   
df_test.head()

Unnamed: 0,timestamp,open,high,low,close,volume,close_time,quote_av,trades,tb_base_av,tb_quote_av,ignore,symbol,hour
0,2024-03-03 17:00:00,62845.14,62989.0,62458.01,62570.01,1725.46165,2024-03-03 17:59:59.999,108262000.0,90105,891.18899,55921460.0,0,BTCUSDT,17
1,2024-03-03 18:00:00,62570.01,62863.68,62570.0,62811.1,957.69137,2024-03-03 18:59:59.999,60075720.0,60591,501.56021,31460420.0,0,BTCUSDT,18
2,2024-03-03 19:00:00,62811.1,62857.0,62653.89,62730.0,814.77449,2024-03-03 19:59:59.999,51121550.0,59993,384.98781,24154340.0,0,BTCUSDT,19
3,2024-03-03 20:00:00,62730.0,62859.99,62580.0,62757.99,843.07635,2024-03-03 20:59:59.999,52885400.0,51386,395.49832,24808570.0,0,BTCUSDT,20
4,2024-03-03 21:00:00,62758.0,62828.18,62623.76,62827.11,652.28143,2024-03-03 21:59:59.999,40920350.0,43335,319.10493,20019380.0,0,BTCUSDT,21


In [None]:
df_test.columns

Index(['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
       'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore', 'symbol',
       'hour'],
      dtype='object')

In [None]:
import kafka
print(kafka.__version__)

2.0.2


In [None]:
spark.stop()

# Read from Kafka and write down to MinIO

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Create Spark session configured for MinIO (S3A)
spark = SparkSession.builder \
    .appName("Weekly_Monthly_Feature_Calculation") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
            "io.delta:delta-core_2.12:2.4.0," # previosly 2.4.0
            "org.apache.hadoop:hadoop-aws:3.3.2,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.261") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9010") \
    .config("spark.hadoop.fs.s3a.access.key", "minio_access_key") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio_secret_key") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read streaming data from Kafka topic "crypto_data"
raw_kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "crypto_data_agg_test") \
    .option("startingOffsets", "earliest") \
    .load()


# Define the schema for your JSON data (adjust field names and types as needed)
schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("date", StringType(), True),
    StructField("price_daily", DoubleType(), True),
    StructField("volume_daily", DoubleType(), True),
    StructField("trades_daily", IntegerType(), True)
])

# Parse the JSON from the Kafka "value" column (which is binary)
json_df = raw_kafka_df.select(F.from_json(F.col("value").cast("string"), schema).alias("data")).select("data.*")

# Convert the string date (agg_date) to a proper date type (specify the format if needed)
json_df = json_df.withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd"))

json_df.show(5)

25/03/15 19:23:38 WARN Utils: Your hostname, LongNV resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/15 19:23:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/longnv95/Applications/extracts/Miniconda3-py38_4.8.3/envs/ame/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/longnv95/.ivy2/cache
The jars for the packages stored in: /home/longnv95/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2f2ca241-a581-402b-8202-e6c5735171b4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.

Py4JJavaError: An error occurred while calling o66.showString.
: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.fetchPartitionOffsets(KafkaOffsetReaderAdmin.scala:128)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.getOffsetRangesFromUnresolvedOffsets(KafkaOffsetReaderAdmin.scala:374)
	at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:345)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:476)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:162)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:175)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.


In [None]:
from utils.helpers import load_cfg
from minio import Minio

CFG_FILE = "./utils/config.yaml"
cfg = load_cfg(CFG_FILE)
datalake_cfg = cfg["datalake"]
crypto_data_cfg = cfg["crypto_data"]

# Initialize MinIO client
minio_client = Minio(
    datalake_cfg['endpoint'],  # Your MinIO server endpoint
    access_key=datalake_cfg['access_key'],
    secret_key=datalake_cfg['secret_key'],
    secure=False  # Set to True if using HTTPS
)

# Create bucket if it doesn't exist
bucket = 'crypto-test'
if not minio_client.bucket_exists(bucket):
    minio_client.make_bucket(bucket)

In [None]:
# Write the weekly features to Delta on MinIO using streaming write
# json_df.write \
#     .format("parquet") \
#     .mode("overwrite")\
#     .save(f"s3a://{bucket}/delta/test_features")

# from pyspark.sql import Row
# test_df = spark.createDataFrame([Row(id=1, value="test")])
# test_df.write.format("json").save("s3a://crypto-test/test_connection")
json_df.write \
    .format("delta") \
    .mode("overwrite")\
    .save(f"s3a://{bucket}/delta_test/test_features")

25/03/10 17:55:16 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/03/10 17:55:21 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/03/10 17:55:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [None]:
bucket

'crypto-test'

In [None]:
# df = spark.read.format("json").load("s3a://crypto-test/test_connection")
df = spark.read.format("delta").load(f"s3a://transaction-data/delta_test/")
df.show()

25/03/15 19:24:36 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/03/15 19:24:45 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:>                                                          (0 + 1) / 1]

+-----------+--------------+------------------+-------------+---------------+-------------------+-------------------+----------+
|    User ID|Transaction ID|            Amount|       Vendor|        Sources|               Time|          timestamp|      date|
+-----------+--------------+------------------+-------------+---------------+-------------------+-------------------+----------+
|user_000899|  658830851347|  974.933538286743|    Education|Current Account|2025-03-14 16:30:53|2025-03-14 16:30:53|2025-03-14|
|user_000719|  743653405057|257.11832424180943|Entertainment|Current Account|2025-03-14 16:12:50|2025-03-14 16:12:50|2025-03-14|
|user_000866|  730320152514| 477.4738513432964|        Other|     Debit Card|2025-03-14 19:51:52|2025-03-14 19:51:52|2025-03-14|
|user_000977|  620364528242|117.96175593527273|    Education|Current Account|2025-03-14 03:42:49|2025-03-14 03:42:49|2025-03-14|
|user_000198|  228177746961| 802.4465173614635|Entertainment|Current Account|2025-03-14 14:53:00|

                                                                                

In [None]:
from pyspark.sql import SparkSession
import os
from delta.tables import DeltaTable

# ====== 1. List all checkpoints for a streaming query ======
def list_checkpoints(checkpoint_dir):
    # List all files in the checkpoint directory
    checkpoint_files = spark.sparkContext.textFile(os.path.join(checkpoint_dir, "offsets")).collect()
    
    # Extract batch IDs from the files
    batch_ids = []
    for file in checkpoint_files:
        if "batch-" in file:
            batch_id = file.split("batch-")[1].split(".")[0]
            batch_ids.append(int(batch_id))
    
    # Sort batch IDs to show most recent first
    batch_ids.sort(reverse=True)
    
    print(f"Found {len(batch_ids)} checkpoints")
    for i, batch_id in enumerate(batch_ids[:10]):  # Show only the 10 most recent
        print(f"Batch ID: {batch_id}")
    
    return batch_ids

# ====== 2. Get metadata about a specific checkpoint ======
def get_checkpoint_info(checkpoint_dir, batch_id):
    # Read the metadata JSON file for this batch
    metadata_path = os.path.join(checkpoint_dir, f"offsets/batch-{batch_id}.json")
    metadata_df = spark.read.json(metadata_path)
    
    # Display checkpoint details
    metadata_df.show(truncate=False)
    
    return metadata_df

# ====== 3. Find the checkpoint location used by a Delta table ======
def get_delta_table_checkpoint(delta_table_path):
    # Load the Delta table
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    
    # Get the transaction log and last checkpoint information
    history = delta_table.history().select("version", "timestamp", "operation", "operationParameters")
    
    # Show recent operations
    print("Recent Delta table operations:")
    history.orderBy("version", ascending=False).show(5, truncate=False)
    
    # Find streaming write operations with checkpoint info
    streaming_writes = history.filter("operation = 'STREAMING UPDATE' OR operation = 'WRITE'")
    
    # Extract checkpoint location from operation parameters
    if streaming_writes.count() > 0:
        # The checkpoint location should be in the operationParameters
        checkpoint_info = streaming_writes.first()
        
        # Parse the JSON operationParameters to extract checkpoint location
        params = streaming_writes.select("operationParameters").first()[0]
        if "checkpointLocation" in params:
            print(f"Checkpoint location: {params['checkpointLocation']}")
            return params['checkpointLocation']
        else:
            print("No checkpoint location found in operation parameters")
            return None
    else:
        print("No streaming writes found in the Delta table history")
        return None

# ====== 4. Execute the functions ======
checkpoint_path = f"s3a://{bucket}/delta_test/test_features"
# List checkpoints
print("=== Available Checkpoints ===")
batch_ids = list_checkpoints(checkpoint_path)

# # Get info about the most recent checkpoint
# if batch_ids:
#     print("\n=== Most Recent Checkpoint Info ===")
#     get_checkpoint_info(checkpoint_path, batch_ids[0])

# # Find checkpoint location used by the Delta table
# print("\n=== Delta Table Checkpoint Information ===")
# table_checkpoint = get_delta_table_checkpoint(delta_table_path)

# # ====== 5. Bonus: Get commit info from Delta table ======
# print("\n=== Delta Table Commit Information ===")
# spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`").show(5, truncate=False)

=== Available Checkpoints ===


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3a://crypto-test/delta_test/test_features/offsets
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Input path does not exist: s3a://crypto-test/delta_test/test_features/offsets
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 30 more


In [None]:
from pyspark.sql import SparkSession
import subprocess

# Path to delete
path_to_delete = "s3a://crypto-test/delta/"

def delete_s3_path(path: str):
    """
    Delete a path in S3 using the Hadoop FileSystem API
    Args:
    """
    # Get the Hadoop configuration
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

    # Get the FileSystem for S3
    fs = spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark.sparkContext._jvm.java.net.URI.create(path), hadoop_conf
    )

    # Create path object
    hadoop_path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(path)

    # Delete the path (true means recursive deletion)
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Successfully deleted {path}")
    else:
        print(f"Path {path} does not exist")

In [None]:
# # Calculate weekly features: average price, total volume, and total trades per week
# weekly_df = json_df.groupBy("symbol", weekofyear("agg_date").alias("week_number")) \
#     .agg(
#         avg("avgPrice").alias("weekly_avg_price"),
#         _sum("totalVolume").alias("weekly_total_volume"),
#         _sum("totalTrades").alias("weekly_total_trades")
#     )

# # Similarly, calculate monthly features
# monthly_df = json_df.groupBy("symbol", month("agg_date").alias("month")) \
#     .agg(
#         avg("avgPrice").alias("monthly_avg_price"),
#         _sum("totalVolume").alias("monthly_total_volume"),
#         _sum("totalTrades").alias("monthly_total_trades")
#     )

# # Write the weekly features to Delta on MinIO using streaming write with a checkpoint location
# weekly_query = weekly_df.writeStream \
#     .format("delta") \
#     .outputMode("complete") \
#     .option("checkpointLocation", "s3a://your-bucket/delta/weekly_checkpoint") \
#     .start("s3a://your-bucket/delta/weekly_features")

# # Write the monthly features to Delta on MinIO using streaming write with a checkpoint location
# monthly_query = monthly_df.writeStream \
#     .format("delta") \
#     .outputMode("complete") \
#     .option("checkpointLocation", "s3a://your-bucket/delta/monthly_checkpoint") \
#     .start("s3a://your-bucket/delta/monthly_features")

# spark.streams.awaitAnyTermination()


In [None]:
from dotenv import load_dotenv
import os

dotenv_path = os.path.join("./scripts/", '.env')  # Assuming .env is in the same directory
load_dotenv(dotenv_path)

True

# Test Debezium

In [None]:
import sys
import os
import datetime as dt
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, types, MetaData, Table, Column, String, Float, text, Integer, Identity

# Expect a run date (e.g., "2025-03-20") as the first command-line argument
RUN_DATE_STR = "2025-03-02"

def generate_pseudo_data(run_date_str, num_rows=20000, num_users=1000):
    """Generates a Pandas DataFrame with pseudo transaction data."""
    # Generate pseudo user IDs
    user_ids = [f"user_{i:06d}" for i in np.random.choice(num_users, size=num_rows)]
    
    # Generate pseudo transaction IDs
    transaction_ids = np.random.randint(10**11, 10**12 - 1, size=num_rows)
    
    # Choose sources for the transactions
    sources = np.random.choice(["Current Account", "Credit Card", "Debit Card"],
                              size=num_rows, p=[0.6, 0.3, 0.1])
    
    # Generate random amounts
    amounts = np.random.uniform(1.0, 1000.0, size=num_rows)
    
    # Choose vendors from a list, with random probabilities
    vendors = ["Online Shopping", "Hospital", "Sport", "Grocery", "Restaurant",
               "Travel", "Entertainment", "Electronics", "Home Improvement",
               "Clothing", "Education", "Sending Out", "Utilities", "Other"]
    vendor_probabilities = np.random.dirichlet(np.ones(len(vendors)))
    vendor_choices = np.random.choice(vendors, size=num_rows, p=vendor_probabilities)
    
    # Generate pseudo times based on the run date
    start_date = dt.datetime.strptime(run_date_str, "%Y-%m-%d")
    time_deltas = [dt.timedelta(seconds=np.random.randint(0, 31536000)) for _ in range(num_rows)]
    times = [(start_date + delta).strftime('%Y-%m-%d %H:%M:%S') for delta in time_deltas]
    
    # Create the DataFrame
    df = pd.DataFrame({
        "User ID": user_ids,
        "Transaction ID": transaction_ids,
        "Amount": amounts,
        "Vendor": vendor_choices,
        "Sources": sources,
        "Time": times
    })
    
    return df

def insert_data_to_postgres(df, table_name="transaction_data"):
    """
    Inserts the DataFrame into a PostgreSQL table.
    It uses SQLAlchemy to connect to PostgreSQL.
    """
    # Use connection string from environment or default to the Airflow connection string
    postgres_conn_str = "postgresql+psycopg2://airflow:airflow@localhost/airflow"
    engine = create_engine(postgres_conn_str)
    
    try:
        # Define table metadata with primary key
        metadata = MetaData()
        transaction_table = Table(
            table_name, metadata,
            Column("id", Integer, Identity(), primary_key=True),  # Auto-incrementing primary key
            Column("User ID", String(255)),
            Column("Transaction ID", String(255)),
            Column("Amount", Float),
            Column("Vendor", String(255)),
            Column("Sources", String(255)),
            Column("Time", String(255)),
            schema='public'
        )
        
        # Create the table in the database
        metadata.create_all(engine)
        
        # Insert data into the table using pandas to_sql
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists='append',  # Use 'append' to add data to the existing table
            index=False,
            schema='public'
        )
        
        print(f"Successfully inserted {len(df)} rows into {table_name} table at {RUN_DATE_STR}")

        # Verify the table exists
        with engine.connect() as connection:
            query = text(f"SELECT COUNT(*) FROM public.{table_name}")
            result = connection.execute(query)
            count = result.scalar()
            print(f"Table contains {count} rows")
    except Exception as e:
        raise

if __name__ == "__main__":
    # Generate pseudo transaction data
    df = generate_pseudo_data(RUN_DATE_STR)
    
    # Insert the pseudo data into PostgreSQL
    insert_data_to_postgres(df, table_name="transaction_data")


Successfully inserted 20000 rows into transaction_data table at 2025-03-02
Table contains 20000 rows


In [None]:
import requests
import json
import os
import sys
import time
import datetime as dt

def check_connector_exists():
    try:
        response = requests.get('http://localhost:8083/connectors/postgres-connector')
        return response.status_code == 200
    except requests.exceptions.RequestException as e:
        return False

def check_kafka_connect_health():
    try:
        response = requests.get('http://localhost:8083')
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False

def register_connector():
    # Wait for Kafka Connect to be ready
    retries = 30
    while retries > 0:
        if check_kafka_connect_health():
            break
        print("Waiting for Kafka Connect to be ready...")
        time.sleep(5)
        retries -= 1
    
    if retries == 0:
        raise Exception("Kafka Connect is not available after waiting")
    
    # Check if connector already exists
    if check_connector_exists():
        print("Connector already registered, checking status...")
        response = requests.get('http://localhost:8083/connectors/postgres-connector/status')
        print(f"Connector status: {response.json()}")
        sys.exit(0)
    
    # Load connector configuration
    config_path = './docker_all/config/config_debezium.json'
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file not found at {config_path}")
    
    with open(config_path, 'r') as f:
        connector_config = json.load(f)
    
    try:
        # Register connector
        response = requests.post(
            'http://localhost:8083/connectors',
            headers={'Content-Type': 'application/json'},
            data=json.dumps(connector_config)
        )
        
        if response.status_code == 201:
            print("Connector registered successfully")
            # Check connector status
            time.sleep(2)  # Wait for connector to start
            status_response = requests.get('http://localhost:8083/connectors/postgres-connector/status')
            print(f"Connector status: {status_response.json()}")
        else:
            print(f"Failed to register connector: {response.text}")
    except requests.exceptions.ConnectionError as e:
        print(f"Failed to connect to Kafka Connect: {e}")
        raise

if __name__ == "__main__":
    register_connector()

In [None]:
check_connector_exists()

True

In [None]:
#!/usr/bin/env python3
import sys
import os
import json
import logging
import datetime as dt
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from dotenv import load_dotenv
from minio import Minio

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Get run date from command line argument
RUN_DATE_STR = sys.argv[1]
RUN_DATE = dt.datetime.strptime(RUN_DATE_STR, "%Y-%m-%d")
RUN_DATE_STR_7DAYS = (RUN_DATE - dt.timedelta(days=7)).strftime('%Y-%m-%d')

# Environment variables
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
CDC_TOPIC = "dbserver1.airflow.transaction_data"  # Adjust if your topic name is different
POSTGRES_CONN_STR = os.getenv("POSTGRES_CONN_STR", "jdbc:postgresql://postgres:5432/airflow")
POSTGRES_USER = os.getenv("POSTGRES_USER", "airflow")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "airflow")
MINIO_ENDPOINT = os.getenv("S3_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "minio_access_key")
MINIO_SECRET_KEY = os.getenv("S3_SECRET_KEY", "minio_secret_key")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "features")

def main():
    logger.info(f"Starting data processing for date range: {RUN_DATE_STR_7DAYS} to {RUN_DATE_STR}")
    
    # Create Spark session configured for MinIO (S3A)
    spark = SparkSession.builder \
        .appName("Combined_CDC_PostgreSQL_Processing") \
        .config("spark.jars.packages", 
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
                "io.delta:delta-core_2.12:2.4.0,"
                "org.apache.hadoop:hadoop-aws:3.3.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.261,"
                "org.postgresql:postgresql:42.5.1") \
        .config("spark.hadoop.fs.s3a.endpoint", f"http://{MINIO_ENDPOINT}") \
        .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY) \
        .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master("local[*]") \
        .getOrCreate()
    
    # 1. Process CDC events from Kafka
    cdc_df = process_cdc_events(spark)
    
    # 2. Query PostgreSQL data directly (for data that might not have CDC events yet)
    postgres_df = query_postgres_data(spark)
    
    # 3. Combine the data (union the dataframes)
    combined_df = combine_data(cdc_df, postgres_df)
    
    # 4. Calculate features
    features_df = calculate_features(combined_df)
    
    # 5. Write results to MinIO using Delta format
    write_to_minio(features_df)
    
    spark.stop()
    logger.info("Processing completed successfully")

def process_cdc_events(spark):
    """Process CDC events from Kafka"""
    logger.info(f"Reading CDC events from Kafka topic: {CDC_TOPIC}")
    
    # Read data from Kafka CDC topic
    raw_kafka_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
        .option("subscribe", CDC_TOPIC) \
        .option("startingOffsets", "earliest") \
        .load()
    
    # Define schema for Debezium CDC events
    # Note: This schema is specific to Debezium's PostgreSQL connector output format
    cdc_schema = StructType([
        StructField("schema", StringType(), True),
        StructField("payload", StructType([
            StructField("after", StringType(), True),  # Data after change
            StructField("before", StringType(), True), # Data before change (for updates/deletes)
            StructField("op", StringType(), True),     # Operation type (c=create, u=update, d=delete)
            StructField("ts_ms", DoubleType(), True)   # Timestamp of the change
        ]), True)
    ])
    
    # Parse the value column (which contains the CDC event JSON)
    parsed_df = raw_kafka_df \
        .selectExpr("CAST(value AS STRING) as json_value") \
        .select(F.from_json(F.col("json_value"), cdc_schema).alias("cdc")) \
        .select("cdc.payload.*")
    
    # Define schema for the transaction data
    transaction_schema = StructType([
        StructField("User ID", StringType(), True),
        StructField("Transaction ID", StringType(), True),
        StructField("Amount", DoubleType(), True),
        StructField("Vendor", StringType(), True),
        StructField("Sources", StringType(), True),
        StructField("Time", StringType(), True)
    ])
    
    # Extract and parse the 'after' data (which contains the actual transaction data)
    # Filter to only include 'create' and 'update' operations (skip deletes)
    transactions_df = parsed_df \
        .where("op IN ('c', 'u')") \
        .select(
            F.from_json(F.col("after"), transaction_schema).alias("data"),
            F.col("ts_ms").alias("cdc_timestamp")
        ) \
        .select("data.*", "cdc_timestamp")
    
    # Convert timestamp string to proper timestamp and add date column
    result_df = transactions_df \
        .withColumn("timestamp", F.to_timestamp(F.col("Time"), "yyyy-MM-dd HH:mm:ss")) \
        .withColumn("date", F.to_date(F.col("timestamp"))) \
        .where(f"date BETWEEN '{RUN_DATE_STR_7DAYS}' AND '{RUN_DATE_STR}'") \
        .drop("cdc_timestamp")  # Remove the CDC timestamp as we don't need it anymore
    
    logger.info(f"Processed {result_df.count()} CDC events")
    return result_df

def query_postgres_data(spark):
    """Query PostgreSQL data directly"""
    logger.info("Querying PostgreSQL database directly")
    
    # Read directly from PostgreSQL for recent data that might not be in CDC yet
    postgres_df = spark.read \
        .format("jdbc") \
        .option("url", POSTGRES_CONN_STR) \
        .option("dbtable", "transaction_data") \
        .option("user", POSTGRES_USER) \
        .option("password", POSTGRES_PASSWORD) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    # Convert timestamp and filter by date range
    result_df = postgres_df \
        .withColumn("timestamp", F.to_timestamp(F.col("Time"), "yyyy-MM-dd HH:mm:ss")) \
        .withColumn("date", F.to_date(F.col("timestamp"))) \
        .where(f"date BETWEEN '{RUN_DATE_STR_7DAYS}' AND '{RUN_DATE_STR}'")
    
    logger.info(f"Retrieved {result_df.count()} records from PostgreSQL")
    return result_df

def combine_data(cdc_df, postgres_df):
    """Combine CDC events and PostgreSQL data, removing duplicates"""
    logger.info("Combining CDC events and PostgreSQL data")
    
    # Union the dataframes
    combined_df = cdc_df.unionByName(postgres_df)
    
    # Remove duplicates based on Transaction ID (keeping the latest record)
    deduplicated_df = combined_df \
        .withColumn("row_num", F.row_number().over(
            F.window.partitionBy("Transaction ID").orderBy(F.desc("timestamp"))
        )) \
        .where("row_num = 1") \
        .drop("row_num")
    
    logger.info(f"Combined data has {deduplicated_df.count()} unique transactions after deduplication")
    return deduplicated_df

def calculate_features(df):
    """Calculate features from the transaction data"""
    logger.info("Calculating features from transaction data")
    
    # Calculate weekly features (l1w = last 1 week)
    time_window = "l1w"
    features_df = df.groupBy("User ID") \
        .agg(
            F.count("Transaction ID").alias(f"num_transactions_{time_window}"),
            F.sum("Amount").alias(f"total_amount_{time_window}"),
            F.avg("Amount").alias(f"avg_amount_{time_window}"),
            F.min("Amount").alias(f"min_amount_{time_window}"),
            F.max("Amount").alias(f"max_amount_{time_window}"),
            F.countDistinct("Vendor").alias(f"num_vendors_{time_window}"),
            F.countDistinct("Sources").alias(f"num_sources_{time_window}")
        )
    
    # Add a timestamp for when these features were calculated
    features_df = features_df.withColumn("feature_timestamp", F.current_timestamp())
    
    logger.info(f"Calculated features for {features_df.count()} users")
    return features_df

def write_to_minio(df):
    """Write features to MinIO using Delta format"""
    logger.info(f"Writing features to MinIO bucket: {MINIO_BUCKET}")
    
    # Ensure MinIO bucket exists
    minio_client = Minio(
        MINIO_ENDPOINT,
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=False
    )
    
    if not minio_client.bucket_exists(MINIO_BUCKET):
        minio_client.make_bucket(MINIO_BUCKET)
        logger.info(f"Created new bucket: {MINIO_BUCKET}")
    
    # Write to MinIO using Delta format
    df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("delta.columnMapping.mode", "name") \
        .option("delta.minReaderVersion", "2") \
        .option("delta.minWriterVersion", "5") \
        .save(f"s3a://{MINIO_BUCKET}/features/date={RUN_DATE_STR}")
    
    logger.info(f"Successfully wrote features to s3a://{MINIO_BUCKET}/features/date={RUN_DATE_STR}")

if __name__ == "__main__":
    main()


In [None]:
import sys
import os
import json
import datetime as dt
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from dotenv import load_dotenv
from minio import Minio

# Get run date from command line argument
# RUN_DATE_STR = sys.argv[1]
# RUN_DATE = dt.datetime.strptime(RUN_DATE_STR, "%Y-%m-%d")
# RUN_DATE_STR_7DAYS = (RUN_DATE - dt.timedelta(days=7)).strftime('%Y-%m-%d')
dotenv_path = os.path.join("./scripts/", '.env') 
load_dotenv(dotenv_path)

# Environment variables
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
CDC_TRANSACTION_TOPIC = os.getenv("CDC_TRANSACTION_TOPIC")
POSTGRES_CON_STR = os.getenv("POSTGRES_CON_STR")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
MINIO_ENDPOINT = os.getenv("S3_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("S3_SECRET_KEY")
MINIO_BUCKET = os.getenv("MINIO_BUCKET")

    
# Create Spark session configured for MinIO (S3A)
spark = SparkSession.builder \
    .appName("Combined_CDC_PostgreSQL_Processing") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
            "io.delta:delta-core_2.12:2.4.0,"
            "org.apache.hadoop:hadoop-aws:3.3.2,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.261,"
            "org.postgresql:postgresql:42.5.1") \
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{MINIO_ENDPOINT}") \
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local[*]") \
    .getOrCreate()

In [None]:
raw_kafka_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", CDC_TRANSACTION_TOPIC) \
        .option("startingOffsets", "earliest") \
        .load()

# Define the nested schema for transaction data fields
transaction_fields = [
    T.StructField("id", T.IntegerType(), False),
    T.StructField("User ID", T.StringType(), True),
    T.StructField("Transaction ID", T.StringType(), True),
    T.StructField("Amount", T.DoubleType(), True),
    T.StructField("Vendor", T.StringType(), True),
    T.StructField("Sources", T.StringType(), True),
    T.StructField("Time", T.StringType(), True)
]

# Define the nested schema for source
source_fields = [
    T.StructField("version", T.StringType(), False),
    T.StructField("connector", T.StringType(), False),
    T.StructField("name", T.StringType(), False),
    T.StructField("ts_ms", T.LongType(), False),
    T.StructField("snapshot", T.StringType(), True),
    T.StructField("db", T.StringType(), False),
    T.StructField("sequence", T.StringType(), True),
    T.StructField("schema", T.StringType(), False),
    T.StructField("table", T.StringType(), False),
    T.StructField("txId", T.LongType(), True),
    T.StructField("lsn", T.LongType(), True),
    T.StructField("xmin", T.LongType(), True)
]

# Define the schema for transaction block
transaction_block_fields = [
    T.StructField("id", T.StringType(), False),
    T.StructField("total_order", T.LongType(), False),
    T.StructField("data_collection_order", T.LongType(), False)
]

# Complete CDC schema
cdc_schema = T.StructType([
    T.StructField("schema", T.StructType([
        T.StructField("type", T.StringType()),
        T.StructField("fields", T.ArrayType(T.StringType())),
        T.StructField("optional", T.BooleanType()),
        T.StructField("name", T.StringType()),
        T.StructField("version", T.IntegerType())
    ])),
    T.StructField("payload", T.StructType([
        T.StructField("before", T.StructType(transaction_fields), True),
        T.StructField("after", T.StructType(transaction_fields), True), 
        T.StructField("source", T.StructType(source_fields), False),
        T.StructField("op", T.StringType(), False),
        T.StructField("ts_ms", T.LongType(), True),
        T.StructField("transaction", T.StructType(transaction_block_fields), True)
    ]))
])

# Parse the JSON data
cdc_df = raw_kafka_df \
  .selectExpr("CAST(value AS STRING) as json_str") \
  .withColumn("parsed_data", F.from_json(F.col("json_str"), cdc_schema)) \
  .select("parsed_data.payload.*")

# Now show the parsed data
cdc_df.show(5)

# To get just the transaction data after the change
transaction_data = cdc_df.where('''op in ('c','u')''').select(
    "op", 
    "after.id", 
    "after.`User ID`", 
    "after.`Transaction ID`", 
    "after.Amount", 
    "after.Vendor", 
    "after.Sources", 
    "after.Time",
    "source.ts_ms"
)

w = Window.partitionBy("id").orderBy(F.col("ts_ms").desc())
transaction_data = transaction_data.withColumn("rank", F.row_number().over(w)) \
                        .filter(F.col("rank") == 1) \
                        .drop("rank")

transaction_data.show(5)

25/03/21 18:11:32 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+------+--------------------+--------------------+---+-------------+-----------+
|before|               after|              source| op|        ts_ms|transaction|
+------+--------------------+--------------------+---+-------------+-----------+
|  null|{1, user_000377, ...|{2.3.4.Final, pos...|  r|1742498526222|       null|
|  null|{2, user_000981, ...|{2.3.4.Final, pos...|  r|1742498526233|       null|
|  null|{3, user_000836, ...|{2.3.4.Final, pos...|  r|1742498526234|       null|
|  null|{4, user_000962, ...|{2.3.4.Final, pos...|  r|1742498526234|       null|
|  null|{5, user_000640, ...|{2.3.4.Final, pos...|  r|1742498526234|       null|
+------+--------------------+--------------------+---+-------------+-----------+
only showing top 5 rows



25/03/21 18:11:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 18:>                                                         (0 + 1) / 1]

+---+---+-----------+--------------+-----------------+-----------+---------------+-------------------+-------------+
| op| id|    User ID|Transaction ID|           Amount|     Vendor|        Sources|               Time|        ts_ms|
+---+---+-----------+--------------+-----------------+-----------+---------------+-------------------+-------------+
|  u|  4|user_000962|  457239087380| 174.213256648849|Sending Out|Current Account|2025-06-09 18:27:06|1742578440256|
|  u| 48|user_000468|  155352729109|515.6645561652284|Sending Out|Current Account|2025-03-27 00:14:38|1742578440256|
|  u| 51|user_000303|  732970514123| 950.034224384725|Sending Out|Current Account|2025-04-20 09:00:18|1742578440256|
|  u| 81|user_000013|  863604161233| 868.166665533098|Sending Out|Current Account|2025-04-09 09:08:31|1742578440256|
|  u| 85|user_000324|  619489722520| 317.590035743889|Sending Out|Current Account|2026-01-01 16:36:22|1742578440256|
+---+---+-----------+--------------+-----------------+----------

                                                                                

In [None]:
postgres_df = spark.read \
    .format("jdbc") \
    .option("url", POSTGRES_CON_STR) \
    .option("dbtable", "transaction_data") \
    .option("user", POSTGRES_USER) \
    .option("password", POSTGRES_PASSWORD) \
    .option("driver", "org.postgresql.Driver") \
    .load()


postgres_df = postgres_df.withColumn("timestamp", F.to_timestamp(F.col("Time"), "yyyy-MM-dd HH:mm:ss")) \
                        .withColumn("date", F.to_date(F.col("timestamp")))

In [None]:
postgres_df.show(5)

+---+-----------+--------------+-----------------+---------------+---------------+-------------------+-------------------+----------+
| id|    User ID|Transaction ID|           Amount|         Vendor|        Sources|               Time|          timestamp|      date|
+---+-----------+--------------+-----------------+---------------+---------------+-------------------+-------------------+----------+
|  1|user_000377|  989600621541|458.3600134428063|      Education|    Credit Card|2025-05-27 01:14:33|2025-05-27 01:14:33|2025-05-27|
|  2|user_000981|  344360205463|38.14437463217147|      Education|    Credit Card|2025-07-30 11:36:25|2025-07-30 11:36:25|2025-07-30|
|  3|user_000836|  754588743418|988.4897927900346|      Education|Current Account|2025-04-21 18:01:59|2025-04-21 18:01:59|2025-04-21|
|  5|user_000640|  413270206972|78.78715816071131|Online Shopping|    Credit Card|2025-03-24 10:52:26|2025-03-24 10:52:26|2025-03-24|
|  6|user_000205|  754963731286|943.7920762607532|  Entertainm

# Feast

In [None]:
# !pip install feast==0.34.1 delta-spark==2.4.0 feast-spark

In [1]:
import sys
import os
import json
import datetime as dt
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from dotenv import load_dotenv
from minio import Minio

# Get run date from command line argument
# RUN_DATE_STR = sys.argv[1]
# RUN_DATE = dt.datetime.strptime(RUN_DATE_STR, "%Y-%m-%d")
# RUN_DATE_STR_7DAYS = (RUN_DATE - dt.timedelta(days=7)).strftime('%Y-%m-%d')
dotenv_path = os.path.join("./scripts/", '.env') 
load_dotenv(dotenv_path)

# Environment variables
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
CDC_TRANSACTION_TOPIC = os.getenv("CDC_TRANSACTION_TOPIC")
POSTGRES_CON_STR = os.getenv("POSTGRES_CON_STR")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
MINIO_ENDPOINT = os.getenv("S3_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("S3_SECRET_KEY")
MINIO_BUCKET = os.getenv("MINIO_BUCKET")
S3_ENDPOINT_LOCAL = os.getenv("S3_ENDPOINT_LOCAL")

    
# Create Spark session configured for MinIO (S3A)
spark = SparkSession.builder \
    .appName("Combined_CDC_PostgreSQL_Processing") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
            "io.delta:delta-core_2.12:2.4.0,"
            "org.apache.hadoop:hadoop-aws:3.3.2,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.261,"
            "org.postgresql:postgresql:42.5.1") \
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{S3_ENDPOINT_LOCAL}") \
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local[*]") \
    .getOrCreate()

25/03/24 16:21:28 WARN Utils: Your hostname, LongNV resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/24 16:21:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/longnv95/Applications/extracts/Miniconda3-py38_4.8.3/envs/ame/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/longnv95/.ivy2/cache
The jars for the packages stored in: /home/longnv95/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e3e4bae7-1c7d-407a-8c40-1b09f6b58755;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#com

In [2]:
transaction_data = spark.read.format("delta").load('s3a://transaction-data/features')

25/03/24 16:21:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [3]:
transaction_data.show(5)

25/03/24 16:21:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:>                                                          (0 + 1) / 1]

+-----------+--------------------+------------------+------------------+------------------+-----------------+---------------+---------------+
|    User ID|num_transactions_l1w|  total_amount_l1w|    avg_amount_l1w|    min_amount_l1w|   max_amount_l1w|num_vendors_l1w|num_sources_l1w|
+-----------+--------------------+------------------+------------------+------------------+-----------------+---------------+---------------+
|user_000577|                   2| 991.0696512544381|495.53482562721905|361.35423266149826|629.7154185929398|              2|              2|
|user_000113|                   2|1096.2816446042389| 548.1408223021194| 129.9053398401846|966.3763047640543|              2|              1|
|user_000066|                   4| 2821.186816639094| 705.2967041597735|262.00653776009557|990.0429033721248|              3|              2|
|user_000708|                   1|  761.276068586942|  761.276068586942|  761.276068586942| 761.276068586942|              1|              1|
|user_

                                                                                

In [5]:
transaction_data = transaction_data.withColumn("date", F.lit('2025-03-02'))

In [15]:
# feature_definitions.py
from datetime import timedelta
from feast import Entity, Feature, FeatureView, Field
from feast.infra.offline_stores.file_source import FileSource  # Updated import
from feast.types import Float32, Int64, String

# Define the customer entity
customer = Entity(
    name="customer",
    join_keys=["user_id"],
    description="Customer identifier"
)

# Customer demographic features
customer_source = FileSource(
    path="s3a://transaction-data-user/demographic/",
    event_timestamp_column=None,
)

customer_features = FeatureView(
    name="customer_demographics",
    entities=[customer],
    ttl=timedelta(days=365),
    schema=[
        Field(name="user_id", dtype=String),
        Field(name="age", dtype=Int64),
        Field(name="gender", dtype=String),
        Field(name="location", dtype=String),
        Field(name="occupation", dtype=String),
    ],
    source=customer_source,
    online=False
)

# Transaction features
transaction_source = FileSource(
    path="s3a://transaction-data/features/",
    event_timestamp_column="date",  # Assuming you have this column
)

transaction_features = FeatureView(
    name="transaction_features",
    entities=[customer],
    ttl=timedelta(days=30),
    schema=[
        Field(name="user_id", dtype=String),
        Field(name="num_transactions_1w", dtype=Int64),
        Field(name="total_amount_1w", dtype=Float32),
        Field(name="avg_amount_1w", dtype=Float32),
        Field(name="min_amount_1w", dtype=Float32),
        Field(name="max_amount_1w", dtype=Float32),
        Field(name="num_vendors_1w", dtype=Int64),
        Field(name="num_sources_1w", dtype=Int64),
    ],
    source=transaction_source,
    online=False
)

In [16]:
# pip install 'feast[aws]'

In [17]:
from feast import FeatureStore
from dotenv import load_dotenv
import os
from datetime import datetime

# Load environment variables
dotenv_path = os.path.join("./scripts/", '.env') 
load_dotenv(dotenv_path)

# Set up feature store path (remove trailing slash)
FEATURE_STORE_PATH = "/home/longnv95/Coding/MLOPs/final_project/feature_store"

# Initialize the feature store
store = FeatureStore(repo_path=FEATURE_STORE_PATH)

# Apply feature definitions
store.apply([customer, customer_features, transaction_features])

# Materialize features
store.materialize_incremental(end_date=datetime.now())

ArrowInvalid: Expected a local filesystem path, got a URI: 's3a://transaction-data-user/demographic/'

In [None]:
```yaml
project: feature_store_demo
registry: data/registry.db # Or registry: registry.db
provider: local
online_store:
    type: sqlite
    path: data/online_store.db # Path relative to feature_repo directory
offline_store:
  type: file
  file_options:
    file_url_scheme: s3a
    s3_endpoint_url: http://{MINIO_ENDPOINT}
    s3_access_key: {MINIO_ACCESS_KEY}
    s3_secret_key: {MINIO_SECRET_KEY}
entity_key_serialization_version: 2
```

```python
from datetime import timedelta
from pathlib import Path
import os
from feast import Entity, FeatureView, Field, FileSource, FeatureStore
from feast.types import Float32, Int64
from dotenv import load_dotenv

dotenv_path = os.path.join("/opt/airflow/external_scripts/", '.env')  # Assuming .env is in the same directory
load_dotenv(dotenv_path)

# MinIO keys and access points
MINIO_ENDPOINT = os.getenv("S3_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("S3_SECRET_KEY")
MINIO_BUCKET = os.getenv("MINIO_BUCKET")

# Path to your existing data in MinIO (adjust as needed)
DATA_PATH = f"s3a://{MINIO_BUCKET}/customer_features.parquet"

# Create feature repo directory
repo_path = Path("feature_repo")
os.makedirs(repo_path, exist_ok=True)

# Define entity
customer = Entity(
    name="customer",
    join_keys=["customer_id"],
    description="Customer identifier"
)

# Define data source - pointing to your existing MinIO data
customer_source = FileSource(
    name="customer_source",
    path=DATA_PATH,
    timestamp_field="event_timestamp",
)

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    schema=[
        Field(name="daily_transactions", dtype=Int64),
        Field(name="total_spend", dtype=Float32),
        Field(name="average_basket_size", dtype=Float32),
    ],
    source=customer_source,
    ttl=timedelta(days=30),
)

fs = FeatureStore(repo_path=repo_path)

# Apply feature definitions
fs.apply([customer, customer_features])
print(f"Feature registry created at: {repo_path}/registry.db")

# ... (after fs.apply)
from datetime import datetime

print("Materializing latest features to online store...")
# Load the latest features for all entities up to the current time
fs.materialize_incremental(end_date=datetime.now())
print("Materialization complete.")

print("Feature store successfully set up with MinIO as the offline store and SQLite as the online store!")
```

```python
from feast import FeatureStore
from pathlib import Path

# Initialize feature store (only once)
fs = FeatureStore(repo_path=Path("feature_repo"))

# Get online features for specific customers
features = fs.get_online_features(
    features=[
        "customer_features:daily_transactions", 
        "customer_features:total_spend",
        "customer_features:average_basket_size"
    ],
    entity_rows=[{"customer_id": 1}, {"customer_id": 2}]
).to_dict()

print(features)
```