# Технологии обработки больших данных

Занятие 3. PySpark Data Structures

0. Запуск PySpark на локальной машине
1. Spark DataFrame 
2. Spark RDD (разбор предыдущего ДЗ)
3. Spark Pandas API DataFrame
4. Домашнее задание 

In [None]:
%%bash
pip install pandas pyarrow plotly



In [None]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=5474e0471d0b577bde331117fc5654af1e706c598c3b6bc93a371f5d5cd857ff
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Рассмотрим пример данных [German Credit](https://www.kaggle.com/uciml/german-credit), которые используются для решении задачи кредитного скоринга. Это небольшой датасет с информацией о клиентах, необходимой для принятия решения - выдавать кредит или нет.  

Сегодня мы не будем решать задачу предсказания, просто разберемся с основными приемами EDA (Exploratory data analysis, [Разведочный анализ данных](https://ru.wikipedia.org/wiki/%D0%A0%D0%B0%D0%B7%D0%B2%D0%B5%D0%B4%D0%BE%D1%87%D0%BD%D1%8B%D0%B9_%D0%B0%D0%BD%D0%B0%D0%BB%D0%B8%D0%B7_%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85)). 

In [None]:
DATA_PATH = 'sample_data/credit_data.csv'

**Columns**  

Age (numeric)  
Sex (text: male, female)  
Job (numeric: 0 - unskilled and non-resident, 1 - unskilled and resident, 2 - skilled, 3 - highly skilled)  
Housing (text: own, rent, or free)  
Saving accounts (text - little, moderate, quite rich, rich)  
Checking account (numeric, in DM - Deutsch Mark)  
Credit amount (numeric, in DM)  
Duration (numeric, in month)  
Purpose (text: car, furniture/equipment, radio/TV, domestic appliances, repairs, education, business, vacation/others)

## 1. Spark DataFrame

Базовый класс для работы со структуированными данными в pyspark.

In [None]:
df = spark.read.csv(DATA_PATH, header=True)
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
# First rows in this DataFrame
df.show(10, truncate=False) 

+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
|id |Age|Sex   |Job|Housing|Saving_accounts|Checking_account|Credit_amount|Duration|Purpose            |
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
|0  |67 |male  |2  |own    |NA             |little          |1169         |6       |radio/TV           |
|1  |22 |female|2  |own    |little         |moderate        |5951         |48      |radio/TV           |
|2  |49 |male  |1  |own    |little         |NA              |2096         |12      |education          |
|3  |45 |male  |2  |free   |little         |little          |7882         |42      |furniture/equipment|
|4  |53 |male  |2  |free   |little         |little          |4870         |24      |car                |
|5  |35 |male  |1  |free   |NA             |NA              |9055         |36      |education          |
|6  |53 |male  |2  |own    |quite rich     |NA         

### Схема данных как в SQL

In [None]:
schema = "id INT, Age INT, Sex STRING, Job INT, Housing STRING, Saving_accounts STRING, \
Checking_account STRING, Credit_amount INT, Duration INT, Purpose STRING"

In [None]:
df = spark.read.csv('sample_data/credit_data.csv', schema=schema, header=True )

In [None]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Job: integer (nullable = true)
 |-- Housing: string (nullable = true)
 |-- Saving_accounts: string (nullable = true)
 |-- Checking_account: string (nullable = true)
 |-- Credit_amount: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Purpose: string (nullable = true)



### Сортировка и фильтрация данных

In [None]:
# One column sorting
df.sort('Job', ascending=False).show()

+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
| id|Age|   Sex|Job|Housing|Saving_accounts|Checking_account|Credit_amount|Duration|            Purpose|
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
| 56| 52|  male|  3|    own|             NA|        moderate|         6468|      12|           radio/TV|
|175| 53|female|  3|    own|             NA|              NA|         7485|      30|                car|
| 58| 23|female|  3|    own|         little|            rich|         1961|      18|                car|
|  9| 28|  male|  3|    own|         little|        moderate|         5234|      30|                car|
| 62| 61|  male|  3|   free|         little|        moderate|         1953|      36|           business|
| 51| 30|  male|  3|    own|         little|        moderate|         5965|      27|                car|
| 72| 51|  male|  3|   free|         little|          l

In [None]:
# Few columns sorting
df.sort(['Age', 'Credit_amount'], ascending=[False, True]).show()

+---+---+------+---+-------+---------------+----------------+-------------+--------+---------+
| id|Age|   Sex|Job|Housing|Saving_accounts|Checking_account|Credit_amount|Duration|  Purpose|
+---+---+------+---+-------+---------------+----------------+-------------+--------+---------+
|536| 75|female|  3|    own|             NA|          little|         1374|       6|      car|
|330| 75|  male|  3|   free|         little|          little|         6615|      24|      car|
|756| 74|  male|  0|    own|         little|            rich|         1299|       6|      car|
|430| 74|  male|  1|    own|         little|              NA|         3448|       5| business|
|606| 74|  male|  3|    own|         little|              NA|         4526|      24| business|
|186| 74|female|  3|   free|         little|        moderate|         5129|       9|      car|
|163| 70|  male|  3|   free|         little|        moderate|         7308|      10|      car|
|187| 68|  male|  0|   free|         little|      

In [None]:
df_car = df.filter(df["Purpose"] == 'car')

In [None]:
# Доля автокредитов
df_car.count() / df.count()

0.337

### Группировка данных

In [None]:
df.groupBy("Age").count().sort('Age').show()

+---+-----+
|Age|count|
+---+-----+
| 19|    2|
| 20|   14|
| 21|   14|
| 22|   27|
| 23|   48|
| 24|   44|
| 25|   41|
| 26|   50|
| 27|   51|
| 28|   43|
| 29|   37|
| 30|   40|
| 31|   38|
| 32|   34|
| 33|   33|
| 34|   32|
| 35|   40|
| 36|   39|
| 37|   29|
| 38|   24|
+---+-----+
only showing top 20 rows



## 2. Spark RDD 

Resilient Distributed Dataset. 
Менее удобный, но более производительный контейнер для данных.  

Подробнее про DataFrame, DataSet и RDD на русском языке
[1](https://www.bigdataschool.ru/blog/spark-sql-data-structures.html), 
[2](https://www.bigdataschool.ru/blog/rdd-vs-dataframe-vs-dataset.html).  

На английском рекомендую [официальный гайд](https://spark.apache.org/docs/latest/sql-getting-started.html).

In [None]:
log_file = spark.read.text('sample_data/log.txt')

In [None]:
%%time

# Note, we cant use   lambda x:   x.value.upper()
df = log_file.rdd.map(lambda x: ( x.value.upper() ,) ).toDF() 

df.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
with open('sample_data/white_list.txt') as f:
    ww = f.readlines()

ww = "".join([w for w in ww]).split()
ww = list(map(str.lower, ww))

In [None]:
def white_filter(s):
    w = s.value.split()
    timestamp = str(w[0]) + str(w[1]) + " "
    words = w[2:]
    filtered_words = list(filter(lambda x: x in ww, words))
    return (timestamp, " ".join([w for w in filtered_words])  )

In [None]:
%%time
df2 = log_file.rdd.map(white_filter).toDF(schema=('TimeStamp', 'Words'))
df2.show(truncate=False)

+--------------------------+-----------+
|TimeStamp                 |Words      |
+--------------------------+-----------+
|2022-02-1617:47:42.154547 |dam sat    |
|2022-02-1617:47:42.155723 |eye nut    |
|2022-02-1617:47:42.156961 |box        |
|2022-02-1617:47:42.163537 |           |
|2022-02-1617:47:42.164261 |nod        |
|2022-02-1617:47:42.164976 |car        |
|2022-02-1617:47:42.165674 |for        |
|2022-02-1617:47:42.166676 |           |
|2022-02-1617:47:42.167349 |all why gel|
|2022-02-1617:47:42.168004 |           |
|2022-02-1617:47:42.168754 |god jog for|
|2022-02-1617:47:42.169431 |           |
|2022-02-1617:47:42.170268 |bye        |
|2022-02-1617:47:42.171248 |           |
|2022-02-1617:47:42.172156 |all tow    |
|2022-02-1617:47:42.173072 |zip        |
|2022-02-1617:47:42.174047 |           |
|2022-02-1617:47:42.174767 |wow        |
|2022-02-1617:47:42.175409 |           |
|2022-02-1617:47:42.175939 |you        |
+--------------------------+-----------+
only showing top

In [None]:
wordCounts = df2.rdd.flatMap(lambda line: line[1].split(" "))\
                      .map(lambda word: (word, 1))\
                      .reduceByKey(lambda a, b: a + b)

In [None]:
%%time
wordCounts.toDF().show()

+---+-----+
| _1|   _2|
+---+-----+
|sat|  495|
|nut|  513|
|   |33234|
|why|  516|
|gel|  522|
|jog|  500|
|bye|  501|
|tow|  498|
|zip|  480|
|now|  487|
|yet|  501|
|vet|  540|
|dot|  521|
|ace|  514|
|fix|  471|
|ink|  492|
|cut|  507|
|rag|  496|
|pup|  504|
|yes|  491|
+---+-----+
only showing top 20 rows

CPU times: user 186 ms, sys: 19.3 ms, total: 205 ms
Wall time: 33 s


## 3. Spark Pandas API 

Начиная с версии Spark 3.2 имеется реализация Pandas API.    
Хороший материал непосредственно по pandas: [mlcourse.ai](https://habr.com/ru/company/ods/blog/322626/)

In [None]:
# import pandas as pd

import pyspark.pandas as pd



In [None]:
df = pd.read_csv('sample_data/credit_data.csv')
type(df)

pyspark.pandas.frame.DataFrame

In [None]:
df.head()

Unnamed: 0,id,Age,Sex,Job,Housing,Saving_accounts,Checking_account,Credit_amount,Duration,Purpose
0,0,67,male,2,own,,little,1169,6,radio/TV
1,1,22,female,2,own,little,moderate,5951,48,radio/TV
2,2,49,male,1,own,little,,2096,12,education
3,3,45,male,2,free,little,little,7882,42,furniture/equipment
4,4,53,male,2,free,little,little,4870,24,car


In [None]:
df.info()

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 1000 entries, 0 to 999
Data columns (total 10 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   id                1000 non-null   int32 
 1   Age               1000 non-null   int32 
 2   Sex               1000 non-null   object
 3   Job               1000 non-null   int32 
 4   Housing           1000 non-null   object
 5   Saving_accounts   1000 non-null   object
 6   Checking_account  1000 non-null   object
 7   Credit_amount     1000 non-null   int32 
 8   Duration          1000 non-null   int32 
 9   Purpose           1000 non-null   object
dtypes: int32(5), object(5)

In [None]:
# Средний возраст заемщиков 
df['Age'].mean()

35.546

In [None]:
# Статистика по всем числовым колонкам
df.describe()

Unnamed: 0,id,Age,Job,Credit_amount,Duration
count,1000.0,1000.0,1000.0,1000.0,1000.0
mean,499.5,35.546,1.904,3271.258,20.903
std,288.819436,11.375469,0.653614,2822.736876,12.058814
min,0.0,19.0,0.0,250.0,4.0
25%,249.0,27.0,2.0,1364.0,12.0
50%,499.0,33.0,2.0,2319.0,18.0
75%,749.0,42.0,2.0,3972.0,24.0
max,999.0,75.0,3.0,18424.0,72.0


### Индексация и фильтрация данных

In [None]:
# Индексация python slices 
df[1:11:2]

Unnamed: 0,id,Age,Sex,Job,Housing,Saving_accounts,Checking_account,Credit_amount,Duration,Purpose
1,1,22,female,2,own,little,moderate,5951,48,radio/TV
3,3,45,male,2,free,little,little,7882,42,furniture/equipment
5,5,35,male,1,free,,,9055,36,education
7,7,35,male,3,rent,little,moderate,6948,36,car
9,9,28,male,3,own,little,moderate,5234,30,car


In [None]:
# Фильтрация по условию
df[df["Sex"] == 'male']

In [None]:
# Какой средний размер кредита у заемщиков мужчин?
df[df["Sex"] == 'male']['Credit_amount'].mean()

3448.040579710145

In [None]:
# Какой средний размер кредита у заемщиков женщин?
df[df["Sex"] == 'female']['Credit_amount'].mean()

2877.7741935483873

### Группировка данных

In [None]:
# Группировка разделяет df на несколько частей, в которых значения заданной колонки будут одинаковыми
df.groupby('Sex')

<pyspark.sql.group.GroupedData at 0x7f0af8822650>

In [None]:
# Можно указать какие колонки нас интересуют 
df.groupby('Sex')['Credit_amount']

<pyspark.pandas.groupby.SeriesGroupBy at 0x7f0afa52b8d0>

In [None]:
# В конце группировки нужно указать функцию
df.groupby('Sex')['Credit_amount'].mean()

Sex
female    2877.774194
male      3448.040580
Name: Credit_amount, dtype: float64

### Визуализация

In [None]:
df['Age'].hist()

In [None]:
df['Age'].plot()

## 4. Домашнее задание  



1. Сколько мужчин и женщин (признак Sex) представлено в этом наборе данных?

In [None]:
# Ваш код здесь

2. Каков средний возраст (признак Age) женщин?

In [None]:
# Ваш код здесь

3. Какова доля заемщиков с собственным жильем (признак Housing)?

In [None]:
# Ваш код здесь

4. Каково среднее значение возраста тех, кто имеет высокие накопления (признак Saving_accounts)?

In [None]:
# Ваш код здесь

5. Каково среднеквадратичное отклонения возраста тех, кто имеет высокие накопления (признак Saving_accounts)?

In [None]:
# Ваш код здесь

6. Выведите гистограмму категорий покупок (признак Purpose) для мужчин и женщин.

In [None]:
# Ваш код здесь

7. На что чаще всего берутся длинные кредиты (более 24 мес)?

In [None]:
# Ваш код здесь

8. Какой средний срок кредита (признак Duration) для заемщиков, имеющих высокие текущие траты (признак Checking_account)?

In [None]:
# Ваш код здесь

9. Какой средний срок кредита (признак Duration) для заемщиков, имеющих низкие текущие траты (признак Checking_account)?

In [None]:
# Ваш код здесь

10. На какую цель взят самый дорогой кредит?

In [None]:
# Ваш код здесь