# Kickstarter Notebook

This notebook (created on Databricks) shows how to train a ML Model with Spark and the use of Spark SQL to clean and prepare the dataset.

In [2]:
%sql 

select * from default.ds_projects_summary

project_id,title,description,category
673765219,EcoFlow R600 - Modular Portable Power Station,Modular Design for Increased Capacity; 600W Smart Inverter; Chainable AC Power; From 0 to 80% in 1 hour; EcoFlow Mobile App,Product
637007911,Terraforming Mars Big Box + 3D Tiles!,3D tiles and storage solution that include new special terrain tiles and promos!,Games
748081064,Instafloss - The 10 Second Floss That Will Make You Smile,Instafloss is a multi-jet water flosser delivering an amazingly comfortable and effective flossing experience in only 10 seconds.,Product
433085097,Hoo(KEY) - Antimicrobial Silicone Patented No-Touch Tool,"HooKEY is an antimicrobial, anti-scratch, super-strong tool to open doors, press buttons, etc — all contact-free & germ-free",Others
256941264,"Ananta, A 17.3-inch Touchscreen Portable Monitor","Grow your ideas on a larger touchscreen monitor! Ananta is perfect for designing, productivity-boosting and gaming.",Product
54308480,"AirTab, The Lightest 15.6” Touchscreen Monitor","Weighing only 1.1 lb, AirTab is a reinvented 15.6” portable monitor that maximizes your productivity wherever you are.",Product
1774324319,Lumos Ultra - The New Standard In Bike Helmets,"Integrated LED lighting, turn signals, and smart features to keep you safe in one sleek package.",Product
374785399,RYZE-UPS: The Modern Pull-Up Bar,"Pull-up handles that fold to the size of a book, lock-on to nearly any door, and adjust to give you big results.",Product
1709033713,"Pocket Sized Plastic Bottle Rope Maker, Turn Trash Into Rope",This pocket sized tool turns a plastic bottle into over 50 feet of incredibly strong rope keeping plastic bottles out of landfills,Product
1972917609,GoGoLeash: All-in-One Dog Leash and Collar System,The World's Most Advanced Dog-Walking and Training Tool For Your Good Boy or Girl.,Others


First of all, we check the distribution of our target.
This notebook will take in input a text and predict the category of the project.

In [4]:
%sql 
select count(distinct project_id) from default.ds_projects_summary

count(DISTINCT project_id)
4022


In [5]:
%sql

select category, count(distinct project_id) num from default.ds_projects_summary group by category order by num desc

category,num
Others,1386
Illustration,981
Games,908
Product,764


Create new category to aggregate project into main bascket

