In [1]:
import os

from pyspark.sql import SparkSession

# MinIO credentials for the Iceberg S3FileIO. They are read from the standard
# AWS environment variables so they are not committed with the notebook; the
# fallbacks are the local dev values this notebook was originally run with.
# NOTE(review): the fallbacks are still plaintext dev credentials — drop them
# once every environment exports the variables.
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
S3_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

# Iceberg catalog "demo": table metadata served by the REST catalog, data files
# written to MinIO through S3FileIO with path-style access.
# If a SparkSession already exists in this kernel, getOrCreate() reuses it and
# Spark warns that only runtime SQL configurations take effect (static ones
# such as spark.sql.extensions are ignored) — restart the kernel after editing
# this cell.
spark = (SparkSession.builder
    .appName("Iceberg Spark + REST")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "rest")
    .config("spark.sql.catalog.demo.uri", "http://rest:8181")
    .config("spark.sql.catalog.demo.warehouse", "s3://warehouse/warehouse/")
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

    # MinIO S3FileIO settings (must match the REST catalog's storage settings)
    .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.demo.s3.path-style-access", "true")
    .config("spark.sql.catalog.demo.s3.access-key-id", S3_ACCESS_KEY)
    .config("spark.sql.catalog.demo.s3.secret-access-key", S3_SECRET_KEY)

    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate())


25/07/30 13:25:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
import time

# Wall-clock start of the bronze -> silver -> gold rebuild in this cell; the
# elapsed time is printed at the end of the cell.
start_time = time.time()

# Bronze staging table: the raw Auchan scrape joined to three lookup tables
# (category, section, normalised quantity/unit), each reduced to one row per
# product name. Prices are parsed from text by stripping '€' and spaces,
# turning the decimal comma into a dot and replacing the 'À partir de' prefix.
# Quantity and unit come from demo.source.auchan_norm (a quantity of 0 becomes
# NULL), and price_per_quantity = price / quantity.
# Rows without a usable image_url are dropped, and only one row per
# (image_url, date) is kept.
# NOTE(review): ids come from ROW_NUMBER over the raw `date` only, so rows
# sharing a date are numbered in arbitrary order and the row that survives the
# (image_url, date) dedup can differ between runs.
spark.sql("""
CREATE OR REPLACE TABLE demo.bronze.stg_auchan2
USING iceberg
AS
WITH raw_auchan AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY date) AS id,
        CAST(date AS DATE) AS date,
        name,
        CAST(quantity_stock AS DOUBLE) AS stock,
        CAST(REPLACE(REPLACE(REPLACE(REPLACE(price, '€', ''), ' ', ''), ',', '.'), 'Àpartirde', '0') AS DOUBLE) AS cleaned_price,
        marque,
        store,
        image_url
    FROM demo.source.auchan
),

raw_category AS (
    SELECT product_name, category
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY product_name ORDER BY category) AS rn
        FROM demo.source.auchan_cat
    ) tmp
    WHERE rn = 1
),

raw_section AS (
    SELECT product_name, section
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY product_name ORDER BY section) AS rn
        FROM demo.source.auchan_section
    ) tmp
    WHERE rn = 1
),

raw_norm AS (
    SELECT name, unit,
        CASE
            WHEN CAST(quantity AS DOUBLE) = 0 THEN NULL
            ELSE CAST(quantity AS DOUBLE)
        END AS quantity
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY name ORDER BY quantity DESC NULLS LAST) AS rn
        FROM demo.source.auchan_norm
    ) tmp
    WHERE rn = 1
),

final AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY image_url, date ORDER BY id) AS dedup_rank
    FROM (
        SELECT
            l.id,
            l.name,
            l.cleaned_price AS price,
            l.stock,
            n.quantity,
            n.unit,
            CASE
                WHEN n.quantity IS NULL THEN NULL
                ELSE l.cleaned_price / n.quantity
            END AS price_per_quantity,
            c.category,
            s.section,
            l.date,
            l.marque,
            l.store,
            l.image_url
        FROM raw_auchan l
        LEFT JOIN raw_category c ON c.product_name = l.name
        LEFT JOIN raw_section s ON s.product_name = l.name
        LEFT JOIN raw_norm n ON n.name = l.name
        WHERE l.image_url IS NOT NULL
          AND l.image_url != ''
          AND LOWER(l.image_url) != 'nan'
    ) subquery
)

SELECT
    id,
    name,
    price,
    stock,
    quantity,
    unit,
    price_per_quantity,
    category,
    section,
    date,
    marque,
    store,
    image_url
FROM final
WHERE dedup_rank = 1
""")

