In [52]:
import os
import datetime
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, length, when, col, broadcast
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField, DateType, BooleanType
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark import StorageLevel
from jinja2 import Environment, FileSystemLoader

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from collections import Counter

In [53]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

In [54]:
df = spark.read.parquet(r"alfabattle2_train_transactions_contest\train_transactions_contest\part_000_0_to_23646.parquet")

In [55]:
# df = spark.read.csv("new_results.csv", header=True, inferSchema=True).persist(StorageLevel.DISK_ONLY)

## Первичный анализ

In [56]:
df.show(5)

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|     0| 0.4654254330729043|       1|             4|       98|             4|                   2|             3|             7|          3|  2|      1|  37|           2|          4|  19|        351|        34|       -1|      

In [57]:
df[
 'app_id',
 'amnt',
 'currency',
 'operation_kind',
 'card_type',
 'operation_type',
 'operation_type_group',
 'ecommerce_flag',
 'payment_system',
 'income_flag',
 'mcc',
 'country',
 'city',
 'mcc_category',
 'day_of_week',
 'hour',
 'days_before',
#  'weekofyear',
#  'hour_diff',
#  'transaction_number',
].show()

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+
|     0| 0.4654254330729043|       1|             4|       98|             4|                   2|             3|             7|          3|  2|      1|  37|           2|          4|  19|        351|
|     0|                0.0|       1|             2|       98|             7|                   1|             3|             7|          3|  2|      1|  49|           2|          4|  20|        351|


In [58]:
df.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- amnt: double (nullable = true)
 |-- currency: integer (nullable = true)
 |-- operation_kind: integer (nullable = true)
 |-- card_type: integer (nullable = true)
 |-- operation_type: integer (nullable = true)
 |-- operation_type_group: integer (nullable = true)
 |-- ecommerce_flag: integer (nullable = true)
 |-- payment_system: integer (nullable = true)
 |-- income_flag: integer (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- country: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- mcc_category: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- days_before: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- hour_diff: long (nullable = true)
 |-- transaction_number: integer (nullable = true)
 |-- __index_level_0__: long (nullable = true)



* app_id - Идентификатор заявки. заявки пронумерованы так, что более поздним заявкам соответствует более поздняя дата
* amnt - Нормированная сумма транзакции. 0.0 - соответствует пропускам
* currency - Идентификатор валюты транзакции
* operation_kind - Идентификатор типа транзакции
* card_type - Уникальный идентификатор типа карты
* operation_type - Идентификатор типа операции по пластиковой карте
* operationtypegroup - Идентификатор группы карточных операций, например, дебетовая карта или кредитная карта
* ecommerce_flag - Признак электронной коммерции
* payment_system - Идентификатор типа платежной системы
* income_flag - Признак списания/внесения денежных средств на карту
* mcc - Уникальный идентификатор типа торговой точки
* country - Идентификатор страны транзакции
* city - Идентификатор города транзакции
* mcc_category - Идентификатор категории магазина транзакции
* dayofweek - День недели, когда транзакция была совершена
* hour - Час, когда транзакция была совершена
* days_before - Количество дней до даты выдачи кредита
* weekofyear - Номер недели в году, когда транзакция была совершена
* hour_diff - Количество часов с момента прошлой транзакции для данного клиента
* transaction_number - Порядковый номер транзакции клиента

In [59]:
df.columns

['app_id',
 'amnt',
 'currency',
 'operation_kind',
 'card_type',
 'operation_type',
 'operation_type_group',
 'ecommerce_flag',
 'payment_system',
 'income_flag',
 'mcc',
 'country',
 'city',
 'mcc_category',
 'day_of_week',
 'hour',
 'days_before',
 'weekofyear',
 'hour_diff',
 'transaction_number',
 '__index_level_0__']

In [60]:
df['app_id',
 'amnt',
 'currency',
 'operation_kind',
 'card_type',
 'operation_type',
 'operation_type_group',
 'ecommerce_flag',
 'payment_system',
 'income_flag'].describe().show()

+-------+------------------+-------------------+-----------------+------------------+------------------+------------------+--------------------+------------------+------------------+-------------------+
|summary|            app_id|               amnt|         currency|    operation_kind|         card_type|    operation_type|operation_type_group|    ecommerce_flag|    payment_system|        income_flag|
+-------+------------------+-------------------+-----------------+------------------+------------------+------------------+--------------------+------------------+------------------+-------------------+
|  count|           5408648|            5408648|          5408648|           5408648|           5408648|           5408648|             5408648|           5408648|           5408648|            5408648|
|   mean|11967.004015791008| 0.3731903180912123|1.102896509441916| 1.432611624938432|53.709856696165104|2.6666381321173054|  1.0809721764108147|1.1010409625473871|2.1071733638424983| 1.082

In [61]:
df['mcc',
 'country',
 'city',
 'mcc_category',
 'day_of_week',
 'hour',
 'days_before',
 'weekofyear',
 'hour_diff',
 'transaction_number',
 ].describe().show()
#  '__index_level_0__'

+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|               mcc|           country|              city|      mcc_category|       day_of_week|             hour|       days_before|        weekofyear|         hour_diff|transaction_number|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|           5408648|           5408648|           5408648|           5408648|           5408648|          5408648|           5408648|           5408648|           5408648|           5408648|
|   mean|12.961375005361784|1.5631464646987565| 20.34707878937583|4.4080431930493535|3.8609362265764013|14.16820968937154|168.13116826977833|25.945357693826626|31.924373706700823|270.64004276114844|
| std

In [62]:
# import seaborn as sns

# fig, ax = plt.subplots(figsize=(20,10))
# attr = ["Value (x$1000)","Shares","Voting Sole","Shared","None", "open", "high", "low","close","volume","total_value","Shares type indexed","CUSIP indexed","ticker indexed","CIK indexed", "target", 'next_target']
# ax.set_title("Correlation Matrix")

# ax = sns.heatmap(corrmatrix, linewidth=0.5, annot=True, fmt='.1g')
# ax.set_xticklabels(attr)
# ax.set_yticklabels(attr)
# plt.xticks(rotation = 45)
# plt.yticks(rotation = 45)
# plt.show()

## Рекомендательная система

In [63]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

Создаём рейтинг mcc

In [64]:
df_1 = df["app_id","mcc","day_of_week", "weekofyear"]
df_1.show()

+------+---+-----------+----------+
|app_id|mcc|day_of_week|weekofyear|
+------+---+-----------+----------+
|     0|  2|          4|        34|
|     0|  2|          4|        34|
|     0|  2|          4|        34|
|     0| 10|          2|        34|
|     0|  2|          4|        53|
|     0|  2|          4|        53|
|     0|  2|          2|        53|
|     0|  2|          2|        52|
|     0| 10|          7|        52|
|     0| 10|          7|        52|
|     0|  2|          2|        50|
|     0| 10|          5|        39|
|     0|  2|          3|        39|
|     0|  2|          6|        48|
|     0|  2|          5|        48|
|     0|  9|          5|        48|
|     0|  2|          2|        48|
|     0|  1|          4|        46|
|     0|  2|          4|        46|
|     0|  2|          4|        20|
+------+---+-----------+----------+
only showing top 20 rows



In [65]:
df_1.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)



