In [2]:
import pandas as pd

In [34]:
# Single threaded
df = pd.read_csv('actualitate.csv', usecols=['_id', 'category', 'datePublished', 'content', 'title'], nrows=1000)

def lower_case(column):
    return column.lower()

for key,value in df.iterrows():
    result = lower_case(value['content'])
    print(result)

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [44]:
# Now using spark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LowercaseText") \
    .config("spark.executor.cores", "24") \
    .getOrCreate()

df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("actualitate.csv") \
    .select("_id", "category", "datePublished", "content", "title") \
    .limit(10000)  # Limiting to first 10 rows for example

24/04/14 14:59:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [45]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def lower_case_udf(column):
    return column.lower()

lower_case_spark_udf = udf(lower_case_udf, StringType())


In [46]:
df_lowercased = df_spark.withColumn("content_lower", lower_case_spark_udf("content"))
df_lowercased.show(truncate=False)
spark.stop()

+------------------------+-----------+----------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------

In [40]:
import multiprocessing
multiprocessing.cpu_count()

24

In [55]:
import pandas as pd
import re
import stanza
import spacy_stanza
import spacy
nlp = spacy_stanza.load_pipeline("ro")

def lemmatize_tokens(tokens):
    doc = nlp(" ".join(tokens))
    lemmatized_tokens = [token.lemma_ for token in doc]
    return lemmatized_tokens

def remove_ner(text):
    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

def preprocess_text(text):
    tokens = remove_ner(text)
    tokens = remove_stopwords(tokens)
    tokens = lemmatize_tokens(tokens)
    return tokens


csv_file = "actualitate.csv"
df = pd.read_csv(csv_file, usecols=['_id', 'category', 'datePublished', 'content', 'title'], nrows=10)  # Read only the first 10 rows

tokenized_data = []

for index, row in df.iterrows():
    content = row['content']
    tokens = preprocess_text(content)
    tokenized_data.append({
        '_id': row['_id'],
        'category': row['category'],
        'datePublished': row['datePublished'],
        'tokens': tokens
    })

tokenized_df = pd.DataFrame(tokenized_data)
tokenized_df.to_csv("tokenized_output.csv", index=False, columns=['_id', 'category', 'datePublished', 'tokens'])





2024-04-14 15:08:26 INFO: Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES


Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.6.0.json:   0%|   …

2024-04-14 15:08:26 INFO: Loading these models for language: ro (Romanian):
| Processor | Package      |
----------------------------
| tokenize  | rrt          |
| pos       | rrt_nocharlm |
| lemma     | rrt_nocharlm |
| depparse  | rrt_nocharlm |

2024-04-14 15:08:26 INFO: Using device: cpu
2024-04-14 15:08:26 INFO: Loading: tokenize
2024-04-14 15:08:26 INFO: Loading: pos
2024-04-14 15:08:27 INFO: Loading: lemma
2024-04-14 15:08:27 INFO: Loading: depparse
2024-04-14 15:08:27 INFO: Done loading processors!


In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder \
    .appName("TextPreprocessing") \
    .getOrCreate()


def lemmatize_tokens(tokens):
    doc = nlp(" ".join(tokens))
    lemmatized_tokens = [token.lemma_ for token in doc]
    return lemmatized_tokens


def remove_ner(text):
    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens


def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Load the CSV file into a Spark DataFrame
csv_file = "actualitate.csv"
df = spark.read.csv(csv_file, header=True, inferSchema=True)

# Apply preprocessing functions to the content column
df = df.withColumn("tokens", lemmatize_tokens(remove_stopwords(remove_ner(col("content")))))

# Select columns of interest
df = df.select("_id", "category", "datePublished", "tokens")

# Write the processed DataFrame to a new CSV file
df.coalesce(1).write.csv("tokenized_output_spark", header=True, mode="overwrite")

# Stop the Spark session
spark.stop()


ValueError: [E1041] Expected a string, Doc, or bytes as input, but got: <class 'pyspark.sql.column.Column'>

In [106]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark CSV Example") \
    .getOrCreate()

# Define the path to your CSV file
csv_file = "actualitate.csv"

# Read the CSV file into a DataFrame
df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file) \
    .select("_id", "category", "datePublished", "content", "title") \
    .limit(10000)  # Limiting to first 10 rows for example

