In [31]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, FloatType

# Создали локально спарк-сессию
spark = SparkSession.builder.master('local[1]').appName('SparkExample').getOrCreate()

In [32]:
# загрузили данные из csv файла и вывели dataframe из 5 первых строк
data = spark.read.csv('customer_shopping.csv', header=True, inferSchema=True)
data.show(5)

23/03/17 22:22:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , invoice_no, customer_id, gender, age, category, quantity, price, payment_method, invoice_date, shopping_mall
 Schema: _c0, invoice_no, customer_id, gender, age, category, quantity, price, payment_method, invoice_date, shopping_mall
Expected: _c0 but found: 
CSV file: file:///Users/daryakondratovich/pyspark/customer_shopping.csv
+---+----------+-----------+------+---+--------+--------+-------+--------------+-------------------+--------------+
|_c0|invoice_no|customer_id|gender|age|category|quantity|  price|payment_method|       invoice_date| shopping_mall|
+---+----------+-----------+------+---+--------+--------+-------+--------------+-------------------+--------------+
|  0|   I138884|    C241288|Female| 28|Clothing|       5| 1500.4|   Credit Card|2022-05-08 00:00:00|        Kanyon|
|  1|   I317333|    C111565|  Male| 21|   Shoes|       3|1800.51|    Debit Card|2021-12-12 00:00:00|Forum Istan

In [33]:
# посмотрим на схему данных
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- shopping_mall: string (nullable = true)



In [34]:
# создадим свою новую схему наших данных для дальнейшей работы с ними
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('invoice_no', StringType(), False),
    StructField('customer_id', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('category', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('price', FloatType(), True),
    StructField('payment_method', StringType(), True),
    StructField('invoice_date', DateType(), True),
    StructField('shopping_mall', StringType(), True),
])

In [35]:
# снова прочитаем данные, но используем уже нашу схему и запишем все в новую переменную df
df = spark.read.csv('customer_shopping.csv', header=True, schema=schema)

In [36]:
# посомтрим на схему данных переменной df
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- shopping_mall: string (nullable = true)



In [37]:
# выведим первые 5 строк данных используя toPandas() датафрэйма 
df.toPandas()


23/03/17 22:22:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , invoice_no, customer_id, gender, age, category, quantity, price, payment_method, invoice_date, shopping_mall
 Schema: id, invoice_no, customer_id, gender, age, category, quantity, price, payment_method, invoice_date, shopping_mall
Expected: id but found: 
CSV file: file:///Users/daryakondratovich/pyspark/customer_shopping.csv


Unnamed: 0,id,invoice_no,customer_id,gender,age,category,quantity,price,payment_method,invoice_date,shopping_mall
0,0,I138884,C241288,Female,28,Clothing,5,1500.400024,Credit Card,2022-05-08,Kanyon
1,1,I317333,C111565,Male,21,Shoes,3,1800.510010,Debit Card,2021-12-12,Forum Istanbul
2,2,I127801,C266599,Male,20,Clothing,1,300.079987,Cash,2021-09-11,Metrocity
3,3,I173702,C988172,Female,66,Shoes,5,3000.850098,Credit Card,2021-05-16,Metropol AVM
4,4,I337046,C189076,Female,53,Books,4,60.599998,Cash,2021-10-24,Kanyon
...,...,...,...,...,...,...,...,...,...,...,...
99452,99452,I219422,C441542,Female,45,Souvenir,5,58.650002,Credit Card,2022-09-21,Kanyon
99453,99453,I325143,C569580,Male,27,Food & Beverage,2,10.460000,Cash,2021-09-22,Forum Istanbul
99454,99454,I824010,C103292,Male,63,Food & Beverage,2,10.460000,Debit Card,2021-03-28,Metrocity
99455,99455,I702964,C800631,Male,56,Technology,4,4200.000000,Cash,2021-03-16,Istinye Park


In [38]:
# выведим все торговый центры без дубликатов, сортируя значения в алфавитном порядке
mall = df.select('shopping_mall').distinct().toPandas().sort_values(by='shopping_mall')
mall

