<a href="https://colab.research.google.com/github/deepds/sparkcourse/blob/master/Spark0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install pyspark



### 1. Основы Big Data

Что такое Big Data? Определение варьируется в зависимости от области применения. Это технология обработки больших массивов данных, которые нельзя обработать традиционными data processing software. 

Концепция 3 V:
1. Volume (объем данных)
2. Velocity (скорость передачи, обновления, накопления)
3. Variety (разные истоники и формат данных)

Термины:
1. Кластерные вычисления: объединение ресурсов нескольких машин
2. Параллельные вычисления: одновременные вычисления 
3. Распределенные вычисления: коллекция узлов (сетевых компьютеров), которые работают параллельно
4. Пакетная обработка: разбить работу на мелкие кусочки и запустить их на отдельных машинах
5. Обработка в реальном времени: real-time обработка данных

Системы обработки больших данных:
1. Hadoop / MapReduce: масштабируемая и отказоустойчивая среда написана на Java. Открытый исходный код. Пакетная обработка
2. Apache Spark: универсальная и быстрая кластерная вычислительная система. Открытый исходный код. Обработка данных как в пакетном режиме, так и в режиме реального времени

Преимущества Apache Spark:

1. Распределенные вычисления 
2. Эффективные in-memory вычисления для больших датасетов
3. Акцент на быстродействие
4. Поддержка Java, Scala, Python, R





Компоненты Apache Spark

![](https://drive.google.com/uc?export=view&id=1SQ1q2GnfR4LynFjSj6YGIBj6g5FXPMsR)

Локальный режим: один компьютер, например, ноутбук

Локальная модель удобна для тестирования, отладки и демонстрации

Режим кластера: набор предопределенных машин

Хорошо для продуктива

Работа: Локальная -> кластеры

**Нет необходимости менять код!!!**


Что такое Spark Shell?

Интерактивная среда для выполнения заданий Spark (джобов)

Полезно для быстрого интерактивного прототипирования

Spark Shell позволяют взаимодействовать с данными на диске или в памяти

Три разных корпуса Spark:
1. Spark-shell для Scala
2. PySpark-оболочка для Python
3. SparkR для R

### 1. Понимание SparkContext и Parallelize

SparkContext представляет точку входа в функциональность Spark. Это как ключ к машине.

In [0]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [0]:
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)

The version of Spark Context in the PySpark shell is 2.4.4
The Python version of Spark Context in the PySpark shell is 3.6
The master of Spark Context in the PySpark shell is local


Для распараллеливания коллекций в программе Driver Spark предоставляет метод SparkContext.parallelize(). При применении метода распараллеливания в коллекции (с элементами) создается новый распределенный набор данных с указанным числом разделов, а элементы коллекции копируются в распределенный набор данных (RDD).

In [0]:
# Create a python list of numbers from 1 to 100 
numb = range(1, 100)

# Load the list into PySpark  
spark_data = sc.parallelize(numb)

# Load a local file into PySpark shell
lines = sc.textFile(file_path)

NameError: ignored

### 2. Lambda, map и filter методы

In [0]:
my_list = numb

# Print my_list in the console
print("Input list is", my_list)

# Square all numbers in my_list
squared_list_lambda = list(map(lambda x: x**2, my_list))

# Print the result of the map function
print("The squared numbers are", squared_list_lambda)

Input list is range(1, 100)
The squared numbers are [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


In [0]:
# Print my_list2 in the console
print("Input list is:", my_list2)

# Filter numbers divisible by 10
filtered_list = list(filter(lambda x: (x%10 == 0), my_list2))

# Print the numbers divisible by 10
print("Numbers divisible by 10 are:", filtered_list)

### 3. Работа с RDD

### RDD из распараллеленных коллекций

Resilient Distributed Dataset (RDD) - основная абстракция в Spark. Это неизменяемая распределенная коллекция объектов. Поскольку RDD является фундаментальным и базовым типом данных в Spark, важно, чтобы вы понимали, как его создать. В этом упражнении вы создадите свой первый RDD в PySpark из набора слов.

In [0]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))

