# Running Pyspark

In [1]:
#Extracting Spark files
#Installing FindSpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
#setting up home enviroment
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
#Creating Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [5]:
#Stoping Spark session
spark.stop()

# PySpark Shell

In [1]:
#Installing Pyspark
!pip install pyspark
import pyspark



In [2]:
#Creating a Spark Context
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setMaster('local')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)

In [3]:
#Spark задача массив 1:3
#Creating an RDD
nums = sc.parallelize([100, 1000, 10000])
result = nums.flatMap(lambda x: range(1, x)).take(3)
print(result)

[1, 2, 3]


In [None]:
#Spark задача общее количество слов
# Создаем RDD из строки с текстом
text = sc.parallelize(["Veni, vidi, vici.", "Alea jacta est.", "Gutta cavat lapidem."])

# Разбиваем строки на слова и создаем пары (слово, 1)
word_counts = text.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)

# Подсчитываем общее количество слов
total_word_count = word_counts.count()

print(f"Общее количество слов: {total_word_count}")

Общее количество слов: 9


In [None]:
#Spark задача максимальное количество слов в строке
# Создаем RDD из переданных строк с текстом
text = sc.parallelize([
    "Carpe diem",
    "Homo homini lupus est",
    "Ceterum censeo carthaginem esse delendam",
    "Si vis pacem, para bellum",
    "Cogito, ergo sum",
    "Quod licet Jovi, non licet bovi"
])

# Разбиваем строки на слова и подсчитываем количество слов в каждой строке
word_counts = text.map(lambda line: len(line.split(" ")))

# Находим максимальное количество слов в строке
max_word_count = word_counts.reduce(lambda a, b: max(a, b))

print(f"Максимальное количество слов в строке: {max_word_count}")

Максимальное количество слов в строке: 6


In [None]:
#Spark задача подсчет суммы массива с прибавлением значения

# Создаем broadcast переменную
a = sc.broadcast(1)

# Создаем аккумулятор
b = sc.accumulator(1)

# Создаем RDD из массива [1, 2, 3]
rdd = sc.parallelize([1, 2, 3])

# Применяем функцию к каждому элементу RDD
def add_with_broadcast(x):
    global b
    b += x + a.value

rdd.foreach(add_with_broadcast)

# Получаем значение аккумулятора
result = b.value
print(f"Результат: {result}")

Результат: 10


In [7]:
#spark задача результатом которой возведение в 2 исходного массива,разветка в диапазоне и фильтрация значений по делимости на 3
# Исходный массив
nums = sc.parallelize([1, 3])

# Вычисление результата
result = (nums
    .map(lambda x: x * x)  # Возведение в квадрат
    .flatMap(lambda x: range(1, x))  # Развертка в диапазоны
    .filter(lambda x: x % 3 == 0)  # Фильтрация по делимости на 3
)

# Вывод результата
print(result.collect())  # Вывод массива в виде списка

[3, 6]


# Exploring the Dataset

In [None]:
df = spark.read.csv('/content/Mall_Customers.csv', header=True)
df.show(5)

+----------+------+---+------------------+----------------------+
|CustomerID|Gender|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|         1|  Male| 19|                15|                    39|
|         2|  Male| 21|                15|                    81|
|         3|Female| 20|                16|                     6|
|         4|Female| 23|                16|                    77|
|         5|Female| 31|                17|                    40|
+----------+------+---+------------------+----------------------+
only showing top 5 rows



In [None]:
df.count()

200

In [None]:
#drop Nan
df = df.na.drop()

In [None]:
df.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Annual Income (k$): string (nullable = true)
 |-- Spending Score (1-100): string (nullable = true)



*we can't do manipulation with type column string.Change to integer*

In [None]:
import pyspark
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.functions import *

In [None]:
df = df.withColumn("CustomerID",col("CustomerID").cast(IntegerType()))\
.withColumn("Age",col("Age").cast(IntegerType()))\
.withColumn("Annual Income (k$)",col("Annual Income (k$)").cast(IntegerType()))\
.withColumn("Spending Score (1-100)",col("Spending Score (1-100)").cast(IntegerType()))

In [None]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Annual Income (k$): integer (nullable = true)
 |-- Spending Score (1-100): integer (nullable = true)



In [None]:
df.show(5)

+----------+------+---+------------------+----------------------+
|CustomerID|Gender|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|         1|  Male| 19|                15|                    39|
|         2|  Male| 21|                15|                    81|
|         3|Female| 20|                16|                     6|
|         4|Female| 23|                16|                    77|
|         5|Female| 31|                17|                    40|
+----------+------+---+------------------+----------------------+
only showing top 5 rows



