https://sonseungha.tistory.com/687

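# MLflow Platform
- MLflow Tracking
  - Records parameters, code versions, metrics, and output files when machine learning code runs
  - Provides an API and UI for visualizing the results later
- MLflow Projects
  - Packages machine learning code in a reusable, reproducible form so that it can be rerun on any platform
  - The packaged project can be shared with other data scientists or moved to production
  - Includes an API and command-line tools for running projects, so projects can be chained into workflows
- MLflow Models
  - Provides a standard way to package and serve machine learning models
  - Makes it easy to deploy the same model to targets such as AWS or Apache Spark
  - A standard format for packaging machine learning models for use in a variety of downstream tools (e.g., real-time serving through a REST API or batch inference on Apache Spark)
  - Defines a convention for saving a model in different "flavors" that different downstream tools can understand
- MLflow Model Registry
  - A centralized model store, set of APIs, and UI for collaboratively managing the full lifecycle of an MLflow model
  - Provides model lineage (which MLflow experiment and run produced the model), model versioning, stage transitions (e.g., from staging to production), and annotations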

In [10]:
import mlflow
import mlflow.sklearn
import os
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from sklearn.neighbors import KNeighborsClassifier
import pandas as pd
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV
from mlflow.models.signature import infer_signature
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

In [2]:
import seaborn as sns
df = sns.load_dataset('penguins')
df.head()

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,Male
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,Female
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,Female
3,Adelie,Torgersen,,,,,
4,Adelie,Torgersen,36.7,19.3,193.0,3450.0,Female


In [3]:
df2=df.dropna()
df2

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,Male
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,Female
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,Female
4,Adelie,Torgersen,36.7,19.3,193.0,3450.0,Female
5,Adelie,Torgersen,39.3,20.6,190.0,3650.0,Male
...,...,...,...,...,...,...,...
338,Gentoo,Biscoe,47.2,13.7,214.0,4925.0,Female
340,Gentoo,Biscoe,46.8,14.3,215.0,4850.0,Female
341,Gentoo,Biscoe,50.4,15.7,222.0,5750.0,Male
342,Gentoo,Biscoe,45.2,14.8,212.0,5200.0,Female


In [4]:
# Copy so that the column assignment below edits an independent DataFrame (avoids SettingWithCopyWarning)
df2=df.dropna().copy()
# Binary target: Male -> 1, Female -> 0
df2['sex']=df2['sex'].apply(lambda x:1 if x=='Male' else 0)
# One-hot encode the categorical columns
df_one_hot=pd.get_dummies(df2.loc[:,['species','island']]).reset_index(drop=True)
df_y=df2['sex'].reset_index(drop=True)

# Keep only the four numeric measurement columns and scale them to [0, 1]
df2=df2.iloc[:,2:-1]
minmax=MinMaxScaler()
scale_df2=minmax.fit_transform(df2)
scale_df2=pd.DataFrame(scale_df2,columns=['bill_length_mm','bill_depth_mm','flipper_length_mm','body_mass_g'])
df_new=pd.concat([df_one_hot,scale_df2,df_y],axis=1)
df_new.head()


Unnamed: 0,species_Adelie,species_Chinstrap,species_Gentoo,island_Biscoe,island_Dream,island_Torgersen,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex
0,1,0,0,0,0,1,0.254545,0.666667,0.152542,0.291667,1
1,1,0,0,0,0,1,0.269091,0.511905,0.237288,0.305556,0
2,1,0,0,0,0,1,0.298182,0.583333,0.389831,0.152778,0
3,1,0,0,0,0,1,0.167273,0.738095,0.355932,0.208333,0
4,1,0,0,0,0,1,0.261818,0.892857,0.305085,0.263889,1


In [5]:
df_x=df_new.iloc[:,:-1]
df_y=df_new.iloc[:,-1]
x_train,x_test,y_train,y_test=train_test_split(df_x,df_y,test_size=0.2,random_state=2,stratify=df_y)

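1. Model evaluation metrics: mlflow.log_metric
- Training accuracy, evaluation accuracy, AUC
2. Images related to the test results: mlflow.log_artifact
- ROC plot, confusion matrix heatmap
3. The model itself: mlflow.sklearn.log_model (stored with the run's artifacts)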

In [6]:
mlflow.set_tracking_uri('http://192.168.1.60:25000/')
os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://192.168.1.60:9000/" 
os.environ['AWS_ACCESS_KEY_ID'] = 'mlflow'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'mlflow123'

In [11]:
eval_data.columns

Index(['species_Adelie', 'species_Chinstrap', 'species_Gentoo',
       'island_Biscoe', 'island_Dream', 'island_Torgersen', 'bill_length_mm',
       'bill_depth_mm', 'flipper_length_mm', 'body_mass_g', 'sex'],
      dtype='object')

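### Adding a signature
- Anyone picking up the model will first look at the MLmodel file to learn about it
- By default, the MLmodel file alone does not say what inputs the model takes or what outputs it produces
- This input/output information is called the signature, and it can be added when the model is logged

- infer_signature() function
  - Pass actual input data and output data as arguments, and the function infers the signature from them

- Schema, ColSpec
  - A way to define and store the signature by hand instead of inferring it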


In [None]:
# Define the signature by hand
input_schema = Schema([
  ColSpec("double", 'species_Adelie'),
  ColSpec("double", 'species_Chinstrap'),
  ColSpec("double", 'species_Gentoo'),
  ColSpec("double", 'island_Biscoe'),
  ColSpec("double", 'island_Dream'),
  ColSpec("double", 'island_Torgersen'),
  ColSpec("double", 'bill_length_mm'),
  ColSpec("double", 'bill_depth_mm'),
  ColSpec("double", 'flipper_length_mm'),
  ColSpec("double", 'body_mass_g'),
])
output_schema = Schema([ColSpec("long")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

### Simple usage

In [7]:
eval_data=pd.concat([x_test,y_test],axis=1)
model = KNeighborsClassifier()
model.fit(x_train, y_train)
mlflow.set_experiment("penguins")


with mlflow.start_run(run_name="penguins_light") as run:
    # Log the baseline model to MLflow
    mlflow.sklearn.log_model(model, "model")
    model_uri = mlflow.get_artifact_uri("model")

    result = mlflow.evaluate(
        model_uri,
        eval_data,
        targets = 'sex',
        model_type = "classifier",
        evaluators = ["default"],
    )
mlflow.end_run()

2023/05/23 09:51:52 INFO mlflow.models.evaluation.base: Evaluating the model with the default evaluator.
2023/05/23 09:51:52 INFO mlflow.models.evaluation.default_evaluator: The evaluation dataset is inferred as binary dataset, positive label is 1, negative label is 0.


<Figure size 1050x700 with 0 Axes>

In [8]:
def train(model,x_train,y_train):
    # Fit the model and log the training accuracy to the active MLflow run
    model=model.fit(x_train,y_train)
    train_acc=model.score(x_train,y_train)
    mlflow.log_metric("train_acc",train_acc)
    print(f"Train Accuracy: {train_acc:.3%}")

def evaluate(model,x_test,y_test):
    # Log the test accuracy and AUC, then save the confusion matrix as an artifact
    eval_acc=model.score(x_test,y_test)
    preds=model.predict(x_test)
    # Note: AUC is computed here from hard 0/1 predictions, not from predict_proba scores
    auc_score=roc_auc_score(y_test,preds,multi_class="ovr")
    mlflow.log_metric("eval_acc",eval_acc)
    mlflow.log_metric("auc_score",auc_score)
    print(f"Auc Score: {auc_score:.3%}")
    print(f"Eval Accuracy: {eval_acc:.3%}")

    conf_matrix=confusion_matrix(y_test,preds)
    ax=sns.heatmap(conf_matrix,annot=True,fmt='g')
    plt.title("Confusion Matrix")
    plt.savefig('conf_matrix.png')
    plt.close()  # release the figure so repeated calls do not draw over it

    mlflow.log_artifact("conf_matrix.png")
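
The evaluate function above passes the output of model.predict to roc_auc_score, so the AUC it logs is based on hard 0/1 labels. The toy sketch below, with made-up labels and probabilities, shows how that number can differ from an AUC computed on predicted probabilities (for a scikit-learn classifier those would typically come from predict_proba(x_test)[:, 1]).

```python
from sklearn.metrics import roc_auc_score

y_true = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.4, 0.9]                     # hypothetical probabilities of class 1
labels = [1 if s >= 0.5 else 0 for s in scores]   # hard predictions at a 0.5 threshold

auc_from_labels = roc_auc_score(y_true, labels)
auc_from_scores = roc_auc_score(y_true, scores)
print(auc_from_labels, auc_from_scores)  # 0.5 0.75
```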

In [9]:
model=LogisticRegression()
mlflow.set_experiment("penguins")
with mlflow.start_run(run_name="penguins") as run:
    train(model,x_train,y_train)
    evaluate(model,x_test,y_test)
    # Define the model's input/output information; this information is called the signature
    signature=infer_signature(x_train,model.predict(x_train))
    # The same model is logged twice: once with the signature, once without
    mlflow.sklearn.log_model(model,"penguins_lr",signature=signature)
    mlflow.sklearn.log_model(model,"logistic_regression")
    print("Model run: ",run.info.run_id)
mlflow.end_run()

Train Accuracy: 90.226%
Auc Score: 92.602%
Eval Accuracy: 92.537%
Model run:  2ff3462e6abe4aa5aad8d0873889dab2


### GridSearch version

In [73]:
mlflow.sklearn.autolog()
parameters={'alpha':[0.05,0.1,0.5,1,2,5]}
model=Ridge()
ridge_reg=GridSearchCV(model,parameters,scoring='neg_mean_squared_error',cv=5)

mlflow.set_experiment("penguins")
with mlflow.start_run(run_name="penguins2") as run:
    ridge_reg.fit(x_train,y_train)
mlflow.end_run()

2023/04/27 10:22:01 INFO mlflow.sklearn.utils: Logging the 5 best runs, one run will be omitted.


## Spark

In [87]:
df2=df.dropna()
df2.reset_index(drop=True,inplace=True)
print(df2['species'].unique())
print(df2['island'].unique())
print(df2['sex'].unique())

['Adelie' 'Chinstrap' 'Gentoo']
['Torgersen' 'Biscoe' 'Dream']
['Male' 'Female']


In [91]:
def species_class(x):
    if x=="Adelie":
        return 0
    elif x=="Chinstrap":
        return 1
    elif x=='Gentoo':
        return 2
def island_class(x):
    if x=='Torgersen':
        return 0
    elif x=="Biscoe":
        return 1
    elif x=='Dream':
        return 2
# Work on an explicit copy so the new columns are added without SettingWithCopyWarning
df2=df2.copy()
df2['species_encoded']=df2['species'].apply(species_class)
df2['island_encoded']=df2['island'].apply(island_class)
df2['sex_encoded']=df2['sex'].apply(lambda x:1 if x=='Male' else 0)
peng_df=df2.loc[:,['bill_length_mm','bill_depth_mm','flipper_length_mm','body_mass_g','species_encoded','island_encoded','sex_encoded']]

In [94]:
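import mlflow.spark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Reuse the running Spark session, or start a local one if none exists
spark=SparkSession.builder.getOrCreate()
penguins_sdf=spark.createDataFrame(peng_df)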

features=['bill_length_mm','bill_depth_mm','flipper_length_mm','body_mass_g','island_encoded','sex_encoded']

va=VectorAssembler(inputCols=features,outputCol='features')
data=penguins_sdf.select(F.col('species_encoded').alias('label'),*features)

trainData,testData=data.randomSplit([0.8,0.2])

trainData=va.transform(trainData)
testData=va.transform(testData)

def train(model,train_data):
    model=model.fit(train_data)
    trainingSummary=model.summary
    mlflow.log_metric("train_acc",trainingSummary.accuracy)
    print("Train Accuracy: ",trainingSummary.accuracy)
    return model

def evaluate(model,test_data):
    evaluationSummary=model.evaluate(test_data)
    mlflow.log_metric("eval_acc",evaluationSummary.accuracy)
    print("Evaluation Accuracy: ",evaluationSummary.accuracy)
    

lr=LogisticRegression(featuresCol='features',labelCol='label',maxIter=10)
mlflow.set_experiment("penguins")
with mlflow.start_run():
    lr=train(lr,trainData)
    evaluate(lr,testData)
    mlflow.spark.log_model(lr,"logistic_regression_pyspark")
mlflow.end_run()


Py4JJavaError: An error occurred while calling o140.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 1.0 failed 1 times, most recent failure: Lost task 7.0 in stage 1.0 (TID 15) (DESKTOP-C32S2V1 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 48 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:233)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:511)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:495)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:286)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 48 more
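
The "Python worker failed to connect back" error above is often reported when the Spark JVM cannot launch the same Python interpreter that runs the notebook, which is common on Windows. One frequently suggested workaround is to point the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables at the current interpreter before the SparkSession is created. The sketch below is an assumption about the cause, not a confirmed fix for this environment, and the helper name is hypothetical.

```python
import os
import sys

def point_spark_at_current_python():
    """Make Spark workers and the driver use the interpreter running this notebook."""
    os.environ["PYSPARK_PYTHON"] = sys.executable
    os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
    return os.environ["PYSPARK_PYTHON"]

# Call this before SparkSession.builder.getOrCreate(); an existing session must be
# stopped and recreated for the setting to take effect.
worker_python = point_spark_at_current_python()
print(worker_python)
```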
