In [19]:
from pyspark.sql import SparkSession
from malaya.tokenizer import SentenceTokenizer
from pyspark.sql.functions import array, col, concat_ws
from pyspark.sql import Row
from itertools import chain
import re

# Text Preprocessing with Malaya
# Broadcast Variables
# Stopwords List, https://github.com/stopwords-iso/stopwords-ms
stopwords = set([
    'abdul', 'abdullah', 'acara', 'ada', 'adalah', 'ahmad', 'air', 'akan', 'akhbar', 
    'akhir', 'aktiviti', 'alam', 'amat', 'amerika', 'anak', 'anggota', 'antara', 
    'antarabangsa', 'apa', 'apabila', 'april', 'as', 'asas', 'asean', 'asia', 'asing', 
    'atas', 'atau', 'australia', 'awal', 'awam', 'bagaimanapun', 'bagi', 'bahagian', 
    'bahan', 'baharu', 'bahawa', 'baik', 'bandar', 'bank', 'banyak', 'barangan', 
    'baru', 'baru-baru', 'bawah', 'beberapa', 'bekas', 'beliau', 'belum', 'berada', 
    'berakhir', 'berbanding', 'berdasarkan', 'berharap', 'berikutan', 'berjaya', 
    'berjumlah', 'berkaitan', 'berkata', 'berkenaan', 'berlaku', 'bermula', 'bernama', 
    'bernilai', 'bersama', 'berubah', 'besar', 'bhd', 'bidang', 'bilion', 'bn', 'boleh', 
    'bukan', 'bulan', 'bursa', 'cadangan', 'china', 'dagangan', 'dalam', 'dan', 'dana', 
    'dapat', 'dari', 'daripada', 'dasar', 'datang', 'datuk', 'demikian', 'dengan', 'depan', 
    'derivatives', 'dewan', 'di', 'diadakan', 'dibuka', 'dicatatkan', 'dijangka', 
    'diniagakan', 'dis', 'disember', 'ditutup', 'dolar', 'dr', 'dua', 'dunia', 'ekonomi', 
    'eksekutif', 'eksport', 'empat', 'enam', 'faedah', 'feb', 'global', 'hadapan', 'hanya', 
    'harga', 'hari', 'hasil', 'hingga', 'hubungan', 'ia', 'iaitu', 'ialah', 'indeks', 'india', 
    'indonesia', 'industri', 'ini', 'islam', 'isnin', 'isu', 'itu', 'jabatan', 'jalan', 'jan', 
    'jawatan', 'jawatankuasa', 'jepun', 'jika', 'jualan', 'juga', 'julai', 'jumaat', 'jumlah', 
    'jun', 'juta', 'kadar', 'kalangan', 'kali', 'kami', 'kata', 'katanya', 'kaunter', 'kawasan', 
    'ke', 'keadaan', 'kecil', 'kedua', 'kedua-dua', 'kedudukan', 'kekal', 'kementerian', 'kemudahan', 
    'kenaikan', 'kenyataan', 'kepada', 'kepentingan', 'keputusan', 'kerajaan', 'kerana', 'kereta', 
    'kerja', 'kerjasama', 'kes', 'keselamatan', 'keseluruhan', 'kesihatan', 'ketika', 'ketua', 
    'keuntungan', 'kewangan', 'khamis', 'kini', 'kira-kira', 'kita', 'klci', 'klibor', 'komposit', 
    'kontrak', 'kos', 'kuala', 'kuasa', 'kukuh', 'kumpulan', 'lagi', 'lain', 'langkah', 'laporan', 
    'lebih', 'lepas', 'lima', 'lot', 'luar', 'lumpur', 'mac', 'mahkamah', 'mahu', 'majlis', 'makanan', 
    'maklumat', 'malam', 'malaysia', 'mana', 'manakala', 'masa', 'masalah', 'masih', 'masing-masing', 
    'masyarakat', 'mata', 'media', 'mei', 'melalui', 'melihat', 'memandangkan', 'memastikan', 'membantu', 
    'membawa', 'memberi', 'memberikan', 'membolehkan', 'membuat', 'mempunyai', 'menambah', 'menarik', 
    'menawarkan', 'mencapai', 'mencatatkan', 'mendapat', 'mendapatkan', 'menerima', 'menerusi', 
    'mengadakan', 'mengambil', 'mengenai', 'menggalakkan', 'menggunakan', 'mengikut', 'mengumumkan', 
    'mengurangkan', 'meningkat', 'meningkatkan', 'menjadi', 'menjelang', 'menokok', 'menteri', 
    'menunjukkan', 'menurut', 'menyaksikan', 'menyediakan', 'mereka', 'merosot', 'merupakan', 
    'mesyuarat', 'minat', 'minggu', 'minyak', 'modal', 'mohd', 'mudah', 'mungkin', 'naik', 'najib', 
    'nasional', 'negara', 'negara-negara', 'negeri', 'niaga', 'nilai', 'nov', 'ogos', 'okt', 'oleh', 
    'operasi', 'orang', 'pada', 'pagi', 'paling', 'pameran', 'papan', 'para', 'paras', 'parlimen', 
    'parti', 'pasaran', 'pasukan', 'pegawai', 'pejabat', 'pekerja', 'pelabur', 'pelaburan', 'pelancongan', 
    'pelanggan', 'pelbagai', 'peluang', 'pembangunan', 'pemberita', 'pembinaan', 'pemimpin', 
    'pendapatan', 'pendidikan', 'penduduk', 'penerbangan', 'pengarah', 'pengeluaran', 'pengerusi', 
    'pengguna', 'pengurusan', 'peniaga', 'peningkatan', 'penting', 'peratus', 'perdagangan', 'perdana', 
    'peringkat', 'perjanjian', 'perkara', 'perkhidmatan', 'perladangan', 'perlu', 'permintaan', 
    'perniagaan', 'persekutuan', 'persidangan', 'pertama', 'pertubuhan', 'pertumbuhan', 'perusahaan', 
    'peserta', 'petang', 'pihak', 'pilihan', 'pinjaman', 'polis', 'politik', 'presiden', 'prestasi', 
    'produk', 'program', 'projek', 'proses', 'proton', 'pukul', 'pula', 'pusat', 'rabu', 'rakan', 'rakyat', 
    'ramai', 'rantau', 'raya', 'rendah', 'ringgit', 'rumah', 'sabah', 'sahaja', 'saham', 'sama', 'sarawak', 
    'satu', 'sawit', 'saya', 'sdn', 'sebagai', 'sebahagian', 'sebanyak', 'sebarang', 'sebelum', 'sebelumnya', 
    'sebuah', 'secara', 'sedang', 'segi', 'sehingga', 'sejak', 'sekarang', 'sektor', 'sekuriti', 'selain', 
    'selama', 'selasa', 'selatan', 'selepas', 'seluruh', 'semakin', 'semalam', 'semasa', 'sementara', 
    'semua', 'semula', 'sen', 'sendiri', 'seorang', 'sepanjang', 'seperti', 'sept', 'september', 
    'serantau', 'seri', 'serta', 'sesi', 'setiap', 'setiausaha', 'sidang', 'singapura', 'sini', 'sistem', 
    'sokongan', 'sri', 'sudah', 'sukan', 'suku', 'sumber', 'supaya', 'susut', 'syarikat', 'syed', 'tahap', 
    'tahun', 'tan', 'tanah', 'tanpa', 'tawaran', 'teknologi', 'telah', 'tempat', 'tempatan', 'tempoh', 
    'tenaga', 'tengah', 'tentang', 'terbaik', 'terbang', 'terbesar', 'terbuka', 'terdapat', 'terhadap', 
    'termasuk', 'tersebut', 'terus', 'tetapi', 'thailand', 'tiada', 'tidak', 'tiga', 'timbalan', 'timur', 
    'tindakan', 'tinggi', 'tun', 'tunai', 'turun', 'turut', 'umno', 'unit', 'untuk', 'untung', 'urus', 'usaha', 
    'utama', 'walaupun', 'wang', 'wanita', 'wilayah', 'yang'
])

