### Bu notebookta bir projenin ilk adımları olarak:
* Gerekli kütüphanlerin yüklenmesi ve kurulması
* Bir sparkContexti oluşturumlası
* Spark DataFrame üzerinde ufak veri manipülasyonları
* Verilere nasıl ulaşılacağı

mevcuttur

In [2]:
!pip install findspark



In [4]:
import findspark
findspark.init("c:\spark")

In [5]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [13]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("churn_modelleing") \
                    .getOrCreate()

In [17]:
sc = spark.sparkContext
sc

In [21]:
spark_df = spark.read.csv("churn.csv",
                      header = True,
                      inferSchema = True,
                      sep=",")
spark_df.cache()

DataFrame[_c0: int, Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Churn: int]

In [22]:
spark_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [23]:
spark_df.show(10)

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|     Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|   Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|     Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0|    1|
|  9|  Jennifer Lynch|40.0| 

In [26]:
## setting columns' name
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])
spark_df.show(5)

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           names| age|total_purchase|account_manager|years|num_sites|churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+---+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



Kolon ismini değiştirme işlemini, spark_df'i Pandas DataFrame'e dönüştürüpte yapabiliriz. Tabii sadece kolon ismi değiştirme değil, her türlü veri manipülasyonunu böyle yapabilriiz.

df.columns = map(str.lower, df.columns)

In [29]:
print(f"Num of columns: {len(spark_df.columns)}")
print(f"Columns Name: {spark_df.columns}")

Num of columns: 8
Columns Name: ['_c0', 'names', 'age', 'total_purchase', 'account_manager', 'years', 'num_sites', 'churn']


In [30]:
## _c0 saçma bir isim, onun yerine direk index ifadesini kullanalım
spark_df = spark_df.withColumnRenamed("_c0","index")
spark_df.columns

['index',
 'names',
 'age',
 'total_purchase',
 'account_manager',
 'years',
 'num_sites',
 'churn']

In [31]:
## gözlem sayısı
spark_df.count()

900

In [33]:
## gözlemler uniqe mi? acaba bir gözlem iki kere girilmiş mi? distinct ile kontrol edelin.
spark_df.distinct().count()

900

Distinct fonksiyonu kullanınca 900 adet değer dönmesi demek: 900 tane eşsiz(uniqe) değer var demektir. Bizim gözlem sayımızda 900 olduğuna göre sorun yok.

Peki gözlem değilde ya kullanıcılar birden fazla girildiyse? Buna "Veri Çoklanması" denir. Aynı kişinin verisi 1 gözlemde değilde birden fazla gözlemde verilmiş olabilir. Bak buradaki 2 tür durum var: biri, yukarıda bahsettiğimiz gibi, kişinin iki kere girilmesi. Bir diğeri ise kişinin iki işlem bilgisi vardır, ikisininde ayrı gözlemlerde olması. İlk sorunu yaşıyorsa tekrar edeni silmelisin. İkinci sorunu yaşıyorsan gözlemlri birleştirmelisin.

Şİmdi aşşağıda kontrol edelim kişiler uniqe'mi:

In [34]:
spark_df.select("names").distinct().count()

899

899'a düştüğüne göre 1 veri çıkarılmış, yani aynı isimden 2 tane gözlem var. Ya Veri Çoklanması var ya da aynı isimden iki kişide olabilir.

Not: Aşşağıdaki 5 hücreyi duplicate edilmiş verileri bulmak için oluşturdum ama bulamadım. Yine de güzel şeyler elde ettim :)

In [37]:
dir(spark_df)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collect_as_arrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_joinAsOf',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_session',
 '_sort_cols',
 '_sql_ctx',
 '_support_repr_html',
 '_to_corrected_pandas_type',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 

In [44]:
spark_df.groupby("names").count().select("names","count").show(3)

+----------------+-----+
|           names|count|
+----------------+-----+
|    Patrick Bell|    1|
|Patrick Robinson|    1|
|   Chelsea Marsh|    1|
+----------------+-----+
only showing top 3 rows



In [42]:
spark_df.groupby("names").count().show(3)

+----------------+-----+
|           names|count|
+----------------+-----+
|    Patrick Bell|    1|
|Patrick Robinson|    1|
|   Chelsea Marsh|    1|
+----------------+-----+
only showing top 3 rows