In [7]:
%sql
drop table ds_projects_categories_class;
CREATE TABLE ds_projects_categories_class AS (
select project_id, title, description, category as old_category,
case
  when category like '%Candles%' then 'Product'
  when category like '%Metal%' then 'Product'
  when category like '%Metal%' then 'Ceramics'
  when category like '%Fantasy%' then 'Illustration'
  when category like '%Print%' then 'Illustration'
  when category like '%Textiles%' then 'Product'
  when category like '%Childrenswear%' then 'Product'
  when category like '%Printing%' then 'Product'
  when category like '%Robots%' then 'Product'
  when category like '%Architecture%' then 'Product'
  when category like '%Games%' then 'Games'
  when category like '%Gaming%' then 'Games'
  when category like '%Product%' then 'Product'
  when category like '%Art%' then 'Illustration'
  when category like '%Illustration%' then 'Illustration'
  when category like '%Accessories%' then 'Product'
  when category like '%Comic Books%' then 'Illustration'
  when category like '%Apps%' then 'Product'
  when category like '%Books%' then 'Illustration'
  when category like '%Play%' then 'Games'
  when category like '%Technology%' then 'Product'
  when category like '%Graphic%' then 'Illustration'
  when category like '%Documentary%' then 'Illustration'
  when category like '%Hardware%' then 'Product'
  when category like '%Design%' then 'Product'
  when category like '%Comics%' then 'Illustration'
  when category like '%Games%' then 'Games'
  when category like '%Photobooks%' then 'Illustration'
  when category like '%Wearables%' then 'Product'
  when category like '%Anthologies%' then 'Illustration'
  when category like '%Software%' then 'Product'
  when category like '%Painting%' then 'Illustration'
  when category like '%Footwear%' then 'Product'
  when category like '%Toys%' then 'Product'
  when category like '%Puzzles%' then 'Illustration'
  when category like '%Animation%' then 'Illustration'
  when category like '%Food%' then 'Product'
  when category like '%Wearables%' then 'Product'
  when category like '%Software%' then 'Product'
  when category like '%Cookbooks%' then 'Illustration'
  when category like '%Woodworking%' then 'Product'
  when category like '%Shorts%' then 'Illustration'
  when category like '%Jewelry%' then 'Product'
  when category like '%Drinks%' then 'Product'
  when category like '%Footwear%' then 'Product'
  when category like '%Webcomics%' then 'Illustration'
  when category like '%Video%' then 'Illustration'
  when category like '%Sculpture%' then 'Product'
  when category like '%wear%' then 'Product'
  when category like '%Film%' then 'Product'
  when category like '%Apparel%' then 'Illustration'
  when category like '%Publishing%' then 'Illustration'
  when category like '%Gadgets%' then 'Product'
  when category like '%Crafts%' then 'Product'
  when category like '%Fashion%' then 'Product'
  when category like '%Farms%' then 'Product'
  when category like '%Vegan%' then 'Product'
  when category like '%Photo%' then 'Illustration'
  when category like '%Music%' then 'Music'
  when category like '%Fiction%' then 'Product'
  when category like '%Nonfiction%' then 'Product'
  when category like '%Web%' then 'Product'
  when category like '%Restaurants%' then 'Product'
  when category like '%Drama%' then 'Product'
  when category like '%Comedy%' then 'Product'
  when category like '%Pop%' then 'Music'
  when category like '%DIY%' then 'Product'
  when category like '%Folk%' then 'Music'
  when category like '%Rock%' then 'Music'
  when category like '%Glass%' then 'Product'
  when category like '%Bacon%' then 'Product'
  when category like '%Dance%' then 'Music'
  when category like '%Kids%' then 'Product'
  when category like '%Garden%' then 'Product'
  when category like '%Family%' then 'Product'
  when category like '%Immersive%' then 'Music'
  when category like '%Places%' then 'Product'
  when category like '%Latin%' then 'Music'
  when category like '%Blues%' then 'Music'
  when category like '%Jazz%' then 'Music'
  when category like '%Audio%' then 'Music'
  when category like '%R&B%' then 'Music'
  when category like '%Young%' then 'Product'
  when category like '%Practice%' then 'Product'
  when category like '%Sound%' then 'Music'
  when category like '%Hip-Hop%' then 'Music'
  when category like '%Mixed%' then 'Product'
  when category like '%Horror%' then 'Product'
  when category like '%Poetry%' then 'Product'
  when category like '%Ceramics%' then 'Product'
  when category like '%Nature%' then 'Product'
  when category like '%Journals%' then 'Illustration'
  when category like '%Letterpress%' then 'Illustration'
  when category like '%Typography%' then 'Illustration'
  when category like '%Periodicals%' then 'Illustration'
  when category like '%Journalism%' then 'Illustration'
else 'Others'
end as category
from default.ds_projects_summary )

In [8]:
%sql

select category, old_category, count(distinct project_id) num from ds_projects_categories_class where category='Others' group by category, old_category order by num desc

category,old_category,num
Others,Others,1386


In [9]:
%sql
select category, count(distinct project_id) num from ds_projects_categories_class group by category order by num desc

category,num
Others,1386
Illustration,981
Games,908
Product,764


Our data have an unbalanced distribution.
We try to focus on the first top categories.

In [11]:
ds_dataset = spark.sql("select * from ds_projects_categories where category <> 'Others' and category <> 'Music'").dropDuplicates(['project_id'])

In [12]:
ds_dataset.createOrReplaceTempView("ds_dataset")

With Spark, we can create a custom temp view on our dataset and use Spark SQL to query the data.

In [14]:
%sql

select category, count(*) num from ds_dataset group by category order by num desc

category,num
Product,1606
Illustration,1235
Games,796


Now, we can split the data in train and test dataset.
The training data is used to make sure the machine recognizes patterns in the data, and the test data is used to see how well the machine can predict new answers based on its training.

In [16]:
train, test = ds_dataset.randomSplit([0.9, 0.1], seed=12345)

Our goal is to create a pipeline that receive in input a dataset and perform this task:
- select the correct feature
- apply a tokenization method to split the description in sigle words
- filter tokens and remove the noise
- create ngrams from text
- train a word2vec model
- tran a NaiveBayes model
- test our performance

First, let's select only the imortant feature. We will use the description to predict the category.
During this task, we make lowercase the text and we remove some noise...

To remove some noise, we often use regular expression:
https://en.wikipedia.org/wiki/Regular_expression

In [19]:
train.createOrReplaceTempView("train")
train_cleaned = spark.sql("select regexp_replace(title || '' '' || description, '[0-9]', ' ') as description_cleaned, category as target from train")
display(train_cleaned.select("*").limit(10))