# stemmer = malaya.stem.sastrawi()
# broadcast_stopwords = spark.sparkContext.broadcast(stopwords)
# broadcast_stemmer = spark.sparkContext.broadcast(stemmer)

# Define your data transformation function
class DE_Transformation:

    def __init__(self, spark):
        self.spark = spark

    def convertRawDataIntoSentences(self):
        # self.spark = SparkSession.builder.appName("DataTransformationWithMalaya").getOrCreate()
        df = self.spark.read.parquet("DE-prj/RawData")
        #drop duplicated
        df = df.drop_duplicates(['url']).select(['title','articleBrief','contents'])
        
        #Merge DF
        df = df.withColumn("Flattened_Contents", concat_ws(" ", col("contents")))
        df = df.select(['title','articleBrief','Flattened_Contents'])
        merged_df = df.withColumn("Merged", array(*[col(c) for c in df.columns])).select(['Merged'])
        lists = merged_df.collect()
        
        # Flatten the 'contents' column
        flattened_list = list(chain.from_iterable(row.Merged for row in lists))
        
        # Split into sentences and remove special characters
        s_tokenizer = SentenceTokenizer()
        rddInput = spark.sparkContext.parallelize(flattened_list)
        rdd_sentences = rddInput.map(s_tokenizer.tokenize)
        # remove none result
        rdd_sentences = rdd_sentences.filter(lambda x: len(x) > 0)
        def remove_special_characters(texts):
            # Remove all characters except alphanumeric, comma, and period
            result = []
            for text in texts:
                removedSpecialChar = re.sub(r'[^a-zA-Z0-9,.]', ' ', text)
                stripped = removedSpecialChar.strip()
                cleaned_text = re.sub(r" \.", ".", stripped)
                result.append(re.sub(r'\s+', ' ', cleaned_text))
            return result
            
        rdd_sentences_clean = rdd_sentences.map(remove_special_characters)
        
        rdd_result = rdd_sentences_clean.coalesce(1)

        result_list = rdd_result.collect()

        flattenned_result = [item for sublist in result_list for item in sublist]
        return flattenned_result    

    
    
    def transform_data(self, df):
        # Step 1: Combine the text columns into one text column
        df = df.withColumn("text_column", concat_ws(" ", col("title"), col("contents"), col("articleBrief")))
        # Step 1: Noise Removal
        df = df.withColumn("cleaned_text", regexp_replace("text_column", "[^a-zA-Z\\s]", ""))
        df = df.withColumn("cleaned_text", lower(col("cleaned_text")))
    
        # Step 2: Tokenization with Malaya
        # Sentence Segmentation
        from malaya.text.function import split_into_sentences
        
        def malaya_split_sentences(text):
            if isinstance(text, str):
                return split_into_sentences(text)
            return []
        
        segment_udf = udf(malaya_split_sentences, ArrayType(StringType()))
        df = df.withColumn("sentences", segment_udf(col("cleaned_text")))
    
        def split_into_words(sentences):
            if isinstance(sentences, list):
                # Flatten the list of sentences into words
                return [word for sentence in sentences for word in sentence.split()]
            return []
        
        tokenize_udf = udf(split_into_words, ArrayType(StringType()))
        df = df.withColumn("tokens", tokenize_udf(col("sentences")))
    
        stopwords = broadcast_stopwords.value
        # Step 3: Stopword Removal using Broadcasted Stopwords
        def remove_stopwords(tokens):
            return [word for word in tokens if word not in stopwords]
    
        remove_stopwords_udf = udf(remove_stopwords, ArrayType(StringType()))
        df = df.withColumn("filtered_tokens", remove_stopwords_udf(col("tokens")))
    
        stemmer = broadcast_stemmer.value
        # Step 4: Stemming with Malaya
        def malaya_stem(tokens):
            return [stemmer.stem(token) for token in tokens]
    
        stem_udf = udf(malaya_stem, ArrayType(StringType()))
        df = df.withColumn("stemmed_tokens", stem_udf(col("filtered_tokens")))

        df = df.select("cleaned_text", "sentences", "tokens", "filtered_tokens", "stemmed_tokens")

        return df

