In [10]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W
from pyspark.conf import SparkConf
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
from pyspark.mllib.feature import HashingTF, IDF
import os

In [11]:
conf= pyspark.SparkConf()
conf.setMaster("k8s://https://"+os.environ['KUBERNETES_SERVICE_HOST'])
 
# For hdfs configuration
conf.set("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
conf.set("spark.hadoop.fs.hdfs.server", "org.apache.hadoop.hdfs.server.namenode.NameNode")
conf.set("spark.hadoop.conf", "org.apache.hadoop.hdfs.HdfsConfiguration")
conf.set("spark.hadoop.dfs.nameservices", "nb")
conf.set("spark.hadoop.dfs.ha.namenodes.nb", "h1,h2")
conf.set("spark.hadoop.dfs.namenode.rpc-address.nb.h1", "h3001.ali-netbase.com:9820")
conf.set("spark.hadoop.dfs.namenode.rpc-address.nb.h2", "h3101.ali-netbase.com:9820")
conf.set("spark.hadoop.dfs.client.failover.proxy.provider.nb", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
conf.set("spark.hadoop.fs.defaultFS", "hdfs://nb:9820")
 
# For kubernetes configration
f_name = "balanced_big_data.csv" #"big_data.csv"
executor_cores=16
executor_machine_memory=64
executor_heap_memory=62
executor_instances=32
driver_cores=32
driver_memory=128
 
# request on-demand machines
conf.set("spark.kubernetes.executor.label.eci", "true")
conf.set("spark.kubernetes.executor.annotation.k8s.aliyun.com/eci-use-specs", "{}-{}Gi".format(executor_cores, executor_machine_memory))
  
# machine numbers
conf.set("spark.executor.instances", executor_instances)
conf.set("spark.kubernetes.allocation.batch.size", executor_instances)
 
  
# machine cpu numbers
conf.set("spark.kubernetes.executor.request.cores", executor_cores)
conf.set("spark.kubernetes.executor.limit.cores", executor_cores)
conf.set("spark.executor.memory", "{}g".format(executor_heap_memory))
conf.set("spark.executor.cores", executor_cores)
conf.set("spark.driver.memory", "{}g".format(driver_memory))
conf.set("spark.driver.cores", driver_cores)
# spark version
# conf.set("spark.kubernetes.container.image", "maven-docker.netbase.com/spark-py:v2.4.6")
# conf.set("spark.kubernetes.container.image", "docker-registry.netbase.com/de/base-images/spark-py:1.2.0-spark3.1.1")
conf.set("spark.kubernetes.container.image", "docker-registry.netbase.com/de/base-images/spark-py:1.2.0-spark3.1.1-test")
conf.set("spark.kubernetes.container.image.pullPolicy", "Always")
  
conf.set("spark.kubernetes.namespace", "jupyterhub")
conf.set("spark.driver.host", os.environ['HOSTIP'])
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
conf.set('spark.submit.deployMode', 'client')
conf.set('spark.kubernetes.pyspark.pythonVersion', "3")
conf.set('spark.driver.maxResultSize', "120g")
# tell spark executor act on behalf to user woot
conf.set('spark.kubernetes.executorEnv.HADOOP_USER_NAME', "woot")
conf.set('spark.executorEnv.HADOOP_USER_NAME', "woot")
 
#conf.set('spark.default.parallelism', '32')
conf.setAppName( os.environ['JUPYTERHUB_CLIENT_ID'])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
   
spark.sparkContext.getConf().getAll()

[('spark.app.startTime', '1646301108603'),
 ('spark.app.id', 'spark-application-1646301109626'),
 ('spark.driver.host', '172.25.18.37'),
 ('spark.executor.instances', '32'),
 ('spark.kubernetes.executor.request.cores', '16'),
 ('spark.kubernetes.authenticate.driver.serviceAccountName', 'spark'),
 ('spark.executor.memory', '62g'),
 ('spark.master', 'k8s://https://10.122.0.1'),
 ('spark.driver.cores', '32'),
 ('spark.kubernetes.container.image',
  'docker-registry.netbase.com/de/base-images/spark-py:1.2.0-spark3.1.1-test'),
 ('spark.hadoop.dfs.namenode.rpc-address.nb.h1', 'h3001.ali-netbase.com:9820'),
 ('spark.kubernetes.executor.label.eci', 'true'),
 ('spark.kubernetes.pyspark.pythonVersion', '3'),
 ('spark.hadoop.fs.hdfs.impl', 'org.apache.hadoop.hdfs.DistributedFileSystem'),
 ('spark.sql.warehouse.dir', 'file:/home/jovyan/megaopus/spark-warehouse'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.cores', '16'),
 ('spark.submit.deployMode', 'client'),
 ('spark.hadoop

In [12]:
path = 'hdfs://nb/ai-pipeline/megaopus_data/{}'.format(f_name)
_df = spark.read.options(header=True, encoding="UTF-8").csv(path)
# _df = _df[_df['Sound Bite Text'].isNotNull()]
# _df = _df.select(F.split(_df['Sound Bite Text'], " ").alias('word_array'))
_df = _df[_df['sentence'].isNotNull()]
_df = _df.select(F.split(_df['sentence'], " ").alias('word_array'))

In [13]:
REPEAT_TIME = 1
if REPEAT_TIME >= 1:
    n_to_array = F.udf(lambda n : list(range(n)), T.ArrayType(T.IntegerType()))
    __df = _df.withColumn('n', F.lit(REPEAT_TIME))
    __df = __df.withColumn('n', F.explode(n_to_array(__df.n)))
else:
    __df = _df.limit(int(_df.count() * REPEAT_TIME))
__df.show()
__df.count()

                                                                                

+--------------------+---+
|          word_array|  n|
+--------------------+---+
|[rj, providing, p...|  0|
|[rj, providing, p...|  1|
|[rj, providing, p...|  2|
|[rj, providing, p...|  3|
|[rj, providing, p...|  4|
|[slovakia, cambri...|  0|
|[slovakia, cambri...|  1|
|[slovakia, cambri...|  2|
|[slovakia, cambri...|  3|
|[slovakia, cambri...|  4|
|[serious, samples...|  0|
|[serious, samples...|  1|
|[serious, samples...|  2|
|[serious, samples...|  3|
|[serious, samples...|  4|
|[blow, meant, dec...|  0|
|[blow, meant, dec...|  1|
|[blow, meant, dec...|  2|
|[blow, meant, dec...|  3|
|[blow, meant, dec...|  4|
+--------------------+---+
only showing top 20 rows



                                                                                

5000000

In [14]:
__df = __df.repartition(50000)
__df.rdd.getNumPartitions()

50000

In [15]:
__df.printSchema()

root
 |-- word_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- n: integer (nullable = true)



In [16]:
hashingTF = pyspark.ml.feature.HashingTF(inputCol="word_array", outputCol="features")
hashingTF.setNumFeatures(10000)
tf = hashingTF.transform(__df)
tf.head().features

                                                                                

SparseVector(1000, {97: 1.0, 281: 1.0, 424: 1.0, 804: 1.0, 821: 1.0})

In [17]:
tf.cache()
idf_model = pyspark.ml.feature.IDF().setInputCol("features").setOutputCol("idf").fit(tf)
tf_idf = idf_model.transform(tf)
tf_idf.printSchema()
tf_idf.cache()

                                                                                

root
 |-- word_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- n: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- idf: vector (nullable = true)



DataFrame[word_array: array<string>, n: int, features: vector, idf: vector]

In [None]:
from datetime import datetime
ta = datetime.now()
#kmeans = KMeans(featuresCol='idf', k=15000, initSteps=10).setSeed(1)
kmeans = KMeans(featuresCol='idf', k=15000, initMode='random').setSeed(1)
model = kmeans.fit(tf_idf)
predictions = model.transform(tf_idf)
td = datetime.now()
print('elapsed time:', (td - ta).total_seconds())

22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_140_15003 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_140_33443 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_140_2543 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_125_32266 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_1193 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_125_29330 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_140_30408 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_125_42872 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_30266 !
22/03/03 10:00:45 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_35743 !
22/03/03 10:00:45 WARN

In [None]:
predictions.show()

In [None]:
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features', \
        metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

In [None]:
predictions.groupBy('prediction').\
        count().\
        sort(F.desc('count')).\
        show(truncate=False)

In [None]:
#spark.stop()