In [1]:
# Load configuration from .env file
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get configuration from environment variables
SQL_SERVER_NAME = os.getenv('SQL_SERVER_NAME')
SQL_DATABASE = os.getenv('SQL_DATABASE', 'fabricMirrorDemoDb')
SQL_USER = os.getenv('SQL_USER')
SQL_PASSWORD = os.getenv('SQL_PASSWORD')
BATCH_SIZE = int(os.getenv('BATCH_SIZE', '50'))
SLEEP_SECONDS = int(os.getenv('SLEEP_SECONDS', '2'))

# Validate required configuration
if not all([SQL_SERVER_NAME, SQL_USER, SQL_PASSWORD]):
    raise ValueError("Missing required environment variables. Please check your .env file.")
    
# Build full server name
SQL_SERVER = f"{SQL_SERVER_NAME}.database.windows.net"
print(f"Configuration loaded for server: {SQL_SERVER}, database: {SQL_DATABASE}")
print(f"Streaming config: {BATCH_SIZE} rows every {SLEEP_SECONDS} seconds")

Configuration loaded for server: sql-mirror-westus3-flopes.database.windows.net, database: fabricMirrorDemoDb
Streaming config: 50 rows every 2 seconds


# Continuous sales writer
This notebook generates random sales rows and continuously inserts them into dbo.FactSales to drive Fabric Mirroring. Stop the cell to end the stream.

In [2]:
import pyodbc, random, time, datetime
conn_str = ''.join((
    f"DRIVER={{ODBC Driver 18 for SQL Server}};"
    f"SERVER={SQL_SERVER};"
    f"DATABASE={SQL_DATABASE};"
    f"UID={SQL_USER};"
    f"PWD={SQL_PASSWORD};"
    "Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;",
))
conn = pyodbc.connect(conn_str, autocommit=True)
cur = conn.cursor()
print('Connected')

# Get existing keys for distribution
cur.execute("SELECT StoreKey, CountryKey FROM dbo.DimStore")
store_pairs = cur.fetchall()
cur.execute("SELECT ProductKey, UnitPrice FROM dbo.DimProduct")
product_pairs = cur.fetchall()

if not store_pairs:
    raise RuntimeError('No stores found. Load Dim tables first.')
if not product_pairs:
    raise RuntimeError('No products found. Load Dim tables first.')

Connected


In [3]:
def gen_row():
    sk, ck = random.choice(store_pairs)
    pk, unit_price = random.choice(product_pairs)
    qty = random.randint(1, 5)
    # Use product unit price with small random variation
    actual_unit_price = round(float(unit_price) * random.uniform(0.9, 1.1), 2)
    amt = round(actual_unit_price * qty, 2)
    
    # Generate sale timestamp within the last few minutes (simulating real-time sales)
    minutes_ago = random.randint(0, 5)  # Sales from 0-5 minutes ago
    seconds_ago = random.randint(0, 59)
    sale_time = datetime.datetime.now(datetime.UTC) - datetime.timedelta(minutes=minutes_ago, seconds=seconds_ago)
    
    return sk, ck, pk, sale_time, qty, actual_unit_price, amt

insert_sql = (
    "INSERT INTO dbo.FactSales (StoreKey, CountryKey, ProductKey, SaleTimestamp, Quantity, UnitPrice, Amount) "
    "VALUES (?,?,?,?,?,?,?)"
)

In [4]:
print('Starting real-time sales stream. Stop the cell to end.')
print(f'Configuration: {BATCH_SIZE} sales every {SLEEP_SECONDS} seconds')

batch_count = 0
total_sales = 0
start_time = datetime.datetime.now(datetime.UTC)

while True:
    batch_count += 1
    rows = [gen_row() for _ in range(BATCH_SIZE)]
    cur.fast_executemany = True
    cur.executemany(insert_sql, rows)
    total_sales += len(rows)
    
    # Calculate running statistics
    current_time = datetime.datetime.now(datetime.UTC)
    elapsed_minutes = (current_time - start_time).total_seconds() / 60
    sales_per_minute = total_sales / elapsed_minutes if elapsed_minutes > 0 else 0
    
    # Get timestamp range of this batch for monitoring
    batch_timestamps = [row[3] for row in rows]  # SaleTimestamp is index 3
    min_sale_time = min(batch_timestamps)
    max_sale_time = max(batch_timestamps)
    
    print(f'Batch #{batch_count}: Inserted {len(rows)} sales | '
          f'Total: {total_sales} | Rate: {sales_per_minute:.1f}/min | '
          f'Sale Times: {min_sale_time.strftime("%H:%M:%S")} - {max_sale_time.strftime("%H:%M:%S")} | '
          f'Inserted at: {current_time.strftime("%H:%M:%S")}')
    
    time.sleep(SLEEP_SECONDS)

Starting real-time sales stream. Stop the cell to end.
Configuration: 50 sales every 2 seconds
Batch #1: Inserted 50 sales | Total: 50 | Rate: 4293.8/min | Sale Times: 15:07:53 - 15:13:38 | Inserted at: 15:13:39
Batch #1: Inserted 50 sales | Total: 50 | Rate: 4293.8/min | Sale Times: 15:07:53 - 15:13:38 | Inserted at: 15:13:39
Batch #2: Inserted 50 sales | Total: 100 | Rate: 1883.4/min | Sale Times: 15:07:47 - 15:13:39 | Inserted at: 15:13:42
Batch #2: Inserted 50 sales | Total: 100 | Rate: 1883.4/min | Sale Times: 15:07:47 - 15:13:39 | Inserted at: 15:13:42
Batch #3: Inserted 50 sales | Total: 150 | Rate: 1585.3/min | Sale Times: 15:07:48 - 15:13:36 | Inserted at: 15:13:44
Batch #3: Inserted 50 sales | Total: 150 | Rate: 1585.3/min | Sale Times: 15:07:48 - 15:13:36 | Inserted at: 15:13:44
Batch #4: Inserted 50 sales | Total: 200 | Rate: 1470.5/min | Sale Times: 15:07:48 - 15:13:33 | Inserted at: 15:13:47
Batch #4: Inserted 50 sales | Total: 200 | Rate: 1470.5/min | Sale Times: 15:07:4

OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 18 for SQL Server]Communication link failure (0) (SQLExecute); [08S01] [Microsoft][ODBC Driver 18 for SQL Server]The connection is broken and recovery is not possible. The client driver attempted to recover the connection one or more times and all attempts failed. Increase the value of ConnectRetryCount to increase the number of recovery attempts. (0)')