<a href="https://colab.research.google.com/github/anslava00/PySpark_sber/blob/main/PySpark_lesson3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F

conf = SparkConf().set('spark.ui.port', '4050').set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
                  .set('spark.dynamicAllocation.enabled', 'true')\
                  .set('spark.shuffle.service.enabled', 'true') #трекер, чтобы возвращать ресурсы
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql.window import Window

In [None]:
trans_data = spark.read.parquet('/content/drive/MyDrive/Colab_Notebooks/spark_transactions.parquet')

In [None]:
trans_data.show(3)

+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|User|Card|Year|Month|Day| Time| Amount|          UseChip| MerchantCity|MerchantState|    Zip| MCC|IsFraud|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|   0|   0|2002|    9|  1|06:21|$134.09|Swipe Transaction|     La Verne|           CA|91750.0|5300|     No|
|   0|   0|2002|    9|  1|06:42| $38.48|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
|   0|   0|2002|    9|  2|06:22|$120.34|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
only showing top 3 rows



Посмотрим на схему данных

In [None]:
trans_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: double (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)



Сколько в среднем транзакций у пользователя

In [None]:
trans_data.groupBy('User').count()\
          .select(F.mean('count').alias('mean_for_users')).show()

+--------------+
|mean_for_users|
+--------------+
|      12193.45|
+--------------+



Сколько карт у пользователей в среднем

In [None]:
trans_data.groupBy('User').sum('Card')\
          .select(F.mean('sum(Card)').alias('mean_for_users_card')).show()

+-------------------+
|mean_for_users_card|
+-------------------+
|          16477.816|
+-------------------+



Немного обработаем данные: Amount в float, из Time вытянем час транзакции и удалим исходный Time, Zip  к типу int

In [None]:
proc_data = trans_data\
            .withColumn('TimeN', F.substring(trans_data.Time, 0, 2))\
            .drop('Time').withColumnRenamed('TimeN', 'Time')\
            .withColumn('Amount', F.regexp_extract(trans_data.Amount,'(\d+)' , 1).cast('float'))\
            .withColumn('Zip', trans_data.Zip.cast('int'))
proc_data.show()

+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|User|Card|Year|Month|Day|Amount|           UseChip| MerchantCity|MerchantState|  Zip| MCC|IsFraud|Time|
+----+----+----+-----+---+------+------------------+-------------+-------------+-----+----+-------+----+
|   0|   0|2002|    9|  1| 134.0| Swipe Transaction|     La Verne|           CA|91750|5300|     No|  06|
|   0|   0|2002|    9|  1|  38.0| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|  06|
|   0|   0|2002|    9|  2| 120.0| Swipe Transaction|Monterey Park|           CA|91754|5411|     No|  06|
|   0|   0|2002|    9|  2| 128.0| Swipe Transaction|Monterey Park|           CA|91754|5651|     No|  17|
|   0|   0|2002|    9|  3| 104.0| Swipe Transaction|     La Verne|           CA|91750|5912|     No|  06|
|   0|   0|2002|    9|  3|  86.0| Swipe Transaction|Monterey Park|           CA|91755|5970|     No|  13|
|   0|   0|2002|    9|  4|  93.0| Swipe Transaction|Mon

Посчитайте количество транзакций по годам, учитывая только те транзакции, объем которых был больше 100

In [None]:
proc_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Amount: float (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)
 |-- Time: string (nullable = true)



In [None]:
proc_data.filter(F.col('Amount') > 100).groupBy('Year').count().show()

+----+------+
|Year| count|
+----+------+
|2007|126495|
|2014|186557|
|2012|181524|
|1991|   278|
|2016|190323|
|1994|  1607|
|2018|189510|
|1999| 14358|
|1997|  5846|
|2009|156325|
|2010|171967|
|2006|110132|
|2017|190046|
|1998|  9409|
|2013|185881|
|2004| 72738|
|2003| 56560|
|2002| 42572|
|2011|179197|
|2020| 36178|
+----+------+
only showing top 20 rows



