In [1]:


import datetime, sys, threading,time
from sys import stderr
from logging import getLogger, StreamHandler, Formatter, INFO, DEBUG, ERROR, FileHandler, WARNING

'''
spark.sql.shuffle.partitions Local模式下调小，一般为CPU核心数2-10倍
'''
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar为传入的mysql-connector包
spark = SparkSession.builder \
    .appName("mysql_to_ods加工") \
    .master('yarn') \
    .config('spark.sql.shuffle.partitions', '1000') \
    .config('spark.jars', 'hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar') \
    .getOrCreate()
# 配置参数
class settings:

    # 日志信息存储的mysql
    MYSQL_CONNECT = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
        'charset': 'utf8',
        'db': 'logs'
    }
    MYSQL_PARAMS = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
    }

    TABLES = [
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_13", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_16", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_17", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_18", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_20", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_21", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "智联招聘岗位信息", "db": "spider", "table": "jobs_2023_10_22", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time"},
        {"name": "用户简历信息", "db": "jobfree", "table": "resume", "db_num": 0, "t_num": 0,
         'hive_db': 'ods_jobfree', 'hive_table': 'ods_jobfree_db_jobfree_t_resume', "update_column": "last_update"},
    ]

    #mysql2hive--0 还是hive2mysql--1
    outType=0
    log_tb_name=f"{'mysql2hive' if outType==0 else 'hive2mysql'}_spark"
    log_name = f'{log_tb_name}.log'
    error_log_name = f'{log_tb_name}_spark.error.log'
    TYPE = 'all'
    # 日志结构
    log_data = {
        'host': MYSQL_PARAMS.get('host'),
        'port': MYSQL_PARAMS.get('port'),
        'db_name': '',
        'tb_name': '',
        'status': False,
        'hive_db_name': '',
        'hive_tb_name': '',
        'outType':outType,
        'msg': '',
        'num': 0,
        'executed_way': TYPE,
        'local_row_update_time_start': '',
        'local_row_update_time_end': '',
        'partition_date':'',
        'exec_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    #是否删除已有分区
    is_del=False
    
def getHiveData(table: dict):
    '''
    使用spark读取hive数据
    '''
    df = spark.sql(f'select * from {table["hive_db"]}.{table["hive_table"]} ')
    return df
def getMySQLData(params: dict, table: dict):
    '''
    使用spark读取mysql数据
    '''
    df = spark.read.format('jdbc') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .load().withColumn('cdc_sync_date', F.current_timestamp())
    return df

# 日志
def getLog():
    '''
    获得日志对象
    '''
    logger = getLogger(f'log_{datetime.datetime.now()}')
    logger.setLevel(INFO)
    rf_handler = StreamHandler(stderr)  # 默认是sys.stderr
    rf_handler.setLevel(DEBUG)
    f2_handler = FileHandler(settings.log_name)
    f2_handler.setLevel(INFO)
    f2_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    f3_handler = FileHandler(settings.error_log_name)
    f3_handler.setLevel(ERROR)
    f3_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    logger.addHandler(rf_handler)
    logger.addHandler(f2_handler)
    logger.addHandler(f3_handler)
    return logger


class MysqlDB:
    '''
    mysql操作类
    '''

    def __init__(self, params):
        self.params=params
    
    def getconn(self):
        import pymysql
        try:
            conn = pymysql.connect(**self.params)
        except pymysql.err.OperationalError:
            # 如果没有数据库则创建数据库
            db = self.params.pop('db')
            conn = pymysql.connect(**self.params)
            cursor = conn.cursor()
            cursor.execute(f"CREATE DATABASE {db}")
            cursor.close()
            conn.close()
            self.params['db'] = db
            conn = pymysql.connect(**self.params)
        return conn
    @staticmethod
    def is_alive(params):
        '''
        判断mysql连接是否存活
        '''
        import pymysql
        try:
            # 执行一个简单的查询
            conn = pymysql.connect(**params)
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            # 检查查询结果
            result = cursor.fetchone()
            if result[0] == 1:
                return True
            else:
                return False
        except:
            return False
    @classmethod
    def create_table(self,conn, table_name: str, columns: list, types: list):
        '''
        根据表名、字段、字段类型建表
        '''
        columns_str = ''
        id_str = '`id` INT AUTO_INCREMENT PRIMARY KEY,\n'
        for i, j in enumerate(columns):
            if j == 'id':
                id_str = ''
            columns_str += f'`{j}` {types[i]},\n'
        columns_str = columns_str.strip("\n").strip(",")
        sql = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            {id_str}
            {columns_str}
        )ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
        '''
        try:
            cur = conn.cursor()
            cur.execute(sql)
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            raise ValueError(f'创建表{table_name}失败-error{e}-sql:{sql}')

    def get_types(self, data: list):
        '''
        根据数据的类型转换为mysql中字段类型
        '''
        from datetime import datetime
        # 获取一行数据
        data = data[0]
        
        def getType(column):
            if isinstance(column, str):
                return 'TEXT'
            elif isinstance(column, datetime):
                return 'datetime'
            elif isinstance(column, int):
                return 'int'
            elif isinstance(column, float):
                return 'double'
            elif isinstance(column, bool):
                return 'bool'
            else:
                raise ValueError(f'不支持字段类型:{type(column)}')

        types = []
        for i in data:
            types.append(getType(i))
        return types
    @classmethod
    def check_table_exists(self,conn, table_name):
        '''
        查询表是否存在
        '''
        try:
            # 创建游标对象
            cursor = conn.cursor()
            # 执行SHOW TABLES查询
            cursor.execute("SHOW TABLES")
            # 获取返回的所有表名
            tables = cursor.fetchall()
            # 检查目标表是否在返回的结果中存在
            if (table_name,) in tables:
                return True
            else:
                return False
        except:
            return False

    def save(self, data: list, table_name: str, method: str = 'append'):
        '''
        简单便捷的保存方法
        自动建库建表
        支持append、replace模式
        '''
        import pandas as pd
        assert len(data) > 0, 'data不能为空'

        df = pd.DataFrame(data)
        conn=self.getconn()
        cur = conn.cursor()
        data = df.values.tolist()
        columns = df.columns.tolist()
        columns_str = ','.join([f'`{i}`' for i in columns])
        s_str = ','.join(['%s' for i in range(len(columns))])
        sql = f'insert into {table_name}({columns_str}) values ({s_str})'
        if self.check_table_exists(conn,table_name):
            if method == 'append':
                pass
            else:
                cur.execute(f'drop table {table_name}')
                conn.commit()
                types = self.get_types(data)
                self.create_table(conn,table_name, columns, types)
        else:
            types = self.get_types(data)
            self.create_table(conn,table_name, columns, types)
        try:
            cur.executemany(sql, data)
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise ValueError(f'保存失败-error:{e}-sql:{sql}')


class myThread(threading.Thread):
    def __init__(self, name, log_data, table, Type):
        super().__init__()
        self.name = name
        self.log_data = log_data.copy()
        self.table = table.copy()
        self.Type = Type

    def run(self):
        
        if settings.outType==0:
            run_res = mysql_to_hive(self.name, self.log_data, self.table, self.Type)
        else:
            run_res = hive_to_mysql(self.name, self.log_data, self.table, self.Type)


def hive_to_mysql(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getHiveData(table)
        
        # 导入数据到mysql
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f"{table['update_column']} between '{today_time.strftime('%Y-%m-%d 00:00:00')}' and '{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['hive_db']}.{table['hive_table']}无更新---num:0")
            log_data['msg'] = f"表{table['hive_db']}.{table['hive_table']}无更新"
            return
        num = df.count()
        df.write.format('jdbc') \
        .mode('append') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .save()

        logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['hive_db']}.{table['hive_table']}数据到{table['db']}.{table['table']}mysql表成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data],settings.log_tb_name, 'append')
def mysql_to_hive(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getMySQLData(params, table)
        # 删除partition_date分区已有数据
        try:
            if settings.is_del:
                spark.sql(
                    f'ALTER TABLE {table["hive_db"]}.{table["hive_table"]} DROP PARTITION (partition_date="{partition_date}")')
        except:
            pass
        # 导入数据到hive
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f"{table['update_column']} between '{today_time.strftime('%Y-%m-%d 00:00:00')}' and '{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['db']}.{table['table']}无更新---num:0")
            log_data['msg'] = f"表{table['db']}.{table['table']}无更新"
            return
        num = df.count()
        df.createOrReplaceTempView(f"mysql_to_hive_{name}")
        spark.sql(
            f"INSERT INTO {table['hive_db']}.{table['hive_table']} PARTITION(partition_date={partition_date}) SELECT * FROM mysql_to_hive_{name}")

        logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['db']}.{table['table']}数据到hive表{table['hive_db']}.{table['hive_table']}成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data], settings.log_tb_name, 'append')


def insertData(table: dict):
    '''
    导入
    '''
    table = table.copy()
    log_data = settings.log_data.copy()
    log_data['hive_db_name'] = table.get('hive_db')
    log_data['hive_tb_name'] = table.get('hive_table')
    log_data['exec_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 创建多个线程
    threads = []
    # 单库单表
    if table['db_num'] == 0 and table['t_num'] == 0:
        t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
        threads.append(t)
        t.start()
    ##单库多表
    elif table['db_num'] == 0 and table['t_num'] > 0:
        '''
        程序自动添加表下标并且依次遍历分表
        '''

        for i in range(table['t_num'] + 1):
            ##处理表名
            new_tb = table['table'].split('_')
            if len(new_tb) > 2:
                new_tb.pop()
                table['table'] = '_'.join(new_tb) + f'_{i}'
            else:
                table['table'] = table['table'] + f'_{i}'
            t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
            threads.append(t)
            t.start()
    #     ##多库多表
    elif table['db_num'] > 0 and table['t_num'] > 0:
        '''
        程序自动添加库和表下标并且依次遍历分表
        '''
        for i in range(table['db_num'] + 1):
            ##处理库名
            new_db = table['db'].split('_')
            if len(new_db) > 2:
                new_db.pop()
                table['db'] = '_'.join(new_db) + f'_{i}'

            else:
                table['db'] = table['db'] + f'_{i}'
            for j in range(table['t_num'] + 1):
                ##处理表名
                new_tb = table['table'].split('_')
                if len(new_tb) > 2:
                    new_tb.pop()
                    table['table'] = '_'.join(new_tb) + f'_{j}'
                else:
                    table['table'] = table['table'] + f'_{j}'

                t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
                threads.append(t)
                t.start()
    ##不能识别类型
    else:
        raise ValueError('db_num or t_num ValueError')
    for t in threads:
        t.join()


if __name__ == '__main__':
    logger = getLog()
    Log_db = MysqlDB(settings.MYSQL_CONNECT)
    today_time = datetime.date.today()
#     yesterday_time = today_time + datetime.timedelta(-1)
    partition_date = today_time.strftime("%Y%m%d")
    try:
        TYPE = sys.argv[1]
        if TYPE not in ['all','update','other']:
            TYPE = settings.TYPE
    except:
        TYPE = settings.TYPE
    tables = settings.TABLES
    logger.info(f'导入开始--type:{TYPE}')
    start=time.time()
    ##1 全量  截止运行时间零点
    if settings.outType==0:
        logger.info('mysql_to_hive start')
    else:
        logger.info('hive_to_mysql start')
    if TYPE == 'all':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    elif TYPE == 'update':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    ##3 指定时间段 追加放入昨天分区/指定分区里
    elif TYPE == 'other':
        start_time = sys.argv[2]
        end_time = sys.argv[3]
        # start_time = "2022-08-11 00:00:00"
        # end_time   = "2022-08-12 00:00:00"
        partition_date = sys.argv[4] if len(sys.argv) > 4 else partition_date
        where_str = "{} between '%s' and '%s' " % (start_time, end_time)
        is_merge_small_file = True
        settings.log_data['partition_date'] = partition_date
        settings.log_data['executed_way'] = TYPE
        settings.log_data['local_row_update_time_start'] = start_time
        settings.log_data['local_row_update_time_end'] = end_time
    else:
        raise ValueError(f'不能识别类型{TYPE}')
    for table in tables:
        insertData(table)
    end=time.time()
    
    logger.info(f'导入完成---总耗时{(end-start)}秒')

导入开始--type:all
mysql_to_hive start
Thread spider.jobs_2023_10_13->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_13->ods_jobfree.ods_jobfree_db_spider_t_jobs执行成功---num:24217
Thread spider.jobs_2023_10_16->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_16->ods_jobfree.ods_jobfree_db_spider_t_jobs执行成功---num:3240
Thread spider.jobs_2023_10_17->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_17->ods_jobfree.ods_jobfree_db_spider_t_jobs执行成功---num:111629
Thread spider.jobs_2023_10_18->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_18->ods_jobfree.ods_jobfree_db_spider_t_jobs执行成功---num:65279
Thread spider.jobs_2023_10_20->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_20->ods_jobfree.ods_jobfree_db_spider_t_jobs执行成功---num:112197
Thread spider.jobs_2023_10_21->ods_jobfree.ods_jobfree_db_spider_t_jobs执行开始
Thread spider.jobs_2023_10_21->ods_jobfree.ods_jobfree_db_spider_t_j

In [2]:
import datetime, sys, threading,time
from sys import stderr
from logging import getLogger, StreamHandler, Formatter, INFO, DEBUG, ERROR, FileHandler, WARNING

'''
spark.sql.shuffle.partitions Local模式下调小，一般为CPU核心数2-10倍
'''
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar为传入的mysql-connector包
spark = SparkSession.builder \
    .appName("ods_dwd加工") \
    .master('yarn') \
    .config('spark.sql.shuffle.partitions', '200') \
    .config('spark.jars', 'hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar') \
    .getOrCreate()


# 配置参数
class settings:
    # 日志信息存储的mysql
    MYSQL_CONNECT = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
        'charset': 'utf8',
        'db': 'logs'
    }
    TABLES = [
        {"name": "智联招聘岗位信息ods->dwd", "db1": "ods_jobfree", "tb1": "ods_jobfree_db_spider_t_jobs",'db2': 'dwd_jobfree', 'tb2': 'dwd_jobfree_db_spider_t_jobs', "update_column": "local_row_update_time",
        "SQL":'''select 
        job_id,
        jobId,
        get_json_object(number,"$") as number,
        get_json_object(name,"$") as name,
        Case when
        get_json_object(education,"$")="不限" then -1
        when get_json_object(education,"$")="博士" then 8
        when get_json_object(education,"$")="MBA/EMBA" then 7
        when get_json_object(education,"$")="硕士" then 6
        when get_json_object(education,"$")="本科" then 5
        when get_json_object(education,"$")="大专" then 4
        when get_json_object(education,"$")="高中" then 3
        when get_json_object(education,"$")="中专/中技" then 2
        when get_json_object(education,"$")="初中及以下" then 1
        else -1
        end as educationCode,
        get_json_object(education,"$") as education,
        CONCAT_WS(',',split(regexp_replace( industryCompanyTags, '\\\\[|\\\\]|"', ''),',')) as industryCompanyTags,
        get_json_object(industryName,"$") as industryName,
        get_json_object(jobSummary,"$") as jobSummary,
        get_json_object(positionUrl,"$") as positionUrl,
        get_json_object(positionSourceTypeUrl,"$") as positionSourceTypeUrl,
        get_json_object(property,"$") as property,
        cast (get_json_object(propertyCode,"$") as int) as propertyCode,
        get_json_object(recruitNumber,"$") as recruitNumber,
        get_json_object(salary60,"$") as salary60,
        get_json_object(salaryReal,"$") as salaryReal,
        cast (split(get_json_object(salaryReal,"$"),"-")[0] as int) as salary_min,
        cast (split(get_json_object(salaryReal,"$"),"-")[1] as int) as salary_max,
        get_json_object(salaryType,"$") as salaryType,

        get_json_object(salaryCount,"$") as salaryCounte,
        CONCAT_WS('/',split(regexp_replace(get_json_object(skillLabel,"$[*].value"), '\\\\[|\\\\]|"', ''),',')) as skillLabel,
        cast(get_json_object(publishTime,"$") as timestamp) as publishTime,
        get_json_object(cityDistrict,"$") as cityDistrict,
        get_json_object(streetId,"$") as streetId,
        get_json_object(streetName,"$") as streetName,
        cast (get_json_object(subJobTypeLevel,"$") as BIGINT) as subJobTypeLevel,
        get_json_object(subJobTypeLevelName,"$") as subJobTypeLevelName,
        CONCAT_WS('/',split(regexp_replace(welfareTagList, '\\\\[|\\\\]|"', ''),',')) as welfareTagList,
        cast (get_json_object(Cityid,"$") as int) as Cityid,
        get_json_object(workCity,"$") as workCity,
        Case when
        get_json_object(workType,"$")="不限" then -1
        when get_json_object(workType,"$")="兼职/临时" then 1
        when get_json_object(workType,"$")="全职" then 2
        when get_json_object(workType,"$")="实习" then 3
        when get_json_object(workType,"$")="校园" then 4
        else 0
        end as workTypeCode ,
        get_json_object(workType,"$") as workType,
        Case when
        get_json_object(workingExp,"$")="不限" then -1
        when get_json_object(workingExp,"$")="无经验" then 0
        when get_json_object(workingExp,"$")="1年以下" then 1
        when get_json_object(workingExp,"$")="1-3年" then 2
        when get_json_object(workingExp,"$")="3-5年" then 3
        when get_json_object(workingExp,"$")="5-10年" then 4
        when get_json_object(workingExp,"$")="10年以上" then 5
        else 0
        end as workingExpCode ,
        get_json_object(workingExp,"$") as workingExp,
        companyId,
        get_json_object(companyNumber,"$") as companyNumber,
        CONCAT_WS('/',split(regexp_replace(companyScaleTypeTagsNew, '\\\\[|\\\\]|"', ''),',')) as companyScaleTypeTagsNew,
        get_json_object(companyName,"$") as companyName,
        get_json_object(rootCompanyNumber,"$") as rootCompanyNumber,
        get_json_object(companyLogo,"$") as companyLogo,
        get_json_object(companySize,"$") as companySize,
        get_json_object(companyUrl,"$") as companyUrl
        from ods_jobfree.ods_jobfree_db_spider_t_jobs'''
        },
        {
        "name": "智联招聘用户简历信息ods->dwd", "db1": "ods_jobfree", "tb1": "ods_jobfree_db_jobfree_t_resume",'db2': 'dwd_jobfree', 'tb2': 'dwd_jobfree_db_jobfree_t_resume', "update_column": "last_update",
        'SQL':'''select id, name, birth, gender, photo, currentidentity, currentcity, currentcitytranslation, currentcitydistrictid, currentcitydistrictidtranslation, currentprovince, currentprovincetranslation, eduhighestlevel, eduhighestleveltranslation, politicalaffiliation, workingexpcode, workingexp, worktypecode, worktype, workcity, workcitytranslation, workcity2, workcity2translation, workcity3, workcity3translation, cast(subjobtypelevel as bigint) as subjobtypelevel, subjobtypelevelname, skilllabel, propertycode, property, preferredsalarymin, preferredsalarymax, selfevaluate, created_time, last_update, user_id, cdc_sync_date
        from (select *, row_number() over (distribute by id sort by last_update DESC ) as rank from `ods_jobfree`.`ods_jobfree_db_jobfree_t_resume`) t1
        where t1.rank = 1'''
        },
        {
            "name": "智联招聘用户简历特征信息dwd->dwd", "db1": "dwd_jobfree", "tb1": "dwd_jobfree_db_jobfree_t_resume",'db2': 'dwd_jobfree', 'tb2': 'dwd_jobfree_db_jobfree_t_resume_train', "update_column": None,
            "SQL":'''select user_id, eduhighestlevel, workingexpCode, worktypeCode, (workcity+if(workcity2=0,workcity,workcity2)+if(workcity3=0,workcity,workcity3)) as workcity, subjobtypelevel, propertycode, preferredsalarymin, preferredsalarymax,SelfEvaluate,skilllabel
