In [1]:
import os
import sys
from datetime import datetime
# Environment for PySpark: which Python interpreter to use, where Spark is
# installed, and the arguments handed to spark-submit. Set before the pyspark
# imports that follow.
os.environ.update({
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
    "PYSPARK_SUBMIT_ARGS": '--num-executors 3 --executor-memory 3g --driver-memory 2g pyspark-shell',
})

spark_home = os.environ.get('SPARK_HOME', None)

# Prepend Spark's bundled Python sources to the import path. The py4j archive
# ends up first on sys.path, immediately followed by the `python` directory.
sys.path[:0] = [
    os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'),
    os.path.join(spark_home, 'python'),
]
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
import numpy as np
from numpy import dot
from numpy. linalg import norm
# SparkConf created with no explicit settings; it is handed to the
# SparkSession builder in the next cell.
conf = SparkConf()

In [2]:
# Create the SparkSession for this notebook (or reuse an already running one),
# applying `conf` and registering the application under the name "GF_spark".
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("GF_spark")
    .getOrCreate()
)

In [3]:
# Display the session object to confirm it was created.
spark

In [4]:
# Keep a handle on the underlying SparkContext; it is used below to set the
# checkpoint directory.
sc = spark.sparkContext

In [5]:
# Display the SparkContext.
sc

### List the input files in /labs/slaba03/

In [6]:
# List the lab's input files on HDFS (shell command run from the notebook).
! hdfs dfs -ls /labs/slaba03/

Found 4 items
-rw-r--r--   3 hdfs hdfs   91066524 2022-01-06 18:46 /labs/slaba03/laba03_items.csv
-rw-r--r--   3 hdfs hdfs   29965581 2022-01-06 18:46 /labs/slaba03/laba03_test.csv
-rw-r--r--   3 hdfs hdfs   74949368 2022-01-06 18:46 /labs/slaba03/laba03_train.csv
-rw-r--r--   3 hdfs hdfs  871302535 2022-01-06 18:46 /labs/slaba03/laba03_views_programmes.csv


In [7]:
# Set the directory Spark uses for checkpoints. The path is relative; the
# `hdfs dfs -ls` output further down lists a `checkpoint` directory in the
# user's HDFS home.
sc.setCheckpointDir('checkpoint/')

### Load data

In [8]:
# Every column of the train CSV is read as an integer, so the schema is
# generated from the column names.
schema = t.StructType(fields=[
    t.StructField(column_name, t.IntegerType())
    for column_name in ("user_id", "item_id", "purchase")
])

# Read the training interactions with the explicit schema; the first line of
# the file is a header.
train = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv('/labs/slaba03/laba03_train.csv')
)

# Quick profile of the training set: summary statistics, number of distinct
# users and items, and a sample of rows.
train.summary().show()
train.agg(F.countDistinct("user_id")).show()
train.agg(F.countDistinct("item_id")).show()
train.show(5)

+-------+-----------------+------------------+--------------------+
|summary|          user_id|           item_id|            purchase|
+-------+-----------------+------------------+--------------------+
|  count|          5032624|           5032624|             5032624|
|   mean|869680.9464782189| 66869.30485865823|0.002166662957534...|
| stddev|60601.09821562329|35242.282055382406|0.046496977952916414|
|    min|             1654|               326|                   0|
|    25%|           846231|             65667|                   0|
|    50%|           885247|             79853|                   0|
|    75%|           908588|             93606|                   0|
|    max|           941450|            104165|                   1|
+-------+-----------------+------------------+--------------------+

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                   1941|
+-----------------------+

+-----------------------+
|count(DISTINCT item_id)|


In [10]:
# Same three integer columns as the training file; `purchase` is empty in the
# test file, so it is read as null (see the sample below).
schema = t.StructType(fields=[
    t.StructField(column_name, t.IntegerType())
    for column_name in ("user_id", "item_id", "purchase")
])

# Read the user/item pairs that have to be scored.
test = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv('/labs/slaba03/laba03_test.csv')
)
test.show(5)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  94814|    null|
|   1654|  93629|    null|
|   1654|   9980|    null|
|   1654|  95099|    null|
|   1654|  11265|    null|
+-------+-------+--------+
only showing top 5 rows



### import ml

In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Evaluator and ALS model

In [13]:
# ROC AUC evaluator: compares the model score against the binary label.
evaluator = BinaryClassificationEvaluator(
    labelCol="purchase",            # ground-truth 0/1 label
    rawPredictionCol="prediction",  # score column produced by the model
    metricName="areaUnderROC",
)

In [14]:
als = ALS(maxIter=30,
          regParam=2.2,
          rank=6,
          coldStartStrategy="nan",
          userCol='user_id',
          itemCol='item_id',
          ratingCol='purchase',
          nonnegative=False,
          implicitPrefs=True,
          alpha=5.0,
          seed=87)
%time als_model = als.fit(train)

CPU times: user 6.29 ms, sys: 4.19 ms, total: 10.5 ms
Wall time: 23.5 s


### predict

In [16]:
# Score the training pairs with the fitted model; transform() adds a
# `prediction` column next to the original columns.
predict_1 = als_model.transform(train)
# show() is an action, so the timing covers computing the displayed rows.
%time predict_1.show(5)

+-------+-------+--------+-------------+
|user_id|item_id|purchase|   prediction|
+-------+-------+--------+-------------+
| 793876|   8389|       0|  0.062498033|
| 795620|   8389|       0| -0.012890842|
| 851848|   8389|       0|-1.8043676E-4|
| 880451|   8389|       0|   0.11716948|
| 900203|   8389|       0|  0.037745614|
+-------+-------+--------+-------------+
only showing top 5 rows

