In [0]:
# Generic stream generator: imports
import time 
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import SingleTableMetadata
import pandas 
from pathlib import Path
import pickle 
from datetime import datetime
import random

In [0]:
# Generic stream generator:

def generate_stream(
    save_path:str,
    city:str,
    model_path:str="real_estate_management.sample_data.synthetic_data_generator",
    max_batches=None,
):
    """
    Append batches of synthetic real-estate rows for one city to a Delta table.

    Each iteration does the following:
    - Samples between 1 and 30 rows from a pickled synthetic-data model. This is
      presumably an SDV synthesizer; it must expose ``sample(n)`` returning a
      pandas DataFrame.
    - Adds a ``timestamp`` column and a ``city`` column.
    - Replaces spaces in column names with underscores.
    - Appends the rows to the Delta table at ``save_path``.
    - Sleeps a random 1-60 seconds.

    Parameters
    ----------
    save_path : str
        Location of the Delta table the rows are appended to.
    city : str
        Value written to the ``city`` column and used in log messages.
    model_path : str or Path, optional
        File holding the pickled model. The default is the previously hard-coded
        value. NOTE(review): it looks like a dotted catalog name rather than a
        filesystem path, and it is resolved relative to the working directory.
        Confirm this is the intended location.
    max_batches : int, optional
        Stop after this many batch attempts, counting both successful and failed
        ones. ``None`` (the default) runs forever.

    Notes
    -----
    Relies on a ``spark`` SparkSession available as a notebook global
    (presumably provided by the Databricks runtime).
    A failing batch is printed and the stream keeps going. Errors while loading
    the model are not caught and propagate to the caller.
    """
    # SECURITY: pickle.load can execute arbitrary code stored in the file; only
    # load model files produced by a trusted pipeline.
    with open(Path(model_path), 'rb') as f:
        model = pickle.load(f)

    attempts = 0
    while max_batches is None or attempts < max_batches:
        attempts += 1
        try:
            nb_rows = random.randint(1, 30)

            data = model.sample(nb_rows)
            # Year-month-day order with a colon-separated time part.
            data['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            data['city'] = city
            # Strip first: replacing first would turn leading/trailing spaces
            # into underscores that strip() can no longer remove.
            data.columns = [col.strip().replace(' ', '_') for col in data.columns]

            df = spark.createDataFrame(data)
            df.write.format("delta").mode("append").save(save_path)
            print(f"[{datetime.now()}] Written {nb_rows} rows to {city}")
        except Exception as e:
            # Best-effort stream: report the failure and keep producing batches.
            print(f"Error in stream for {city}: {str(e)}")

        if max_batches is not None and attempts >= max_batches:
            break
        # The pause is drawn outside the try block, so it is defined even when
        # the very first batch fails.
        time.sleep(random.randint(1, 60))

In [0]:
from concurrent.futures import ThreadPoolExecutor, as_completed

# One (Delta table path, city) pair per synthetic stream; each runs in its own thread.
# NOTE(review): directory casing is inconsistent ("RE_Rabat" vs "RE_casablanca" /
# "RE_tanger"); left unchanged because downstream readers may rely on these exact paths.
streams = [
    ("/Volumes/real_estate_management/00_landing/streaming_data/RE_Rabat/", "rabat"),
    ("/Volumes/real_estate_management/00_landing/streaming_data/RE_casablanca/", "casablanca"),
    ("/Volumes/real_estate_management/00_landing/streaming_data/RE_tanger/", "tanger"),
]

with ThreadPoolExecutor(max_workers=len(streams)) as executor:
    futures = [executor.submit(generate_stream, save_path, city)
               for save_path, city in streams]
    # A stream that stops early (e.g. because the model file cannot be loaded)
    # leaves its exception stored in its Future. Without this loop the error
    # would never be shown. Failures are reported as they happen, while the
    # remaining streams keep running.
    city_by_future = {future: city for future, (_, city) in zip(futures, streams)}
    for future in as_completed(futures):
        error = future.exception()
        if error is not None:
            print(f"Stream for {city_by_future[future]} stopped with an error: {error!r}")
