**Домашнее задание**
**Срок 07.10.2024**

Куда же без домашки, верно?

Есть данные по транзакциям клиентов, ваша задача состоит в анализе этих данных и подготовки к структуре, которая похожа на ту структуру, которую можно использовать в нейронных сетях + промежуточные задания.

Не забудьте делать всякие show после каждого задания, чтобы было видно результат

**Файл spark_transactions.parquet можете забрать в папке с записями лекций**

**Важно**
В домашнем задании старайтесь использовать максимально dataframe api, а не sql запросы.

In [1]:
!pip install pyspark findspark



In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F

conf = SparkConf().set('spark.ui.port', '4050').set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
                  .set('spark.dynamicAllocation.enabled', 'true')\
                  .set('spark.shuffle.service.enabled', 'true') #трекер, чтобы возвращать ресурсы
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
trans_data = spark.read.parquet('spark_transactions.parquet')

In [4]:
trans_data.count()

24386900

In [5]:
trans_data.show(5)

+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|User|Card|Year|Month|Day| Time| Amount|          UseChip| MerchantCity|MerchantState|    Zip| MCC|IsFraud|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|   0|   0|2002|    9|  1|06:21|$134.09|Swipe Transaction|     La Verne|           CA|91750.0|5300|     No|
|   0|   0|2002|    9|  1|06:42| $38.48|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
|   0|   0|2002|    9|  2|06:22|$120.34|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
|   0|   0|2002|    9|  2|17:45|$128.95|Swipe Transaction|Monterey Park|           CA|91754.0|5651|     No|
|   0|   0|2002|    9|  3|06:23|$104.71|Swipe Transaction|     La Verne|           CA|91750.0|5912|     No|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
only showing top 5 rows



Посмотрим на схему данных

In [6]:
trans_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: double (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)



Сколько в среднем транзакций у пользователя

In [7]:
trans_data.groupBy('User').count().agg(F.mean('count')).show()

+----------+
|avg(count)|
+----------+
|  12193.45|
+----------+



Сколько карт у пользователей в среднем

In [8]:
trans_data.groupBy('User').agg(F.countDistinct('Card').alias('card_count')).agg(F.mean('card_count')).show()

+---------------+
|avg(card_count)|
+---------------+
|         3.0695|
+---------------+



или

In [9]:
trans_data.groupBy('User', 'Card').count().groupBy('User').count().agg(F.mean('count')).show()

+----------+
|avg(count)|
+----------+
|    3.0695|
+----------+



Немного обработаем данные: Amount в float, из Time вытянем час транзакции и удалим исходный Time, Zip  к типу int

In [10]:
trans_data = trans_data.withColumn('Amount', F.regexp_replace('Amount', '^\$', '').cast('float'))
trans_data = trans_data.withColumn('Hour', F.hour(F.col('Time')))
trans_data = trans_data.drop('Time')
trans_data = trans_data.withColumn('Zip', F.col('Zip').cast('int'))

trans_data.show()

+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|User|Card|Year|Month|Day|Amount|           UseChip| MerchantCity|MerchantState|  Zip| MCC|IsFraud|Hour|
+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|   0|   0|2002|    9|  1|134.09| Swipe Transaction|     La Verne|           CA|91750|5300|     No|   6|
|   0|   0|2002|    9|  1| 38.48| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|120.34| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|128.95| Swipe Transaction|Monterey Park|           CA|91754|5651|     No|  17|
|   0|   0|2002|    9|  3|104.71| Swipe Transaction|     La Verne|           CA|91750|5912|     No|   6|
|   0|   0|2002|    9|  3| 86.19| Swipe Transaction|Monterey Park|           CA|91755|5970|     No|  13|
|   0|   0|2002|    9|  4| 93.84| Swipe Transaction|Mon

Посчитайте количество транзакций по годам, учитывая только те транзакции, объем которых был больше 100

In [11]:
trans_data.filter(F.col('Amount') > 100).groupBy('Year').count().show()

+----+------+
|Year| count|
+----+------+
|2007|121489|
|2014|179492|
|2012|174439|
|1991|   266|
|2016|182742|
|1994|  1533|
|2018|182215|
|1999| 13656|
|1997|  5593|
|2009|150327|
|2010|165024|
|2006|105907|
|2017|182434|
|1998|  9009|
|2013|178456|
|2004| 69846|
|2003| 54288|
|2002| 40944|
|2011|172273|
|2020| 34925|
+----+------+
only showing top 20 rows



Определите, есть ли пропуски в данных по каждому столбцу

In [12]:
trans_data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in trans_data.columns]).show()

+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+
|User|Card|Year|Month|Day|Amount|UseChip|MerchantCity|MerchantState|    Zip|MCC|IsFraud|Hour|
+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+
|   0|   0|   0|    0|  0|     0|      0|           0|      2720821|2878135|  0|      0|   0|
+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+



