In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# List every file under the Kaggle input mount, printing one full path per line.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for full_path in (os.path.join(dirname, entry) for entry in filenames):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/online-retail-transaction-records/Online Retail.csv


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=57ea5b24e9ea4c3be6391da7942ba950c5ae961bd371900b0a9ee88d3709fe4a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, RegexTokenizer
from pyspark.sql import Row, SparkSession

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)
# Only report ERROR-level driver logs so they do not clutter the cell output.
spark.sparkContext.setLogLevel("ERROR")

# Row template carrying the same field names as the DataFrame built below.
LabeledDoc = Row("id", "text", "label")

# Four hand-labelled documents, one (id, text, label) tuple each.
training = spark.createDataFrame(
    [
        (0, "It’s getting dark.", 1.0),
        (1, "There are many things that confuse me about that", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)
training.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/08 20:01:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  0|  It’s getting dark.|  1.0|
|  1|There are many th...|  0.0|
|  2|         spark f g h|  1.0|
|  3|    hadoop mapreduce|  0.0|
+---+--------------------+-----+



In [4]:
# Tokenizer
# Configure the stage through its setters, then append the token-array column
# "token_tex" to the training frame and display the result.
token = Tokenizer().setInputCol("text").setOutputCol("token_tex")
tk = token.transform(training)
tk.show()

+---+--------------------+-----+--------------------+
| id|                text|label|           token_tex|
+---+--------------------+-----+--------------------+
|  0|  It’s getting dark.|  1.0|[it’s, getting, d...|
|  1|There are many th...|  0.0|[there, are, many...|
|  2|         spark f g h|  1.0|    [spark, f, g, h]|
|  3|    hadoop mapreduce|  0.0| [hadoop, mapreduce]|
+---+--------------------+-----+--------------------+



In [5]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType
def _token_count(words):
    """Return the number of tokens in ``words``, or None when the array is null.

    Spark passes SQL NULL to a Python UDF as ``None``; calling ``len`` on it would
    raise ``TypeError`` and fail the whole job, so null is propagated instead.
    """
    return None if words is None else len(words)


# Column function: count_tokens(col("<array column>")) -> integer token count.
count_tokens = udf(_token_count, IntegerType())

In [6]:
# Show each text next to its tokens and the per-row token count.
(
    tk.select("text", "token_tex")
    .withColumn("Tokens", count_tokens(col("token_tex")))
    .show()
)

[Stage 5:>                                                          (0 + 3) / 3]

+--------------------+--------------------+------+
|                text|           token_tex|Tokens|
+--------------------+--------------------+------+
|  It’s getting dark.|[it’s, getting, d...|     3|
|There are many th...|[there, are, many...|     9|
|         spark f g h|    [spark, f, g, h]|     4|
|    hadoop mapreduce| [hadoop, mapreduce]|     2|
+--------------------+--------------------+------+



                                                                                

In [7]:
# Regex Tokenizer
# No pattern is supplied, so the stage runs with its defaults; on this data the
# recorded output is the same as the plain Tokenizer's.
# NOTE(review): pass an explicit pattern if the cell is meant to show a difference.
rg_tok = RegexTokenizer().setInputCol("text").setOutputCol("token_text")
rg_tk = rg_tok.transform(training)
rg_tk.show()

+---+--------------------+-----+--------------------+
| id|                text|label|          token_text|
+---+--------------------+-----+--------------------+
|  0|  It’s getting dark.|  1.0|[it’s, getting, d...|
|  1|There are many th...|  0.0|[there, are, many...|
|  2|         spark f g h|  1.0|    [spark, f, g, h]|
|  3|    hadoop mapreduce|  0.0| [hadoop, mapreduce]|
+---+--------------------+-----+--------------------+



In [8]:
# Show each text next to its regex-tokenizer tokens and the per-row token count.
(
    rg_tk.select("text", "token_text")
    .withColumn("Tokens", count_tokens(col("token_text")))
    .show()
)

+--------------------+--------------------+------+
|                text|          token_text|Tokens|
+--------------------+--------------------+------+
|  It’s getting dark.|[it’s, getting, d...|     3|
|There are many th...|[there, are, many...|     9|
|         spark f g h|    [spark, f, g, h]|     4|
|    hadoop mapreduce| [hadoop, mapreduce]|     2|
+--------------------+--------------------+------+



                                                                                

In [9]:
# ML Pipeline
# text -> "words" (Tokenizer) -> "features" (HashingTF) -> LogisticRegression
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),  # consume whatever column the tokenizer emits
    outputCol="features",
)
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [10]:
# Fit the pipeline on the labelled training DataFrame; the fitted model is used
# in a later cell to score unseen text.
model = pipeline.fit(training)

                                                                                

In [11]:
# Unlabelled documents to score: only (id, text), no label column.
test = spark.createDataFrame(
    [
        (4, "spark is all you need"),
        (5, "l m n"),
        (6, "mapreduce the spark"),
        (7, "apache hadoop"),
    ],
    schema=["id", "text"],
)

In [12]:
# Score the unlabelled documents with the fitted pipeline and print one Row per document.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
collected = selected.collect()
for row in collected:
    print(row)

Row(id=4, text='spark is all you need', prediction=1.0)
Row(id=5, text='l m n', prediction=0.0)
Row(id=6, text='mapreduce the spark', prediction=0.0)
Row(id=7, text='apache hadoop', prediction=0.0)


                                                                                

Cross Validation

In [17]:
# Larger labelled set for cross-validation: twelve (id, text, label) rows.
# This rebinds `training`, replacing the four-row frame defined earlier.
training = spark.createDataFrame(
    [
        (0, "It’s getting dark.", 1.0),
        (1, "There are many things that confuse me about that", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
        (8, "e spark program", 1.0),
        (9, "a e c l", 0.0),
        (10, "spark compile", 1.0),
        (11, "hadoop software", 0.0),
    ],
    schema=["id", "text", "label"],
)
training.show()

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  0|  It’s getting dark.|  1.0|
|  1|There are many th...|  0.0|
|  2|         spark f g h|  1.0|
|  3|    hadoop mapreduce|  0.0|
|  4|         b spark who|  1.0|
|  5|             g d a y|  0.0|
|  6|           spark fly|  1.0|
|  7|       was mapreduce|  0.0|
|  8|     e spark program|  1.0|
|  9|             a e c l|  0.0|
| 10|       spark compile|  1.0|
| 11|     hadoop software|  0.0|
+---+--------------------+-----+



In [19]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Seed for the cross-validation fold split, fixed so that a fresh
# "Restart & Run All" selects the same model every time.
CV_SEED = 42

# Rebuild the pipeline stages. regParam is not set on the estimator here because
# it is supplied by the parameter grid below.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=5)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# 3 numFeatures values x 2 regParam values = 6 candidate parameter settings.
grid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

evaluator = BinaryClassificationEvaluator()
# Rows are assigned to folds at random; without a seed the split (and therefore
# the chosen parameters) can differ between runs on a training set this small.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=CV_SEED,
)

cvModel = cv.fit(training)

Exception ignored in: <function JavaWrapper.__del__ at 0x7b627b4f64d0>          
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'


In [21]:
# Unlabelled documents for the cross-validated model: only (id, text).
# This rebinds `test`, replacing the frame defined earlier.
test = spark.createDataFrame(
    [
        (4, "spark is all you need"),
        (5, "l m n"),
        (6, "mapreduce spark"),
        (7, "apache hadoop"),
    ],
    schema=["id", "text"],
)

In [22]:
# Score the unlabelled documents with the cross-validated model and print one Row per document.
preds = cvModel.transform(test)
selected = preds.select("id", "text", "prediction")
collected = selected.collect()
for row in collected:
    print(row)

Row(id=4, text='spark is all you need', prediction=1.0)
Row(id=5, text='l m n', prediction=0.0)
Row(id=6, text='mapreduce spark', prediction=1.0)
Row(id=7, text='apache hadoop', prediction=0.0)
