In [1]:
##############################################################################
# create SparkSession
##############################################################################

import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

PYSPARK_SUBMIT_ARGS = """--num-executors 3 pyspark-shell"""

os.environ["PYSPARK_SUBMIT_ARGS"] = PYSPARK_SUBMIT_ARGS

spark_home = os.environ.get('SPARK_HOME', None)

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ivashnikov").getOrCreate()
sc = spark.sparkContext

* **spark.users_items.update**: 0 или 1 режим работы, сделать новую матрицу users_items или добавить строки к существующей. По умолчанию - 1, добавить строки в матрицу.
* **spark.users_items.output_dir**: абсолютный или относительный путь к выходным данным.
* **spark.users_items.input_dir**: абсолютный или относительный путь к входным данным.

In [2]:
update = bool(int('0'))
input_dir = '/user/dmitry.ivashnikov/visits'
output_dir = '/user/dmitry.ivashnikov/users-items'


# update = int(spark.conf.get("spark.users_items.update"))
# input_dir = spark.conf.get("spark.users_items.input_dir")
# output_dir = spark.conf.get("spark.users_items.output_dir")

In [3]:
##############################################################################
# вспомогательные функции
##############################################################################


from pyspark.sql.functions import col, lower, concat, lit, max as spark_max, regexp_replace
import re

def read_df(action_type, input_dir=input_dir):
    """чтение датафрейма с заданным типом"""

    item_id = concat(
        lit(action_type), 
        lit('_'), 
        lower(regexp_replace(col('item_id'), '\s|-', '_'))
    ).alias('item_id')
    
    return spark \
        .read \
        .format('json') \
        .load(os.path.join(input_dir, action_type)) \
        .na.drop(subset='uid') \
        .select('date', 'uid', item_id)


def union_df(df1, df2):
    """объединение двух датафреймов с разным набором колонок"""

    columns1 = set(df1.columns)
    columns2 = set(df2.columns)

    columns1_new = columns2 - columns1
    columns2_new = columns1 - columns2

    expr1 = list(columns1) + list(map(lambda x: '0 as %s' % x, columns1_new))
    expr2 = list(columns2) + list(map(lambda x: '0 as %s' % x, columns2_new))

    df_union = df1.selectExpr(*expr1).unionByName(df2.selectExpr(*expr2))
    
    return df_union.select(sorted(df_union.columns))


def group_sum_df(df, key):
    """группировка по ключу и сумма остальных колонок с последующим переименованием"""

    df_new = df \
        .groupby(key) \
        .sum()

    for col in df_new.columns:
        df_new = df_new.withColumnRenamed(col, re.sub('\(|\)|sum', '', col))

    return df_new


def path_exists(path):
    """проверка, существует ли путь"""
    
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))


def get_file_list(path):
    """получение списка файлов в папке"""

    hadoop = sc._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()

    path = hadoop.fs.Path(path)
    file_list = [str(f.getPath()).rsplit('/', 1)[1] for f in fs.get(conf).listStatus(path) if not f.isDirectory()]

    return file_list


def get_dir_list(path):
    """получение списка файлов в папке"""

    hadoop = sc._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()

    path = hadoop.fs.Path(path)
    file_list = [str(f.getPath()).rsplit('/', 1)[1] for f in fs.get(conf).listStatus(path) if f.isDirectory()]

    return file_list

In [6]:
visits = read_df('buy').union(read_df('view'))
visits.cache()

max_date_str = str(visits.agg(spark_max('date')).collect()[0][0])

users_items_new = visits \
    .groupby('uid') \
    .pivot('item_id') \
    .count() \
    .na.fill(0)

users_items_final = users_items_new

if update:
    cmd = 'hdfs dfs -ls %s' % os.path.join(output_dir)
    pattern = ' (/.+)'
    dir_list = os.popen(cmd).read().strip().split('\n')
    dir_list = list(filter(lambda x: re.search(pattern, x), dir_list))
    dir_list = list(map(lambda x: re.search(pattern, x).group(1).rsplit('/', 1)[1], dir_list))
    dir_list = list(filter(lambda x: x.isdigit(), dir_list))
    last_dir = str(max(list(map(int, dir_list))))

    # чтение последнего набора данных
    users_items_old = spark \
        .read \
        .format("parquet") \
        .load(os.path.join(output_dir, last_dir))

    users_items_old.cache()
    users_items_old.count()

    # объединение новых и старых данных
    users_items_final = group_sum_df(
        union_df(users_items_new, users_items_old), 'uid'
    )

# сохранение данных
users_items_final \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save(os.path.join(output_dir, max_date_str))