CPU times: user 2.53 ms, sys: 127 µs, total: 2.66 ms
Wall time: 17.2 s


In [17]:
# Reduce the training predictions to 4 partitions and mark them for caching
# (cache() is lazy: the data is stored when an action first computes it).
predict_1 = predict_1.coalesce(4).cache()

In [18]:
# Inspect the column types; `prediction` comes back from ALS as float.
predict_1.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- purchase: integer (nullable = true)
 |-- prediction: float (nullable = false)



In [20]:
# Widen `prediction` from float to double.
# NOTE(review): presumably needed so BinaryClassificationEvaluator accepts the
# column as rawPredictionCol — the evaluation cell below is commented out, confirm.
predict_1 = predict_1.withColumn("prediction", F.col("prediction").cast("double"))

In [21]:
# DataFrame.checkpoint() does not modify the DataFrame in place: it returns a
# new, checkpointed DataFrame with truncated lineage. Rebind the name so later
# cells actually use the checkpointed data; the bare expression keeps the
# cell's displayed output.
predict_1 = predict_1.checkpoint()
predict_1

DataFrame[user_id: int, item_id: int, purchase: int, prediction: double]

In [22]:
# Train-set ROC AUC evaluation, currently disabled.
# NOTE(review): `predict_train` is not defined anywhere in the visible notebook;
# the training predictions are stored in `predict_1` — confirm before re-enabling.
#rocauc_train = evaluator.evaluate(predict_train)

In [23]:
# Score the test pairs with the fitted model; `purchase` is null in the test
# set, so only the added `prediction` column carries information.
predict = als_model.transform(test)
# show() is an action, so the timing covers computing the displayed rows.
%time predict.show(5)

+-------+-------+--------+-------------+
|user_id|item_id|purchase|   prediction|
+-------+-------+--------+-------------+
| 822709|   8389|    null| 4.146702E-28|
| 824008|   8389|    null|-0.0016319451|
| 890476|   8389|    null|          0.0|
| 899993|   8389|    null| 9.4599556E-4|
| 937345|   8389|    null|   0.03161242|
+-------+-------+--------+-------------+
only showing top 5 rows

CPU times: user 1.9 ms, sys: 0 ns, total: 1.9 ms
Wall time: 11.7 s


In [24]:
# Check how many partitions the scored test set has before coalescing.
predict.rdd.getNumPartitions()

200

In [25]:
# Reduce the test predictions to 4 partitions and mark them for caching
# (cache() is lazy: the data is stored when an action first computes it).
predict = predict.coalesce(4).cache()

In [27]:
# Build the submission table: the model score replaces the (null) `purchase`
# column, and rows are sorted by user and then item.
# NOTE(review): ALS is configured with coldStartStrategy="nan", so pairs with a
# user or item unseen in training would carry NaN here — confirm none occur.
submission_columns = [
    predict['user_id'],
    predict['item_id'],
    predict['prediction'].alias('purchase'),
]
finally_data = predict.select(*submission_columns).orderBy('user_id', 'item_id')
finally_data.show(20)

+-------+-------+-------------+
|user_id|item_id|     purchase|
+-------+-------+-------------+
|   1654|    336|          0.0|
|   1654|    678|          0.0|
|   1654|    691|          0.0|
|   1654|    696|  7.360736E-4|
|   1654|    763| 0.0017997124|
|   1654|    795|  0.007775446|
|   1654|    861| 0.0048438795|
|   1654|   1137| 0.0063141193|
|   1654|   1159| -0.001759076|
|   1654|   1428|   0.01012432|
|   1654|   1685|  0.006617021|
|   1654|   1686| 0.0031508922|
|   1654|   1704|  0.011399428|
|   1654|   2093|          0.0|
|   1654|   2343| 0.0038297991|
|   1654|   2451|          0.0|
|   1654|   2469| 0.0043335725|
|   1654|   2603|-0.0076052453|
|   1654|   2609|          0.0|
|   1654|   2621|   0.00132692|
+-------+-------+-------------+
only showing top 20 rows



In [30]:
# Write the submission as a single CSV part file (coalesce(1)) with a header
# row into the `lab03` directory, replacing any previous output.
(
    finally_data
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .option('sep', ',')
    .csv('lab03')
)

In [31]:
# List the user's HDFS home directory to confirm the `lab03` output exists.
! hdfs dfs -ls

Found 7 items
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-18 10:09 .sparkStaging
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-18 10:10 checkpoint
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-13 22:26 data.csv
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-18 10:14 lab03
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-13 21:50 lab03.csv
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-18 09:26 lab3
drwxr-xr-x   - sergey.vaschuk sergey.vaschuk          0 2023-05-13 22:01 predictions.csv


In [32]:
# Show the local working directory of the notebook.
! pwd

/data/home/sergey.vaschuk


In [33]:
# List the local files in the working directory.
! ls -l

total 98500
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk     9627 Apr 28 13:21 l1.ipynb
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk    20774 Apr 28 13:18 l2.ipynb
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk      186 Apr 28 13:22 lab01.json
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk     1000 Apr 28 13:16 lab02.json
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk 50181946 May 18 10:13 lab03-Copy1.csv
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk    19519 May 18 10:12 lab3_gf.ipynb
drwxrwxr-x 7 sergey.vaschuk sergey.vaschuk     4096 May 11 13:49 sber-spark-ds-12
-rw-r--r-- 1 sergey.vaschuk sergey.vaschuk        0 May 18 09:58 _SUCCESS
-rw-rw-r-- 1 sergey.vaschuk sergey.vaschuk 50591023 May 14 20:09 trash.csv


In [34]:
! hdfs dfs -copyToLocal lab03/* ~/

copyToLocal: `/data/home/sergey.vaschuk/_SUCCESS': File exists