description_cleaned,target
Altor : A Star Wars Story Fan FilmThis is a passion project that we have already worked incredibly hard on. We are almost there! We just need that last little push!,Product
Magical Gamer Charm Set!Magical Girl Gamer Charms/Keychains,Illustration
"CleanTouch washer | A foldable and wireless washing machine.CleanTouch washer is an Eco-friendly foldable and wireless washing machine. Wash socks, t-shirts, underwear, baby clothes and more.",Product
FOOT FIST FRANKENSTEIN: A Monster Martial Arts Revenge ComicFoot Fist Frankenstein is a page hardcover Martial Arts story of revenge and cool monster action all in one awesome Graphic Novel,Illustration
"Flowery Enamel Pins for Avatar: The Last AirbenderStart off with Prunu Aang! Unlock your favorite characters, and giggle at our three iconic stickers.",Illustration
"Runt: a materials free & rules-lite roleplaying systemA materials free, rules-lite, and genre agnostic tabletop roleplaying game system with an emphasis on storytelling and simple gameplay.",Games
INTERDIMENSIONALBringing Classic Anthology Sci-Fi Back to Comics,Illustration
"NaturalsBAESA natural luxury Skincare/Haircare line created to be unisex, all inclusive, affordable and multi-purposeful.",Product
Malas Decisiones: Bilingual editionA game in which you get points by predicting what terrible decisions your friends will make. AVAILABLE IN ENGLISH AND ESPAÑOL!,Games
"Incognito Games - Memory Games for Mature MindsA board game, card game, and flash cards to assist memory and mental elasticity particularly, but not exclusively, for ages and up.",Games


### Pre-processing (clean text and reduce features)

Spark has two tokenizer component (called transformers):
- Default Tokenizer
- RegexTokenizer

We use the RegexTokenize to make sure to split the description by ' ' and contestually remove some special chars (like |,/!....).

Each Spark transformers usually have:
- an inputCol: the column used as input during the transformation
- an outputCol: the column created as output of the transformer
- a transform method that takes in input a dataset and produce a new dataset with all the columns of the incoming dataset with (plus) the outputCol

In [21]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

regexTokenizer = RegexTokenizer(inputCol="description_cleaned", outputCol="words", pattern="\\W")

# check our results
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = regexTokenizer.transform(train_cleaned)
tokenized_counts = tokenized.select("description_cleaned", "words").withColumn("tokens", countTokens(col("words")))
display(tokenized_counts.select("*").limit(10))

description_cleaned,words,tokens
Altor : A Star Wars Story Fan FilmThis is a passion project that we have already worked incredibly hard on. We are almost there! We just need that last little push!,"List(altor, a, star, wars, story, fan, filmthis, is, a, passion, project, that, we, have, already, worked, incredibly, hard, on, we, are, almost, there, we, just, need, that, last, little, push)",30
Magical Gamer Charm Set!Magical Girl Gamer Charms/Keychains,"List(magical, gamer, charm, set, magical, girl, gamer, charms, keychains)",9
"CleanTouch washer | A foldable and wireless washing machine.CleanTouch washer is an Eco-friendly foldable and wireless washing machine. Wash socks, t-shirts, underwear, baby clothes and more.","List(cleantouch, washer, a, foldable, and, wireless, washing, machine, cleantouch, washer, is, an, eco, friendly, foldable, and, wireless, washing, machine, wash, socks, t, shirts, underwear, baby, clothes, and, more)",28
FOOT FIST FRANKENSTEIN: A Monster Martial Arts Revenge ComicFoot Fist Frankenstein is a page hardcover Martial Arts story of revenge and cool monster action all in one awesome Graphic Novel,"List(foot, fist, frankenstein, a, monster, martial, arts, revenge, comicfoot, fist, frankenstein, is, a, page, hardcover, martial, arts, story, of, revenge, and, cool, monster, action, all, in, one, awesome, graphic, novel)",30
"Flowery Enamel Pins for Avatar: The Last AirbenderStart off with Prunu Aang! Unlock your favorite characters, and giggle at our three iconic stickers.","List(flowery, enamel, pins, for, avatar, the, last, airbenderstart, off, with, prunu, aang, unlock, your, favorite, characters, and, giggle, at, our, three, iconic, stickers)",23
"Runt: a materials free & rules-lite roleplaying systemA materials free, rules-lite, and genre agnostic tabletop roleplaying game system with an emphasis on storytelling and simple gameplay.","List(runt, a, materials, free, rules, lite, roleplaying, systema, materials, free, rules, lite, and, genre, agnostic, tabletop, roleplaying, game, system, with, an, emphasis, on, storytelling, and, simple, gameplay)",27
INTERDIMENSIONALBringing Classic Anthology Sci-Fi Back to Comics,"List(interdimensionalbringing, classic, anthology, sci, fi, back, to, comics)",8
"NaturalsBAESA natural luxury Skincare/Haircare line created to be unisex, all inclusive, affordable and multi-purposeful.","List(naturalsbaesa, natural, luxury, skincare, haircare, line, created, to, be, unisex, all, inclusive, affordable, and, multi, purposeful)",16
Malas Decisiones: Bilingual editionA game in which you get points by predicting what terrible decisions your friends will make. AVAILABLE IN ENGLISH AND ESPAÑOL!,"List(malas, decisiones, bilingual, editiona, game, in, which, you, get, points, by, predicting, what, terrible, decisions, your, friends, will, make, available, in, english, and, espa, ol)",25
"Incognito Games - Memory Games for Mature MindsA board game, card game, and flash cards to assist memory and mental elasticity particularly, but not exclusively, for ages and up.","List(incognito, games, memory, games, for, mature, mindsa, board, game, card, game, and, flash, cards, to, assist, memory, and, mental, elasticity, particularly, but, not, exclusively, for, ages, and, up)",28


