In [0]:
def drop_volume(chapter_number: str):
    """Drop the volume ``workspace.default.chapter_<chapter_number>``.

    The statement uses ``if exists``, so calling this for a chapter whose
    volume has not been created is not an error.

    Args:
        chapter_number: Suffix identifying the chapter volume.
    """
    statement = f"drop volume if exists workspace.default.chapter_{chapter_number}"
    spark.sql(statement)

def create_volume(chapter_number: str):
    """Create the volume ``workspace.default.chapter_<chapter_number>``.

    The statement uses ``if not exists``, so an already existing volume is
    left as it is. The volume comment records which chapter it belongs to.

    Args:
        chapter_number: Suffix identifying the chapter volume.
    """
    statement = f"""
        create volume if not exists workspace.default.chapter_{chapter_number}
        comment 'Datasets for chapter {chapter_number}'
    """
    spark.sql(statement)

def create_enumerated_files(temp_path: str, table_name: str, file_format: str):
    """Move Spark part files out of the staging area under numbered names.

    Every entry of ``temp_path`` whose name starts with ``part-`` is moved to
    the directory obtained by removing ``temp/`` from ``temp_path`` and is
    renamed ``<table_name>_<NN>.<file_format>``, where ``NN`` is a 1-based,
    zero-padded counter. Entries that do not start with ``part-`` are left
    where they are.

    Args:
        temp_path: Staging directory to scan. It must end with ``/`` because
            the new file name is appended to it directly.
        table_name: Prefix used for the renamed files.
        file_format: Extension given to the renamed files, without the dot.
    """
    # The destination directory is the same for every file: compute it once.
    target_dir = temp_path.replace('temp/', '')

    # Sort by name so the numbering follows the part number instead of
    # whatever order the directory listing happens to return.
    part_files = sorted(
        (entry for entry in dbutils.fs.ls(temp_path) if entry.name.startswith("part-")),
        key=lambda entry: entry.name,
    )

    for index, part_file in enumerate(part_files, start=1):
        # ``02d`` keeps the two-digit padding correct beyond nine files
        # (a literal "0" prefix would produce "_010").
        target = f"{target_dir}{table_name}_{index:02d}.{file_format}"
        dbutils.fs.mv(part_file.path, target)

def create_complex_data_files(chapter_number: str, temp_path: str):
    """Write the nested-type sample datasets derived from the order details.

    Reads the order-details Parquet files from
    ``/Volumes/workspace/default/chapter_<chapter_number>/orderdetails/parquet/``,
    aggregates them per ``orderid`` and writes two datasets below
    ``temp_path``, each repartitioned into four part files and overwriting
    any previous output:

    * ``array_products_by_order/parquet/`` - ``orderid`` plus the column
      ``array_products``, an array of the order's ``productid`` values.
    * ``order_details_dict/parquet/`` - ``orderid`` plus the column
      ``order_details``, an array of structs with the fields ``productid``,
      ``unitprice``, ``qty`` and ``discount``.

    Both datasets are written as Parquet. The second one used to be written
    as JSON into its ``parquet/`` directory, which left JSON content behind a
    Parquet path (and behind the ``.parquet`` extension the files receive
    when they are renamed afterwards).

    Side effect: registers (or replaces) the temp view
    ``orderdetails_source``.

    Args:
        chapter_number: Suffix identifying the chapter volume to read from.
        temp_path: Staging directory, without a trailing slash, that
            receives the two output directories.
    """
    source_df = spark.read.parquet(
        f'/Volumes/workspace/default/chapter_{chapter_number}/orderdetails/parquet/'
    )
    source_df.createOrReplaceTempView('orderdetails_source')

    array_products = spark.sql("""
        SELECT 
            orderid, 
            ARRAY_AGG(productid) AS array_products
        FROM orderdetails_source
        GROUP BY orderid;
    """)

    (
        array_products.repartition(4)
        .write
        .mode("overwrite")
        .format('parquet')
        .save(f'{temp_path}/array_products_by_order/parquet/')
    )

    order_details_dict = spark.sql("""
        SELECT 
            orderid, 
            ARRAY_AGG(
                NAMED_STRUCT(
                    'productid', productid, 
                    'unitprice', unitprice, 
                    'qty', qty, 
                    'discount', discount)
                ) AS order_details
        FROM orderdetails_source
        GROUP BY orderid
    """)

    # The format must agree with the ``parquet/`` target directory; Parquet
    # stores the array-of-struct column natively.
    (
        order_details_dict.repartition(4)
        .write
        .mode("overwrite")
        .format('parquet')
        .save(f'{temp_path}/order_details_dict/parquet/')
    )

def get_sample_files(chapter_number: str):
    """Rebuild the chapter volume and fill it with the sample data files.

    Steps:

    1. Drop and recreate the chapter volume.
    2. For every source table, download its Parquet file from GitHub with
       pandas, convert it to a Spark DataFrame and write it in four
       partitions as csv, parquet, json, avro and orc under the volume's
       ``temp`` directory; after each write the part files are moved out of
       ``temp`` under numbered names.
    3. Once ``OrderDetails`` has been processed, generate the two
       nested-type datasets and move their part files the same way.
    4. Remove the ``temp`` directory recursively.

    Args:
        chapter_number: Suffix identifying the chapter volume.
    """
    import pandas as pd

    drop_volume(chapter_number)
    create_volume(chapter_number)

    source_tables = [
        'Employees', 'Suppliers', 'Categories', 'Products', 'Customers',
        'Shippers', 'Orders', 'OrderDetails', 'Tests', 'Scores', 'Nums',
    ]

    # Writer options per output format, in the order the formats are written.
    writer_options = {
        "csv": {"delimiter": ",", "header": "true"},
        "parquet": {},
        "json": {},
        "avro": {},
        "orc": {},
    }

    main_temp_path = f'/Volumes/workspace/default/chapter_{chapter_number}/temp'

    for table in source_tables:
        lowered = table.lower()
        source_url = f'https://raw.githubusercontent.com/ajluz/tsql_database/main/{table}.parquet'
        sparkdf = spark.createDataFrame(pd.read_parquet(source_url, engine='auto'))
        table_temp_path = f'{main_temp_path}/{lowered}'

        for fmt, options in writer_options.items():
            type_temp_path = f'{table_temp_path}/{fmt}/'

            writer = (
                sparkdf.repartition(4)
                .write
                .mode("overwrite")
                .options(**options)
                .format(fmt)
            )
            writer.save(type_temp_path)

            create_enumerated_files(type_temp_path, lowered, fmt)

        if table != 'OrderDetails':
            continue

        # The nested datasets are derived from the order details files that
        # were just published, so they can only be built at this point.
        create_complex_data_files(chapter_number, main_temp_path)
        for dataset in ('array_products_by_order', 'order_details_dict'):
            create_enumerated_files(
                f'{main_temp_path}/{dataset}/parquet/',
                dataset,
                'parquet'
            )

    dbutils.fs.rm(main_temp_path, True)