In [5]:
import os
import socket
from datetime import date,datetime,timedelta
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

# 设置pyspark 提交job的环境变量
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /root/data/xgboost4j_3.0-1.0.0-0.1.0.jar,/root/data/xgboost4j-spark_3.0-1.0.0-0.1.0.jar,/root/data/clickhouse-native-jdbc-2.2-stable.jar,/root/data/clickhouse4j-1.4.4.jar,/root/data/joda-time-2.9.9.jar pyspark-shell'

# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
#### K8s调度 配置参数
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.set("spark.kubernetes.container.image", "harbor.aift.ftwhale.com/library/spark/pyspark:v3.0.0-aift")
sparkConf.set("spark.kubernetes.container.image.pullPolicy", "Always")
sparkConf.set("spark.kubernetes.namespace", "default")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
#### 配置executor的volume挂载
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.heketi-jupyter-pvc.mount.path","/root/data")
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.heketi-jupyter-pvc.readOnly","false")
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.heketi-jupyter-pvc.options.claimName","heketi-jupyter-pvc")

#### spark driver 配置
sparkConf.set("spark.driver.host", "172.20.2.140") # 区别于其他人的jupyter（jupyter的域名） 必改！
sparkConf.set("spark.driver.port", "29413")
sparkConf.set("spark.driver.extraClassPath", "/root/data/xgboost4j_3.0-1.0.0-0.1.0.jar:/root/data/xgboost4j-spark_3.0-1.0.0-0.1.0.jar")
sparkConf.set("spark.driver.memory", "4g")
sparkConf.set("spark.driver.cores", "1")
#### spark driver 调优
sparkConf.set("spark.driver.allowMultipleContexts", "true")
sparkConf.set("spark.driver.maxResultSize", "3g")                 #低于其他两个

#### spark executor 配置
sparkConf.set("spark.executor.extraClassPath", "/root/data/xgboost4j_3.0-1.0.0-0.1.0.jar:/root/data/xgboost4j-spark_3.0-1.0.0-0.1.0.jar")
sparkConf.set("spark.executor.instances", "2") # 时间计算实例的数量
sparkConf.set("spark.executor.cores", "1") # 每个计算实例的cpu
sparkConf.set("spark.executor.memory", "4g") # 每个计算实例的内存
#### spark executor 调优


#### spark tasks 调优
sparkConf.set("spark.task.cpus", 1)
sparkConf.setAppName("huzize-AP1816") # 区别于其他人提交任务的名称 必改！

#### spark sql 配置
sparkConf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sparkConf.set("spark.sql.inMemoryColumnarStorage.batchSize",50000)
sparkConf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*512)
sparkConf.set("spark.sql.shuffle.partitions", 100)
sparkConf.set("spark.sql.parquet.columnarReaderBatchSize", 5000000)
sparkConf.set("spark.sql.files.maxPartitionBytes", 1024*1024*512)
sparkConf.set("spark.sql.broadcastTimeout", 10*60*1000)
sparkConf.set("spark.sql.adaptive.enabled", "true")

#### spark 其他配置参数
sparkConf.set("spark.rpc.message.maxSize", 2040)
# sparkConf.set("spark.submit.pyFiles", "/root/data/samples.zip,/root/data/xgboost4j-spark_3.0-1.0.0-0.1.0.jar")

# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark

In [6]:
def write_data_toClickHouse(target_sdf,i,table):
    try:
        target_sdf.write.mode("append").format("jdbc") \
            .option("driver", "com.github.housepower.jdbc.ClickHouseDriver") \
            .option("url", "jdbc:clickhouse://chi-ftabc-clickhouse-{}-0.default.svc.cluster.local:9000".format(i)) \
            .option("dbtable", table) \
            .option("batchsize",target_sdf.count())\
            .option("user", "ftabc") \
            .option("password", "aihub@2020") \
            .option("numPartitions", "1") \
            .option("isolationLevel", "NONE") \
            .option("truncate", "true") \
            .save()
    except Exception as e:
        message = str(e).split('\n')[1]
        ignore_message = ': java.sql.SQLFeatureNotSupportedException'
        if(message == ignore_message):
            print('分片'+str(i)+'插入数据成功')
        else:
            print('分片'+str(i)+'插入数据不成功')
    else:
        print('分片'+str(i)+'已成功插入数据')

