In [26]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Задача 1.
# С помощью модуля pandas преобразуйте файл из .xlsx в .csv формат
pd.read_excel('data/online_retail.xlsx').to_csv('data/online_retail.csv', index=False)
print('Данные конвертированы в csv формат')

# Задача 2.
# Инициализируйте Spark-сессию
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("SparkFirst")\
        .config("spark.executor.memory", "6G")\
        .config("spark.executor.cores", 6)\
        .getOrCreate()

# Задача 3.
# Создайте dataframe из скачанного файла
df = spark.read.option('header', 'true').csv('data/online_retail.csv')

# Задача 4.
# Подсчитайте следующие показатели
# a. Количество строк в файле
print(f'Количество строк в df: {df.count()}')

# b. Количество уникальных клиентов
print(f'Количество униклальных клиентов в df: {df.dropDuplicates(["CustomerID"]).count()}')

# c.В какой стране совершается большинство покупок
group_df = df.groupBy("Country").count().orderBy(F.col("count").desc()).limit(1).collect()
print(f'Большинство покупок совершено в {group_df[0][0]} в количестве {group_df[0][1]} позиций')

# d. Даты самой ранней и самой последней покупки на платформе
df = df.withColumn("Date", F.to_date(F.to_timestamp("InvoiceDate")))
print(f'Дата самой ранней покупки на платформе: {df.select(F.min(df.Date)).collect()[0][0]}')
print(f'Дата самой последней покупки на платформе: {df.select(F.max(df.Date)).collect()[0][0]}')

# Задача 5.
# Проведите RFM-анализ клиентов платформы и запишите ID клиентов в отдельный CSV файл
df.withColumn("SumOfSale", F.col("Quantity") * F.col("UnitPrice"))\
    .filter(F.col("CustomerID").isNotNull())\
    .groupBy("CustomerID", "InvoiceNo", "Date").agg({"SumOfSale": "sum"})\
    .withColumnRenamed("sum(SumOfSale)", "SumOfSale")\
    .groupBy("CustomerID").agg(
        F.count(F.col("InvoiceNo")).alias("Frequency"),
        (F.current_date() - F.max(F.col("Date"))).alias("Recency"),
        (F.sum(F.col("SumOfSale")) / F.count(F.col("InvoiceNo"))).alias("Monetary")
    )\
    .withColumn("CustomerID", F.col("CustomerID").cast("int"))\
    .withColumn("Recency", F.col("Recency").cast("int"))\
    .withColumn("Monetary", F.round(F.col("Monetary"), 2))\
    .withColumn("Frequency_abc", F.when(F.col("Frequency") <= 5, "C")\
                                .when(F.col("Frequency") < 10, "B")\
                                .otherwise("A"))\
    .withColumn("Recency_abc", F.when(F.col("Recency") <= 4300, "C")\
                                .when(F.col("Recency") < 4400, "B")\
                                .otherwise("A"))\
    .withColumn("Monetary_abc", F.when(F.col("Monetary") <= 0, "C")\
                                .when(F.col("Monetary") < 200, "B")\
                                .otherwise("A"))\
    .withColumn("ABC", F.concat_ws(
        "", F.col("Frequency_abc"), F.col("Recency_abc"), F.col("Monetary_abc")))\
    .filter(F.col("ABC") == "AAA").select("CustomerID")\
    .write.csv("result/", header=True, mode="overwrite")
print("RFM-анализ клиентов проведён, данные записаны в папку 'result'")



Данные конвертированы в csv формат
Количество строк в df: 541909
Количество униклальных клиентов в df: 4373
Большинство покупок совершено в United Kingdom в количестве 495478 позиций
Дата самой ранней покупки на платформе: 2010-12-01
Дата самой последней покупки на платформе: 2011-12-09
RFM-анализ клиентов проведён, данные записаны в папку 'result'
