In [1]:
from google.colab import drive
drive.mount('/content/drive')

%cd drive/MyDrive/sber_risk_spark/lesson_3

Mounted at /content/drive
/content/drive/MyDrive/sber_risk_spark/lesson_3


In [2]:
!pip install pyspark findspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[?25hCollecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 16.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=92620a170307d7fb8b794d0b9d9c60fff59b1308f025fdd8cc56155e13af5029
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark, findspark
Successfully installed findspark-1.4.2 py4j-0.10.9.2 pyspark-3.2.0


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050').set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
                  .set('spark.dynamicAllocation.enabled', 'true')\
                  .set('spark.shuffle.service.enabled', 'true') #трекер, чтобы возвращать ресурсы
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

**Создание DataFrame**

Из RDD

In [None]:
def cleaning(row):
    row = row.split('\t')[:3]
    row = [float(val) for val in row]
    return row

In [None]:
ratings = sc.textFile('user_ratedmovies.dat')

first_row = ratings.first()
ratings = ratings.filter(lambda row: row != first_row)\
                 .map(cleaning)

In [None]:
columns = first_row.split('\t')[:3]

In [None]:
columns

['userID', 'movieID', 'rating']

In [None]:
ratings.take(5)

[[75.0, 3.0, 1.0],
 [75.0, 32.0, 4.5],
 [75.0, 110.0, 4.0],
 [75.0, 160.0, 2.0],
 [75.0, 163.0, 4.0]]

In [None]:
df_rdd = spark.createDataFrame(ratings, columns)

In [None]:
df_rdd

DataFrame[userID: double, movieID: double, rating: double]

Можно еще вот так:

In [None]:
df_rdd = ratings.toDF(columns)

In [None]:
df_rdd

DataFrame[userID: double, movieID: double, rating: double]

Так, а если не хочу вот эти приседания с RDD, а хочу сразу из файла?

In [None]:
df = spark.read\
          .format("csv")\
          .options(**{'sep': '\t', 'header': 'true'})\
          .load("user_ratedmovies.dat")

In [None]:
df

DataFrame[userID: string, movieID: string, rating: string, date_day: string, date_month: string, date_year: string, date_hour: string, date_minute: string, date_second: string]

Все в string, так не пойдет, давайте автоматически определим тип данных

In [None]:
df = spark.read\
          .format("csv")\
          .options(**{'sep': '\t', 'header': 'true', 'inferSchema': 'true'})\
          .load("user_ratedmovies.dat")

In [None]:
df

DataFrame[userID: int, movieID: int, rating: double, date_day: int, date_month: int, date_year: int, date_hour: int, date_minute: int, date_second: int]

In [None]:
df.show(10)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|    75|      3|   1.0|      29|        10|     2006|       23|         17|         16|
|    75|     32|   4.5|      29|        10|     2006|       23|         23|         44|
|    75|    110|   4.0|      29|        10|     2006|       23|         30|          8|
|    75|    160|   2.0|      29|        10|     2006|       23|         16|         52|
|    75|    163|   4.0|      29|        10|     2006|       23|         29|         30|
|    75|    165|   4.5|      29|        10|     2006|       23|         25|         15|
|    75|    173|   3.5|      29|        10|     2006|       23|         17|         37|
|    75|    296|   5.0|      29|        10|     2006|       23|         24|         49|
|    75|    353|   3.5|      29|

А можно заранее сказать, какой тип данных я ожидаю?

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [None]:
schema = StructType([ \
    StructField("userID",IntegerType(),True), \
    StructField("movieID",IntegerType(),True), \
    StructField("rating",DoubleType(),True), \
    StructField("date_day", StringType(), True), \
    StructField("date_month", StringType(), True), \
    StructField("date_year", IntegerType(), True), \
    StructField("date_hour", IntegerType(), True), \
    StructField("date_minute", IntegerType(), True), \
    StructField("date_second", IntegerType(), True)
  ])

In [None]:
df = spark.read\
          .format("csv")\
          .options(**{'sep': '\t', 'header': 'true'})\
          .schema(schema)\
          .load("user_ratedmovies.dat")

In [None]:
df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- date_day: string (nullable = true)
 |-- date_month: string (nullable = true)
 |-- date_year: integer (nullable = true)
 |-- date_hour: integer (nullable = true)
 |-- date_minute: integer (nullable = true)
 |-- date_second: integer (nullable = true)



In [None]:
df.show(10)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|    75|      3|   1.0|      29|        10|     2006|       23|         17|         16|
|    75|     32|   4.5|      29|        10|     2006|       23|         23|         44|
|    75|    110|   4.0|      29|        10|     2006|       23|         30|          8|
|    75|    160|   2.0|      29|        10|     2006|       23|         16|         52|
|    75|    163|   4.0|      29|        10|     2006|       23|         29|         30|
|    75|    165|   4.5|      29|        10|     2006|       23|         25|         15|
|    75|    173|   3.5|      29|        10|     2006|       23|         17|         37|
|    75|    296|   5.0|      29|        10|     2006|       23|         24|         49|
|    75|    353|   3.5|      29|

Но есть уже готовая обертка под все нужды

In [None]:
df = spark.read.csv(path='user_ratedmovies.dat', sep='\t', header=True, inferSchema=True, schema=None)

In [None]:
df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- date_day: integer (nullable = true)
 |-- date_month: integer (nullable = true)
 |-- date_year: integer (nullable = true)
 |-- date_hour: integer (nullable = true)
 |-- date_minute: integer (nullable = true)
 |-- date_second: integer (nullable = true)



Так, а как сохранить? Лучше быть аккуратнее с overwrite, перезапишет весь указанный путь, append будет безопаснее

In [None]:
df.write.option("header",True)\
        .mode("overwrite")\
        .parquet('write_1.parquet')

А что с партицированием?

In [None]:
df.write.option("header",True)\
        .partitionBy('date_year')\
        .mode("overwrite")\
        .parquet('write_2.parquet')

Кстати, раз уж заговорили про схемы данных, то из можно задвать интереснее, например, под группированные данные

In [None]:
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



Со структурой можно работать и менять ее под ваши нужны

In [None]:
from pyspark.sql.functions import col,struct,when
updatedDF = df2.withColumn("OtherInfo", 
    struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+--------------------+------------------------+
|name                |OtherInfo               |
+--------------------+------------------------+
|{James, , Smith}    |{36636, M, 3100, Medium}|
|{Michael, Rose, }   |{40288, M, 4300, High}  |
|{Robert, , Williams}|{42114, M, 1400, Low}   |
|{Maria, Anne, Jones}|{39192, F, 5500, High}  |
|{Jen, Mary, Brown}  |{, F, -1, Low}          |
+--------------------+------------------------+



Что мы там сделали????

1) Создали новую структуру данных OtherInfo

2) Передали туда id (переименовав столбец), gender, salary