from dwd_jobfree.dwd_jobfree_db_jobfree_t_resume'''
        },
        {
            "name": "智联招聘信息特征dwd->dwd", "db1": "dwd_jobfree", "tb1": "dwd_jobfree_db_spider_t_jobs",'db2': 'dwd_jobfree', 'tb2': 'dwd_jobfree_db_spider_t_jobs_train', "update_column": None,
            "SQL":'''SELECT job_id,educationcode, workingexpCode, worktypeCode, cityid*3 as cityid, subjobtypelevel, propertycode,salary_min, salary_max,jobsummary,skilllabel FROM dwd_jobfree.dwd_jobfree_db_spider_t_jobs'''
        }
        
    ]

    log_tb_name=f"dwd_jobfree_spark"
    log_name = f'{log_tb_name}.log'
    error_log_name = f'{log_tb_name}_spark.error.log'
    TYPE = 'update'
    # 日志结构
    log_data = {
        'db1': 'ods_jobfree',
        'tb1': 'ods_jobfree_db_spider_t_jobs',
        'status': False,
        'db2': 'dwd_jobfree',
        'tb2': 'dwd_jobfree_db_spider_t_jobs',
        'msg': '',
        'num': 0,
        'executed_way': TYPE,
        'local_row_update_time_start': '',
        'local_row_update_time_end': '',
        'partition_date':'',
        'SQL':'',
        'exec_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    #是否删除已有分区
    is_del=True
    
def getHiveDataOnSQL(SQL: str):
    '''
    使用spark读取hive数据
    '''
    df = spark.sql(SQL)
    return df


# 日志
def getLog():
    '''
    获得日志对象
    '''
    logger = getLogger(f'log_{datetime.datetime.now()}')
    logger.setLevel(INFO)
    rf_handler = StreamHandler(stderr)  # 默认是sys.stderr
    rf_handler.setLevel(DEBUG)
    f2_handler = FileHandler(settings.log_name)
    f2_handler.setLevel(INFO)
    f2_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    f3_handler = FileHandler(settings.error_log_name)
    f3_handler.setLevel(ERROR)
    f3_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    logger.addHandler(rf_handler)
    logger.addHandler(f2_handler)
    logger.addHandler(f3_handler)
    return logger


class MysqlDB:
    '''
    mysql操作类
    '''

    def __init__(self, params):
        self.params=params
    
    def getconn(self):
        import pymysql
        try:
            conn = pymysql.connect(**self.params)
        except pymysql.err.OperationalError:
            # 如果没有数据库则创建数据库
            db = self.params.pop('db')
            conn = pymysql.connect(**self.params)
            cursor = conn.cursor()
            cursor.execute(f"CREATE DATABASE {db}")
            cursor.close()
            conn.close()
            self.params['db'] = db
            conn = pymysql.connect(**self.params)
        return conn
    @staticmethod
    def is_alive(params):
        '''
        判断mysql连接是否存活
        '''
        import pymysql
        try:
            # 执行一个简单的查询
            conn = pymysql.connect(**params)
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            # 检查查询结果
            result = cursor.fetchone()
            if result[0] == 1:
                return True
            else:
                return False
        except:
            return False
    @classmethod
    def create_table(self,conn, table_name: str, columns: list, types: list):
        '''
        根据表名、字段、字段类型建表
        '''
        columns_str = ''
        id_str = '`id` INT AUTO_INCREMENT PRIMARY KEY,\n'
        for i, j in enumerate(columns):
            if j == 'id':
                id_str = ''
            columns_str += f'`{j}` {types[i]},\n'
        columns_str = columns_str.strip("\n").strip(",")
        sql = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            {id_str}
            {columns_str}
        )ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
        '''
        try:
            cur = conn.cursor()
            cur.execute(sql)
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            raise ValueError(f'创建表{table_name}失败-error{e}-sql:{sql}')

    def get_types(self, data: list):
        '''
        根据数据的类型转换为mysql中字段类型
        '''
        from datetime import datetime
        # 获取一行数据
        data = data[0]
        
        def getType(column):
            if isinstance(column, str):
                return 'TEXT'
            elif isinstance(column, datetime):
                return 'datetime'
            elif isinstance(column, int):
                return 'int'
            elif isinstance(column, float):
                return 'double'
            elif isinstance(column, bool):
                return 'bool'
            else:
                raise ValueError(f'不支持字段类型:{type(column)}')

        types = []
        for i in data:
            types.append(getType(i))
        return types
    @classmethod
    def check_table_exists(self,conn, table_name):
        '''
        查询表是否存在
        '''
        try:
            # 创建游标对象
            cursor = conn.cursor()
            # 执行SHOW TABLES查询
            cursor.execute("SHOW TABLES")
            # 获取返回的所有表名
            tables = cursor.fetchall()
            # 检查目标表是否在返回的结果中存在
            if (table_name,) in tables:
                return True
            else:
                return False
        except:
            return False

    def save(self, data: list, table_name: str, method: str = 'append'):
        '''
        简单便捷的保存方法
        自动建库建表
        支持append、replace模式
        '''
        import pandas as pd
        assert len(data) > 0, 'data不能为空'

        df = pd.DataFrame(data)
        conn=self.getconn()
        cur = conn.cursor()
        data = df.values.tolist()
        columns = df.columns.tolist()
        columns_str = ','.join([f'`{i}`' for i in columns])
        s_str = ','.join(['%s' for i in range(len(columns))])
        sql = f'insert into {table_name}({columns_str}) values ({s_str})'
        if self.check_table_exists(conn,table_name):
            if method == 'append':
                pass
            else:
                cur.execute(f'drop table {table_name}')
                conn.commit()
                types = self.get_types(data)
                self.create_table(conn,table_name, columns, types)
        else:
            types = self.get_types(data)
            self.create_table(conn,table_name, columns, types)
        try:
            cur.executemany(sql, data)
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise ValueError(f'保存失败-error:{e}-sql:{sql}')





