In [None]:
from pyspark.sql.functions import col, count, to_date, split, regexp_extract, when, udf, lit, regexp_replace, cast, substr
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType, StructType, StructField, StringType, ArrayType, LongType, FloatType
from pyspark.sql.functions import udf

import json
import pandas as pd
import re
import numpy as np

In [None]:
def mount_container(container_name, mount_folder, storage_account_name, client_id, directory_id, key):
    """Mount an ADLS Gen2 container into DBFS using service-principal OAuth credentials.

    Args:
        container_name: name of the storage container to mount.
        mount_folder: DBFS mount point, e.g. '/mnt/silver'.
        storage_account_name: Azure storage account hosting the container.
        client_id: service-principal application (client) id.
        directory_id: Azure AD tenant (directory) id, used to build the token endpoint.
        key: service-principal client secret.

    Returns:
        False (and mounts nothing) when `mount_folder` is already a mount point,
        True after a successful `dbutils.fs.mount`.
    """
    already_mounted = any(m.mountPoint == mount_folder for m in dbutils.fs.mounts())
    if already_mounted:
        print(f"Container '{container_name}' is already mounted.")
        return False

    token_endpoint = f"https://login.microsoftonline.com/{directory_id}/oauth2/token"
    oauth_configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": key,
        "fs.azure.account.oauth2.client.endpoint": token_endpoint,
    }
    # abfss://<container>@<storage account>.dfs.core.windows.net
    source_uri = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"

    dbutils.fs.mount(source=source_uri, mount_point=mount_folder, extra_configs=oauth_configs)
    print(f"Mount container {container_name} in {storage_account_name} into Databrick successfully")
    return True

Mount container silver in moviescrapingsa into Databrick successfully


True

In [None]:
def create_empty_delta_table(table_path, schema):
    """Write an empty Delta table with `schema` at `table_path`.

    The save mode is "ignore", so nothing is written when data already exists at the path.
    """
    empty_writer = spark.createDataFrame([], schema=schema).write
    empty_writer.format("delta").mode("ignore").save(table_path)

In [None]:
class BronzeBDS():
    """Bronze layer: stream raw JSON files into an append-only Delta table.

    Args:
        read_path: folder the streaming JSON reader loads from.
        write_path: base folder; the table is written to `<write_path>/bronze_delta_table`.
        checkpoint_path: base folder; the checkpoint is `<checkpoint_path>/bronze_table_checkpoint`.
    """

    # Every raw field is read as a nullable string; typing is left to the silver layer.
    # "writter" is kept as spelled: the schema field name decides which JSON key is read.
    _RAW_COLUMNS = (
        "name", "release_date", "genre", "certificate", "vote_count", "runtime",
        "imdb_score", "director", "writter", "stars", "budget", "gross_global",
        "countries", "language", "locations", "company", "url",
    )

    def __init__(self, read_path, write_path, checkpoint_path):
        self.read_path = read_path
        self.write_path = write_path
        self.checkpoint_path = checkpoint_path

    def get_schema(self):
        """Return the StructType used to parse the raw JSON (all columns nullable strings)."""
        return StructType([StructField(column, StringType(), True) for column in self._RAW_COLUMNS])

    def get_raw_data(self):
        """Open a streaming reader over the multiline JSON files under `read_path`."""
        json_reader = spark.readStream.format('json').option("multiline", "true")
        return json_reader.schema(self.get_schema()).load(f"{self.read_path}")

    def append_bronze_data(self, bronze_df, trigger):
        """Start the append-mode Delta sink for `bronze_df` and return the StreamingQuery.

        `trigger == 'batch'` runs with availableNow (process what exists, then stop);
        any other value is passed through as a processingTime interval, e.g. '5 seconds'.
        """
        if trigger == 'batch':
            trigger_kwargs = {"availableNow": True}
        else:
            trigger_kwargs = {"processingTime": trigger}

        writer = (
            bronze_df.writeStream
            .format('delta')
            .queryName("bronze-ingestion")
            .option("checkpointLocation", f"{self.checkpoint_path}/bronze_table_checkpoint")
            .outputMode("append")
        )
        return writer.trigger(**trigger_kwargs).start(f"{self.write_path}/bronze_delta_table")

    def process(self, trigger='batch'):
        """Read the raw stream and start writing it to the bronze table; returns the StreamingQuery."""
        print("\nStarting Bronze Stream...")
        raw_stream = self.get_raw_data()
        return self.append_bronze_data(raw_stream, trigger)

