# grid_search & cross_validation

In [1]:
from pyspark.sql.functions import *

d1 = spark.read.parquet('/data/user/hive/warehouse/ian/feature/unrecognized/*')
d2 = spark.read.parquet('/data/user/hive/warehouse/ian/feature/recognized/*')
d2 = d2.sample(0.20,seed=123)

df = d1.union(d2)

from pyspark.ml.feature import MinMaxScaler,StandardScaler,VectorAssembler,StringIndexer

vec = VectorAssembler(inputCols=['f1','f2','f3','f4','f5','f6','f7'],outputCol='features')
vec_ = vec.transform(df)

In [2]:
trainDF, testDF = vec_.randomSplit([0.7,0.3],seed=0)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
import numpy as np

rf = RandomForestClassifier()

paramGrid_rf = ParamGridBuilder()\
       .addGrid(rf.maxDepth,[3,4,5,6,7,8,9,10])\
       .addGrid(rf.numTrees,[10,15,20,25,30])\
       .build()

crossval_rf = CrossValidator(estimator=rf,
                            estimatorParamMaps=paramGrid_rf,
                            evaluator=BinaryClassificationEvaluator(),
                            numFolds=5)

cvModel_rf = crossval_rf.fit(trainDF)

In [16]:
best_model_rf = cvModel_rf.bestModel

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_rf = BinaryClassificationEvaluator(
    rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_rf.evaluate(best_model_rf.transform(testDF))

0.9452258185245365

# Prediction

In [98]:
from pyspark.sql.functions import *
d = spark.read.csv('/user/maxnet/database/sig.db/data_visual_unknown/*',sep='\x01')
d1 = d.select('_c2').withColumnRenamed('_c2','val').distinct().dropna()

d1 = d1.withColumn('f1',length(col('val')))

d1 = d1.withColumn('f2',when(d1.val.startswith('A')|d1.val.startswith('B')|d1.val.startswith('C')\
                             |d1.val.startswith('D')|d1.val.startswith('E')|d1.val.startswith('F')\
                             |d1.val.startswith('G')|d1.val.startswith('H')|d1.val.startswith('I')\
                             |d1.val.startswith('J')|d1.val.startswith('K')|d1.val.startswith('L')\
                             |d1.val.startswith('M')|d1.val.startswith('N')|d1.val.startswith('O')\
                             |d1.val.startswith('P')|d1.val.startswith('Q')|d1.val.startswith('R')\
                             |d1.val.startswith('S')|d1.val.startswith('T')|d1.val.startswith('U')\
                             |d1.val.startswith('V')|d1.val.startswith('W')|d1.val.startswith('X')\
                             |d1.val.startswith('Y')|d1.val.startswith('Z'),1).otherwise(0))

import re

num_regex = re.compile(r'[0-9]') #数字
xiaoxiezimu_regex = re.compile(r'[a-z]')#小写字母
daxiezimu_regex = re.compile(r'[A-Z]') #大写字母 
#hanzi_regex = re.compile(r'[\u4E00-\u9FA5]')#汉字

from pyspark.sql.functions import udf
num = udf(lambda x: len(num_regex.findall(x)))
xiaoxie = udf(lambda x: len(xiaoxiezimu_regex.findall(x)))
daxie = udf(lambda x: len(daxiezimu_regex.findall(x)))

d1 = d1.withColumn('f3',num('val'))
d1 = d1.withColumn('f4',xiaoxie('val'))
d1 = d1.withColumn('f5',daxie('val'))

# 特征字符串长度 f1
# 首字母是否大写 f2
# 数字字符数量   f3
# 小写字母数量   f4
# 大写字母数量   f5
# 特殊符号-_:的数量 f6
# 空格的数量 f7


# 统计下划线个数
def xiahuaxian_count(s):
    xiahuaxian_counts=0
    for c in s:
        xiahuaxian_split_list = c.split('_')
        xiahuaxian_counts += len(xiahuaxian_split_list) - 1
    return xiahuaxian_counts

# 统计中划线个数
def zhonghuaxian_count(s):
    zhonghuaxian_counts=0
    for c in s:
        zhonghuaxian_split_list = c.split('-')
        zhonghuaxian_counts += len(zhonghuaxian_split_list) - 1
    return zhonghuaxian_counts

# 统计冒号个数
def maohao_count(s):
    maohao_counts=0
    for c in s:
        maohao_split_list = c.split(':')
        maohao_counts += len(maohao_split_list) - 1
    return maohao_counts

def teshu_count(s):
    teshu_counts=0
    a_counts=0
    b_counts=0
    c_counts=0
    for c in s:
        a_split_list = c.split('_')
        a_counts += len(a_split_list) - 1
        
        b_split_list = c.split('-')
        b_counts += len(b_split_list) - 1
        
        c_split_list = c.split(':')
        c_counts += len(c_split_list) - 1
        
        teshu_counts = a_counts + b_counts + c_counts
    return teshu_counts
        

# 统计空格个数
def space_count(s):
    space_counts=0
    for c in s:
        space_split_list = c.split(' ')
        space_counts += len(space_split_list) - 1
    return space_counts

teshu = udf(lambda x: teshu_count(x))
kongge = udf(lambda x: space_count(x))


d1 = d1.withColumn('f6',teshu('val'))
d1 = d1.withColumn('f7',kongge('val'))


d1 = d1.select('val',col('f1').cast('float'),col('f2').cast('float'),col('f3').cast('float'),col('f4').cast('float'),col('f5').cast('float'),col('f6').cast('float'),col('f7').cast('float'))

d1.show(truncate=False)

+-------------------------------+----+---+----+----+----+---+---+
|val                            |f1  |f2 |f3  |f4  |f5  |f6 |f7 |
+-------------------------------+----+---+----+----+----+---+---+
|K                              |1.0 |1.0|0.0 |0.0 |1.0 |0.0|0.0|
|T410s-THINK                    |11.0|1.0|3.0 |1.0 |6.0 |1.0|0.0|
|meilongdedageda                |15.0|0.0|0.0 |15.0|0.0 |0.0|0.0|
|liuxing                        |7.0 |0.0|0.0 |7.0 |0.0 |0.0|0.0|
|R98JEKDZAMT768C                |15.0|1.0|5.0 |0.0 |10.0|0.0|0.0|
|OUJJYEDT9K8YMPL                |15.0|1.0|2.0 |0.0 |13.0|0.0|0.0|
|NS-20160403RCFW                |15.0|1.0|8.0 |0.0 |6.0 |1.0|0.0|
|DEEP-2019WYJXED                |15.0|1.0|4.0 |0.0 |10.0|1.0|0.0|
|36CFMRKQVG7H4QF                |15.0|0.0|4.0 |0.0 |11.0|0.0|0.0|
|6A41MXNKIB0CTN9                |15.0|0.0|5.0 |0.0 |10.0|0.0|0.0|
|POS12                          |5.0 |1.0|2.0 |0.0 |3.0 |0.0|0.0|
|uplus-haier-0312-0a6f-v5-sapkz |30.0|0.0|7.0 |18.0|0.0 |5.0|0.0|
|TV0PXAHC9

In [99]:
d1.count()

34231

In [62]:
d1.dtypes

[('val', 'string'),
 ('f1', 'float'),
 ('f2', 'float'),
 ('f3', 'float'),
 ('f4', 'float'),
 ('f5', 'float'),
 ('f6', 'float'),
 ('f7', 'float')]

In [63]:
from pyspark.ml.feature import MinMaxScaler,StandardScaler,VectorAssembler,StringIndexer

unknow = VectorAssembler(inputCols=['f1','f2','f3','f4','f5','f6','f7'],outputCol='features')
unknow = vec.transform(d1)

In [64]:
t = best_model_rf.transform(unknow)

In [65]:
t.dtypes

[('val', 'string'),
 ('f1', 'float'),
 ('f2', 'float'),
 ('f3', 'float'),
 ('f4', 'float'),
 ('f5', 'float'),
 ('f6', 'float'),
 ('f7', 'float'),
 ('features', 'vector'),
 ('rawPrediction', 'vector'),
 ('probability', 'vector'),
 ('prediction', 'double')]

In [66]:
t.select('val','probability','prediction').count(),t.select('val','probability','prediction').distinct().count()

(30929, 30929)

In [44]:
t.select('val','probability','prediction').show(100,truncate=False)

+--------------------------------+------------------------------------------+----------+
|val                             |probability                               |prediction|
+--------------------------------+------------------------------------------+----------+
|IPC71024501                     |[0.9613202281559698,0.03867977184403018]  |0.0       |
|K                               |[0.989824507774071,0.010175492225928862]  |0.0       |
|meilongdedageda                 |[0.9816884278768971,0.01831157212310288]  |0.0       |
|1BC9DXC8R4TD17O                 |[0.9986211200471867,0.0013788799528132328]|0.0       |
|uplus-haier-0601-29a4-v5-sapkz  |[0.985693897542805,0.014306102457195075]  |0.0       |
|082fbf905d19135fb94f8c223a68c9b |[0.9998935417614898,1.0645823851009595E-4]|0.0       |
|008a6b32e8b299670e55081837e4a35 |[0.9998730600349981,1.2693996500194103E-4]|0.0       |
|0585430bf6aad373c9d46166fa4571d |[0.9998672723723357,1.3272762766423148E-4]|0.0       |
|RPVMMTCXP9XGYPL     

In [67]:
from pyspark.sql.types import DoubleType
unlist = udf(lambda x: float(list(x)[1]), DoubleType())

In [68]:
pred = t.select('val',unlist('probability').alias('probability'),'prediction').filter(t.prediction == 1).filter(t.val != 'unknown').filter(t.val != 'empty').filter(t.val != 'NONE').filter(t.val != 'none').filter(t.val != 'N/A').filter(t.val != 'normal').filter(t.val != 'anonymous').filter(t.val != 'null')

In [132]:
#pred.show(truncate=False)

In [70]:
t.select('val',unlist('probability').alias('probability'),'prediction').groupBy('prediction').count().show(truncate=False)

+----------+-----+
|prediction|count|
+----------+-----+
|0.0       |28657|
|1.0       |2272 |
+----------+-----+



In [131]:
# pred.sort('probability',ascending=False).show(truncate=False)

In [72]:
pred.count(),pred.distinct().count()

(2272, 2272)

In [73]:
2272/30929

0.07345856639399916

In [37]:
# pred_hdfs = pred.repartition(1)

#pred_hdfs.write.mode('overwrite').parquet('hdfs:///data/user/hive/warehouse/ian/prediction/p3',compression='gzip')

In [45]:
cvModel_rf.save('hdfs:///data/user/hive/warehouse/ian/model/model5')
# m5 = cvModel_rf.load('hdfs:///data/user/hive/warehouse/ian/model/model5')

In [46]:
m5 = cvModel_rf.load('hdfs:///data/user/hive/warehouse/ian/model/model5')

In [49]:
tt = m5.transform(unknow)

In [51]:
tt.select('val','probability','prediction').count(),tt.select('val','probability','prediction').distinct().count()

(21030, 21030)

In [52]:
total_pred = tt.select('val',unlist('probability').alias('probability'),'prediction').filter(tt.val != 'unknown').filter(tt.val != 'empty').filter(tt.val != 'NONE').filter(tt.val != 'none').filter(tt.val != 'N/A').filter(tt.val != 'normal').filter(tt.val != 'anonymous').filter(tt.val != 'null')

In [53]:
total_pred.count(),total_pred.distinct().count()

(21024, 21024)

In [54]:
total_pred.sample(0.1).show(200,truncate=False)

+--------------------------------+---------------------+----------+
|val                             |probability          |prediction|
+--------------------------------+---------------------+----------+
|ZTE24:58:6e:84:98:38            |0.0015733496289184127|0.0       |
|Hello-World                     |0.09568611070751051  |0.0       |
|001004990060893017128421F181ECB4|1.9467297051424154E-4|0.0       |
|07698ca6a9eef80671081f872c448b1 |1.3436773801395505E-4|0.0       |
|wangfei                         |0.019169871680838727 |0.0       |
|未知设备QQ 33333                |0.020472442379393854 |0.0       |
|D17295911                       |0.012605840987552717 |0.0       |
|8PXEVMUSUAF1568                 |8.514264575244621E-4 |0.0       |
|HiteVision                      |0.04100472700592675  |0.0       |
|ZTE8c:68:c8:b4:14:dc            |0.0032934316877382365|0.0       |
|02e2b671eb0b382620cfa084e76a5d0 |1.0645823851009595E-4|0.0       |
|04ed7811b6dd41de0721d3152cb4f50 |1.0645823851009595

In [57]:
total_pred.filter(total_pred.prediction == '1.0').count()

Py4JJavaError: An error occurred while calling o44407.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7068.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7068.0 (TID 164310, node6, executor 1455): java.io.FileNotFoundException: File does not exist: /user/maxnet/database/sig.db/data_visual_unknown/15426bc451d06d31-e9c1b5c600000001_1587816891_data.0.
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:85)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:152)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:735)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:415)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:103)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: /user/maxnet/database/sig.db/data_visual_unknown/15426bc451d06d31-e9c1b5c600000001_1587816891_data.0.
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:85)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:152)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:735)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:415)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:103)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [74]:
total_pred.count(),total_pred.distinct().count()

