# preprocessing

We use serverless Dataproc (Spark) to handle the preprocessing in this notebook.


In [2]:
# Display the active `spark` session object to confirm the kernel has one available.
# NOTE(review): `spark` is not created in this notebook — presumably injected by the
# Dataproc serverless runtime; confirm before running elsewhere.
spark

In [3]:
# Root GCS location shared by the paths used in this notebook.
bucket = "gs://dsgt-clef-erisk-2024"

# Read the task 1 test split from parquet and preview the first rows.
test_df = spark.read.format("parquet").load(f"{bucket}/task1/parquet/test")
test_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+--------------------+---------------+--------+
|   DOCNO|                POST|                 PRE|                TEXT|_corrupt_record|filename|
+--------+--------------------+--------------------+--------------------+---------------+--------+
|   0_0_6|I'm trying to wor...|\n\nAlthough most...|I will not tell m...|           null|s_0.trec|
|456_1_12|You're not like '...|In general though...|Oh, and if you're...|           null|s_1.trec|
| 764_1_5|Maybe it's one of...|My past experienc...|But I still want ...|           null|s_1.trec|
|651_0_28|\n\nWe all run ba...|\n\nSo this woman...|I couldn't even i...|           null|s_1.trec|
| 268_1_3| Both were great,...|\n\nI've only had...|One a couple year...|           null|s_1.trec|
|364_0_12|I started opening...|Even though I too...|Which I can under...|           null|s_1.trec|
|765_0_33|Nowhere on my inv...|I ask about the t...|Words can not exp...|           null|s_1.trec|
|409_0_18|

                                                                                

In [4]:
# Show every test row that has a non-null `_corrupt_record` value.
# NULL has to be tested with IS NOT NULL: in Spark SQL the comparison `<> null`
# evaluates to NULL for every row, so such a filter always yields an empty result
# regardless of whether corrupt records exist.
test_df.where("_corrupt_record IS NOT NULL").show()

+-----+----+---+----+---------------+--------+
|DOCNO|POST|PRE|TEXT|_corrupt_record|filename|
+-----+----+---+----+---------------+--------+
+-----+----+---+----+---------------+--------+



In [5]:
# Count how many test rows come from each source file and preview the counts.
rows_per_file = test_df.groupBy("filename").count()
rows_per_file.show()



+----------+-----+
|  filename|count|
+----------+-----+
|s_153.trec|27966|
|s_196.trec|26988|
|s_194.trec|28570|
|s_265.trec|26173|
|s_165.trec|28915|
|s_272.trec|29164|
| s_15.trec|29058|
|s_220.trec|27630|
|s_118.trec|27238|
|s_133.trec|26989|
|s_111.trec|26887|
|s_157.trec|26094|
|s_277.trec|26536|
|s_180.trec|26234|
|s_193.trec|28355|
|s_115.trec|30587|
|s_158.trec|27811|
|s_268.trec|28074|
|  s_1.trec|27087|
|s_280.trec|27301|
+----------+-----+
only showing top 20 rows



                                                                                

In [6]:
# Total number of rows in the test split.
test_df.count()

                                                                                

15542200

In [7]:
# Load the training split as well, preview it, and report its row count.
train_df = spark.read.format("parquet").load(f"{bucket}/task1/parquet/train")
train_df.show()
train_df.count()

                                                                                

+--------+--------------------+--------+
|   DOCNO|                TEXT|filename|
+--------+--------------------+--------+
| s_0_0_0|    1.ye katiliyorum|s_0.trec|
| s_0_1_0|ok haklsn abi gol...|s_0.trec|
| s_0_2_0| almanca yarrak gibi|s_0.trec|
| s_0_3_0|hani u oyunlarn e...|s_0.trec|
| s_0_3_1|dead cellste ygda...|s_0.trec|
| s_0_3_2|bunlarn bir dili ...|s_0.trec|
| s_0_4_0|lnce diriltiyor s...|s_0.trec|
| s_0_6_0|       ziya gzel sal|s_0.trec|
| s_0_7_0|  artk dedem deilsin|s_0.trec|
| s_0_8_0|sorma bizim matem...|s_0.trec|
| s_0_9_0|240 Volt FUCKMAST...|s_0.trec|
|s_0_10_0|bunlar nerden evi...|s_0.trec|
|s_0_11_0|beynine gidecek k...|s_0.trec|
|s_0_12_0|semeyen vizyonsuz...|s_0.trec|
|s_0_13_0|       ok haklsn abi|s_0.trec|
|s_0_14_0|ilkokul zamanlari...|s_0.trec|
|s_0_15_0|iliki kurmakta zo...|s_0.trec|
|s_0_15_1|liseye giden bir ...|s_0.trec|
|s_0_15_2|tipimin ve kiilii...|s_0.trec|
|s_0_15_3|ben insanlarla ko...|s_0.trec|
+--------+--------------------+--------+
only showing top

