In [1]:
# Print the working directory the notebook's shell commands run in.
!pwd

/data/home/alexey.klimov


In [2]:
# List the contents of the lab dataset directory on HDFS.
!hdfs dfs -ls /labs/laba01/ml-100k

Found 23 items
-rw-r--r--   3 hdfs hdfs       6750 2022-01-06 18:46 /labs/laba01/ml-100k/README
-rw-r--r--   3 hdfs hdfs        716 2022-01-06 18:46 /labs/laba01/ml-100k/allbut.pl
-rw-r--r--   3 hdfs hdfs        643 2022-01-06 18:46 /labs/laba01/ml-100k/mku.sh
-rw-r--r--   3 hdfs hdfs    1979173 2022-01-06 18:46 /labs/laba01/ml-100k/u.data
-rw-r--r--   3 hdfs hdfs        202 2022-01-06 18:46 /labs/laba01/ml-100k/u.genre
-rw-r--r--   3 hdfs hdfs         36 2022-01-06 18:46 /labs/laba01/ml-100k/u.info
-rw-r--r--   3 hdfs hdfs     236344 2022-01-06 18:46 /labs/laba01/ml-100k/u.item
-rw-r--r--   3 hdfs hdfs        193 2022-01-06 18:46 /labs/laba01/ml-100k/u.occupation
-rw-r--r--   3 hdfs hdfs      22628 2022-01-06 18:46 /labs/laba01/ml-100k/u.user
-rw-r--r--   3 hdfs hdfs    1586544 2022-01-06 18:46 /labs/laba01/ml-100k/u1.base
-rw-r--r--   3 hdfs hdfs     392629 2022-01-06 18:46 /labs/laba01/ml-100k/u1.test
-rw-r--r--   3 hdfs hdfs    1583948 2022-01-06 18:46 /labs/laba01/ml-1

In [3]:
# Bring up Spark
from IPython.display import IFrame, Image

In [4]:
import os
import sys

# Environment PySpark reads at start-up: the worker interpreter, the Spark
# installation to use, and the arguments handed to spark-submit.
os.environ.update({
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
    "PYSPARK_SUBMIT_ARGS": '--num-executors 3 pyspark-shell',
})

spark_home = os.environ.get('SPARK_HOME', None)

# Put the Spark Python sources and the bundled py4j archive at the front of
# the import path; each insert goes to position 0, so the py4j archive
# (inserted last) ends up first.
for _subpath in ('python', 'python/lib/py4j-0.10.7-src.zip'):
    sys.path.insert(0, os.path.join(spark_home, _subpath))


In [5]:
from pyspark import SparkContext, SparkConf

# Build the configuration with the application name (setAppName stores it
# under "spark.app.name") and start the SparkContext from it.
config = SparkConf().setAppName("Alexey Klimov lab01 app")
sc = SparkContext(conf=config)

In [39]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Configuration carrying the application name for the SQL session.
conf = SparkConf().set("spark.app.name", "Alexey Klimov lab01 app")

# Obtain the SparkSession used for the DataFrame API below.
# NOTE(review): getOrCreate() is expected to attach to the SparkContext that
# was already started in an earlier cell — confirm against the PySpark docs.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("Alexey Klimov lab01 app")
    .getOrCreate()
)

In [6]:
# Display the SparkContext object as the cell output.
sc

In [7]:
# Read the data: load the ratings file as text and split every line on tabs,
# so each record becomes a list of string fields.
rdd = (
    sc.textFile("/labs/laba01/ml-100k/u.data")
      .map(lambda line: line.split("\t"))
)

In [8]:
# Peek at the first five parsed records.
rdd.take(5)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116'],
 ['244', '51', '2', '880606923'],
 ['166', '346', '1', '886397596']]

In [12]:
# Show how many partitions the RDD is split into.
rdd.getNumPartitions()

2

In [11]:
# Compute the statistics for the selected film
# film_id = 318

In [34]:
# Grouping is easier to follow with a DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [48]:
# Explicit schema for the ratings file: four columns, all read as strings.
schema_data = StructType([
    StructField(column_name, StringType())
    for column_name in ("user id", "item id", "rating", "timestamp")
])