3) Создали столбец Salary_grade из условий

4) удалили id, gender, salary из старой структуры

Есть и еще структуры данных!

In [None]:
from pyspark.sql.types import ArrayType, MapType

In [None]:
arrayStructureSchema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
       ])),
       StructField('hobbies', ArrayType(StringType()), True),
       StructField('properties', MapType(IntegerType(),StringType()), True)
    ])

In [None]:
structureData = [
    (("James","","Smith"), ['car', 'volleyball'], {1: 'a', 4: 'd'}),
    (("Michael","Rose",""), ['car', 'football'], {2: 'b'}),
    (("Robert","","Williams"), ['box', 'music'], {3: 'c'})
  ]

In [None]:
df3 = spark.createDataFrame(data=structureData,schema=arrayStructureSchema)
df3.printSchema()
df3.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)

+--------------------+-----------------+----------------+
|name                |hobbies          |properties      |
+--------------------+-----------------+----------------+
|{James, , Smith}    |[car, volleyball]|{4 -> d, 1 -> a}|
|{Michael, Rose, }   |[car, football]  |{2 -> b}        |
|{Robert, , Williams}|[box, music]     |{3 -> c}        |
+--------------------+-----------------+----------------+



In [None]:
df3.select('properties').collect()

[Row(properties={4: 'd', 1: 'a'}),
 Row(properties={2: 'b'}),
 Row(properties={3: 'c'})]

**Описание данных**

Общее описание данных

In [None]:
df = spark.read.csv(path='user_ratedmovies.dat', sep='\t', header=True, inferSchema=True, schema=None)

In [None]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+
|summary|            userID|           movieID|            rating|          date_day|       date_month|        date_year|         date_hour|       date_minute|       date_second|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+
|  count|            855598|            855598|            855598|            855598|           855598|           855598|            855598|            855598|            855598|
|   mean| 35190.83255103448| 8710.179402008887| 3.437945156487042|15.568923723524366| 6.54079485926802|2005.323717446745|12.124531614145896|29.645382527775894|29.510222090280717|
| stddev|20385.003346991376|14446.852908494207|1.0025608721610382| 8.951201361275306|3.506399425908971|2.

In [None]:
df.summary().show()

+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+
|summary|            userID|           movieID|            rating|          date_day|       date_month|        date_year|         date_hour|       date_minute|       date_second|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+
|  count|            855598|            855598|            855598|            855598|           855598|           855598|            855598|            855598|            855598|
|   mean| 35190.83255103448| 8710.179402008887| 3.437945156487042|15.568923723524366| 6.54079485926802|2005.323717446745|12.124531614145896|29.645382527775894|29.510222090280717|
| stddev|20385.003346991376|14446.852908494207|1.0025608721610382| 8.951201361275306|3.506399425908971|2.

Количество записей

In [None]:
df.count()

855598

Количество партиций

In [None]:
df.rdd.getNumPartitions()

2

 Менять число партиций можно, все как с rdd

In [None]:
df = df.repartition(5)

In [None]:
df.rdd.getNumPartitions()

5

In [None]:
df = df.coalesce(2)

In [None]:
df.rdd.getNumPartitions()

2

**Различные методы**

Ну теперь давайте тыкать 

Удаляем дубликаты и помним, что есть actions и transformations, count заставит все сделать

In [None]:
df_without_duplicates = df.drop_duplicates()

Есть alias

In [None]:
df_without_duplicates = df.dropDuplicates()

Как удалить дубликаты по отдельным колонкам?

In [None]:
df_without_duplicates = df.drop_duplicates(['userID', 'rating'])

In [None]:
df_without_duplicates.count()

18847

In [None]:
df_without_duplicates.show(10)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 21712|    534|   5.0|      10|         1|     1999|       21|         32|         12|
| 22701|   2916|   4.5|       7|         3|     2006|       12|         48|         58|
| 11420|   2827|   3.5|       6|         1|     2008|        6|         28|         59|
| 18718|   8360|   2.0|      17|         9|     2008|       16|         43|         15|
| 28962|   1204|   4.0|      29|         3|     2008|       18|         15|         42|
| 29225|   1267|   0.5|      23|         2|     2007|       18|         30|         24|
| 15816|  34162|   4.5|       4|         1|     2007|       23|          1|         27|
|  4869|    595|   3.5|      22|         7|     2008|        8|         41|         58|
| 16862|   8638|   4.5|      10|

Корреляции

In [None]:
df.corr('rating', 'date_day')

0.016638388440498197

In [None]:
df.corr('rating', 'date_hour')

-0.012518740192686253

In [None]:
df.corr('rating', 'date_year')

-0.0021299262698304955

Как закинуть данные в любимый pandas?

 Самый простой вариант - встроенный метод

In [None]:
import pandas as pd

In [None]:
pandas_df = df.toPandas()

In [None]:
pandas_df

Unnamed: 0,userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second
0,22872,32596,3.5,20,7,2005,15,35,45
1,25649,165,3.5,29,2,2004,2,30,59
2,28082,3832,4.0,8,12,2000,18,55,15
3,7587,55280,4.0,4,5,2008,18,59,50
4,16574,4878,4.5,12,2,2006,13,35,8
...,...,...,...,...,...,...,...,...,...
855593,56596,4226,4.0,14,12,2008,0,53,55
855594,59390,39715,1.5,3,9,2006,0,37,45
855595,43102,5218,3.0,3,10,2002,22,14,25
855596,70331,2759,4.0,2,4,2003,16,51,15


Как говорили на лекции, может все упасть например тут. Как перейти к итератору?

prefetchPartitions - подготавливать ли следующую партию данных, пока не запросили

In [None]:
iter_df = df.toLocalIterator()

In [None]:
row = iter_df.send(None)

In [None]:
row

Row(userID=22872, movieID=32596, rating=3.5, date_day=20, date_month=7, date_year=2005, date_hour=15, date_minute=35, date_second=45)

In [None]:
row.asDict()

{'date_day': 20,
 'date_hour': 15,
 'date_minute': 35,
 'date_month': 7,
 'date_second': 45,
 'date_year': 2005,
 'movieID': 32596,
 'rating': 3.5,
 'userID': 22872}

Отсюда идея: можно вытягивать данные по 1 записи и записывать в датафрейм. Долго, но зато отработает.

In [None]:
iter_df = df.toLocalIterator()

In [None]:
list_of_rows = [value for value in iter_df]
print(len(list_of_rows))

855598


In [None]:
df.columns

['userID',
 'movieID',
 'rating',
 'date_day',
 'date_month',
 'date_year',
 'date_hour',
 'date_minute',
 'date_second']

In [None]:
pandas_df = pd.DataFrame(list_of_rows, columns=df.columns)