*Also change name columns for simply use pyspsark sql*

In [None]:
#Renaming a column in PySpark
df = df.withColumnRenamed('Annual Income (k$)', 'Income') \
       .withColumnRenamed('Spending Score (1-100)', 'Score')

df.show(truncate=False)

+----------+------+---+------+-----+
|CustomerID|Gender|Age|Income|Score|
+----------+------+---+------+-----+
|1         |Male  |19 |15    |39   |
|2         |Male  |21 |15    |81   |
|3         |Female|20 |16    |6    |
|4         |Female|23 |16    |77   |
|5         |Female|31 |17    |40   |
|6         |Female|22 |17    |76   |
|7         |Female|35 |18    |6    |
|8         |Female|23 |18    |94   |
|9         |Male  |64 |19    |3    |
|10        |Female|30 |19    |72   |
|11        |Male  |67 |19    |14   |
|12        |Female|35 |19    |99   |
|13        |Female|58 |20    |15   |
|14        |Female|24 |20    |77   |
|15        |Male  |37 |20    |13   |
|16        |Male  |22 |20    |79   |
|17        |Female|35 |21    |35   |
|18        |Male  |20 |21    |66   |
|19        |Male  |52 |23    |29   |
|20        |Female|35 |23    |98   |
+----------+------+---+------+-----+
only showing top 20 rows



In [None]:
#create a temporary view/table for working with sql
df.createOrReplaceTempView("Data")

In [None]:
spark.sql("select * from Data").show()

+----------+------+---+------+-----+
|CustomerID|Gender|Age|Income|Score|
+----------+------+---+------+-----+
|         1|  Male| 19|    15|   39|
|         2|  Male| 21|    15|   81|
|         3|Female| 20|    16|    6|
|         4|Female| 23|    16|   77|
|         5|Female| 31|    17|   40|
|         6|Female| 22|    17|   76|
|         7|Female| 35|    18|    6|
|         8|Female| 23|    18|   94|
|         9|  Male| 64|    19|    3|
|        10|Female| 30|    19|   72|
|        11|  Male| 67|    19|   14|
|        12|Female| 35|    19|   99|
|        13|Female| 58|    20|   15|
|        14|Female| 24|    20|   77|
|        15|  Male| 37|    20|   13|
|        16|  Male| 22|    20|   79|
|        17|Female| 35|    21|   35|
|        18|  Male| 20|    21|   66|
|        19|  Male| 52|    23|   29|
|        20|Female| 35|    23|   98|
+----------+------+---+------+-----+
only showing top 20 rows



In [None]:
#counting groups
spark.sql("select Gender,count(*) from Data group by 1").show()

+------+--------+
|Gender|count(1)|
+------+--------+
|Female|     112|
|  Male|      88|
+------+--------+



In [None]:
#counting metrics for groups
spark.sql("select Gender, avg(Age),avg(Income),avg(Score) from Data group by Gender ;").show()

+------+------------------+-----------------+------------------+
|Gender|          avg(Age)|      avg(Income)|        avg(Score)|
+------+------------------+-----------------+------------------+
|Female|38.098214285714285|            59.25|51.526785714285715|
|  Male| 39.80681818181818|62.22727272727273| 48.51136363636363|
+------+------------------+-----------------+------------------+



In [None]:
#top 5 custumers id by score
spark.sql("select CustomerID, Gender, Score from Data order by Score DESC LIMIT 5;").show()

+----------+------+-----+
|CustomerID|Gender|Score|
+----------+------+-----+
|        12|Female|   99|
|        20|Female|   98|
|       186|  Male|   97|
|       146|  Male|   97|
|       168|Female|   95|
+----------+------+-----+



In [None]:
#count by age groups
spark.sql("""
SELECT
    CASE
        WHEN Age BETWEEN 18 AND 25 THEN '18-25'
        WHEN Age BETWEEN 26 AND 35 THEN '26-35'
        ELSE '36+'
    END AS Group,
    count(*) AS Count
FROM Data
GROUP BY Group
""").show()

+-----+-----+
|Group|Count|
+-----+-----+
|18-25|   38|
|26-35|   60|
|  36+|  102|
+-----+-----+



In [None]:
#average income by age groups
spark.sql("""
SELECT
    CASE
        WHEN Age BETWEEN 18 AND 25 THEN '18-25'
        WHEN Age BETWEEN 26 AND 35 THEN '26-35'
        ELSE '36+'
    END AS Group,
    AVG(Income) AS Avg_Income
    FROM Data
GROUP BY Group
""").show()