Py4JJavaError: An error occurred while calling o44349.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7118.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7118.0 (TID 166343, node8, executor 1693): java.io.FileNotFoundException: File does not exist: /user/maxnet/database/sig.db/data_visual_unknown/15426bc451d06d31-e9c1b5c600000001_1587816891_data.0.
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:85)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:152)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:735)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:415)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:103)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: /user/maxnet/database/sig.db/data_visual_unknown/15426bc451d06d31-e9c1b5c600000001_1587816891_data.0.
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:85)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:152)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:735)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:415)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:103)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# demo

In [None]:
# 模型训练
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

# 模型预测
prediction = lrModel.transform(testData)

# ROC score
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(prediction)


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
grid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
             .build())
evaluator = BinaryClassificationEvaluator()
# Create 10-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=10)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
#evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 构建模型
rf = RandomForestClassifier(numTrees=3, maxDepth=10, maxBins=30, labelCol="label", seed=123)
# 十折交叉验证
grid = (ParamGridBuilder().addGrid(rf.numTrees, [1, 3, 5])
                          .addGrid(rf.maxDepth, [3, 5, 7, 10])
                          .addGrid(rf.maxBins, [20, 30, 40])
                          .build())
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=rf,
                    evaluator=evaluator,
                    estimatorParamMaps=grid,
                    numFolds=10)
