In [3]:
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def preprocess_chunk(chunk):
    """
    Preprocess the chunk to ensure data integrity.
    
    :param chunk: DataFrame chunk to preprocess.
    :return: Preprocessed DataFrame chunk.
    """
    # Drop rows with excessive missing values. You might adjust the threshold as needed.
    chunk.dropna(thresh=minimum_required_columns, inplace=True)

    # Assuming 'id' is critical and must be unique & correctly formatted:
    # Remove rows with invalid or missing IDs. Adjust the condition based on your ID format.
    chunk = chunk[pd.to_numeric(chunk['id'], errors='coerce').notnull()]
    
    # Sort or reorder rows based on a reliable column if necessary. This step depends on whether you have
    # a way to detect and correct switched rows. Here, I assume a timestamp column can help.
    # chunk.sort_values(by=['timestamp'], inplace=True)

    return chunk

def process_and_save_chunk(chunk, index, base_path='p2_pre_chunks/'):
    # Preprocess the chunk.
    chunk = preprocess_chunk(chunk)
    
    # Convert the pandas DataFrame to a Dask DataFrame for efficient storage.
    dask_df = dd.from_pandas(chunk, npartitions=1)
    
    # Define the filename and save the Parquet file.
    filename = f'{base_path}chunk_{index}.parquet'
    dask_df.to_parquet(filename, engine='pyarrow')

# Define your CSV file path
csv_file_path = 'subset_news_sample.csv'

# Define the chunk size
chunk_size = 31000  # Adjust based on your needs

# Define the minimum number of non-NA fields for a row to be considered valid
minimum_required_columns = 10  # Adjust based on your dataset's structure

# Progress bar setup for better visibility
with ProgressBar():
    # Iterate through the CSV file in chunks
    for i, chunk in enumerate(pd.read_csv(csv_file_path, chunksize=chunk_size)):
        process_and_save_chunk(chunk, i)

print("Chunking and saving to Parquet completed.")