# Define the function to remove named entities
def remove_ner_spark(text):
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Register the function as a UDF (User Defined Function)
remove_ner_udf = udf(remove_ner_spark, StringType())

# Apply the UDF to create a new column "cleaned_tokens"
df_processed = df_spark.withColumn("cleaned_tokens", remove_ner_udf(df_spark["content"]))

# Show the resulting DataFrame
df_processed.show(truncate=False)

# Stop the Spark session
spark.stop()


                                                                                

+------------------------+-----------+----------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------

In [100]:
# Working spacy with spark
#
!pip install findspark
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("NER Removal") \
    .getOrCreate()

# Define the function to remove named entities
def remove_ner_spark(text):
    # Initialize spaCy inside the function
    import spacy
    nlp = spacy.load("en_core_web_sm")

    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

# Register the function as a UDF (User Defined Function)
remove_ner_udf = udf(remove_ner_spark, ArrayType(StringType()))

# Create a sample DataFrame
data = [("Text 1 with named entities",),
        ("Text 2 without entities",),
        ("Another text with dates like 01/01/2022",)]
df = spark.createDataFrame(data, ["text"])

# Apply the UDF to the DataFrame
df_processed = df.withColumn("cleaned_tokens", remove_ner_udf(df["text"]))

# Show the results
df_processed.show(truncate=False)

# Stop SparkSession
spark.stop()


Collecting findspark
  Obtaining dependency information for findspark from https://files.pythonhosted.org/packages/a4/cb/7d2bb508f4ca00a043fd53e8156c11767799d3f534bf451a0942211d5def/findspark-2.0.1-py2.py3-none-any.whl.metadata
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


24/04/14 15:27:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------------------------------+----------------------------------+
|text                                   |cleaned_tokens                    |
+---------------------------------------+----------------------------------+
|Text 1 with named entities             |[Text, with, named, entities]     |
|Text 2 without entities                |[Text, without, entities]         |
|Another text with dates like 01/01/2022|[Another, text, with, dates, like]|
+---------------------------------------+----------------------------------+



In [108]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark CSV Example") \
    .getOrCreate()

# Define the path to your CSV file
csv_file = "actualitate.csv"

# Read the CSV file into a DataFrame
df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file) \
    .select("_id", "category", "datePublished", "content", "title") \
    .limit(10000)  # Limiting to first 10 rows for example

# Define the function to remove named entities
def remove_ner_spark(text):
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

# Define the function to remove stopwords
def remove_stopwords(tokens):
    import spacy
    nlp = spacy_stanza.load_pipeline("ro")
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Register the functions as UDFs (User Defined Functions)
remove_ner_udf = udf(remove_ner_spark, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())

# Apply the UDFs to create new columns
df_processed = df_spark.withColumn("cleaned_tokens_ner", remove_ner_udf(df_spark["content"])) \
                       .withColumn("cleaned_tokens_final", remove_stopwords_udf(col("cleaned_tokens_ner")))

# Show the resulting DataFrame
df_processed.show(truncate=False)

# Stop the Spark session
spark.stop()


2024-04-14 15:38:11 INFO: Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES
Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.6.0.json: 367kB [00:00, 12.1MB/s]                    
2024-04-14 15:38:12 INFO: Loading these models for language: ro (Romanian):
| Processor | Package      |
----------------------------
| tokenize  | rrt          |
| pos       | rrt_nocharlm |
| lemma     | rrt_nocharlm |
| depparse  | rrt_nocharlm |

2024-04-14 15:38:12 INFO: Using device: cpu
2024-04-14 15:38:12 INFO: Loading: tokenize
2024-04-14 15:38:12 INFO: Loading: pos
2024-04-14 15:38:12 INFO: Loading: lemma
2024-04-14 15:38:13 INFO: Loading: depparse
2024-04-14 15:38:13 INFO: Done loading processors!
2024-04-14 15:38:13 INFO: Checking for updates to resources.json in case models have been updated.  Note: this behavior can be

+------------------------+-----------+----------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------

In [36]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark CSV Example") \
    .getOrCreate()

# Define the path to your CSV file
csv_file = "actualitate.csv"

# Read the CSV file into a DataFrame
df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file) \
    .select("_id", "category", "content") \
    .limit(10)  # Limiting to first 10 rows for example