In [None]:
class SilverBDS():
    """Silver layer: type/clean the bronze stream and upsert it into a Delta table keyed on `url`.

    Args:
        read_path: base folder holding `bronze_delta_table`.
        write_path: base folder; the table is `<write_path>/silver_delta_table`.
        checkpoint_path: base folder; the checkpoint is `<checkpoint_path>/silver_table_checkpoint`.
    """

    # Hard-coded (not live) conversion factors to USD, keyed by the currency prefix returned by
    # get_unit(). The "XXX\xa0" keys include the non-breaking space that follows the code.
    # Prefixes not listed here are left unconverted.
    _USD_RATES = {
        '$': 1.0,
        '€': 1.07,
        '£': 1.22,
        '¥': 0.0076,
        '₩': 0.0008,
        '₹': 0.012,
        'TRL\xa0': 0.053,
        'NOK\xa0': 0.1,
        'A$': 0.69,
        'CA$': 0.75,
        'DKK\xa0': 0.14,
        'SEK\xa0': 0.096,
        'MVR\xa0': 0.065,
        'NZ$': 0.64,
        'PKR\xa0': 0.0044,
        'R$': 0.19,
        'BDT\xa0': 0.0095,
    }

    def __init__(self, read_path, write_path, checkpoint_path):
        self.read_path = read_path
        self.write_path = write_path
        self.checkpoint_path = checkpoint_path

    @staticmethod
    @udf(IntegerType())
    def convert_to_minutes(runtime):
        """Spark UDF: convert a runtime string such as "2 hours 15 minutes" to total minutes.

        Returns None for a null runtime. A component that is absent, or whose number cannot be
        found (e.g. "hour" with no leading digits), contributes 0 instead of raising and failing
        the Spark task.
        """
        if runtime is None:
            return None
        hours_match = re.search(r'(\d+) hour', runtime)
        minutes_match = re.search(r'(\d+) minute', runtime)
        hours = int(hours_match.group(1)) if hours_match else 0
        minutes = int(minutes_match.group(1)) if minutes_match else 0
        return hours * 60 + minutes

    def read_bronze_data(self):
        """Open a streaming read of the bronze Delta table (schema comes from the Delta log)."""
        return (spark.readStream
                    .format('delta')
                    .load(f"{self.read_path}/bronze_delta_table")
                )

    def get_unit(self, money):
        """Return every non-digit character of `money` after removing commas (e.g. '€', 'NOK\\xa0')."""
        money = money.replace(',', '')
        return ''.join(x for x in money if not x.isdigit())

    def get_number(self, money):
        """Return the digits of `money` (commas removed) as a float.

        Raises ValueError when `money` contains no digits.
        """
        money = money.replace(',', '')
        return float(''.join(x for x in money if x.isdigit()))

    def to_usd(self, money):
        """Convert a money string such as "€1,000,000" to millions of USD.

        NOTE(review): get_unit/get_number split on digits, so a decimal point (e.g. "$1.5")
        ends up in the unit and that value is returned unconverted — confirm the scraped format.
        """
        unit = self.get_unit(money)
        number = self.get_number(money)
        return number * self._USD_RATES.get(unit, 1.0) / 1000000

    def transform_silver(self, bronze_df):
        """Type and normalise the bronze columns.

        - release_date: "Month D, YYYY" string -> date
        - stars, language, locations, company: ", "-separated string -> array<string>
        - vote_count: "456" / "9.9K" / "2.9M" (commas allowed) -> int
        - runtime: "X hours Y minutes" -> int minutes (via convert_to_minutes)
        """
        # "d" (not "dd") so that single-digit days such as "March 5, 2021" also parse with the
        # Spark 3 datetime parser; "d" still accepts two-digit days.
        silver_df = bronze_df.withColumn("release_date", to_date(col("release_date"), "MMMM d, yyyy"))

        silver_df = silver_df.withColumn("stars", split(col("stars"), ", ")) \
                             .withColumn("language", split(col("language"), ", ")) \
                             .withColumn("locations", split(col("locations"), ", ")) \
                             .withColumn("company", split(col("company"), ", "))

        # Vote counts may be abbreviated with a decimal part ("1.2K", "2.9M"). Extract the full
        # number (not just the digits before "K") and scale it. Decimal arithmetic keeps
        # 1.2 * 1000 exact before the cast to int.
        vote_str = regexp_replace(col("vote_count"), ",", "")
        vote_num = regexp_extract(vote_str, r'(\d+(?:\.\d+)?)', 1).cast("decimal(18,3)")
        silver_df = silver_df.withColumn(
            "vote_count",
            when(vote_str.contains("K"), (vote_num * 1000).cast("int"))
            .when(vote_str.contains("M"), (vote_num * 1000000).cast("int"))
            .otherwise(vote_str.cast("int")),
        )

        silver_df = silver_df.withColumn("runtime", self.convert_to_minutes(col("runtime")))
        return silver_df

    def upsert(self, silver_df, batch_id):
        """foreachBatch callback: MERGE one micro-batch into the silver Delta table on `url`.

        Matched rows are overwritten with the batch values (UPDATE SET *); new urls are inserted.

        Args:
            silver_df: the micro-batch DataFrame passed in by foreachBatch.
            batch_id: micro-batch id supplied by Structured Streaming (unused).
        """
        delta_table_path = f"{self.write_path}/silver_delta_table"
        tmp_view_name = "silver_df_temp_view"
        # MERGE errors out when several source rows match the same target row, which happens
        # if a url occurs more than once in a micro-batch; keep a single row per url.
        silver_df.dropDuplicates(["url"]).createOrReplaceTempView(tmp_view_name)
        merge_statement = f"""MERGE INTO delta.`{delta_table_path}` s
                USING {tmp_view_name} t
                ON s.url = t.url
                WHEN MATCHED THEN
                UPDATE SET *
                WHEN NOT MATCHED THEN
                INSERT *
            """
        # Run on the micro-batch's own session, where the temp view was registered.
        silver_df._jdf.sparkSession().sql(merge_statement)

    def append_silver_data(self, silver_df, trigger):
        """Start the upserting stream for `silver_df` and return the StreamingQuery.

        `trigger == 'batch'` runs with availableNow; any other value is a processingTime interval.
        """
        trigger_kwargs = {"availableNow": True} if trigger == 'batch' else {"processingTime": trigger}
        sQuery = (silver_df.writeStream
                    .queryName("silver-processing")
                    .format("delta")
                    .outputMode("update")
                    .foreachBatch(self.upsert)
                    .option("checkpointLocation", f"{self.checkpoint_path}/silver_table_checkpoint")
        )
        return sQuery.trigger(**trigger_kwargs).start(f"{self.write_path}/silver_delta_table")

    @staticmethod
    def _path_exists(path):
        """Return True when dbutils can list `path`; any listing error is treated as 'missing'."""
        try:
            dbutils.fs.ls(path)
        except Exception:  # narrowed from a bare except: don't swallow KeyboardInterrupt/SystemExit
            return False
        return True

    def process(self, trigger='batch'):
        """Read bronze, transform, ensure the silver table exists, and start the upsert stream."""
        print("\nStarting Silver Stream...", end='')

        bronze_df = self.read_bronze_data()
        silver_df = self.transform_silver(bronze_df)

        # MERGE INTO needs an existing target table, so create an empty one on the first run.
        silver_table_path = f"{self.write_path}/silver_delta_table"
        if not self._path_exists(silver_table_path):
            empty_df = spark.createDataFrame([], schema=silver_df.schema)
            empty_df.write.format("delta").mode("ignore").save(silver_table_path)

        return self.append_silver_data(silver_df, trigger)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3841870613545280>, line 1[0m
[0;32m----> 1[0m [38;5;28;01mclass[39;00m [38;5;21;01mSilverBDS[39;00m():
[1;32m      2[0m     [38;5;28;01mdef[39;00m [38;5;21m__init__[39m([38;5;28mself[39m,read_path,write_path,checkpoint_path):
[1;32m      3[0m         [38;5;28mself[39m[38;5;241m.[39mread_path [38;5;241m=[39m read_path

File [0;32m<command-3841870613545280>, line 8[0m, in [0;36mSilverBDS[0;34m()[0m
[1;32m      4[0m     [38;5;28mself[39m[38;5;241m.[39mwrite_path [38;5;241m=[39m write_path
[1;32m      5[0m     [38;5;28mself[39m[38;5;241m.[39mcheckpoint_path [38;5;241m=[39m checkpoint_path
[1;32m      7[0m [38;5;129m@staticmethod[39m
[0;32m----> 8[0m [38;5;129m@udf[39m([43mIntegerType[49m())
[1;32m      9[0m [38;5;66;03m#Convert runtime[39;00

In [None]:
def stop_all_streaming_queries():
    """Stop every active Structured Streaming query on the session, printing each one first."""
    running_queries = spark.streams.active

    if not running_queries:
        print("No active streaming queries.")
        return

    print("List of Active Streaming Queries:")
    for running in running_queries:
        print(f"Query Name: {running.name}, ID: {running.id}")
        running.stop()

def main(trigger='5 seconds'):
    """Start the bronze (raw JSON -> Delta) and silver (typed, upserted Delta) streams.

    Args:
        trigger: 'batch' to process what is currently available and stop (availableNow),
            or a processing-time interval such as '5 seconds' for a continuously running
            stream. Defaults to '5 seconds', the value previously hard-coded here.

    Returns:
        (bronze_query, silver_query): the started StreamingQuery handles, so the caller can
        monitor them, call awaitTermination(), or stop them.
    """
    bronze_base_dir = '/mnt/bronze'
    bronze_read_path = f'{bronze_base_dir}/bronze'  # folder the bronze JSON reader loads from
    silver_base_dir = '/mnt/silver'

    bronze = BronzeBDS(bronze_read_path, bronze_base_dir, bronze_base_dir)
    silver = SilverBDS(bronze_base_dir, silver_base_dir, silver_base_dir)

    # The silver stream resolves bronze_delta_table as soon as it is defined. On a first run the
    # bronze stream may not have committed anything yet, so make sure the table exists (an empty
    # one with the bronze schema; mode "ignore" leaves an existing table untouched).
    create_empty_delta_table(f"{bronze_base_dir}/bronze_delta_table", bronze.get_schema())

    bzQuery = bronze.process(trigger)
    slQuery = silver.process(trigger)
    return bzQuery, slQuery


No active streaming queries.

Starting Bronze Stream...

Starting Silver Stream...