In [None]:
# This Project coded in Google Colab environment by #
# Builded with MongoDB #


# PROJECT MEMBERS #
# Abdullah Berke Özder 202011410 #
# Şima Kayısı 201811043 #
# Aybüke Gökmen 201811031 #

# FOR CENG 476 PROJECT #

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

conf = pyspark.SparkConf().set("spark.jars.packages",
                               "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").setMaster("local").setAppName("bigdata").setAll([("spark.driver.memory","40g"),("spark.executor.memory","50g")])

spark = SparkSession.builder.config(conf=conf).getOrCreate()

mongo_uri = "mongodb://localhost:27017/"
database_name = "bigdata"
collection_name = "bigdata"

# MongoDB verilerini çekme
df = spark.read.format("mongo").option("uri", mongo_uri + database_name + "." + collection_name).load()

# DataFrame'i kullanma
df.printSchema()
df.show()


In [5]:
df.columns

['Account_Manager',
 'Age',
 'Churn',
 'Company',
 'Location',
 'Names',
 'Num_Sites',
 'Onboard_date',
 'Total_Purchase',
 'Years',
 '_id']

In [6]:
from pyspark.ml.feature import VectorAssembler

In [7]:
assembler = VectorAssembler (inputCols= ['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],
 outputCol = "features")

In [8]:
output = assembler.transform(df)

In [9]:
df_final = output.select("features", "churn")

In [10]:
df_final.show()

+--------------------+-----+
|            features|churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



In [11]:
train, test = df_final.randomSplit([0.7, 0.3], seed=42)

In [12]:
from pyspark.ml.classification import LogisticRegression

In [13]:
lr = LogisticRegression(labelCol="churn")

In [14]:
lrm = lr.fit(train)

In [15]:
lrm.summary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x21632b494b0>

In [16]:
lrm_summary = lrm.summary

In [17]:
lrm_summary.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|  0.0|[4.55979963242690...|[0.98964420919358...|       0.0|
|[25.0,9672.03,0.0...|  0.0|[4.67536757133125...|[0.99076400091638...|       0.0|
|[26.0,8939.61,0.0...|  0.0|[6.28230440567531...|[0.99813439848473...|       0.0|
|[27.0,8628.8,1.0,...|  0.0|[5.32554251161008...|[0.99515784990742...|       0.0|
|[28.0,8670.98,0.0...|  0.0|[7.59026200904824...|[0.99949490661754...|       0.0|
|[28.0,11128.95,1....|  0.0|[4.09749022712613...|[0.98365720318515...|       0.0|
|[29.0,5900.78,1.0...|  0.0|[4.06733742325029...|[0.98316533957374...|       0.0|
|[29.0,8688.17,1.0...|  1.0|[2.71962101743711...|[0.93817455523842...|       0.0|
|[29.0,9378.24,0.0...|  0.0|[4.73007562142385...|[0.99125140974466...|       0.0|
|[29.0,12711.15,

In [18]:
lrm_summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|             churn|         prediction|
+-------+------------------+-------------------+
|  count|               667|                667|
|   mean|0.1634182908545727|0.12293853073463268|
| stddev|0.3700243606477147|0.32861306618408714|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [20]:
pred_labels = lrm.evaluate(test)

In [21]:
pred_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8787.39,1.0...|    1|[0.79106248132020...|[0.68805942038441...|       0.0|
|[28.0,9090.43,1.0...|    0|[1.61026688857555...|[0.83344843709585...|       0.0|
|[28.0,11204.23,0....|    0|[1.97148345642762...|[0.87777036176974...|       0.0|
|[28.0,11245.38,0....|    0|[3.75331012854098...|[0.97709682330471...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.42202807822147...|[0.98813267415840...|       0.0|
|[29.0,10203.18,1....|    0|[3.71080419195967...|[0.97612605863733...|       0.0|
|[29.0,11274.46,1....|    0|[4.39058463869248...|[0.98775823667280...|       0.0|
|[30.0,6744.87,0.0...|    0|[3.55749267043437...|[0.97228008128416...|       0.0|
|[30.0,8403.78,1.0...|    0|[5.76304568829245...|[0.99686830940139...|       0.0|
|[30.0,8874.83,0

In [22]:
eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="churn")

In [23]:
auc = eval.evaluate(pred_labels.predictions)

In [24]:
print(auc)

0.7456808943089431


In [28]:
collection_name = "bigdata_test"

# MongoDB verilerini çekme
df_new_customer = spark.read.format("mongo").option("uri", mongo_uri + database_name + "." + collection_name).load()

# DataFrame'i kullanma
df_new_customer.printSchema()

root
 |-- Account_Manager: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Years: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [29]:
final_model = lr.fit(df_final)

In [30]:
customer_valid = assembler.transform(df_new_customer)

In [31]:
customer_valid.printSchema()

root
 |-- Account_Manager: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Years: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- features: vector (nullable = true)



In [32]:
results = final_model.transform(customer_valid)

In [33]:
results.select("Company", "Prediction").show()

+----------------+----------+
|         Company|Prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