In [None]:
pandas_df

Unnamed: 0,userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second
0,22872,32596,3.5,20,7,2005,15,35,45
1,25649,165,3.5,29,2,2004,2,30,59
2,28082,3832,4.0,8,12,2000,18,55,15
3,7587,55280,4.0,4,5,2008,18,59,50
4,16574,4878,4.5,12,2,2006,13,35,8
...,...,...,...,...,...,...,...,...,...
855593,56596,4226,4.0,14,12,2008,0,53,55
855594,59390,39715,1.5,3,9,2006,0,37,45
855595,43102,5218,3.0,3,10,2002,22,14,25
855596,70331,2759,4.0,2,4,2003,16,51,15


**Show**

In [None]:
df.show(10)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 22872|  32596|   3.5|      20|         7|     2005|       15|         35|         45|
| 25649|    165|   3.5|      29|         2|     2004|        2|         30|         59|
| 28082|   3832|   4.0|       8|        12|     2000|       18|         55|         15|
|  7587|  55280|   4.0|       4|         5|     2008|       18|         59|         50|
| 16574|   4878|   4.5|      12|         2|     2006|       13|         35|          8|
| 25038|   3250|   3.0|      30|        12|     2007|        8|         34|         29|
| 23733|    480|   3.5|       3|         5|     2004|       11|         49|         30|
|  1988|    802|   3.5|       3|         7|     2007|       22|         25|         34|
| 16677|   2822|   3.0|      25|

Обрезаем до 2 символов

In [None]:
df.show(10, truncate=2)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|    22|     32|    3.|      20|         7|       20|       15|         35|         45|
|    25|     16|    3.|      29|         2|       20|        2|         30|         59|
|    28|     38|    4.|       8|        12|       20|       18|         55|         15|
|    75|     55|    4.|       4|         5|       20|       18|         59|         50|
|    16|     48|    4.|      12|         2|       20|       13|         35|          8|
|    25|     32|    3.|      30|        12|       20|        8|         34|         29|
|    23|     48|    3.|       3|         5|       20|       11|         49|         30|
|    19|     80|    3.|       3|         7|       20|       22|         25|         34|
|    16|     28|    3.|      25|

вертикальное отображение

In [None]:
df.show(10, vertical=True)

-RECORD 0------------
 userID      | 22872 
 movieID     | 32596 
 rating      | 3.5   
 date_day    | 20    
 date_month  | 7     
 date_year   | 2005  
 date_hour   | 15    
 date_minute | 35    
 date_second | 45    
-RECORD 1------------
 userID      | 25649 
 movieID     | 165   
 rating      | 3.5   
 date_day    | 29    
 date_month  | 2     
 date_year   | 2004  
 date_hour   | 2     
 date_minute | 30    
 date_second | 59    
-RECORD 2------------
 userID      | 28082 
 movieID     | 3832  
 rating      | 4.0   
 date_day    | 8     
 date_month  | 12    
 date_year   | 2000  
 date_hour   | 18    
 date_minute | 55    
 date_second | 15    
-RECORD 3------------
 userID      | 7587  
 movieID     | 55280 
 rating      | 4.0   
 date_day    | 4     
 date_month  | 5     
 date_year   | 2008  
 date_hour   | 18    
 date_minute | 59    
 date_second | 50    
-RECORD 4------------
 userID      | 16574 
 movieID     | 4878  
 rating      | 4.5   
 date_day    | 12    
 date_mont

**Select**

В PySpark функция select() используется для выбора одного, нескольких столбцов по индексу, всех столбцов из списка и вложенных столбцов из фрейма данных. Функция PySpark select() является функцией преобразования, поэтому она возвращает новый фрейм данных с выбранными столбцами.

In [None]:
df.columns

['userID',
 'movieID',
 'rating',
 'date_day',
 'date_month',
 'date_year',
 'date_hour',
 'date_minute',
 'date_second']

Упс, pandas-style тут не приветствуется

In [None]:
df.userID.show(5)

TypeError: ignored

In [None]:
df.select('userID').show(5)

+------+
|userID|
+------+
| 16592|
| 16592|
| 16592|
| 16592|
| 16592|
+------+
only showing top 5 rows



Куча вариантов, выбирайте любой

In [None]:
df.select('userID', 'rating').show(5)

+------+------+
|userID|rating|
+------+------+
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
+------+------+
only showing top 5 rows



In [None]:
df.select(['userID', 'rating']).show(5)

+------+------+
|userID|rating|
+------+------+
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
+------+------+
only showing top 5 rows



In [None]:
df.select(df.userID,df.rating).show(5)

+------+------+
|userID|rating|
+------+------+
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
+------+------+
only showing top 5 rows



In [None]:
df.select(df['userID'],df['rating']).show(5)

+------+------+
|userID|rating|
+------+------+
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
+------+------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col
df.select(col("userID"),col("rating")).show(5)

+------+------+
|userID|rating|
+------+------+
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
| 13216|   4.5|
+------+------+
only showing top 5 rows



можно налету переименовать столбец

In [None]:
df.select(df.userID, df.rating.alias('mark')).show(5)

+------+----+
|userID|mark|
+------+----+
| 13216| 4.5|
| 13216| 4.5|
| 13216| 4.5|
| 13216| 4.5|
| 13216| 4.5|
+------+----+
only showing top 5 rows



In [None]:
#регулярки
df.select(df.colRegex("`d+.*y`")).show(5)

+--------+
|date_day|
+--------+
|      19|
|      19|
|      19|
|      19|
|      19|
+--------+
only showing top 5 rows



примеры с дургим датафреймом, где структура сложнее

In [None]:
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



In [None]:
df2.select('name').show(5)

+--------------------+
|                name|
+--------------------+
|    {James, , Smith}|
|   {Michael, Rose, }|
|{Robert, , Williams}|
|{Maria, Anne, Jones}|
|  {Jen, Mary, Brown}|
+--------------------+



In [None]:
df2.select('name.lastname').show(5)

+--------+
|lastname|
+--------+
|   Smith|
|        |
|Williams|
|   Jones|
|   Brown|
+--------+



In [None]:
df2.select('name.firstname', 'name.lastname').show(5)

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|        |
|   Robert|Williams|
|    Maria|   Jones|
|      Jen|   Brown|
+---------+--------+



**withColumn**

PySpark withColumn() - это функция преобразования (transform), которая используется для изменения значения, преобразования типа данных существующего столбца, создания нового столбца и многого другого. Поговорим о часто используемых операциях со столбцами данных PySpark, используя примеры.

In [None]:
df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- date_day: integer (nullable = true)
 |-- date_month: integer (nullable = true)
 |-- date_year: integer (nullable = true)
 |-- date_hour: integer (nullable = true)
 |-- date_minute: integer (nullable = true)
 |-- date_second: integer (nullable = true)