def hive_to_hive(log_data: dict, table: dict, Type):
    '''
    调用spark将hive数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['db1']}.{table['tb1']}->{table['db2']}.{table['tb2']}执行开始")
    try:
        log_data['db1'] = table.get('db1')
        log_data['tb1'] = table.get('tb1')
       
        # 删除partition_date分区已有数据
        try:
            if settings.is_del:
                spark.sql(
                    f'ALTER TABLE {table["db2"]}.{table["tb2"]} DROP PARTITION (partition_date="{partition_date}")')
        except:
            pass
        # 导入数据到hive
        where='where'
        if not table['update_column']:
            SQL=table.get('SQL')
            df = getHiveDataOnSQL(SQL)
        else:
            if table.get('SQL').find('where')!=-1:
                where=' and '
            if TYPE == 'all':
                SQL=table.get('SQL')
                df = getHiveDataOnSQL(SQL)
            elif TYPE == 'update':
                SQL=table.get('SQL')+f' {where} partition_date="{partition_date}"'
                df = getHiveDataOnSQL(SQL)
            elif TYPE == 'other':
                SQL=table.get('SQL')+f" {where} {table['update_column']} between '{start_time}' and '{end_time}'"
                df = getHiveDataOnSQL(SQL)
            else:
                logger.error(f"Type类型错误:{TYPE}")
                return
        log_data['SQL'] =SQL
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['db1']}.{table['tb1']}无更新---num:0")
            log_data['msg'] = f"表{table['db1']}.{table['tb1']}无更新"
            return
        num = df.count()
        df.createOrReplaceTempView(f"hive_to_hive_tmp")
        if not table['update_column']:
            spark.sql(f"INSERT  OVERWRITE table {table['db2']}.{table['tb2']}  SELECT * FROM hive_to_hive_tmp")
        else:
            spark.sql(f"INSERT INTO {table['db2']}.{table['tb2']} PARTITION(partition_date={partition_date}) SELECT * FROM hive_to_hive_tmp")

        logger.info(f"Thread {table['db1']}.{table['tb1']}->{table['db2']}.{table['tb2']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['db1']}.{table['tb1']}数据到hive表{table['db2']}.{table['tb2']}成功"
    except Exception as e:
        logger.error(f"{table['db1']}.{table['tb1']}->{table['db2']}.{table['tb2']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db1']}.{table['tb1']}失败----error:{e}"
    finally:
        Log_db.save([log_data], settings.log_tb_name, 'append')


def insertData(table: dict):
    '''
    导入
    '''
    table = table.copy()
    log_data = settings.log_data.copy()
    log_data['db2'] = table.get('db2')
    log_data['tb2'] = table.get('tb2')
    log_data['exec_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    hive_to_hive(log_data, table, TYPE)


if __name__ == '__main__':
    logger = getLog()
    Log_db = MysqlDB(settings.MYSQL_CONNECT)
    today_time = datetime.date.today()
#     yesterday_time = today_time + datetime.timedelta(-1)
    partition_date = today_time.strftime("%Y%m%d")
    try:
        TYPE = sys.argv[1]
        if TYPE not in ['all','update','other']:
            TYPE = settings.TYPE
    except:
        TYPE = settings.TYPE
    tables = settings.TABLES
    logger.info(f'导入开始--type:{TYPE}')
    start=time.time()
    ##1 全量  截止运行时间零点
    if TYPE == 'all':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    elif TYPE == 'update':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    ##3 指定时间段 追加放入昨天分区/指定分区里
    elif TYPE == 'other':
        start_time = sys.argv[2]
        end_time = sys.argv[3]
        # start_time = "2022-08-11 00:00:00"
        # end_time   = "2022-08-12 00:00:00"
        partition_date = sys.argv[4] if len(sys.argv) > 4 else partition_date
        where_str = "{} between '%s' and '%s' " % (start_time, end_time)
        is_merge_small_file = True
        settings.log_data['partition_date'] = partition_date
        settings.log_data['executed_way'] = TYPE
        settings.log_data['local_row_update_time_start'] = start_time
        settings.log_data['local_row_update_time_end'] = end_time
    else:
        raise ValueError(f'不能识别类型{TYPE}')
    for table in tables:
        insertData(table)
    end=time.time()
    
    logger.info(f'导入完成---总耗时{(end-start)}秒')

导入开始--type:update
Thread ods_jobfree.ods_jobfree_db_spider_t_jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs执行开始
Thread ods_jobfree.ods_jobfree_db_spider_t_jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs执行成功---num:396190
Thread ods_jobfree.ods_jobfree_db_jobfree_t_resume->dwd_jobfree.dwd_jobfree_db_jobfree_t_resume执行开始
Thread ods_jobfree.ods_jobfree_db_jobfree_t_resume->dwd_jobfree.dwd_jobfree_db_jobfree_t_resume执行成功---num:3
Thread dwd_jobfree.dwd_jobfree_db_jobfree_t_resume->dwd_jobfree.dwd_jobfree_db_jobfree_t_resume_train执行开始
Thread dwd_jobfree.dwd_jobfree_db_jobfree_t_resume->dwd_jobfree.dwd_jobfree_db_jobfree_t_resume_train执行成功---num:3
Thread dwd_jobfree.dwd_jobfree_db_spider_t_jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs_train执行开始
Thread dwd_jobfree.dwd_jobfree_db_spider_t_jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs_train执行成功---num:396190
导入完成---总耗时42.536142349243164秒


In [83]:
import datetime, sys, threading,time
from sys import stderr
from logging import getLogger, StreamHandler, Formatter, INFO, DEBUG, ERROR, FileHandler, WARNING

'''
spark.sql.shuffle.partitions Local模式下调小，一般为CPU核心数2-10倍
'''
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar为传入的mysql-connector包
spark = SparkSession.builder \
    .appName("dwd_mysql加工") \
    .master('yarn') \
    .config('spark.sql.shuffle.partitions', '200') \
    .config('spark.jars', 'hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar') \
    .getOrCreate()


# 配置参数
class settings:
    # 日志路径
    

    # 日志信息存储的mysql
    MYSQL_CONNECT = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
        'charset': 'utf8',
        'db': 'logs'
    }
    MYSQL_PARAMS = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
    }

    TABLES = [
        {"name": "智联招聘岗位信息", "db": "jobfree", "table": "jobs", "db_num": 0, "t_num": 0,
         'hive_db': 'dwd_jobfree', 'hive_table': 'dwd_jobfree_db_spider_t_jobs', "update_column": "publishtime"},
        
    ]
    #mysql2hive--0 还是hive2mysql--1
    outType=1
    log_tb_name=f"{'mysql2hive' if outType==0 else 'hive2mysql'}_spark"
    log_name = f'{log_tb_name}.log'
    error_log_name = f'{log_tb_name}_spark.error.log'
    TYPE = 'update'
    # 日志结构
    log_data = {
        'host': MYSQL_PARAMS.get('host'),
        'port': MYSQL_PARAMS.get('port'),
        'db_name': '',
        'tb_name': '',
        'status': False,
        'hive_db_name': '',
        'hive_tb_name': '',
        'outType':outType,
        'msg': '',
        'num': 0,
        'executed_way': TYPE,
        'local_row_update_time_start': '',
        'local_row_update_time_end': '',
        'partition_date':'',
        'exec_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    #是否删除已有分区
    is_del=True
    
def getHiveData(table: dict):
    '''
    使用spark读取hive数据
    '''
    df = spark.sql(f'select * from {table["hive_db"]}.{table["hive_table"]} ')
    return df
def getMySQLData(params: dict, table: dict):
    '''
    使用spark读取mysql数据
    '''
    df = spark.read.format('jdbc') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .load().withColumn('cdc_sync_date', F.current_timestamp())
    return df

# 日志
def getLog():
    '''
    获得日志对象
    '''
    logger = getLogger(f'log_{datetime.datetime.now()}')
    logger.setLevel(INFO)
    rf_handler = StreamHandler(stderr)  # 默认是sys.stderr
    rf_handler.setLevel(DEBUG)
    f2_handler = FileHandler(settings.log_name)
    f2_handler.setLevel(INFO)
    f2_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    f3_handler = FileHandler(settings.error_log_name)
    f3_handler.setLevel(ERROR)
    f3_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    logger.addHandler(rf_handler)
    logger.addHandler(f2_handler)
    logger.addHandler(f3_handler)
    return logger


class MysqlDB:
    '''
    mysql操作类
    '''

    def __init__(self, params):
        self.params=params
    
    def getconn(self):
        import pymysql
        try:
            conn = pymysql.connect(**self.params)
        except pymysql.err.OperationalError:
            # 如果没有数据库则创建数据库
            db = self.params.pop('db')
            conn = pymysql.connect(**self.params)
            cursor = conn.cursor()
            cursor.execute(f"CREATE DATABASE {db}")
            cursor.close()
            conn.close()
            self.params['db'] = db
            conn = pymysql.connect(**self.params)
        return conn
    @staticmethod
    def is_alive(params):
        '''
        判断mysql连接是否存活
        '''
        import pymysql
        try:
            # 执行一个简单的查询
            conn = pymysql.connect(**params)
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            # 检查查询结果
            result = cursor.fetchone()
            if result[0] == 1:
                return True
            else:
                return False
        except:
            return False
    @classmethod
    def create_table(self,conn, table_name: str, columns: list, types: list):
        '''
        根据表名、字段、字段类型建表
        '''
        columns_str = ''
        id_str = '`id` INT AUTO_INCREMENT PRIMARY KEY,\n'
        for i, j in enumerate(columns):
            if j == 'id':
                id_str = ''
            columns_str += f'`{j}` {types[i]},\n'
        columns_str = columns_str.strip("\n").strip(",")
        sql = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            {id_str}
            {columns_str}
        )ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
        '''
        try:
            cur = conn.cursor()
            cur.execute(sql)
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            raise ValueError(f'创建表{table_name}失败-error{e}-sql:{sql}')

    def get_types(self, data: list):
        '''
        根据数据的类型转换为mysql中字段类型
        '''
        from datetime import datetime
        # 获取一行数据
        data = data[0]
        
        def getType(column):
            if isinstance(column, str):
                return 'TEXT'
            elif isinstance(column, datetime):
                return 'datetime'
            elif isinstance(column, int):
                return 'int'
            elif isinstance(column, float):
                return 'double'
            elif isinstance(column, bool):
                return 'bool'
            else:
                raise ValueError(f'不支持字段类型:{type(column)}')

        types = []
        for i in data:
            types.append(getType(i))
        return types
    @classmethod
    def check_table_exists(self,conn, table_name):
        '''
        查询表是否存在
        '''
        try:
            # 创建游标对象
            cursor = conn.cursor()
            # 执行SHOW TABLES查询
            cursor.execute("SHOW TABLES")
            # 获取返回的所有表名
            tables = cursor.fetchall()
            # 检查目标表是否在返回的结果中存在
            if (table_name,) in tables:
                return True
            else:
                return False
        except:
            return False

    def save(self, data: list, table_name: str, method: str = 'append'):
        '''
        简单便捷的保存方法
        自动建库建表
        支持append、replace模式
        '''
        import pandas as pd
        assert len(data) > 0, 'data不能为空'

        df = pd.DataFrame(data)
        conn=self.getconn()
        cur = conn.cursor()
        data = df.values.tolist()
        columns = df.columns.tolist()
        columns_str = ','.join([f'`{i}`' for i in columns])
        s_str = ','.join(['%s' for i in range(len(columns))])
        sql = f'insert into {table_name}({columns_str}) values ({s_str})'
        if self.check_table_exists(conn,table_name):
            if method == 'append':
                pass
            else:
                cur.execute(f'drop table {table_name}')
                conn.commit()
                types = self.get_types(data)
                self.create_table(conn,table_name, columns, types)
        else:
            types = self.get_types(data)
            self.create_table(conn,table_name, columns, types)
        try:
            cur.executemany(sql, data)
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise ValueError(f'保存失败-error:{e}-sql:{sql}')