# Step 1: Create image_url_dates view
# One row per (image_url, name) pair present in bronze, with the first and last
# date it was seen and the number of days between them. This span defines the
# per-day calendar built in Step 3.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW image_url_dates AS
SELECT
    image_url,
    name,
    MIN(date) AS min_date,
    MAX(date) AS max_date,
    datediff(MAX(date), MIN(date)) AS date_diff
FROM demo.bronze.stg_auchan2
WHERE image_url IS NOT NULL AND name IS NOT NULL
GROUP BY image_url, name
""")

# Step 2: Create numbers view
# Integer day offsets 0..MAX_DAY_OFFSET (inclusive), exposed as column `num`.
# Any join that expands a date span through this view cannot produce more than
# MAX_DAY_OFFSET + 1 days, so raise the constant if a longer history is needed.
MAX_DAY_OFFSET = 370

spark.sql(f"""
CREATE OR REPLACE TEMPORARY VIEW numbers AS
SELECT explode(sequence(0, {MAX_DAY_OFFSET})) AS num
""")

# Step 3: Create image_date_matrix view
# One row per calendar day from each (image_url, name) pair's first to last
# observation, inclusive. sequence() over two dates steps one day at a time, so
# the calendar covers the whole span however long it is, instead of being
# capped by the size of a fixed offsets table.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW image_date_matrix AS
SELECT
    i.image_url,
    i.name,
    explode(sequence(i.min_date, i.max_date)) AS date_day
FROM image_url_dates i
""")

# Step 4: Create joined view
# Attach the bronze observation (if any) to every calendar day. Bronze keeps at
# most one row per (image_url, date), so this does not fan out; days without an
# observation keep NULL in every measure column and are forward-filled in the
# following steps.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW joined AS
SELECT
    m.image_url,
    m.name,
    m.date_day,
    a.price,
    a.stock,
    a.quantity,
    a.unit,
    a.price_per_quantity,
    a.category,
    a.section,
    a.marque,
    a.store
FROM image_date_matrix m
LEFT JOIN demo.bronze.stg_auchan2 a
    ON a.image_url = m.image_url
    AND a.name = m.name
    AND a.date = m.date_day
""")

# Step 5: Create grouped_data view
# Forward-fill helper. For every measure column below, add "<column>_group": a
# running count of that column's non-NULL values per (image_url, name) in date
# order. Consecutive days sharing a count form one fill group that starts on the
# day a new value was observed (group 0 = days before the first observation).
# The window SQL is generated from one column list instead of being pasted
# once per column, so adding a measure is a one-line change.
_group_cols = (
    "price",
    "stock",
    "quantity",
    "unit",
    "price_per_quantity",
    "category",
    "section",
    "marque",
    "store",
)
_group_cols_sql = ",\n    ".join(_group_cols)
_group_exprs_sql = ",\n".join(
    f"""    sum(case when {col} IS NOT NULL then 1 else 0 end) OVER (
        PARTITION BY image_url, name
        ORDER BY date_day
        ROWS UNBOUNDED PRECEDING
    ) AS {col}_group"""
    for col in _group_cols
)

spark.sql(f"""
CREATE OR REPLACE TEMPORARY VIEW grouped_data AS
SELECT
    image_url,
    name,
    date_day,
    {_group_cols_sql},
{_group_exprs_sql}
FROM joined
""")

# Silver table: one row per (image_url, name, day). Each measure is
# forward-filled from the most recent day it was observed: within a
# "<column>_group" from grouped_data, first_value() returns the group's first
# row, which is the day the value was observed. Also assigns a sequential id
# and a product_id taken from the first 15 hex digits of
# md5(image_url || name); 15 hex digits = 60 bits, so it fits in a BIGINT.
# NOTE(review): concat() has no separator, so e.g. ("ab", "c") and ("a", "bc")
# map to the same product_id; left unchanged because fixing it re-keys every
# product downstream.
_ffill_cols = (
    "price",
    "stock",
    "quantity",
    "unit",
    "price_per_quantity",
    "category",
    "section",
    "marque",
    "store",
)
_ffill_exprs_sql = ",\n".join(
    f"""    first_value({col}) OVER (
        PARTITION BY image_url, name, {col}_group
        ORDER BY date_day
        ROWS UNBOUNDED PRECEDING
    ) AS {col}"""
    for col in _ffill_cols
)

spark.sql(f"""
CREATE OR REPLACE TABLE demo.silver.inter_auchan2
AS
SELECT
    row_number() OVER (ORDER BY image_url, name, date_day) AS id,
    cast(conv(substr(md5(concat(image_url, name)), 1, 15), 16, 10) AS bigint) AS product_id,
    image_url,
    date_day AS date,
    name,
{_ffill_exprs_sql}
FROM grouped_data
ORDER BY image_url, name, date_day
""")


# Gold category dimension: one row per distinct (category, section) pair seen
# in silver, ignoring NULL or blank values. id_category is ordered by category
# and then section, so the surrogate keys are reproducible across rebuilds.
# Ordering by category alone left ties whenever one category appears under
# several sections.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.dim_category2 AS
WITH raw_cat AS (
    SELECT DISTINCT category, section
    FROM demo.silver.inter_auchan2
),
dim_category AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY category, section) AS id_category,
        section,
        category
    FROM raw_cat
    WHERE category IS NOT NULL
      AND TRIM(category) <> ''
      AND section IS NOT NULL
      AND TRIM(section) <> ''
)
SELECT * FROM dim_category
""")


