In [0]:
def bronze_(
    url="jdbc:sqlserver://sql.database.windows.net;databaseName=database",
    user="x",
    password="x",
    dbtable="cdc.SalesLT_Customer_CT",
):
    """Batch-read a SQL Server table over JDBC into a Spark DataFrame.

    Every argument defaults to the value that used to be hard-coded, so
    ``bronze_()`` still reads ``cdc.SalesLT_Customer_CT`` from the same URL.

    Args:
        url: JDBC connection URL of the SQL Server database.
        user: Login passed to the JDBC driver.
        password: Password passed to the JDBC driver.
        dbtable: Table (or parenthesised sub-query) to read.

    Returns:
        A DataFrame over ``dbtable``.
    """
    # NOTE(review): "x" / "x" look like placeholder credentials. Real ones
    # should come from a secret scope (dbutils.secrets.get), as
    # bronze_account_user does, never from literals in the notebook.
    return (
        spark.read.format("jdbc")
        .option("url", url)
        # "user" is the login option documented by Spark's JDBC source; the
        # previous "username" only worked if the driver accepted it as an alias.
        .option("user", user)
        .option("password", password)
        .option("dbtable", dbtable)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load()
    )

In [0]:
%sql
-- Set the session defaults so that unqualified table names in later cells
-- resolve to content_job.bronze.
USE CATALOG content_job;
USE SCHEMA bronze;

In [0]:
# Preview the silver account-details table. The name is fully qualified so the
# cell no longer depends on an earlier `USE CATALOG` having run in this session.
df = spark.table("content_job.silver.silver_account_details")
df.display()

In [0]:
%sql
-- Create the content_job.temp schema. IF NOT EXISTS keeps the cell from
-- failing when the notebook is re-run and the schema already exists.
CREATE SCHEMA IF NOT EXISTS content_job.temp

In [0]:
%sql
-- Destructive: drops content_job.bronze.bronze_account_details. IF EXISTS keeps
-- the cell from failing when the table is already gone (e.g. on a re-run).
DROP TABLE IF EXISTS content_job.bronze.bronze_account_details

In [0]:
%sql
-- List the volumes in the *current* schema. That is content_job.bronze only if
-- the USE CATALOG / USE SCHEMA cell has already run in this session.
SHOW VOLUMES

In [0]:
def bronze_account_user():
    """Batch-read the account-users table over JDBC.

    The connection URL, login, password and table name are all resolved from
    the ``sm-secret-scope`` Databricks secret scope, so no credentials appear
    in the notebook.

    Returns:
        The loaded DataFrame. To preview it, call
        ``bronze_account_user().display()``.
    """
    scope = "sm-secret-scope"

    def secret(key):
        # Resolve one value from the secret scope above.
        return dbutils.secrets.get(scope=scope, key=key)

    # The previous version ended the chain with `.display()`. That call renders
    # the rows but returns None, so the function never returned the DataFrame.
    return (
        spark.read
        .format("jdbc")
        .option("url", secret("social-media-project-db-jdbc"))
        # "user" is the login option documented by Spark's JDBC source.
        .option("user", secret("social-media-project-dblog"))
        .option("password", secret("social-media-project-secret"))
        .option("dbtable", secret("social-media-project-db-tab-acc-users"))
        .load()
    )

In [0]:
%sql
-- Commented out. If re-enabled, this removes EVERY row from
-- content_job.bronze.bronze_account_user, because there is no WHERE clause.
--DELETE FROM content_job.bronze.bronze_account_user

In [0]:
%sql
-- Inspect table-level details (e.g. storage location and format) of the
-- bronze account-user table.
DESCRIBE DETAIL content_job.bronze.bronze_account_user

In [0]:
# List the files under the bronze_account_user mount path.
# NOTE(review): presumably this is the table's storage location; confirm
# against the `location` reported by DESCRIBE DETAIL.
display(
    dbutils.fs.ls("/mnt/content_job/bronze/bronze_account_user")
)

In [0]:
%sql
-- Ad-hoc look at the silver account-details table. There is no LIMIT, so every
-- row is requested.
SELECT * FROM content_job.silver.silver_account_details

In [0]:
%sql
-- Commented out. If re-enabled, this empties
-- content_job.silver.account_user_scd_type2 entirely.
--TRUNCATE TABLE content_job.silver.account_user_scd_type2

In [0]:
%sql
-- Commented-out manual data edit. If re-enabled, it sets is_group = false for
-- account_id 1379 in content_job.bronze.bronze_account_user.
--UPDATE content_job.bronze.bronze_account_user SET is_group = false WHERE account_id = 1379

In [0]:
from pyspark.sql import functions as F, Window
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit read schema for the account-details JSON records. Every field is
# declared nullable. The nested structs are defined first, then composed into
# the top-level schema.