In [7]:
def read_data_ClickHouse(sdf_name,i,dbtable):
    sdf_name = spark.read.format("jdbc") \
            .option("driver", "cc.blynk.clickhouse.ClickHouseDriver") \
            .option("url", "jdbc:clickhouse://chi-ftabc-clickhouse-{}-0.default.svc.cluster.local:8123".format(i)) \
            .option("dbtable", dbtable) \
            .option("user", "ftabc") \
            .option("password", "aihub@2020") \
            .load()
    return sdf_name

----

- 得到ods_user表的id列表

In [8]:
sdf_ods_user = spark.read.parquet("/root/data/guiyang.yang/YGY/ClickHouse/data/ods_user.parquet")

Py4JJavaError: An error occurred while calling o306.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, 172.20.1.18, executor 1): java.io.InvalidClassException: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$; local class incompatible: stream classdesc serialVersionUID = -187663704353307685, local class serialVersionUID = -3825936903196668049
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1793)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1618)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:494)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:163)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:198)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:195)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:755)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$; local class incompatible: stream classdesc serialVersionUID = -187663704353307685, local class serialVersionUID = -3825936903196668049
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1793)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1618)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
list_id_collect = sdf_ods_user.select('id').collect()
list_id = [row[0] for row in list_id_collect]

In [None]:
list_gender = [0,1,2]
list_user_level = [0,1,2,3,4,5]
list_certificate = [110001,110002,110003,110004,110005,110006,110007,110008,110009,110011,\
                    110013,110015,110017,110018,110019,110020,110021,110022,110023,110024,\
                    110025,110026,110027,110028,110029,110030,110031,110033,110034,110035,\
                    110037,110047,110055,110057,119999]
list_nation = [0,1,2,3,4,5,6,7,8,10,\
               11,12,13,14,15,16,17,18,19,20,\
               21,22,23,24,25,26,27,28,29,30,\
               31,32,33,34,35,36,37,38,39,40,\
               41,42,43,44,45,46,47,48,49,50,\
               51,52,53,54,55,56,57,98,99]
list_education = [0,1,2,3,4,5,6,7,8,10,\
               11,12,13,14,15,16,17,18,19,20,\
               21,22,23,24,25,26,27,28,29,30,\
               31,32,33,34,35,36,37,38,39,40,\
               41,42,43,44,45,46,47,48,49,50,\
               51,52,53,54,55,56,57,58,59,60,\
               61,62,63,64,65,66,67,68,69,70,\
               71,72,73,74,75,76,77,78,79,80,\
               81,82,83,84,85,86,87,88]
list_populatioin = [0,1,2,3,4,5,6,7,8,10,\
                   11,19,20,26,30,42,44,50,52,64,\
                   66,70,78,80,90,98,99,103,134,148,\
                   151,184,198,203,218,219,231,234,240,247,\
                   248,250,251]
list_job_level = [0,1,2,3,4,5,6,7,8,10,\
                  11,12,22,32,37,42,51,52,53,57,\
                  58,60,63,67,72,75,78,79,80,82,\
                  93,98,99]
list_relationship_type = [0,1,2]
list_marriage = [0,1,2,3,4]
list_inline_customer = ['0','1']

- 将列表中的数据转化为一个dataframe，然后与ods_user连接

In [88]:
import os
import time
import pandas as pd

from faker import Faker
from datetime import date,datetime,timedelta
import random
import numpy as np

import json
import string
fake = Faker()

In [89]:
class ods_userDataGen:
    def __init__(self):
        self.data_dict = {}
    def fake_data(self, a, list_id):
        pt_profile = fake.profile()
        self.data_dict['c_id'] = list_id[a]
        self.data_dict['gender'] = np.random.choice(list_gender)
        self.data_dict['user_level'] = np.random.choice(list_user_level)
        self.data_dict['certificate'] = np.random.choice(list_certificate) 
        self.data_dict['nation'] = np.random.choice(list_nation)
        self.data_dict['education'] = np.random.choice(list_education)
        self.data_dict['population'] = np.random.choice(list_populatioin)
        self.data_dict['job_level'] = np.random.choice(list_job_level)
        self.data_dict['relationship_type'] = np.random.choice(list_relationship_type)
        self.data_dict['marriage']= np.random.choice(list_marriage)
        self.data_dict['inline_customer']= random.choice(list_inline_customer)
        

    def get_data_series(self):
        return pd.Series(self.data_dict)

    def get_data_dict(self):
        return self.data_dict