Меняем тип данных

In [None]:
df.withColumn("date_month",col("date_month").cast("String")).printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- date_day: integer (nullable = true)
 |-- date_month: string (nullable = true)
 |-- date_year: integer (nullable = true)
 |-- date_hour: integer (nullable = true)
 |-- date_minute: integer (nullable = true)
 |-- date_second: integer (nullable = true)



Модифицировать столбец/создать новый

In [None]:
df.withColumn("rating_x_10",col("rating") * 10).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|rating_x_10|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----------+
| 22872|  32596|   3.5|      20|         7|     2005|       15|         35|         45|       35.0|
| 25649|    165|   3.5|      29|         2|     2004|        2|         30|         59|       35.0|
| 28082|   3832|   4.0|       8|        12|     2000|       18|         55|         15|       40.0|
|  7587|  55280|   4.0|       4|         5|     2008|       18|         59|         50|       40.0|
| 16574|   4878|   4.5|      12|         2|     2006|       13|         35|          8|       45.0|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----------+
only showing top 5 rows



Делаем 2 константных столбца

In [None]:
from pyspark.sql.functions import lit

In [None]:
df.withColumn('fix_1', lit(1)).withColumn('fix_2', lit(2)).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----+-----+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|fix_1|fix_2|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----+-----+
| 22872|  32596|   3.5|      20|         7|     2005|       15|         35|         45|    1|    2|
| 25649|    165|   3.5|      29|         2|     2004|        2|         30|         59|    1|    2|
| 28082|   3832|   4.0|       8|        12|     2000|       18|         55|         15|    1|    2|
|  7587|  55280|   4.0|       4|         5|     2008|       18|         59|         50|    1|    2|
| 16574|   4878|   4.5|      12|         2|     2006|       13|         35|          8|    1|    2|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+-----+-----+
only showing top 5 rows



**withColumnsRenamed**

Предыдущий вариант не давал возможности переименовать столбцы, это можно сделать иначе

In [None]:
df.withColumnRenamed('rating', 'mark').show(5)

+------+-------+----+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|mark|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+----+--------+----------+---------+---------+-----------+-----------+
| 22872|  32596| 3.5|      20|         7|     2005|       15|         35|         45|
| 25649|    165| 3.5|      29|         2|     2004|        2|         30|         59|
| 28082|   3832| 4.0|       8|        12|     2000|       18|         55|         15|
|  7587|  55280| 4.0|       4|         5|     2008|       18|         59|         50|
| 16574|   4878| 4.5|      12|         2|     2006|       13|         35|          8|
+------+-------+----+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



**filter (where) и иные филтрации**

Функция PySpark filter() используется для фильтрации строк из RDD / DataFrame на основе заданного условия или выражения SQL, вы также можете использовать предложение where() вместо filter() обе эти функции работают аналогично.

1 условие

In [None]:
df.filter(df.rating == 5.0).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 24695|   7698|   5.0|      21|         5|     2004|       21|         40|         13|
| 21374|  49932|   5.0|       3|         9|     2007|       12|         48|         31|
| 18161|   7438|   5.0|      15|         4|     2005|       17|         22|         35|
| 24695|   4866|   5.0|      29|         5|     2002|       15|         33|         41|
| 29501|   2594|   5.0|       2|         4|     2006|        8|         14|         26|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
df.filter(~(df.rating == 5.0)).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 22872|  32596|   3.5|      20|         7|     2005|       15|         35|         45|
| 25649|    165|   3.5|      29|         2|     2004|        2|         30|         59|
| 28082|   3832|   4.0|       8|        12|     2000|       18|         55|         15|
|  7587|  55280|   4.0|       4|         5|     2008|       18|         59|         50|
| 16574|   4878|   4.5|      12|         2|     2006|       13|         35|          8|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
df.filter('rating = 5').show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 24695|   7698|   5.0|      21|         5|     2004|       21|         40|         13|
| 21374|  49932|   5.0|       3|         9|     2007|       12|         48|         31|
| 18161|   7438|   5.0|      15|         4|     2005|       17|         22|         35|
| 24695|   4866|   5.0|      29|         5|     2002|       15|         33|         41|
| 29501|   2594|   5.0|       2|         4|     2006|        8|         14|         26|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



Несколько условий

In [None]:
df.filter((df.rating == 5.0) & (df.date_year == 2006)).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 17647|  39183|   5.0|      12|         2|     2006|       19|         30|         53|
| 13748|   4993|   5.0|      29|        12|     2006|        6|         17|          5|
| 13043|   5502|   5.0|      27|        12|     2006|        4|         15|         32|
| 28665|   6711|   5.0|      31|         3|     2006|       18|         11|         36|
| 15974|   1193|   5.0|       8|         2|     2006|        0|         15|         30|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
df.filter('(rating = 5.0) and (date_year = 2006)').show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
| 17647|  39183|   5.0|      12|         2|     2006|       19|         30|         53|
| 13748|   4993|   5.0|      29|        12|     2006|        6|         17|          5|
| 13043|   5502|   5.0|      27|        12|     2006|        4|         15|         32|
| 28665|   6711|   5.0|      31|         3|     2006|       18|         11|         36|
| 15974|   1193|   5.0|       8|         2|     2006|        0|         15|         30|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
df.filter('(rating = 5.0) and (userID between 70 and 80)').show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|    78|    745|   5.0|       7|         5|     2004|       22|         48|         10|
|    78|   3503|   5.0|       7|         5|     2004|       23|         42|         43|
|    78|   8622|   5.0|      11|         7|     2004|       12|         16|         35|
|    78|   2571|   5.0|       7|         5|     2004|       23|          7|         24|
|    78|   1206|   5.0|       7|         5|     2004|       23|          9|         17|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



фильтр по списку значений из list

In [None]:
years = [2006, 2007]

In [None]:
df.filter(df.date_year.isin(years)).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|  7704|   2858|   4.0|       2|        12|     2006|        0|         44|         48|
|  5706|   1092|   4.0|      21|        11|     2006|       14|         35|         29|
|   267|     10|   3.0|      30|         1|     2006|        0|          2|         17|
|  3600|   3552|   3.5|      13|        11|     2006|        3|         55|         20|
|  5461|   1517|   2.5|      20|         9|     2007|        0|          1|         21|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
only showing top 5 rows



 проверим

In [None]:
df.filter(df.date_year.isin(years))\
  .select('date_year')\
  .dropDuplicates()\
  .collect()

[Row(date_year=2007), Row(date_year=2006)]

создадим игрушечный датайфрейм для текстовых столбцов

In [None]:
data2 = [(2,"Michael Rose"),(3,"Robert Williams"),
     (4,"Rames Rose"),(5,"Rames Black"), (6, 'Albus Torch'),
     (7, 'Fred Tf')
  ]