There is a lot of noise. *What do do?*

Let's check the distribution of the words (by plotting the first 20 tokens by frequency)

In [23]:
from pyspark.sql.functions import explode, desc
tokens = tokenized.select(explode(col("words")).alias("word")).groupBy(col("word")).count().orderBy(desc("count"))
display(tokens.select("*").limit(20))

word,count
the,2267
and,1752
a,1633
of,1528
to,1123
in,803
for,801
with,644
your,517
s,477


### StopWordsRemover
We can remove stopwords, *i.e. common terms in documents or language that fill our analysis with noise.*

http://spark.apache.org/docs/latest/ml-features.html#stopwordsremover

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are “danish”, “dutch”, “english”, “finnish”, “french”, “german”, “hungarian”, “italian”, “norwegian”, “portuguese”, “russian”, “spanish”, “swedish” and “turkish”. A boolean parameter caseSensitive indicates if the matches should be case sensitive (false by default).

In [25]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="cleaned", caseSensitive=False)

In [26]:
remover.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))
cleaned = remover.transform(tokenized)
cleaned.show()

Let's check our cleaning performance on the first 20 words

In [28]:
tokens = cleaned.select(explode(col("cleaned")).alias("word")).groupBy(col("word")).count().orderBy(desc("count"))
display(tokens.select("*").limit(20))

word,count
enamel,409
game,384
book,294
pins,246
new,238
d,201
pin,190
world,178
series,166
de,164


We continue to have some noise...

What do do now?

Can we remove too short terms?

How?

In Spark, It's quite easy to define some new function.

http://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html

User-Defined Functions (UDFs) are user-programmable routines that act on one row.

In [31]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# define our custom function to remove too short terms
def filter_by_len(words):
  filtered = [word for word in words if len(word) >= 2]
  return filtered

# register our function as udf
filter_by_len_udf = udf(filter_by_len, ArrayType(StringType()))

In [32]:
filtered = cleaned.withColumn("filtered", filter_by_len_udf(col("cleaned")))
filtered.show()

In [33]:
tokens = filtered.select(explode(col("filtered")).alias("word")).groupBy(col("word")).count().orderBy(desc("count"))
display(tokens.select("*").limit(20))

word,count
enamel,409
game,384
book,294
pins,246
new,238
pin,190
world,178
series,166
de,164
art,163


### Ngrams

An n-gram is a sequence of n tokens for some integer *n*.
The NGram component can be used to transform input features into n-grams.

Spark NGram takes as input a sequence of strings.

We will create ngrams for n=2.

In [35]:
from pyspark.ml.feature import NGram
ngrams2 = NGram(n=2, inputCol="cleaned", outputCol="ngrams_2")
ngrams = ngrams2.transform(filtered)

ngrams.show()

And now we can merge the result in a single column with a new udf.

In [37]:
# union of the results
def union_ngrams(c1,c2):
  return c1 + c2

union_ngrams_udf = udf(union_ngrams, ArrayType(StringType()))

ngrams_final = ngrams.filter("filtered is not Null").withColumn("ngrams", union_ngrams_udf(col("filtered"), col("ngrams_2")))
ngrams_final.show()

### TF-IDF

In [39]:
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