The type of RDD is <class 'pyspark.rdd.RDD'>


### RDD из внешних наборов данных

PySpark может легко создавать RDD из файлов, которые хранятся на внешних устройствах хранения, таких как HDFS (распределенная файловая система Hadoop), корзины Amazon S3 и т. Д. Однако наиболее распространенный метод создания RDD - это файлы, хранящиеся в локальной файловой системе. Этот метод берет путь к файлу и читает его как набор строк. В этом упражнении вы создадите RDD из пути к файлу (file_path) с именем файла README.md

In [0]:
# Print the file_path
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

### Партицирование ваших данных

Метод textFile() SparkContext принимает необязательный второй аргумент minPartitions для указания минимального количества партиций. В этом упражнении вы создадите RDD с именем fileRDD_part с 5 разделами, а затем сравните его с fileRDD, созданным в предыдущем упражнении.

In [0]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 97kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 35.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=9fbec0fd84063078f46e31428c13270229cd8c5f295c2350ab7f6d2e690ab87c
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pys

In [0]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions()) 

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 6)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

### Трансформации и действия

Все операции на RDD делятся на два вида - трансформации и действия. При трансформациях создается новый RDD, при действии - на выходе число. Все трансформации подвержены lazy вычислениям, осуществляются до первого действия. При операциях с RDD Spark создает вычислительный граф. Это делает Spark высокоэффективным. Если цепочка трансформаций передается первой трансформации, то логично не вычислять всю цепочку, а использовать первый результат.

Трансформации: map, filter, flatMap, union

Действия: collect, take, first, count


In [0]:
# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: ____)

# Collect the results
numbers_all = cubedRDD.____()

# Print the numbers from numbers_all
for numb in ____:
	print(____)

In [0]:
# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in ____) # in line

# How many lines are there in fileRDD?
print("The total number of lines with the keyword Spark is", fileRDD_filter.____())

# Print the first four lines of fileRDD
for line in fileRDD_filter.____(____): 
  print(line)

### Pair RDD

Реальные датасеты обычно составляют пары ключ-значение. Два способа создания Pair RDD: из кортежа и из regular RDD

In [0]:
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)

In [0]:
my_list = ['Sam 23', 'Mary 24', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(" ")[0], s.split(" ")[1])) 

Функции, применимые к pairRDD 

reduceByKey() # combine values with the same key

groupByKey() # group values with the same key

sortByKey() # return rdd sorted by the key

join() # join two pair rdd by key

In [0]:
regular_RDD = sc.parallelize([("Messi", 23), ("Neymar", 24), ("Ronaldo", 34), ("Messi", 24)])
pair_RDD_reducedbykey = regularRDD.reduceByKey(lambda x,y: x+y)
pair_RDD_reducedbykey.collect()

In [0]:
pair_RDD_reducedbykey_rev = pairRDD_reducedbykey.map(lambda x: (x[1], x[0]))
pair_RDD_reducedbykey_rev.sortByKey(ascending=False).collect()

In [0]:
airports = [("UK", "JFK"), ("US", "LHR"), ("UK", "UIY"), ("US", "KUJ")]
regular_RDD = sc.parallelize(airports)
pairRDD_group = regular_RDD.groupByKey().collect()

for cont, air in pairRDD_group:
  print(cont, list(air))

