In [None]:
!pip install PySpark
!pip install findSpark

Collecting findSpark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findSpark
Successfully installed findSpark-2.0.1


In [None]:
import pyspark
import findspark

findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [None]:
df = spark.createDataFrame([('монитор', 1, 1, 0), ('телевизор', 1, 0, 0), ('ОЗУ', 1, 1, 0), ('ваучер', 0, 0, 0)], schema=['name', 'electronics', 'computer', 'discont'])
df.show()

+---------+-----------+--------+-------+
|     name|electronics|computer|discont|
+---------+-----------+--------+-------+
|  монитор|          1|       1|      0|
|телевизор|          1|       0|      0|
|      ОЗУ|          1|       1|      0|
|   ваучер|          0|       0|      0|
+---------+-----------+--------+-------+



In [None]:
def make_category_list(df):

    ''' Из полученного PySpark DataFrame строится новый, содержащий 2 колонки:
    (Название товара, Категория) В колонку "Категория" записывается название
    категории товара (колонки из полученного DataFrame).
    Если товар не числится ни в одной из категории, записывается значение 'none'
    Предполагается, что товары маркированы 1 если входят в какую-либо категорию
    или 0 если нет.
    '''

    def select_and_fill(data, col1:str, col2:str, name:str, value, new_value):

        ''' Функция выбирает колонки (col1, col2) из DataFrame (data)
            Переименовывает col2 в заданное имя (name)
            Изменяет тип данных переименованной колонки
            Заменяет значение (value) в колонке на новое значение (new_value)
            Возвращает новый DataFrame
        '''

        new_data = data[col1, data[col2].cast(StringType())].withColumnRenamed(col2, name)
        new_data = new_data.na.replace(str(value), new_value)

        return new_data

    # Колонка содержащая наименования
    first_col = df.columns[0]

    flag = False


    for col in df.columns[1:]:
        # Проиходися в цикле по списку колонок и выбираем строки по условию

        df1 = df[first_col, col].where(df[col] > 0)
        # Передаем в доп. функцию на обработку
        df1 = select_and_fill(df1, first_col, col, 'category', 1, col)

        if flag:
            result = result.union(df1) # Объединяем в один DataFrame
        else:
            result = df1
            flag = True
    # Ищем товары без категорий.
    df2 = df # Создаем копию данных
    for col in df.columns[1:][::-1]:
        df2 = df2.where(df2[col] == 0)
    df2 = select_and_fill(df2, first_col, col, 'category', 0, 'none')
    result = result.union(df2).sort('category')

    return result

In [None]:
a = make_category_list(df)
a.show()

+---------+-----------+
|     name|   category|
+---------+-----------+
|      ОЗУ|   computer|
|  монитор|   computer|
|  монитор|electronics|
|      ОЗУ|electronics|
|телевизор|electronics|
|   ваучер|       none|
+---------+-----------+

