### Copy the TPC-DS tools (including dsdgen) to the current working directory and test them

In [0]:
! cp -rf /dbfs/user/eordentlich/nds/tools tools

In [0]:
# List the copied directory to confirm the dsdgen binary and tpcds.idx are present.
! ls tools

dsdgen	tpcds.idx


In [0]:
# Smoke test: run dsdgen with no arguments from inside tools/ (it is invoked via a
# relative path, so the cd is required).
! cd tools && ./dsdgen

dsdgen Population Generator (Version 3.2.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2021


In [0]:
# List tools/ again: the smoke-test run left its generated .dat table files here.
! ls tools

call_center.dat		   dsdgen		       store_returns.dat
catalog_page.dat	   household_demographics.dat  store_sales.dat
catalog_returns.dat	   income_band.dat	       time_dim.dat
catalog_sales.dat	   inventory.dat	       tpcds.idx
customer.dat		   item.dat		       warehouse.dat
customer_address.dat	   promotion.dat	       web_page.dat
customer_demographics.dat  reason.dat		       web_returns.dat
date_dim.dat		   ship_mode.dat	       web_sales.dat
dbgen_version.dat	   store.dat		       web_site.dat


### Generate NDS scale-factor parts in parallel with a PySpark RDD `foreach` and save to DBFS via the local mount

In [0]:
def run_dsdgen(parallel, child, scale):
  """Generate one chunk of TPC-DS data with dsdgen and write it to DBFS.

  The dsdgen tools are copied from DBFS into a private scratch directory,
  dsdgen is run from there, and the scratch directory is always removed
  afterwards, whether or not dsdgen succeeded.

  Args:
    parallel: total number of chunks the data set is split into (dsdgen -parallel).
    child: 1-based index of the chunk this call generates (dsdgen -child).
    scale: scale factor (dsdgen -scale); output goes to
      /dbfs/user/eordentlich/nds/sf{scale}.

  Raises:
    subprocess.CalledProcessError: if dsdgen exits with a non-zero status, so
      that a failed chunk surfaces as a failed task instead of passing silently.
  """
  # Imports are kept local so the function is self-contained when it is shipped
  # to the Spark executors.
  import os
  import shutil
  import subprocess
  import tempfile

  tools_src = '/dbfs/user/eordentlich/nds/tools'
  out_dir = f'/dbfs/user/eordentlich/nds/sf{scale}'
  os.makedirs(out_dir, exist_ok=True)

  # A unique scratch directory per invocation: a retried task never collides
  # with leftovers of an earlier attempt (copytree refuses an existing target).
  scratch = tempfile.mkdtemp(prefix=f'dsdgen_{child}_')
  work_dir = os.path.join(scratch, 'tools')
  try:
    shutil.copytree(tools_src, work_dir)
    cmd = [
      './dsdgen',
      '-parallel', str(parallel),
      '-child', str(child),
      '-scale', str(scale),
      '-dir', out_dir,
    ]
    print(cmd)
    # Run with cwd= instead of os.chdir(): chdir changes the working directory
    # of the whole worker process and would stay changed if anything raised.
    subprocess.run(cmd, cwd=work_dir, check=True)
  finally:
    shutil.rmtree(scratch, ignore_errors=True)


Scale factor 1000 with 100-fold parallelism

In [0]:
# One Spark task per dsdgen child: child ids start at 1 and each gets its own partition.
num_children = 100
child_ids = list(range(1, num_children + 1))
sc.parallelize(child_ids, num_children).foreach(
    lambda child: run_dsdgen(num_children, child, 1000))

Scale factor 10000 with 100-fold parallelism

In [0]:
# One Spark task per dsdgen child: child ids start at 1 and each gets its own partition.
num_children = 100
child_ids = list(range(1, num_children + 1))
sc.parallelize(child_ids, num_children).foreach(
    lambda child: run_dsdgen(num_children, child, 10000))

### Convert to Parquet, saving the result to Azure Blob Storage under the mahrens storage account (the runs below write to the `nds` container)
Uses (slightly modified) scripts from https://github.com/NVIDIA/spark-rapids-benchmarks/blob/dev/nds/, which should be copied to the workspace location of this notebook.


Configure credentials for mahrens account and container

In [0]:
# Names of the Azure storage account and of the source container.
storage_account_name = "mahrens"
container = "mahrens"

# The account key is pulled from the Databricks secret scope rather than being
# written into the notebook.
storage_account_key = dbutils.secrets.get(
    scope="mahrens-azure",
    key="storage_account_access_key",
)

# Register the key on the Spark session so the account can be read from and written to.
spark.conf.set(
    "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name),
    storage_account_key,
)

In [0]:
from nds_schema import *

In [0]:
import argparse
# Build the argparse parser whose Namespace is later handed to transcode().
# NOTE(review): presumably mirrors the command-line interface of the copied
# nds_transcode.py script — verify against that script when it changes.
parser = argparse.ArgumentParser()

# --- required positional arguments: input location, output location, report file ---
parser.add_argument(
    'input_prefix',
    help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"; the default is empty)')
parser.add_argument(
    'output_prefix',
    help='text to prepend to every output file (e.g., "hdfs:///ds-parquet"; the default is empty)' +
    '. If output_format is "iceberg", this argument will be regarded as the value of property ' +
    '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' +
    'session name "spark_catalog" is supported now, customized catalog is not ' +
    'yet supported.')
parser.add_argument(
    'report_file',
    help='location to store a performance report(local)')

# --- optional arguments: save mode, formats and table selection ---
parser.add_argument(
    '--output_mode',
    choices=['overwrite', 'append', 'ignore', 'error', 'errorifexists'],
    help="save modes as defined by " +
    "https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes." +
    "default value is errorifexists, which is the Spark default behavior.",
    default="errorifexists")
parser.add_argument(
    '--input_format',
    choices=['csv', 'parquet', 'orc', 'avro', 'json'],
    default='csv',
    help='input data format to be converted. default value is csv.'
)
parser.add_argument(
    '--output_format',
    choices=['parquet', 'orc', 'avro', 'json', 'iceberg', 'delta'],
    default='parquet',
    help="output data format when converting CSV data sources."
)
parser.add_argument(
    '--tables',
    # A comma separated string is split into a list of table names.
    type=lambda s: s.split(','),
    help="specify table names by a comma separated string. e.g. 'catalog_page,catalog_sales'.")
parser.add_argument(
    '--log_level',
    help='set log level for Spark driver log. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN(default: INFO)',
    default="INFO")
parser.add_argument(
    '--floats',
    action='store_true',
    help='replace DecimalType with DoubleType when saving parquet files. If not specified, decimal data will be saved.')
parser.add_argument(
    '--update',
    action='store_true',
    help='transcode the source data or update data'
)

# --- optional arguments: format-specific settings (Iceberg, compression, Delta, Hive) ---
parser.add_argument(
    '--iceberg_write_format',
    choices=['parquet', 'orc', 'avro'],
    default='parquet',
    help='File format for the Iceberg table; parquet, avro, or orc'
)
parser.add_argument(
    '--compression',
    help='Compression codec to use when saving data.' +
    ' See https://iceberg.apache.org/docs/latest/configuration/#write-properties ' +
    ' for supported codecs in Iceberg.' +
    ' See https://spark.apache.org/docs/latest/sql-data-sources.html' +
    ' for supported codecs for Spark built-in formats.' +
    ' When not specified, the default for the requested output format will be used.'
)
parser.add_argument(
    '--delta_unmanaged',
    action='store_true',
    help='Use unmanaged tables for DeltaLake. This is useful for testing DeltaLake without ' +
    'leveraging a Metastore service.')
parser.add_argument(
    '--hive',
    action='store_true',
    help='create Hive external tables for the converted data.'
)
parser.add_argument(
    '--database',
    help='the name of a database to use instead of `default`, currently applies only to Hive',
    default="default"
)


_StoreAction(option_strings=['--database'], dest='database', nargs=None, const=None, default='default', type=None, choices=None, required=False, help='the name of a database to use instead of `default`, currently applies only to Hive', metavar=None)

Arguments for the scale factor 10000 (10k) case

In [0]:
# Positional arguments in parser order: input_prefix, output_prefix, report_file.
sf10000_argv = [
    "/user/eordentlich/nds/sf10000",
    "wasbs://nds@mahrens.blob.core.windows.net/parquet_sf10000",
    "/dbfs/user/eordentlich/nds/wasbs_parquet_sf10000_report",
]
args = parser.parse_args(sf10000_argv)

In [0]:
from nds_transcode import transcode

In [0]:
# Display the parsed Namespace to check the paths and the defaults of the optional flags.
args

Namespace(input_prefix='/user/eordentlich/nds/sf10000', output_prefix='wasbs://nds@mahrens.blob.core.windows.net/parquet_sf10000', report_file='/dbfs/user/eordentlich/nds/wasbs_parquet_sf10000_report', output_mode='errorifexists', input_format='csv', output_format='parquet', tables=None, log_level='INFO', floats=False, update=False, iceberg_write_format='parquet', compression=None, delta_unmanaged=False, hive=False, database='default')

In [0]:

# Enable the legacy Spark SQL flag before transcoding; going by its name it makes
# char/varchar columns behave as plain strings.
# NOTE(review): confirm against the Spark docs why the NDS schema needs this.
spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")
# Run the conversion with the arguments parsed above.
transcode(args)

Load Test Start Time: 2024-01-27 18:06:30.558722
Load Test Finished at: 2024-01-27 23:29:38.940507
Load Test Time: 19388.381785 seconds
RNGSEED used :01272329389
Load Test Time: 19388.381785 seconds
Load Test Finished at: 2024-01-27 23:29:38.940507
RNGSEED used: 01272329389
Time to convert 'customer_address' was 457.8248s
Time to convert 'customer_demographics' was 23.5905s
Time to convert 'date_dim' was 2.6950s
Time to convert 'warehouse' was 1.0044s
Time to convert 'ship_mode' was 0.9606s
Time to convert 'time_dim' was 3.7482s
Time to convert 'reason' was 0.9968s
Time to convert 'income_band' was 0.9910s
Time to convert 'item' was 10.2177s
Time to convert 'store' was 1.0172s
Time to convert 'call_center' was 0.9178s
Time to convert 'customer' was 1001.5614s
Time to convert 'web_site' was 1.2925s
Time to convert 'store_returns' was 1117.3322s
Time to convert 'household_demographics' was 1.2715s
Time to convert 'web_page' was 1.2282s
Time to convert 'promotion' was 1.1282s
Time to conv

In [0]:
# Read back a converted table as a sanity check.
# NOTE(review): this reads the parquet_sf1000 output, while the transcode arguments shown
# in this notebook target parquet_sf10000 — confirm which scale factor is intended here.
df_store_sales = spark.read.parquet('wasbs://nds@mahrens.blob.core.windows.net/parquet_sf1000/store_sales')

In [0]:
# Row count of the converted store_sales table, as a sanity check.
df_store_sales.count()

2879987999