In [0]:
RDD1 = sc.parallelize([("Messi", 45), ("Ronaldo", 23), ("Gerrard", 34), ("Zidane", 100)])
RDD2 = sc.parallelize([("Messi", 32), ("Coman", 3), ('Ronaldo', 99)]
RDD1.join(RDD2).collect()                      

In [0]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([____])

# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: ____)

# Iterate over the result and print the output
for num in Rdd_Reduced.____: 
  print("Key {} has {} Counts".format(____, num[1]))

In [0]:
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.____(ascending=False)

# Iterate over the result and print the output
for num in Rdd_Reduced_Sort.____():
  print("Key {} has {} Counts".format(____, num[1]))

### Продвинутые операции с RDD

Reduce() action

saveOfTextFile() # save rdd to a text file, separate file for each partition

coalesce() # compose all partitions to a single RDD 

collectAsMap() # return key-value pairs in the RDD as dictionary

In [0]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 1.3MB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 50.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=d281cf2bee253929c691aad2d79c32bd047ec40337fdf304c8320109114a2c9a
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 py

In [0]:
import pyspark
pyspark.__version__

'2.4.4'

In [0]:
from pyspark import SparkContext
sc = SparkContext("local","First App")

In [0]:
x = [1,3,4,6]

RDD = sc.parallelize(x)

RDD.reduce(lambda x,y: x+y)

RDD.saveAsTextFile("tempFile")

RDD.coalesce(1).saveAsTextFile("tempFile") # save rdd as one text file

In [0]:
countByKey() # avalilable for type (k,v), action counts the number of elements for each key

In [0]:
rdd = sc.parallelize([("a",1),("b",1),("a",1)])
for key, val in rdd.countByKey().items():
  print(key,val)

In [0]:
sc.parallelize([(1,2),(3,4)]).collectAsMap()

Задание. Удалите стоп-слова и уменьшите набор данных

После разбиения строк в файле на длинный список слов с помощью преобразования flatMap () на следующем шаге вы удалите стоп слова из ваших данных. 
Стоп-слова являются общими словами, которые часто неинтересны. Например, «I», «the», «a» и т. Д. Являются стоп-словами. 
Вы можете удалить много очевидных стоп-слов со своим собственным списком. Но для этого упражнения вы просто удалите стоп-слова из списка курируемых слов stop_words, предоставленных вам в вашей среде.
После удаления стоп-слов вы затем создадите пару RDD, где каждый элемент является парным кортежем (k, v), где k - это ключ, а v - это значение. В этом примере pair RDD состоит из (w, 1), где w - для каждого слова в RDD, 
а 1 - число. Наконец, вы скомбинируете значения с одним и тем же ключом из пары RDD, используя операцию reduByKey ()
Помните, что в вашей рабочей области уже есть SparkContext sc и splitRDD.

In [0]:
# Convert the words in lower case and remove stop words from stop_words
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in x)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count of the number of occurences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

In [0]:
# Display the first 10 words and their frequencies
for word in resultRDD.take(10):
	print(word)

# Swap the keys and values 
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
	print("{} has {} counts". format(word[1], word[0]))

### Dataframes

RDD в DataFrame

Подобно RDD, DataFrames являются неизменяемыми и распределенными структурами данных в Spark. Несмотря на то, что RDD являются фундаментальной структурой данных в Spark, работа с данными в DataFrame в большинстве случаев проще, чем RDD, и поэтому необходимо понимание того, как преобразовать RDD в DataFrame.

В этом упражнении вы сначала создаете RDD, используя sample_list, который содержит список кортежей («Mona», 20), («Jennifer», 34), («John», 20), («Jim», 26 ) с каждым кортежем указывается имя человека и его возраст. Затем создайте DataFrame с использованием RDD и схемы (которая является списком «Name» и «Age») и, наконец,  выведите как PySpark DataFrame.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

In [0]:
# Create a list of tuples
sample_list = [('Mona',20), ('Jennifer', 34), ('John', 20), ('Jim', 26)]

# Create a RDD from the list
rdd = sc.parallelize(sample_list)

# Create a PySpark DataFrame
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])


In [0]:
names_df.take(1)

[Row(Name='Mona', Age=20)]

In [0]:
# Create an DataFrame from file_path
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the type of people_df
print("The type of people_df is", type(people_df))

### Transformations и Actions 

select(), filter(), groupby(), orderby(), dropDuplicates(), withColumnRenamed()