+-----+-----------------+
|Group|       Avg_Income|
+-----+-----------------+
|18-25|45.68421052631579|
|26-35|            68.15|
|  36+|61.63725490196079|
+-----+-----------------+



*Summarry: As a result, we found out that more clients are women, women have less income but more score rating. Most customers over the age of 36 and they have less income than the group 26-35*

 # Linear Regression Model

*First of all, we need to change Gender to boolean data type for creating model*

In [None]:
df = df.withColumn("GenderCode", when(df.Gender == "Male", 0).otherwise(1))

In [None]:
df.show(5)

+----------+------+---+------+-----+----------+
|CustomerID|Gender|Age|Income|Score|GenderCode|
+----------+------+---+------+-----+----------+
|         1|  Male| 19|    15|   39|         0|
|         2|  Male| 21|    15|   81|         0|
|         3|Female| 20|    16|    6|         1|
|         4|Female| 23|    16|   77|         1|
|         5|Female| 31|    17|   40|         1|
+----------+------+---+------+-----+----------+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- GenderCode: integer (nullable = false)



In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [None]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['Income','Age'], outputCol = 'Attributes')
output = assembler.transform(df)
#Input vs Output
finalized_data = output.select("Attributes","Score")
finalized_data.show()

+-----------+-----+
| Attributes|Score|
+-----------+-----+
|[15.0,19.0]|   39|
|[15.0,21.0]|   81|
|[16.0,20.0]|    6|
|[16.0,23.0]|   77|
|[17.0,31.0]|   40|
|[17.0,22.0]|   76|
|[18.0,35.0]|    6|
|[18.0,23.0]|   94|
|[19.0,64.0]|    3|
|[19.0,30.0]|   72|
|[19.0,67.0]|   14|
|[19.0,35.0]|   99|
|[20.0,58.0]|   15|
|[20.0,24.0]|   77|
|[20.0,37.0]|   13|
|[20.0,22.0]|   79|
|[21.0,35.0]|   35|
|[21.0,20.0]|   66|
|[23.0,52.0]|   29|
|[23.0,35.0]|   98|
+-----------+-----+
only showing top 20 rows



In [None]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.7,0.3])
regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'Score')
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred = regressor.evaluate(test_data)
#Predict the model
pred.predictions.show()

+-----------+-----+------------------+
| Attributes|Score|        prediction|
+-----------+-----+------------------+
|[15.0,21.0]|   81|   56.334241866265|
|[16.0,23.0]|   77| 55.35657960490568|
|[17.0,31.0]|   40|51.196417772759524|
|[20.0,58.0]|   15| 37.12468249092764|
|[21.0,35.0]|   35|  49.4074351078468|
|[23.0,52.0]|   29| 40.55669484842335|
|[24.0,25.0]|   73|54.961113845867075|
|[28.0,35.0]|   61| 49.98963161016751|
|[29.0,40.0]|   31| 47.42071956341477|
|[30.0,21.0]|   73| 57.58180579980938|
|[33.0,49.0]|   14| 42.97965392284635|
|[37.0,20.0]|   75| 58.69441889726123|
|[38.0,24.0]|   92| 56.65592344563963|
|[39.0,24.0]|   65| 56.73909437454259|
|[39.0,31.0]|   61|53.026178208624614|
|[43.0,47.0]|   41|44.872196402138215|
|[46.0,27.0]|   51| 55.73004109146988|
|[48.0,32.0]|   47|  53.2442999736201|
|[48.0,63.0]|   51|36.801385524554775|
|[48.0,68.0]|   48| 34.14930254889907|
+-----------+-----+------------------+
only showing top 20 rows



In [None]:
#coefficient of the regression model
coeff = regressor.coefficients
#X and Y intercept
intr = regressor.intercept
print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([0.0832, -0.5304])
The Intercept of the model is : 66.225426


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="Score", predictionCol="prediction", metricName="rmse")
# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("- Root Mean Square Error        RMSE: %6.3f" % rmse)
# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("- Mean Square Error              MSE: %6.3f" % mse)
# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("- Mean Absolute Error            MAE: %6.3f" % mae)
# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("- Coefficient of determination    r2: %6.3f" %r2)

- Root Mean Square Error        RMSE: 23.666
- Mean Square Error              MSE: 560.080
- Mean Absolute Error            MAE: 20.463
- Coefficient of determination    r2:  0.105


*Summarry: We can see the best model fit result. This model is of low quality and is not suitable for use. It is generally considered that a model with R2 > 70% can be considered for further work*