In [0]:
# Database names offered by both database dropdowns; defined once so the
# source and target choices cannot drift apart.
DATABASE_CHOICES = [
    "default",
    "bronze",
    "silver",
    "gold",
    "bronze1",
    "bronze2",
    "silver1",
    "silver2",
    "silver3",
]

# Source side: table name (free text) and database (dropdown).
dbutils.widgets.text("sourceTable", "sourceTable")
dbutils.widgets.dropdown("sourceDatabase", "default", DATABASE_CHOICES)

# Target side: table name (free text) and database (dropdown).
dbutils.widgets.text("targetTable", "targetTable")
dbutils.widgets.dropdown("targetDatabase", "default", DATABASE_CHOICES)

# Directory handed to the streaming writer as its checkpointLocation.
dbutils.widgets.text("checkpointLocation", "checkpointLocation")

In [0]:
# Resolve the notebook widgets into plain Python values used by later cells.
# Fully qualified table names take the form "<database>.<table>".
targetTable = dbutils.widgets.get("targetTable")
targetDatabase = dbutils.widgets.get("targetDatabase")
target_path = "{}.{}".format(targetDatabase, targetTable)

sourceTable = dbutils.widgets.get("sourceTable")
sourceDatabase = dbutils.widgets.get("sourceDatabase")
source_path = "{}.{}".format(sourceDatabase, sourceTable)
print(source_path)  # echo the table the stream will read from

checkpointLocation = dbutils.widgets.get("checkpointLocation")
print(checkpointLocation)  # echo where stream progress will be checkpointed

default.bronzeindia
/dbfs/FileStore/Indian_Silver/Indian_silver_stock


In [0]:
# Open the bronze Delta table named by the source widgets as a streaming DataFrame.
bronze_df = (
    spark.readStream
    .format("delta")
    .table(source_path)
)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Schema of the JSON document stored in the bronze table's `value` column:
# one stock per record plus an array of market-change entries.
# NOTE(review): `data_produced_timestamp` is parsed but never selected below.
market_detail_schema = (
    StructType()
    .add("marketchange", FloatType())
    .add("marketchangepercent", StringType())
    .add("data_produced_timestamp", DoubleType())
)
df_schema = (
    StructType()
    .add("stock_name", StringType())
    .add("Indian_Market_Price", FloatType())
    .add("market_details", ArrayType(market_detail_schema))
)

# `stock_name` is split into "<company> (<symbol>)" parts by these patterns.
# Raw strings keep the `\(` / `\)` regex escapes intact instead of relying on
# Python's invalid-escape fallback (a SyntaxWarning on Python 3.12+).
# Per Spark docs, regexp_extract yields "" when a pattern does not match.
company_name_pattern = r"^(.*?) \("
symbol_pattern = r"\((.*?)\)$"

# Parse the JSON payload and split 'stock_name' into company name and symbol.
parsed_df = bronze_df.select(
    from_json(col("value"), df_schema).alias("new_df"),
    col("confluent_timestamp")
).select(
    regexp_extract(col("new_df.stock_name"), company_name_pattern, 1).alias("Company_Name"),
    regexp_extract(col("new_df.stock_name"), symbol_pattern, 1).alias("Symbol"),
    col("new_df.Indian_Market_Price").alias("Indian_Market_Price"),
    col("new_df.market_details"),
    col("confluent_timestamp")
)

# One output row per market_details entry, with the struct fields renamed.
# NOTE(review): explode drops records whose market_details is null or empty
# (including unparseable JSON); use explode_outer if those must be kept.
silver = parsed_df.select(
    "Company_Name",
    "Symbol",
    "Indian_Market_Price",
    explode("market_details").alias("market_detail"),
    "confluent_timestamp"
).select(
    "Company_Name",
    "Symbol",
    "Indian_Market_Price",
    col("market_detail.marketchange").alias("Market_Price_Change"),
    col("market_detail.marketchangepercent").alias("Market_ChangePercent"),
    "confluent_timestamp"
)

# Add date and time string columns derived from confluent_timestamp
# (formatted in the Spark session time zone).
silver = silver.withColumn("Date", date_format(col("confluent_timestamp"), "yyyy-MM-dd"))
silver = silver.withColumn("Time", date_format(col("confluent_timestamp"), "HH:mm:ss"))

display(silver)