# Define the function to remove named entities
def remove_ner_spark(text):
    import spacy
    doc = nlp(text)
    print(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

# Define the function to remove stopwords
def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Define the function to lemmatize tokens
def lemmatize_tokens(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    doc = nlp(" ".join(tokens))
    lemmatized_tokens = [token.lemma_ for token in doc]
    return lemmatized_tokens

# Register the functions as UDFs (User Defined Functions)

remove_ner_udf = udf(remove_ner_spark, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())
lemmatize_tokens_udf = udf(lemmatize_tokens, StringType())


# Apply the UDFs to create new columns
df_processed = df_spark.withColumn("cleaned_tokens_ner", remove_ner_udf(df_spark["content"])) \
                       .withColumn("cleaned_tokens_stopwords", remove_stopwords_udf(col("cleaned_tokens_ner"))) \
                       .withColumn("cleaned_tokens_final", lemmatize_tokens_udf(col("cleaned_tokens_stopwords")))

output_csv = "processed_Data.csv"

# Save the DataFrame to a CSV file
df_processed.write.csv("./path_to_save.csv", mode="overwrite", header=True)

df_processed.show(truncate=False)
spark.stop()


"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat     (0 + 1) / 1]
"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat
"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat
"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat     (0 + 1) / 1]
"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat
"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat
                                                                                

+------------------------+--------+---------------------------------------------------------------+------------------------------------------+---------------------------------------+---------------------------------+
|_id                     |category|content                                                        |cleaned_tokens_ner                        |cleaned_tokens_stopwords               |cleaned_tokens_final             |
+------------------------+--------+---------------------------------------------------------------+------------------------------------------+---------------------------------------+---------------------------------+
|6477c2ef540bc5d16fa5d3c2|Politică|"[""Viceliderul grupului deputaților UDMR Szabo Odon a declarat|[Viceliderul, grupului, UDMR, a, declarat]|[Viceliderul, grupului, UDMR, declarat]|[vicelider, grup, UDMR, declarat]|
+------------------------+--------+---------------------------------------------------------------+---------------------------------

In [27]:
pd.read_csv('checkthis.csv')

ParserError: Error tokenizing data. C error: Expected 8 fields in line 18, saw 9


In [67]:
import findspark
import re
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark CSV Example") \
    .getOrCreate()

# Define the path to your CSV file
csv_file = "modified_csv_file.csv"

# Read the CSV file into a DataFrame
df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file) \
    .select("_id", "category", "content") \
    .limit(10)  # Limiting to first 10 rows for example

# Define the function to remove named entities
def remove_ner_spark(text):
    import spacy
    doc = nlp(text)
    print(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

# Define the function to remove stopwords
def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Define the function to lemmatize tokens
def lemmatize_tokens(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    doc = nlp(" ".join(tokens))
    lemmatized_tokens = [token.lemma_ for token in doc]
    return lemmatized_tokens

def non_alphabetic_chars(text):
    # Using regular expression to remove non-alphabetic characters
    cleaned_text = re.sub(r'[^a-zA-Z]', ' ', text)
    print(len(cleaned_text))
    return cleaned_text

# Register the functions as UDFs (User Defined Functions)
non_alphabetic_chars_udf = udf(non_alphabetic_chars, StringType())
remove_ner_udf = udf(remove_ner_spark, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())
lemmatize_tokens_udf = udf(lemmatize_tokens, StringType())


# Apply the UDFs to create new columns
df_processed = df_spark.withColumn("non_alphabetic_chars", non_alphabetic_chars_udf(col("content"))) \
                       .withColumn("cleaned_tokens_ner", remove_ner_udf(col("content"))) \
                       .withColumn("cleaned_tokens_stopwords", remove_stopwords_udf(col("cleaned_tokens_ner"))) \
                       .withColumn("cleaned_tokens_final", lemmatize_tokens_udf(col("cleaned_tokens_stopwords")))

output_csv = "processed_Data.csv"

# Save the DataFrame to a CSV file
df_processed.write.csv("./path_to_save.csv", mode="overwrite", header=True)

df_processed.show(truncate=False)
spark.stop()


2770ge 2:>                                                          (0 + 9) / 9]
3853
3301
Preotul Calistrat Chifan de la Mănăstirea Vlădiceni județul Iași care a fost filmat miercuri seară în timp ce lovește o femeie în curtea bisericii spune că de fapt doar a împinso dar după ce a fost provocat El mai spune că femeia care între timp a înaintat plângere la poliție împotriva lui și sora ei au mai creat conflicte la biserică catalogândule drept femei cu probleme de comportament care fac chestii obraznice Mai multă lume a avut tangene cu ele Sunt uor deviante sunt două suflete Dar până la ce limită poi răbda Când faci chestii obraznice doreti să spurci imaginea mănăstirii Vii tu o măciucă nemăritată să faci teatru în faă la câteva sute de oameni a declarat părintele Calistrat pentru BZIPreotul a povestit pentru sursa citată versiunea lui despre incidentul de miercuri seară Aseară au scos o avocată din biserică de la Maslu i iau spus Cârpo o să tergem cu tine pe jos Iar ai venit Eu citeam

+------------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [64]:
df = pd.read_csv('modified_csv_file.csv')

# Print the entire 'content' column
with pd.option_context('display.max_colwidth', None):
    print(df['content'])

0    Viceliderul grupului deputaților UDMR Szabo Odon a declarat miercuri despre greva profesorilor că trebuie recâtigată încrederea i că nu a picat foarte bine faptul că ministrul de Interne Lucian Bode a ieit cu anunul că sa îneles cu sindicatele din poliție pentru a mări de la  iunie salariile polițiștilor A picat ca nuca în perete a spus el citat de Newsro Szabo Odon vicepreședinte al Comisiei pentru Învățământ a declarat în emisiunea Proiect de ară România de la postul de televiziune Prima News că preedintele UDMR nu a fost prezent la aceste discuii dar se tie că în general sunt solidari cu Guvernul În acest contex credem că discuiile serioase au venit cam târziu i cred că neîncrederea pornete de la faptul că iniial în legea educaiei naionale a fost prevăzut clar transpus în proiect de lege i în transparenă faptul că nivelul salarizării minime pentru debutani ar fi trebuit să înceapă de la salariul mediu brut pe economie După ce acest articol a fost retras a început neîncrederea c

In [66]:
import pandas as pd
import re

# Read the CSV file
df = pd.read_csv('actualitate.csv')

# Define a function to remove non-alphabetic characters
def remove_specific_chars(text):
    chars_to_remove = r',;[\]()".'
    return re.sub(r'[' + re.escape(chars_to_remove) + r']', '', text)

# Apply the function to the 'content' column
df['content'] = df['content'].apply(remove_non_alphabetic)

# Write the modified DataFrame back to CSV
df.to_csv('modified_csv_file.csv', index=False)


In [69]:
pd.read_csv()

Unnamed: 0,_id,category,content,non_alphabetic_chars,cleaned_tokens_ner,cleaned_tokens_stopwords,cleaned_tokens_final
0,6477c2ef540bc5d16fa5d3c2,Politică,Viceliderul grupului deputaților UDMR Szabo Od...,Viceliderul grupului deputa ilor UDMR Szabo Od...,"[Viceliderul, grupului, UDMR, a, declarat, des...","[Viceliderul, grupului, UDMR, declarat, greva,...","[vicelider, grup, UDMR, declarat, Greva, trebu..."
1,6477c2ef540bc5d16fa5d3c3,,Veteranul ministru leton de externe Edgars Rin...,Veteranul ministru leton de externe Edgars Rin...,"[Veteranul, ministru, leton, de, externe, a, f...","[Veteranul, ministru, leton, externe, ales, st...","[Veteranul, ministru, leton, extern, ales, sta..."
2,6477c2ef540bc5d16fa5d3c4,Actualitate,Staia de metrou Piaa Iancului este prima staie...,Staia de metrou Piaa Iancului este prima staie...,"[Staia, de, metrou, Piaa, Iancului, este, stai...","[Staia, metrou, Piaa, Iancului, staie, accesib...","[Staia, metrou, Piaa, ianc, staie, accesibiliz..."
3,6477c2ef540bc5d16fa5d3c5,SUA,Fostul vicepreedinte republican Mike Pence se ...,Fostul vicepreedinte republican Mike Pence se ...,"[Fostul, vicepreedinte, republican, se, pregăt...","[Fostul, vicepreedinte, republican, pregătete,...","[fost, vicepreedint, republican, pregătet, cur..."
4,6477c2ef540bc5d16fa5d3c6,Economie,Horia Constantinescu președintele Autorității ...,Horia Constantinescu pre edintele Autorit ii ...,"[Autorității, Naționale, pentru, Protecția, Co...","[Autorității, Naționale, Protecția, Consumator...","[Autoritate, național, Protecția, Consumatoril..."
5,6477c2ef540bc5d16fa5d3c7,Educație,Guvernul anunță întrun comunicat de presă emis...,Guvernul anun ntrun comunicat de pres emis...,"[Guvernul, anunță, întrun, comunicat, de, pres...","[Guvernul, anunță, întrun, comunicat, presă, e...","[guvern, anunța, întrun, comunicat, presă, emi..."
6,6477c2ef540bc5d16fa5d3c8,Educație,Sindicaliștii din Educație au anunțat continua...,Sindicali tii din Educa ie au anun at continua...,"[Sindicaliștii, din, Educație, au, anunțat, co...","[Sindicaliștii, Educație, anunțat, continuarea...","[Sindicaliștii, educație, anunța, continuare, ..."
7,6477c2ef540bc5d16fa5d3c9,Politică,Ministerul Sănătăii solicită universităilor i ...,Ministerul S n t ii solicit universit ilor i ...,"[Ministerul, Sănătăii, solicită, universităilo...","[Ministerul, Sănătăii, solicită, universităilo...","[minister, Sănătăii, solicita, universităilor,..."
8,6477c2ef540bc5d16fa5d3ca,Justiție,Curtea de Apel Bucureti a decis miercuri să re...,Curtea de Apel Bucureti a decis miercuri s re...,"[Curtea, de, Apel, Bucureti, a, decis, să, res...","[Curtea, Apel, Bucureti, decis, respingă, nefo...","[curte, Apel, Bucureti, decis, resping, nefond..."
9,6477c2ef540bc5d16fa5d3cb,Știri,Incendiu la locomotiva cu abur a Trenului Rega...,Incendiu la locomotiva cu abur a Trenului Rega...,"[Incendiu, la, locomotiva, cu, abur, a, Trenul...","[Incendiu, locomotiva, abur, Trenului, Regal, ...","[incendiu, locomotivă, abur, tren, Regal, ajun..."


In [None]:
import findspark
import re
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


# modify csv first:

import pandas as pd
import re

# Read the CSV file
df = pd.read_csv('actualitate.csv')

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark CSV Example") \
    .getOrCreate()

# Define the path to your CSV file
csv_file = "modified_csv_file.csv"

# Read the CSV file into a DataFrame
df_spark = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file) \
    .select("_id", "category", "content") \
    .limit(10000)  # Limiting to first 10 rows for example

# Define the function to remove named entities
def remove_ner_spark(text):
    import spacy
    doc = nlp(text)
    tokens = []
    for token in doc:
        if token.ent_type_ not in ['MONEY', 'DATE', 'TIME', 'QUANTITY', 'ORDINAL', 'CARDINAL', 'NUMERIC_VALUE', 'PERSON', 'DATETIME']:
            alpha_chars = [char for char in token.text if char.isalpha()]
            cleaned_token = ''.join(alpha_chars)
            if cleaned_token:  
                tokens.append(cleaned_token)
    return tokens

# Define the function to remove stopwords
def remove_stopwords(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return filtered_tokens

# Define the function to lemmatize tokens
def lemmatize_tokens(tokens):
    stopwords = spacy.lang.ro.stop_words.STOP_WORDS
    doc = nlp(" ".join(tokens))
    lemmatized_tokens = [token.lemma_ for token in doc]
    return lemmatized_tokens

# Register the functions as UDFs (User Defined Functions)

remove_ner_udf = udf(remove_ner_spark, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())
lemmatize_tokens_udf = udf(lemmatize_tokens, StringType())


# Apply the UDFs to create new columns
df_processed = df_spark.withColumn("cleaned_tokens_ner", remove_ner_udf(col("content"))) \
                       .withColumn("cleaned_tokens_stopwords", remove_stopwords_udf(col("cleaned_tokens_ner"))) \
                       .withColumn("cleaned_tokens_final", lemmatize_tokens_udf(col("cleaned_tokens_stopwords")))

output_csv = "processed_Data.csv"

# Save the DataFrame to a CSV file
df_processed.write.csv("./path_to_save.csv", mode="overwrite", header=True)

df_processed.show(truncate=False)
spark.stop()