# Instantiate the transformation class
spark = SparkSession.builder.appName("DataTransformationWithMalaya").getOrCreate()
try:
    transformation = DE_Transformation(spark)
    df = transformation.convertRawDataIntoSentences()
finally:
    spark.stop()
# # Read raw data
# df_raw = data_transformer.readRawData()

# # Perform data transformation
# df_transformed = data_transformer.transform_data(df_raw)

# # Show the transformed DataFrame with full content
# df_transformed.show(truncate=False)  # Disable truncation for full results
# # Convert PySpark DataFrame to Pandas DataFrame
# pandas_df = df_transformed.toPandas()

# # Save as CSV
# pandas_df.to_csv('out.csv', index=False)



# # Optionally, save the processed DataFrame as a Parquet file
# output_path = "DE-prj/ProcessedData"
# df_transformed.write.parquet(output_path, mode="overwrite")
# #write_to_csv(df, output_local_path, is_hdfs=False) 

# # Stop SparkSession


24/12/13 00:08:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
                                                                                

In [None]:
from pathlib import Path
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, lower, udf, regexp_replace
from pyspark.sql.types import StringType, ArrayType
import malaya
import glob
import re
from pyspark import SparkFiles


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DataTransformationWithMalaya") \
    .getOrCreate()


# Text Preprocessing with Malaya
# Broadcast Variables
# Stopwords List, https://github.com/stopwords-iso/stopwords-ms
stopwords = set([
    'abdul', 'abdullah', 'acara', 'ada', 'adalah', 'ahmad', 'air', 'akan', 'akhbar', 
    'akhir', 'aktiviti', 'alam', 'amat', 'amerika', 'anak', 'anggota', 'antara', 
    'antarabangsa', 'apa', 'apabila', 'april', 'as', 'asas', 'asean', 'asia', 'asing', 
    'atas', 'atau', 'australia', 'awal', 'awam', 'bagaimanapun', 'bagi', 'bahagian', 
    'bahan', 'baharu', 'bahawa', 'baik', 'bandar', 'bank', 'banyak', 'barangan', 
    'baru', 'baru-baru', 'bawah', 'beberapa', 'bekas', 'beliau', 'belum', 'berada', 
    'berakhir', 'berbanding', 'berdasarkan', 'berharap', 'berikutan', 'berjaya', 
    'berjumlah', 'berkaitan', 'berkata', 'berkenaan', 'berlaku', 'bermula', 'bernama', 
    'bernilai', 'bersama', 'berubah', 'besar', 'bhd', 'bidang', 'bilion', 'bn', 'boleh', 
    'bukan', 'bulan', 'bursa', 'cadangan', 'china', 'dagangan', 'dalam', 'dan', 'dana', 
    'dapat', 'dari', 'daripada', 'dasar', 'datang', 'datuk', 'demikian', 'dengan', 'depan', 
    'derivatives', 'dewan', 'di', 'diadakan', 'dibuka', 'dicatatkan', 'dijangka', 
    'diniagakan', 'dis', 'disember', 'ditutup', 'dolar', 'dr', 'dua', 'dunia', 'ekonomi', 
    'eksekutif', 'eksport', 'empat', 'enam', 'faedah', 'feb', 'global', 'hadapan', 'hanya', 
    'harga', 'hari', 'hasil', 'hingga', 'hubungan', 'ia', 'iaitu', 'ialah', 'indeks', 'india', 
    'indonesia', 'industri', 'ini', 'islam', 'isnin', 'isu', 'itu', 'jabatan', 'jalan', 'jan', 
    'jawatan', 'jawatankuasa', 'jepun', 'jika', 'jualan', 'juga', 'julai', 'jumaat', 'jumlah', 
    'jun', 'juta', 'kadar', 'kalangan', 'kali', 'kami', 'kata', 'katanya', 'kaunter', 'kawasan', 
    'ke', 'keadaan', 'kecil', 'kedua', 'kedua-dua', 'kedudukan', 'kekal', 'kementerian', 'kemudahan', 
    'kenaikan', 'kenyataan', 'kepada', 'kepentingan', 'keputusan', 'kerajaan', 'kerana', 'kereta', 
    'kerja', 'kerjasama', 'kes', 'keselamatan', 'keseluruhan', 'kesihatan', 'ketika', 'ketua', 
    'keuntungan', 'kewangan', 'khamis', 'kini', 'kira-kira', 'kita', 'klci', 'klibor', 'komposit', 
    'kontrak', 'kos', 'kuala', 'kuasa', 'kukuh', 'kumpulan', 'lagi', 'lain', 'langkah', 'laporan', 
    'lebih', 'lepas', 'lima', 'lot', 'luar', 'lumpur', 'mac', 'mahkamah', 'mahu', 'majlis', 'makanan', 
    'maklumat', 'malam', 'malaysia', 'mana', 'manakala', 'masa', 'masalah', 'masih', 'masing-masing', 
    'masyarakat', 'mata', 'media', 'mei', 'melalui', 'melihat', 'memandangkan', 'memastikan', 'membantu', 
    'membawa', 'memberi', 'memberikan', 'membolehkan', 'membuat', 'mempunyai', 'menambah', 'menarik', 
    'menawarkan', 'mencapai', 'mencatatkan', 'mendapat', 'mendapatkan', 'menerima', 'menerusi', 
    'mengadakan', 'mengambil', 'mengenai', 'menggalakkan', 'menggunakan', 'mengikut', 'mengumumkan', 
    'mengurangkan', 'meningkat', 'meningkatkan', 'menjadi', 'menjelang', 'menokok', 'menteri', 
    'menunjukkan', 'menurut', 'menyaksikan', 'menyediakan', 'mereka', 'merosot', 'merupakan', 
    'mesyuarat', 'minat', 'minggu', 'minyak', 'modal', 'mohd', 'mudah', 'mungkin', 'naik', 'najib', 
    'nasional', 'negara', 'negara-negara', 'negeri', 'niaga', 'nilai', 'nov', 'ogos', 'okt', 'oleh', 
    'operasi', 'orang', 'pada', 'pagi', 'paling', 'pameran', 'papan', 'para', 'paras', 'parlimen', 
    'parti', 'pasaran', 'pasukan', 'pegawai', 'pejabat', 'pekerja', 'pelabur', 'pelaburan', 'pelancongan', 
    'pelanggan', 'pelbagai', 'peluang', 'pembangunan', 'pemberita', 'pembinaan', 'pemimpin', 
    'pendapatan', 'pendidikan', 'penduduk', 'penerbangan', 'pengarah', 'pengeluaran', 'pengerusi', 
    'pengguna', 'pengurusan', 'peniaga', 'peningkatan', 'penting', 'peratus', 'perdagangan', 'perdana', 
    'peringkat', 'perjanjian', 'perkara', 'perkhidmatan', 'perladangan', 'perlu', 'permintaan', 
    'perniagaan', 'persekutuan', 'persidangan', 'pertama', 'pertubuhan', 'pertumbuhan', 'perusahaan', 
    'peserta', 'petang', 'pihak', 'pilihan', 'pinjaman', 'polis', 'politik', 'presiden', 'prestasi', 
    'produk', 'program', 'projek', 'proses', 'proton', 'pukul', 'pula', 'pusat', 'rabu', 'rakan', 'rakyat', 
    'ramai', 'rantau', 'raya', 'rendah', 'ringgit', 'rumah', 'sabah', 'sahaja', 'saham', 'sama', 'sarawak', 
    'satu', 'sawit', 'saya', 'sdn', 'sebagai', 'sebahagian', 'sebanyak', 'sebarang', 'sebelum', 'sebelumnya', 
    'sebuah', 'secara', 'sedang', 'segi', 'sehingga', 'sejak', 'sekarang', 'sektor', 'sekuriti', 'selain', 
    'selama', 'selasa', 'selatan', 'selepas', 'seluruh', 'semakin', 'semalam', 'semasa', 'sementara', 
    'semua', 'semula', 'sen', 'sendiri', 'seorang', 'sepanjang', 'seperti', 'sept', 'september', 
    'serantau', 'seri', 'serta', 'sesi', 'setiap', 'setiausaha', 'sidang', 'singapura', 'sini', 'sistem', 
    'sokongan', 'sri', 'sudah', 'sukan', 'suku', 'sumber', 'supaya', 'susut', 'syarikat', 'syed', 'tahap', 
    'tahun', 'tan', 'tanah', 'tanpa', 'tawaran', 'teknologi', 'telah', 'tempat', 'tempatan', 'tempoh', 
    'tenaga', 'tengah', 'tentang', 'terbaik', 'terbang', 'terbesar', 'terbuka', 'terdapat', 'terhadap', 
    'termasuk', 'tersebut', 'terus', 'tetapi', 'thailand', 'tiada', 'tidak', 'tiga', 'timbalan', 'timur', 
    'tindakan', 'tinggi', 'tun', 'tunai', 'turun', 'turut', 'umno', 'unit', 'untuk', 'untung', 'urus', 'usaha', 
    'utama', 'walaupun', 'wang', 'wanita', 'wilayah', 'yang'
])