Ограничиваем данные одним днём

In [66]:
import datetime

now = datetime.datetime.now()

In [67]:
import datetime

now = datetime.datetime.now()
# df_2 = df_1[(df_1["day_of_week"] == now.weekday) & (df_1["weekofyear"] == now.year)]
# print(df_1.filter(col("day_of_week") == now.weekday).collect())

In [68]:
from pyspark.sql.functions import create_map, lit, explode

In [86]:
df_2 = df_1.groupBy("app_id").pivot("mcc").count()
map_df_2 = (c for x in df_2.columns if x != 'app_id' for c in [lit(x), col(x)])
df_3 = df_2.select('app_id', explode(create_map(*map_df_2))).withColumnRenamed("key","mcc").withColumnRenamed("value","count")
df_3 = df_3.withColumn("mcc", df_3["mcc"].cast(IntegerType()))
df_3.show()

+------+---+-----+
|app_id|mcc|count|
+------+---+-----+
|  7982|  1|   22|
|  7982|  2|    8|
|  7982|  3|   12|
|  7982|  4|    7|
|  7982|  5| null|
|  7982|  6| null|
|  7982|  7| null|
|  7982|  8| null|
|  7982|  9| null|
|  7982| 10|    1|
|  7982| 11| null|
|  7982| 12| null|
|  7982| 13|    1|
|  7982| 14|    1|
|  7982| 15|    2|
|  7982| 16| null|
|  7982| 17| null|
|  7982| 18| null|
|  7982| 19| null|
|  7982| 20| null|
+------+---+-----+
only showing top 20 rows



In [70]:
df_3.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- count: long (nullable = true)



In [71]:
# Create test and train set
(train, test) = df_3.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(  userCol="app_id", 
            itemCol="mcc",
            ratingCol="count",
            nonnegative = True,
            implicitPrefs = False,
            coldStartStrategy="drop")

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

In [72]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


### Build your cross validation pipeline

In [73]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_25d627d25ea1


In [74]:
train = train.na.fill(value=0)

In [77]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

In [78]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 150
  MaxIter: 10
  RegParam: 0.01


In [80]:
test = test.na.fill(value=0)

In [81]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

11.06338609595949


In [82]:
test_predictions.show()

+------+---+-----+------------+
|app_id|mcc|count|  prediction|
+------+---+-----+------------+
|    28|  3|    1|   17.050058|
|    28|  6|    0|     6.56532|
|    28| 18|    0|         0.0|
|    28| 27|    0|   1.1472384|
|    28| 38|    0|    4.986567|
|    28| 50|    0|  0.80591667|
|    28| 53|    0|   2.2959392|
|    28| 56|    0| 0.018748956|
|    28| 61|    0|0.0012728251|
|    28| 62|    0|0.0031631365|
|    28| 65|    0|   1.0087055|
|    28| 66|    0|  0.11613556|
|    28| 79|    0|   1.1144736|
|    28| 80|    0|  0.14943416|
|    28| 82|    0|   1.7378793|
|    28| 83|    0|0.0023888317|
|    28| 87|    0|  0.08898787|
|    28| 96|    0|  0.12451648|
|    28|108|    0|  0.59120995|
|    31|  7|    0|   5.6454105|
+------+---+-----+------------+
only showing top 20 rows



In [83]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()



+------+--------------------+
|app_id|     recommendations|
+------+--------------------+
|    28|[{1, 102.78399}, ...|
|    31|[{2, 62.46824}, {...|
|    34|[{2, 42.810116}, ...|
|    53|[{4, 253.964}, {1...|
|    65|[{1, 285.31662}, ...|
|    78|[{8, 112.24294}, ...|
|    81|[{9, 2.9355652}, ...|
|    85|[{1, 32.966114}, ...|
|   101|[{2, 91.71269}, {...|
|   108|[{1, 251.58226}, ...|
+------+--------------------+