# accountMetadata.accountAge: year and month are read as strings.
_account_age_struct = StructType([
    StructField("createdYear", StringType(), True),
    StructField("createdMonth", StringType(), True),
    StructField("accountAgeCategory", StringType(), True),
])

# accountMetadata.verificationStatus
_verification_status_struct = StructType([
    StructField("isVerified", BooleanType(), True),
    StructField("verificationConfidence", DoubleType(), True),
])

# accountMetadata (nested struct)
_account_metadata_struct = StructType([
    StructField("accountAge", _account_age_struct, True),
    StructField("verificationStatus", _verification_status_struct, True),
])

# analyticsFlags (nested struct)
_analytics_flags_struct = StructType([
    StructField("potentialBot", BooleanType(), True),
    StructField("potentialInfluencer", BooleanType(), True),
])

# profileAnalysis (nested struct)
_profile_analysis_struct = StructType([
    StructField("profileCompletenessScore", DoubleType(), True),
])

# networkFeatures (nested struct)
_network_features_struct = StructType([
    StructField("networkType", StringType(), True),
])

json_schema = StructType([
    StructField("userId", StringType(), True),
    StructField("friendsCount", IntegerType(), True),
    StructField("listedCount", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("rawDescription", StringType(), True),
    StructField("accountMetadata", _account_metadata_struct, True),
    StructField("analyticsFlags", _analytics_flags_struct, True),
    StructField("profileAnalysis", _profile_analysis_struct, True),
    StructField("networkFeatures", _network_features_struct, True),
])


def bronze_account_details(
    source_path="/Volumes/content/landing/json_files_data/test_dane1.json",
    schema=json_schema,
):
    """Ingest account-details JSON as a stream using Auto Loader (cloudFiles).

    Args:
        source_path: File or directory to ingest. Defaults to the landing
            file that used to be hard-coded.
        schema: StructType applied to the JSON records. Defaults to
            ``json_schema``, so schema inference is not used.

    Returns:
        A streaming DataFrame with every column of ``schema`` plus an
        ``ingest_time`` column holding the current timestamp.
    """
    # NOTE(review): this volume lives in the "content" catalog, while the rest
    # of the notebook uses "content_job". Confirm that is intended.
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .schema(schema)
        # multiline=true lets one JSON record span several lines (for example
        # a pretty-printed document) instead of requiring one record per line.
        .option("multiline", "true")
        .load(source_path)
        .select(
            "*",
            F.current_timestamp().alias("ingest_time"),
        )
    )


# Streaming bronze DataFrame. To drop records that lack an id, chain
# .filter(F.col("userId").isNotNull()) onto the call below.
stg = bronze_account_details()

In [0]:
def flatten_account_details(raw):
    """Flatten the nested account-details records into top-level columns.

    * ``account_creation_year_month``: ``createdYear`` / ``createdMonth``
      are cast to int and rendered as a ``yyyy-MM`` string (via the first
      day of that month).
    * The leaf fields of ``accountMetadata``, ``analyticsFlags``,
      ``profileAnalysis`` and ``networkFeatures`` are selected under their
      leaf names.
    * Underscores in ``accountAgeCategory`` and ``networkType`` are replaced
      with spaces.

    Args:
        raw: DataFrame in the ``json_schema`` layout plus ``ingest_time``,
            e.g. the output of ``bronze_account_details()``.

    Returns:
        The flattened DataFrame, with columns in the order selected below.
    """
    created_on = F.make_date(
        F.col("accountMetadata.accountAge.createdYear").cast("int"),
        F.col("accountMetadata.accountAge.createdMonth").cast("int"),
        F.lit(1),
    )
    return (
        raw.select(
            "userId",
            F.date_format(created_on, "yyyy-MM").alias("account_creation_year_month"),
            "accountMetadata.accountAge.accountAgeCategory",
            "accountMetadata.verificationStatus.isVerified",
            "accountMetadata.verificationStatus.verificationConfidence",
            "analyticsFlags.potentialBot",
            "analyticsFlags.potentialInfluencer",
            "friendsCount",
            "listedCount",
            "location",
            "rawDescription",
            "profileAnalysis.profileCompletenessScore",
            "networkFeatures.networkType",
            "ingest_time",
        )
        .withColumn("accountAgeCategory", F.regexp_replace("accountAgeCategory", "_", " "))
        .withColumn("networkType", F.regexp_replace("networkType", "_", " "))
    )


# Derive from the bronze source rather than rebinding `stg` in place. The old
# `stg = stg.select(...)` failed on a second run because the nested columns
# were already gone. Any filter added to the earlier `stg = ...` line must now
# be applied here as well.
stg = flatten_account_details(bronze_account_details())

stg.display()