cvModel_rf = cv.fit(trainingData)

# 模型预测 ROC
predictions = cvModel_rf.transform(testData)
evaluator.evaluate(predictions)


# Load model from HDFS

In [100]:
from pyspark.ml.tuning import CrossValidatorModel
m1 = CrossValidatorModel.load('/data/user/hive/warehouse/ian/model/model4')
m2 = CrossValidatorModel.load('/data/user/hive/warehouse/ian/model/model5')

In [None]:
from pyspark.sql.functions import *
d = spark.read.csv('/user/maxnet/database/sig.db/data_visual_unknown/*',sep='\x01')
d1 = d.select('_c2').withColumnRenamed('_c2','val').distinct().dropna()

d1 = d1.withColumn('f1',length(col('val')))

d1 = d1.withColumn('f2',when(d1.val.startswith('A')|d1.val.startswith('B')|d1.val.startswith('C')\
                             |d1.val.startswith('D')|d1.val.startswith('E')|d1.val.startswith('F')\
                             |d1.val.startswith('G')|d1.val.startswith('H')|d1.val.startswith('I')\
                             |d1.val.startswith('J')|d1.val.startswith('K')|d1.val.startswith('L')\
                             |d1.val.startswith('M')|d1.val.startswith('N')|d1.val.startswith('O')\
                             |d1.val.startswith('P')|d1.val.startswith('Q')|d1.val.startswith('R')\
                             |d1.val.startswith('S')|d1.val.startswith('T')|d1.val.startswith('U')\
                             |d1.val.startswith('V')|d1.val.startswith('W')|d1.val.startswith('X')\
                             |d1.val.startswith('Y')|d1.val.startswith('Z'),1).otherwise(0))