4264693

In [8]:
from pyspark.sql import functions as F

# Build a TF-IDF feature pipeline over the TEXT column.
# Word2Vec is imported but its stage is currently disabled (see below).
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline

# Feature dimensions. `word2vec_features` is only used by the disabled Word2Vec stage.
hashing_features = 256
word2vec_features = 256

# Stage 1: TEXT -> "words".
tokenizer = Tokenizer(inputCol="TEXT", outputCol="words")

# Stage 2: "words" -> "hashingtf", hashed term frequencies with `hashing_features` buckets.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="hashingtf",
    numFeatures=hashing_features,
)

# Stage 3: "hashingtf" -> "tfidf".
idf = IDF(inputCol="hashingtf", outputCol="tfidf")

# Disabled Word2Vec stage, kept for reference:
# word2vec = Word2Vec(
#     vectorSize=word2vec_features,
#     minCount=0,
#     inputCol=tokenizer.getOutputCol(),
#     outputCol="word2vec",
# )

stages = [tokenizer, hashingTF, idf]
pipeline = Pipeline(stages=stages)

# Fit on the training split only and preview the resulting feature columns.
pipeline_model = pipeline.fit(train_df)
train_features = pipeline_model.transform(train_df)
train_features.show()



+--------+--------------------+--------+--------------------+--------------------+--------------------+
|   DOCNO|                TEXT|filename|               words|           hashingtf|               tfidf|
+--------+--------------------+--------+--------------------+--------------------+--------------------+
| s_0_0_0|    1.ye katiliyorum|s_0.trec| [1.ye, katiliyorum]|(256,[18,181],[1....|(256,[18,181],[3....|
| s_0_1_0|ok haklsn abi gol...|s_0.trec|[ok, haklsn, abi,...|(256,[53,77,118,1...|(256,[53,77,118,1...|
| s_0_2_0| almanca yarrak gibi|s_0.trec|[almanca, yarrak,...|(256,[78,108,148]...|(256,[78,108,148]...|
| s_0_3_0|hani u oyunlarn e...|s_0.trec|[hani, u, oyunlar...|(256,[41,47,50,71...|(256,[41,47,50,71...|
| s_0_3_1|dead cellste ygda...|s_0.trec|[dead, cellste, y...|(256,[2,11,47,53,...|(256,[2,11,47,53,...|
| s_0_3_2|bunlarn bir dili ...|s_0.trec|[bunlarn, bir, di...|(256,[8,26,44,83,...|(256,[8,26,44,83,...|
| s_0_4_0|lnce diriltiyor s...|s_0.trec|[lnce, diriltiyor...|(25

                                                                                

In [24]:
# Build a single dataframe covering both splits with the columns
# (DOCNO, TEXT, filename, dataset), where `dataset` is "train" or "test".
#
# For the test split, TEXT is rebuilt from the PRE, TEXT and POST columns.
# concat_ws is used instead of concat so that:
#   * a space separates the three pieces — plain concatenation glues the last
#     word of one piece to the first word of the next, which produces bogus
#     tokens once the text is split on whitespace;
#   * NULL pieces are skipped, so no per-column coalesce is needed and the
#     result is never NULL.
# unionByName matches columns by name rather than position, so a reordered
# select on either side cannot silently mix columns.

total_df = (
    train_df.select(
        "DOCNO",
        "TEXT",
        "filename",
        F.lit("train").alias("dataset"),
    )
    .unionByName(
        test_df.select(
            "DOCNO",
            F.concat_ws(
                " ",
                F.col("PRE"),
                F.col("TEXT"),
                F.col("POST"),
            ).alias("TEXT"),
            "filename",
            F.lit("test").alias("dataset"),
        ),
    )
    # Drop rows that cannot be attributed to a file or that have no text.
    .where("filename is not null")
    .where("TEXT is not null")
)
total_df.show()
total_df.count()

                                                                                

+--------+--------------------+--------+-------+
|   DOCNO|                TEXT|filename|dataset|
+--------+--------------------+--------+-------+
| s_0_0_0|    1.ye katiliyorum|s_0.trec|  train|
| s_0_1_0|ok haklsn abi gol...|s_0.trec|  train|
| s_0_2_0| almanca yarrak gibi|s_0.trec|  train|
| s_0_3_0|hani u oyunlarn e...|s_0.trec|  train|
| s_0_3_1|dead cellste ygda...|s_0.trec|  train|
| s_0_3_2|bunlarn bir dili ...|s_0.trec|  train|
| s_0_4_0|lnce diriltiyor s...|s_0.trec|  train|
| s_0_6_0|       ziya gzel sal|s_0.trec|  train|
| s_0_7_0|  artk dedem deilsin|s_0.trec|  train|
| s_0_8_0|sorma bizim matem...|s_0.trec|  train|
| s_0_9_0|240 Volt FUCKMAST...|s_0.trec|  train|
|s_0_10_0|bunlar nerden evi...|s_0.trec|  train|
|s_0_11_0|beynine gidecek k...|s_0.trec|  train|
|s_0_12_0|semeyen vizyonsuz...|s_0.trec|  train|
|s_0_13_0|       ok haklsn abi|s_0.trec|  train|
|s_0_14_0|ilkokul zamanlari...|s_0.trec|  train|
|s_0_15_0|iliki kurmakta zo...|s_0.trec|  train|
|s_0_15_1|liseye gid

                                                                                

19806893

In [26]:
from pyspark.ml.functions import vector_to_array

# Refit the TF-IDF pipeline on the combined train + test data.
pipeline_model = pipeline.fit(total_df)

# Apply the pipeline and convert the ML vector columns to plain arrays of
# doubles, so the persisted parquet holds ordinary array columns rather than
# vector UDT values.
res_df = (
    pipeline_model.transform(total_df)
    .withColumn("hashingtf", vector_to_array(F.col("hashingtf")))
    .withColumn("tfidf", vector_to_array(F.col("tfidf")))
)
res_df.printSchema()

# Save both the fitted pipeline and the featurized dataframe.
# `res_df` (not `total_df`) is written: `total_df` has no hashingtf/tfidf
# columns, so writing it would leave `combined_tfidf` without any features.
pipeline_model.write().overwrite().save(f"{bucket}/task1/models/pipeline_tfidf")
res_df.write.mode("overwrite").parquet(f"{bucket}/task1/parquet/combined_tfidf")

                                                                                

root
 |-- DOCNO: string (nullable = true)
 |-- TEXT: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- dataset: string (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hashingtf: array (nullable = false)
 |    |-- element: double (containsNull = false)
 |-- tfidf: array (nullable = false)
 |    |-- element: double (containsNull = false)



                                                                                

: 

: 

In [None]:
# List the size of each object stored under the combined_tfidf output path.
# `{bucket}` is interpolated from the notebook variable of the same name.
! gcloud storage du --readable-sizes {bucket}/task1/parquet/combined_tfidf

0B           gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/
0B           gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/_SUCCESS
2.14kiB      gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00000-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
392.45MiB    gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00002-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
251.01MiB    gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00006-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
368.46MiB    gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00008-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
367.25MiB    gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00009-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
368.30MiB    gs://dsgt-clef-erisk-2024/task1/parquet/combined_tfidf/part-00010-787e6764-55b3-4a6b-863f-4e8445368c5d-c000.snappy.parquet
367.48MiB    gs://dsgt-clef-erisk-2024

In [None]:
# Reload the persisted combined dataset from GCS and preview it.
# This rebinds `total_df` to the copy read back from storage.
# NOTE(review): the recorded output shows this read failing inside
# VectorUDT.deserialize — presumably the files on disk came from a run that
# stored raw ML vector columns; confirm after regenerating the parquet.
total_df = spark.read.format("parquet").load(f"{bucket}/task1/parquet/combined_tfidf")
total_df.show()

24/03/17 16:09:39 WARN TaskSetManager: Lost task 0.0 in stage 44.0 (TID 238) (10.128.15.220 executor 1): java.lang.IllegalArgumentException: requirement failed: Index 0 follows 0 and is not strictly increasing
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.ml.linalg.SparseVector.$anonfun$new$5(Vectors.scala:629)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
	at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1329)
	at org.apache.spark.ml.linalg.SparseVector.<init>(Vectors.scala:628)
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:64)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$ge

Py4JJavaError: An error occurred while calling o580.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44.0 (TID 241) (10.128.15.221 executor 0): java.lang.IllegalArgumentException: requirement failed: Index 0 follows 0 and is not strictly increasing
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.ml.linalg.SparseVector.$anonfun$new$5(Vectors.scala:629)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
	at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1329)
	at org.apache.spark.ml.linalg.SparseVector.<init>(Vectors.scala:628)
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:64)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2277)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2317)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:528)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: requirement failed: Index 0 follows 0 and is not strictly increasing
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.ml.linalg.SparseVector.$anonfun$new$5(Vectors.scala:629)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
	at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1329)
	at org.apache.spark.ml.linalg.SparseVector.<init>(Vectors.scala:628)
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:64)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
