In [1]:
import pandas as pd
excel_file = "/home/jovyan/work/online_retail.xlsx"

df = pd.read_excel(excel_file)

csv_file = "/home/jovyan/work/online_retail.csv"

df.to_csv(csv_file, index=False)

In [2]:
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkFirst") \
      .getOrCreate() 

In [3]:
from pyspark.sql import functions as f

In [4]:
df = spark.read.csv(csv_file, header=True, inferSchema=True)


In [5]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [44]:
# общее число строк
df.count()

541909

In [69]:
# Количество уникальных клиентов
df_customer = df.groupBy('CustomerID').count().count()
df_customer

4373

In [111]:
# В какой стране совершается большинство покупок
df_country = df.groupBy('Country').count()
df_country.select('Country').filter(col('count')== df_country.agg(f.max('count')).collect()[0][0]).show()

+--------------+
|       Country|
+--------------+
|United Kingdom|
+--------------+



In [115]:
# Даты самой ранней и самой последней покупки на платформе
df.agg(f.min('InvoiceDate').alias('min_data'), f.max('InvoiceDate').alias('max_data')).show()

+-------------------+-------------------+
|           min_data|           max_data|
+-------------------+-------------------+
|2010-12-01 08:26:00|2011-12-09 12:50:00|
+-------------------+-------------------+



In [None]:
# RFM-анализ
df_group = df.groupBy('CustomerID').agg(f.max('InvoiceDate').alias('max_data'))
df_group.select('CustomerID').show()

In [96]:
df.createOrReplaceTempView("retail")
RFM = spark.sql("SELECT *, CONCAT_WS('', Frequency_value, Recency_value, Monetary_value) AS sum_value \
        FROM(SELECT CustomerID, Frequency, Recency, Monetary, \
            CASE  \
                WHEN Frequency < 100 THEN 'C' \
                WHEN Frequency >= 100 AND Frequency < 200 THEN 'B' \
                WHEN  Frequency >= 200 THEN 'A' \
            END AS Frequency_value, \
            CASE  \
                WHEN Recency < 4300 THEN 'A' \
                WHEN Recency >= 4300 AND Recency < 4400 THEN 'B' \
                WHEN Recency >= 4400 THEN 'C' \
            END AS Recency_value, \
            CASE  \
                WHEN Monetary < 100 THEN 'C' \
                WHEN Monetary >= 100 AND Monetary < 300 THEN 'B' \
                WHEN Monetary >= 300 THEN 'A' \
            END AS Monetary_value \
            FROM( SELECT datediff(current_date(),MAX(InvoiceDate)) AS Recency, \
                    COUNT(InvoiceNo) AS Frequency, SUM(UnitPrice) AS Monetary, CustomerID  \
                FROM retail GROUP BY CustomerID))")


In [97]:
RFM.show()

+----------+---------+-------+------------------+---------------+-------------+--------------+---------+
|CustomerID|Frequency|Recency|          Monetary|Frequency_value|Recency_value|Monetary_value|sum_value|
+----------+---------+-------+------------------+---------------+-------------+--------------+---------+
|   16916.0|      143|   4312| 317.5199999999999|              B|            B|             A|      BBA|
|   17884.0|      117|   4292| 224.1699999999998|              B|            A|             B|      BAB|
|   13094.0|       30|   4310|             38.31|              C|            B|             C|      CBC|
|   16596.0|       12|   4304| 49.24999999999999|              C|            B|             C|      CBC|
|   17633.0|       72|   4320|206.32000000000008|              C|            B|             B|      CBB|
|   18114.0|       28|   4579|154.95000000000002|              C|            C|             B|      CCB|
|   13973.0|       11|   4576|             41.32|      

In [99]:
RFM.createOrReplaceTempView('rfm')
load_customer = spark.sql("SELECT CustomerID FROM rfm WHERE sum_value = 'AAA'")

In [102]:
load_customer.write \
    .format("csv") \
    .option("header", "true") \
    .save("/home/jovyan/work/customer_id_1.csv")