In [1]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, StringIndexer
from pyspark.sql.functions import length, size, lit
from pyspark.sql import SQLContext
import string
# Wrap the SparkContext `sc` in a SQLContext. `sc` is not defined in the cells
# shown here, so it presumably comes from the notebook environment -- TODO confirm.
# NOTE(review): `sqlContext` is not referenced by any later cell visible here.
sqlContext = SQLContext(sc)


In [2]:
# Load the "all_csv" table through the session object `spark`, then echo its
# column names as the cell output.
df = spark.table(tableName="all_csv")

df.columns

In [3]:
# Restrict the frame to the identifier, the profile text and the gender column,
# discarding every row that has a null in any of the three.
df = df.select('username', 'about_me_text_clean', 'Gender').dropna()

# Add the length of the profile text as a numeric `length` column.
df_length = df.withColumn('length', length(df.about_me_text_clean))

# Encode the Gender strings as numeric indices in a new `label` column.
male_female_to_num = StringIndexer(inputCol='Gender', outputCol='label')
df = male_female_to_num.fit(df_length).transform(df_length)

df.show()

In [4]:
# Tokenize the profile text into a `token_text` array column.
tokened_df = Tokenizer(inputCol="about_me_text_clean", outputCol="token_text")
tt_df = tokened_df.transform(df)

# Keep only rows with more than 10 tokens whose label index is below 2.0.
tt_df = tt_df.filter((size(tt_df["token_text"]) > 10) & (tt_df["label"] < 2.0))

tt_df.show()


In [5]:
# Strip stop words from the token lists; the filtered tokens land in a new
# `stop_tokens` column next to the untouched `token_text` column.
stop_remove = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_tokens',
)
removed_frame = stop_remove.transform(tt_df)
removed_frame.show()

In [6]:
# Run the hashing term frequency on the stop-word-filtered tokens.
# The input must be `stop_tokens` (the StopWordsRemover output column); hashing
# `token_text` instead would silently bypass the stop-word removal step.
hashing = HashingTF(inputCol='stop_tokens', outputCol="hashedValues")

# Transform into a DF carrying the term-frequency vectors in `hashedValues`.
hashed_df = hashing.transform(removed_frame)
hashed_df.show()

In [7]:
# Learn inverse-document-frequency weights from the hashed term frequencies
# (minDocFreq is set to 10) and apply them, producing the `idf_features` column.
idf = IDF(inputCol="hashedValues", outputCol="idf_features", minDocFreq=10)
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

In [8]:
# Show the raw tokens next to their TF-IDF vectors.
rescaledData.select(["token_text", "idf_features"]).show()

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Combine the TF-IDF vector and the text length into a single `features` vector.
clean_up = VectorAssembler(
    inputCols=['idf_features', 'length'],
    outputCol='features',
)
cleaned_data = clean_up.transform(rescaledData)

In [10]:
# Preview the label alongside the TF-IDF vector.
# NOTE(review): this shows `idf_features`, not the assembled `features`
# column -- confirm which one was meant to be inspected.
cleaned_data.select('label', 'idf_features').show()

In [11]:
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
# Break data down into a training set and a testing set.
# Map every distinct label to the sampling fraction 0.7.
fractions = (
    cleaned_data.select("label")
    .distinct()
    .withColumn("fraction", lit(0.7))
    .rdd.collectAsMap()
)

# Sample roughly 70% of each label, then split that sample 70/30 into
# train/test. Both steps are seeded: an unseeded randomSplit would produce a
# different split (and therefore a different reported metric) on every run.
# NOTE(review): the sampleBy step leaves roughly 30% of the rows in neither
# set -- confirm that discarding them is intended.
training, testing = (
    cleaned_data.stat.sampleBy("label", fractions, 12)
    .randomSplit([0.7, 0.3], seed=12)
)

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(smoothing=1)
predictor = nb.fit(training)


In [12]:
# Score the held-out rows with the fitted model and peek at the first three.
test_results = predictor.transform(testing)
test_results.show(n=3)

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# MulticlassClassificationEvaluator defaults to metricName="f1"; request
# accuracy explicitly so the value matches the message printed below.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting gender was: %f" % acc)