## Spark Session ve Veriseti

In [2]:
import os
from warnings import filterwarnings

# Suppress every Python warning for the rest of the session.
filterwarnings(action='ignore')
import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to one machine; fall back to the original local install path
# when the variable is not set.
findspark.init(os.environ.get('SPARK_HOME', '/Users/ibrahim/spark/spark-3.5.1-bin-hadoop3'))

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [4]:
# Create the Spark session with a local master, or reuse one that already exists.
# Author's note: when no extra config is given, Spark normally configures itself to
# suit the system; settings such as memory can be supplied here if needed.
spark = SparkSession.builder.master("local").appName("Using ML Algorithms on Spark").getOrCreate()

# Keep a handle on the underlying SparkContext and display it as the cell output.
sc = spark.sparkContext
sc

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/22 18:39:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read the churn CSV: the first line is a header, column types are inferred,
# and fields are comma-separated.
spark_df = spark.read.options(header=True, inferSchema=True, sep=",").csv("./churn.csv")

# Mark the DataFrame for caching; as the last expression it is also displayed.
spark_df.cache()

DataFrame[_c0: int, Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Churn: int]

In [6]:
# Print the column names together with their inferred types.
spark_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
# Preview the first five rows of the freshly loaded data.
spark_df.show(5)

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+---+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



24/05/22 18:39:33 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Names, Age, Total_Purchase, Account_Manager, Years, Num_Sites, Churn
 Schema: _c0, Names, Age, Total_Purchase, Account_Manager, Years, Num_Sites, Churn
Expected: _c0 but found: 
CSV file: file:///Users/ibrahim/Desktop/workspace/Data%20Science%20Stats/Using-Machine-Learning-Algorithms-on-Big-Data/churn.csv


In [8]:
# Rebuild the DataFrame with every column name converted to lower case.
spark_df = spark_df.toDF(*map(str.lower, spark_df.columns))

In [9]:
# Preview again to confirm the column names are now lower case.
spark_df.show(5)

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           names| age|total_purchase|account_manager|years|num_sites|churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+---+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



## İleride sorun çıkarmaması için sütun isimleri küçük harfe çevrildi.

In [10]:
# The first CSV column has an empty header, so Spark named it "_c0"; give it a meaningful name.
spark_df = spark_df.withColumnRenamed("_c0","index")

In [11]:
# Preview again to confirm "_c0" is now called "index".
spark_df.show(5)

+-----+----------------+----+--------------+---------------+-----+---------+-----+
|index|           names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+----------------+----+--------------+---------------+-----+---------+-----+
|    0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|    1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|    2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|    3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|    4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+-----+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



In [12]:
# Total number of rows.
spark_df.count()

900

In [13]:
# Total number of columns.
len(spark_df.columns)

8

In [14]:
# Number of distinct rows; if it equals the row count, no row is an exact duplicate.
spark_df.distinct().count()

900

In [15]:
# Number of distinct customer names; fewer than the row count means some name repeats.
spark_df.select("names").distinct().count()

899