stemmer = malaya.stem.sastrawi()
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)
broadcast_stemmer = spark.sparkContext.broadcast(stemmer)

# Define your data transformation function
class DE_Transformation:

    def _init_(self, spark):
        self.spark = spark
    
    def convertRawDataIntoSentences(self):
        self.spark = SparkSession.builder.appName("DataTransformationWithMalaya").getOrCreate()
        try
            df = self.spark.read.parquet("RawData")
            #drop duplicated
            df = df.drop_duplicates(['url']).select(['title','articleBrief','contents'])
    
            #Merge DF
            df = df.withColumn("Flattened_Contents", concat_ws(" ", col("contents")))
            df = df.select(['title','articleBrief','Flattened_Contents'])
            merged_df = df.withColumn("Merged", array(*[col(c) for c in df.columns])).select(['Merged'])
            lists = merged_df.collect()
            
            # Flatten the 'contents' column
            flattened_list = list(chain.from_iterable(row.Merged for row in lists))
    
            # Split into sentences
            s_tokenizer = SentenceTokenizer()
            rddInput = self.spark.sparkContext.parallelize(flattened_list)
    
            # get output in list
            rdd_result = rddInput.map(s_tokenizer.tokenize)
            rdd_result = rdd_result.coalesce(1)
            
            # remove list with none
            filtered_rdd_result = rdd_result.filter(lambda x: len(x) > 0)
            result = rdd_result.collect()
        finally:
            self.spark.stop()
            # remove list with none
        return result    
    
    def transform_data(self, df):
        # Step 1: Combine the text columns into one text column
        df = df.withColumn("text_column", concat_ws(" ", col("title"), col("contents"), col("articleBrief")))
        # Step 1: Noise Removal
        df = df.withColumn("cleaned_text", regexp_replace("text_column", "[^a-zA-Z\\s]", ""))
        df = df.withColumn("cleaned_text", lower(col("cleaned_text")))
    
        # Step 2: Tokenization with Malaya
        # Sentence Segmentation
        from malaya.text.function import split_into_sentences
        
        def malaya_split_sentences(text):
            if isinstance(text, str):
                return split_into_sentences(text)
            return []
        
        segment_udf = udf(malaya_split_sentences, ArrayType(StringType()))
        df = df.withColumn("sentences", segment_udf(col("cleaned_text")))
    
        def split_into_words(sentences):
            if isinstance(sentences, list):
                # Flatten the list of sentences into words
                return [word for sentence in sentences for word in sentence.split()]
            return []
        
        tokenize_udf = udf(split_into_words, ArrayType(StringType()))
        df = df.withColumn("tokens", tokenize_udf(col("sentences")))
    
        stopwords = broadcast_stopwords.value
        # Step 3: Stopword Removal using Broadcasted Stopwords
        def remove_stopwords(tokens):
            return [word for word in tokens if word not in stopwords]
    
        remove_stopwords_udf = udf(remove_stopwords, ArrayType(StringType()))
        df = df.withColumn("filtered_tokens", remove_stopwords_udf(col("tokens")))
    
        stemmer = broadcast_stemmer.value
        # Step 4: Stemming with Malaya
        def malaya_stem(tokens):
            return [stemmer.stem(token) for token in tokens]
    
        stem_udf = udf(malaya_stem, ArrayType(StringType()))
        df = df.withColumn("stemmed_tokens", stem_udf(col("filtered_tokens")))

        df = df.select("cleaned_text", "sentences", "tokens", "filtered_tokens", "stemmed_tokens")

        return df

