In [4]:
# Bronze: read every CSV extract under Files/Bronze and expose it to Spark SQL as the
# `COPUK_Bronze` temp view used by the cells below.
# No schema is inferred, so every column arrives as a string. `source_file` keeps the
# path of the file each row came from; the data_version column is derived from it later.
from pyspark.sql.functions import input_file_name

BRONZE_CSV_GLOB = "Files/Bronze/*/*.csv"  # exactly one sub-folder level below Bronze

df = (
    spark.read
    .option("header", True)
    .csv(BRONZE_CSV_GLOB)
    .withColumn("source_file", input_file_name())
)

df.createOrReplaceTempView("COPUK_Bronze")

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 6, Finished, Available, Finished)

In [19]:
# Silver: one row per Bronze title row, with the proprietor replaced by a surrogate ID.
#
#   Mapping / LatestMapping  For each company registration number, keep the INITCAP'd
#                            proprietor name from the row with the highest
#                            `Date Proprietor Added` value (ROW_NUMBER ... rn = 1).
#   Base                     Every Bronze row except the 'Row Count:' footer. Rows whose
#                            registration number has a mapping take that name; all other
#                            rows keep their own INITCAP'd name.
#   WithID / WithRank        ProprietorID = DENSE_RANK over the distinct names, so equal
#                            names share one ID.
#
# NOTE(review): the IDs are positional (alphabetical rank over everything currently in
#   Bronze), so they shift whenever names are added or removed between loads. The Gold
#   Companies cell re-derives the IDs with the same CTEs; keep the two queries in sync.
# NOTE(review): no schema is inferred on read, so `Date Proprietor Added` is a string and
#   ORDER BY ... DESC is lexicographic. It is only chronological for ISO-style values
#   (yyyy-MM-dd). TODO confirm the source date format. Ties are broken arbitrarily.
# NOTE(review): a NULL proprietor name still receives a rank in WithRank, but `=` never
#   matches NULL in the final join, so such rows end up with a NULL ProprietorID.
df = spark.sql("""
    WITH Mapping AS (
        SELECT
            `Company Registration No. (1)` AS reg_id,
            INITCAP(`Proprietor Name (1)`) AS mapped_name,
            ROW_NUMBER() OVER (
                PARTITION BY `Company Registration No. (1)`
                ORDER BY `Date Proprietor Added` DESC
            ) AS rn
        FROM COPUK_Bronze
        WHERE `Company Registration No. (1)` IS NOT NULL
    ),
    LatestMapping AS (
        SELECT
            reg_id,
            mapped_name
        FROM Mapping
        WHERE rn = 1
    ),
    Base AS (
        SELECT
            -- Version = the LAST YYYY_MM token in the source path. The greedy `.*` skips any
            -- earlier digits in the storage prefix, unlike a fixed character offset, which
            -- breaks whenever the prefix length changes. Assumes the version is embedded in
            -- the folder or file name as YYYY_MM (e.g. '2025_03'); TODO confirm.
            regexp_extract(b.source_file, '.*([0-9]{4}_[0-9]{2})', 1)   AS data_version,
            b.`Title Number`                                            AS title_number,
            b.`Date Proprietor Added`                                   AS date_proprietor_added,
            b.`Price Paid`                                              AS price_paid,
            b.`Additional Proprietor Indicator`                         AS additional_proprietor_indicator,
            COALESCE(l.mapped_name, INITCAP(b.`Proprietor Name (1)`))   AS proprietor_name_1
        FROM COPUK_Bronze b
        LEFT JOIN LatestMapping l
            ON b.`Company Registration No. (1)` = l.reg_id
        -- Excludes the 'Row Count:' footer row; `<>` also drops rows with a NULL title number.
        WHERE b.`Title Number` <> 'Row Count:'
    ),
    WithID AS (
        SELECT DISTINCT proprietor_name_1
        FROM Base
    ),
    WithRank AS (
        SELECT
            proprietor_name_1,
            DENSE_RANK() OVER (ORDER BY proprietor_name_1) AS ProprietorID
        FROM WithID
    )
    SELECT
        b.data_version,
        b.title_number,
        b.date_proprietor_added,
        b.price_paid,
        b.additional_proprietor_indicator,
        r.ProprietorID
    FROM Base b
    LEFT JOIN WithRank r
        ON b.proprietor_name_1 = r.proprietor_name_1
""")

display(df)

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b8b7be1f-269d-4ab3-913f-b9e101e42618)

In [20]:
# Persist the Silver dataset as a Delta table, replacing its data and (if changed) its schema.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/Silver/COPUK_Silver")
)

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 22, Finished, Available, Finished)