In [16]:
# Count rows per customer name and list the most frequent names first to spot repeats.
(
    spark_df
    .groupBy("names")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+------------------+-----+
|             names|count|
+------------------+-----+
|     Jennifer Wood|    2|
|      Patrick Bell|    1|
|  Patrick Robinson|    1|
|     Chelsea Marsh|    1|
|       John Barber|    1|
|       Amber Evans|    1|
|     David Compton|    1|
| Mr. Jerome Dawson|    1|
|        Lisa Davis|    1|
|     Maria Stanley|    1|
|Alexandra Phillips|    1|
|     Nicholas Levy|    1|
|    Richard Farmer|    1|
|     Linda Hubbard|    1|
|    Jesse Mitchell|    1|
|    Brittany Green|    1|
|  Timothy Johnston|    1|
|   Charles Whitney|    1|
|    Tony Schneider|    1|
|  Stefanie Miranda|    1|
+------------------+-----+
only showing top 20 rows



## "Jennifer Wood" ismi iki kez geçiyor: Bu bir çoklama (tekrarlanan kayıt) mı, yoksa aynı isme sahip iki farklı kişi mi? Kontrol edelim.

In [17]:
# Show the rows that share the repeated name so they can be compared side by side.
spark_df.where(spark_df["names"] == "Jennifer Wood").show(2)

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



## Çıktıdan da görüldüğü üzere bunlar aynı isme sahip iki farklı kişiymiş (yaş, satın alma tutarı ve churn değerleri farklı).

In [18]:
# Drop duplicate names first, then count per name and sort by that count.
# Because duplicates are removed before grouping, every count is 1.
(
    spark_df
    .select("names")
    .distinct()
    .groupBy("names")
    .count()
    .orderBy("count", ascending=False)
    .show(5)
)

+----------------+-----+
|           names|count|
+----------------+-----+
|    Patrick Bell|    1|
|Patrick Robinson|    1|
|   Chelsea Marsh|    1|
|     John Barber|    1|
|     Amber Evans|    1|
+----------------+-----+
only showing top 5 rows



## Yukarıda names sütunu bazında tekrar eden verileri sildik, kalan isimleri grupladık, her ismin kaç kez tekrar ettiğini count ile yanına yazdırdık ve sonucu count'a göre azalan şekilde sıraladık. Tekrarlar gruplamadan önce silindiği için her ismin sayısı 1 çıkar.

In [19]:
# Look up only the name stored on the row whose index is 439.
spark_df.filter(spark_df["index"] == 439).select("names").show()

+-------------+
|        names|
+-------------+
|Jennifer Wood|
+-------------+



In [20]:
# Show the complete row whose index is 439.
spark_df.filter(spark_df["index"] == 439).show()

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



## Sorgunun çıktısını başka bir işlemde girdi olarak kullanabilmek için collect() fonksiyonunu kullanabiliriz.

In [21]:
# collect() brings the matching rows back to the driver as a Python list;
# take the first row and read its "names" field into a plain Python value.
jen = spark_df.filter(spark_df["index"] == 439).collect()[0]["names"]

In [22]:
# Display the collected value.
jen

'Jennifer Wood'

In [23]:
# Show the Python type of the collected value.
type(jen)

str

## Keşifçi Veri Analizi

In [25]:
# Summary statistics (count, mean, stddev, min, max) for every column.
spark_df.describe().show()

24/05/22 18:40:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|             index|        names|              age|   total_purchase|   account_manager|            years|         num_sites|              churn|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [26]:
# Summarise only the numeric columns, convert the small result to pandas, and
# transpose it so each variable becomes a row (easier to read than the wide layout).
(
    spark_df
    .select("age", "total_purchase", "account_manager", "years", "num_sites", "churn")
    .describe()
    .toPandas()
    .T
)

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,900,41.81666666666667,6.127560416916251,22.0,65.0
total_purchase,900,10062.82403333334,2408.644531858096,100.0,18026.01
account_manager,900,0.4811111111111111,0.4999208935073339,0,1
years,900,5.27315555555555,1.274449013194616,1.0,9.15
num_sites,900,8.587777777777777,1.7648355920350969,3.0,14.0
churn,900,0.16666666666666666,0.3728852122772358,0,1


In [27]:
# Number of customers older than 47.
spark_df.where(spark_df["age"] > 47).count()

161

In [28]:
# Class balance: number of rows for each churn value.
spark_df.groupby("churn").count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



## Makine öğrenmesi modeli tek bir fonksiyondan ibarettir; esas üzerinde durmamız gereken konu çaprazlamalardır.

## Aşağıda churn durumu ile toplam satın alma tutarını çaprazlıyoruz. İki grubun ortalamaları birbirine çok yakın olduğu için, churn olup olmamanın satın alma tutarıyla pek ilişkili olmadığı sonucuna varıyoruz.

In [29]:
# Average total_purchase for each churn value.
spark_df.groupby("churn").agg({"total_purchase": "mean"}).show()

+-----+-------------------+
|churn|avg(total_purchase)|
+-----+-------------------+
|    1| 10192.179933333337|
|    0| 10036.952853333332|
+-----+-------------------+



In [30]:
# Average years for each churn value.
spark_df.groupby("churn").agg({"years": "mean"}).show()

+-----+------------------+
|churn|        avg(years)|
+-----+------------------+
|    1|5.8835999999999995|
|    0|5.1510666666666625|
+-----+------------------+



In [31]:
kor_data = spark_df.drop("index","names").toPandas()