class myThread(threading.Thread):
    def __init__(self, name, log_data, table, Type):
        super().__init__()
        self.name = name
        self.log_data = log_data.copy()
        self.table = table.copy()
        self.Type = Type

    def run(self):
        
        if settings.outType==0:
            run_res = mysql_to_hive(self.name, self.log_data, self.table, self.Type)
        else:
            run_res = hive_to_mysql(self.name, self.log_data, self.table, self.Type)


def hive_to_mysql(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getHiveData(table)
        
        # 导入数据到mysql
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f'partition_date="{partition_date}"').drop('partition_date')

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['hive_db']}.{table['hive_table']}无更新---num:0")
            log_data['msg'] = f"表{table['hive_db']}.{table['hive_table']}无更新"
            return
        num = df.count()
        df.write.format('jdbc') \
        .mode('append') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .save()

        logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['hive_db']}.{table['hive_table']}数据到{table['db']}.{table['table']}mysql表成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data],settings.log_tb_name, 'append')
def mysql_to_hive(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getMySQLData(params, table)
        # 删除partition_date分区已有数据
        try:
            if settings.is_del:
                spark.sql(
                    f'ALTER TABLE {table["hive_db"]}.{table["hive_table"]} DROP PARTITION (partition_date="{partition_date}")')
        except:
            pass
        # 导入数据到hive
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f"{table['update_column']} between '{today_time.strftime('%Y-%m-%d 00:00:00')}' and '{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['db']}.{table['table']}无更新---num:0")
            log_data['msg'] = f"表{table['db']}.{table['table']}无更新"
            return
        num = df.count()
        df.createOrReplaceTempView(f"mysql_to_hive_{name}")
        spark.sql(
            f"INSERT INTO {table['hive_db']}.{table['hive_table']} PARTITION(partition_date={partition_date}) SELECT * FROM mysql_to_hive_{name}")

        logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['db']}.{table['table']}数据到hive表{table['hive_db']}.{table['hive_table']}成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data], settings.log_tb_name, 'append')