In [49]:
# Load the tab-separated ratings file into a DataFrame using the explicit
# schema. The path points at the lab dataset directory (/labs/laba01/ml-100k),
# i.e. the same file the RDD in this notebook is built from, so the DataFrame
# statistics describe the lab data rather than a copy stored elsewhere.
df = (spark.read
          .schema(schema_data)
          .format("csv")
          .option("sep", "\t")
          .load("/labs/laba01/ml-100k/u.data"))

In [52]:
# Print the first ten rows of the ratings DataFrame.
df.show(10)

+-------+-------+------+---------+
|user id|item id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
|     22|    377|     1|878887116|
|    244|     51|     2|880606923|
|    166|    346|     1|886397596|
|    298|    474|     4|884182806|
|    115|    265|     2|881171488|
|    253|    465|     5|891628467|
|    305|    451|     3|886324817|
|      6|     86|     3|883603013|
+-------+-------+------+---------+
only showing top 10 rows



In [53]:
# Fetch the first five rows as Row objects.
df.take(5)

[Row(user id='196', item id='242', rating='3', timestamp='881250949'),
 Row(user id='186', item id='302', rating='3', timestamp='891717742'),
 Row(user id='22', item id='377', rating='1', timestamp='878887116'),
 Row(user id='244', item id='51', rating='2', timestamp='880606923'),
 Row(user id='166', item id='346', rating='1', timestamp='886397596')]

In [57]:
# Calculation for the selected film: keep only the rows of item id 318,
# count them per rating value and order the result by rating.
calc_my_item_id = (
    df.filter(df['item id'] == '318')
      .groupBy('rating')
      .count()
      .sort('rating')
)

In [58]:
# Print the per-rating counts for the selected film.
calc_my_item_id.show()

+------+-----+
|rating|count|
+------+-----+
|     1|    4|
|     2|    6|
|     3|   23|
|     4|   79|
|     5|  186|
+------+-----+



In [None]:
# Total number of ratings given, across all films

In [61]:
# Count the rows per rating value over the whole dataset, ordered by rating.
calc_all_item_id = (
    df.groupBy('rating')
      .count()
      .sort('rating')
)

In [62]:
# Print the per-rating counts for all films.
calc_all_item_id.show()

+------+-----+
|rating|count|
+------+-----+
|     1| 6110|
|     2|11370|
|     3|27145|
|     4|34174|
|     5|21201|
+------+-----+



In [65]:
def rating_histogram(rating_counts):
    """Turn a (rating, count) DataFrame into a list of counts for ratings 1..5.

    Args:
        rating_counts: DataFrame with a ``rating`` column and a ``count``
            column, as produced by ``groupby('rating').count()``.

    Returns:
        A list of five integers: the count for rating 1, 2, 3, 4 and 5.
        A rating that has no rows in ``rating_counts`` contributes 0, so the
        list always has one entry per rating value.
    """
    counts = {
        # Item access is required for 'count': attribute access would
        # resolve to the tuple method of the same name.
        int(row['rating']): row['count']
        for row in rating_counts.collect()
        if row['rating'] is not None
    }
    return [counts.get(rating, 0) for rating in range(1, 6)]


# Build the answer from the aggregated DataFrames instead of copying the
# numbers by hand from the printed tables, so a re-run always stays in sync
# with the data.
result = {
    "hist_film": rating_histogram(calc_my_item_id),
    "hist_all": rating_histogram(calc_all_item_id),
}

In [66]:
# Display the result dictionary as the cell output.
result

{'hist_film': [4, 6, 23, 79, 186],
 'hist_all': [6110, 11370, 27145, 34174, 21201]}

In [None]:
# Write the result to lab01.json

In [67]:
import json

# Serialize the result dictionary and store it in lab01.json in the current
# working directory.
with open('lab01.json', 'w') as out_file:
    out_file.write(json.dumps(result))

In [70]:
# Shut Spark down by stopping the SparkContext.
sc.stop()