# spark

In [1]:
from platform import python_version
print(python_version())

3.6.10


In [2]:
NUM_EXECUTORS = 32
EXECUTOR_CORES = 4
SPARK_NAME = 'chat_bot'

In [3]:
import os
import sys
#os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 30 --master yarn --name claims_1 pyspark-shell'


os.environ["PYSPARK_SUBMIT_ARGS"] =  """--master yarn \
                                        --deploy-mode client \
                                        --num-executors {NUM_EXECUTORS} \
                                        --executor-cores {EXECUTOR_CORES} \
                                        --executor-memory 16G \
                                        --driver-memory 4G \
                                        --verbose \
                                        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./env/bin/python3.6 \
                                        --name {SPARK_NAME} pyspark-shell
                                        """.format(NUM_EXECUTORS = NUM_EXECUTORS, 
                                                   EXECUTOR_CORES = EXECUTOR_CORES, 
                                                   SPARK_NAME = SPARK_NAME)

PYTHON = 'python3.6'

#os.environ["PYSPARK_DRIVER_PYTHON"]='jupyter'
#os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]='notebook'
os.environ["PYSPARK_PYTHON"] = PYTHON
os.environ["SPARK_YARN_USER_ENV"] = PYTHON

os.environ["SPARK_HOME"]='/var/local/spark-2.3.2'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 3.6.10 (default, Mar 25 2020 23:51:54)
SparkSession available as 'spark'.


In [4]:
spark

In [5]:
spark.sparkContext.addPyFile('/home/pkrylov/packages/numpy-master.zip')

In [6]:
#spark.stop()

In [7]:
import time
import logging

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# y_concat

In [8]:
%%time
Y = spark.read.parquet("y_concat_inqr_id.parquet.snappy")#.select("label")
Y.show(5, vertical=False)

+-----+---------+
|LABEL|  INQR_ID|
+-----+---------+
|    6|252415512|
|    0|252975775|
|    6|253119730|
|    0|253510904|
|    5|253540677|
+-----+---------+
only showing top 5 rows

CPU times: user 3.56 ms, sys: 1.32 ms, total: 4.87 ms
Wall time: 16.3 s


In [9]:
Y.rdd.getNumPartitions()

39

In [10]:
%%time
Y = Y.repartition(250)
print(Y.rdd.getNumPartitions())

250
CPU times: user 0 ns, sys: 1.71 ms, total: 1.71 ms
Wall time: 42.7 ms


In [29]:
Y.printSchema()

root
 |-- LABEL: long (nullable = true)
 |-- INQR_ID: integer (nullable = true)



# x_coo

In [11]:
latency = '1hour'
deep = 45

# x_all_45_1hour.parquet.snappy

In [12]:
import logging
import sys
import time

import numpy as np
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#from pyspark.mllib.evaluation import MulticlassMetrics

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

In [13]:
%%time
# x_all_45_1hour.parquet.snappy
result_list = []

logging.info(f"Start training model with deep {deep} latency {latency}")    
#_create_train_test(latency)
#_get_metrics()

t0 =  time.time()

file_name = f"x_all_{deep}_{latency}.parquet.snappy"
x_coo = spark.read.parquet(file_name)
x_coo = x_coo.repartition(250)
x_coo.show(10)

INFO:root:Start training model with deep 45 latency 1hour
+--------+----+------------------+
|     row| col|              data|
+--------+----+------------------+
| 9640570|1985|15.454299926757812|
|23706962|1985|22.649999618530273|
|24905374|2047| 2.562700033187866|
|15668708|2058|0.7383000254631042|
| 3453445|1985|21.549400329589844|
|27820706|1952|14.759099960327148|
|18200664|1955| 37.42190170288086|
|22300383|1985| 42.88240051269531|
|15851837|2066| 38.49190139770508|
|28696207|2049|0.4814999997615814|
+--------+----+------------------+
only showing top 10 rows