df2 = spark.createDataFrame(data2, ['id', 'name'])

In [None]:
df2.show()

+---+---------------+
| id|           name|
+---+---------------+
|  2|   Michael Rose|
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|    Rames Black|
|  6|    Albus Torch|
|  7|        Fred Tf|
+---+---------------+



In [None]:
df2.filter('name like "R%"').show()

+---+---------------+
| id|           name|
+---+---------------+
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|    Rames Black|
+---+---------------+



In [None]:
df2.filter(df2.name.startswith('R')).show()

+---+---------------+
| id|           name|
+---+---------------+
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|    Rames Black|
+---+---------------+



In [None]:
df2.filter(df2.name.endswith('Tf')).show()

+---+-------+
| id|   name|
+---+-------+
|  7|Fred Tf|
+---+-------+



In [None]:
df2.filter(df2.name.contains('Wil')).show()

+---+---------------+
| id|           name|
+---+---------------+
|  3|Robert Williams|
+---+---------------+



Бывает, что у нас внутри датафрейма есть массив и с ним что-то хочется сделать

In [None]:
from pyspark.sql.functions import array_contains

In [None]:
arrayStructureSchema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
       ])),
       StructField('hobbies', ArrayType(StringType()), True),
       StructField('properties', MapType(IntegerType(),StringType()), True)
    ])

structureData = [
    (("James","","Smith"), ['car', 'volleyball'], {1: 'a', 4: 'd'}),
    (("Michael","Rose",""), ['car', 'football'], {2: 'b'}),
    (("Robert","","Williams"), ['box', 'music'], {3: 'c'})
  ]

df3 = spark.createDataFrame(data=structureData,schema=arrayStructureSchema)
df3.printSchema()
df3.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)

+--------------------+-----------------+----------------+
|name                |hobbies          |properties      |
+--------------------+-----------------+----------------+
|{James, , Smith}    |[car, volleyball]|{4 -> d, 1 -> a}|
|{Michael, Rose, }   |[car, football]  |{2 -> b}        |
|{Robert, , Williams}|[box, music]     |{3 -> c}        |
+--------------------+-----------------+----------------+



In [None]:
df3.filter(array_contains(df3.hobbies,"football")) \
    .show()  

+-----------------+---------------+----------+
|             name|        hobbies|properties|
+-----------------+---------------+----------+
|{Michael, Rose, }|[car, football]|  {2 -> b}|
+-----------------+---------------+----------+



**Сортировка**

сделаем еще фильтрацию, чтобы увидеть резульат (orderBy тут аналог)

In [None]:
df.filter(df.userID == 75).sort(df.date_minute, df.rating.desc()).show(20)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+
|    75|   1304|   2.5|      29|        10|     2006|       23|         16|         56|
|    75|    420|   2.0|      29|        10|     2006|       23|         16|         42|
|    75|    160|   2.0|      29|        10|     2006|       23|         16|         52|
|    75|   2011|   2.0|      29|        10|     2006|       23|         16|         39|
|    75|    832|   4.5|      29|        10|     2006|       23|         17|         49|
|    75|   2700|   4.5|      29|        10|     2006|       23|         17|         52|
|    75|   1374|   4.0|      29|        10|     2006|       23|         17|         20|
|    75|   1485|   4.0|      29|        10|     2006|       23|         17|         46|
|    75|    173|   3.5|      29|

**groupby**

Когда мы выполняем groupBy() в PySpark DataFrame, он возвращает объект GroupedData, который содержит следующие агрегатные функции:

min(), max(), mean(), count(), sum(), avg(), agg(), pivot() 

In [None]:
df.groupby('date_year').mean('rating').collect()

[Row(date_year=2003, avg(rating)=3.4279154852282083),
 Row(date_year=2007, avg(rating)=3.4367384383270894),
 Row(date_year=2006, avg(rating)=3.4294610550371356),
 Row(date_year=1997, avg(rating)=3.571018651362984),
 Row(date_year=2004, avg(rating)=3.3916603359500295),
 Row(date_year=1998, avg(rating)=3.5080568720379146),
 Row(date_year=2009, avg(rating)=3.369243156199678),
 Row(date_year=2001, avg(rating)=3.4490124195720484),
 Row(date_year=2005, avg(rating)=3.395790002433071),
 Row(date_year=2000, avg(rating)=3.5767733835530446),
 Row(date_year=2008, avg(rating)=3.486752231109465),
 Row(date_year=1999, avg(rating)=3.4925797284496367),
 Row(date_year=2002, avg(rating)=3.4140127388535033)]

мы уже умеем применять разные методы

In [None]:
df.groupby('date_year')\
  .mean('rating')\
  .sort('date_year')\
  .collect()

[Row(date_year=1997, avg(rating)=3.571018651362984),
 Row(date_year=1998, avg(rating)=3.5080568720379146),
 Row(date_year=1999, avg(rating)=3.4925797284496367),
 Row(date_year=2000, avg(rating)=3.5767733835530446),
 Row(date_year=2001, avg(rating)=3.4490124195720484),
 Row(date_year=2002, avg(rating)=3.4140127388535033),
 Row(date_year=2003, avg(rating)=3.4279154852282083),
 Row(date_year=2004, avg(rating)=3.3916603359500295),
 Row(date_year=2005, avg(rating)=3.395790002433071),
 Row(date_year=2006, avg(rating)=3.4294610550371356),
 Row(date_year=2007, avg(rating)=3.4367384383270894),
 Row(date_year=2008, avg(rating)=3.486752231109465),
 Row(date_year=2009, avg(rating)=3.369243156199678)]

In [None]:
df.filter(df.rating <= 2)\
  .groupby('date_year')\
  .count()\
  .withColumnRenamed('count', 'number')\
  .sort('date_year')\
  .collect()

[Row(date_year=1997, number=98),
 Row(date_year=1998, number=321),
 Row(date_year=1999, number=2270),
 Row(date_year=2000, number=4732),
 Row(date_year=2001, number=4793),
 Row(date_year=2002, number=5791),
 Row(date_year=2003, number=7595),
 Row(date_year=2004, number=10205),
 Row(date_year=2005, number=15982),
 Row(date_year=2006, number=23463),
 Row(date_year=2007, number=19792),
 Row(date_year=2008, number=15128),
 Row(date_year=2009, number=369)]

несколько колонок

In [None]:
df.groupBy("date_year", "date_month") \
  .mean("rating", "userID") \
  .sort('date_year', 'date_month') \
  .show()

