
ValueError from tokenization + POS tagging in a PySpark environment #20

Closed
TianruiZhang opened this issue Dec 24, 2020 · 9 comments

@TianruiZhang

TianruiZhang commented Dec 24, 2020

I'm using fastHan in a PySpark environment to tokenize a large corpus of Weibo posts. Since I only want to keep content words, I also filter by POS, keeping only verbs, adverbs, nouns, and adjectives. On a small sample, the code below runs fine and produces the expected results, but when I scale it to the full corpus, the tokenization step raises a ValueError for reasons I can't identify.

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StringType
from fastHan import FastHan
import pandas as pd
import regex

model = FastHan(model_type="large")
model.set_cws_style("wtb")

def tokenize(text):
  # Keep content words (verbs, adverbs, nouns, adjectives, etc.) that
  # contain at least one Han character.
  return [token.word for token in model(text, target="Parsing")[0]
          if token.pos in ["VA", "VC", "VE", "VV", "NR", "NT", "NN", "AD", "JJ", "FW"]
          and regex.search(r"\p{Han}", token.word)]

def tokenize_func(text: pd.Series) -> pd.Series:
  return text.map(tokenize)

tokenizeText = pandas_udf(tokenize_func, returnType=ArrayType(StringType()))

df = df.withColumn(
  "tokens",
  tokenizeText(col("text"))
)
Job aborted due to stage failure: Task 148 in stage 12.0 failed 4 times, most recent failure: Lost task 148.3 in stage 12.0 (TID 21915, 10.139.64.10, executor 8): org.apache.spark.api.python.PythonException: 'ValueError: 69 is not in list', from <command-613965902721761>, line 6. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 676, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 668, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 284, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 274, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/worker.py", line 489, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 489, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 112, in <lambda>
    verify_result_type(f(*a)), len(a[0])), arrow_return_type)
  File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
    return f(*args, **kwargs)
  File "<command-613965902721761>", line 9, in tokenize_func
  File "/databricks/python/lib/python3.7/site-packages/pandas/core/series.py", line 3630, in map
    new_values = super()._map_values(arg, na_action=na_action)
  File "/databricks/python/lib/python3.7/site-packages/pandas/core/base.py", line 1154, in _map_values
    new_values = map_f(values, mapper)
  File "pandas/_libs/lib.pyx", line 2327, in pandas._libs.lib.map_infer
  File "<command-613965902721761>", line 9, in <lambda>
  File "<command-613965902721761>", line 6, in tokenize
  File "/databricks/python/lib/python3.7/site-packages/fastHan/FastModel.py", line 405, in __call__
    ans.append(self._parsing(head_preds,label_preds,pos_label,sentence[i]))
  File "/databricks/python/lib/python3.7/site-packages/fastHan/FastModel.py", line 294, in _parsing
    head=lengths.index(head)
ValueError: 69 is not in list

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:599)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:552)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1376)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1303)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1367)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1187)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:318)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:662)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:665)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

I don't understand how the ValueError: 69 is not in list arises inside the fastHan call. Earlier test runs produced the same error, except the number wasn't 69 but 1, 10, and so on. Any help would be much appreciated.

@fdugzc
Member

fdugzc commented Dec 24, 2020

Hi, could you provide the specific input string that triggers the error?

@TianruiZhang
Author

Hello, thanks for the quick reply.

After some debugging, I found an example that reproduces the error.

from fastHan import FastHan
import regex

model = FastHan(model_type="large")
model.set_cws_style("wtb")

def tokenize(text):
    return [token.word for token in model(text, target="Parsing")[0]
            if token.pos in ["VA", "VC", "VE", "VV", "NR", "NT", "NN", "AD", "JJ", "FW"]
            and regex.search(r"\p{Han}", token.word)]

text = "有些爸爸素质很差,不仅不管孩子,当你含辛茹苦时,他经常跳出来打劫!”傅首尔在“奇葩说”里如是说。 这种爸爸的育儿方式,我们称之为“诈尸式育儿”。即“死又没死透,经常跳出来瞎指挥,刷一波存在感。好的地方都像他,坏的地方都怪妈!”“不闻不问之后的说三道四”。 看看下面网友们的吐槽。 这样的“诈尸式育儿”透着独断和偏见,是育儿之路的大忌。 想起阿德勒所言:“每个父亲不应忘记:女性在家庭生活中的作用无法替代。他的任务不是贬抑妻子,而是与她合作。有一点要特别强调:即使他是家里的经济来源,这也是共享的事。他绝不能显得好像自己总在施舍,而别人总在接受一样。在幸福的婚姻中,他挣钱只是家里劳动分工的结果。” ”父亲是一种独特的存在,对培养孩子有一种特别的力量。“格尔迪如是说。父亲是孩子探索世界的安全基地,对父亲形成安全依恋的孩子,自我调节情绪的能力更好,更擅于与同伴交往。而父亲形象的弱化和缺失,不仅会影响孩子对自己的性别认同,还会削弱他们与世界相处的自信心。 缺乏父爱的男孩,一生都在“渴望获得父亲的肯定”以及“避免自己成为像父亲一样的男人”这样的矛盾中拉扯;而缺乏父爱的女孩,一生都在寻找一个像父亲的男人,填补她生命中的残缺。"

This string is 511 characters long, which is under 512. Running tokenize(text) gives the following error:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-40-6596177b0379> in <module>
----> 1 tokenize(text)

<ipython-input-37-9eb8e8d1e022> in tokenize(text)
      5 
      6 def tokenize(text):
----> 7     return [token.word for token in model(text, target="Parsing")[0] if token.pos in ["VA", "VC", "VE", "VV", "NR", "NT", "NN", "AD" "JJ", "FW"] and regex.search(r"\p{Han}", token.word)]

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/fastHan/FastModel.py in __call__(self, sentence, target, use_dict)
    392 
    393             #输入模型
--> 394             res=self.model.predict(chars,seq_len,task_class)
    395 
    396             #逐句进行解析

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/fastHan/model/model.py in predict(self, chars, seq_len, task_class, tag_seqs)
    213         mask = chars.ne(0)
    214         layers=self.layers_map[task]
--> 215         feats=self.embed(chars,layers)
    216 
    217         if task=='Parsing':

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/torch/nn/modules/module.py in _call_impl(self, *input, **kwargs)
    725             result = self._slow_forward(*input, **kwargs)
    726         else:
--> 727             result = self.forward(*input, **kwargs)
    728         for hook in itertools.chain(
    729                 _global_forward_hooks.values(),

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/fastHan/model/bert.py in forward(self, words, layers)
     96         if outputs is not None:
     97             return self.dropout(outputs)
---> 98         outputs = self.model(words,layers)
     99         outputs = torch.cat([*outputs], dim=-1)
    100 

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/torch/nn/modules/module.py in _call_impl(self, *input, **kwargs)
    725             result = self._slow_forward(*input, **kwargs)
    726         else:
--> 727             result = self.forward(*input, **kwargs)
    728         for hook in itertools.chain(
    729                 _global_forward_hooks.values(),

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/fastHan/model/bert.py in forward(self, words, layers)
    334                         self._max_position_embeddings - 2)
    335                 else:
--> 336                     raise RuntimeError(
    337                         "After split words into word pieces, the lengths of word pieces are longer than the "
    338                         f"maximum allowed sequence length:{self._max_position_embeddings} of bert. You can set "

RuntimeError: After split words into word pieces, the lengths of word pieces are longer than the maximum allowed sequence length:512 of bert. You can set `auto_truncate=True` for BertEmbedding to automatically truncate overlong input.

@fdugzc
Member

fdugzc commented Dec 25, 2020

This error occurs because the model prepends [CLS], [SEP], and fastHan's corpus tag to the input before feeding it to BERT, so user input needs to be at most 509 characters long. That said, this is a different error. Do you have a string that triggers the "ValueError: xx is not in list" error?
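
A minimal sketch of this length constraint (an illustration, not code from this thread): chunk long inputs to at most 509 characters before calling the model. MAX_LEN, chunk, and tokenize_long are hypothetical names, and the fixed-width slicing is a simplification; splitting on sentence-ending punctuation would better preserve parsing quality.

MAX_LEN = 509  # 512 minus [CLS], [SEP], and fastHan's corpus tag

def chunk(text, max_len=MAX_LEN):
    # Naive fixed-width slicing; may split mid-sentence.
    return [text[i:i + max_len] for i in range(0, len(text), max_len)]

def tokenize_long(text):
    # tokenize() as defined in the first comment above.
    tokens = []
    for piece in chunk(text):
        tokens.extend(tokenize(piece))
    return tokens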

@TianruiZhang
Author

When tokenizing I had capped the input sentence length at <= 512; I'll change it to 509 and see whether the ValueError still shows up. Thanks for the reply!

@TianruiZhang
Author

TianruiZhang commented Dec 25, 2020

I've set the maximum length to <= 509. Replacing the value of text with "【日值四离 大事勿用】 解除 求医 词讼 和讼 求嗣 祭祀 纳财 栽种 破屋 服药 招赘 纳婿 立券 忌" raises ValueError: 36 is not in list; once the full-width brackets are removed, the error goes away. Do I need to do any particular preprocessing before tokenizing?

@TianruiZhang
Author

Here is another example: text = "1% 而据新修订的 北京市生活垃圾管理条例 个人违反条例的先行由生活垃圾分类管理责任人 如小区物业 垃圾桶值守人员等进行劝阻 对拒不听从劝阻的 城管执法部门给予书面警告 再次违反规定的 处50元以上200元以下罚款 依据规定应当受到处罚的个人 自愿参加生活垃圾分类等社区服务活动的 可不予行政处罚 垃圾分类 小习惯透视大文明 据廊坊市建委的方案要求 严格执行分类收集规范 杜绝混装混运 完善双向监督机制 探索建立 不分类 不收运 的倒逼机制 建设简便易行的分类投放系统 合理设置居住小区 公共机构 商业和办公场所的生活垃圾分类收集容器 箱房 桶站等设施设备 推动生活垃圾定点分类投放 同时抓好示范片区建设 以居民社区为单元 开展生活垃圾分类示范片区建设 落实示范街道主体责任 将垃圾分类工作落实到人 2020年底前市主城区 北三县等率先实现全覆盖 屏幕前的小伙伴们 垃圾分类从我开始 你准备好了吗". This one can only be tokenized after the % sign is deleted.

@TianruiZhang
Author

One more example: text = "64亿元项目金额占2018年营收的16天邑股份3005046路由器已进入中国电信采购目录且已经实现批量发货海鸥住工002084与工业富联601138签署战略合作框架协议双方就共同打造5工业互联网应用于住宅工业". This one also raises a ValueError.
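
A hedged stopgap sketch, assuming the failures are tied to the specific characters found so far (the full-width brackets and the % sign): blank them out before tokenizing. This is not a fix from the maintainers, and it would not cover the "64亿元" string just above, which has no obvious offending character. PROBLEM_CHARS and preprocess are hypothetical names.

# Stopgap only: replace characters reported in this thread to trigger the error.
PROBLEM_CHARS = str.maketrans({"【": " ", "】": " ", "%": " "})

def preprocess(text):
    return text.translate(PROBLEM_CHARS)

# usage: tokenize(preprocess(text))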

@fdugzc
Member

fdugzc commented Dec 25, 2020

OK, I'm looking into the bug. If you need POS but not the dependency tree, you can use the "POS" mode, i.e. model(text, "POS"); that won't raise this error. Also, I'd suggest replacing the character "\u3000" with a regular space, since it isn't in fastHan's vocabulary and may hurt accuracy.
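
A minimal sketch of this workaround, assuming the "POS" target yields tokens with the same .word/.pos attributes used in the code earlier in this thread (model and regex as defined in the first comment); tokenize_pos and KEEP_TAGS are hypothetical names.

KEEP_TAGS = {"VA", "VC", "VE", "VV", "NR", "NT", "NN", "AD", "JJ", "FW"}

def tokenize_pos(text):
    # Replace the ideographic space, which is not in fastHan's vocabulary.
    text = text.replace("\u3000", " ")
    # target="POS" skips dependency parsing, avoiding the ValueError path.
    return [token.word for token in model(text, target="POS")[0]
            if token.pos in KEEP_TAGS and regex.search(r"\p{Han}", token.word)]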

@TianruiZhang
Author

For the "64亿元" sentence, the base model runs fine but the large model doesn't, so there may be an issue in how the parsing task connects to the base vs. large models.
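
A minimal repro sketch for this base-vs-large discrepancy, using the "64亿元" string from the earlier comment:

from fastHan import FastHan

text = "64亿元项目金额占2018年营收的16天邑股份3005046路由器已进入中国电信采购目录且已经实现批量发货海鸥住工002084与工业富联601138签署战略合作框架协议双方就共同打造5工业互联网应用于住宅工业"

for size in ("base", "large"):
    model = FastHan(model_type=size)
    try:
        print(size, "->", model(text, target="Parsing")[0])
    except ValueError as e:
        print(size, "-> ValueError:", e)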