import re

num_regex = re.compile(r'[0-9]') #数字
xiaoxiezimu_regex = re.compile(r'[a-z]')#小写字母
daxiezimu_regex = re.compile(r'[A-Z]') #大写字母 
#hanzi_regex = re.compile(r'[\u4E00-\u9FA5]')#汉字

from pyspark.sql.functions import udf
num = udf(lambda x: len(num_regex.findall(x)))
xiaoxie = udf(lambda x: len(xiaoxiezimu_regex.findall(x)))
daxie = udf(lambda x: len(daxiezimu_regex.findall(x)))

d1 = d1.withColumn('f3',num('val'))
d1 = d1.withColumn('f4',xiaoxie('val'))
d1 = d1.withColumn('f5',daxie('val'))

# 特征字符串长度 f1
# 首字母是否大写 f2
# 数字字符数量   f3
# 小写字母数量   f4
# 大写字母数量   f5
# 特殊符号-_:的数量 f6
# 空格的数量 f7


# 统计下划线个数
def xiahuaxian_count(s):
    xiahuaxian_counts=0
    for c in s:
        xiahuaxian_split_list = c.split('_')
        xiahuaxian_counts += len(xiahuaxian_split_list) - 1
    return xiahuaxian_counts

# 统计中划线个数
def zhonghuaxian_count(s):
    zhonghuaxian_counts=0
    for c in s:
        zhonghuaxian_split_list = c.split('-')
        zhonghuaxian_counts += len(zhonghuaxian_split_list) - 1
    return zhonghuaxian_counts