# Instantiate the transformation class
data_transformer = RunDataTransformation(spark)

# Read raw data
df_raw = data_transformer.readRawData()

# Perform data transformation
df_transformed = data_transformer.transform_data(df_raw)

# Show the transformed DataFrame with full content
df_transformed.show(truncate=False)  # Disable truncation for full results
# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = df_transformed.toPandas()

# Save as CSV
pandas_df.to_csv('out.csv', index=False)



# Optionally, save the processed DataFrame as a Parquet file
output_path = "DE-prj/ProcessedData"
df_transformed.write.parquet(output_path, mode="overwrite")
#write_to_csv(df, output_local_path, is_hdfs=False) 

# Stop SparkSession


In [20]:
for ele in df:
    print(ele)



SPRM diminta utamakan siasatan dakwaan skandal rasuah ADUN Sabah.
Gambar hiasan.
Foto fail NSTP.
KOTA KINABALU DAP Sabah mahu Suruhanjaya Pencegahan Rasuah Malaysia SPRM mengutamakan siasatan dakwaan skandal rasuah membabitkan beberapa Ahli Dewan Undangan Negeri ADUN di negeri itu.
Setiausahanya, Vivian Wong berkata pihaknya memantau teliti perkembangan kes itu sejak rakaman video mengenainya didedahkan sebuah portal berita sejak sebulan lalu.
Katanya, DAP Sabah memandang serius dakwaan yang ditularkan melalui klip video terbabit dan menyokong penuh siasatan SPRM.
DAP Sabah memantau dengan teliti perkembangan berkaitan perkara ini dan melihat pendedahan rakaman video baru baru ini dengan penuh serius.
DAP Sabah menyokong penuh SPRM dalam menjalankan siasatan yang teliti dan saksama.
Kami menggesa SPRM untuk mengutamakan kes ini dan memastikan ia dikendalikan dengan ketelusan dan integriti, katanya dalam satu kenyataan, di sini.
Dakwaan skandal rasuah itu dikaitkan dengan isu pengurusan