+---------+----------+------------------+------------------+
|date_year|date_month|       avg(rating)|       avg(userID)|
+---------+----------+------------------+------------------+
|     1997|         9|3.7094972067039107| 35144.61452513967|
|     1997|        10|3.7762237762237763|34880.769230769234|
|     1997|        11| 3.360655737704918| 9451.180327868853|
|     1997|        12| 3.549618320610687| 20251.01526717557|
|     1998|         1|          3.765625|      22209.515625|
|     1998|         2|               3.3|           31662.4|
|     1998|         3| 3.238095238095238| 19582.04761904762|
|     1998|         4| 3.908333333333333|31813.866666666665|
|     1998|         5| 3.652054794520548|20393.520547945205|
|     1998|         6|             3.225|         27092.625|
|     1998|         7|3.5547550432276656|38376.249279538904|
|     1998|         8| 3.642857142857143|41151.357142857145|
|     1998|         9|3.1881533101045294| 46104.74912891986|
|     1998|        10|3.

Для того, чтобы делать несколько разных агрегаций и еще менять сразу имя столбца нужно немного изменить синтаксис

In [None]:
from pyspark.sql.functions import max, mean, min

In [None]:
df.groupBy("date_year") \
    .agg(min("rating").alias("min_rating"), \
         mean("rating").alias("mean_rating"), \
         max("rating").alias("max_rating")
         ) \
    .show()

+---------+----------+------------------+----------+
|date_year|min_rating|       mean_rating|max_rating|
+---------+----------+------------------+----------+
|     2003|       0.5|3.4279154852282083|       5.0|
|     2007|       0.5|3.4367384383270894|       5.0|
|     2006|       0.5|3.4294610550371356|       5.0|
|     1997|       1.0| 3.571018651362984|       5.0|
|     2004|       0.5|3.3916603359500295|       5.0|
|     1998|       1.0|3.5080568720379146|       5.0|
|     2009|       0.5| 3.369243156199678|       5.0|
|     2001|       1.0|3.4490124195720484|       5.0|
|     2005|       0.5| 3.395790002433071|       5.0|
|     2000|       1.0|3.5767733835530446|       5.0|
|     2008|       0.5| 3.486752231109465|       5.0|
|     1999|       1.0|3.4925797284496367|       5.0|
|     2002|       1.0|3.4140127388535033|       5.0|
+---------+----------+------------------+----------+



Еще можно сделать pivot

In [None]:
df.groupBy('date_year')\
  .pivot('date_month')\
  .mean('rating')\
  .show(5)

+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|date_year|                 1|                 2|                 3|                 4|                 5|                 6|                 7|                 8|                 9|                10|                11|                12|
+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|     2003|3.4720922980979108| 3.333741687084354|3.4834824501032347|3.4579510703363914| 3.366686911630732|  3.55358230160341|3.5085277150739906| 3.447642637512812| 3.579355840124175| 3.165437788018433| 3.446676231468197| 3.440287081339713|
|     2007| 3.520191839855425| 3.3341516

**Join's**

Куда же без них. Что есть: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, SELF JOIN

Благодаря оптимизации в датафреймах уже все хорошо работает, спасибо catalist, но чудеса не вечны и плохой код/незнание данных все равно даст о себе знать

Сделаем для себя несколько таблиц, чтобы можно было экспериментировать

In [None]:
df_mean_user_rating = df.groupBy("userID")\
                        .mean('rating')\
                        .withColumnRenamed('avg(rating)', 'avg_rating_all')

df_mean_user_rating_year = df.groupby('userID', 'date_year')\
                             .mean('rating')\
                             .withColumnRenamed('avg(rating)', 'avg_rating_year')

In [None]:
df_mean_user_rating.printSchema()

df_mean_user_rating_year.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- avg_rating_all: double (nullable = true)

root
 |-- userID: integer (nullable = true)
 |-- date_year: integer (nullable = true)
 |-- avg_rating_year: double (nullable = true)



И давайте все в 1 блоке кода, чтобы не растягивать

In [None]:
df.join(df_mean_user_rating, on=df.userID == df_mean_user_rating.userID, how='inner')\
  .join(df_mean_user_rating_year, on=[df.userID == df_mean_user_rating_year.userID,
                                      df.date_year == df_mean_user_rating_year.date_year],
        how='inner')

AnalysisException: ignored

Надо удалить дублирующиеся столбцы

In [None]:
res_join = df.alias('t').join(df_mean_user_rating.alias('t1'), on=col('t.userID') == col('t1.userID'), how='inner')\
  .drop(col('t1.userID'))\
  .join(df_mean_user_rating_year.alias('t2'), on=[col('t.userID') == col('t2.userID'),
                                      col('t.date_year') == col('t2.date_year')],
        how='inner')\
  .drop(col('t2.userID'))\
  .drop(col('t2.date_year'))

In [None]:
res_join.show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+------------------+-----------------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|    avg_rating_all|  avg_rating_year|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+------------------+-----------------+
| 67068|   6480|   4.0|      28|        11|     2003|       14|         43|          6|3.2815934065934065|3.358974358974359|
| 67068|   5418|   1.5|      21|        12|     2003|        3|         26|         52|3.2815934065934065|3.358974358974359|
| 67068|   6653|   4.0|      11|         8|     2003|        1|         39|         31|3.2815934065934065|3.358974358974359|
| 67068|   6324|   2.0|      22|        11|     2003|       14|         52|          6|3.2815934065934065|3.358974358974359|
| 67068|   3552|   3.5|      15|        11|     2003|       20|         22|         52|3.2815934065934065|3.358974358974359|


**union и unionAll**

Используются для объединения датафреймов с одинаковой структурой, используется union, так как unionAll с версии 2.0.0 более не используется

In [None]:
df1 = df.filter(df.date_year == 2006)
df2 = df.filter(df.date_year != 2006)

In [None]:
union_df = df1.union(df2)

print(df.count(), union_df.count())

855598 855598


Desclaimer: все по sql, надеюсь, помнят разницу между union и union all, когда union убирает дубликаты. Так вот pyspark ничего не удаляет, убрать дубликаты можно только через drop_duplicates, distinct

Также при union pyspark делает объединение по столбцам as is, не пытаясь понять, что в одном датафрейме нужный стобец на 1 позиции, а в другом он на 5. Для этого с версии 3.1 есть замечательный метод unionByName

**UDF - user defined functions**

из курса про rdd помним про map, тут тоже можно перегнать все в rdd и делать map, но можно и через udf. Стоит отметить, что при этом мы теряем возможность оптимизации и произодительность в dataframe, так как udf - black box для спарка.

Но зато эти функции переиспользуемы и их можно применять в sql запросах, как те же udf в oracle

In [None]:
def udf_example(rating):
    rating = rating * 20
    return rating

In [None]:
from pyspark.sql.functions import udf

In [None]:
my_udf = udf(lambda x: udf_example(x), DoubleType())

In [None]:
df.select(['userID', 'movieID', 'rating'])\
  .withColumn('rating_100', my_udf(col('rating')))\
  .show(5)