# 统计冒号个数
def maohao_count(s):
    maohao_counts=0
    for c in s:
        maohao_split_list = c.split(':')
        maohao_counts += len(maohao_split_list) - 1
    return maohao_counts

def teshu_count(s):
    teshu_counts=0
    a_counts=0
    b_counts=0
    c_counts=0
    for c in s:
        a_split_list = c.split('_')
        a_counts += len(a_split_list) - 1
        
        b_split_list = c.split('-')
        b_counts += len(b_split_list) - 1
        
        c_split_list = c.split(':')
        c_counts += len(c_split_list) - 1
        
        teshu_counts = a_counts + b_counts + c_counts
    return teshu_counts
        

# 统计空格个数
def space_count(s):
    space_counts=0
    for c in s:
        space_split_list = c.split(' ')
        space_counts += len(space_split_list) - 1
    return space_counts

teshu = udf(lambda x: teshu_count(x))
kongge = udf(lambda x: space_count(x))


d1 = d1.withColumn('f6',teshu('val'))
d1 = d1.withColumn('f7',kongge('val'))


d1 = d1.select('val',col('f1').cast('float'),col('f2').cast('float'),col('f3').cast('float'),col('f4').cast('float'),col('f5').cast('float'),col('f6').cast('float'),col('f7').cast('float'))

In [104]:
from pyspark.ml.feature import MinMaxScaler,StandardScaler,VectorAssembler,StringIndexer

unknow = VectorAssembler(inputCols=['f1','f2','f3','f4','f5','f6','f7'],outputCol='features')
unknow = vec.transform(d1)

In [105]:
t1 = m1.transform(unknow)
t2 = m2.transform(unknow)

In [106]:
from pyspark.sql.types import DoubleType
unlist = udf(lambda x: float(list(x)[1]), DoubleType())

