<a href="https://colab.research.google.com/github/Verschworer/HSE_ML_final/blob/main/spark_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install -q kaggle
from google.colab import files
files.upload()
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle competitions download -c whats-cooking  

Saving kaggle.json to kaggle.json
Downloading sample_submission.csv.zip to /content
  0% 0.00/25.8k [00:00<?, ?B/s]
100% 25.8k/25.8k [00:00<00:00, 10.1MB/s]
Downloading test.json.zip to /content
  0% 0.00/426k [00:00<?, ?B/s]
100% 426k/426k [00:00<00:00, 34.3MB/s]
Downloading train.json.zip to /content
  0% 0.00/1.76M [00:00<?, ?B/s]
100% 1.76M/1.76M [00:00<00:00, 57.8MB/s]


### Data

In [2]:
! pip install -q pyspark

[K     |████████████████████████████████| 281.3 MB 31 kB/s 
[K     |████████████████████████████████| 198 kB 61.3 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [44]:
# import libraries

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import * #JVM типы данных


from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import *

import numpy as np
import pandas as pd

from itertools import chain

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [2]:
# spark up

spark = SparkSession.builder.master(
    "local[5]"
    ).appName(
    "team8final"
    ).config(
        "spark.sql.execution.arrow.pyspark.enabled", 
        "true").getOrCreate()

In [3]:
spark # spark is alive

In [4]:
! unzip '/content/train.json.zip'
! unzip '/content/test.json.zip'
! unzip '/content/sample_submission.csv.zip'

Archive:  /content/train.json.zip
replace train.json? [y]es, [n]o, [A]ll, [N]one, [r]ename: Archive:  /content/test.json.zip
replace test.json? [y]es, [n]o, [A]ll, [N]one, [r]ename: Archive:  /content/sample_submission.csv.zip
replace sample_submission.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
df = spark.read.json('train.json', multiLine=True)

In [6]:
print((df.count(), len(df.columns)))

(39774, 3)


In [7]:
df.show()

+-----------+-----+--------------------+
|    cuisine|   id|         ingredients|
+-----------+-----+--------------------+
|      greek|10259|[romaine lettuce,...|
|southern_us|25693|[plain flour, gro...|
|   filipino|20130|[eggs, pepper, sa...|
|     indian|22213|[water, vegetable...|
|     indian|13162|[black pepper, sh...|
|   jamaican| 6602|[plain flour, sug...|
|    spanish|42779|[olive oil, salt,...|
|    italian| 3735|[sugar, pistachio...|
|    mexican|16903|[olive oil, purpl...|
|    italian|12734|[chopped tomatoes...|
|    italian| 5875|[pimentos, sweet ...|
|    chinese|45887|[low sodium soy s...|
|    italian| 2698|[Italian parsley ...|
|    mexican|41995|[ground cinnamon,...|
|    italian|31908|[fresh parmesan c...|
|     indian|24717|[tumeric, vegetab...|
|    british|34466|[greek yogurt, le...|
|    italian| 1420|[italian seasonin...|
|       thai| 2941|[sugar, hot chili...|
| vietnamese| 8152|[soy sauce, veget...|
+-----------+-----+--------------------+
only showing top

In [8]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show() # check the NULL

+-------+---+-----------+
|cuisine| id|ingredients|
+-------+---+-----------+
|      0|  0|          0|
+-------+---+-----------+



In [9]:
# check the NaNs
df_temp = df.withColumn("ingredients", F.concat_ws(', ' ,F.col("ingredients")))
df_temp.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_temp.columns]).show()


+-------+---+-----------+
|cuisine| id|ingredients|
+-------+---+-----------+
|      0|  0|          0|
+-------+---+-----------+



In [10]:
len(df.select('cuisine').distinct().rdd.map(lambda r: r[0]).collect())

20

In [11]:
df.groupBy('cuisine').count().orderBy('count', ascending=False).show()

+------------+-----+
|     cuisine|count|
+------------+-----+
|     italian| 7838|
|     mexican| 6438|
| southern_us| 4320|
|      indian| 3003|
|     chinese| 2673|
|      french| 2646|
|cajun_creole| 1546|
|        thai| 1539|
|    japanese| 1423|
|       greek| 1175|
|     spanish|  989|
|      korean|  830|
|  vietnamese|  825|
|    moroccan|  821|
|     british|  804|
|    filipino|  755|
|       irish|  667|
|    jamaican|  526|
|     russian|  489|
|   brazilian|  467|
+------------+-----+



#PREPROCESSING

In [12]:
def lower_case(x):
    res = []
    for x_ in x:
        res.append(x_.lower())
    return res

convert_to_lower = F.udf(lower_case, ArrayType(StringType()))

In [13]:
df = df.withColumn("ingredients", convert_to_lower(F.col("ingredients")))

In [162]:
#stop words
stopwords = []
stopwordsRemover = StopWordsRemover(inputCol="ingredients", 
                                    outputCol="filtered").setStopWords(stopwords)
#vectorizing
# TF-IDF
hashingTF = HashingTF(inputCol="filtered", 
                      outputCol="rawFeatures", 
                      numFeatures=6714)

idf = IDF(inputCol="rawFeatures", 
          outputCol="tfidf_features", 
          minDocFreq=df.count()*0.2)
# count
countVectors = CountVectorizer(inputCol="filtered", 
                               outputCol="count_features", 
                               minDF=0.2, 
                               maxDF=0.8, 
                               vocabSize=6714)
# w2v
word2Vec = Word2Vec(vectorSize=30, 
                    maxIter=100, 
                    minCount=1, 
                    seed=8, 
                    maxSentenceLength=25, 
                    inputCol="filtered", 
                    outputCol="w2v_features")
#labeling
label_stringIdx = StringIndexer(inputCol = "cuisine", 
                                outputCol = "label")

In [134]:
pipeline = Pipeline(stages=[  
                            stopwordsRemover, 
                            hashingTF, 
                            idf,
                            countVectors,
                            #word2Vec, 
                            label_stringIdx
                            ])

pipelinefit = pipeline.fit(df)
next_df = pipelinefit.transform(df)

In [None]:
# ings = data['ingredients']

# def flatten(t):
#     return [item for sublist in t for item in sublist]

# lst = flatten(ings)

# ingrs = set(lst)

In [167]:
label_collect = next_df.select("label").groupBy("label").count().collect()
unique_label = [x["label"] for x in label_collect]
total_label = sum([x["count"] for x in label_collect])
unique_label_count = len(label_collect)
bin_count = [x["count"] for x in label_collect]

class_weights_label = {i: ii for i, ii in zip(unique_label, total_label / (unique_label_count * np.array(bin_count)))}

mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights_label.items())])

next_df = next_df.withColumn("weight", mapping_expr[F.col("label")])

print("\n".join("{!r}: {!r},".format(k, v) for k, v in sorted(class_weights_label.items())))

0.0: 0.25372544016330695,
1.0: 0.30890027958993477,
2.0: 0.46034722222222224,
3.0: 0.6622377622377622,
4.0: 0.7439955106621773,
5.0: 0.7515873015873016,
6.0: 1.2863518758085382,
7.0: 1.2922027290448344,
8.0: 1.3975404075895994,
9.0: 1.6925106382978723,
10.0: 2.0108190091001013,
11.0: 2.3960240963855424,
12.0: 2.4105454545454545,
13.0: 2.4222898903775882,
14.0: 2.473507462686567,
15.0: 2.634039735099338,
16.0: 2.981559220389805,
17.0: 3.7807984790874523,
18.0: 4.066871165644172,
19.0: 4.258458244111349,


#ML FLOW

https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35

In [151]:
(trainingData, testData) = next_df.randomSplit([0.7, 0.3], seed = 8)
(trainingData, testData) = next_df.randomSplit([0.7, 0.3], seed = 8)
print("We have %d training examples and %d test examples." % (trainingData.count(), testData.count()))

We have 27961 training examples and 11813 test examples.


In [112]:
lr = LogisticRegression(maxIter=13, 
                        regParam=0.1,
                        family = "multinomial", 
                        elasticNetParam=0, 
                        tol=0.001, 
                        fitIntercept=True, 
                        weightCol="weight")
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

In [113]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

0.7375772454076018

In [115]:

for i in np.linspace(0, 1, 11):
    lr = LogisticRegression(maxIter=13, 
                        regParam=0.1,
                        family = "multinomial",
                        elasticNetParam=0, 
                        tol=1E-6, 
                        fitIntercept=True, 
                        weightCol="weight")
    lrModel = lr.fit(trainingData)
    predictions = lrModel.transform(testData)
    print("thresholds:",i,"=>",evaluator.evaluate(predictions))

tol: True => 0.7375772454076018
tol: False => 0.7362228053838991


In [99]:
lrModel.getFamily()

'auto'

In [None]:
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42,

    leafCol="leafId")

In [158]:
lsvc = LinearSVC(maxIter=10, regParam=0.1)
gbt = GBTClassifier(maxIter=5, maxDepth=2)
fm = FMClassifier(factorSize=2)
# lsvcModel = lsvc.fit(trainingData)
# predictions = lsvcModel.transform(testData)

In [159]:
# lr_for_OvR = LogisticRegression(maxIter=40, 
#                         regParam=0.2, 
#                         elasticNetParam=0, 
#                         tol=1E-6, 
#                         fitIntercept=True)

ovr = OneVsRest(classifier=fm, weightCol="weight")

In [160]:
ovrModel = ovr.fit(trainingData)
ovr_predictions = ovrModel.transform(testData)
evaluator.evaluate(ovr_predictions)

0.7027850672987387

In [157]:
nb = NaiveBayes(smoothing=0.2, modelType="multinomial")
nbmodel = nb.fit(trainingData)
nb_predictions = nbmodel.transform(testData)
evaluator.evaluate(nb_predictions)

0.7504444256327775

In [161]:
for i in np.linspace(0.1, 1, 10):
    nb = NaiveBayes(smoothing=round(i, 1), modelType="gaussian")
    nbmodel = nb.fit(trainingData)
    nb_predictions = nbmodel.transform(testData)
    print("smoothing:",i,"=>",evaluator.evaluate(nb_predictions))

smoothing: 0.1 => 0.3754338440700923
smoothing: 0.2 => 0.3754338440700923
smoothing: 0.30000000000000004 => 0.3754338440700923
smoothing: 0.4 => 0.3754338440700923
smoothing: 0.5 => 0.3754338440700923
smoothing: 0.6 => 0.3754338440700923
smoothing: 0.7000000000000001 => 0.3754338440700923
smoothing: 0.8 => 0.3754338440700923
smoothing: 0.9 => 0.3754338440700923
smoothing: 1.0 => 0.3754338440700923


In [121]:
rf = RandomForestClassifier(numTrees=400, maxDepth=10)
rfmodel = rf.fit(trainingData)
rf_predictions = rfmodel.transform(testData)
evaluator.evaluate(rf_predictions)

0.4374841276559722

In [None]:
fm = FMClassifier(factorSize=2)
fmmodel = fm.fit(trainingData)
fm_predictions = fmmodel.transform(testData)
evaluator.evaluate(fm_predictions)

In [None]:
sub_df = spark.read.json('test.json', multiLine=True)
sub_df_temp = sub_df.withColumn("ingredients", F.concat_ws(',' ,F.col("ingredients")))
sub_df_temp = sub_df_temp.withColumn("ingredients", F.lower(F.col("ingredients")))
sub_df_temp = sub_df_temp.withColumn("ingredients", spaceDeleteUDF("ingredients"))

In [None]:
sub_df_temp.show()

+-----+--------------------+
|   id|         ingredients|
+-----+--------------------+
|18009|bakingpowder,eggs...|
|28583|sugar,eggyolks,co...|
|41580|sausagelinks,fenn...|
|29752|meatcuts,filepowd...|
|35687|groundblackpepper...|
|38527|bakingpowder,all-...|
|19666|grapejuice,orange...|
|41217|groundginger,whit...|
|28753|dicedonions,tacos...|
|22659|eggs,cherries,dat...|
|21749|pasta,oliveoil,cr...|
|44967|water,butter,grou...|
|42969|currypowder,groun...|
|44883|pasta,marinarasau...|
|20827|salt,custardpowde...|
|23196|vegetableoilcooki...|
|35387|vanillaicecream,b...|
|33780|molasses,hotsauce...|
|19001|choppedgreenchili...|
|16526|coldwater,chicken...|
+-----+--------------------+
only showing top 20 rows



In [None]:
pipeline_sub = Pipeline(stages=[ 
                            regexTokenizer, 
                            stopwordsRemover, 
                            hashingTF, 
                            idf])#, 
                            #label_stringIdx])

pipelineFit_sub = pipeline_sub.fit(sub_df_temp)
next_df_sub = pipelineFit_sub.transform(sub_df_temp)

In [None]:
lrModel2 = lr.fit(next_df)
predictions = lrModel.transform(next_df_sub)

In [None]:
ovrModel_sub = ovr.fit(next_df)
predictions_sub = ovrModel_sub.transform(next_df_sub)

In [None]:
idx_to_string = IndexToString(
    inputCol="prediction", outputCol="cuisine", labels=pipelineFit.stages[4].labels)


idx_to_string.transform(predictions_sub).show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+
|   id|         ingredients|       w_ingredients|            filtered|         rawFeatures|            features|       rawPrediction|prediction|     cuisine|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+
|18009|bakingpowder,eggs...|[bakingpowder, eg...|[bakingpowder, eg...|(6714,[2035,2094,...|(6714,[2035,2094,...|[-3.3923268620506...|      14.0|     british|
|28583|sugar,eggyolks,co...|[sugar, eggyolks,...|[eggyolks, cornst...|(6714,[293,395,20...|(6714,[293,395,20...|[-3.7661119491585...|       2.0| southern_us|
|41580|sausagelinks,fenn...|[sausagelinks, fe...|[sausagelinks, fe...|(6714,[1654,2377,...|(6714,[1654,2377,...|[-2.8994937898562...|       9.0|       greek|
|29752|meatcuts,filepowd...|[meatcuts, filepo...|[me

In [None]:
predictions_sub = idx_to_string.transform(predictions_sub)

In [None]:
spark2sub_lr = predictions.select('id', 'cuisine')

spark2sub_lr.toPandas().to_csv('spark2sub_2_lr_tfidf.csv', index=False)

In [None]:
spark2sub_ovr = predictions_sub.select('id', 'cuisine')

spark2sub_ovr.toPandas().to_csv('spark2sub_3_ovr_tfidf.csv', index=False)

In [None]:
! kaggle competitions submit -c whats-cooking -f spark2sub_3_ovr_tfidf.csv -m "3nd try"

100% 138k/138k [00:01<00:00, 116kB/s]
Successfully submitted to What's Cooking?

1 - 0.69046 on kaggle

2 - 0.70132 on kaggle

3 - 0.71771 on kaggle

In [None]:
# split_udf = F.udf(lambda value: value[1].item(), DoubleType())

In [None]:
# predictions_gbt_2 = gbt_model_sub.transform(subData)

# predictions_gbt_2 = predictions_gbt_2.withColumn('c1', split_udf('probability'))

# spark_gbt_2 = predictions_gbt_2.select('id', 'c1').withColumnRenamed('c1', 'insomnia')

# spark_gbt_2.toPandas().to_csv('spark_gbt_sub2.csv', index=False)