CPU times: user 14.6 ms, sys: 15.3 ms, total: 29.9 ms
Wall time: 3min


In [16]:
x_coo

DataFrame[row: int, col: int, data: double]

# coorRDD

In [17]:
coorRDD = x_coo.rdd.map(lambda x: MatrixEntry(x[0], x[1], x[2])) 
coorMatrix = CoordinateMatrix(coorRDD)

t1 =  time.time()
logging.info("Create coorMatrix  {:.3f} s".format(t1 - t0)) 

INFO:root:Create coorMatrix  498.835 s


In [18]:
coorMatrix

<pyspark.mllib.linalg.distributed.CoordinateMatrix at 0x7fe7db188a58>

# DataFrame

In [20]:
%%time

df = coorMatrix.toRowMatrix().rows.map(lambda x: (x, )).toDF()
t2 =  time.time()
logging.info("Create df from coorMatrix  {:.3f} s".format(t2 - t1)) 

INFO:root:Create df from coorMatrix  908.545 s
CPU times: user 83.9 ms, sys: 64.8 ms, total: 149 ms
Wall time: 14min 57s


In [21]:
df

DataFrame[_1: vector]

In [22]:
df.show(10)

+--------------------+
|                  _1|
+--------------------+
|(3216,[170,172,17...|
|(3216,[170,175,18...|
|(3216,[170,173,17...|
|(3216,[1229,1241,...|
|(3216,[170,172,17...|
|(3216,[2,5,8,11,1...|
|(3216,[172,175,18...|
|(3216,[170,172,18...|
|(3216,[170,175,18...|
|(3216,[183,184,21...|
+--------------------+
only showing top 10 rows



In [None]:
df.printSchema()

#root
# |-- features: vector (nullable = true)

# preprocessing df

In [23]:
get_element=udf(lambda v: int(v[-1]), IntegerType())

df = df.withColumn('INQR_ID', get_element('_1'))

df = df.join(Y, on = ["INQR_ID"]).drop("INQR_ID")
logging.info("Create join Y with df") 

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())

df = df.withColumn("features", as_ml("_1"))\
       .drop("_1") \
       .withColumnRenamed("LABEL", "label")

t3 =  time.time()
logging.info("Finished preprocessing df  {:.3f} s".format(t3 - t2)) 

INFO:root:Create join Y with df
INFO:root:Finished preprocessing df  156.425 s


# train

In [26]:
%%time

(train, test) = df.randomSplit([0.85,0.15], seed=25)
train = train.repartition(250)
logging.info("Create test/train")

rf = RandomForestClassifier(labelCol='label', 
                            featuresCol='features',
                            maxDepth=5,
                            maxBins=32,
                            numTrees=25)
model = rf.fit(train)

t4 =  time.time()
logging.info("Create train model {:.3f} s.".format(t4 - t3))

INFO:root:Create test/train
INFO:root:Create train model 823.373 s.
CPU times: user 201 ms, sys: 177 ms, total: 378 ms
Wall time: 12min 40s


# predictions

In [31]:
predictions = model.transform(test) 
t5 =  time.time()
logging.info("Create predictions  {:.3f} s.".format(t5 - t4))

INFO:root:Create predictions  285.568 s.


# metrics

In [33]:
%%time
evaluator = MulticlassClassificationEvaluator(
                   labelCol='label', 
                   predictionCol='prediction', 
                   metricName="f1")

metric_f1 = evaluator.evaluate(predictions)
print("Test set f1 = %g" % metric_f1)

evaluator.setMetricName("accuracy")
metric_accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % metric_accuracy)

t6 =  time.time()           
logging.info("Calculate metrics {:.3f} s.".format(t6 - t5)) 
        
result_list.append([deep, latency, metric_f1, metric_accuracy])       

Test set f1 = 0.12027
Test set accuracy = 0.249307
INFO:root:Calculate metrics 864.056 s. with latency 1hour
CPU times: user 134 ms, sys: 126 ms, total: 260 ms
Wall time: 14min 2s


In [34]:
result_list

[[45, '1hour', 0.12027046178176957, 0.24930721489322002]]

# busines metrics

In [35]:
%%time
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, IntegerType

def get_score(x):
    len_topn = 5
    return  [int(i) for x_, i in sorted(zip(x, range(len(x))), reverse = True)][:len_topn]    

get_score_udf = udf(lambda x: get_score(x), ArrayType(IntegerType()))
get_score_udf2 = udf(lambda x: len({x[0]}.intersection(set(x[1]))) / 5, FloatType())

predictions = predictions.select('label', get_score_udf('probability').alias("top5_predict"))

predictions = predictions.withColumn('score_model', get_score_udf2(struct('label', 'top5_predict')))\
                         .withColumn('score1_model', (col("score_model") > 0).cast("int"))

CPU times: user 5.81 ms, sys: 639 µs, total: 6.44 ms
Wall time: 176 ms


In [42]:
%%time
predictions.show(truncate=False)

+-----+-------------------+-----------+------------+
|label|top5_predict       |score_model|score1_model|
+-----+-------------------+-----------+------------+
|0    |[21, 7, 41, 19, 5] |0.0        |0           |
|0    |[21, 7, 41, 19, 26]|0.0        |0           |
|0    |[21, 7, 41, 36, 5] |0.0        |0           |
|0    |[36, 7, 21, 41, 26]|0.0        |0           |
|0    |[21, 5, 7, 41, 36] |0.0        |0           |
|0    |[21, 7, 41, 36, 19]|0.0        |0           |
|0    |[36, 7, 41, 21, 35]|0.0        |0           |
|0    |[21, 7, 41, 36, 19]|0.0        |0           |
|0    |[21, 7, 41, 5, 36] |0.0        |0           |
|0    |[36, 41, 21, 7, 26]|0.0        |0           |
|0    |[21, 7, 36, 41, 19]|0.0        |0           |
|0    |[21, 7, 41, 5, 19] |0.0        |0           |
|0    |[21, 7, 41, 5, 19] |0.0        |0           |
|0    |[21, 7, 41, 19, 5] |0.0        |0           |
|0    |[21, 7, 41, 18, 36]|0.0        |0           |
|0    |[21, 7, 41, 5, 19] |0.0        |0      

In [37]:
%%time
length_test = test.count()
length_test

CPU times: user 41 ms, sys: 39.5 ms, total: 80.5 ms
Wall time: 4min 37s


In [38]:
%%time
metrics = predictions.agg({"score_model": "sum", "score1_model": "sum"}).collect()

CPU times: user 38.7 ms, sys: 44.7 ms, total: 83.5 ms
Wall time: 4min 45s


In [40]:
score_model = metrics[0][0] / length_test
score_model

0.608565501869997

In [41]:
score1_model = metrics[0][1] / length_test
score1_model

0.12171310218766593

# full short code 

In [13]:
# 'wl - with out latency
latency_dict = {'wl':  -1.0,
                '15min': 0.010416667,
                '30min': 0.020833333,
                '1hour': 0.041666667,               
                '3hour': 0.125,
                '6hour': 0.25,
                '12hour': 0.5,
                '24hour': 1}

#deep_list = [14,45] 
deep_list = [14] 
# глубина данных

In [18]:
import logging
import sys
import time

import numpy as np
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf, struct
import pyspark.sql.functions as f
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.types import *

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#from pyspark.mllib.evaluation import MulticlassMetrics

In [44]:
# x_all_45_1hour.parquet.snappy
latency = '1hour'
deep = 45  

logging.info(f"Start training model with deep {deep} latency {latency}")   
t0 =  time.time()

file_name = f"x_all_{deep}_{latency}.parquet.snappy"
x_coo = spark.read.parquet(file_name)
x_coo = x_coo.repartition(250)

coorRDD = x_coo.rdd.map(lambda x: MatrixEntry(x[0], x[1], x[2])) 
coorMatrix = CoordinateMatrix(coorRDD)
t1 =  time.time()
logging.info("Create coorMatrix  {:.3f} s".format(t1 - t0)) 

df = coorMatrix.toRowMatrix().rows.map(lambda x: (x, )).toDF()
t2 =  time.time()
logging.info("Create df from coorMatrix  {:.3f} s".format(t2 - t1)) 

get_element=udf(lambda v: int(v[-1]), IntegerType())
df = df.withColumn('INQR_ID', get_element('_1'))

df = df.join(Y, on = ["INQR_ID"]).drop("INQR_ID")
logging.info("Create join Y with df") 

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
df = df.withColumn("features", as_ml("_1"))\
       .drop("_1") \
       .withColumnRenamed("LABEL", "label")

t3 =  time.time()
logging.info("Finished preprocessing df  {:.3f} s".format(t3 - t2)) 

(train, test) = df.randomSplit([0.85,0.15], seed=25)
train = train.repartition(250)
logging.info("Create test/train")

t4 =  time.time()           
logging.info("Train {:.3f} s. with deep {} latency {}".format(t1 - t0, deep, latency)) 

rf = RandomForestClassifier(labelCol='label', 
                                    featuresCol='features',
                                    maxDepth=5,
                                    maxBins=32,
                                    numTrees=25)
model = rf.fit(train)
t4 =  time.time()
logging.info("Create train model {:.3f} s.".format(t4 - t3))

predictions = model.transform(test)  
t5 =  time.time()
logging.info("Create predictions  {:.3f} s.".format(t5 - t4))

evaluator = MulticlassClassificationEvaluator(
                   labelCol='label', 
                   predictionCol='prediction', 
                   metricName="f1")

metric_f1 = evaluator.evaluate(predictions)
print("Test set f1 = %g" % metric_f1)

evaluator.setMetricName("accuracy")
metric_accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % metric_accuracy)

t6 =  time.time()           
logging.info("Calculate metrics {:.3f} s.".format(t6 - t5)) 

get_score_udf = udf(lambda x: get_score(x), ArrayType(IntegerType()))
get_score_udf2 = udf(lambda x: len({x[0]}.intersection(set(x[1]))) / 5, FloatType())

predictions = predictions.select('label', get_score_udf('probability').alias("top5_predict"))

predictions = predictions.withColumn('score_model', get_score_udf2(struct('label', 'top5_predict')))\
                         .withColumn('score1_model', (col("score_model") > 0).cast("int"))

metrics = predictions.agg({"score_model": "sum", "score1_model": "sum"}).collect()
length_test = test.count()
score_model = metrics[0][0] / length_test
score1_model = metrics[0][1] / length_test  
print(f"Test busines accuracy metrics {score_model} {score1_model}")

t7 =  time.time()           
logging.info("Calculate busines metrics {:.3f} s.".format(t7 - t6)) 
        
result_list.append([deep, latency, metric_f1, metric_accuracy, score_model, score1_model])                    

INFO:root:Start training model with deep 45 latency 1hour
INFO:root:Create coorMatrix  151.918 s
INFO:root:Create df from coorMatrix  925.979 s
INFO:root:Create join Y with df
INFO:root:Finished preprocessing df  0.029 s
INFO:root:Create test/train
INFO:root:Train 151.918 s. with deep 45 latency 1hour
INFO:root:Create train model 777.481 s.
INFO:root:Create predictions  0.041 s.
Test set f1 = 0.120432
Test set accuracy = 0.249443
INFO:root:Calculate metrics 901.550 s.
Test busines accuracy metrics 0.608458911737076 0.12169178416076408
INFO:root:Calculate busines metrics 600.084 s.


In [45]:
result_list

[[45, '1hour', 0.12027046178176957, 0.24930721489322002],
 [45,
  '1hour',
  0.1204322793511232,
  0.2494428381480604,
  0.608458911737076,
  0.12169178416076408]]

In [46]:
spark.stop()