In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
bronzepath = "abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze"
silver_path = "abfss://silver@kriptostorage003.dfs.core.windows.net/silver_kripto"
schema_path = "abfss://schema@kriptostorage003.dfs.core.windows.net/"

In [0]:
print(silver_path)

abfss://silver@kriptostorage003.dfs.core.windows.net/silver_kripto


Using Autoloader

In [0]:
from pyspark.sql.functions import current_timestamp
# Define the schema hint string (example capturing most keys)
schema_hint = """
  roi STRUCT<times:DOUBLE, currency:STRING, percentage:DOUBLE>,
  ath STRING,
  ath_change_percentage STRING,
  ath_date STRING,
  atl STRING,
  atl_change_percentage STRING,
  atl_date STRING,
  circulating_supply STRING,
  current_price STRING,
  fully_diluted_valuation STRING,
  high_24h STRING,
  id STRING,
  image STRING,
  last_updated STRING,
  low_24h STRING,
  market_cap STRING,
  market_cap_change_24h STRING,
  market_cap_change_percentage_24h STRING,
  market_cap_rank STRING,
  max_supply STRING,
  name STRING,
  price_change_24h STRING,
  price_change_percentage_24h STRING,
  symbol STRING,
  total_supply STRING,
  total_volume STRING
"""
df = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_path)  
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", schema_hint).option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-key>")
    .load(bronzepath)
    .withColumn("ingested_time", current_timestamp()) )

In [0]:
df.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |--

Transformation of the data using pyspark

In [0]:
from pyspark.sql.functions import col

# Define schema for roi
roi_schema = StructType() \
    .add("times", DoubleType()) \
    .add("currency", StringType()) \
    .add("percentage", DoubleType())

# Rename or directly access 'roi' as if it's a struct (⚠️ this assumes it's already parsed)
df_with_struct = df.withColumn("roi_struct", col("roi"))

# Extract individual fields and clean up
df_parse = (
    df_with_struct
      .withColumn("roi_time", col("roi_struct.times"))
      .withColumn("roi_currency", col("roi_struct.currency"))
      .withColumn("roi_percentage", col("roi_struct.percentage"))
      .drop("roi_struct", "roi")
)


In [0]:
df.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |--

In [0]:
from pyspark.sql.functions import col, regexp_replace, to_timestamp, to_date, when, current_timestamp

df_transformed = (
    df_parse
    .withColumn("last_updated_clean", regexp_replace("last_updated", "T", " "))
    .withColumn("last_updated_clean", regexp_replace("last_updated_clean", "Z", ""))
    .withColumn("last_updated_ts", to_timestamp("last_updated_clean", "yyyy-MM-dd HH:mm:ss.SSS"))
    .withColumn("last_updated_ts", when(col("last_updated_ts").isNotNull(), col("last_updated_ts"))
                                    .otherwise(current_timestamp()))
    .withColumn("date_partition", to_date(col("last_updated_ts")))
    .drop("last_updated", "last_updated_clean")
)

In [0]:
df.printSchema()