In [52]:
spark_df.groupby("age").count().select("age","count").show()

+----+-----+
| age|count|
+----+-----+
|49.0|   30|
|29.0|    9|
|47.0|   29|
|42.0|   49|
|44.0|   53|
|35.0|   32|
|39.0|   48|
|37.0|   48|
|34.0|   25|
|25.0|    1|
|36.0|   39|
|41.0|   69|
|56.0|    5|
|50.0|   15|
|45.0|   56|
|31.0|   11|
|58.0|    2|
|51.0|   21|
|48.0|   36|
|22.0|    1|
+----+-----+
only showing top 20 rows



In [54]:
spark_df.groupby("churn").count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



Tekrar eden değerleri bulma için güzel bir yol:

In [55]:
spark_df.groupby("names").count().sort("count", ascending = False).show(5)

+----------------+-----+
|           names|count|
+----------------+-----+
|   Jennifer Wood|    2|
|    Patrick Bell|    1|
|   Chelsea Marsh|    1|
|Patrick Robinson|    1|
|     John Barber|    1|
+----------------+-----+
only showing top 5 rows



Gördüğün gibi tekrar edenleri çektik. Şimdi inceleyelim:

In [58]:
spark_df.filter(spark_df.names=="Jennifer Wood").show()

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



Yaşları farklı olduğu için biz bu gözlemlerin aynı isme sahip başka kişiler olduğunu anlıyoruz. Faka öğrenmek için duplicate verilerin nasıl silindiğini görelim

In [70]:
spark_df.select("names").dropDuplicates()\
.groupby("names").count().sort("count", ascending = False).show(5) # burada yaptığımız iş görselleştirme, duplice verileri silmeyi yukarıda yaptık

+----------------+-----+
|           names|count|
+----------------+-----+
|    Patrick Bell|    1|
|Patrick Robinson|    1|
|   Chelsea Marsh|    1|
|     John Barber|    1|
|     Amber Evans|    1|
+----------------+-----+
only showing top 5 rows



In [73]:
spark_df.filter(spark_df.names == "Jennifer Wood").show()   # yukarıdakini kaydetmedik o yüzden hala duplicate veri var olarak gözüküyor

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



Duplicate veriler mutlaka gözlenmeli!!! Veri Çoklanması aynı gözlem verisinden mi kaynaklanıyor yoksa aynı kişinin farklı işlemleri farklı gözlemlere mi kaydedilmiş mutlaka bakılması ve sebeplerinin araştırılması gerekiyor.

In [76]:
#herhangi bir gözleme ulaşmak istediğimizde. Bunu kendin çeşitlendirebilirsin.
spark_df.where(spark_df.index == 456).show()

+-----+-----------+----+--------------+---------------+-----+---------+-----+
|index|      names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-----------+----+--------------+---------------+-----+---------+-----+
|  456|Edwin Young|49.0|      10226.43|              0| 6.02|      8.0|    0|
+-----+-----------+----+--------------+---------------+-----+---------+-----+



Gözlem birimine ulaştık ama içerisindeki değerlere ulaşıp kullanmak için onları nasıl spark_df'den çekebiliriz, başka bir bakışla: içerisindeki verileri nasıl kullanabileceğimiz hale getiririz?

In [79]:
spark_df.where(spark_df.index == 439).collect()[0]["names"]

'Jennifer Wood'

In [80]:
type(spark_df.where(spark_df.index == 439).collect()[0]["names"])

str

İşte bu şekilde verileri çekip kullanabilirsin. collect() fonksiyonu sayesinde spark çıktılarını kullanabilir çıktılar olmasını sağladık.

In [82]:
first_person = spark_df.where(spark_df.index == 439).collect()[0]["names"]

In [83]:
spark_df.where(spark_df.names == first_person).show()

+-----+-------------+----+--------------+---------------+-----+---------+-----+
|index|        names| age|total_purchase|account_manager|years|num_sites|churn|
+-----+-------------+----+--------------+---------------+-----+---------+-----+
|   22|Jennifer Wood|35.0|       9381.12|              1| 6.78|     11.0|    1|
|  439|Jennifer Wood|48.0|      11585.16|              0| 4.61|      9.0|    0|
+-----+-------------+----+--------------+---------------+-----+---------+-----+



Gibi kullanımlar yapabilir ya da saklayabilirsin.