In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f_

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
ratings_df = spark.read.csv("///var/localdata/datasets/movielens/ratings.csv", header=True)

### DataFrames Operations

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql

In [5]:
display(ratings_df.select(['movieId','rating']))

In [6]:
ratings_df.printSchema()

In [7]:
# Изменение типа данных осуществляется с помощью типов данных
from pyspark.sql.types import IntegerType

# создаем новую колонку withColumn() и применяем к колонке новый дата тип
ratings_df = ratings_df.withColumn("rating", ratings_df['rating'].cast(IntegerType()))
# take a look at the schema now
ratings_df.select(['movieId','rating']).printSchema()

In [8]:
# фильтр данных
ratings_df.filter(ratings_df.rating.isNull()).count()

In [9]:
# если это SQL DF, то можно применить where
ratings_df.where(ratings_df.rating.isNull()).count()

### Group By
**aggregate functions (COUNT, MAX, MIN, SUM, AVG)** 

In [11]:
# Агрега по фильмам, где нужно найти средний рейтинг фильма, и сделать агрегат по среднему рейтингу
display(ratings_df.groupBy('movieId').agg(avg('rating').alias('avg_rating'), count('rating').alias('reviews')))

In [12]:
# Обзор данных, где фильм входит в ТОП 10, и имеет не менее 50 оценок
ratings_sum_df = ratings_df.groupBy('movieId').agg(avg('rating').alias('avg_rating'), count('rating').alias('reviews'))
display(ratings_sum_df.filter(ratings_sum_df.reviews > 50).sort('avg_rating', ascending=False).limit(10))

### User Defined Functions (UDF)

https://docs.databricks.com/spark/latest/spark-sql/udf-in-python.html

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [15]:
# Определим собственную функцию из лямбды и зарегистрируем её
# функция определяюшая рейтинг выше/ниже 3.5

watchable_udf = udf(lambda avg_rating: 'yes' if avg_rating > 3.5 else 'no', StringType())

In [16]:
# другой вариант
def watchable_udf(avg_rating, reviews):
    if avg_rating > 3.5 and reviews > 50:
        return 'yes'
      elif avg_rating > 3.5 and reviews < 50:
        return 'maybe'
      else:
        return 'no'
    
watchable_udf = udf(watchable_udf, StringType())

In [17]:
# использование функции
ratings_sum_df = ratings_sum_df.withColumn('watchable', watchable_udf(ratings_sum_df.avg_rating,ratings_sum_df.reviews))

In [18]:
display(ratings_sum_df)

### Joins


* (INNER) JOIN
* LEFT (OUTER) JOIN
* RIGHT (OUTER) JOIN
* FULL (OUTER) JOIN

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#join 

In [20]:
movies_df = spark.read.csv("///var/localdata/datasets/movielens/movies.csv", header=True)
display(movies_df)

In [21]:
# расширим наш дата сет
movie_ratings_sum_df = ratings_sum_df.join(movies_df, ratings_sum_df.movieId == movies_df.movieId)

In [22]:
display(movie_ratings_sum_df.select(['title','avg_rating','reviews','watchable']))