# Gold date dimension: one row per distinct date in silver, with a surrogate
# date_id assigned in chronological order plus its day, month and year parts.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.dim_date2 AS

WITH raw_dates AS (
    SELECT DISTINCT CAST(date AS DATE) AS date
    FROM demo.silver.inter_auchan2
),

dim_date AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY date) AS date_id,
        DAY(date) AS day,
        MONTH(date) AS month,
        YEAR(date) AS year,
        date
    FROM raw_dates
)

SELECT * FROM dim_date
""")

# Gold picture dimension: one row per distinct image_url in silver (renamed to
# `picture`). The surrogate id_picture follows the sort order of the URL string.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.dim_picture2 AS

WITH raw_picture AS (
    SELECT DISTINCT image_url AS picture
    FROM demo.silver.inter_auchan2
),

dim_picture AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY picture) AS id_picture,
        picture
    FROM raw_picture
)

SELECT * FROM dim_picture
""")

# Gold store dimension: one row per non-blank store seen in silver, with its
# image_str from demo.source.Store_img when one exists (NULL otherwise).
# The id ordering includes image_str as a tiebreak so ids are reproducible.
# NOTE(review): if Store_img lists several image_str values for one store, that
# store still gets several rows (and ids); confirm whether one image per store
# should be enforced.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.dim_store2 AS

WITH raw_store AS (
    SELECT DISTINCT store
    FROM demo.silver.inter_auchan2
),

store_images AS (
    SELECT DISTINCT store, image_str
    FROM demo.source.Store_img
),

dim_store AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY rs.store, si.image_str) AS id_store,
        rs.store,
        si.image_str
    FROM raw_store rs
    LEFT JOIN store_images si
        ON rs.store = si.store
    WHERE rs.store IS NOT NULL
      AND TRIM(rs.store) <> ''
)

SELECT * FROM dim_store
""")


# Gold product dimension: one row per product_id, with foreign keys into the
# category, picture and store dimensions.
# Silver has one row per product per day, and its forward-filled attributes
# are NULL before their first observation and may change later, so
# all_products can hold several candidate rows per product_id. The dedup keeps
# one of them. NULLS LAST is essential: Spark sorts NULLs first in ascending
# order, which previously made a row whose category was still NULL (no
# id_category) win over a categorised one. marque is a last tiebreak that
# prefers a non-NULL brand.
# Products whose store is always NULL or blank are dropped by the INNER JOIN on
# dim_store2.
# NOTE(review): the helper column `rn` (always 1) is still written to the
# table to keep its existing schema; consider projecting it away.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.dim_product2 AS

WITH all_products AS (
    SELECT DISTINCT
        product_id,
        name,
        marque,
        category,
        image_url AS picture,
        store
    FROM demo.silver.inter_auchan2
),

final_product AS (
    SELECT
        p.product_id,
        p.name,
        p.marque,
        c.id_category,
        i.id_picture,
        s.id_store
    FROM all_products p
    LEFT JOIN demo.gold.dim_category2 c
        ON p.category = c.category
    LEFT JOIN demo.gold.dim_picture2 i
        ON p.picture = i.picture
    INNER JOIN demo.gold.dim_store2 s
        ON p.store = s.store
),

deduplicated AS (
    SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (
                PARTITION BY product_id
                ORDER BY id_category NULLS LAST,
                         id_store NULLS LAST,
                         id_picture NULLS LAST,
                         marque NULLS LAST
            ) AS rn
        FROM final_product
    ) sub
    WHERE rn = 1
)

SELECT * FROM deduplicated
""")