def insertData(table: dict):
    '''
    导入
    '''
    table = table.copy()
    log_data = settings.log_data.copy()
    log_data['hive_db_name'] = table.get('hive_db')
    log_data['hive_tb_name'] = table.get('hive_table')
    log_data['exec_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 创建多个线程
    threads = []
    # 单库单表
    if table['db_num'] == 0 and table['t_num'] == 0:
        t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
        threads.append(t)
        t.start()
    ##单库多表
    elif table['db_num'] == 0 and table['t_num'] > 0:
        '''
        程序自动添加表下标并且依次遍历分表
        '''

        for i in range(table['t_num'] + 1):
            ##处理表名
            new_tb = table['table'].split('_')
            if len(new_tb) > 2:
                new_tb.pop()
                table['table'] = '_'.join(new_tb) + f'_{i}'
            else:
                table['table'] = table['table'] + f'_{i}'
            t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
            threads.append(t)
            t.start()
    #     ##多库多表
    elif table['db_num'] > 0 and table['t_num'] > 0:
        '''
        程序自动添加库和表下标并且依次遍历分表
        '''
        for i in range(table['db_num'] + 1):
            ##处理库名
            new_db = table['db'].split('_')
            if len(new_db) > 2:
                new_db.pop()
                table['db'] = '_'.join(new_db) + f'_{i}'

            else:
                table['db'] = table['db'] + f'_{i}'
            for j in range(table['t_num'] + 1):
                ##处理表名
                new_tb = table['table'].split('_')
                if len(new_tb) > 2:
                    new_tb.pop()
                    table['table'] = '_'.join(new_tb) + f'_{j}'
                else:
                    table['table'] = table['table'] + f'_{j}'

                t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
                threads.append(t)
                t.start()
    ##不能识别类型
    else:
        raise ValueError('db_num or t_num ValueError')
    for t in threads:
        t.join()


if __name__ == '__main__':
    logger = getLog()
    Log_db = MysqlDB(settings.MYSQL_CONNECT)
    today_time = datetime.date.today()+ datetime.timedelta(-1)
#     yesterday_time = today_time + datetime.timedelta(-1)
    partition_date =today_time.strftime("%Y%m%d")
    try:
        TYPE = sys.argv[1]
        if TYPE not in ['all','update','other']:
            TYPE = settings.TYPE
    except:
        TYPE = settings.TYPE
    tables = settings.TABLES
    logger.info(f'导入开始--type:{TYPE}')
    start=time.time()
    ##1 全量  截止运行时间零点
    if settings.outType==0:
        logger.info('mysql_to_hive start')
    else:
        logger.info('hive_to_mysql start')
    if TYPE == 'all':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    elif TYPE == 'update':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    ##3 指定时间段 追加放入昨天分区/指定分区里
    elif TYPE == 'other':
        start_time = sys.argv[2]
        end_time = sys.argv[3]
        # start_time = "2022-08-11 00:00:00"
        # end_time   = "2022-08-12 00:00:00"
        partition_date = sys.argv[4] if len(sys.argv) > 4 else partition_date
        where_str = "{} between '%s' and '%s' " % (start_time, end_time)
        is_merge_small_file = True
        settings.log_data['partition_date'] = partition_date
        settings.log_data['executed_way'] = TYPE
        settings.log_data['local_row_update_time_start'] = start_time
        settings.log_data['local_row_update_time_end'] = end_time
    else:
        raise ValueError(f'不能识别类型{TYPE}')
    for table in tables:
        insertData(table)
    end=time.time()
    
    logger.info(f'导入完成---总耗时{(end-start)}秒')

导入开始--type:update
hive_to_mysql start
Thread dwd_jobfree.dwd_jobfree_db_spider_t_jobs->jobfree.jobs执行开始
jobfree.jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs执行失败----error:An error occurred while calling o2325.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1872.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1872.0 (TID 10800, cdh92, executor 9): java.sql.BatchUpdateException: Duplicate entry '316562' for key 'PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
	at com.mysql.jdbc.Util.getInstance(Util.java:386)
	at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.jav

导入完成---总耗时140.27099704742432秒


In [82]:
df2=spark.sql('SELECT jobid, education, workingexp, worktype, workcity, subjobtypelevel, skilllabel, propertycode, salary_min, salary_max, jobsummary FROM `dwd_jobfree`.`dwd_zhilian_db_spider_t_jobs`')
df2.show()

AnalysisException: "Table or view not found: `dwd_jobfree`.`dwd_zhilian_db_spider_t_jobs`; line 1 pos 140;\n'Project ['jobid, 'education, 'workingexp, 'worktype, 'workcity, 'subjobtypelevel, 'skilllabel, 'propertycode, 'salary_min, 'salary_max, 'jobsummary]\n+- 'UnresolvedRelation `dwd_jobfree`.`dwd_zhilian_db_spider_t_jobs`\n"

In [80]:
import datetime, sys, threading,time
from sys import stderr
from logging import getLogger, StreamHandler, Formatter, INFO, DEBUG, ERROR, FileHandler, WARNING

'''
spark.sql.shuffle.partitions Local模式下调小，一般为CPU核心数2-10倍
'''
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar为传入的mysql-connector包
spark = SparkSession.builder \
    .appName("dwd_mysql加工") \
    .master('yarn') \
    .config('spark.sql.shuffle.partitions', '200') \
    .config('spark.jars', 'hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar') \
    .getOrCreate()


# 配置参数
class settings:
    # 日志路径
    

    # 日志信息存储的mysql
    MYSQL_CONNECT = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
        'charset': 'utf8',
        'db': 'logs'
    }
    MYSQL_PARAMS = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
    }

    TABLES = [
        {"name": "智联招聘岗位信息", "db": "jobfree", "table": "jobs", "db_num": 0, "t_num": 0,
         'hive_db': 'dwd_jobfree', 'hive_table': 'dwd_jobfree_db_spider_t_jobs', "update_column": "publishtime"},
        
    ]
    #mysql2hive--0 还是hive2mysql--1
    outType=1
    log_tb_name=f"{'mysql2hive' if outType==0 else 'hive2mysql'}_spark"
    log_name = f'{log_tb_name}.log'
    error_log_name = f'{log_tb_name}_spark.error.log'
    TYPE = 'update'
    # 日志结构
    log_data = {
        'host': MYSQL_PARAMS.get('host'),
        'port': MYSQL_PARAMS.get('port'),
        'db_name': '',
        'tb_name': '',
        'status': False,
        'hive_db_name': '',
        'hive_tb_name': '',
        'outType':outType,
        'msg': '',
        'num': 0,
        'executed_way': TYPE,
        'local_row_update_time_start': '',
        'local_row_update_time_end': '',
        'partition_date':'',
        'exec_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    #是否删除已有分区
    is_del=True
    
def getHiveData(table: dict):
    '''
    使用spark读取hive数据
    '''
    df = spark.sql(f'select * from {table["hive_db"]}.{table["hive_table"]} ')
    return df
def getMySQLData(params: dict, table: dict):
    '''
    使用spark读取mysql数据
    '''
    df = spark.read.format('jdbc') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .load().withColumn('cdc_sync_date', F.current_timestamp())
    return df

# 日志
def getLog():
    '''
    获得日志对象
    '''
    logger = getLogger(f'log_{datetime.datetime.now()}')
    logger.setLevel(INFO)
    rf_handler = StreamHandler(stderr)  # 默认是sys.stderr
    rf_handler.setLevel(DEBUG)
    f2_handler = FileHandler(settings.log_name)
    f2_handler.setLevel(INFO)
    f2_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    f3_handler = FileHandler(settings.error_log_name)
    f3_handler.setLevel(ERROR)
    f3_handler.setFormatter(Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
    logger.addHandler(rf_handler)
    logger.addHandler(f2_handler)
    logger.addHandler(f3_handler)
    return logger


class MysqlDB:
    '''
    mysql操作类
    '''

    def __init__(self, params):
        self.params=params
    
    def getconn(self):
        import pymysql
        try:
            conn = pymysql.connect(**self.params)
        except pymysql.err.OperationalError:
            # 如果没有数据库则创建数据库
            db = self.params.pop('db')
            conn = pymysql.connect(**self.params)
            cursor = conn.cursor()
            cursor.execute(f"CREATE DATABASE {db}")
            cursor.close()
            conn.close()
            self.params['db'] = db
            conn = pymysql.connect(**self.params)
        return conn
    @staticmethod
    def is_alive(params):
        '''
        判断mysql连接是否存活
        '''
        import pymysql
        try:
            # 执行一个简单的查询
            conn = pymysql.connect(**params)
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            # 检查查询结果
            result = cursor.fetchone()
            if result[0] == 1:
                return True
            else:
                return False
        except:
            return False
    @classmethod
    def create_table(self,conn, table_name: str, columns: list, types: list):
        '''
        根据表名、字段、字段类型建表
        '''
        columns_str = ''
        id_str = '`id` INT AUTO_INCREMENT PRIMARY KEY,\n'
        for i, j in enumerate(columns):
            if j == 'id':
                id_str = ''
            columns_str += f'`{j}` {types[i]},\n'
        columns_str = columns_str.strip("\n").strip(",")
        sql = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            {id_str}
            {columns_str}
        )ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
        '''
        try:
            cur = conn.cursor()
            cur.execute(sql)
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            raise ValueError(f'创建表{table_name}失败-error{e}-sql:{sql}')

    def get_types(self, data: list):
        '''
        根据数据的类型转换为mysql中字段类型
        '''
        from datetime import datetime
        # 获取一行数据
        data = data[0]
        
        def getType(column):
            if isinstance(column, str):
                return 'TEXT'
            elif isinstance(column, datetime):
                return 'datetime'
            elif isinstance(column, int):
                return 'int'
            elif isinstance(column, float):
                return 'double'
            elif isinstance(column, bool):
                return 'bool'
            else:
                raise ValueError(f'不支持字段类型:{type(column)}')

        types = []
        for i in data:
            types.append(getType(i))
        return types
    @classmethod
    def check_table_exists(self,conn, table_name):
        '''
        查询表是否存在
        '''
        try:
            # 创建游标对象
            cursor = conn.cursor()
            # 执行SHOW TABLES查询
            cursor.execute("SHOW TABLES")
            # 获取返回的所有表名
            tables = cursor.fetchall()
            # 检查目标表是否在返回的结果中存在
            if (table_name,) in tables:
                return True
            else:
                return False
        except:
            return False

    def save(self, data: list, table_name: str, method: str = 'append'):
        '''
        简单便捷的保存方法
        自动建库建表
        支持append、replace模式
        '''
        import pandas as pd
        assert len(data) > 0, 'data不能为空'

        df = pd.DataFrame(data)
        conn=self.getconn()
        cur = conn.cursor()
        data = df.values.tolist()
        columns = df.columns.tolist()
        columns_str = ','.join([f'`{i}`' for i in columns])
        s_str = ','.join(['%s' for i in range(len(columns))])
        sql = f'insert into {table_name}({columns_str}) values ({s_str})'
        if self.check_table_exists(conn,table_name):
            if method == 'append':
                pass
            else:
                cur.execute(f'drop table {table_name}')
                conn.commit()
                types = self.get_types(data)
                self.create_table(conn,table_name, columns, types)
        else:
            types = self.get_types(data)
            self.create_table(conn,table_name, columns, types)
        try:
            cur.executemany(sql, data)
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise ValueError(f'保存失败-error:{e}-sql:{sql}')


class myThread(threading.Thread):
    def __init__(self, name, log_data, table, Type):
        super().__init__()
        self.name = name
        self.log_data = log_data.copy()
        self.table = table.copy()
        self.Type = Type

    def run(self):
        
        if settings.outType==0:
            run_res = mysql_to_hive(self.name, self.log_data, self.table, self.Type)
        else:
            run_res = hive_to_mysql(self.name, self.log_data, self.table, self.Type)


def hive_to_mysql(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getHiveData(table)
        
        # 导入数据到mysql
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f'partition_date="{partition_date}"').drop('partition_date')

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['hive_db']}.{table['hive_table']}无更新---num:0")
            log_data['msg'] = f"表{table['hive_db']}.{table['hive_table']}无更新"
            return
        num = df.count()
        df.write.format('jdbc') \
        .mode('append') \
        .option('url',
                f"jdbc:mysql://{params.get('host')}:{params.get('port')}/{table.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', table.get('table')) \
        .option('user', params.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', params.get('password')) \
        .save()

        logger.info(f"Thread {table['hive_db']}.{table['hive_table']}->{table['db']}.{table['table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['hive_db']}.{table['hive_table']}数据到{table['db']}.{table['table']}mysql表成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data],settings.log_tb_name, 'append')
def mysql_to_hive(name, log_data: dict, table: dict, Type):
    '''
    调用spark将mysql数据导入hive，并记录日志
    '''
    logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行开始")
    params = settings.MYSQL_PARAMS.copy()
    try:
        log_data['db_name'] = table.get('db')
        log_data['tb_name'] = table.get('table')
        df = getMySQLData(params, table)
        # 删除partition_date分区已有数据
        try:
            if settings.is_del:
                spark.sql(
                    f'ALTER TABLE {table["hive_db"]}.{table["hive_table"]} DROP PARTITION (partition_date="{partition_date}")')
        except:
            pass
        # 导入数据到hive
        if TYPE == 'all':

            df = df.where(f"{table['update_column']}<'{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'update':
            df = df.where(
                f"{table['update_column']} between '{today_time.strftime('%Y-%m-%d 00:00:00')}' and '{today_time.strftime('%Y-%m-%d 23:59:59')}'")

        elif TYPE == 'other':
            df = df.where(f"{table['update_column']} between '{start_time}' and '{end_time}'")
        # 查询为空
        if df.rdd.isEmpty():
            log_data['status'] = True
            log_data['num'] = 0
            logger.info(f"表{table['db']}.{table['table']}无更新---num:0")
            log_data['msg'] = f"表{table['db']}.{table['table']}无更新"
            return
        num = df.count()
        df.createOrReplaceTempView(f"mysql_to_hive_{name}")
        spark.sql(
            f"INSERT INTO {table['hive_db']}.{table['hive_table']} PARTITION(partition_date={partition_date}) SELECT * FROM mysql_to_hive_{name}")

        logger.info(f"Thread {table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行成功---num:{num}")
        log_data['status'] = True
        log_data['num'] = num
        log_data['msg'] = f"导入{table['db']}.{table['table']}数据到hive表{table['hive_db']}.{table['hive_table']}成功"
    except Exception as e:
        logger.error(f"{table['db']}.{table['table']}->{table['hive_db']}.{table['hive_table']}执行失败----error:{e}")
        log_data['msg'] = f"导入表{table['db']}.{table['table']}失败----error:{e}"
    finally:
        Log_db.save([log_data], settings.log_tb_name, 'append')


def insertData(table: dict):
    '''
    导入
    '''
    table = table.copy()
    log_data = settings.log_data.copy()
    log_data['hive_db_name'] = table.get('hive_db')
    log_data['hive_tb_name'] = table.get('hive_table')
    log_data['exec_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 创建多个线程
    threads = []
    # 单库单表
    if table['db_num'] == 0 and table['t_num'] == 0:
        t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
        threads.append(t)
        t.start()
    ##单库多表
    elif table['db_num'] == 0 and table['t_num'] > 0:
        '''
        程序自动添加表下标并且依次遍历分表
        '''

        for i in range(table['t_num'] + 1):
            ##处理表名
            new_tb = table['table'].split('_')
            if len(new_tb) > 2:
                new_tb.pop()
                table['table'] = '_'.join(new_tb) + f'_{i}'
            else:
                table['table'] = table['table'] + f'_{i}'
            t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
            threads.append(t)
            t.start()
    #     ##多库多表
    elif table['db_num'] > 0 and table['t_num'] > 0:
        '''
        程序自动添加库和表下标并且依次遍历分表
        '''
        for i in range(table['db_num'] + 1):
            ##处理库名
            new_db = table['db'].split('_')
            if len(new_db) > 2:
                new_db.pop()
                table['db'] = '_'.join(new_db) + f'_{i}'

            else:
                table['db'] = table['db'] + f'_{i}'
            for j in range(table['t_num'] + 1):
                ##处理表名
                new_tb = table['table'].split('_')
                if len(new_tb) > 2:
                    new_tb.pop()
                    table['table'] = '_'.join(new_tb) + f'_{j}'
                else:
                    table['table'] = table['table'] + f'_{j}'

                t = myThread(f"{table['db']}_{table['table']}", log_data, table, TYPE)
                threads.append(t)
                t.start()
    ##不能识别类型
    else:
        raise ValueError('db_num or t_num ValueError')
    for t in threads:
        t.join()


if __name__ == '__main__':
    logger = getLog()
    Log_db = MysqlDB(settings.MYSQL_CONNECT)
    today_time = datetime.date.today()
#     yesterday_time = today_time + datetime.timedelta(-1)
    partition_date =today_time.strftime("%Y%m%d")
    try:
        TYPE = sys.argv[1]
        if TYPE not in ['all','update','other']:
            TYPE = settings.TYPE
    except:
        TYPE = settings.TYPE
    tables = settings.TABLES
    logger.info(f'导入开始--type:{TYPE}')
    start=time.time()
    ##1 全量  截止运行时间零点
    if settings.outType==0:
        logger.info('mysql_to_hive start')
    else:
        logger.info('hive_to_mysql start')
    if TYPE == 'all':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    elif TYPE == 'update':
        settings.log_data['executed_way'] = TYPE
        settings.log_data['partition_date'] = partition_date
        settings.log_data['local_row_update_time_start'] = today_time.strftime("%Y-%m-%d %H:%M:%S")
        settings.log_data['local_row_update_time_end'] = today_time.strftime("%Y-%m-%d %H:%M:%S")

    ##3 指定时间段 追加放入昨天分区/指定分区里
    elif TYPE == 'other':
        start_time = sys.argv[2]
        end_time = sys.argv[3]
        # start_time = "2022-08-11 00:00:00"
        # end_time   = "2022-08-12 00:00:00"
        partition_date = sys.argv[4] if len(sys.argv) > 4 else partition_date
        where_str = "{} between '%s' and '%s' " % (start_time, end_time)
        is_merge_small_file = True
        settings.log_data['partition_date'] = partition_date
        settings.log_data['executed_way'] = TYPE
        settings.log_data['local_row_update_time_start'] = start_time
        settings.log_data['local_row_update_time_end'] = end_time
    else:
        raise ValueError(f'不能识别类型{TYPE}')
    for table in tables:
        insertData(table)
    end=time.time()
    
    logger.info(f'导入完成---总耗时{(end-start)}秒')

导入开始--type:update
hive_to_mysql start
Thread dwd_jobfree.dwd_jobfree_db_spider_t_jobs->jobfree.jobs执行开始
jobfree.jobs->dwd_jobfree.dwd_jobfree_db_spider_t_jobs执行失败----error:An error occurred while calling o2180.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1868.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1868.0 (TID 10773, cdh91, executor 10): java.sql.BatchUpdateException: Duplicate entry '316562' for key 'PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
	at com.mysql.jdbc.Util.getInstance(Util.java:386)
	at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.ja

导入完成---总耗时131.40171694755554秒


In [173]:
data=[[1,1,'click'],[1,2,'start'],[1,2,'click'],[1,1,'click']]

In [174]:
df=spark.createDataFrame(data,schema='uid:string,gid:string,action:string')

In [295]:
df2.show()

AttributeError: 'str' object has no attribute 'show'

In [283]:
getHiveData({
    'hive_db':'dwd_jobfree',
    'hive_table':'dwd_zhilian_db_spider_t_jobs'
}).show()

+-----------+--------------------+---------+-------------------+--------------------+--------------------+------------------------------------+--------------------+---------------------+--------+------------+-------------+-----------+-----------+----------+------------+-------------------+------+------------+--------+----------+---------------+-------------------+---------------------------------+--------+--------+----------+---------+--------------------+--------------------------+--------------------+--------------------+-----------+--------------------+
|      JobId|              number|education|   firstPublishTime| industryCompanyTags|        industryName|                          jobSummary|         positionUrl|positionSourceTypeUrl|property|propertyCode|recruitNumber|   salary60| salaryReal|salaryType|  skillLabel|        publishTime|cityId|cityDistrict|streetId|streetName|subJobTypeLevel|subJobTypeLevelName|                   welfareTagList|workCity|workType|workingExp|companyI

In [187]:
click_weight = 1
star_weight = 5

# 将点击和收藏转换为权重
df = df.withColumn("weight", when(df["action"] == "click", click_weight).otherwise(star_weight))
df.show()

+---+---+------+------+
|uid|gid|action|weight|
+---+---+------+------+
|  1|  1| click|     1|
|  1|  2| start|    10|
|  1|  2| click|     1|
|  1|  1| click|     1|
+---+---+------+------+



In [197]:
user_counts = df.groupBy("uid",'gid').sum().withColumnRenamed('sum(weight)','weight')

In [198]:
user_counts.show()

+---+----------+
|uid|user_count|
+---+----------+
|  1|         4|
+---+----------+



In [194]:
df.join(user_counts, "uid", "left") \
                     .selectExpr("uid", "hid", "weight", "user_count")

AnalysisException: "Reference 'weight' is ambiguous, could be: weight, weight.; line 1 pos 0"

In [285]:
我=1

In [286]:
我

1

In [229]:
Json=json.loads(re.findall('__INITIAL_STATE__=(.*?})</script>',text)[0])

In [230]:
jobtype=Json.get('baseData').get('jobType')
jobtype.pop(0)

{'code': '-1',
 'parentCode': None,
 'name': '不限',
 'en_name': '',
 'deleted': False,
 'sublist': []}

In [231]:
Json.get('')

[{'code': '19000000000000',
  'parentCode': None,
  'name': '销售/商务拓展',
  'en_name': 'Sales/BD',
  'deleted': False,
  'sublist': [{'code': '19000200000000',
    'parentCode': '19000000000000',
    'name': '销售顾问',
    'en_name': 'Salesperson',
    'deleted': False,
    'sublist': [{'code': '19000200100000',
      'parentCode': '19000200000000',
      'name': '销售顾问',
      'en_name': 'Sales Consultant',
      'deleted': False,
      'sublist': []},
     {'code': '19000200020000',
      'parentCode': '19000200000000',
      'name': '大客户代表',
      'en_name': 'Key Account Representative',
      'deleted': False,
      'sublist': []},
     {'code': '19000200030000',
      'parentCode': '19000200000000',
      'name': '电话销售',
      'en_name': 'Tele-sales',
      'deleted': False,
      'sublist': []},
     {'code': '19000200060000',
      'parentCode': '19000200000000',
      'name': '商品销售',
      'en_name': 'Product Sales',
      'deleted': False,
      'sublist': []},
     {'code': '1900020

In [232]:
JobTypeList=[]
for i in jobtype:
    code1=i.get('code')
    for j in i.get('sublist'):
        code2=j.get('code')
        for k in j.get('sublist'):
            code3=k.get('code')
            JobTypeList.append(f'{code1},{code2},{code3}')

In [238]:
len(JobTypeList)

1403

In [237]:
len(['530', '531', '532', '533', '534', '535', '536', '537', '538', '539', '540', '541', '542', '543', '544', '545', '546', '547', '548', '549', '550', '551', '552', '553', '554', '555', '556', '557', '558', '559', '560','562;561;563'])

32

In [439]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
string_indexer = StringIndexer(inputCol='workcity', outputCol='workcity_index')
assembler = VectorAssembler(inputCols=['eduhighestleveltranslation', 'workingexp', 'worktype', 'subjobtypelevel', 'skilllabel', 'propertycode', 'preferredsalarymin', 'preferredsalarymax', 'selfevaluate'], outputCol='features')

{'name': '南方医科大学南方医院',
 'level': '三级甲等',
 'address': '广东省广州市广州大道北路1838号',
 'phone': '020-61641114',
 'concerns': '6138',
 'reserves': '10.1万'}

In [419]:
r=requests.get('https://www.wedoctor.com/commentslist/h-a986c8d76-c720-11e1-913c-5cf9dd2e7135000/1-0?pageNo=2&sign=A069467567A3E9034EF43D42FFDD9B2B8357C5F677C1698B5A2A8DC114FC84985AAEFC57F0D5107C383B8B5A0499A956386FE0DD05C6ADB8581E34B2E727DE72&timestamp=1697684520534')

In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.ml.feature import Normalizer,MinMaxScaler
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.sql.types import *
import jieba
import numpy as np
spark = SparkSession.builder \
    .appName("model") \
    .master('yarn') \
    .config('spark.sql.shuffle.partitions', '200') \
    .config('spark.jars', 'hdfs:///user/cdh/jars/mysql-connector-java-5.1.49.jar') \
    .getOrCreate()
def manhattan_similarity(vec1, vec2):
    distance = vec1.squared_distance(vec2)
    similarity = 1 / (1 + distance)
    return float(similarity)
def jieba_cut(x):
    x=x.replace('职位','').replace('岗位','').replace('描述','').replace('职责','')
    return list(filter(lambda x:len(x)>1,jieba.lcut(x)))
def cos_similarity(Vector1, Vector2):
    similarity = Vector1.dot(Vector2) / (np.linalg.norm(Vector1) * np.linalg.norm(Vector2))
    return float(similarity)
manhattan_similarity_udf = udf(manhattan_similarity, DoubleType())
cos_similarity_udf = udf(cos_similarity, DoubleType())
jieba_cut_udf = udf(jieba_cut, ArrayType(StringType()))
# 加载用户画像数据
user_profile_df = spark.sql('SELECT * FROM `dwd_jobfree`.`dwd_jobfree_db_jobfree_t_resume_train`  ')
user_profile_df =user_profile_df.withColumn('summary',jieba_cut_udf(user_profile_df['selfevaluate'])).withColumn('skilllabel',split(col('skilllabel'),'/'))
# 加载招聘信息数据
job_postings_df = spark.sql('SELECT * FROM `dwd_jobfree`.`dwd_jobfree_db_spider_t_jobs_train`  ')
job_postings_df =job_postings_df.withColumn('summary',jieba_cut_udf(job_postings_df['jobsummary'])).withColumn('skilllabel',split(col('skilllabel'),'/'))

In [5]:
def cos_similarity(Vector1, Vector2):
    similarity = Vector1.dot(Vector2) / (np.linalg.norm(Vector1) * np.linalg.norm(Vector2))
    return float(similarity)
cos_similarity_udf = udf(cos_similarity, DoubleType())
def pearson_similarity(vector1, vector2):
    # 转换为 NumPy 数组
    vector1 = np.array(vector1)
    vector2 = np.array(vector2)
    
    # 计算向量的平均值
    mean1 = np.mean(vector1)
    mean2 = np.mean(vector2)
    
    # 计算向量的差值
    diff1 = np.subtract(vector1, mean1)
    diff2 = np.subtract(vector2, mean2)
    
    # 计算皮尔逊相关性系数
    similarity = np.sum(diff1 * diff2) / (np.sqrt(np.sum(diff1 ** 2)) * np.sqrt(np.sum(diff2 ** 2)))
    
    return float(similarity)
pearson_similarity_udf = udf(pearson_similarity, DoubleType())

In [6]:
from pyspark.ml.feature import HashingTF, IDF 
hashing_TF = HashingTF(inputCol="summary", outputCol="words", numFeatures=20)  
job_df = hashing_TF.transform(job_postings_df)  
user_df=hashing_TF.transform(user_profile_df)
hashing_TF2 = HashingTF(inputCol="skilllabel", outputCol="words2", numFeatures=20)  
job_df = hashing_TF2.transform(job_df)  
user_df=hashing_TF2.transform(user_df)

In [5]:
idf = IDF(inputCol="words", outputCol="text_features")  
idfModel = idf.fit(job_df)  
idf2 = IDF(inputCol="words2", outputCol="text_features2")  
idfModel2 = idf2.fit(job_df)  

Py4JJavaError: An error occurred while calling o193.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 10, cdh90, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 361, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 236, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 163, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
    command = serializer._read_with_length(file)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'jieba'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2178)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:92)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:68)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 361, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 236, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 163, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
    command = serializer._read_with_length(file)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'jieba'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
idfModel.save('hdfs://user/cdh/model/words.model')
idfModel2.save('hdfs://user/cdh/model/words2.model')

In [None]:
job_df = idfModel.transform(job_df) 
user_df = idfModel.transform(user_df)
job_df = idfModel2.transform(job_df) 
user_df = idfModel2.transform(user_df) 

In [986]:
from pyspark.ml.feature import PCA
assembler = VectorAssembler(inputCols=["text_features", "text_features2"], outputCol="features")
dfWithFeatures1 = assembler.transform(job_df)
dfWithFeatures2 = assembler.transform(user_df)
# 应用 PCA 进行降维
pca = PCA(k=1, inputCol="features", outputCol="pcaFeatures")
model1 = pca.fit(dfWithFeatures1)


KeyboardInterrupt: 

In [None]:
user_df = model1.transform(dfWithFeatures2).withColumn('pcaFeatures',to_array('pcaFeatures')[0])
job_df = model1.transform(dfWithFeatures1).withColumn('pcaFeatures',to_array('pcaFeatures')[0])

In [7]:
user_profile_df=user_df.drop('selfevaluate','skilllabel','summary','words','words2','text_features','text_features2','features','pcaFeatures')

In [8]:
user_profile_df.show()

+-------+---------------+--------------+------------+--------+---------------+------------+------------------+------------------+
|user_id|eduhighestlevel|workingexpcode|worktypecode|workcity|subjobtypelevel|propertycode|preferredsalarymin|preferredsalarymax|
+-------+---------------+--------------+------------+--------+---------------+------------+------------------+------------------+
|      3|              5|             1|           1|    1666| 14000600100000|           5|              3000|              6000|
|      1|              5|             1|           2|    1590|  9000300040000|           5|             10000|             20000|
|      2|              5|             0|           3|    1666|  7000200170000|           5|              1000|              5000|
+-------+---------------+--------------+------------+--------+---------------+------------+------------------+------------------+



In [9]:
job_postings_df =job_df.drop('jobsummary','skilllabel','summary','words','words2','text_features','text_features2','features','pcaFeatures')

In [10]:
job_postings_df.show()

+------+-------------+--------------+------------+--------+---------------+------------+----------+----------+
|job_id|educationcode|workingexpcode|worktypecode|workcity|subjobtypelevel|propertycode|salary_min|salary_max|
+------+-------------+--------------+------------+--------+---------------+------------+----------+----------+
|     1|            5|            -1|           2|    1590|  3000100050000|           2|         0|         0|
|     2|            5|            -1|           2|    1590| 19000200090000|           2|         0|         0|
|     3|            5|             0|           2|    1590|  8000100030000|           8|         0|         0|
|     4|            5|             0|           2|    1590|  8000100030000|           8|         0|         0|
|     5|            5|             0|           2|    1590|  8000100030000|           8|         0|         0|
|     6|            5|             0|           2|    1590|  8000100030000|           8|         0|         0|
|

In [11]:

# 合并用户画像特征为一个向量列
assembler = VectorAssembler(inputCols=user_profile_df.columns[1:], outputCol="features")
user_profile_df = assembler.transform(user_profile_df)
# 合并招聘信息特征为一个向量列
assembler = VectorAssembler(inputCols=job_postings_df.columns[1:], outputCol="features")
job_postings_df = assembler.transform(job_postings_df)
# 标准化向量列
minmax = MinMaxScaler(inputCol="features", outputCol="job_features_norm")
minmax_model=minmax.fit(job_postings_df)


In [25]:
user_profile_df.collect()

[Row(user_id=1, eduhighestlevel=5, workingexpcode=1, worktypecode=2, workcity=1590, subjobtypelevel=9000300040000, propertycode=5, preferredsalarymin=10000, preferredsalarymax=20000, features=DenseVector([5.0, 1.0, 2.0, 1590.0, 9000300040000.0, 5.0, 10000.0, 20000.0]))]

In [26]:
job_postings_df.collect()

[Row(job_id=266052, educationcode=4, workingexpcode=-1, worktypecode=2, workcity=1593, subjobtypelevel=1000100010000, propertycode=5, salary_min=3001, salary_max=6000, features=DenseVector([4.0, -1.0, 2.0, 1593.0, 1000100010000.0, 5.0, 3001.0, 6000.0])),
 Row(job_id=99631, educationcode=5, workingexpcode=-1, worktypecode=2, workcity=1590, subjobtypelevel=3000300120000, propertycode=5, salary_min=15001, salary_max=25000, features=DenseVector([5.0, -1.0, 2.0, 1590.0, 3000300120000.0, 5.0, 15001.0, 25000.0])),
 Row(job_id=152179, educationcode=5, workingexpcode=3, worktypecode=2, workcity=1590, subjobtypelevel=17000300040000, propertycode=8, salary_min=13001, salary_max=20000, features=DenseVector([5.0, 3.0, 2.0, 1590.0, 17000300040000.0, 8.0, 13001.0, 20000.0])),
 Row(job_id=153584, educationcode=2, workingexpcode=2, worktypecode=2, workcity=1590, subjobtypelevel=15000100270000, propertycode=9, salary_min=4001, salary_max=7000, features=DenseVector([2.0, 2.0, 2.0, 1590.0, 15000100270000.

In [12]:
job_postings_df =minmax_model.transform(job_postings_df)
minmax =MinMaxScaler(inputCol="features", outputCol="user_features_norm")
minmax_model=minmax.fit(job_postings_df)
user_profile_df = minmax_model.transform(user_profile_df)
# 获取用户画像向量和招聘信息特征向量的笛卡尔积
cartesian_df = user_profile_df.crossJoin(job_postings_df)
# 计算相似度
cartesian_df = cartesian_df.withColumn("similarity",cos_similarity_udf(cartesian_df['user_features_norm'],cartesian_df['job_features_norm']) )

# 显示相似度计算结果，可以根据需要进行进一步筛选和排序
cartesian_df.selectExpr('user_id','job_id','similarity').show()

+-------+------+------------------+
|user_id|job_id|        similarity|
+-------+------+------------------+
|      3|     1|0.7678040279225161|
|      1|     1|0.8932435387593447|
|      2|     1|0.9255663671089442|
|      3|     2|0.8127545894110211|
|      1|     2|0.8758757976466914|
|      2|     2|0.8805548735600979|
|      3|     3|0.8831579180956801|
|      1|     3|0.9621684010938669|
|      2|     3|0.9261344859986289|
|      3|     4|0.8831579180956801|
|      1|     4|0.9621684010938669|
|      2|     4|0.9261344859986289|
|      3|     5|0.8831579180956801|
|      1|     5|0.9621684010938669|
|      2|     5|0.9261344859986289|
|      3|     6|0.8831579180956801|
|      1|     6|0.9621684010938669|
|      2|     6|0.9261344859986289|
|      3|     7|0.8831579180956801|
|      1|     7|0.9621684010938669|
+-------+------+------------------+
only showing top 20 rows



In [36]:
cartesian_df.selectExpr('user_id','job_id','similarity').show()

+-------+---------------+--------------+------------+--------+---------------+------------+------------------+------------------+--------------------+--------------------+------+-------------+--------------+------------+--------+---------------+------------+----------+----------+--------------------+--------------------+------------------+
|user_id|eduhighestlevel|workingexpcode|worktypecode|workcity|subjobtypelevel|propertycode|preferredsalarymin|preferredsalarymax|            features|  user_features_norm|job_id|educationcode|workingexpcode|worktypecode|workcity|subjobtypelevel|propertycode|salary_min|salary_max|            features|   job_features_norm|        similarity|
+-------+---------------+--------------+------------+--------+---------------+------------+------------------+------------------+--------------------+--------------------+------+-------------+--------------+------------+--------+---------------+------------+----------+----------+--------------------+---------------

In [62]:
# 假设筛选条件为相似度大于0.8的用户-招聘信息对
filtered_df = cartesian_df.filter("similarity> 0.8")
# 根据相似度值进行降序排序
sorted_df = filtered_df.orderBy("similarity", ascending=True)


In [63]:
sorted_df.count()

799503

In [77]:
sorted_df.filter('job_id=303046').collect()

[Row(user_id=3, eduhighestlevel=5, workingexpcode=1, worktypecode=1, workcity=1666, subjobtypelevel=14000600100000, propertycode=5, preferredsalarymin=3000, preferredsalarymax=6000, features=DenseVector([5.0, 1.0, 1.0, 1666.0, 14000600100000.0, 5.0, 3000.0, 6000.0]), user_features_norm=DenseVector([0.6667, 0.3333, 0.0, 0.0026, 0.2653, 0.3125, 0.0009, 0.0011]), job_id=303046, educationcode=5, workingexpcode=1, worktypecode=2, workcity=1710, subjobtypelevel=9000300220000, propertycode=5, salary_min=8001, salary_max=15000, features=DenseVector([5.0, 1.0, 2.0, 1710.0, 9000300220000.0, 5.0, 8001.0, 15000.0]), job_features_norm=DenseVector([0.6667, 0.3333, 0.3333, 0.0041, 0.1633, 0.3125, 0.0025, 0.0028]), similarity=0.9206728027804162),
 Row(user_id=2, eduhighestlevel=5, workingexpcode=0, worktypecode=3, workcity=1666, subjobtypelevel=7000200170000, propertycode=5, preferredsalarymin=1000, preferredsalarymax=5000, features=DenseVector([5.0, 0.0, 3.0, 1666.0, 7000200170000.0, 5.0, 1000.0, 500

In [65]:
rating_matrix_df=sorted_df.selectExpr('user_id','job_id','similarity')
rating_matrix_df.show()

+-------+------+------------------+
|user_id|job_id|        similarity|
+-------+------+------------------+
|      2|371288|0.8000107951212437|
|      2|371373|0.8000110434972885|
|      2|371156|0.8000114712241883|
|      2|370444|0.8000116046817027|
|      2|370696|0.8000116177992485|
|      3|101258|0.8000267212116915|
|      3| 26061|0.8000429153498407|
|      3| 26416|0.8000429153498407|
|      3| 26417| 0.800044167646844|
|      3| 26409|0.8000441680848821|
|      1|374608|0.8000453851709016|
|      3|115912|0.8000573652658297|
|      1|374580|0.8000676303013072|
|      2|149272|0.8000712120038658|
|      1|373881|0.8000798802903548|
|      3|100624|0.8000817586590868|
|      1|184403|0.8000819878879551|
|      1|184611|0.8000820779394183|
|      1|184108|0.8000820779394183|
|      3| 99409|0.8000879037109551|
+-------+------+------------------+
only showing top 20 rows



In [66]:
from pyspark.ml.recommendation import ALS

# 创建ALS模型实例
als = ALS(userCol="user_id", itemCol="job_id", ratingCol="similarity", coldStartStrategy="drop",implicitPrefs=True,maxIter=5,regParam=0.01)

# 拆分数据集为训练集和测试集
(training_data, test_data) = rating_matrix_df.randomSplit([0.8, 0.2])

# 训练ALS模型
model = als.fit(training_data)


In [67]:
pri=model.transform(training_data)
pri.show()

+-------+------+------------------+----------+
|user_id|job_id|        similarity|prediction|
+-------+------+------------------+----------+
|      2|   148|0.8985208867607823| 0.9963739|
|      3|   148|0.8468843383654009| 0.9958409|
|      1|   148|0.9032674865011788| 0.9982139|
|      3|   463|0.8541152158402285| 0.9925325|
|      1|   463|0.8938516063978649| 0.9906085|
|      2|   496| 0.899770553908844| 0.9922511|
|      3|   496|0.8594759053546392|0.99468434|
|      3|  1088| 0.840860382676698| 0.9924743|
|      1|  1088| 0.926039390301971| 0.9907485|
|      2|  1238|0.8648522924721355|0.99490076|
|      1|  1238|0.8568821982657739| 0.9955462|
|      1|  1342|0.8761210278066944| 0.9896882|
|      2|  1580|0.9228667000939351|0.99524325|
|      1|  1580|0.9029614789905029| 0.9958466|
|      2|  1591| 0.926797510797794| 0.9952656|
|      1|  1591|0.9033474599060832|0.99584943|
|      2|  1645|0.9118679370053299| 0.9951793|
|      1|  1645|0.8828014183604984|0.99571806|
|      2|  18

In [68]:
from pyspark.ml.evaluation import RegressionEvaluator
# 使用测试集评估模型
predictions = model.transform(test_data)

# 创建 RegressionEvaluator 评估器
evaluator = RegressionEvaluator(labelCol="similarity", predictionCol="prediction", metricName="rmse")

# 计算 RMSE
rmse = evaluator.evaluate(predictions)

# 输出评估结果
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 0.8802805200040512


In [75]:
model.recommendForAllUsers(50).show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[[316557, 1.35693...|
|      3|[[316556, 1.36753...|
|      2|[[316559, 1.36188...|
+-------+--------------------+



In [87]:
model.recommendForAllUsers(180).createOrReplaceTempView('recommendForAllUsers')
spark.sql('INSERT  OVERWRITE table ads_jobfree.ads_jobfree_db_jobfree_t_recommendforallusers  SELECT user_id,to_json(recommendations) FROM recommendForAllUsers')

DataFrame[]

In [88]:
MYSQL_PARAMS = {
        'host': '10.8.16.83',
        'port': 3306,
        'user': 'root',
        'password': 'fiang123',
        'db':'jobfree',
        'table':'recommendforallusers'
    }
df=spark.sql('SELECT * FROM ads_jobfree.ads_jobfree_db_jobfree_t_recommendforallusers')
df.write.format('jdbc') \
        .mode('overwrite') \
        .option('url',
                f"jdbc:mysql://{MYSQL_PARAMS.get('host')}:{MYSQL_PARAMS.get('port')}/{MYSQL_PARAMS.get('db')}?useSSL=false&useUnicode=true") \
        .option('dbtable', MYSQL_PARAMS.get('table')) \
        .option('user', MYSQL_PARAMS.get('user')) \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('password', MYSQL_PARAMS.get('password')) \
        .save()

In [None]:
 {
            "code": "-1",
            "parentCode": null,
            "name": "不限",
            "en_name": "",
            "deleted": false,
            "sublist": []
        },
        {
            "code": "2",
            "parentCode": null,
            "name": "全职",
            "en_name": "Full-time",
            "deleted": false,
            "sublist": []
        },
        {
            "code": "1",
            "parentCode": null,
            "name": "兼职/临时",
            "en_name": "Part-time",
            "deleted": false,
            "sublist": []
        },
        {
            "code": "4",
            "parentCode": null,
            "name": "实习",
            "en_name": "Intern",
            "deleted": false,
            "sublist": []
        },
        {
            "code": "5",
            "parentCode": null,
            "name": "校园",
            "en_name": "Campus",
            "deleted": false,
            "sublist": []
        }
    

In [567]:
spark.sql('''SELECT get_json_object(Cityid,"$") as Cityid
FROM `ods_jobfree`.`ods_zhilian_db_spider_t_jobs` WHERE (`partition_date`='20231018')''').show()

+------+
|Cityid|
+------+
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
|   530|
+------+
only showing top 20 rows

