In [27]:
!pip install pyspark
!pip install spark-nlp

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [28]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.feature import Bucketizer

spark = SparkSession.builder.appName('NLP').getOrCreate()
spark

In [29]:
import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize, word_tokenize

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pymongo import MongoClient
db = MongoClient(host='localhost', port=27017)['news_classification']

[nltk_data] Downloading package punkt to /home/vadim/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [30]:
import json
from datetime import datetime

In [36]:
class WordTokenizer(Transformer, HasInputCol, HasOutputCol):
    def __init__(self, inputCol: str = "input", outputCol: str = "output"):
        super(WordTokenizer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df: DataFrame) -> DataFrame:
        transform_udf = F.udf(lambda x: [word_tokenize(i.lower()) for i in x], ArrayType(StringType()))
        return df.withColumn(self.outputCol, transform_udf(df[self.inputCol]))

In [37]:
class BIO(Transformer, HasInputCol, HasOutputCol):
    TYPE_FIELD = 'type'
    TOCKEN_FIELD = 'tokens'
    LENGTH_FIELD = 'l'
    
    def __init__(self, inputCol: str = "input", outputCol: str = "output", all_tags: list = []):
        super(BIO, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.all_tags = all_tags

    def _transform(self, df: DataFrame) -> DataFrame:
        transform_udf = F.udf(self.BIO_format, ArrayType(ArrayType(ArrayType(StringType()))))
        return df.withColumn(self.outputCol, transform_udf(df[self.inputCol]))
    
    def BIO_format(self, sentenses):
        BIO_tags = []
        for words in sentenses:
            tags = ["O"] * len(words)
            for i in self.all_tags:
                tag = i[self.TYPE_FIELD]
                tag_words = i[self.TOCKEN_FIELD]
                tag_len = i[self.LENGTH_FIELD]
                for i in range(len(words) - tag_len + 1):
                    if words[i:i+tag_len] == tag_words:
                        tags[i] = f"B-{tag}"
                        for j in range(1, tag_len):
                            tags[i+j] = f"I-{tag}"

            if tags == ["O"] * len(words):
                continue
            BIO_tags.append(list(zip(words, tags)))
        return BIO_tags

In [31]:
articles = [i for i in db['GameSpotArticles'].find({})]
df = spark.createDataFrame(articles).select(['title', 'text'])

In [38]:
df1 = df.rdd.map(lambda x: x + ([x['title']] + [k['text'] for k in x['text'] if k['isTitle']], )).toDF(["title", "text", 'main_text'])
word_transformer = WordTokenizer(inputCol="main_text", outputCol="tokenized_text")
bio_transformer = BIO(inputCol="tokenized_text", outputCol="BIO", all_tags=clean_tokens)
bio_transformer.TYPE_FIELD, bio_transformer.TOCKEN_FIELD, bio_transformer.LENGTH_FIELD = 'tag_slug', 'game_tokens', 'length'
pipeline = Pipeline(stages=[word_transformer, bio_transformer])
model = pipeline.fit(df1)
results = model.transform(df1).select('main_text', 'tokenized_text', "BIO")
results = results.filter(F.size(results.BIO) > 0)
results.show()
results_pandas = results.toPandas()

23/06/25 21:52:46 WARN TaskSetManager: Stage 6 contains a task of very large size (2709 KiB). The maximum recommended task size is 1000 KiB.


NameError: name 'clean_tokens' is not defined

In [245]:
rdf = results_pandas[["main_text", 'BIO']].explode('BIO', ignore_index=True)

In [246]:
rdf['sentence_id'] = rdf.index

In [247]:
rdf2 = rdf.explode('BIO', ignore_index=True)

In [248]:
rdf2[['BIO_word', 'BIO_tag']] = rdf2["BIO"].apply(lambda x: pd.Series((x[0], x[1])))

In [249]:
rdf3 = rdf2[['BIO_word', "BIO_tag", "sentence_id"]]

In [250]:
rdf3['BIO_type'] = rdf3['BIO_tag'].map(lambda x: x if x == 'O' else f"{x.split('-')[0]}-game")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  rdf3['BIO_type'] = rdf3['BIO_tag'].map(lambda x: x if x == 'O' else f"{x.split('-')[0]}-game")


In [251]:
rdf3

Unnamed: 0,BIO_word,BIO_tag,sentence_id,BIO_type
0,almost,O,0,O
1,all,O,0,O
2,destiny,B-destiny,0,B-game
3,2,I-destiny,0,I-game
4,heavy,O,0,O
...,...,...,...,...
27024,eulogize,O,2342,O
27025,your,O,2342,O
27026,diablo,B-diablo,2342,B-game
27027,4,I-diablo,2342,I-game


In [None]:
json_array = rdf3.to_json(oriented='records')

In [252]:
with open("BIO_games.csv", 'w') as f:
    rdf3.to_csv(f)

In [254]:
np.unique(rdf2['BIO_tag'])

array(['B-ace-attorney', 'B-alien', 'B-animal-crossing',
       'B-assassins-creed', 'B-attack-on-titan', 'B-banjo-and-kazooie',
       'B-batman', 'B-battlefield', 'B-blazblue', 'B-bolt',
       'B-borderlands', 'B-call-of-duty', 'B-castlevania', 'B-conan',
       'B-contra', 'B-crash-bandicoot', 'B-cyberpunk', 'B-dc-comics',
       'B-dead-space', 'B-deadpool', 'B-destiny', 'B-diablo', 'B-digimon',
       'B-disney', 'B-donkey-kong', 'B-doom', 'B-dora-the-explorer',
       'B-dota', 'B-double-dragon', 'B-dragon-age', 'B-dragon-ball',
       'B-dungeons-dragons', 'B-f-zero', 'B-fables', 'B-fallout',
       'B-final-fantasy', 'B-fire-emblem', 'B-fortnite', 'B-forza',
       'B-frank-herbert-s-dune', 'B-friday-the-13th', 'B-front-mission',
       'B-game-of-thrones', 'B-genshin-impact', 'B-ghostbusters',
       'B-god-of-war', 'B-godzilla', 'B-gran-turismo',
       'B-granblue-fantasy', 'B-grand-theft-auto', 'B-gremlins',
       'B-guilty-gear', 'B-gungrave', 'B-half-life', 'B-halo',
  

In [None]:
# write data