# Gold fact table: one row per (product, day) with price/stock measures and an
# estimated `sales` figure derived from day-over-day stock changes:
#   * stock went down or stayed flat -> sales = size of the drop
#   * stock went up (restock)        -> sales = that product's mean drop
#   * no previous / unknown stock    -> sales = 0
# Stock history is tracked per product_id. That is the same (image_url, name)
# key silver is built on, so two products sharing an image_url are never
# interleaved in the LAG, and each product has exactly one row per date.
# Rows are INNER JOINed to dim_product2. With the former LEFT JOIN, a product
# missing from that dimension got a NULL product_id, and the final
# (product_id, date_id) dedup then collapsed all such products into one row per
# day.
# NOTE(review): sales in [9850, 10000] are forced to 1. This presumably masks a
# placeholder stock value (~10000) in the source; confirm and name the
# constant.
spark.sql("""
CREATE OR REPLACE TABLE demo.gold.fact_sales2 AS

WITH base AS (
    SELECT
        product_id,
        image_url,
        date,
        price,
        stock,
        quantity,
        price_per_quantity,
        unit,
        LAG(stock) OVER (PARTITION BY product_id ORDER BY date) AS prev_stock
    FROM demo.silver.inter_auchan2
),

diffs AS (
    SELECT
        *,
        stock - prev_stock AS raw_stock_diff
    FROM base
),

mean_neg AS (
    SELECT
        product_id,
        AVG(ABS(raw_stock_diff)) AS mean_negative_stock_diff
    FROM diffs
    WHERE raw_stock_diff <= 0
    GROUP BY product_id
),

final_sales AS (
    SELECT
        d.product_id,
        d.date,
        d.price,
        d.stock,
        d.quantity,
        d.price_per_quantity,
        d.unit,
        CASE
            WHEN d.raw_stock_diff > 0 THEN m.mean_negative_stock_diff
            WHEN d.raw_stock_diff IS NULL THEN 0
            ELSE ABS(d.raw_stock_diff)
        END AS sales,
        d.image_url
    FROM diffs d
    LEFT JOIN mean_neg m ON d.product_id = m.product_id
),

raw_fact AS (
    SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY product_id, date ORDER BY stock DESC NULLS LAST) AS rn
        FROM final_sales
    ) sub
    WHERE rn = 1
),

final_fact AS (
    SELECT
        f.stock,
        CASE
            WHEN f.sales BETWEEN 9850 AND 10000 THEN 1
            ELSE f.sales
        END AS sales,
        f.price,
        f.quantity,
        f.price_per_quantity,
        f.unit,
        p.product_id,
        d.date_id
    FROM raw_fact f
    INNER JOIN demo.gold.dim_product2 p ON f.product_id = p.product_id
    LEFT JOIN demo.gold.dim_date2 d ON f.date = d.date
),

deduplicated AS (
    SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY product_id, date_id ORDER BY stock DESC NULLS LAST) AS rn
        FROM final_fact
    ) sub
    WHERE rn = 1
)

SELECT
    ROW_NUMBER() OVER (ORDER BY product_id, date_id) AS id,
    stock,
    sales,
    price,
    quantity,
    price_per_quantity,
    unit,
    product_id,
    date_id
FROM deduplicated
""")

# Report the wall-clock time taken by the bronze/silver/gold rebuild above.
# The cell reads existing source tables; it does not load any CSV.
end_time = time.time()
print(f"✅ Pipeline tables rebuilt in {end_time - start_time:.2f} seconds")

25/07/30 13:32:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 13:32:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 13:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 13:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 13:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 13:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/30 1

✅ CSV loaded in 69.43 seconds


                                                                                

In [10]:
# Drop the previous bronze staging table (the pipeline above writes
# demo.bronze.stg_auchan2). IF EXISTS keeps the cell re-runnable once the
# table is gone.
# NOTE(review): the recorded run failed because the catalog entry points at a
# metadata.json that no longer exists in the warehouse bucket (NotFoundException
# from the REST catalog while loading the table). Spark's DROP loads the table
# first even with IF EXISTS, so that dangling entry has to be removed at the
# catalog level.
spark.sql("DROP TABLE IF EXISTS demo.bronze.stg_auchan")


Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.iceberg.exceptions.ServiceFailureException: Server error: NotFoundException: Location does not exist: s3://warehouse/warehouse/bronze/stg_auchan/metadata/00000-c2092991-054d-42ae-a0c9-e6e6b40331b5.metadata.json
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:217)
	at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:118)
	at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:102)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:224)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:308)
	at org.apache.iceberg.rest.BaseHTTPClient.get(BaseHTTPClient.java:77)
	at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:97)
	at org.apache.iceberg.rest.RESTSessionCatalog.loadInternal(RESTSessionCatalog.java:465)
	at org.apache.iceberg.rest.RESTSessionCatalog.loadTable(RESTSessionCatalog.java:489)
	at org.apache.iceberg.catalog.BaseSessionCatalog$AsCatalog.loadTable(BaseSessionCatalog.java:99)
	at org.apache.iceberg.rest.RESTCatalog.loadTable(RESTCatalog.java:102)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:147)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:844)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:169)
	at org.apache.spark.sql.connector.catalog.TableCatalog.tableExists(TableCatalog.java:185)
	at org.apache.spark.sql.execution.datasources.v2.DropTableExec.run(DropTableExec.scala:36)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