Unnamed: 0,shopping_mall
4,Cevahir AVM
5,Emaar Square Mall
8,Forum Istanbul
7,Istinye Park
0,Kanyon
2,Mall of Istanbul
1,Metrocity
9,Metropol AVM
6,Viaport Outlet
3,Zorlu Center


In [39]:
# так же можно это сделать с помощью sql для начала зарегистируем DF как temparary view для SQL к view
df.createOrReplaceTempView('shopping')

In [40]:
# выведим все торговый центры и их колличество в датафрэйме с помощью spark.sql
spark.sql("select shopping_mall, count(shopping_mall) as count from shopping group by 1").toPandas()

Unnamed: 0,shopping_mall,count
0,Kanyon,19823
1,Metrocity,15011
2,Mall of Istanbul,19943
3,Zorlu Center,5075
4,Cevahir AVM,4991
5,Emaar Square Mall,4811
6,Viaport Outlet,4914
7,Istinye Park,9781
8,Forum Istanbul,4947
9,Metropol AVM,10161


In [41]:
# выведем возраст мужчин, количество вещей и способ оплаты, которые платили наличными 
df.select('age', 'quantity', 'payment_method').filter("payment_method = 'Cash' and gender = 'Male'").toPandas()

Unnamed: 0,age,quantity,payment_method
0,20,1,Cash
1,25,2,Cash
2,24,4,Cash
3,44,5,Cash
4,50,4,Cash
...,...,...,...
17933,24,1,Cash
17934,65,1,Cash
17935,50,5,Cash
17936,27,2,Cash


In [42]:
# выведим весь возраст встречающийся в данных и отсортируем их по убыванию
df.select('age').distinct().orderBy(f.col('age').desc()).toPandas()

Unnamed: 0,age
0,69
1,68
2,67
3,66
4,65
5,64
6,63
7,62
8,61
9,60


In [50]:
# проведем небольшую аналитику по среднему , минимальному и максимальному прайсу для каждой категории товаров, 
# который оплачивали крелитной картой, так же выведим колличество транзакций для каждой категории товара при оплате кредитной картой
df_analytic = df.filter(f.col('payment_method') == 'Credit Card').groupBy('category', 'payment_method').agg(
    f.avg('price').alias('avg_price'),
    f.min('price').alias('min_price'),
    f.max('price').alias('max_price'),
    f.count('price').alias('count')
)
df_analytic.toPandas()

Unnamed: 0,category,payment_method,avg_price,min_price,max_price,count
0,Food & Beverage,Credit Card,15.73184,5.23,26.15,5250
1,Cosmetics,Credit Card,124.075485,40.66,203.300003,5336
2,Books,Credit Card,45.217747,15.15,75.75,1696
3,Clothing,Credit Card,895.298958,300.079987,1500.400024,12025
4,Toys,Credit Card,107.823043,35.84,179.199997,3548
5,Technology,Credit Card,3186.537365,1050.0,5250.0,1753
6,Souvenir,Credit Card,35.243016,11.73,58.650002,1770
7,Shoes,Credit Card,1810.645151,600.169983,3000.850098,3553


In [55]:
# добавим новый столбец, который покажет порядок всех категорий товаров в зависимости от среднего прайса спускаясь по убыванию 
WindowSpec = Window.partitionBy().orderBy(f.col('avg_price').desc())
df_analytic_sorted = df_analytic.withColumn("row_id", f.row_number().over(WindowSpec))
df_analytic_sorted.toPandas()

23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 22:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 2

Unnamed: 0,category,payment_method,avg_price,min_price,max_price,count,row_id
0,Technology,Credit Card,3186.537365,1050.0,5250.0,1753,1
1,Shoes,Credit Card,1810.645151,600.169983,3000.850098,3553,2
2,Clothing,Credit Card,895.298958,300.079987,1500.400024,12025,3
3,Cosmetics,Credit Card,124.075485,40.66,203.300003,5336,4
4,Toys,Credit Card,107.823043,35.84,179.199997,3548,5
5,Books,Credit Card,45.217747,15.15,75.75,1696,6
6,Souvenir,Credit Card,35.243016,11.73,58.650002,1770,7
7,Food & Beverage,Credit Card,15.73184,5.23,26.15,5250,8