In [107]:
total_1 = t1.select('val',unlist('probability').alias('probability'),'prediction').filter(t1.val != 'unknown').filter(t1.val != 'empty').filter(t1.val != 'NONE').filter(t1.val != 'none').filter(t1.val != 'N/A').filter(t1.val != 'normal').filter(t1.val != 'anonymous').filter(t1.val != 'null')
total_2 = t2.select('val',unlist('probability').alias('probability'),'prediction').filter(t2.val != 'unknown').filter(t2.val != 'empty').filter(t2.val != 'NONE').filter(t2.val != 'none').filter(t2.val != 'N/A').filter(t2.val != 'normal').filter(t2.val != 'anonymous').filter(t2.val != 'null')

In [108]:
total_1.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|31721|
|       1.0| 2504|
+----------+-----+



In [109]:
total_2.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|31697|
|       1.0| 2528|
+----------+-----+



In [110]:
total_1.sort('probability',ascending=False).show(500,truncate=False)

+--------------------------------+------------------+----------+
|val                             |probability       |prediction|
+--------------------------------+------------------+----------+
|nova_5-2eb3de21a5cb3411         |0.9989348193771349|1.0       |
|nova_5-e68a4abbd4fd44e2         |0.9989348193771349|1.0       |
|nova_5-8039eb3a4bc7a2d7         |0.9989348193771349|1.0       |
|nova_5-ae38ea98bbe8c30          |0.9989348193771349|1.0       |
|nova_5-4eb95cbb775bbb9f         |0.9989348193771349|1.0       |
|nova_5-2fba8779cb768cbe         |0.9989348193771349|1.0       |
|aud8516p1v2-consys-emmc         |0.9989348193771349|1.0       |
|nova_5-95fd917625dfb9ae         |0.9989348193771349|1.0       |
|nova_5-f62e1e9117dfcdf4         |0.9989348193771349|1.0       |
|nova_5-d104ccc3331af28b         |0.9989348193771349|1.0       |
|nova_5-fd1e4e25d9a965a2         |0.9989348193771349|1.0       |
|nova_5-1a1dc998c112eccd         |0.9989348193771349|1.0       |
|nova_5-13f2216aefdf7f7c 

In [111]:
total_2.sort('probability',ascending=False).show(500,truncate=False)

+--------------------------------+------------------+----------+
|val                             |probability       |prediction|
+--------------------------------+------------------+----------+
|s1000-54bb0ce26ed75ed9          |0.9987503182531763|1.0       |
|Atomu_Honor-1f16c188ac3c3       |0.9984461219620794|1.0       |
|Honor_View10-ed728225de84       |0.9984461219620794|1.0       |
|Inspiron-15-5000-Series         |0.998184573952463 |1.0       |
|OSCA-550-4a029af61c1f34f0       |0.9980854926039804|1.0       |
|KSA-AL00-aaa3f0354a61eab5       |0.9980854926039804|1.0       |
|KSA-AL00-a48304cee2f78f29       |0.9980854926039804|1.0       |
|KSA-AL00-2f769df9c9baf0bb       |0.9980854926039804|1.0       |
|KSA-AL00-fb50e79e5c96754        |0.9980854926039804|1.0       |
|KSA-AL00-e87a341bdbc2fa04       |0.9980854926039804|1.0       |
|KSA-AL00-e454b330bc5a04f4       |0.9980854926039804|1.0       |
|BEE-AL10-27cae0454c3c40cb       |0.9980854926039804|1.0       |
|KSA-AL00-56d7eb1b04f114d

In [112]:
total_1.cache(),total_2.cache()

(DataFrame[val: string, probability: double, prediction: double],
 DataFrame[val: string, probability: double, prediction: double])

In [113]:
#total_2_hdfs = total_2.repartition(1)

#total_2_hdfs.write.mode('overwrite').parquet('hdfs:///data/user/hive/warehouse/ian/p_20190906',compression='gzip')

In [89]:
s = spark.read.csv('/user/maxnet/database/sig.db/term_sig_all',header=True,sep='\x01')

In [None]:
s.filter(s.value.contains('nova')).show(truncate=False)