###Import Dependencies

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

###External Location for All Paths

In [0]:
bronze_path = "abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/bronze"
silver_path = "abfss://silver@cryptostorage1.dfs.core.windows.net/silver_crypto"
schema_path = "abfss://schema@cryptostorage1.dfs.core.windows.net/"

###Using Autoloader

In [0]:
spark.read\
    .format("json")\
    .option("multiline", True)\
    .load(bronze_path)

DataFrame[ath: double, ath_change_percentage: double, ath_date: string, atl: double, atl_change_percentage: double, atl_date: string, circulating_supply: double, current_price: double, fully_diluted_valuation: bigint, high_24h: double, id: string, image: string, last_updated: string, low_24h: double, market_cap: bigint, market_cap_change_24h: double, market_cap_change_percentage_24h: double, market_cap_rank: bigint, max_supply: double, name: string, price_change_24h: double, price_change_percentage_24h: double, symbol: string, total_supply: double, total_volume: double]

In [0]:
df = spark.readStream\
    .format("cloudFiles")\
    .option("cloudFiles.format", "json")\
    .option("schemaEvolution", "addNewColumns")\
    .option("cloudFiles.schemaLocation", schema_path)\
    .option("cloudFiles.schemaHints",
            """
            roi STRUCT<times:DOUBLE, currency:STRING, percentage:DOUBLE>,
            ath STRING,
            ath_change_percentage STRING,
            ath_date STRING,
            atl STRING,
            atl_change_percentage STRING,
            atl_date STRING,
            circulating_supply STRING,
            current_price STRING,
            high_24h STRING,
            id STRING,
            last_updated STRING,
            low_24h STRING,
            market_cap STRING,
            market_cap_change_24h STRING,
            market_cap_change_percentage_24h STRING,
            market_cap_rank STRING,
            max_supply STRING,
            name STRING,
            price_change_24h STRING,
            price_change_percentage_24h STRING,
            symbol STRING,
            total_supply STRING,
            total_volume STRING
            """) \
    .load(bronze_path) \
    .withColumn("ingested_time", current_timestamp())

In [0]:
df.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |--

### Transforming data using Pyspark

In [0]:
df_parsed = df\
  .withColumn('roi_time', col('roi.times'))\
  .withColumn('roi_currency', col('roi.currency'))\
  .withColumn('roi_percentage', col('roi.percentage'))\
  .drop('roi')

In [0]:
df_parsed.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |--

In [0]:
from pyspark.sql.functions import regexp_replace, to_timestamp, to_date, when
df_transformed = df_parsed \
    .withColumn("last_updated_clean", regexp_replace(col("last_updated"), "(Z|\\+00:00)$", "")) \
    .withColumn(
        "last_updated_ts",
        when(
            col("last_updated_clean").isNotNull(),
            to_timestamp(col("last_updated_clean"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
        ).otherwise(current_timestamp())
    ) \
    .withColumn("date_partition", to_date(col("last_updated_ts"))) \
    .drop("last_updated", "last_updated_clean")

In [0]:
df_metrics = df_transformed \
    .withColumn("market_cap_double", col("market_cap").cast("double")) \
    .withColumn("total_volume_double", col("total_volume").cast("double")) \
    .withColumn("market_cap_billions", 
        when(col("market_cap_double").isNotNull(), col("market_cap_double") / 1_000_000_000.0)
        .otherwise(0.0)
    ) \
    .withColumn("volume_to_market_cap_ratio",
        when(
            (col("total_volume_double").isNotNull()) & 
            (col("market_cap_double").isNotNull()) & 
            (col("market_cap_double") > 0),
            col("total_volume_double") / col("market_cap_double")
        ).otherwise(0.0)
    )

In [0]:
df_metrics.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- total

In [0]:
df_final = df_metrics \
    .filter(
    col("symbol").isNotNull() & 
    col("last_updated_ts").isNotNull() &
    col("date_partition").isNotNull() &
    col("market_cap_billions").isNotNull() &
    col('id').isNotNull()
    )\
    .dropDuplicates(['id', 'last_updated_ts'])

In [0]:
spark.read

<pyspark.sql.readwriter.DataFrameReader at 0x7f28bc954e90>

In [0]:
df_final.writeStream

<pyspark.sql.streaming.readwriter.DataStreamWriter at 0x7f28bc93bdd0>

In [0]:
df_final.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- total

In [0]:
%sql
SHOW EXTERNAL LOCATIONS;

name,url,comment
bronzepath,abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/bronze,
metastore_root_location,abfss://metastore-root@cryptostorage1.dfs.core.windows.net/,"Auto-created external location which provides access to the nominated metastore-level storage account for the metastore. Changing the URL on this external location will not update the metastore-level storage, and could break access. You can update the credential on this external location if desired."
schema_path_layer,abfss://schema@cryptostorage1.dfs.core.windows.net/,
silver_layer_path,abfss://silver@cryptostorage1.dfs.core.windows.net/,
silverpath,abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/silver,


###Write to silver layer

In [0]:
query = df_final.writeStream \
    .format('delta')\
    .outputMode('append')\
    .option('checkpointLocation', "abfss://silver@cryptostorage1.dfs.core.windows.net/_checkpoints/crypto_market")\
    .trigger(once=True)\
    .start(silver_path)

In [0]:
df

DataFrame[ath: string, ath_change_percentage: string, ath_date: string, atl: string, atl_change_percentage: string, atl_date: string, circulating_supply: string, current_price: string, fully_diluted_valuation: string, high_24h: string, id: string, image: string, last_updated: string, low_24h: string, market_cap: string, market_cap_change_24h: string, market_cap_change_percentage_24h: string, market_cap_rank: string, max_supply: string, name: string, price_change_24h: string, price_change_percentage_24h: string, roi: struct<times:double,currency:string,percentage:double>, symbol: string, total_supply: string, total_volume: string, _rescued_data: string, ingested_time: timestamp]

In [0]:
query.status
query.lastProgress

{'id': '12e8eb10-e075-4d31-96d5-fcfe71bb3d4f',
 'runId': '144e221e-7512-48a8-9ade-e82147dd4718',
 'name': None,
 'timestamp': '2026-01-23T06:46:38.465Z',
 'batchId': 1,
 'batchDuration': 3275,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 3200, 'triggerExecution': 3271},
 'stateOperators': [],
 'sources': [{'description': 'CloudFilesSource[abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/bronze]',
   'startOffset': {'seqNum': 6,
    'sourceVersion': 3,
    'lastBackfillStartTimeMs': 1769106837341,
    'lastBackfillFinishTimeMs': 1769106838929,
    'lastInputPath': 'abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/bronze'},
   'endOffset': {'seqNum': 6,
    'sourceVersion': 3,
    'lastBackfillStartTimeMs': 1769106837341,
    'lastBackfillFinishTimeMs': 1769106838929,
    'lastInputPath': 'abfss://cryptocontainer001@cryptostorage1.dfs.core.windows.net/bronze'},
   'latestOffset': None,
   'numI

In [0]:
query.stop

<bound method StreamingQuery.stop of <pyspark.sql.streaming.query.StreamingQuery object at 0x7f28bc8ca790>>