printSchema(), head(), show(), count(), columns(), describe()

In [0]:
# Print the first 10 observations 
people_df.____(10)

# Count the number of rows 
print("There are {} rows in the people_df DataFrame.".format(people_df.____()))

# Count the number of columns and their names
print("There are {} columns in the people_df DataFrame and their names are {}".format(len(people_df.____), people_df.____))

In [0]:
# Select name, sex and date of birth columns
people_df_sub = people_df.____('name', ____, ____)

# Print the first 10 observations from people_df_sub
people_df_sub.____(____)

# Remove duplicate entries from people_df_sub
people_df_sub_nodup = people_df_sub.____()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(people_df_sub.____(), people_df_sub_nodup.____()))

In [0]:
# Filter people_df to select females 
people_df_female = people_df.____(people_df.____ == "female")

# Filter people_df to select males
people_df_male = people_df.____(____ == "____")

# Count the number of rows 
print("There are {} rows in the people_df_female DataFrame and {} rows in the people_df_male DataFrame".format(people_df_female.____(), people_df_male.____()))

### Исполнение SQL запросов

SparkSession поддерживает выполнение SQL запросов

sql() метод принимает SQL запрос и возвращается датафрейм

createOrReplaceTempView() метод создает временную табличку из датафрейма

In [0]:
df.createOrReplaceTempView("table1")

df2 = spark.sql("SELECT field1, field2 from table1")

df2.collect

In [0]:
# Create a temporary table "people"
people_df.createOrReplaceTempView("people")

# Construct a query to select the names of the people from the temporary table "people"
query = '''SELECT name FROM people'''

# Assign the result of Spark's query to people_df_names
people_df_names = spark.sql(query)

In [0]:
# Filter the people table to select female sex 
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')

# Filter the people table DataFrame to select male sex
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# Count the number of rows in both DataFrames
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames".format(people_female_df.count(), people_male_df.count()))

### Визуализация датафреймов

Matplotlib, seaborn, bokeh

1. Pyspark_dist_explore библиотека поддерживает три функции hist(), distplot(), pandas_histogram()
2. Использование функций pandas (в основе matplotlib и seaborn) для отрисовки датафреймов (pandas dataframes - находятся в in-memory, значит, что обработка ограниченна мощностями одного сервера), Pyspark dataframes обрабатываются распределенно и lazy. 
Датафреймы Pandas - лишены этих преимуществ. toPandas() метод.

3.HandySpark библиотека для визуализации датафреймов

In [0]:
# Check the column names of names_df
print("The column names of names_df are", names_df.____)

# Convert to Pandas DataFrame  
df_pandas = names_df.____()

# Create a horizontal bar plot
____.plot(kind='barh', x='____', y='____', colormap='winter_r')
plt.show()

In [0]:
# Load the Dataframe
fifa_df = spark.____(____, header=True, inferSchema=True)

# Check the schema of columns
fifa_df.____()

# Show the first 10 observations
fifa_df.____(____)

# Print the total number of rows
print("There are {} rows in the fifa_df DataFrame".format(fifa_df.____()))

### PySpark MLlib

In [0]:
# Import the library for ALS
from pyspark.mllib.recommendation import ALS

# Import the library for Logistic Regression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Import the library for Kmeans
from pyspark.mllib.clustering import KMeans

### Коллаборативная фильтрация

Use-user collaborative filtering - поиск пользователей наболее близких к целевому пользователю

Item-item colloborative filtering - поиск товаров, которые наиболее близки к целевому товару

(user, product, rating) - структура

In [0]:
from pyspark.mllib.recommendation import Rating, ALS

r = Rating(user = 1, product = 2, rating = 5.0)

In [0]:
data = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
train, test = data.randomSplit([0.6, 0.4])

train.collect
test.collect

<bound method RDD.collect of PythonRDD[25] at RDD at PythonRDD.scala:53>