Определите, есть ли пропуски в данных по каждому столбцу

In [None]:
proc_data.count()

24386900

In [None]:
proc_data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in proc_data.columns]).show()

+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+
|User|Card|Year|Month|Day|Amount|UseChip|MerchantCity|MerchantState|    Zip|MCC|IsFraud|Time|
+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+
|   0|   0|   0|    0|  0|     0|      0|           0|      2720821|2878135|  0|      0|   0|
+----+----+----+-----+---+------+-------+------------+-------------+-------+---+-------+----+



Заполните пропуски исходя из типа данных

При помощи оконных функций для каждого клиента рассчитайте средний размер транзакции, количество транзакций и последнюю по дате транзакцию.

In [None]:
from pyspark.sql.window import Window
window_proc_data = proc_data.select('User', 'Card', 'Year', 'Month', 'Day', 'Amount')\
.withColumn('mean_tran', F.mean('Amount').over(Window.partitionBy('User')))\
.withColumn('count_tran', F.count('Amount').over(Window.partitionBy('User')))\
.withColumn('last_tran_data', F.max('Year').over(Window.partitionBy('User')))

window_proc_data.show()

+----+----+----+-----+---+------+-----------------+----------+--------------+
|User|Card|Year|Month|Day|Amount|        mean_tran|count_tran|last_tran_data|
+----+----+----+-----+---+------+-----------------+----------+--------------+
|   0|   0|2002|    9|  1| 134.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  1|  38.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  2| 120.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  2| 128.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  3| 104.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  3|  86.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  4|  93.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  4| 123.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  5|  61.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  5|  57.0|89.72198567349596|     19963|  

In [None]:
window_proc_data.filter(F.col('User') == 0).show(4)
window_proc_data.filter(F.col('User') == 1).show(4)
window_proc_data.filter(F.col('User') == 2).show(4)

+----+----+----+-----+---+------+-----------------+----------+--------------+
|User|Card|Year|Month|Day|Amount|        mean_tran|count_tran|last_tran_data|
+----+----+----+-----+---+------+-----------------+----------+--------------+
|   0|   0|2002|    9|  1| 134.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  1|  38.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  2| 120.0|89.72198567349596|     19963|          2020|
|   0|   0|2002|    9|  2| 128.0|89.72198567349596|     19963|          2020|
+----+----+----+-----+---+------+-----------------+----------+--------------+
only showing top 4 rows

+----+----+----+-----+---+------+-----------------+----------+--------------+
|User|Card|Year|Month|Day|Amount|        mean_tran|count_tran|last_tran_data|
+----+----+----+-----+---+------+-----------------+----------+--------------+
|   1|   0|2003|    9|  1|  65.0|97.57158874313264|      8919|          2020|
|   1|   0|2003|    9|  4|  98.0|97.571

Теперь самое время сгруппировать данные по каждому клиенту (можно использовать collect_list для сбора данных после агрегации)
Когда будете делать агрегацию, то возьмите только часть выборки, например, через sample, для всей выборки либо не хватит памяти, либо очень долго считать

In [None]:
simp_data = trans_data.sample(fraction=0.3, seed=3)

In [None]:
print(simp_data.count())
simp_data.show(5)

7312560
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|User|Card|Year|Month|Day| Time| Amount|          UseChip| MerchantCity|MerchantState|    Zip| MCC|IsFraud|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|   0|   0|2002|    9|  1|06:21|$134.09|Swipe Transaction|     La Verne|           CA|91750.0|5300|     No|
|   0|   0|2002|    9|  3|06:23|$104.71|Swipe Transaction|     La Verne|           CA|91750.0|5912|     No|
|   0|   0|2002|    9|  3|13:53| $86.19|Swipe Transaction|Monterey Park|           CA|91755.0|5970|     No|
|   0|   0|2002|    9|  8|06:38| $27.75|Swipe Transaction|     La Verne|           CA|91750.0|5411|     No|
|   0|   0|2002|    9|  9|06:54| $37.50|Swipe Transaction|     La Verne|           CA|91750.0|5411|     No|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
only showing top 5 r