hashing_tf = HashingTF(inputCol="ngrams", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
hash_dataset = hashing_tf.transform(ngrams_final)
idf_model = idf.fit(hash_dataset)
idf_dataset = idf_model.transform(hash_dataset)


### NaiveBayes

Before the train phase we have to convert target label to index. Spark Estimator usually takes in input category as bin (i.e. integer rapresentation of the class).

So we apply:
- IndexToString to encode the category to its index
- StringIndexer to decode the index to the original category

In [41]:
from pyspark.ml.feature import IndexToString, StringIndexer

indexer = StringIndexer(inputCol="target", outputCol="label")
indexer_model = indexer.fit(idf_dataset)
indexed = indexer_model.transform(idf_dataset)
converter = IndexToString(inputCol="prediction", outputCol="prediction_category", labels=indexer_model.labels)

Train the model...

In [43]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", featuresCol="features")
paramGrid = ParamGridBuilder()\
    .addGrid(nb.smoothing, [0.1, 0.5, 1.0]) \
    .build()

tvs = TrainValidationSplit(estimator=nb,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.8)

In [44]:
nb_model = tvs.fit(indexed)

### Test
Now, we have to apply the same operations to the test set to produce the same input for our ML model.

In [46]:
test.createOrReplaceTempView("test")
test_cleaned = spark.sql("select project_id, title, description, regexp_replace(title || ' ' || description, '[0-9]', ' ') as description_cleaned, category as target from test")
test_tokenized = regexTokenizer.transform(test_cleaned)
test_cleaned = remover.transform(test_tokenized)
test_filtered = test_cleaned.withColumn("filtered", filter_by_len_udf(col("cleaned")))
test_ngrams = ngrams2.transform(test_filtered)
test_ngrams_final = test_ngrams.filter("filtered is not Null").withColumn("ngrams", union_ngrams_udf(col("filtered"), col("ngrams_2")))

test_hash_dataset = hashing_tf.transform(test_ngrams_final)
test_idf_dataset = idf_model.transform(test_hash_dataset)
test_indexed = indexer_model.transform(test_idf_dataset)

Apply our model to the test dataset.

In [48]:
predictions = nb_model.transform(test_indexed).select("title", "description", "label", "prediction", "target")
predictions_decoded = converter.transform(predictions)
display(predictions_decoded.select("*").limit(20))

title,description,label,prediction,target,prediction_category
Kitsune Clan Enamel Pins,Kitsune Clan Hard Enamel Pins based on 10 original character designs inspired by Japanese fox masks called Kitsune masks.,1.0,0.0,Illustration,Product
Jane Austen's Revenge,A deliciously wicked expansion for Jane Austen's Matchmaker: Chapter Two.,2.0,2.0,Games,Games
CounterBlade: Two Worlds,"A game that centers on bringing two worlds apart, then together. The importance of unity and connectivity between the two worlds.",2.0,0.0,Games,Product
Fire and Spirit,"18""x24"" posters and prints",1.0,1.0,Illustration,Illustration
Kokue,A board game thought to improve family farming and agriculture by introducing agroecological measures,2.0,0.0,Games,Product
HYPER MIRROR - 0 Latency Casting Solution For Phone & Laptop,The Plug & Play Wireless Streaming Device. Smooth Mirroring Without App & Wifi for Any Phone & Laptop Contents On Any TVs & Screens.,0.0,0.0,Product,Product
SWIPE,"Finally, a way to keep your phone as clean as your hands.",0.0,0.0,Product,Product
Daisy's Cafe (Pet Cafe & Shelter),Starting a pet cafe and shelter that cultivates human/pet relationships for pet owners and those wanting to foster pets,0.0,1.0,Product,Illustration
Système Formel | Série spéciale : Mathématiques,French mechanical watches,0.0,2.0,Product,Games
brüud. in a bag.™: Fresh Coffee in a Bag,"A new, convenient way to enjoy delicious freshly ground coffee; brüud. in a bag.™ - Fresh coffee on the go. No Mess. No Fuss.",0.0,0.0,Product,Product


Compute classification performance on the test set

In [50]:
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Test set weighted precision = " + str(precision))


evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictions)
print("Test set weighted recall = " + str(recall))


evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="f1")
f1 = evaluator_f1.evaluate(predictions)
print("Test set f1 = " + str(f1))

Check the issues...

What do do to improve our performance?

In [52]:
display(predictions_decoded.filter("label <> prediction").select("*").limit(20))

### Word2Vec
Train our Word2Vec model... the goal is to create a new column features that contain the results of our Word2Vec model for each record.

In [54]:
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=300, minCount=30, inputCol="ngrams", outputCol="features")
model = word2Vec.fit(ngrams_final)

Try our model and check if it works...

In [56]:
model.findSynonyms("game", 10).show(truncate=False)