rst_list = []
for b in range(len(list_id)):
    pt_data = ods_userDataGen()
    pt_data.fake_data(b,list_id)
    rst_list.append(pt_data.get_data_dict())

df = pd.DataFrame(rst_list)

In [90]:
df

Unnamed: 0,c_id,gender,user_level,certificate,nation,education,population,job_level,relationship_type,marriage,inline_customer
0,1096,0,1,110027,49,8,0,2,0,1,1
1,1087,2,0,110037,34,80,134,60,0,1,0
2,2090,0,2,110003,54,2,7,1,1,4,1
3,4474,2,0,110005,19,44,11,60,1,1,1
4,5803,1,0,110026,36,69,251,37,1,2,0
...,...,...,...,...,...,...,...,...,...,...,...
9995,2688,2,1,110005,1,76,8,5,1,0,1
9996,3127,1,0,110006,10,4,20,4,1,3,0
9997,5289,2,2,110057,6,83,11,80,0,3,1
9998,7786,0,4,110025,23,22,42,58,0,1,1


In [91]:
sdf_correct = spark.createDataFrame(df)

In [92]:
sdf_error = sdf_ods_user.drop('gender','user_level','certificate','nation','education',\
                              'population','job_level','relationship_type','marriage','inline_customer')

In [93]:
sdf_ods_user_right = sdf_error.join(sdf_correct,sdf_error.id==sdf_correct.c_id,'inner').drop('c_id')

In [94]:
sdf_ods_user_right.count()

10000

- 保存一份ods_user的parquet文件，备份到自己的目录

In [55]:
# 将spark.dataframe保存为parquet文件
#sdf_ods_user.write.parquet("/root/data/guiyang.yang/YGY/ClickHouse/data/ods_user.parquet",mode='overwrite')

# 读取parquet文件为spark.dataframe
#sdf_spark = spark.read.parquet("/root/data/guiyang.yang/YGY/ClickHouse/data/ods_user.parquet")

- 清空当前ods_user表的数据，并插入修正后的数据

In [58]:
from clickhouse_driver import connect
import clickhouse_driver

conn = connect(host='chi-ftabc-clickhouse-0-0.default.svc.cluster.local', port='9000', database='ftabcch', user='ftabc', password='aihub@2020')
cursor = conn.cursor()

In [59]:
# 清空当前clickhouse的ods_user表的数据
#cursor.execute('alter table ods_user on cluster "clickhouse" delete where id is not null')
#cursor.fetchall()

[('chi-ftabc-clickhouse-3-0', 9000, 0, '', 3, 0),
 ('chi-ftabc-clickhouse-2-0', 9000, 0, '', 2, 0),
 ('chi-ftabc-clickhouse-1-0', 9000, 0, '', 1, 0),
 ('chi-ftabc-clickhouse-0-0', 9000, 0, '', 0, 0)]

- 插入修改后的数据

In [95]:
ods_user1,ods_user2,ods_user3,ods_user4 = sdf_ods_user_right.randomSplit([0.25,0.25,0.25,0.25])

In [98]:
write_data_toClickHouse(ods_user1,0,"ftabcch.ods_user")
write_data_toClickHouse(ods_user2,1,"ftabcch.ods_user")
write_data_toClickHouse(ods_user3,2,"ftabcch.ods_user")
write_data_toClickHouse(ods_user4,3,"ftabcch.ods_user")

分片0插入数据成功
分片1插入数据成功
分片2插入数据成功
分片3插入数据成功


- 保存一份到根目录

In [100]:
sdf_ods_user_right.write.parquet("/root/data/ods_user.parquet",mode='overwrite')

In [101]:
spark.stop()

In [1]:
import os
import socket
from datetime import date,datetime,timedelta
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F


In [2]:
from pyspark.ml.feature import StringIndexer, StringIndexerModel

In [4]:
df = pyspark.createDataFrame([("a", "foo"), ("b", "bar")], ("x1", "x2"))

pipeline = Pipeline(stages=[
    StringIndexer(inputCol=c, outputCol='{}_index'.format(c))
    for c in df.columns
])

model = pipeline.fit(df)

NameError: name 'pyspark' is not defined