# Method 1: mapinpandas UDF

In [1]:
# pip install -i https://pypi.python.org/pypi pyspark
# pip install -i https://pypi.python.org/pypi fugue
import pyspark
import fugue

In [2]:
from typing import Iterable, Dict, Any, List
import pandas as pd
pd.set_option("display.max_colwidth", None)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from pyspark.sql import DataFrame

spark = (SparkSession.builder
            .appName("Spark Tokenize")
            .master("local[4]")
            .config("spark.driver.memory", "16G")
            .getOrCreate())
sc = spark.sparkContext
sc.setLogLevel("WARN")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/30 11:33:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = pd.read_csv('./toxic_data/train.csv.zip')

In [5]:
df[:1]

Unnamed: 0,id,comment_text,toxic,severe_toxic,obscene,threat,insult,identity_hate
0,0000997932d777bf,"Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27",0,0,0,0,0,0


In [6]:
df[:1][['comment_text']]

Unnamed: 0,comment_text
0,"Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27"


In [7]:
import re
import string
from nltk.corpus import stopwords
from typing import Iterator
import pandas as pd

# schema: *, comment_tokens: [str]
def tokenize(df: pd.DataFrame, remove_punctuation: bool=True) -> pd.DataFrame:
    '''
    Tokenize English text and return a non-unique list of tokenized words found in the text.
    Normalize to lowercase, strip punctuation, remove stop words, filter non-scii characters.

    args:
    remove_punctuation bool: if True, remove punctuation
    input_col str: name of column that contains text
    output_col str: name of column that will contain the list of tokens
    '''

    # col is columnt that contains comment_text
    text = df['comment_text'].str.lower()
    if remove_punctuation:
        regex = re.compile( '[' + re.escape(string.punctuation) + '0-9\\r\\r\\n]')

        # remove punctuation and strip whitespace
        nopunct = text.apply(lambda x: str.strip(regex.sub(" ", x)))

        # split words on space
        words = nopunct.str.split(' ')

        # split word on new line
        words = words.apply(lambda x: [word.split('\n') for word in x])
        words = words.apply(lambda x: [word for line in x for word in line]) # flatten

    else:
        words = text.split(' ')
    
    # remove any non ascii
    words = words.apply(lambda x: [word.encode('ascii', 'ignore').decode('ascii') for word in x])

    # remove stop words
    stops = set(stopwords.words('english'))
    words = words.apply(lambda x: [w for w in x if not w in stops and w != ''])

    df['comment_tokens'] = words

    return df

def map_tokenize(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for df in iterator:
        yield tokenize(df)

def sdf_tokenize(sdf: DataFrame) -> DataFrame:
    schema = StructType(list(sdf.schema.fields))
    schema.add(StructField("comment_tokens", ArrayType(StringType(), True)))
    return sdf.mapInPandas(lambda dfs: map_tokenize(dfs), schema)
    


In [8]:
tokenize(df[:2])[['comment_text', 'comment_tokens']]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['comment_tokens'] = words


Unnamed: 0,comment_text,comment_tokens
0,"Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27","[explanation, edits, made, username, hardcore, metallica, fan, reverted, vandalisms, closure, gas, voted, new, york, dolls, fac, please, remove, template, talk, page, since, retired]"
1,"D'aww! He matches this background colour I'm seemingly stuck with. Thanks. (talk) 21:51, January 11, 2016 (UTC)","[aww, matches, background, colour, seemingly, stuck, thanks, talk, january, utc]"


In [9]:
sdf = spark.createDataFrame(df[:2])
clean_sdf = sdf_tokenize(sdf)
clean_sdf.select('comment_text', 'comment_tokens').show(truncate=False)



+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|comment_text                                                                                                                                                                                                                                                             |comment_tokens                                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------



# Method 2: Fugue Transformer

In [10]:
from fugue import transform
from fugue import NativeExecutionEngine
from fugue_spark import SparkExecutionEngine

# schema: *, comment_tokens: [str]
def tokenize(df: pd.DataFrame, remove_punctuation: bool=True) -> pd.DataFrame:
    '''
    Tokenize English text and return a non-unique list of tokenized words found in the text.
    Normalize to lowercase, strip punctuation, remove stop words, filter non-scii characters.

    args:
    remove_punctuation bool: if True, remove punctuation
    input_col str: name of column that contains text
    output_col str: name of column that will contain the list of tokens
    '''

    # col is columnt that contains comment_text
    text = df['comment_text'].str.lower()
    if remove_punctuation:
        regex = re.compile( '[' + re.escape(string.punctuation) + '0-9\\r\\r\\n]')

        # remove punctuation and strip whitespace
        nopunct = text.apply(lambda x: str.strip(regex.sub(" ", x)))

        # split words on space
        words = nopunct.str.split(' ')

        # split word on new line
        words = words.apply(lambda x: [word.split('\n') for word in x])
        words = words.apply(lambda x: [word for line in x for word in line]) # flatten

    else:
        words = text.split(' ')
    
    # remove any non ascii
    words = words.apply(lambda x: [word.encode('ascii', 'ignore').decode('ascii') for word in x])

    # remove stop words
    stops = set(stopwords.words('english'))
    words = words.apply(lambda x: [w for w in x if not w in stops and w != ''])

    df['comment_tokens'] = words

    return df

result_pdf = transform(df[:2], tokenize, params={'remove_punctuation': True}, engine=NativeExecutionEngine)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['comment_tokens'] = words


In [11]:
result_pdf[['comment_text','comment_tokens']]


Unnamed: 0,comment_text,comment_tokens
0,"Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27","[explanation, edits, made, username, hardcore, metallica, fan, reverted, vandalisms, closure, gas, voted, new, york, dolls, fac, please, remove, template, talk, page, since, retired]"
1,"D'aww! He matches this background colour I'm seemingly stuck with. Thanks. (talk) 21:51, January 11, 2016 (UTC)","[aww, matches, background, colour, seemingly, stuck, thanks, talk, january, utc]"


In [12]:
result_sdf = transform(df[:2], tokenize, params={'remove_punctuation': True}, engine=SparkExecutionEngine)

In [14]:
result_sdf.select('comment_text', 'comment_tokens').show(truncate=False)



+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|comment_text                                                                                                                                                                                                                                                             |comment_tokens                                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------



In [21]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles

def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1

sdf.withColumn('toxic2', pandas_plus_one(sdf.toxic)).show()



+----------------+--------------------+-----+------------+-------+------+------+-------------+------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|toxic2|
+----------------+--------------------+-----+------------+-------+------+------+-------------+------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|   1.0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|   1.0|
+----------------+--------------------+-----+------------+-------+------+------+-------------+------+