In [0]:
r1 = Rating(1,1,1.0)
r2 = Rating(1,2,5.0)
r3 = Rating(2,1,2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect

<bound method RDD.collect of ParallelCollectionRDD[26] at parallelize at PythonRDD.scala:195>

In [0]:
model = ALS.train(ratings, rank=10, iterations=10)

unrated_RDD = sc.parallelize([(1,2),(1,1)])

In [0]:
predictions = model.predictAll(unrated_RDD)
predictions.collect()

Загрузка набора данных кино в RDD

Коллаборативная фильтрация - это метод для рекомендательных систем, в котором оценки пользователей и взаимодействия с различными продуктами используются для рекомендации новых. С появлением машинного обучения и параллельной обработки данных рекомендательные системы стали широко популярными в последние годы и используются во многих областях, включая фильмы, музыку, новости, книги, исследовательские статьи, поисковые запросы, социальные теги. В этом упражнении, состоящем из трех частей, ваша цель - разработать простую систему рекомендаций к фильмам с использованием PySpark MLlib с использованием подмножества набора данных MovieLens 100k.

В первой части вы сначала загрузите данные MovieLens (ratings.csv) в RDD и из каждой строки в RDD, отформатированной как userId, movieId, rating, timestamp, вам нужно будет сопоставить данные MovieLens с Рейтинговый объект (userID, productID, rating) после удаления столбца меток времени, и, наконец, вы разделите RDD на обучающие и тестовые RDD.

Помните, у вас есть SparkContext sc в вашей рабочей области. Также переменная file_path (которая является путем к файлу ratings.csv) и класс ALS уже доступны в вашей рабочей области.

In [0]:
# Load the data into RDD
data = sc.textFile(file_path)

# Split the RDD 
ratings = data.map(lambda l: l.split(','))

# Transform the ratings RDD 
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

# Split the data into training and test
training_data, test_data = ratings_final.randomSplit([0.8, 0.2])

In [0]:
# Create the ALS model on the training data
model = ALS.train(training_data, rank=10, iterations=10)

# Drop the ratings column 
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))

# Predict the model  
predictions = model.predictAll(testdata_no_rating)

# Print the first rows of the RDD
predictions.take(2)

Оценка модели с использованием MSE

После создания прогнозируемых оценок на основе тестовых данных с использованием модели ALS в этой заключительной части упражнения вы подготовите данные для расчета среднеквадратической ошибки (MSE) модели. MSE - это среднее значение (исходный рейтинг - прогнозируемый рейтинг) ^ 2 для всех пользователей и указывает абсолютное соответствие модели данным. Чтобы сделать это, вы сначала организуете и рейтинговые и прогнозные RDD, чтобы сделать кортеж ((пользователь, продукт), рейтинг)), затем присоединитесь к рейтинговому RDD с прогнозным RDD и, наконец, примените функцию разности в квадрате вместе со средним (), чтобы получить MSE.

In [0]:
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))

# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))

# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)

# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))

### Классификация

Два типа структур данных в MLlib:

1. Vectors 
2. LabeledPoint

Существует два типа векторов:

1. Dense vectors # store all entries in array as float numbers
2. Sparse vectors # store only non zero values and their indices!!!

LabeledPoint - враппер для входящих переменных и целевой функции для дальнейшего использования в моделях



In [0]:
from pyspark.mllib.linalg import Vectors

dense_vec = Vectors.dense([1.0, 2.0, 3.0])
sparse_vec = Vectors.sparse(4, {1: 2.0, 3: 5.0}) # 4 is the dimension of vector, 1 и 3 indices of non-zero elements

In [0]:
sparse_vec

SparseVector(4, {1: 2.0, 3: 5.0})

In [0]:
from pyspark.mllib.regression import LabeledPoint

positive = LabeledPoint(1, [1.0, 2.4, 5.0])
negative = LabeledPoint(0, [0.0, -1.0, 3.0])

print(positive)
print(negative)

