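## Customer Data Generator

Generates a batch of synthetic customer records (id, name, email, timestamp, category code) and writes it as JSON and CSV into a Unity Catalog volume.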

In [0]:
import random, string
from datetime import datetime, timezone

from pyspark.sql import SparkSession

In [0]:
%sql
-- Ensure the target schema and volume exist before any files are written.
-- NOTE(review): these names are hard-coded and must stay in sync with the
-- catalog / schema / volume variables set in the Python config cell.
CREATE SCHEMA IF NOT EXISTS bronze_dev.dlt_demo_files;
CREATE VOLUME IF NOT EXISTS bronze_dev.dlt_demo_files.demo_synthetic_data;

In [0]:
# Destination for the generated files: /Volumes/<catalog>/<schema>/<volume>/<subdir>.
# The catalog/schema/volume must match what the %sql cell creates.
catalog, schema, volume = 'bronze_dev', 'dlt_demo_files', 'demo_synthetic_data'
subdir = 'file_ingestion'
VOLUME_PATH = '/'.join(['', 'Volumes', catalog, schema, volume, subdir])

In [0]:
def rand_email(rng=None):
    """Return a random synthetic e-mail address such as ``abcdefgh@test.org``.

    Args:
        rng: Optional ``random.Random`` instance to draw from. Defaults to the
            module-level ``random`` generator, so existing callers are unaffected.

    Returns:
        An address of 8 random lowercase ASCII letters, ``@``, and one of three
        fixed demo domains.
    """
    if rng is None:
        rng = random
    user = ''.join(rng.choices(string.ascii_lowercase, k=8))
    domain = rng.choice(['example.com', 'random.net', 'test.org'])
    return f'{user}@{domain}'


def make_records(n=22250, seed=None, null_email_rate=0.05):
    """Generate ``n`` synthetic customer records as a list of dicts.

    Each record has the keys ``id`` (0..n-1), ``name``, ``email``, ``timestamp``
    and ``cat_code``. ``email`` is ``None`` with probability ``null_email_rate``
    to simulate missing data. Every record shares one UTC batch timestamp in
    ISO-8601 form with a trailing ``Z``.

    Args:
        n: Number of records to generate.
        seed: Optional seed for a private ``random.Random``. Pass an int to make
            names, emails and category codes reproducible; ``None`` (default)
            seeds from system randomness, like the previous unseeded behavior.
        null_email_rate: Probability in [0, 1] that a record's email is ``None``.

    Returns:
        A list of ``n`` dicts, suitable for ``spark.createDataFrame``.
    """
    rng = random.Random(seed)
    # datetime.utcnow() is deprecated since Python 3.12. Use an aware UTC time and
    # render the +00:00 offset as 'Z' so the string format stays the same.
    ts = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    category_codes = ['AA', 'BBB', 'YYQ', 'ZZT']
    # Each name listed once so names are sampled uniformly (the list previously
    # repeated 'Mary' and 'Sue', doubling their weight).
    names = ['John', 'Jane', 'Sam', 'Alice', 'Mary', 'Joe', 'Sue', 'Bill', 'Ted', 'Sally', 'Mike', 'Pete']
    return [
        {
            'id': i,
            'name': rng.choice(names),
            'email': None if rng.random() < null_email_rate else rand_email(rng),
            'timestamp': ts,
            'cat_code': rng.choice(category_codes),
        }
        for i in range(n)
    ]

In [0]:
# Build the synthetic customer DataFrame; Spark infers the schema from the record dicts.
df = spark.createDataFrame(make_records())

# Best-effort creation of the landing directory. A failure (e.g. missing volume or
# insufficient permissions) is reported rather than silently ignored.
try:
    dbutils.fs.mkdirs(VOLUME_PATH)
except Exception as exc:
    print(f'Warning: could not create {VOLUME_PATH}: {exc}')

In [0]:
# Write the batch as JSON under a UTC-timestamped name. Spark writes a directory at
# this path (despite the .json suffix); coalesce(1) yields a single part file in it.
# datetime.now(timezone.utc) replaces the deprecated datetime.utcnow().
out_json = f'{VOLUME_PATH}/batch_{datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")}.json'
df.coalesce(1).write.mode('append').json(out_json)

In [0]:
# Write the same batch as CSV with a header row. As with the JSON output, Spark
# writes a directory at this path; coalesce(1) yields a single part file in it.
# datetime.now(timezone.utc) replaces the deprecated datetime.utcnow().
out_csv = f'{VOLUME_PATH}/batch_{datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")}.csv'
df.coalesce(1).write.mode('append').option('header', True).csv(out_csv)

In [0]:
# Report the destination directory (single space between label and path).
print(f'Wrote files to {VOLUME_PATH}')