Company_Name,Symbol,Indian_Market_Price,Market_Price_Change,Market_ChangePercent,confluent_timestamp,Date,Time
Kotak Mahindra Bank Limited,KOTAKBANK.NS,1740.45,-12.95,-0.74%,2023-10-19T09:17:05.334+0000,2023-10-19,09:17:05
Kotak Mahindra Bank Limited,KOTAKBANK.NS,1740.05,-13.35,-0.76%,2023-10-19T09:20:13.843+0000,2023-10-19,09:20:13
Britannia Industries Limited,BRITANNIA.NS,4600.95,-6.75,-0.15%,2023-10-19T09:21:39.787+0000,2023-10-19,09:21:39
Tata Consultancy Services Limited,TCS.NS,3459.0,-28.25,-0.81%,2023-10-19T09:20:57.191+0000,2023-10-19,09:20:57
Britannia Industries Limited,BRITANNIA.NS,4604.4,-3.3,-0.07%,2023-10-19T09:19:00.112+0000,2023-10-19,09:19:00
Tata Consultancy Services Limited,TCS.NS,3458.05,-29.2,-0.84%,2023-10-19T09:18:07.464+0000,2023-10-19,09:18:07
Reliance Industries Limited,RELIANCE.NS,2308.05,-15.95,-0.69%,2023-10-19T09:20:43.462+0000,2023-10-19,09:20:43
Reliance Industries Limited,RELIANCE.NS,2305.95,-18.05,-0.78%,2023-10-19T09:23:10.365+0000,2023-10-19,09:23:10
Tata Consultancy Services Limited,TCS.NS,3460.65,-26.6,-0.76%,2023-10-19T09:23:23.267+0000,2023-10-19,09:23:23
Kotak Mahindra Bank Limited,KOTAKBANK.NS,1738.7,-14.7,-0.84%,2023-10-19T09:22:37.867+0000,2023-10-19,09:22:37


In [0]:
# Continuously append the transformed rows into the target Delta table.
# The checkpoint directory records stream progress, so a restarted query
# resumes from it rather than starting over.
silver_writer = (
    silver.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpointLocation)
)
silver_writer.toTable(target_path)

In [0]:
# Spot-check the silver table written by the stream above.
# Reads the table named by the target widgets (target_path) instead of a
# hard-coded "default.silverindia", so the check follows the widget selection.
# NOTE(review): the streaming write runs asynchronously, so this may not yet
# include every micro-batch.
display(spark.table(target_path))

Company_Name,Symbol,Indian_Market_Price,Market_Price_Change,Market_ChangePercent,confluent_timestamp,Date,Time
Kotak Mahindra Bank Limited,KOTAKBANK.NS,1740.45,-12.95,-0.74%,2023-10-19T09:17:05.334+0000,2023-10-19,09:17:05
Kotak Mahindra Bank Limited,KOTAKBANK.NS,1740.05,-13.35,-0.76%,2023-10-19T09:20:13.843+0000,2023-10-19,09:20:13
Britannia Industries Limited,BRITANNIA.NS,4600.95,-6.75,-0.15%,2023-10-19T09:21:39.787+0000,2023-10-19,09:21:39
Tata Consultancy Services Limited,TCS.NS,3459.95,-27.3,-0.78%,2023-10-19T09:26:13.162+0000,2023-10-19,09:26:13
Tata Consultancy Services Limited,TCS.NS,3459.0,-28.25,-0.81%,2023-10-19T09:20:57.191+0000,2023-10-19,09:20:57
Britannia Industries Limited,BRITANNIA.NS,4604.4,-3.3,-0.07%,2023-10-19T09:19:00.112+0000,2023-10-19,09:19:00
Tata Consultancy Services Limited,TCS.NS,3458.05,-29.2,-0.84%,2023-10-19T09:18:07.464+0000,2023-10-19,09:18:07
Reliance Industries Limited,RELIANCE.NS,2308.05,-15.95,-0.69%,2023-10-19T09:20:43.462+0000,2023-10-19,09:20:43
Reliance Industries Limited,RELIANCE.NS,2305.95,-18.05,-0.78%,2023-10-19T09:23:10.365+0000,2023-10-19,09:23:10
Tata Consultancy Services Limited,TCS.NS,3460.65,-26.6,-0.76%,2023-10-19T09:23:23.267+0000,2023-10-19,09:23:23


In [0]:
# Register the streaming silver DataFrame as a global temporary view
# (replacing any existing view of the same name). Per Spark docs, global temp
# views live in the `global_temp` database, e.g. global_temp.gblsilver_india.
silver_view_name = "gblsilver_india"
silver.createOrReplaceGlobalTempView(silver_view_name)

checkpointLocation:/dbfs/FileStore/Indian_Silver/Indian_silver_stock
sourceDatabase : default
sourceTable : bronzeindia
targetDatabase: default
targetTable : silverindia