In [0]:
from pyspark.sql.functions import split, explode, lower, trim

In [0]:
class Batch_WordCount(cleanup_activity):
    """Batch job that counts letters per text segment and saves them to a table.

    Pipeline (see ``launcher``): read text -> explode into characters ->
    keep alphabetic characters only -> count per (Text, characters) ->
    overwrite the Delta table ``batch_wordcount_table``.

    NOTE(review): relies on a global ``spark`` session and on
    ``self.base_dir``; neither is defined in this class -- presumably they
    come from the notebook environment and ``cleanup_activity``; confirm.
    """

    def __init__(self):
        super().__init__()

    def get_raw_data(self):
        """Read the input text and add one row per element of each segment.

        The files under ``{base_dir}/working`` are read with ``'.'`` as the
        line separator, so each row's ``value`` is the text between two
        periods. The loaded frame is also kept on ``self.df``.

        Returns:
            ``self.df`` with an extra ``characters`` column produced by
            splitting ``value`` on the empty pattern and exploding the
            resulting array (one output row per array element).
        """
        self.df = spark.read.option('lineSep', '.').text(f'{self.base_dir}/working')
        return self.df.withColumn('characters', explode(split(self.df.value, '')))

    def get_quality_data(self, Rawdf):
        """Normalise the exploded characters and keep only ASCII letters.

        Args:
            Rawdf: frame with ``value`` and ``characters`` columns, as
                returned by ``get_raw_data``.

        Returns:
            A frame with columns ``Text`` (the original ``value``) and
            ``characters`` (trimmed, lower-cased), restricted to rows whose
            ``characters`` is non-null and matches ``[a-zA-Z]``.
        """
        cleaned = Rawdf.select(
            Rawdf.value.alias('Text'),
            lower(trim(Rawdf.characters)).alias('characters'),
        )
        # Filter on the projected frame's own column. Referring back to
        # Rawdf's column after the select only works because Spark re-adds
        # the missing attribute under the projection, which is easy to break.
        return (
            cleaned
            .where(cleaned.characters.isNotNull())
            .where(cleaned.characters.rlike("[a-zA-Z]"))
        )

    def get_word_count(self, Quadf):
        """Count rows for every (Text, characters) pair.

        Args:
            Quadf: frame with ``Text`` and ``characters`` columns.

        Returns:
            A frame with ``Text``, ``characters`` and ``count`` columns.
        """
        return Quadf.groupBy(Quadf.Text, Quadf.characters).count()

    def inserting_into_table(self, Aggdf):
        """Overwrite the Delta table ``batch_wordcount_table`` with ``Aggdf``.

        Args:
            Aggdf: aggregated frame to persist.
        """
        (
            Aggdf.write.format('delta')
            .mode('overwrite')
            .saveAsTable('batch_wordcount_table')
        )

    def launcher(self):
        """Run the full batch pipeline, printing progress to stdout."""
        # end='' keeps the cursor on the same line so 'Done' is appended to it.
        print('Batch load for Word Count is started...', end='')
        Rawdf = self.get_raw_data()
        Quadf = self.get_quality_data(Rawdf)
        Aggdf = self.get_word_count(Quadf)
        self.inserting_into_table(Aggdf)
        print('Done')
        

In [0]:
class Streaming_WordCount(cleanup_activity):
    """Streaming job that counts letters per text segment into a Delta table.

    Pipeline (see ``launcher``): stream text -> explode into characters ->
    keep alphabetic characters only -> count per (Text, characters) ->
    continuously write the full result to ``stream_wordcount_table``.

    NOTE(review): relies on a global ``spark`` session and on
    ``self.base_dir``; neither is defined in this class -- presumably they
    come from the notebook environment and ``cleanup_activity``; confirm.
    """

    def __init__(self):
        super().__init__()

    def get_raw_data(self):
        """Open a text stream and add one row per element of each segment.

        The files under ``{base_dir}/working`` are read as a stream with
        ``'.'`` as the line separator, so each row's ``value`` is the text
        between two periods. The streaming frame is also kept on ``self.df``.

        Returns:
            ``self.df`` with an extra ``characters`` column produced by
            splitting ``value`` on the empty pattern and exploding the
            resulting array (one output row per array element).
        """
        self.df = spark.readStream.option('lineSep', '.').text(f'{self.base_dir}/working')
        return self.df.withColumn('characters', explode(split(self.df.value, '')))

    def get_quality_data(self, Rawdf):
        """Normalise the exploded characters and keep only ASCII letters.

        Args:
            Rawdf: frame with ``value`` and ``characters`` columns, as
                returned by ``get_raw_data``.

        Returns:
            A frame with columns ``Text`` (the original ``value``) and
            ``characters`` (trimmed, lower-cased), restricted to rows whose
            ``characters`` is non-null and matches ``[a-zA-Z]``.
        """
        cleaned = Rawdf.select(
            Rawdf.value.alias('Text'),
            lower(trim(Rawdf.characters)).alias('characters'),
        )
        # Filter on the projected frame's own column. Referring back to
        # Rawdf's column after the select only works because Spark re-adds
        # the missing attribute under the projection, which is easy to break.
        return (
            cleaned
            .where(cleaned.characters.isNotNull())
            .where(cleaned.characters.rlike("[a-zA-Z]"))
        )

    def get_word_count(self, Quadf):
        """Count rows for every (Text, characters) pair.

        Args:
            Quadf: frame with ``Text`` and ``characters`` columns.

        Returns:
            A frame with ``Text``, ``characters`` and ``count`` columns.
        """
        return Quadf.groupBy(Quadf.Text, Quadf.characters).count()

    def inserting_into_table(self, Aggdf):
        """Start writing ``Aggdf`` to the Delta table ``stream_wordcount_table``.

        Uses ``{base_dir}/checkpoint`` as the checkpoint location and the
        ``complete`` output mode, i.e. the whole aggregated result is
        rewritten on every trigger.

        Args:
            Aggdf: aggregated streaming frame to persist.

        Returns:
            The handle returned by ``toTable`` for the started query.
        """
        return (
            Aggdf.writeStream.format('delta')
            .option('checkpointLocation', f'{self.base_dir}/checkpoint')
            .outputMode('complete')
            .toTable('stream_wordcount_table')
        )

    def launcher(self):
        """Build the streaming pipeline, start it, and return the query handle.

        Returns:
            Whatever ``inserting_into_table`` returns, so the caller can
            monitor or stop the running query.
        """
        print('Stream load for Word Count is started:')
        Rawdf = self.get_raw_data()
        Quadf = self.get_quality_data(Rawdf)
        Aggdf = self.get_word_count(Quadf)
        squery = self.inserting_into_table(Aggdf)
        return squery
        