# Introduction to Spark

**Spark** - a framework for processing Big Data.
+ https://spark.apache.org/
+ https://www.bigdataschool.ru/wiki/spark



## Hardware

+ **Processor (CPU)** - the "brain" of the computer. Every process running on the computer is executed by the CPU.
+ **Main memory (Memory, RAM)** - temporary storage that holds the data of running programs before it is passed to the CPU. It is cleared when the computer is switched off.
+ **Long-term storage (Storage, SSD or Magnetic Disk)** - keeps data for long periods. When a program starts, its data is loaded from this storage into RAM.
+ **Network (LAN or Internet)** - the gateway to everything you need that is not stored on your own computer.

![Hardware](img/hardware.png)

📌Operations in RAM are relatively fast compared with reading from and writing to disk or moving data over the network. However, RAM is expensive, and data held in RAM is lost when the computer is switched off.

📌Moving data over the network, i.e. between computers, is the biggest bottleneck when working with big data. One advantage of *Spark* is that it shuffles data between computers only when necessary.

## Distributed Calculation
![](img/distributed_parallel.png)
📌At a high level, *distributed computing* means several processors, each with its own memory. *Parallel computing* uses several processors that share the same memory.

## The Hadoop Ecosystem
**Hadoop** - an ecosystem of tools (older than *Spark*) for storing and analysing big data. The main difference between *Spark* and *Hadoop* is how they use memory: *Hadoop* (MapReduce) writes intermediate results to disk, whereas *Spark* tries to keep data in memory whenever possible. This makes *Spark* faster in many use cases.
+ **Hadoop MapReduce** - a system for processing and analysing large datasets in parallel.
+ **Hadoop YARN** - a resource manager that schedules jobs on the cluster. It tracks which compute resources are available and assigns them to specific tasks.
+ **Hadoop Distributed File System (HDFS)** - a big data storage system that splits data into chunks and stores the chunks across a cluster of computers.

Example of a MapReduce workflow:
![MapReduce](img/mapreduce.png)
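
The three phases of a MapReduce job can be sketched in plain Python on a single machine. This is only an illustration of the idea: it does not use the Hadoop API, the function names `map_phase`, `shuffle_phase` and `reduce_phase` are made up for this note, and the input lines are toy data that need not match the diagram.

```python
from collections import defaultdict

def map_phase(lines):
    """Map: emit a (word, 1) pair for every word."""
    for line in lines:
        for word in line.lower().split():
            yield word, 1

def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Reduce: combine the values of each key into a single result."""
    return {key: sum(values) for key, values in groups.items()}

# Toy input (hypothetical data)
lines = ["Despacito Havana", "Havana despacito", "All the stars"]
word_counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(word_counts)
```

On a real cluster the map and reduce steps run on different machines, and the shuffle step is the one that moves data over the network.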

   
## How Spark Works
+ **driver**: runs the program
  - executes the program code
  - schedules and launches tasks on the *executors*
+ **executor**: performs the computations
  - executes the code sent by the *driver*
  - reports the state of the computation back to the *driver*
+ **cluster manager**: manages the physical machines and allocates resources to *Spark* applications
   - *Standalone*
   - *YARN*
   - *Mesos*
   - *Kubernetes* 
   
![](img/cluster.png)
   
## Spark modes
📌 Modes control where the processes (*driver*, *executors*) are placed.

+ **Local mode** - runs *Spark* on your own computer as a single machine (the work is done by threads).
+ **Deploy modes**: run *Spark* on several machines.
  - **Cluster mode** - everything runs inside the cluster
  - **Client mode** - the *driver* runs outside the cluster, the *executors* run inside it
   
   
## Data Structures in Spark
+ **RDD (resilient distributed dataset)** - the basic computational primitive of *Spark*: a fault-tolerant distributed collection of data spread across the nodes of the cluster.
+ **DataFrame** - corresponds to a table in a relational database, with additional optimisations for distributed computation
+ **DataSet** - a typed *DataFrame* (available in the Scala and Java APIs, not in PySpark)

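## Spark Methods
+ **transformation** - builds a new dataset from an existing one (for example `map`, `filter`)
+ **action** - computes and returns a result (for example `collect`, `count`)

📌 **lazy evaluation** - *transformations* are executed only when an *action* is called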
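
Lazy evaluation can be imitated with a plain Python generator. This is an analogy, not Spark code: the generator plays the role of a transformation, `list()` plays the role of an action, and the `log` list is only there to show when the work really happens.

```python
log = []  # records every element that is actually processed

def lowercase(songs):
    # Generator: the body does not run until a consumer iterates over it
    for song in songs:
        log.append(song)
        yield song.lower()

songs = ["Despacito", "Havana"]

pipeline = lowercase(songs)   # "transformation": only describes the work
calls_before = len(log)       # nothing has been processed yet

result = list(pipeline)       # "action": forces the computation
calls_after = len(log)

print(calls_before, calls_after, result)
```

Spark takes the same approach on a larger scale: because nothing runs until an action is called, it can look at the whole chain of transformations before executing it.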

## Spark Clusters

📌*Spark* has no distributed storage system of its own.

Instead it uses:
+ *HDFS* (Hadoop Distributed File System)
+ Cloud distributed storage (for example *AWS S3*)


Cloud solutions are more popular thanks to their flexible placement and lower maintenance costs.

# Processing Data with Spark

+ [PySpark Tutorial](https://sparkbyexamples.com/pyspark-tutorial/) - a lot of useful material
+ [medium: Introduction to PySpark](https://medium.com/the-researchers-guide/introduction-to-pyspark-a61f7217398e)
+ [Руководство по PySpark для начинающих](https://pythonru.com/biblioteki/pyspark-dlja-nachinajushhih?ysclid=lditcls4ah385567956)
+ [First Steps With PySpark and Big Data Processing](https://realpython.com/pyspark-intro/#pyspark-shell)

YouTube:
+ [Анализируем данные с помощью фреймворка Spark](https://youtu.be/McXK_ObP00c)
+ [МИТАП "Apache Spark за 2 часа - для нетерпеливых"_20 апреля 2022г](https://youtu.be/xuMe6OFyQ2s)

## Installation

PySpark:
+ Does not run in Jupyter out of the box - Spark has to be installed first.
+ Runs out of the box in Google Colab

Installing Spark on Windows:
+ https://naomi-fridman.medium.com/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f

In my case (hadoop 3)
+ https://github.com/dotnet/spark/discussions/986

📌 Download `hadoop.dll` together with `winutils.exe`

In [1]:
# !pip install pyspark
# !pip install findspark

In [2]:
import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType

import findspark
findspark.init()
findspark.find()

'C:\\Programs\\Spark\\spark-3.3.1-bin-hadoop3'

## SparkSession - точка входа

In [2]:
# Creating the session for the first time can take several minutes
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

In [3]:
spark

In [4]:
# spark.sparkContext.getConf().getAll()

In [5]:
# stop the session
# spark.stop()

## Map

In [4]:
sc = spark.sparkContext
log_of_songs = [
        "Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars"
]

# Create an RDD from a Python collection
rdd_songs = sc.parallelize(log_of_songs)
rdd_songs

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [7]:
rdd_songs.map(lambda song: song.lower())

PythonRDD[5] at RDD at PythonRDD.scala:53

>To make *Spark* actually run the `map` step, you need to call an "action", for example the `collect()` method.
The `collect()` method takes the results from all nodes of the cluster and "collects" them into a single list on the driver.

In [8]:
rdd_songs.map(lambda song: song.lower()).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

## Creating Tables

In [9]:
df_country = spark.createDataFrame(data=[(0, 'ru', 'Russia'),
                                         (1, 'it', 'Italy'),
                                         (2, 'fi', 'Finland')],
                                   schema=['id', 'code', 'name'])
df_country.show()

+---+----+-------+
| id|code|   name|
+---+----+-------+
|  0|  ru| Russia|
|  1|  it|  Italy|
|  2|  fi|Finland|
+---+----+-------+



In [10]:
df_capital = spark.createDataFrame(data=[(100, 'ru', 'Moscow', 15_000_000),
                                         (101, 'it', 'Rome', 4_000_000)],
                                   schema=['id', 'cntr_code', 'city', 'population'])
df_capital.show()

+---+---------+------+----------+
| id|cntr_code|  city|population|
+---+---------+------+----------+
|100|       ru|Moscow|  15000000|
|101|       it|  Rome|   4000000|
+---+---------+------+----------+



<div style="font-size: 16px; font-weight: bold;">toPandas()</div> - converts to a pandas DataFrame

In [11]:
df_capital.toPandas()

    id  cntr_code    city  population
0  100         ru  Moscow    15000000
1  101         it    Rome     4000000


## Read and Write Data

![](img/read_data.png)

Pretty output
+ https://stackoverflow.com/questions/43427138/pyspark-show-dataframe-as-table-with-horizontal-scroll-in-ipython-notebook

In [27]:
from IPython.display import HTML, display
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Read

In [13]:
path = "data/sparkify_log_small.json"
df = spark.read.json(path)
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [14]:
# Data Types
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

<div style="font-size: 16px; font-weight: bold;">show()</div> - prints the rows of the DataFrame

In [16]:
df.show(1)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------

In [17]:
df.show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------
 artist        | Showaddywaddy                                                                                            
 auth          | Logged In                                                                                                
 firstName     | Kenneth                                                                                                  
 gender        | M                                                                                                        
 itemInSession | 112                                                                                                      
 lastName      | Matthews                                                                                                 
 length        | 232.93342                                                                                                
 level         |

<div style="font-size: 16px; font-weight: bold;">take(n) == head(n)</div> - return the first n rows as a list of `Row` objects

In [18]:
df.take(3)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth='Logged In', firstNa

In [19]:
df.head(3)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth='Logged In', firstNa

<div style="font-size: 16px; font-weight: bold;">limit(n)</div> - a transformation that restricts the DataFrame to n rows

In [20]:
df.limit(3).show()

+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shreveport-Bossie...|   PUT

### Write
📌 Spark writes the output in parts (a directory containing several files)

In [21]:
out_path = "data/sparkify_log_small"

df.write.save(out_path, format="csv", header=True, mode='overwrite')

# write to a single file (may fail if the data is large)
# df.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv(out_path)
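
Because the result is a directory of part files rather than one CSV, any tool other than Spark has to read all the parts. A minimal sketch with the standard library, assuming the parts are named `part-*.csv` and each one starts with a header row (which is the case with `header=True`); the helper name `read_spark_csv_dir` is made up for this note.

```python
import csv
import glob
import os

def read_spark_csv_dir(directory):
    """Read every part-*.csv file of a Spark output directory into a list of dicts."""
    rows = []
    # Marker files such as _SUCCESS do not match the pattern and are skipped
    for path in sorted(glob.glob(os.path.join(directory, "part-*.csv"))):
        with open(path, newline="", encoding="utf-8") as f:
            rows.extend(csv.DictReader(f))
    return rows
```

For example, `read_spark_csv_dir("data/sparkify_log_small")` would return the saved log as a list of dictionaries. All values come back as strings, because plain CSV carries no type information.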

In [22]:
# inferSchema=True infers column types from the data; without it every CSV column is read as a string
df2 = spark.read.csv(out_path, header=True, inferSchema = True)
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)



## Core Functions

Rows:
+ `select()`: returns a new DataFrame with the selected columns
  - `select("*")`: returns all columns
+ `filter()`: filters rows by a given condition, which can be written as
  - a SQL expression string
  - a pandas-style column expression
  - an expression built with `pyspark.sql.functions`
+ `where()`: an alias for `filter()`
+ `sort()`: returns a new DataFrame sorted by the given columns. The `ascending` keyword argument defaults to `True`.
+ `dropDuplicates()`: returns a new DataFrame with unique rows, based on all columns or only a subset of them
+ `distinct()`: returns a new DataFrame containing only distinct rows

Columns:
+ `withColumn()`: returns a new DataFrame, adding a column or replacing an existing column of the same name. The first parameter is the name of the new column, the second is the expression used to compute it.
+ `drop(column)`: removes a column
+ `withColumnRenamed(old_name, new_name)`: renames a column

In [23]:
df.select("page").dropDuplicates().sort("page").show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [24]:
df.select(["userId", "firstname", "page", "song"]).where(df.userId == "1046").collect()

[Row(userId='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
 Row(userId='1046', firstname='Kenneth', page='Home', song=None),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='War on war'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userId='1046', firstname='Kenneth', page='Logout', song=None),
 Row(userId='1046', firstname='Kenneth'

In [25]:
df.select('firstName').filter(F.col('firstName').startswith('W')).dropDuplicates().collect()

[Row(firstName='Willie'),
 Row(firstName='Wyatt'),
 Row(firstName='Weston'),
 Row(firstName='William')]

In [26]:
df.select('firstName').filter('firstName like "W%"').dropDuplicates().collect()

[Row(firstName='Willie'),
 Row(firstName='Wyatt'),
 Row(firstName='Weston'),
 Row(firstName='William')]

## Aggregate Functions
+ `describe()`: computes summary statistics
+ `groupBy()`: groups the DataFrame by the given columns
+ `agg()`: performs the specified aggregation
+ `count()`: returns the total number of rows in the dataset

From the `pyspark.sql.functions` module:
+ `countDistinct()`
+ `avg()` = `mean()`
+ `max()`
+ `min()`

In [27]:
df.count()

10000

In [28]:
df.describe("sessionId").show()

+-------+------------------+
|summary|         sessionId|
+-------+------------------+
|  count|             10000|
|   mean|         4436.7511|
| stddev|2043.1281541827561|
|    min|                 9|
|    max|              7144|
+-------+------------------+



In [29]:
df.agg({'length': 'mean', 'ts': 'max'}).show()

+-----------------+-------------+
|      avg(length)|      max(ts)|
+-----------------+-------------+
|249.6486587492506|1513848349284|
+-----------------+-------------+



In [30]:
df.select(F.mean('length'), F.max('ts')).show()

+-----------------+-------------+
|      avg(length)|      max(ts)|
+-----------------+-------------+
|249.6486587492506|1513848349284|
+-----------------+-------------+



In [31]:
df.groupby('gender').avg().show()

+------+------------------+------------------+--------------------+------------------+-----------------+--------------------+
|gender|avg(itemInSession)|       avg(length)|   avg(registration)|    avg(sessionId)|      avg(status)|             avg(ts)|
+------+------------------+------------------+--------------------+------------------+-----------------+--------------------+
|     F|16.904188481675394|249.82205541807014|1.504609475280858...|4286.4780104712045|201.4740837696335|1.513786896915937...|
|  null| 7.669642857142857|              null|                null| 4215.958333333333|          240.125|1.513785530792928...|
|     M|22.173682409308693|249.53725770169163|1.504751515423117E12|  4547.67334017796|201.6890828199863|1.513785432853986...|
+------+------------------+------------------+--------------------+------------------+-----------------+--------------------+



In [32]:
df.groupby('gender').agg(F.countDistinct('gender').alias('countDistinct'), 
                         F.count('gender').alias('count')
                        ).show()

+------+-------------+-----+
|gender|countDistinct|count|
+------+-------------+-----+
|     F|            1| 3820|
|     M|            1| 5844|
|  null|            0|    0|
+------+-------------+-----+



## User-Defined Functions - udf()

In [33]:
# No return type is given, so the UDF returns a string (StringType is the default)
get_hour = F.udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)

In [34]:
df = df.withColumn("hour", get_hour(df.ts))
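
The lambda inside `get_hour` calls `datetime.fromtimestamp` without a time zone, so the hour is computed in the local time zone of the machine that runs the code. The sketch below shows the effect outside Spark. The helper `hour_of` and its `tz` parameter are made up for this note, and UTC+3 is only an assumed example zone.

```python
from datetime import datetime, timedelta, timezone

def hour_of(ts_ms, tz=timezone.utc):
    """Hour of day for a Unix timestamp in milliseconds, in the given time zone."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=tz).hour

ts = 1513720872284  # the `ts` value of the first log record

hour_utc = hour_of(ts)                                  # 22
hour_utc3 = hour_of(ts, timezone(timedelta(hours=3)))   # 1
print(hour_utc, hour_utc3)
```

The same record therefore lands in a different `hour` bucket depending on where the notebook runs. Passing an explicit time zone is one way to make the result reproducible.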

In [35]:
songs_in_hour = (df.filter(df.page == "NextSong")
                   .groupby(df.hour).count()
                   .orderBy(df.hour.cast("int"))
                )
songs_in_hour.show()

+----+-----+
|hour|count|
+----+-----+
|   0|  248|
|   1|  369|
|   2|  375|
|   3|  456|
|   4|  454|
|   5|  382|
|   6|  302|
|   7|  352|
|   8|  276|
|   9|  348|
|  10|  358|
|  11|  375|
|  12|  249|
|  13|  216|
|  14|  228|
|  15|  251|
|  16|  339|
|  17|  462|
|  18|  479|
|  19|  484|
+----+-----+
only showing top 20 rows



## Window Functions

In [36]:
flag_downgrade_event = F.udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())

df_downgrade = df.withColumn("downgraded", flag_downgrade_event("page"))

windowval = Window.partitionBy("userId").orderBy(F.desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
df_window = df_downgrade.withColumn("phase", F.sum("downgraded").over(windowval))
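
What the window computes can be sketched in plain Python: for each user, rows are ordered by `ts` descending and every row gets the running sum of `downgraded` up to and including itself. The data and the function name `add_phase` are hypothetical. The toy timestamps are unique, so the sketch ignores one detail of a range frame: in Spark, rows with equal `ts` are peers and receive the same sum.

```python
from itertools import groupby
from operator import itemgetter

def add_phase(rows):
    """Per user, order by ts descending and attach the running sum of `downgraded`."""
    result = []
    by_user = sorted(rows, key=itemgetter("userId"))   # like partitionBy("userId")
    for _, group in groupby(by_user, key=itemgetter("userId")):
        running = 0
        # like orderBy(desc("ts")) with a frame from the partition start to the current row
        for row in sorted(group, key=itemgetter("ts"), reverse=True):
            running += row["downgraded"]
            result.append({**row, "phase": running})
    return result

events = [  # hypothetical toy data
    {"userId": "u1", "ts": 1, "downgraded": 0},
    {"userId": "u1", "ts": 2, "downgraded": 1},
    {"userId": "u1", "ts": 3, "downgraded": 0},
    {"userId": "u2", "ts": 1, "downgraded": 0},
]
phased = add_phase(events)
for row in phased:
    print(row)
```

Because the ordering is descending, events at or before a downgrade get `phase` 1 and events after it get `phase` 0.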

In [37]:
(df_window.select(["userId", "firstname", "ts", "page", "level", "phase"])
          .where(df.userId == "1138")
          .sort("ts")
          .collect()
)

[Row(userId='1138', firstname='Kelly', ts=1513729066284, page='Home', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729066284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729313284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729552284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729783284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730001284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730263284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730518284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730768284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513731182284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firs

## Joining Tables

### join()

In [38]:
df_join = df_country.join(df_capital, 
                          on=(df_country['code'] == df_capital['cntr_code']), 
                          how='inner')
df_join.show()

+---+----+------+---+---------+------+----------+
| id|code|  name| id|cntr_code|  city|population|
+---+----+------+---+---------+------+----------+
|  1|  it| Italy|101|       it|  Rome|   4000000|
|  0|  ru|Russia|100|       ru|Moscow|  15000000|
+---+----+------+---+---------+------+----------+
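
Finland is missing from the result because an inner join keeps only rows whose key occurs on both sides. The plain-Python sketch below reproduces that behaviour on the same data. It builds a lookup table on one side and probes it with the other, which illustrates the semantics only and is not a description of how Spark executes the join. The function name `inner_join` is made up for this note.

```python
countries = [(0, "ru", "Russia"), (1, "it", "Italy"), (2, "fi", "Finland")]
capitals = [(100, "ru", "Moscow", 15_000_000), (101, "it", "Rome", 4_000_000)]

def inner_join(left, right, left_key, right_key):
    """Join two lists of tuples on the given column positions."""
    index = {}
    for row in right:                      # index the right side by its key
        index.setdefault(row[right_key], []).append(row)
    # keep a left row only if its key is present in the index
    return [l + r for l in left for r in index.get(l[left_key], [])]

joined = inner_join(countries, capitals, left_key=1, right_key=1)
for row in joined:
    print(row)
```

As in the Spark output, both `id` columns and both key columns are kept in the joined rows.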



### union()

In [39]:
df_union = df_country.union(df_country)
df_union.show()

+---+----+-------+
| id|code|   name|
+---+----+-------+
|  0|  ru| Russia|
|  1|  it|  Italy|
|  2|  fi|Finland|
|  0|  ru| Russia|
|  1|  it|  Italy|
|  2|  fi|Finland|
+---+----+-------+



## Missing Values

### Dropping Rows

In [40]:
df.count()

10000

In [41]:
df.dropna(how='any').count()

8347

### Filling Missing Values

In [42]:
df.fillna('Missing').where('gender = "Missing"').show(3)

+-------+----------+---------+-------+-------------+--------+------+-----+--------+------+-----+------------+---------+-------+------+-------------+---------+------+----+
| artist|      auth|firstName| gender|itemInSession|lastName|length|level|location|method| page|registration|sessionId|   song|status|           ts|userAgent|userId|hour|
+-------+----------+---------+-------+-------------+--------+------+-----+--------+------+-----+------------+---------+-------+------+-------------+---------+------+----+
|Missing|Logged Out|  Missing|Missing|            0| Missing|  null| free| Missing|   PUT|Login|        null|     5598|Missing|   307|1513721196284|  Missing|      |   1|
|Missing|Logged Out|  Missing|Missing|           26| Missing|  null| paid| Missing|   GET| Home|        null|      428|Missing|   200|1513721274284|  Missing|      |   1|
|Missing|Logged Out|  Missing|Missing|            5| Missing|  null| free| Missing|   GET| Home|        null|     2941|Missing|   200|15137220092

## Quiz

Which page did user id "" (empty string) NOT visit?

In [43]:
df.select('Page').filter('userId = ""').distinct().show()

+-----+
| Page|
+-----+
| Home|
|About|
|Login|
| Help|
+-----+



In [44]:
df.filter('userId = ""').show()

+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+------+----+
|artist|      auth|firstName|gender|itemInSession|lastName|length|level|location|method| page|registration|sessionId|song|status|           ts|userAgent|userId|hour|
+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+------+----+
|  null|Logged Out|     null|  null|            0|    null|  null| free|    null|   PUT|Login|        null|     5598|null|   307|1513721196284|     null|      |   1|
|  null|Logged Out|     null|  null|           26|    null|  null| paid|    null|   GET| Home|        null|      428|null|   200|1513721274284|     null|      |   1|
|  null|Logged Out|     null|  null|            5|    null|  null| free|    null|   GET| Home|        null|     2941|null|   200|1513722009284|     null|      |   1|
|  n

In [45]:
df.groupby('auth').agg(F.count(F.when(F.col('userId') == "", 1)).alias('count empty')).show()

+----------+-----------+
|      auth|count empty|
+----------+-----------+
|Logged Out|        334|
|     Guest|          2|
| Logged In|          0|
+----------+-----------+



In [46]:
df.groupby('auth').agg(F.sum(F.when(F.col('userId') == "", 1).otherwise(0)).alias('count empty')).show()

+----------+-----------+
|      auth|count empty|
+----------+-----------+
|Logged Out|        334|
|     Guest|          2|
| Logged In|          0|
+----------+-----------+
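
The two queries above agree because `F.when(cond, 1)` without `otherwise()` yields NULL when the condition is false, and `F.count` skips NULLs. The same logic in plain Python, with `None` standing in for NULL and hypothetical user ids:

```python
user_ids = ["", "1046", "", "1000"]   # hypothetical values

# F.when(cond, 1): 1 when the condition holds, NULL (None here) otherwise
flags_nullable = [1 if u == "" else None for u in user_ids]

# F.when(cond, 1).otherwise(0): 1 when the condition holds, 0 otherwise
flags_zero = [1 if u == "" else 0 for u in user_ids]

count_non_null = sum(1 for f in flags_nullable if f is not None)  # like F.count
total = sum(flags_zero)                                           # like F.sum

print(count_non_null, total)
```

Both approaches count the rows with an empty `userId`, so either form can be used.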



How many female users do we have in the data set?

In [47]:
df.select(['gender', 'userId']).filter(F.col('gender') == 'F').distinct().count()

462

How many songs were played from the most played artist?

In [48]:
df.groupBy('artist').count().sort('count', ascending=False).show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|                null| 1653|
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|               Björk|   46|
|       Dwight Yoakam|   45|
|       Justin Bieber|   43|
|      The Black Keys|   40|
|         OneRepublic|   37|
|        Jack Johnson|   36|
|                Muse|   36|
|           Radiohead|   31|
|        Taylor Swift|   29|
|Barry Tuckwell/Ac...|   28|
|          Lily Allen|   28|
|               Train|   28|
|           Metallica|   27|
|           Daft Punk|   27|
|          Nickelback|   27|
|          Kanye West|   26|
+--------------------+-----+
only showing top 20 rows



In [49]:
df.filter(df.artist == 'Coldplay').select('song').count()

83

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [50]:
function = F.udf(lambda ishome : int(ishome == 'Home'), IntegerType())

# An ordered window defaults to a frame from the start of the partition
# to the current row, so no explicit rangeBetween is needed here.
user_window = (Window
    .partitionBy('userID')
    .orderBy(F.desc('ts')))

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(F.col('page'))) \
    .withColumn('period', F.sum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+



In [51]:
cusum.where(df.userId == "1138").show(100)

+------+--------+-------------+---------+------+
|userID|    page|           ts|homevisit|period|
+------+--------+-------------+---------+------+
|  1138|NextSong|1513833144284|        0|     0|
|  1138|    Home|1513821430284|        1|     1|
|  1138|NextSong|1513814880284|        0|     1|
|  1138|    Home|1513768456284|        1|     2|
|  1138|NextSong|1513768452284|        0|     2|
|  1138|NextSong|1513768242284|        0|     2|
|  1138|NextSong|1513768012284|        0|     2|
|  1138|NextSong|1513767643284|        0|     2|
|  1138|NextSong|1513767413284|        0|     2|
|  1138|NextSong|1513767203284|        0|     2|
|  1138|NextSong|1513766838284|        0|     2|
|  1138|NextSong|1513766599284|        0|     2|
|  1138|NextSong|1513766385284|        0|     2|
|  1138|NextSong|1513766189284|        0|     2|
|  1138|NextSong|1513766091284|        0|     2|
|  1138|NextSong|1513765818284|        0|     2|
|  1138|NextSong|1513765655284|        0|     2|
|  1138|NextSong|151

## SQL

In [52]:
df.createOrReplaceTempView("user_log_table")

In [53]:
spark.sql("SELECT * FROM user_log_table LIMIT 2").show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|hour|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|   1|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541

In [54]:
spark.sql('''
          SELECT COUNT(*) 
          FROM user_log_table 
          '''
          ).show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



### User-Defined Functions for SQL

In [55]:
spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

<function __main__.<lambda>(x)>

In [56]:
spark.sql('''
          SELECT *, get_hour(ts) AS hour
          FROM user_log_table 
          LIMIT 1
          '''
          ).collect()

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='1', hour='1')]
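
What `get_hour` computes can be checked outside Spark. The sketch below is a plain-Python stand-in: the helper name `hour_of` and its explicit `tz` parameter are assumptions made for illustration and are not part of the notebook. The notebook's lambda calls `datetime.fromtimestamp` without a time zone, so it uses the local zone of the machine that runs the UDF; the `hour='1'` in the output above is consistent with a machine at UTC+3. The value is a string because `spark.udf.register` falls back to a string return type when none is given.

```python
from datetime import datetime, timedelta, timezone

def hour_of(ts_ms, tz):
    """Return the hour of day for a millisecond Unix timestamp in time zone tz."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=tz).hour

ts = 1513720872284  # `ts` of the first row shown above

hour_utc = hour_of(ts, timezone.utc)
# Assumed offset: UTC+3 reproduces the hour seen in the notebook output.
hour_utc_plus_3 = hour_of(ts, timezone(timedelta(hours=3)))
print(hour_utc, hour_utc_plus_3)
```

If the hour must not depend on where the job runs, passing an explicit time zone as above is one option.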

In [57]:
songs_in_hour = spark.sql('''
          SELECT get_hour(ts) AS hour, COUNT(*) as plays_per_hour
          FROM user_log_table
          WHERE page = "NextSong"
          GROUP BY get_hour(ts)
          ORDER BY cast(hour as int) ASC
          '''
          )

In [58]:
songs_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           248|
|   1|           369|
|   2|           375|
|   3|           456|
|   4|           454|
|   5|           382|
|   6|           302|
|   7|           352|
|   8|           276|
|   9|           348|
|  10|           358|
|  11|           375|
|  12|           249|
|  13|           216|
|  14|           228|
|  15|           251|
|  16|           339|
|  17|           462|
|  18|           479|
|  19|           484|
+----+--------------+
only showing top 20 rows



## Pandas API on Spark
+ https://docs.databricks.com/pandas/pandas-on-spark.html
+ https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html
+ https://sparkbyexamples.com/pyspark/pandas-api-on-apache-spark-pyspark/

In [64]:
# !pip install pyarrow

In [68]:
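# Spark 3.x name of the Arrow setting; 'spark.sql.execution.arrow.enabled' is the deprecated older key
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')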

In [69]:
import pyspark.pandas as ps

In [74]:
pdf = df.pandas_api()
pdf.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,hour
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046,1
1,Lily Allen,Logged In,Elizabeth,F,7,Chase,195.23873,free,"Shreveport-Bossier City, LA",PUT,NextSong,1512718541284,5027,Cheryl Tweedy,200,1513720878284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1000,1
2,Cobra Starship Featuring Leighton Meester,Logged In,Vera,F,6,Blackwell,196.20526,paid,"Racine, WI",PUT,NextSong,1499855749284,5516,Good Girls Go Bad (Feat.Leighton Meester) (Alb...,200,1513720881284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2219,1
3,Alex Smoke,Logged In,Sophee,F,8,Barker,405.99465,paid,"San Luis Obispo-Paso Robles-Arroyo Grande, CA",PUT,NextSong,1513009647284,2372,Don't See The Point,200,1513720905284,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2373,1
4,,Logged In,Jordyn,F,0,Jones,,free,"Syracuse, NY",GET,Home,1513648531284,1746,,200,1513720913284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1747,1


In [81]:
pdf['page'].value_counts()

NextSong            8347
Home                1126
Login                126
Logout               100
Downgrade             75
Settings              59
Help                  58
About                 43
Upgrade               32
Error                 12
Save Settings         11
Submit Upgrade        10
Submit Downgrade       1
Name: page, dtype: int64

# Debugging and optimization

## Accumulators
📌 Accumulators are variables that accumulate values: tasks add to them, and the driver reads the total.

In [56]:
incorrect_record = sc.accumulator(0)
incorrect_record.value

0

In [57]:
def add_incorrect_record():
    global incorrect_record
    incorrect_record += 1

In [58]:
correct_id = F.udf(lambda x: 1 if x != '' else add_incorrect_record())
df_accum = df.where(df.gender.isNull()).withColumn('correct_id', correct_id(df['userid']))
df_accum.collect();


[Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=0, lastName=None, length=None, level='free', location=None, method='PUT', page='Login', registration=None, sessionId=5598, song=None, status=307, ts=1513721196284, userAgent=None, userId='', correct_id=None),
 Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=26, lastName=None, length=None, level='paid', location=None, method='GET', page='Home', registration=None, sessionId=428, song=None, status=200, ts=1513721274284, userAgent=None, userId='', correct_id=None),
 Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=5, lastName=None, length=None, level='free', location=None, method='GET', page='Home', registration=None, sessionId=2941, song=None, status=200, ts=1513722009284, userAgent=None, userId='', correct_id=None),
 Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=5, lastName=None, length=None, level='paid', location=N

In [59]:
incorrect_record.value

336

## Broadcast

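📌 Broadcasting sends a read-only copy of a small dataset to every worker, which makes it possible to join a large table with a small one without moving the large table across the network.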

In [6]:
my_dict = {"item1": 1, "item2": 2, "item3": 3, "item4": 4} 
my_list = ["item1", "item2", "item3", "item4"]

my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc.value[letter] 

my_list_rdd = sc.parallelize(my_list)

result = my_list_rdd.map(lambda x: my_func(x)).collect()

print(result)

[1, 2, 3, 4]


## Data skew

📌 Do some partitions process significantly more data than others?
+ https://sparkbyexamples.com/spark/spark-partitioning-understanding/


Ways to reduce skew:
+ Use `.repartition(number_of_workers or column_name)`
+ Use `.partitionBy(column_name)` after `write`




In [60]:
df_park = spark.read.csv('data/parking_violation.csv', header=True, inferSchema=True)
df_park.show()

+---+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-------------+-------------------+--------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+----+----+----+----+-----+
|_c0|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code

In [61]:
df_park.groupBy('year').count().show()

+----+-------+
|year|  count|
+----+-------+
|2003|      3|
|2015|5986831|
|2013|    592|
|2014|5821043|
|2012|    189|
|2000|    261|
|2010|    110|
|2011|    122|
|2007|      3|
|1991|      1|
|2005|     14|
|2006|      3|
|2004|     22|
|2008|      6|
|2009|      3|
|2001|     22|
|2002|      3|
|1988|      3|
|1985|      1|
|1986|      1|
+----+-------+



In [62]:
df_park.groupBy('month').count().show()

+-----+-------+
|month|  count|
+-----+-------+
|   12| 774287|
|    1|1392992|
|    6|1276592|
|    3| 965119|
|    5|1040148|
|    9|1029583|
|    4| 951716|
|    8| 911641|
|    7| 969938|
|   10| 966456|
|   11| 798897|
|    2| 731864|
+-----+-------+
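
One rough way to compare the two groupings above is the ratio of the largest group to the average group. The sketch below is plain Python, not a Spark API: `skew_ratio` is a hypothetical helper, and it assumes that each key value ends up in its own partition (as with one output directory per value under `write.partitionBy`).

```python
def skew_ratio(partition_sizes):
    """Largest partition size divided by the mean partition size (1.0 = balanced)."""
    sizes = list(partition_sizes)
    return max(sizes) / (sum(sizes) / len(sizes))

# Row counts copied from the groupBy('year') and groupBy('month') outputs above.
rows_per_year = [3, 5986831, 592, 5821043, 189, 261, 110, 122, 3, 1,
                 14, 3, 22, 6, 3, 22, 3, 3, 1, 1]
rows_per_month = [774287, 1392992, 1276592, 965119, 1040148, 1029583,
                  951716, 911641, 969938, 966456, 798897, 731864]

year_skew = skew_ratio(rows_per_year)
month_skew = skew_ratio(rows_per_month)
print(f"year: {year_skew:.2f}, month: {month_skew:.2f}")
```

By this measure `year` is heavily skewed (two years hold almost all rows), while `month` is close to balanced, so `month` would be the more even partitioning column of the two.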



In [63]:
# df_park.write.partitionBy('year').csv("data/parking_violation_year.csv")

In [64]:
# df_park.repartition(5).write.csv("data/parking_violation_repartition.csv")

# Spark ML

In [3]:
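from pyspark.sql import SparkSession, functions as F  # used by the cells below
from pyspark.sql.types import IntegerType  # return type of the UDFs below
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, Normalizer, StandardScaler
import re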

In [4]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .getOrCreate()

In [43]:
df = spark.read.json("data/Train_onetag_small.json")
df.head()

Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php')

In [44]:
df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- oneTag: string (nullable = true)



## Tokenization

In [45]:
regexTokenizer = RegexTokenizer(inputCol="Body", outputCol="words", pattern="\\W")
df = regexTokenizer.transform(df)
df.head()

Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'which
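
To see why the `words` column starts with `'p', 'i', 'd', 'like'`, the tokenizer's behavior can be approximated in plain Python. This is a sketch, not Spark's implementation: `regex_tokenize` is a hypothetical helper that assumes `RegexTokenizer`'s defaults (the pattern marks the gaps between tokens, text is lowercased, tokens shorter than one character are dropped). Java and Python regular expressions can also differ on non-ASCII text.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase text, split on pattern, and drop tokens shorter than min_token_length."""
    tokens = re.split(pattern, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]

# Beginning of the first Body value shown above.
words = regex_tokenize("<p>I'd like to check if an uploaded file is an image file")
print(words)
```

Because every non-word character is a separator, the HTML tag `<p>` contributes the token `p` and the apostrophe splits `I'd` into `i` and `d`.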

## Numeric features

In [46]:
body_length = F.udf(lambda x: len(x), IntegerType())
df = df.withColumn("BodyLength", body_length(df.words))

In [47]:
number_of_paragraphs = F.udf(lambda x: len(re.findall("</p>", x)), IntegerType())
number_of_links = F.udf(lambda x: len(re.findall("</a>", x)), IntegerType())

In [48]:
df = df.withColumn("NumParagraphs", number_of_paragraphs(df.Body))
df = df.withColumn("NumLinks", number_of_links(df.Body))

In [49]:
df.head(2)

[Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'whic

### VectorAssembler

In [50]:
assembler = VectorAssembler(inputCols=["BodyLength", "NumParagraphs", "NumLinks"], outputCol="NumFeatures")
df = assembler.transform(df)

In [51]:
df.head()

Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'which

### Normalize the Vectors

In [52]:
scaler = Normalizer(inputCol="NumFeatures", outputCol="ScaledNumFeatures")
df = scaler.transform(df)

In [53]:
df.head(2)

[Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'whic
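
`Normalizer` rescales each row vector to unit length; with its default setting that is the Euclidean (L2) norm. A minimal plain-Python sketch of the same arithmetic follows. The helper name `l2_normalize` is an assumption for illustration, and the input is the `NumFeatures` vector `[63.0,1.0,0.0]` of the row with `Id` 1112 that appears later in this section.

```python
import math

def l2_normalize(vector):
    """Divide every component by the vector's Euclidean (L2) norm."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector] if norm > 0 else list(vector)

# [BodyLength, NumParagraphs, NumLinks] for the row with Id 1112.
scaled = l2_normalize([63.0, 1.0, 0.0])
print(scaled)
```

The first component agrees with the `0.99987404748359...` shown for that row in `ScaledNumFeatures`. Note that this works per row, so it does not put the three features on a common scale across rows.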

### Scale the Vectors

In [54]:
scaler2 = StandardScaler(inputCol="NumFeatures", outputCol="ScaledNumFeatures2", withStd=True)
scalerModel = scaler2.fit(df)
df = scalerModel.transform(df)

In [55]:
df.head(2)

[Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'whic
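
Unlike `Normalizer`, `StandardScaler` works per column and has to be fitted first, because it needs each feature's standard deviation over the whole dataset. With `withStd=True` and no centering, every value is divided by that standard deviation. The sketch below shows the arithmetic for a single column in plain Python. `scale_to_unit_std` is a hypothetical helper, the input values are made up, and it assumes the corrected (n - 1) sample standard deviation.

```python
import statistics

def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(column)  # corrected (n - 1) sample standard deviation
    # A constant column has std 0; map it to zeros instead of dividing by zero.
    return [x / std for x in column] if std > 0 else [0.0] * len(column)

toy_column = [2.0, 4.0, 6.0]  # made-up values, not from the dataset
scaled_column = scale_to_unit_std(toy_column)
print(scaled_column)
```

After scaling, the column has standard deviation 1, but its mean is not shifted to 0 because no centering is applied.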

## Text features

In [56]:
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer

### CountVectorizer

In [57]:
cv = CountVectorizer(inputCol="words", outputCol="TF", vocabSize=1000)
cvmodel = cv.fit(df)
df = cvmodel.transform(df)
df.take(1)

[Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'whic

In [58]:
cvmodel.vocabulary

['p',
 'the',
 'i',
 'to',
 'code',
 'a',
 'gt',
 'lt',
 'is',
 'and',
 'pre',
 'in',
 'this',
 'of',
 'it',
 'that',
 'for',
 '0',
 '1',
 'have',
 'my',
 'if',
 'on',
 'but',
 'with',
 'can',
 'not',
 'be',
 'as',
 't',
 'li',
 'from',
 '2',
 's',
 'http',
 'an',
 'm',
 'strong',
 'new',
 'how',
 'do',
 'com',
 'so',
 'or',
 'at',
 'using',
 'when',
 'am',
 'like',
 'class',
 'id',
 'there',
 'get',
 'are',
 'name',
 'what',
 'any',
 'file',
 'string',
 'data',
 'all',
 'which',
 'want',
 'would',
 'amp',
 'use',
 'java',
 'function',
 'public',
 'some',
 '3',
 'text',
 'error',
 'android',
 'value',
 'c',
 'x',
 'href',
 'you',
 'one',
 'by',
 'user',
 'me',
 'server',
 'type',
 'here',
 'way',
 'return',
 'int',
 'will',
 'div',
 'need',
 'then',
 'set',
 'e',
 'system',
 'has',
 'problem',
 'out',
 'php',
 'no',
 'just',
 '4',
 'org',
 'know',
 'html',
 'only',
 'where',
 'page',
 'application',
 '5',
 'thanks',
 'var',
 'br',
 'we',
 'd',
 'should',
 'does',
 'add',
 'n',
 'true',

In [59]:
cvmodel.vocabulary[-10:]

['customer',
 'desktop',
 'buttons',
 'previous',
 'math',
 'master',
 '000',
 'blog',
 'comes',
 'wordpress']
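
The vocabulary above is ordered from the most frequent term (`'p'`, `'the'`, ...) to the least frequent of the 1000 terms kept. A plain-Python sketch of the idea follows; it is not Spark's implementation. `fit_vocabulary` and `to_term_counts` are hypothetical helpers, the documents are made up, and the order of terms with equal counts is not meant to match Spark.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size):
    """Keep the vocab_size most frequent terms across all documents."""
    corpus_counts = Counter(term for doc in docs for term in doc)
    return [term for term, _ in corpus_counts.most_common(vocab_size)]

def to_term_counts(doc, vocabulary):
    """Map a document to {vocabulary index: count}, ignoring unknown terms."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))

toy_docs = [["a", "b", "a"], ["a", "c", "b"], ["a"]]  # made-up tokens
vocabulary = fit_vocabulary(toy_docs, vocab_size=2)
tf_doc0 = to_term_counts(toy_docs[0], vocabulary)
tf_doc1 = to_term_counts(toy_docs[1], vocabulary)
print(vocabulary, tf_doc0, tf_doc1)
```

The `{index: count}` dictionaries play the role of the sparse vectors in the `TF` column, where only the indices of terms that occur in the document are stored.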

### Inverse Document Frequency

In [60]:
idf = IDF(inputCol="TF", outputCol="TFIDF")
idfModel = idf.fit(df)
df = idfModel.transform(df)
df.head()

Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'which
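
`IDF` down-weights terms that occur in many documents. Spark's documentation gives the weight of a term as `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term; `TFIDF` is the term count multiplied by that weight. The sketch below applies the formula in plain Python. `idf_weights` and `tf_idf` are hypothetical helpers and the term counts are made up.

```python
import math

def idf_weights(term_count_docs, vocab_size):
    """idf[i] = log((m + 1) / (df[i] + 1)), with m the number of documents."""
    m = len(term_count_docs)
    doc_freq = [sum(1 for doc in term_count_docs if doc.get(i, 0) > 0)
                for i in range(vocab_size)]
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]

def tf_idf(term_counts, idf):
    """Multiply each term count by the IDF weight of its vocabulary index."""
    return {i: count * idf[i] for i, count in term_counts.items()}

# Made-up sparse term counts: {vocabulary index: count} per document.
toy_tf = [{0: 2, 1: 1}, {0: 1, 1: 1, 2: 1}, {0: 1}]
idf = idf_weights(toy_tf, vocab_size=3)
tfidf_doc1 = tf_idf(toy_tf[1], idf)
print(idf, tfidf_doc1)
```

Term 0 occurs in every document, so its weight is `log(1) = 0` and it drops out of the TF-IDF vector, which is the intended effect for very common tokens such as `p` or `the`.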

### StringIndexer

In [61]:
indexer = StringIndexer(inputCol="oneTag", outputCol="label")
df = indexer.fit(df).transform(df)

In [62]:
df.head()

Row(Body="<p>I'd like to check if an uploaded file is an image file (e.g png, jpg, jpeg, gif, bmp) or another file. The problem is that I'm using Uploadify to upload the files, which changes the mime type and gives a 'text/octal' or something as the mime type, no matter which file type you upload.</p>\n\n<p>Is there a way to check if the uploaded file is an image apart from checking the file extension using PHP?</p>\n", Id=1, Tags='php image-processing file-upload upload mime-types', Title='How to check if an uploaded file is an image without mime type?', oneTag='php', words=['p', 'i', 'd', 'like', 'to', 'check', 'if', 'an', 'uploaded', 'file', 'is', 'an', 'image', 'file', 'e', 'g', 'png', 'jpg', 'jpeg', 'gif', 'bmp', 'or', 'another', 'file', 'the', 'problem', 'is', 'that', 'i', 'm', 'using', 'uploadify', 'to', 'upload', 'the', 'files', 'which', 'changes', 'the', 'mime', 'type', 'and', 'gives', 'a', 'text', 'octal', 'or', 'something', 'as', 'the', 'mime', 'type', 'no', 'matter', 'which
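
`StringIndexer` turns each distinct string into a numeric label; by default the most frequent string gets index 0. The plain-Python sketch below mimics that ordering. `fit_string_index` is a hypothetical helper, the tags are made up, and breaking ties alphabetically is an assumption about recent Spark versions.

```python
from collections import Counter

def fit_string_index(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next one, and so on."""
    counts = Counter(labels)
    # Sort by descending frequency; ties broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

toy_tags = ["php", "java", "php", "iphone", "java", "php"]  # made-up tags
label_index = fit_string_index(toy_tags)
labels = [label_index[tag] for tag in toy_tags]
print(label_index, labels)
```

The indices are floats because Spark stores the `label` column as a double, as in the `7.0` for `iphone` in the quiz output.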

## Quiz

In [67]:
df.filter(F.col('id') == F.lit('1112')).show()

+--------------------+----+--------------------+--------------------+------+--------------------+----------+-------------+--------+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|                Body|  Id|                Tags|               Title|oneTag|               words|BodyLength|NumParagraphs|NumLinks|   NumFeatures|   ScaledNumFeatures|  ScaledNumFeatures2|                  TF|               TFIDF|label|
+--------------------+----+--------------------+--------------------+------+--------------------+----------+-------------+--------+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|<p>I submitted my...|1112|iphone app-store ...|iPhone app releas...|iphone|[p, i, submitted,...|        63|            1|       0|[63.0,1.0,0.0]|[0.99987404748359...|[0.32825169441613...|(1000,[0,1,2,3,8,...|(1000,[0,1,2,3,8,...|  7.0|
+--------------------+----+--------------------+----

In [70]:
df_quiz = df.withColumn('BodyTitle', F.concat(F.col('Body'), F.lit(" "), F.col('Title')))
regexTokenizer = RegexTokenizer(inputCol="BodyTitle", outputCol="words2", pattern="\\W")
df_quiz = regexTokenizer.transform(df_quiz)
df_quiz = df_quiz.withColumn("Length", body_length(df_quiz['words2']))

In [71]:
df_quiz.filter(F.col('id') == F.lit('5123')).select("Length").show()

+------+
|Length|
+------+
|   135|
+------+



The course goes on to cover a couple more examples.