In [None]:
simp_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: double (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)



In [None]:
# simp_data.show()
simp_data = simp_data.withColumn('Date', F.to_date(F.concat_ws('-', simp_data.Day, simp_data.Month, simp_data.Year), "d-M-y"))

In [None]:
simp_data.show()

+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+----------+
|User|Card|Year|Month|Day| Time| Amount|          UseChip| MerchantCity|MerchantState|    Zip| MCC|IsFraud|      Date|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+----------+
|   0|   0|2002|    9|  1|06:21|$134.09|Swipe Transaction|     La Verne|           CA|91750.0|5300|     No|2002-09-01|
|   0|   0|2002|    9|  3|06:23|$104.71|Swipe Transaction|     La Verne|           CA|91750.0|5912|     No|2002-09-03|
|   0|   0|2002|    9|  3|13:53| $86.19|Swipe Transaction|Monterey Park|           CA|91755.0|5970|     No|2002-09-03|
|   0|   0|2002|    9|  8|06:38| $27.75|Swipe Transaction|     La Verne|           CA|91750.0|5411|     No|2002-09-08|
|   0|   0|2002|    9|  9|06:54| $37.50|Swipe Transaction|     La Verne|           CA|91750.0|5411|     No|2002-09-09|
|   0|   0|2002|    9|  9|13:31|  $2.71|Swipe Tr

In [None]:
agg_data = simp_data.groupBy('User', 'Card')\
                    .agg(
                        F.collect_list('Date').alias('Date'),
                        F.collect_list('Time').alias('Time'),
                        F.collect_list('Amount').alias('Amount'),
                        F.collect_list('MCC').alias('MCC')
                        )

In [None]:
agg_data.show(5)

+----+----+--------------------+--------------------+--------------------+--------------------+
|User|Card|                Date|                Time|              Amount|                 MCC|
+----+----+--------------------+--------------------+--------------------+--------------------+
|   0|   1|[2014-04-09, 2014...|[13:53, 06:50, 06...|[$56.29, $19.97, ...|[5719, 5411, 5311...|
|   0|   4|[2008-09-29, 2008...|[13:14, 17:42, 11...|[$10.33, $1.62, $...|[5812, 5815, 5499...|
|   1|   0|[2003-09-18, 2003...|[10:49, 11:53, 12...|[$60.86, $29.18, ...|[5912, 7349, 7276...|
|   1|   1|[2011-02-02, 2011...|[13:20, 22:06, 12...|[$47.64, $120.00,...|[4814, 4829, 4900...|
|   1|   3|[2010-06-16, 2010...|[18:28, 10:35, 21...|[$46.62, $12.90, ...|[5912, 7210, 5411...|
+----+----+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



Напишите python функцию, которая возьмет данные после агрегации последовательностей, отсортирует их внутри по дате и времени и преобразует к формату python dict:
{'User': User,
'Card': Card,
'sequence':{
    'amount': [последовательность],
    'year': [последовательность],
    'month': [последовательность],
    'day': [последовательность],
    'time': [последовательность],
    'MCC': [последовательность]
}
}


Выведите как пример одну преобразованную запись, результаты сохраните на диск в через rdd pickle

In [None]:
one_dat = agg_data.take(1)

In [None]:
def to_dict(rec):
    conv_rec = {'User' : rec[0],
                'Card' : rec[1],
                'sequence' : {
                    'date' : sorted(rec[2]),
                    'time' : sorted(rec[3]),
                    'amount' : rec[4],
                    'MCC' : rec[5]
                }}

    return conv_rec

def convert_arr_to_dict(data):
    return [to_dict(data[i]) for i in range(len(data))]

print(convert_arr_to_dict(one_dat)[0])

{'User': 0, 'Card': 1, 'sequence': {'date': [2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2015, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2