In [21]:
%%sql
-- Spot-check the Silver table for a single data version.
SELECT *
FROM silver.COPUK_Silver
WHERE data_version = '2025_03'

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 23, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 6 fields>

In [29]:
# Gold Companies: the proprietor surrogate ID together with name, category and a flag for
# rows whose reported name came from the registration-number mapping.
#
#   Mapping / LatestMapping  For each company registration number, keep the INITCAP'd
#                            proprietor name from the row with the highest
#                            `Date Proprietor Added` value (ROW_NUMBER ... rn = 1).
#   Base                     Every Bronze row except the 'Row Count:' footer, with the
#                            mapped name (or the row's own INITCAP'd name) and HistDiffNames.
#   WithID / WithRank        proprietorID = DENSE_RANK over the distinct names.
#
# NOTE(review): proprietorID must match the Silver cell's ProprietorID, which is derived
#   with the same CTEs; keep the two queries in sync. The IDs are positional (alphabetical
#   rank over all names in Bronze), so they shift when names change between loads.
# NOTE(review): nothing here de-duplicates, so the output has one row per Bronze row and a
#   proprietorID repeats for every title it holds. If Gold.Companies is meant to be one row
#   per company (per data_version), add SELECT DISTINCT / GROUP BY. TODO confirm.
# NOTE(review): `Date Proprietor Added` is read as a string, so the DESC ordering is
#   lexicographic. It is only chronological for ISO-style dates. TODO confirm the format.
dfCompanies = spark.sql("""
    WITH Mapping AS (
        SELECT
            `Company Registration No. (1)` AS reg_id,
            INITCAP(`Proprietor Name (1)`) AS mapped_name,
            ROW_NUMBER() OVER (
                PARTITION BY `Company Registration No. (1)`
                ORDER BY `Date Proprietor Added` DESC
            ) AS rn
        FROM COPUK_Bronze
        WHERE `Company Registration No. (1)` IS NOT NULL
    ),
    LatestMapping AS (
        SELECT
            reg_id,
            mapped_name
        FROM Mapping
        WHERE rn = 1
    ),
    Base AS (
        SELECT
            -- Version = the LAST YYYY_MM token in the source path. The greedy `.*` skips any
            -- earlier digits in the storage prefix, unlike a fixed character offset, which
            -- breaks whenever the prefix length changes. Assumes the version is embedded in
            -- the folder or file name as YYYY_MM (e.g. '2025_03'); TODO confirm.
            regexp_extract(b.source_file, '.*([0-9]{4}_[0-9]{2})', 1)   AS data_version,
            COALESCE(l.mapped_name, INITCAP(b.`Proprietor Name (1)`))   AS proprietor_name,
            b.`Proprietorship Category (1)`                             AS proprietorship_category,
            -- 'Yes' whenever the reported proprietor_name (the mapped name) differs from this
            -- row's own name. `<=>` is null-safe, so a NULL row name with a mapped name
            -- counts as different instead of collapsing to 'No'.
            CASE
                WHEN l.mapped_name IS NOT NULL
                     AND NOT (l.mapped_name <=> INITCAP(b.`Proprietor Name (1)`))
                    THEN 'Yes'
                ELSE 'No'
            END                                                         AS HistDiffNames
        FROM COPUK_Bronze b
        LEFT JOIN LatestMapping l
            ON b.`Company Registration No. (1)` = l.reg_id
        -- Excludes the 'Row Count:' footer row; `<>` also drops rows with a NULL title number.
        WHERE b.`Title Number` <> 'Row Count:'
    ),
    WithID AS (
        SELECT DISTINCT proprietor_name
        FROM Base
    ),
    WithRank AS (
        SELECT
            proprietor_name,
            DENSE_RANK() OVER (ORDER BY proprietor_name) AS proprietorID
        FROM WithID
    )
    SELECT
        r.proprietorID,
        b.data_version,
        b.proprietor_name,
        b.proprietorship_category,
        b.HistDiffNames
    FROM Base b
    LEFT JOIN WithRank r
        ON b.proprietor_name = r.proprietor_name
""")

display(dfCompanies)

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3107a031-cef1-483a-8ea6-b90c588f56e1)

In [30]:
# Persist the Gold Companies dataset as a Delta table, replacing its data and schema.
(
    dfCompanies.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/Gold/Companies")
)

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 32, Finished, Available, Finished)

In [31]:
%%sql
-- Inspect the freshly written Gold Companies table.
SELECT *
FROM Gold.Companies

StatementMeta(, 1b875beb-1b8b-42f0-ada6-2437f7e8c00c, 33, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 5 fields>