[########################################] | 100% Completed | 442.15 ms
[########################################] | 100% Completed | 338.25 ms
[########################################] | 100% Completed | 434.93 ms
[########################################] | 100% Completed | 436.65 ms
[########################################] | 100% Completed | 442.34 ms
[########################################] | 100% Completed | 445.22 ms
[########################################] | 100% Completed | 331.45 ms
[########################################] | 100% Completed | 330.99 ms
[########################################] | 100% Completed | 437.93 ms
[########################################] | 100% Completed | 328.06 ms
[########################################] | 100% Completed | 328.08 ms
[########################################] | 100% Completed | 436.30 ms
[########################################] | 100% Completed | 337.15 ms
[########################################] | 100% Completed | 43

In [5]:
import pandas as pd
""" 
combined_cleaned_data = pd.read_csv('combined_cleaned_data.csv')

print(combined_cleaned_data.head())
print(combined_cleaned_data.info())
print(combined_cleaned_data['type'].value_counts())

# I want to see if Unnamed: 0 contains any values that aren't numbers. If it does, print it:

for index, row in combined_cleaned_data.iterrows():
    if not str(row['Unnamed: 0']).isdigit():
        print(row['Unnamed: 0']) """

# Combine p2_pre_chunks into a single DataFrame:
combined_data = dd.read_parquet('p2_pre_chunks/chunk_*.parquet/part.*.parquet', engine='pyarrow')
print(combined_data.head())

# Save the combined DataFrame to a single .csv file:
combined_data.to_csv('p2_combined_data.csv', single_file=True)

  Unnamed: 0         id               domain        type  \
0        732  7444726.0   nationalreview.com   political   
1       1348  6213642.0    beforeitsnews.com        fake   
2       7119  3867639.0     dailycurrant.com      satire   
3       1518  9560791.0          nytimes.com    reliable   
4       9345  2059625.0  infiniteunknown.net  conspiracy   

                                                 url  \
0  http://www.nationalreview.com/node/152734/%E2%...   
1  http://beforeitsnews.com/economy/2012/06/the-c...   
2  http://dailycurrant.com/2016/01/18/man-awoken-...   
3  https://query.nytimes.com/gst/fullpage.html?re...   
4  http://www.infiniteunknown.net/2011/09/14/100-...   

                                             content  \
0  Plus one article on Google Plus

(Thanks to Al...   
1  The Cost Of The Best Senate Banking Committee ...   
2  Man Awoken From 27-Year Coma Commits Suicide A...   
3  WHEN Julia Geist was asked to draw a picture o...   
4  – 100 Compiled Stud

['c:\\.KU\\Grundlæggende Data Science\\Eksamen\\p2_combined_data.csv']

In [None]:
combined_data = pd.read_csv('p2_combined_data.csv')
subset_news_sample = pd.read_csv('subset_news_sample.csv')


In [16]:
""" with open('subset_news_sample.csv', 'r') as file:
    print(file.read(100)) """

with open('subset_news_sample.csv', 'r', encoding='utf-8') as file:
    print(file.read(100))

Unnamed: 0,id,domain,type,url,content,scraped_at,inserted_at,updated_at,title,authors,keywords,meta_


In [17]:
# remove the first ten (10) characters from the file subset_news_sample.csv and save the result to a new file called subset_news_sample_2.csv:
with open('subset_news_sample.csv', 'r', encoding='utf-8') as file:
    data = file.read()
    with open('subset_news_sample_2.csv', 'w', encoding='utf-8') as file2:
        file2.write(data[10:])

# read:
with open('subset_news_sample_2.csv', 'r', encoding='utf-8') as file:
    print(file.read(100))

,id,domain,type,url,content,scraped_at,inserted_at,updated_at,title,authors,keywords,meta_keywords,m


In [None]:
# Re-import the subset_news_sample_2.csv file and check the first 5 rows:
subset_news_sample_2 = pd.read_csv('subset_news_sample_2.csv')
print(subset_news_sample_2.head())


In [20]:
print(subset_news_sample_2.head())

  Unnamed: 0         id               domain        type  \
0        732  7444726.0   nationalreview.com   political   
1       1348  6213642.0    beforeitsnews.com        fake   
2       7119  3867639.0     dailycurrant.com      satire   
3       1518  9560791.0          nytimes.com    reliable   
4       9345  2059625.0  infiniteunknown.net  conspiracy   

                                                 url  \
0  http://www.nationalreview.com/node/152734/%E2%...   
1  http://beforeitsnews.com/economy/2012/06/the-c...   
2  http://dailycurrant.com/2016/01/18/man-awoken-...   
3  https://query.nytimes.com/gst/fullpage.html?re...   
4  http://www.infiniteunknown.net/2011/09/14/100-...   

                                             content  \
0  Plus one article on Google Plus\r\n\r\n(Thanks...   
1  The Cost Of The Best Senate Banking Committee ...   
2  Man Awoken From 27-Year Coma Commits Suicide A...   
3  WHEN Julia Geist was asked to draw a picture o...   
4  – 100 Compiled Stud

In [None]:

# for our data:
for index, row in subset_news_sample_2.iterrows():
    if not str(row['Unnamed: 0']).isdigit():

        if str(row['Unnamed: 0'])[-2:] == ".0":
            print("v: ", row['Unnamed: 0'], "index: ", index)

In [None]:
# I want to remove 

In [27]:
# I want to create a new csv file of the rows between index 851968 and 851975 (inclusive) from the subset_news_sample_2.csv file:
# call it subset_anomaly.csv
subset_anomaly = subset_news_sample_2.loc[851965:851970]
subset_anomaly.to_csv('subset_anomaly.csv', index=False)



In [13]:
# Create a new csv file with only 10 rows. First 10 rows of subset_news_sample.csv

subset_news_sample.head(10).to_csv('subset_news_sample_10.csv')

#same for combined_data
combined_data.head(10).to_csv('combined_data_10.csv')


In [11]:
# Compare p2_combined_data.csv to subset_news_sample.csv. If, at any point, the two files differ, print that differing row and end the program.

#tho start by reading head of both files:

# print first 1500 chars of each file:
print("pre")
print(combined_data[:1])
print("post")
print(subset_news_sample[:1])


pre
   Unnamed: 0.1  Unnamed: 0         id              domain       type  \
0             0       732.0  7444726.0  nationalreview.com  political   

                                                 url  \
0  http://www.nationalreview.com/node/152734/%E2%...   

                                             content  \
0  Plus one article on Google Plus\n\n(Thanks to ...   

                   scraped_at                 inserted_at  \
0  2017-11-27T01:14:42.983556  2018-02-08 19:18:34.468038   

                   updated_at               title authors  keywords  \
0  2018-02-08 19:18:34.468066  Iran News Round Up     NaN       NaN   

                                       meta_keywords meta_description tags  \
0  ['National Review', 'National Review Online', ...              NaN  NaN   

   summary source  
0      NaN    NaN  
post
  Unnamed: 0         id              domain       type  \
0        732  7444726.0  nationalreview.com  political   

                                      

In [28]:
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def process_and_save_chunk(chunk, index, base_path='p2_chunks/'):
    """
    Process a chunk and save it to a Parquet file.
    
    :param chunk: DataFrame chunk to process.
    :param index: Index of the chunk (used for filename).
    :param base_path: Base path for saving the files.
    """
    
    # Example: Assuming 'id' is a critical column and rows missing 'id' should be dropped
    # Drop rows where 'id' (or your critical column) is missing
    chunk = chunk.dropna(subset=['id'])  # Replace 'id' with the name of your critical column
    
    # Alternatively, fill missing values with a default value or method
    # chunk['id'] = chunk['id'].fillna('default_value')  # Example of filling missing 'id' with 'default_value'
    
    # Convert the pandas DataFrame to a Dask DataFrame.
    dask_df = dd.from_pandas(chunk, npartitions=1)
    
    # Define the filename based on the chunk index.
    filename = f'{base_path}chunk_{index}.parquet'
    
    # Use Dask to write the Parquet file.
    dask_df.to_parquet(filename, engine='pyarrow')

# Define your CSV file path
csv_file_path = 'subset_news_sample_2.csv'

# Define the chunk size
chunk_size = 31000  # This is an example, adjust based on your total rows and desired number of files

# Progress bar setup for better visibility
with ProgressBar():
    # Iterate through the CSV file in chunks
    for i, chunk in enumerate(pd.read_csv(csv_file_path, chunksize=chunk_size)):
        if 'Unnamed: 0' in chunk.columns:
            chunk = chunk.drop(columns=['Unnamed: 0'])
        process_and_save_chunk(chunk, i)

print("Chunking and saving to Parquet completed.")


[########################################] | 100% Completed | 446.62 ms
[########################################] | 100% Completed | 434.66 ms
[########################################] | 100% Completed | 440.92 ms
[########################################] | 100% Completed | 434.24 ms
[########################################] | 100% Completed | 445.71 ms
[########################################] | 100% Completed | 437.56 ms
[########################################] | 100% Completed | 325.15 ms
[########################################] | 100% Completed | 434.95 ms
[########################################] | 100% Completed | 436.81 ms
[########################################] | 100% Completed | 334.20 ms
[########################################] | 100% Completed | 439.29 ms
[########################################] | 100% Completed | 336.17 ms
[########################################] | 100% Completed | 331.06 ms
[########################################] | 100% Completed | 44

In [29]:
import dask.dataframe as dd

# Path to your directory containing the Parquet files
parquet_directory = 'p2_chunks/'

# Read all Parquet files into a Dask DataFrame
dask_df = dd.read_parquet(parquet_directory + 'chunk_*.parquet/part.0.parquet')

# Optionally, if you want to perform any operations on the Dask DataFrame, do them here

# Convert the Dask DataFrame to a Pandas DataFrame
# Note: Be sure your dataset can fit into memory before doing this
pandas_df = dask_df.compute()

# Save the Pandas DataFrame to a single CSV file
output_csv_path = 'p2_combined_data.csv'
pandas_df.to_csv(output_csv_path, index=False)

print("Combined CSV file has been created.")


Combined CSV file has been created.


In [30]:
p2_combined_data = pd.read_csv('p2_combined_data.csv')

  p2_combined_data = pd.read_csv('p2_combined_data.csv')


In [31]:
p2_combined_data.head()

Unnamed: 0,id,domain,type,url,content,scraped_at,inserted_at,updated_at,title,authors,keywords,meta_keywords,meta_description,tags,summary,source
0,7444726.0,nationalreview.com,political,http://www.nationalreview.com/node/152734/%E2%...,Plus one article on Google Plus\r\n\r\n(Thanks...,2017-11-27T01:14:42.983556,2018-02-08 19:18:34.468038,2018-02-08 19:18:34.468066,Iran News Round Up,,,"['National Review', 'National Review Online', ...",,,,
1,6213642.0,beforeitsnews.com,fake,http://beforeitsnews.com/economy/2012/06/the-c...,The Cost Of The Best Senate Banking Committee ...,2017-11-27T01:14:08.7454,2018-02-08 19:18:34.468038,2018-02-08 19:18:34.468066,The Cost Of The Best Senate Banking Committee ...,,,[''],,,,
2,3867639.0,dailycurrant.com,satire,http://dailycurrant.com/2016/01/18/man-awoken-...,Man Awoken From 27-Year Coma Commits Suicide A...,2017-11-27T01:14:21.395055,2018-02-07 23:39:33.852671,2018-02-07 23:39:33.852696,Man Awoken From 27-Year Coma Commits Suicide A...,,,[''],,,,
3,9560791.0,nytimes.com,reliable,https://query.nytimes.com/gst/fullpage.html?re...,WHEN Julia Geist was asked to draw a picture o...,2018-02-11 00:46:42.632962,2018-02-11 00:14:20.346838,2018-02-11 00:14:20.346871,Opening a Gateway for Girls to Enter the Compu...,,,"['Computers and the Internet', 'Women and Girl...",WHEN Julia Geist was asked to draw a picture o...,,,nytimes
4,2059625.0,infiniteunknown.net,conspiracy,http://www.infiniteunknown.net/2011/09/14/100-...,– 100 Compiled Studies on Vaccine Dangers (Act...,2017-11-10T11:18:44.524042,2018-02-07 23:39:33.852671,2018-02-07 23:39:33.852696,100 Compiled Studies on Vaccine Dangers – Infi...,,,[''],,"Lymphoma, Hepatitis B, Immune System, Health, ...",,


In [37]:
import pandas as pd
import dask.dataframe as dd

# Assuming you've read your parquet files into dask_df as before
dask_df = dd.read_parquet('p2_chunks/chunk_*.parquet/part.0.parquet')

# Function to convert float IDs to int IDs if applicable
def convert_id_column(df):
    # Check if the 'id' column is a float type
    if df['id'].dtype == 'float':
        # Attempt to convert to int (this will fail if there are NaNs or non-integer floats)
        try:
            df['id'] = df['id'].astype(int)
        except ValueError:
            # Handle the case where conversion to int is not possible
            print("Conversion to int failed: the column may contain NaNs or non-integer values.")
    return df

# Apply the conversion function to the 'id' column of your DataFrame
dask_df = dask_df.map_partitions(convert_id_column)


#print:
print(dask_df.head())

# again, save as .csv:
dask_df.to_csv('p2_combined_data_2.csv', index=False, single_file=True)

        id               domain        type  \
0  7444726   nationalreview.com   political   
1  6213642    beforeitsnews.com        fake   
2  3867639     dailycurrant.com      satire   
3  9560791          nytimes.com    reliable   
4  2059625  infiniteunknown.net  conspiracy   

                                                 url  \
0  http://www.nationalreview.com/node/152734/%E2%...   
1  http://beforeitsnews.com/economy/2012/06/the-c...   
2  http://dailycurrant.com/2016/01/18/man-awoken-...   
3  https://query.nytimes.com/gst/fullpage.html?re...   
4  http://www.infiniteunknown.net/2011/09/14/100-...   

                                             content  \
0  Plus one article on Google Plus

(Thanks to ...   
1  The Cost Of The Best Senate Banking Committee ...   
2  Man Awoken From 27-Year Coma Commits Suicide A...   
3  WHEN Julia Geist was asked to draw a picture o...   
4  – 100 Compiled Studies on Vaccine Dangers (Act...   

                   scraped_at                

['c:\\.KU\\Grundlæggende Data Science\\Eksamen\\p2_combined_data_2.csv']

In [2]:
import pandas as pd
p2_combined_data = pd.read_csv('p2_combined_data_2.csv')

p2_combined_data.head()

  p2_combined_data = pd.read_csv('p2_combined_data_2.csv')


Unnamed: 0,id,domain,type,url,content,scraped_at,inserted_at,updated_at,title,authors,keywords,meta_keywords,meta_description,tags,summary,source
0,7444726,nationalreview.com,political,http://www.nationalreview.com/node/152734/%E2%...,Plus one article on Google Plus\r\n\r\n(Thanks...,2017-11-27T01:14:42.983556,2018-02-08 19:18:34.468038,2018-02-08 19:18:34.468066,Iran News Round Up,,,"['National Review', 'National Review Online', ...",,,,
1,6213642,beforeitsnews.com,fake,http://beforeitsnews.com/economy/2012/06/the-c...,The Cost Of The Best Senate Banking Committee ...,2017-11-27T01:14:08.7454,2018-02-08 19:18:34.468038,2018-02-08 19:18:34.468066,The Cost Of The Best Senate Banking Committee ...,,,[''],,,,
2,3867639,dailycurrant.com,satire,http://dailycurrant.com/2016/01/18/man-awoken-...,Man Awoken From 27-Year Coma Commits Suicide A...,2017-11-27T01:14:21.395055,2018-02-07 23:39:33.852671,2018-02-07 23:39:33.852696,Man Awoken From 27-Year Coma Commits Suicide A...,,,[''],,,,
3,9560791,nytimes.com,reliable,https://query.nytimes.com/gst/fullpage.html?re...,WHEN Julia Geist was asked to draw a picture o...,2018-02-11 00:46:42.632962,2018-02-11 00:14:20.346838,2018-02-11 00:14:20.346871,Opening a Gateway for Girls to Enter the Compu...,,,"['Computers and the Internet', 'Women and Girl...",WHEN Julia Geist was asked to draw a picture o...,,,nytimes
4,2059625,infiniteunknown.net,conspiracy,http://www.infiniteunknown.net/2011/09/14/100-...,– 100 Compiled Studies on Vaccine Dangers (Act...,2017-11-10T11:18:44.524042,2018-02-07 23:39:33.852671,2018-02-07 23:39:33.852696,100 Compiled Studies on Vaccine Dangers – Infi...,,,[''],,"Lymphoma, Hepatitis B, Immune System, Health, ...",,


In [6]:

# for our data:
weird = 0
for index, row in p2_combined_data.iterrows():
    #every 15k, print index:
    if index % 15000 == 0:
        print(index)
    if not str(row['id']).isdigit():
        weird += 1
        print(row['id'], index)

print("weird: ", weird)

0
15000
30000
45000
60000
75000
90000
105000
120000
135000
150000
165000
180000
195000
210000
225000
240000
255000
270000
285000
300000
315000
330000
345000
COP23, pipeline, Pope Francis, Ryan Zinke, Paris agreement, TransCanada's, antibiotics, denial 357919
360000
375000
390000
405000
420000
435000
450000
465000
480000
495000
stranded, air pollution, whales, Paris agreement 502584
510000
525000
540000
rainforests, coal, forests, nuclear, wind power, palm oil, solar, climate deniers, solar power, green roof 549515
555000
570000
585000
ocean, coral reefs, offshore oil and gas drilling, wind, nuclear, Doomsday Clock 594000
600000
615000
4179553.0 619514
2844663.0 619515
2539576.0 619516
6279235.0 619517
4362427.0 619518
6765734.0 619519
3129720.0 619520
724161.0 619521
9190763.0 619522
8603172.0 619523
6290092.0 619524
8807369.0 619525
4701309.0 619526
9533578.0 619527
1794740.0 619528
1336740.0 619529
9200279.0 619530
9006452.0 619531
8327902.0 619532
2557964.0 619533
724177.0 619534
12

In [7]:
# Remove every single row where the 'id' column is not a number. Save the result to a new file called p2_combined_data_cleaned.csv:
p2_combined_data_3 = p2_combined_data[p2_combined_data['id'].apply(lambda x: str(x).isdigit())]
p2_combined_data_3.to_csv('p2_combined_data_3.csv', index=False)

In [13]:
# We separate this into p2_chunks_2 parquet files:
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def process_and_save_chunk(chunk, index, base_path='p2_chunks_2/'):
    """
    Process a chunk and save it to a Parquet file.
    
    :param chunk: DataFrame chunk to process.
    :param index: Index of the chunk (used for filename).
    :param base_path: Base path for saving the files.
    """
    
    # Example: Assuming 'id' is a critical column and rows missing 'id' should be dropped
    # Drop rows where 'id' (or your critical column) is missing
    chunk = chunk.dropna(subset=['id'])  # Replace 'id' with the name of your critical column
    
    # Alternatively, fill missing values with a default value or method
    # chunk['id'] = chunk['id'].fillna('default_value')  # Example of filling missing 'id' with 'default_value'
    
    # Convert the pandas DataFrame to a Dask DataFrame.
    dask_df = dd.from_pandas(chunk, npartitions=1)
    
    # Define the filename based on the chunk index.
    filename = f'{base_path}chunk_{index}.parquet'
    
    # Use Dask to write the Parquet file.
    dask_df.to_parquet(filename, engine='pyarrow')

# Define your CSV file path
csv_file_path = 'p2_combined_data_3.csv'

# Define the chunk size
chunk_size = 31000  # This is an example, adjust based on your total rows and desired number of files

# Progress bar setup for better visibility
with ProgressBar():
    # Iterate through the CSV file in chunks
    for i, chunk in enumerate(pd.read_csv(csv_file_path, chunksize=chunk_size)):
        if 'Unnamed: 0' in chunk.columns:
            chunk = chunk.drop(columns=['Unnamed: 0'])
        process_and_save_chunk(chunk, i)

print("Chunking and saving to Parquet completed.")


[########################################] | 100% Completed | 438.60 ms
[########################################] | 100% Completed | 436.91 ms
[########################################] | 100% Completed | 437.91 ms
[########################################] | 100% Completed | 448.24 ms
[########################################] | 100% Completed | 444.47 ms
[########################################] | 100% Completed | 447.27 ms
[########################################] | 100% Completed | 440.93 ms
[########################################] | 100% Completed | 444.71 ms
[########################################] | 100% Completed | 769.33 ms
[########################################] | 100% Completed | 555.15 ms
[########################################] | 100% Completed | 663.25 ms
[########################################] | 100% Completed | 550.79 ms
[########################################] | 100% Completed | 335.10 ms
[########################################] | 100% Completed | 33

In [1]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import os
import pandas as pd
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from cleantext import clean

import pyarrow as pa

# Configuration for Dask to utilize more cores and memory
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=16, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)

nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

pattern = r'<num>|<date>|<email>|<url>|\w+|[^\w\s]'
tokenizer = RegexpTokenizer(pattern)
stemmer = PorterStemmer()

def preprocess(text):
    tokens = tokenizer.tokenize(text)
    tokens = [token.lower() for token in tokens if token.lower() not in stop_words]
    tokens = [stemmer.stem(token) for token in tokens]
    return tokens

# Define your cleaning function
def clean_text_faster(raw_text):
    if pd.isnull(raw_text):  # Handle missing values explicitly
        return ""
    cleaned_text = clean(raw_text, lower=True, no_line_breaks=True, no_urls=True, no_emails=True, no_numbers=True, no_punct=False, replace_with_url="<URL>", replace_with_email="<EMAIL>", replace_with_number="<NUM>")
    return cleaned_text

def process_partition(partition_df):
    partition_df['cleaned_content'] = partition_df['content'].apply(clean_text_faster).astype('string[pyarrow]')

    partition_df['tokens'] = partition_df['cleaned_content'].apply(preprocess)
    partition_df['tokens'] = partition_df['tokens'].apply(' '.join).astype('string[pyarrow]')
    
    return partition_df





path_to_parquets = 'p2_chunks_2'
output_path = 'p2_processed_chunks'

os.makedirs(output_path, exist_ok=True)




# Use a glob pattern to read multiple Parquet files in parallel
ddf = dd.read_parquet(os.path.join(path_to_parquets, 'chunk_*.parquet/part.0.parquet'))

sample_ddf = ddf.head(10)  # Gets the first 10 rows as a Pandas DataFrame

# Now, apply the `process_partition` function to this sample
# Since `sample_ddf` is already a Pandas DataFrame, you can directly apply your function

sample_processed = process_partition(sample_ddf)

# Use the processed sample to infer `meta`
meta = sample_processed.dtypes.to_dict()


print("Meta:", meta)



processed_ddf = ddf.map_partitions(process_partition, meta=meta)


with ProgressBar():
    # Save the processed Dask DataFrame to new Parquet files in one go
    print("Saving processed data to Parquet file")
    processed_ddf.to_parquet(output_path, engine='pyarrow', write_index=False)

print("All files processed and saved")

Since the GPL-licensed package `unidecode` is not installed, using Python's `unicodedata` package which yields worse results.
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Main\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Meta: {'id': dtype('int64'), 'domain': string[pyarrow], 'type': string[pyarrow], 'url': string[pyarrow], 'content': string[pyarrow], 'scraped_at': string[pyarrow], 'inserted_at': string[pyarrow], 'updated_at': string[pyarrow], 'title': string[pyarrow], 'authors': string[pyarrow], 'keywords': dtype('float64'), 'meta_keywords': string[pyarrow], 'meta_description': string[pyarrow], 'tags': string[pyarrow], 'summary': dtype('float64'), 'source': string[pyarrow], 'cleaned_content': string[pyarrow], 'tokens': string[pyarrow]}
Saving processed data to Parquet file
All files processed and saved