+------+-------+------+----------+
|userID|movieID|rating|rating_100|
+------+-------+------+----------+
| 33880|  43376|   3.0|      60.0|
| 39282|  56367|   3.5|      70.0|
|  7815|   5446|   3.5|      70.0|
| 36558|   5666|   2.5|      50.0|
|  6082|    858|   5.0|     100.0|
+------+-------+------+----------+
only showing top 5 rows



Для тех, кто любит декораторы

In [None]:
@udf(returnType=DoubleType()) 
def udf_example_decorator(rating):
    rating = rating * 20
    return rating

In [None]:
df.select(['userID', 'movieID', 'rating'])\
  .withColumn('rating_100', udf_example_decorator(col('rating')))\
  .show(5)

+------+-------+------+----------+
|userID|movieID|rating|rating_100|
+------+-------+------+----------+
| 33880|  43376|   3.0|      60.0|
| 39282|  56367|   3.5|      70.0|
|  7815|   5446|   3.5|      70.0|
| 36558|   5666|   2.5|      50.0|
|  6082|    858|   5.0|     100.0|
+------+-------+------+----------+
only showing top 5 rows



Зарегистрируем функцию для будущих примеров с sql

In [None]:
spark.udf.register("udf_example_decorator", udf_example_decorator)

<function __main__.udf_example_decorator>

**SQL**

Ну раз уж пошла такая тема, давайте рассмотрим, как можно сделать все при помощи любимого SQL

можно делать TempView и GlodalTempView, отличие в том, что обычный view будет жить, пока жива сессия спрака, а глобальная, пока жив sparkcontext

In [None]:
df.createOrReplaceTempView('df')

In [None]:
query = '''
select userID, movieID, rating, udf_example_decorator(rating) as rating_100
from
df
'''
spark.sql(query).show(5)

+------+-------+------+----------+
|userID|movieID|rating|rating_100|
+------+-------+------+----------+
| 33880|  43376|   3.0|      60.0|
| 39282|  56367|   3.5|      70.0|
|  7815|   5446|   3.5|      70.0|
| 36558|   5666|   2.5|      50.0|
|  6082|    858|   5.0|     100.0|
+------+-------+------+----------+
only showing top 5 rows



Ну и наш join

In [None]:
df_mean_user_rating.createOrReplaceTempView('df_mean_user_rating')
df_mean_user_rating_year.createOrReplaceTempView('df_mean_user_rating_year')

In [None]:
query = '''
select t.*, t1.avg_rating_all, t2.avg_rating_year
from
df t, df_mean_user_rating t1, df_mean_user_rating_year t2
where
    t.userID = t1.userID and
    t.userID = t2.userID and
    t.date_year = t2.date_year
'''
spark.sql(query).show(5)

+------+-------+------+--------+----------+---------+---------+-----------+-----------+------------------+-----------------+
|userID|movieID|rating|date_day|date_month|date_year|date_hour|date_minute|date_second|    avg_rating_all|  avg_rating_year|
+------+-------+------+--------+----------+---------+---------+-----------+-----------+------------------+-----------------+
| 67068|   6480|   4.0|      28|        11|     2003|       14|         43|          6|3.2815934065934065|3.358974358974359|
| 67068|   5418|   1.5|      21|        12|     2003|        3|         26|         52|3.2815934065934065|3.358974358974359|
| 67068|   6653|   4.0|      11|         8|     2003|        1|         39|         31|3.2815934065934065|3.358974358974359|
| 67068|   6324|   2.0|      22|        11|     2003|       14|         52|          6|3.2815934065934065|3.358974358974359|
| 67068|   3552|   3.5|      15|        11|     2003|       20|         22|         52|3.2815934065934065|3.358974358974359|


**fill() и fillna()**

Оба метода идентичны, заполняют пропуски

In [None]:
import numpy as np

In [None]:
data2 = [(2,"Michael Rose"),(3,"Robert Williams"),
     (4,"Rames Rose"),(5, None), (6, None),
     (None, 'Fred Tf')
  ]
df2 = spark.createDataFrame(data2, ['id', 'name'])

А где пропуски?

In [None]:
from pyspark.sql.functions import col,isnan, when, count
df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+---+----+
| id|name|
+---+----+
|  1|   2|
+---+----+



In [None]:
df2.show()

+----+---------------+
|  id|           name|
+----+---------------+
|   2|   Michael Rose|
|   3|Robert Williams|
|   4|     Rames Rose|
|   5|           null|
|   6|           null|
|null|        Fred Tf|
+----+---------------+



In [None]:
df2.fillna({'id': 0}).show()

+---+---------------+
| id|           name|
+---+---------------+
|  2|   Michael Rose|
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|           null|
|  6|           null|
|  0|        Fred Tf|
+---+---------------+



In [None]:
df2.fillna({'id': 0, 'name': 'Unknown'}).show()

+---+---------------+
| id|           name|
+---+---------------+
|  2|   Michael Rose|
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|        Unknown|
|  6|        Unknown|
|  0|        Fred Tf|
+---+---------------+



Аналогично

In [None]:
df2.na.fill({'id': 0, 'name': 'Unknown'}).show()

+---+---------------+
| id|           name|
+---+---------------+
|  2|   Michael Rose|
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|        Unknown|
|  6|        Unknown|
|  0|        Fred Tf|
+---+---------------+



**Домашнее задание**

Куда же без домашки, верно?

Есть данные по транзакциям клиентов, ваша задача состоит в анализе этих данных и подготовки к структуре, которая похожа на ту структуру, которая сейчас часто нами используется при построении моделей на транзакциях + промежуточные задания.

Не забудьте делать всякие show после каждого задания, чтобы было видно результат

Файл spark_transactions.parquet можете забрать в папке с записями лекций

In [4]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import col
from pyspark.sql import Row

In [5]:
trans_data = spark.read.parquet('data/spark_transactions.parquet')

In [None]:
trans_data.show(5)

+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|User|Card|Year|Month|Day| Time| Amount|          UseChip| MerchantCity|MerchantState|    Zip| MCC|IsFraud|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
|   0|   0|2002|    9|  1|06:21|$134.09|Swipe Transaction|     La Verne|           CA|91750.0|5300|     No|
|   0|   0|2002|    9|  1|06:42| $38.48|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
|   0|   0|2002|    9|  2|06:22|$120.34|Swipe Transaction|Monterey Park|           CA|91754.0|5411|     No|
|   0|   0|2002|    9|  2|17:45|$128.95|Swipe Transaction|Monterey Park|           CA|91754.0|5651|     No|
|   0|   0|2002|    9|  3|06:23|$104.71|Swipe Transaction|     La Verne|           CA|91750.0|5912|     No|
+----+----+----+-----+---+-----+-------+-----------------+-------------+-------------+-------+----+-------+
only showing top 5 rows