(1.0,[1.0,2.4,5.0])
(0.0,[0.0,-1.0,3.0])


In [0]:
from pyspark.mllib.feature import HashingTF #as countvectorizer

sentence = "hello world to all"

words = sentence.split()

tf = HashingTF(10000) # countvectorizer version (but there are differencies, see stackoverflow)

tf.transform(words)

SparseVector(10000, {2342: 1.0, 3959: 1.0, 4673: 1.0, 9357: 1.0})

In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

data = [LabeledPoint(0, [0.0, 1.0]), LabeledPoint(1, [1.0, 0.0])]

rdd = sc.parallelize(data)

model = LogisticRegressionWithLBFGS.train(rdd)

Py4JJavaError: ignored

In [0]:
# Load the datasets into RDDs
spam_rdd = sc.textFile(file_path_spam)
non_spam_rdd = sc.textFile(file_path_non_spam)

# Split the email messages into words
spam_words = spam_rdd.flatMap(lambda email: email.split(' '))
non_spam_words = non_spam_rdd.flatMap(lambda email: email.split(' '))

# Print the first element in the split RDD
print("The first element in spam_words is", spam_words.first())
print("The first element in non_spam_words is",non_spam_words.first())

In [0]:
# Create a HashingTf instance with 200 features
tf = HashingTF(numFeatures=200)

# Map each word to one feature
spam_features = tf.transform(spam_words)
non_spam_features = tf.transform(non_spam_words)

# Label the features: 1 for spam, 0 for non-spam
spam_samples = spam_features.map(lambda features:LabeledPoint(1, features))
non_spam_samples = non_spam_features.map(lambda features:LabeledPoint(0, features))

# Combine the two datasets
samples = spam_samples.join(non_spam_samples)

In [0]:
# Split the data into training and testing
train_samples,test_samples = samples.randomSplit([0.8, 0.2])

# Train the model
model = LogisticRegressionWithLBFGS.train(train_samples)

# Create a prediction label from the test data
predictions = model.predict(test_samples.map(lambda x: x.features))

# Combine original labels with the predicted labels
labels_and_preds = test_samples.map(lambda x: x.label).zip(predictions)

# Check the accuracy of the model on the test data
accuracy = labels_and_preds.filter(lambda x: x[1] == x[1]).count() / float(test_samples.count())
print("Model accuracy : {:.2f}".format(accuracy))

### Кластеризация

In [0]:
# Load the dataset into a RDD
clusterRDD = sc.textFile(file_path)

# Split the RDD based on tab
rdd_split = clusterRDD.map(lambda x: x.split("\t"))

# Transform the split RDD by creating a list of integers
rdd_split_int = rdd_split.map(lambda x: [int(x[0]), int(x[1])])

# Count the number of rows in RDD 
print("There are {} rows in the rdd_split_int dataset".format(rdd_split_int.count()))

In [0]:
# Train the model with clusters from 13 to 16 and compute WSSSE 
for clst in range(13, 17):
    model = KMeans.train(rdd_split_int, clst, seed=1)
    WSSSE = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("The cluster {} has Within Set Sum of Squared Error {}".format(clst, WSSSE))

# Train the model again with the best k 
model = KMeans.train(rdd_split_int, k=15, seed=1)

# Get cluster centers
cluster_centers = model.clusterCenters

In [0]:
# Convert rdd_split_int RDD into Spark DataFrame
rdd_split_int_df = spark.createDataFrame(rdd_split_int, schema=["col1", "col2"])

# Convert Spark DataFrame into Pandas DataFrame
rdd_split_int_df_pandas = rdd_split_int_df.toPandas()

# Convert "cluster_centers" that you generated earlier into Pandas DataFrame
cluster_centers_pandas = pd.DataFrame(cluster_centers, columns=["col1", "col2"])

# Create an overlaid scatter plot
plt.scatter(rdd_split_int_df_pandas["col1"], rdd_split_int_df_pandas["col2"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], color="red", marker="x")
plt.show()