## Create synthetic data with the `dbldatagen` package in Spark

Docs:
- https://databrickslabs.github.io/dbldatagen/public_docs/index.html
- https://pyparsing-docs.readthedocs.io/en/latest/index.html

Note: Spark runs on the JVM, so Java must be installed before using Spark (Spark 3.4 officially supports Java 8, 11 and 17):

https://www.oracle.com/java/technologies/downloads/#jdk20-mac

In [None]:
import numpy as np
from pyspark.sql import SparkSession
import dbldatagen as dg
from pyspark.sql.types import IntegerType

In [None]:
# Minimal session without the extra Avro/Delta/XML packages; kept for reference.
# The next cell builds the session that is actually used.
# spark = (
#     SparkSession.builder
#     .appName("tmp")
#     .getOrCreate() )

In [None]:

# Maven coordinates (Scala 2.13 builds) resolved via spark.jars.packages:
# Avro support, Delta Lake, and the Databricks XML reader.
spark_jars = ",".join(
    [
        "org.apache.spark:spark-avro_2.13:3.4.1",
        "io.delta:delta-core_2.13:2.4.0",
        "com.databricks:spark-xml_2.13:0.16.0",
    ]
)

# All session options, applied in order to the builder below.
spark_conf = {
    "spark.jars.packages": spark_jars,
    # Register Delta Lake's SQL extension and catalog so "delta" format works.
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # Quiet local session: no console progress bar, no web UI, and only
    # minimal retained UI history.
    "spark.ui.showConsoleProgress": "false",
    "spark.ui.enabled": "false",
    "spark.ui.dagGraph.retainedRootRDDs": "1",
    "spark.ui.retainedJobs": "1",
    "spark.ui.retainedStages": "1",
    "spark.ui.retainedTasks": "1",
    "spark.sql.ui.retainedExecutions": "1",
    "spark.worker.ui.retainedExecutors": "1",
    "spark.worker.ui.retainedDrivers": "1",
    "spark.driver.memory": "16g",
}

spark_builder = SparkSession.builder.master("local[*]").appName("MyApp")
for conf_key, conf_value in spark_conf.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

In [None]:
# Ten million synthetic purchase rows.
row_count = 10_000_000

test_data_spec = (
    dg.DataGenerator(
        spark,
        name="test_data_set1",
        rows=row_count,
        partitions=4,
        randomSeedMethod="hash_fieldname",
        verbose=True,
    )
    # NOTE(review): 10M rows over a 1,000,001-value range, so ids necessarily
    # repeat — confirm purchase_id is not meant to be unique.
    .withColumn("purchase_id", IntegerType(), minValue=1_000_000, maxValue=2_000_000)
    .withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
    # Random dates between 2017-10-01 and 2018-10-06 in 3-day steps.
    .withColumn(
        "purchase_date",
        "date",
        data_range=dg.DateRange("2017-10-01 00:00:00", "2018-10-06 11:55:00", "days=3"),
        random=True,
    )
    # 1-100 days after purchase_date. NOTE(review): the SQL rand() here is
    # unseeded, so this column is presumably not reproducible across runs.
    .withColumn(
        "return_date",
        "date",
        expr="date_add(purchase_date, cast(floor(rand() * 100 + 1) as int))",
        baseColumn="purchase_date",
    )
    # NOTE(review): templates contain four backslashes per word marker; confirm
    # the generated text matches what the template syntax intends.
    .withColumn(
        "name",
        "string",
        percentNulls=0.01,
        template=r"\\\\w \\\\w|\\\\w A. \\\\w|test",
    )
    .withColumn(
        "emails",
        "string",
        template=r"\\\\w.\\\\w@\\\\w.com",
        random=True,
        numFeatures=(1, 6),
        structType="array",
    )
)
df_test_data = test_data_spec.build()

In [None]:
# Peek at the first generated row as a quick sanity check.
df_test_data.head()

In [None]:
# Row count expressed in millions.
df_test_data.count() / 1_000_000

In [None]:
# Print five rows as a table (long cell values truncated).
df_test_data.limit(5).show(truncate=True)

In [None]:
# Exact row count. NOTE(review): no cache()/persist() is visible on df_test_data,
# so presumably each count re-generates the data.
df_test_data.count()

In [None]:
# Persist the generated data as a Delta table, replacing both the data and the
# schema of any previous run at this location.
(
    df_test_data.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("../data/delta/sample_dataset")
)