Посмотрим на схему данных

In [None]:
trans_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: double (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)



Сколько транзакций у пользователя

In [None]:
trans_data.groupby('User').count().show(5)

+----+-----+
|User|count|
+----+-----+
|  26|10978|
|  29|15661|
| 474|25256|
| 964|   92|
|1677| 3215|
+----+-----+
only showing top 5 rows



Сколько карт у пользователей в среднем

In [None]:
trans_data.groupby('User').mean('Card').show(5)

+----+--------------------+
|User|           avg(Card)|
+----+--------------------+
|  26|0.003006012024048096|
|  29|   1.812144818338548|
| 474|                 0.0|
| 964|   0.391304347826087|
|1677| 0.09082426127527216|
+----+--------------------+
only showing top 5 rows



Немного обработаем данные: Amount в float, из Time вытянем час транзакции и удалим исходный Time, Zip  к типу int

In [6]:
from pyspark.sql.functions import substring

trans_data = trans_data.withColumn('Amount', substring('Amount', 2, 100).cast('float'))\
                       .withColumn('Hour', substring('Time', 1, 2).cast('int'))\
                       .drop('Time')\
                       .withColumn('Zip', col('Zip').cast('int'))
                       
trans_data.show(5)

+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
|User|Card|Year|Month|Day|Amount|          UseChip| MerchantCity|MerchantState|  Zip| MCC|IsFraud|Hour|
+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
|   0|   0|2002|    9|  1|134.09|Swipe Transaction|     La Verne|           CA|91750|5300|     No|   6|
|   0|   0|2002|    9|  1| 38.48|Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|120.34|Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|128.95|Swipe Transaction|Monterey Park|           CA|91754|5651|     No|  17|
|   0|   0|2002|    9|  3|104.71|Swipe Transaction|     La Verne|           CA|91750|5912|     No|   6|
+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
only showing top 5 rows



Посчитайте количество транзакций по годам, учитывая только те транзакции, объем которых был больше 100

In [None]:
trans_data.printSchema()

root
 |-- User: long (nullable = true)
 |-- Card: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Day: long (nullable = true)
 |-- Amount: float (nullable = true)
 |-- UseChip: string (nullable = true)
 |-- MerchantCity: string (nullable = true)
 |-- MerchantState: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- MCC: long (nullable = true)
 |-- IsFraud: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [None]:
trans_data.filter(trans_data.Amount > 100)\
          .groupby('Year')\
          .count().show(5)

+----+------+
|Year| count|
+----+------+
|2007|121489|
|2014|179492|
|2012|174439|
|1991|   266|
|2016|182742|
+----+------+
only showing top 5 rows



Определите, есть ли пропуски в данных по каждому столбцу

In [None]:
from pyspark.sql.functions import isnan, when, count, col
import pyspark.sql.functions as F

# the MAX of a boolean is true if there is at least one true value
trans_data.select([F.max(isnan(c) | col(c).isNull().alias(c)).alias(c) for c in trans_data.columns]).show(5)

+-----+-----+-----+-----+-----+------+-------+------------+-------------+----+-----+-------+-----+
| User| Card| Year|Month|  Day|Amount|UseChip|MerchantCity|MerchantState| Zip|  MCC|IsFraud| Hour|
+-----+-----+-----+-----+-----+------+-------+------------+-------------+----+-----+-------+-----+
|false|false|false|false|false| false|  false|       false|         true|true|false|  false|false|
+-----+-----+-----+-----+-----+------+-------+------------+-------------+----+-----+-------+-----+



Заполните пропуски исходя из типа данных

In [8]:
trans_data = trans_data.fillna({'MerchantState': 'Unknown'})
trans_data.show(4)

+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
|User|Card|Year|Month|Day|Amount|          UseChip| MerchantCity|MerchantState|  Zip| MCC|IsFraud|Hour|
+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
|   0|   0|2002|    9|  1|134.09|Swipe Transaction|     La Verne|           CA|91750|5300|     No|   6|
|   0|   0|2002|    9|  1| 38.48|Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|120.34|Swipe Transaction|Monterey Park|           CA|91754|5411|     No|   6|
|   0|   0|2002|    9|  2|128.95|Swipe Transaction|Monterey Park|           CA|91754|5651|     No|  17|
+----+----+----+-----+---+------+-----------------+-------------+-------------+-----+----+-------+----+
only showing top 4 rows



Теперь самое время сгруппировать данные по каждому клиенту (можно использовать collect_list для сбора данных после агрегации)
Когда будете делать агрегацию, то возьмите только чать выборки, например, у кого User <= 10, для всей выборки либо не хватит памяти, либо очень долго считать

In [9]:
trans_data.count()

24386900

### Variant 1

In [18]:
from pyspark.sql.functions import struct


NUM = 0
trans_data.filter(trans_data.User <= NUM)\
          .groupby('User')\
          .agg(struct(F.collect_set('Card'),
                      F.collect_list('Amount'))\
               .alias('User_info')
               ).show(2)

+----+--------------------+
|User|           User_info|
+----+--------------------+
|   0|{[0, 1, 2, 3, 4],...|
+----+--------------------+



### Variant 2

In [25]:
from pyspark.sql.functions import struct


NUM = 10
trans_data.filter(trans_data.User <= NUM)\
          .groupby('User')\
          .agg(F.collect_set('Card').alias('Unique cards'),
               F.collect_list('Amount').alias('Trans volumes'),
               F.collect_list('MerchantCity').alias('MerchantCity'),
               F.collect_list('MerchantState').alias('MerchantState'),
               F.collect_list('Zip').alias('Zip'),
               F.collect_list('MCC').alias('MCC'),
               F.collect_list('IsFraud').alias('IsFraud'))\
          .sort('User')\
          .show(NUM+1)

+----+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|User|   Unique cards|       Trans volumes|        MerchantCity|       MerchantState|                 Zip|                 MCC|             IsFraud|
+----+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   0|[0, 1, 2, 3, 4]|[134.09, 38.48, 1...|[La Verne, Monter...|[CA, CA, CA, CA, ...|[91750, 91754, 91...|[5300, 5411, 5411...|[No, No, No, No, ...|
|   1|[0, 1, 2, 3, 4]|[65.06, 98.97, 49...|[Rochester, Flush...|[NY, NY, NY, Unkn...|[14626, 11367, 11...|[5814, 5719, 5970...|[No, No, No, No, ...|
|   2|[0, 1, 2, 3, 4]|[948.44, 42.98, 9...|[ ONLINE,  ONLINE...|[Unknown, Unknown...|[91792, 91792, 91...|[4722, 4784, 5411...|[No, No, No, No, ...|
|   3|   [0, 1, 2, 3]|[1195.46, 106.78,...|[Franklin Square,...|[NY, NY, NY, NY, ...|[11010, 10281, 10...|