In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

class Bronze():
    def __init__(self):
        self.base_data_dir = "/FileStore/xxxx"
        self.BOOTSTRAP_SERVER = "xxxx"
        self.JAAS_MODULE = "xxxx"
        self.CLUSTER_API_KEY = "xxxx"
        self.CLUSTER_API_SECRET = "xxxx"
        self.cast_schema = ArrayType(StructType([
                StructField(r'(PDH-CSV 4.0) (ora solare Europa occidentale)(-60)', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Memoria\MByte disponibili', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Interfaccia di rete(Realtek USB GbE Family Controller)\Byte ricevuti/sec', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Interfaccia di rete(Intel[R] Dual Band Wireless-AC 7265)\Byte ricevuti/sec', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Interfaccia di rete(Realtek USB GbE Family Controller)\Byte inviati/sec', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Interfaccia di rete(Intel[R] Dual Band Wireless-AC 7265)\Byte inviati/sec', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Disco fisico(_Total)\Media byte letti da disco', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Disco fisico(_Total)\Media byte scritti su disco', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Processore(_Total)\% Tempo privilegiato', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Processore(_Total)\% Tempo processore', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Processore(_Total)\% Tempo utente', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Ritardo input utente per sessione(0)\Ritardo massimo input', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Ritardo input utente per sessione(1)\Ritardo massimo input', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Ritardo input utente per sessione(Max)\Ritardo massimo input', StringType(), True),
                StructField(r'\\SUPERPORTATILE\Ritardo input utente per sessione(Average)\Ritardo massimo input', StringType(), True),
                StructField(r'Genera un rapporto con informazioni dettagliate sullo stato delle risorse hardware locali, sui tempi di risposta del sistema e sui processi eseguiti nel computer locale. Tali informazioni consentono di identificare le possibili cause dei problemi di prestazioni. Per eseguire questo Insieme agenti di raccolta dati, � necessario essere almeno membri del gruppo Administrators locale o di un gruppo equivalente.', StringType(), True),
        ]))

    def ingestFromKafka(self, startingTime = 1):
        return ( spark.readStream
                        .format("kafka")            
                        .option("kafka.bootstrap.servers", self.BOOTSTRAP_SERVER)
                        .option("kafka.security.protocol", "xxxx")
                        .option("kafka.sasl.mechanism", "xxxx")
                        .option("kafka.sasl.jaas.config", f"{self.JAAS_MODULE} required username='{self.CLUSTER_API_KEY}' password='{self.CLUSTER_API_SECRET}';")
                        .option("subscribe", "xxxx")
                        .option("maxOffsetsPerTrigger", 10)
                        .option("startingTimestamp", startingTime)
                        .option("startingOffsetsByTimestampStrategy", "latest")
                        .load()
                )
       
    def process(self, startingTime = 1):
        print(f"Starting Bronze Stream...", end='')
        rawDF = self.ingestFromKafka(startingTime)
        invoicesDF =rawDF.select(rawDF.key.cast("string").alias("kafka_key"),
                                rawDF.value.cast("string").alias("value"),
                                rawDF.topic.cast("string").alias("kafka_topic"),
                                rawDF.timestamp.alias("kafka_timestamp"))
        df_explode_1 =invoicesDF.withColumn("json",F.explode(F.from_json("value",self.cast_schema)))
        dfexpanded = df_explode_1.select("*", "json.*") 
        dfexpanded_2 = dfexpanded
        for field_name in dfexpanded.schema.names:
            if field_name.startswith("\\\\SUPERPORTATILE"):
                dfexpanded_2 = dfexpanded_2.withColumn(field_name, dfexpanded_2[field_name].cast(FloatType()))
        dfexpanded_3 = dfexpanded_2.withColumn('native_timestamp',to_timestamp(col("`(PDH-CSV 4.0) (ora solare Europa occidentale)(-60)`"),"MM/dd/yyyy HH:mm:ss.SSS"))
        df_clean = dfexpanded_3.drop('json','value','kafka_topic','(PDH-CSV 4.0) (ora solare Europa occidentale)(-60)','Genera un rapporto con informazioni dettagliate sullo stato delle risorse hardware locali, sui tempi di risposta del sistema e sui processi eseguiti nel computer locale. Tali informazioni consentono di identificare le possibili cause dei problemi di prestazioni. Per eseguire questo Insieme agenti di raccolta dati, � necessario essere almeno membri del gruppo Administrators locale o di un gruppo equivalente.')
        df_c = df_clean.select([col(c).alias(
                c.replace( '(', '')
                .replace( ')', '')
                .replace( ',', '')
                .replace( ';', '')
                .replace( '{', '')
                .replace( '}', '')
                .replace( '\n', '')
                .replace( '\t', '')
                .replace( ' ', '_')
            ) for c in df_clean.columns])
        df_c2 = df_c.filter(df_c.native_timestamp.isNotNull())
        df_c3 = df_c2.dropDuplicates(['native_timestamp'])
        sQuery =  ( df_c3.writeStream
                                .queryName("bronze-ingestion")
                                .option("checkpointLocation", f"{base_data_dir}/chekpoint/cpl_perfmoningestion")
                                .outputMode("append")
                                .toTable("perfmoningestion")           
                        )
        print("Done")
        return sQuery
       