root
 |-- ath: string (nullable = true)
 |-- ath_change_percentage: string (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: string (nullable = true)
 |-- atl_change_percentage: string (nullable = true)
 |-- atl_date: string (nullable = true)
 |-- circulating_supply: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- fully_diluted_valuation: string (nullable = true)
 |-- high_24h: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- low_24h: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- market_cap_change_24h: string (nullable = true)
 |-- market_cap_change_percentage_24h: string (nullable = true)
 |-- market_cap_rank: string (nullable = true)
 |-- max_supply: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price_change_24h: string (nullable = true)
 |-- price_change_percentage_24h: string (nullable = true)
 |--

In [0]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType

df_transformed_final = (
    df_transformed
    .withColumn("ath_change_percentage", col("ath_change_percentage").cast(DoubleType()))
    .withColumn("atl_change_percentage", col("atl_change_percentage").cast(DoubleType()))
    .withColumn("circulating_supply", col("circulating_supply").cast(DoubleType()))
    .withColumn("current_price", col("current_price").cast(DoubleType()))
    .withColumn("fully_diluted_valuation", col("fully_diluted_valuation").cast(DoubleType()))
    .withColumn("high_24h", col("high_24h").cast(DoubleType()))
    .withColumn("low_24h", col("low_24h").cast(DoubleType()))
    .withColumn("market_cap", col("market_cap").cast(DoubleType()))
    .withColumn("market_cap_change_24h", col("market_cap_change_24h").cast(DoubleType()))
    .withColumn("market_cap_change_percentage_24h", col("market_cap_change_percentage_24h").cast(DoubleType()))
    .withColumn("market_cap_rank", col("market_cap_rank").cast(IntegerType()))
    .withColumn("max_supply", col("max_supply").cast(DoubleType()))
    .withColumn("price_change_24h", col("price_change_24h").cast(DoubleType()))
    .withColumn("price_change_percentage_24h", col("price_change_percentage_24h").cast(DoubleType()))
    .withColumn("total_supply", col("total_supply").cast(DoubleType()))
    .withColumn("total_volume", col("total_volume").cast(DoubleType()))

    # Handle null or missing values
    .fillna({"max_supply": 0, "circulating_supply": 0, "market_cap": 0})

    # Create new derived column for market cap category
    .withColumn(
        "market_cap_category",
        when(col("market_cap") >= 1e10, "Large Cap")
        .when(col("market_cap") >= 1e9, "Mid Cap")
        .when(col("market_cap") > 0, "Small Cap")
        .otherwise("Unknown")
    )

    # Rename columns
    .withColumnRenamed("last_updated_ts", "last_updated")
    .withColumnRenamed("market_cap_rank", "market_rank")
)

In [0]:
from pyspark.sql.functions import col

df_cleaned = (
    df_transformed_final
    .filter(
        col("symbol").isNotNull() &
        col("last_updated").isNotNull() &      
        col("date_partition").isNotNull() &
        col("id").isNotNull() &
        col("market_cap").isNotNull()
    )
)

In [0]:
query = (
    df_cleaned
    .withColumn("ingested_time", current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@kriptostorage003.dfs.core.windows.net/_checkpoint/crypto_market")
    .option("mergeSchema", "true")
    .trigger(once=True)
    .start(silver_path)
)

query.awaitTermination()

In [0]:
query.stop

<bound method StreamingQuery.stop of <pyspark.sql.streaming.query.StreamingQuery object at 0x7f375772d1d0>>

In [0]:
df_cleaned.isStreaming

True

In [0]:
dbutils.fs.ls("abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze")

[FileInfo(path='abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1753953589000),
 FileInfo(path='abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze/_committed_1573801832795881955', name='_committed_1573801832795881955', size=376, modificationTime=1753953589000),
 FileInfo(path='abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze/_committed_4449790663334205906', name='_committed_4449790663334205906', size=376, modificationTime=1753820082000),
 FileInfo(path='abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze/_committed_vacuum5785799467184854531', name='_committed_vacuum5785799467184854531', size=96, modificationTime=1753953590000),
 FileInfo(path='abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze/_started_1573801832795881955', name='_started_1573801832795881955', size=0, modificationTime=1753953588000),
 FileInfo(path='abfss://kr

In [0]:
dbutils.fs.rm("abfss://silver@kriptostorage003.dfs.core.windows.net/_checkpoint/crypto_market", True)

True

In [0]:
query.lastProgress

{'id': '1bd8e63f-05d6-40e6-b533-988bfb4d7a50',
 'runId': '169b4844-0227-4286-951c-515405e74749',
 'name': None,
 'timestamp': '2025-08-05T14:40:27.853Z',
 'batchId': 1,
 'batchDuration': 2035,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1980, 'triggerExecution': 2031},
 'stateOperators': [],
 'sources': [{'description': 'CloudFilesSource[abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze]',
   'startOffset': {'seqNum': 18,
    'sourceVersion': 3,
    'lastBackfillStartTimeMs': 1753954626233,
    'lastBackfillFinishTimeMs': 1753954627789,
    'lastInputPath': 'abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze'},
   'endOffset': {'seqNum': 18,
    'sourceVersion': 3,
    'lastBackfillStartTimeMs': 1753954626233,
    'lastBackfillFinishTimeMs': 1753954627789,
    'lastInputPath': 'abfss://kriptocontainer002@kriptostorage003.dfs.core.windows.net/bronze'},
   'latestOffset': None,


In [0]:
silver_path = "abfss://silver@kriptostorage003.dfs.core.windows.net/silver_kripto"
silver_checkpoint = "abfss://silver@kriptostorage003.dfs.core.windows.net/_checkpoint/crypto_market"

In [0]:
query = (
    df_cleaned
    .withColumn("ingested_time", current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@kriptostorage003.dfs.core.windows.net/_checkpoint/crypto_market")
    .option("mergeSchema", "true")
    .trigger(once=True)  # ensures it writes once and stops
    .start(silver_path)
)

In [0]:
query.awaitTermination()

In [0]:
df_silver = spark.read.format("delta").load(silver_path)
display(df_silver)

ath,ath_change_percentage,ath_date,atl,atl_change_percentage,atl_date,circulating_supply,current_price,fully_diluted_valuation,high_24h,id,image,low_24h,market_cap,market_cap_change_24h,market_cap_change_percentage_24h,market_rank,max_supply,name,price_change_24h,price_change_percentage_24h,symbol,total_supply,total_volume,_rescued_data,ingested_time,roi_time,roi_currency,roi_percentage,last_updated,date_partition,market_cap_category
0.00700261,-25.99487,2025-05-30T03:20:44.650Z,0.00069854,641.87447,2024-08-05T13:31:51.088Z,86306925195.57034,0.005173,517629519.0,0.00554746,zebec-network,https://coin-images.coingecko.com/coins/images/37052/large/zbcn.jpeg?1713168241,0.00429283,446755253.0,65278998.0,17.1122,188,100000000000.0,Zebec Network,0.00076497,17.35395,zbcn,99998851325.12544,70864075.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:23.934Z,2025-07-29,Small Cap
4084.25,-9.08596,2024-12-06T20:43:31.342Z,1137.25,226.50373,2022-11-23T02:25:33.267Z,120228.1619903289,3713.16,446273223.0,3882.51,frax-ether,https://coin-images.coingecko.com/coins/images/28284/large/frxETH_icon.png?1696527284,3689.9,446273223.0,-8940809.777430296,-1.96409,189,0.0,Frax Ether,-72.10275192251129,-1.90483,frxeth,120228.1619903289,1880446.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:22.537Z,2025-07-29,Small Cap
910.54,-94.81147,2021-05-12T02:29:08.794Z,25.74,83.54006,2023-06-10T16:15:05.289Z,9395611.377689738,47.26,472388681.0,49.68,compound-governance-token,https://coin-images.coingecko.com/coins/images/10775/large/COMP.png?1696510737,47.01,443838047.0,-17695242.76238191,-3.83401,190,10000000.0,Compound,-1.8217469941707591,-3.71185,comp,10000000.0,59980450.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:13.018Z,2025-07-29,Small Cap
4.52,-86.96306,2024-03-07T22:19:11.131Z,0.418236,40.85195,2025-06-22T20:22:23.718Z,753516896.7490065,0.588435,564070359.0,0.619432,dydx-chain,https://coin-images.coingecko.com/coins/images/32594/large/dydx.png?1698673495,0.587354,443512038.0,-17699757.301310778,-3.83766,191,1000000000.0,dYdX,-0.0229908681170241,-3.76021,dydx,958342751.0,12767080.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:14.927Z,2025-07-29,Small Cap
4090.99,-7.96174,2024-12-16T18:57:35.001Z,1390.73,170.74092,2025-04-09T01:31:27.335Z,116697.1033546364,3768.94,438772861.0,3878.96,polygon-pos-bridged-weth-polygon-pos,https://coin-images.coingecko.com/coins/images/39708/large/WETH.PNG?1723730343,3729.29,438959975.0,-3217889.029682696,-0.72774,192,0.0,Polygon PoS Bridged WETH (Polygon POS),-6.71626630910032,-0.17788,weth,116647.3592901615,21106388.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:12.373Z,2025-07-29,Small Cap
545.64,-97.18913,2021-11-23T10:33:26.737Z,6.51,135.71554,2020-10-07T01:44:53.554Z,28488568.0,15.34,436887974.0,16.14,elrond-erd-2,https://coin-images.coingecko.com/coins/images/12335/large/egld-token-logo.png?1696512162,15.27,436887974.0,-16781816.824308038,-3.69913,193,31415926.0,MultiversX,-0.5441698585572663,-3.425,egld,28488568.0,21497251.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:18.134Z,2025-07-29,Small Cap
0.00038001,-94.28313,2021-09-04T17:09:31.137Z,1.604e-05,35.4443,2025-04-07T07:05:49.124Z,19898973422582.0,2.172e-05,432119995.0,2.266e-05,ecash,https://coin-images.coingecko.com/coins/images/16646/large/Logo_final-22.png?1696516207,2.162e-05,432119181.0,-7609495.8735227585,-1.7305,194,21000000000000.0,eCash,-3.14749478707e-07,-1.42856,xec,19899010922582.0,10259367.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:02.942Z,2025-07-29,Small Cap
66.45,-99.96666,2020-09-11T03:18:35.837Z,0.00462303,379.24197,2022-11-14T04:45:13.210Z,19152857311.86329,0.02219162,441015348.0,0.02327249,sun-token,https://coin-images.coingecko.com/coins/images/12424/large/RSFOmQ.png?1696512245,0.02006776,424441920.0,35015521.0,8.99156,195,0.0,Sun Token,0.00191958,9.46907,sun,19900730000.0,157713011.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:08.817Z,2025-07-29,Small Cap
32.38,-77.10433,2021-03-08T13:29:57.935Z,0.671563,1003.96682,2020-11-06T19:31:40.144Z,57103774.56313077,7.41,715739516.0,7.45,dexe,https://coin-images.coingecko.com/coins/images/12713/large/DEXE_token_logo.png?1696512514,7.26,423517928.0,5418892.0,1.29608,196,0.0,DeXe,0.079239,1.0814,dexe,96504599.33609451,5901569.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:11.272Z,2025-07-29,Small Cap
4837.52,-5.29936,2024-12-16T19:47:19.031Z,1683.91,172.05599,2025-04-09T01:47:32.152Z,92159.39373519502,4564.75,422083290.0,4692.22,treehouse-eth,https://coin-images.coingecko.com/coins/images/40155/large/tETH_Logo_%28Color__No_Background%29.png?1748368386,4509.49,422083290.0,-31176489.21381861,-6.87828,197,0.0,Treehouse ETH,-6.365359513880321,-0.13925,teth,92159.39373519502,51578324.0,,2025-08-05T14:41:30.524Z,,,,2025-07-29T19:45:16.847Z,2025-07-29,Small Cap


In [0]:
print(silver_checkpoint)

abfss://silver@kriptostorage003.dfs.core.windows.net/_checkpoint/crypto_market


In [0]:
query.awaitTermination()

In [0]:
# List all active streaming queries
for stream in spark.streams.active:
    print(f"Query Name: {stream.name}")
    print(f"Query ID: {stream.id}")
    print(f"Is Active: {stream.isActive}")
    print(f"Last Progress: {stream.lastProgress}")
    print("-" * 40)