Заполните пропуски исходя из типа данных

In [13]:
trans_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Amount: float (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [14]:
trans_data.fillna({'MerchantState': 'Unknown', 'Zip': 0}).show()

+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|User|Card|Year|Month|Day|Amount|           UseChip| MerchantCity|MerchantState|  Zip| MCC|IsFraud|Hour|
+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|   0|   0|2002|    9|  1|134.09| Swipe Transaction|     La Verne|           CA|91750|5300|     No|   6|
|   0|   0|2002|    9|  1| 38.48| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|120.34| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|128.95| Swipe Transaction|Monterey Park|           CA|91754|5651|     No|  17|
|   0|   0|2002|    9|  3|104.71| Swipe Transaction|     La Verne|           CA|91750|5912|     No|   6|
|   0|   0|2002|    9|  3| 86.19| Swipe Transaction|Monterey Park|           CA|91755|5970|     No|  13|
|   0|   0|2002|    9|  4| 93.84| Swipe Transaction|Mon

При помощи оконных функций для каждого клиента рассчитайте средний размер транзакции, количество транзакций и последнюю по дате транзакцию.

In [15]:
from pyspark.sql.window import Window

In [16]:
trans_data_stats = trans_data.withColumn('avg_user_transaction', F.mean('Amount').over(Window.partitionBy('User')))\
.withColumn('transaction_number', F.count('Amount').over(Window.partitionBy('User')))\
.withColumn('last_transaction_date', F.max(F.to_date(F.concat_ws('-', 'Year', 'Month', 'Day'))).over(Window.partitionBy('User')))

Теперь самое время сгруппировать данные по каждому клиенту (можно использовать collect_list для сбора данных после агрегации)
Когда будете делать агрегацию, то возьмите только часть выборки, например, через sample, для всей выборки либо не хватит памяти, либо очень долго считать

In [17]:
trans_data_sample = trans_data_stats.sample(fraction=0.05, seed=42)

grouped_trans_data = trans_data_sample.groupBy('User', 'Card') \
                       .agg(F.first('avg_user_transaction').alias('avg_user_transaction'),
                            F.first('transaction_number').alias('transaction_number'),
                            F.first('last_transaction_date').alias('last_transaction_date'),
                            F.collect_list(F.struct('Amount', 'Year', 'Month', 'Day', 'Hour', 'MCC')).alias('transactions'))

Напишите python функцию, которая возьмет данные после агрегации последовательностей, отсортирует их внутри по дате и времени и преобразует к формату python dict:
{'User': User,
'Card': Card,
'sequence':{
    'amount': [последовательность],
    'year': [последовательность],
    'month': [последовательность],
    'day': [последовательность],
    'time': [последовательность],
    'MCC': [последовательность]
}
}




In [18]:
def transform_data(row):
    transactions = sorted(row.transactions, key=lambda x: (x.Year, x.Month, x.Day, x.Hour))
    return {
        'User': row.User,
        'Card': row.Card,
        'sequence': {
            'amount': [t.Amount for t in transactions],
            'year': [t.Year for t in transactions],
            'month': [t.Month for t in transactions],
            'day': [t.Day for t in transactions],
            'hour': [t.Hour for t in transactions],
            'MCC': [t.MCC for t in transactions]
        }
    }

transformed_trans_data = grouped_trans_data.rdd.map(transform_data)

Выведите как пример одну преобразованную запись, результаты сохраните на диск в через rdd pickle

In [19]:
print(transformed_trans_data.first())

{'User': 0, 'Card': 0, 'sequence': {'amount': [147.4499969482422, 55.40999984741211, 30.899999618530273, 31.010000228881836, 159.4600067138672, -88.0, 3.6500000953674316, 42.0099983215332, 151.49000549316406, 50.68000030517578, 148.4199981689453, 18.450000762939453, 141.19000244140625, 3.619999885559082, 147.41000366210938, 185.16000366210938, 123.66000366210938, 109.7300033569336, 67.5199966430664, 25.270000457763672, 118.9000015258789, 57.599998474121094, 21.68000030517578, 67.22000122070312, 48.11000061035156, 102.70999908447266, 6.239999771118164, 127.5199966430664, 33.630001068115234, 56.599998474121094, 58.470001220703125, 112.55000305175781, 6.809999942779541, 116.27999877929688, 10.9399995803833, 117.05999755859375, 110.94000244140625, 161.86000061035156, 76.0, 59.709999084472656, 6.789999961853027, 7.099999904632568, -82.0, 151.88999938964844, 36.0, 77.48999786376953, 40.27000045776367, 77.94999694824219, 151.64999389648438, 136.0800018310547, 102.33000183105469, 57.3199996948

In [20]:
transformed_trans_data.saveAsPickleFile('transformed_trans_data')