# PySpark clustering

In [107]:
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, col, count, desc, countDistinct
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

from src.customEncoder import CustomEncoder

ModuleNotFoundError: No module named 'src'

In [19]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("session_new_1")
    .getOrCreate()
)

In [20]:
# Load the churn dataset.
#   header=True      - read the column names from the first line of the file
#   inferSchema=True - let Spark infer each column's type from the data
data = spark.read.csv(
    "archive/churn-bigml-80.csv",
    header=True,
    inferSchema=True,
)

### Practice

In [105]:
# Confirm that the CSV reader returned a Spark DataFrame.
type(data)

pyspark.sql.dataframe.DataFrame

In [106]:
# Pull the first five rows to the driver as a list of Row objects.
data.take(5)

[Row(State='KS', Account length=128, Area code=415, International plan='No', Voice mail plan='Yes', Number vmail messages=25, Total day minutes=265.1, Total day calls=110, Total day charge=45.07, Total eve minutes=197.4, Total eve calls=99, Total eve charge=16.78, Total night minutes=244.7, Total night calls=91, Total night charge=11.01, Total intl minutes=10.0, Total intl calls=3, Total intl charge=2.7, Customer service calls=1, Churn=False),
 Row(State='OH', Account length=107, Area code=415, International plan='No', Voice mail plan='Yes', Number vmail messages=26, Total day minutes=161.6, Total day calls=123, Total day charge=27.47, Total eve minutes=195.5, Total eve calls=103, Total eve charge=16.62, Total night minutes=254.4, Total night calls=103, Total night charge=11.45, Total intl minutes=13.7, Total intl calls=3, Total intl charge=3.7, Customer service calls=1, Churn=False),
 Row(State='NJ', Account length=137, Area code=415, International plan='No', Voice mail plan='No', Num

Selecting

In [23]:
# Project a single column and preview its first three values.
state_only = data.select(data["State"])
state_only.show(n=3)

+-----+
|State|
+-----+
|   KS|
|   OH|
|   NJ|
+-----+
only showing top 3 rows



In [24]:
# Project several columns at once (names passed as separate arguments).
data.select("State", "Area code").show(n=3)

+-----+---------+
|State|Area code|
+-----+---------+
|   KS|      415|
|   OH|      415|
|   NJ|      415|
+-----+---------+
only showing top 3 rows



Dtypes

In [25]:
# List of (column name, Spark type name) pairs for the loaded frame.
column_types = data.dtypes
print(column_types)

[('State', 'string'), ('Account length', 'int'), ('Area code', 'int'), ('International plan', 'string'), ('Voice mail plan', 'string'), ('Number vmail messages', 'int'), ('Total day minutes', 'double'), ('Total day calls', 'int'), ('Total day charge', 'double'), ('Total eve minutes', 'double'), ('Total eve calls', 'int'), ('Total eve charge', 'double'), ('Total night minutes', 'double'), ('Total night calls', 'int'), ('Total night charge', 'double'), ('Total intl minutes', 'double'), ('Total intl calls', 'int'), ('Total intl charge', 'double'), ('Customer service calls', 'int'), ('Churn', 'boolean')]


In [26]:
# Summary statistics per column, printed as a table.
summary_stats = data.describe()
summary_stats.show()

+-------+-----+------------------+------------------+------------------+---------------+---------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+----------------------+
|summary|State|    Account length|         Area code|International plan|Voice mail plan|Number vmail messages| Total day minutes|   Total day calls|  Total day charge| Total eve minutes|   Total eve calls|  Total eve charge|Total night minutes| Total night calls|Total night charge|Total intl minutes|  Total intl calls| Total intl charge|Customer service calls|
+-------+-----+------------------+------------------+------------------+---------------+---------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------

Add column

In [27]:
# Demonstrate adding a derived column: "New Area code" is "Area code" doubled.
doubled_area_code = col("Area code") * 2
data = data.withColumn("New Area code", doubled_area_code)
data.show(n=5)

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+-------------+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|New Area code|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+------

Drop column

In [28]:
# Remove the demo column added in the previous step so `data` is back to the original schema.
data = data.drop("New Area code")

Rename

In [29]:
# Demonstrate renaming: rename "Area code" and then immediately rename it back,
# so the schema is unchanged once this cell has run.
data = (
    data
    .withColumnRenamed("Area code", "Code of area")
    .withColumnRenamed("Code of area", "Area code")
)

Handling missing values

In [30]:
# na.drop(how="all") removes ROWS in which every column is null, so the
# difference in row counts is the number of fully empty rows (not columns).
# The label is corrected to say "rows"; the f-prefix was dropped because the
# string has no placeholders.
total_rows = data.count()
rows_after_drop = data.na.drop(how="all").count()
print("Количество полностью пустых строк:", total_rows - rows_after_drop)

Количество нановых колонок: 0


In [31]:
# na.drop(how="any") removes every ROW that has at least one null, so the
# difference in row counts is the number of rows with missing values (not cells).
# The label is corrected accordingly; the f-prefix was dropped because the
# string has no placeholders.
total_rows = data.count()
complete_rows = data.na.drop(how="any").count()
print("Количество строк с пропусками:", total_rows - complete_rows)

Количество ячеек с пропусками пропусков: 0


In [32]:
working_cols = ["Total day minutes", "Total day calls", "Total eve minutes"]

# Keep only rows that have at least 2 non-null values among `working_cols`.
# `how` is intentionally not passed: when `thresh` is given it overrides
# `how`, so the previous how="any" had no effect and only obscured the rule.
data.na.drop(thresh=2, subset=working_cols).show(5)

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|   

Filling missing values

In [33]:
# "Area code" is an integer column (see data.dtypes). na.fill ignores subset
# columns whose type does not match the fill value, so filling it with the
# string "Missing value" silently did nothing. Use an integer sentinel instead.
# NOTE(review): -1 is an arbitrary placeholder that cannot be a real area code;
# pick whatever sentinel suits the downstream analysis.
AREA_CODE_MISSING = -1
data.na.fill(value=AREA_CODE_MISSING, subset=["Area code"]).show(2)

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|   

In [34]:
from pyspark.ml.feature import Imputer

# Columns to impute; each one gets a companion "<name>_imputed" output column.
col_subset = ["Total day minutes", "Number vmail messages"]
imputed_names = [name + "_imputed" for name in col_subset]

# Missing entries are replaced by the column mean.
imputer = Imputer(
    strategy="mean",
    inputCols=col_subset,
    outputCols=imputed_names,
)

In [35]:
# Fit the imputer on `data`, then apply it to the same frame and preview the result.
imputer_model = imputer.fit(data)
imputer_model.transform(data).show(n=5)

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+-------------------------+-----------------------------+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|Total day minutes_imputed|Number vmail messages_imputed|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-------------

Filtering

In [36]:
# Count rows whose "Total intl calls" is NOT strictly between 2 and 5,
# i.e. the value is <= 2 or >= 5.
intl_calls = data["Total intl calls"]
outside_open_range = (intl_calls >= 5) | (intl_calls <= 2)
data.filter(outside_open_range).select("Total intl calls").count()

1619

Aggregating and grouping


In [37]:
# Average number of international calls per state, highest first.
# The aggregate is consumed exactly once, so it is not cached: the previous
# .cache() pinned the intermediate result in executor storage with no later
# reuse and no matching unpersist().
(
    data.groupBy("State")
    .avg("Total intl calls")
    .sort(desc("avg(Total intl calls)"))
    .show(5)
)

+-----+---------------------+
|State|avg(Total intl calls)|
+-----+---------------------+
|   AZ|    5.288888888888889|
|   ND|    5.136363636363637|
|   NH|                  5.0|
|   WY|    4.924242424242424|
|   MS|    4.895833333333333|
+-----+---------------------+
only showing top 5 rows



ML

In [38]:
# Numeric columns packed into the model's feature vector
# (the string columns and the boolean "Churn" label are left out).
cols = [
    "Account length",
    "Area code",
    "Number vmail messages",
    "Total day minutes",
    "Total day calls",
    "Total day charge",
    "Total eve minutes",
    "Total eve calls",
    "Total eve charge",
    "Total night minutes",
    "Total night calls",
    "Total night charge",
    "Total intl minutes",
    "Total intl calls",
    "Total intl charge",
    "Customer service calls",
]

# Combines the columns above into a single vector column named "Input".
featureassembler = VectorAssembler(outputCol="Input", inputCols=cols)

In [39]:
# Append the assembled "Input" feature-vector column to the original frame.
output = featureassembler.transform(dataset=data)

In [40]:
# Preview the frame with the new "Input" column.
output.show(n=5)

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+--------------------+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|               Input|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+----------

In [41]:
# Keep only the feature vector and the target column for modelling.
final_data = output.select("Input", "Churn")

In [42]:
# Preview the modelling frame.
final_data.show(n=5)

+--------------------+-----+
|               Input|Churn|
+--------------------+-----+
|[128.0,415.0,25.0...|false|
|[107.0,415.0,26.0...|false|
|[137.0,415.0,0.0,...|false|
|[84.0,408.0,0.0,2...|false|
|[75.0,415.0,0.0,1...|false|
+--------------------+-----+
only showing top 5 rows



In [43]:
# The label column must be numeric for the regressor: cast the boolean "Churn"
# to an integer column "Churn_int" (false -> 0, true -> 1).
churn_as_int = final_data["Churn"].cast(IntegerType())
final_data = final_data.withColumn("Churn_int", churn_as_int)

In [44]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 split -- and therefore the coefficients and error
# metrics computed in the following cells -- are reproducible across runs.
# Without it every execution produced a different split.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Ordinary linear regression on the 0/1 "Churn_int" label.
# NOTE(review): the target is binary, so a classifier (LogisticRegression is
# already imported at the top of the notebook) is probably a better fit; kept
# as-is because later cells read regression-specific summary metrics.
lr_estimator = LinearRegression(featuresCol="Input", labelCol="Churn_int")

# `regressor` holds the FITTED model, which is what the following cells use.
regressor = lr_estimator.fit(train_data)

In [45]:
# Fitted weights, one per entry of the "Input" feature vector (same order as `cols`).
regressor.coefficients

DenseVector([0.0002, 0.0001, -0.002, -0.218, 0.0004, 1.2897, 0.0747, -0.0002, -0.8729, -0.0997, 0.0002, 2.2217, 0.2251, -0.0106, -0.7843, 0.0535])

In [46]:
# Evaluate the fitted model on the held-out split. `preds` is the evaluation
# summary object; its `.predictions` frame holds the per-row predictions.
preds = regressor.evaluate(test_data)
test_predictions = preds.predictions
test_predictions.show(n=5)

+--------------------+-----+---------+--------------------+
|               Input|Churn|Churn_int|          prediction|
+--------------------+-----+---------+--------------------+
|[1.0,415.0,0.0,19...|false|        0| 0.17892560941437446|
|[1.0,415.0,26.0,1...|false|        0|0.008844205602314748|
|[7.0,415.0,0.0,20...|false|        0|  0.2772575897952739|
|[11.0,408.0,24.0,...|false|        0| 0.11043630323605053|
|[13.0,408.0,0.0,2...|false|        0| 0.26014738542659815|
+--------------------+-----+---------+--------------------+
only showing top 5 rows



In [47]:
# Mean absolute error of the predictions on the held-out test split.
preds.meanAbsoluteError

0.2298348514553787