In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LoanPrediction") \
    .getOrCreate()


In [4]:
df = spark.read.csv("/DATA/loan.csv", header=True, inferSchema=True)


                                                                                

In [5]:
df.printSchema()


root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: strin

In [6]:
df.na.drop().count()


2023-10-05 18:32:02,745 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

0

In [7]:
# Show işlemini kullanarak DataFrame'i daha düzenli görüntüleme
df.show(truncate=False, vertical=True)



-RECORD 0-----------------------------------------------------------------
 id                                         | null                        
 member_id                                  | null                        
 loan_amnt                                  | 2500                        
 funded_amnt                                | 2500                        
 funded_amnt_inv                            | 2500.0                      
 term                                       |  36 months                  
 int_rate                                   | 13.56                       
 installment                                | 84.92                       
 grade                                      | C                           
 sub_grade                                  | C1                          
 emp_title                                  | Chef                        
 emp_length                                 | 10+ years                   
 home_ownership          

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Spark oturumu başlatma
spark = SparkSession.builder.appName("MissingValuesPercentage").getOrCreate()

# Örnek bir Spark DataFrame oluşturma (df_read yerine gerçek DataFrame'inizi kullanın)
df = spark.read.csv("/DATA/loan.csv", header=True, inferSchema=True)

# Eksik değerlerin yüzdesini hesaplama
percentage_missing = df.select([(col(c).isNull().cast("int") / df.count() * 100).alias(c) for c in df.columns])

# Yeni bir Spark DataFrame oluşturma
# Bu DataFrame'de sütun adları aynı kalacak ve "Percentage" adında bir indeks ekleyeceğiz
new_df = percentage_missing.withColumn("Percentage", lit("Percentage"))

# "Percentage" sütununu ilk sütun yapma
new_df = new_df.select("Percentage", *df.columns)

# Sonuçları gösterme
new_df.show()


[Stage 443:>                                                        (0 + 1) / 1]

+----------+--------------------+--------------------+---------+-----------+---------------+----+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+--------------------+--------------------+-------+-----+--------+----------+---+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+--------------------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+--------------------+--------------------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+------------+--------------------+-----------+-------

                                                                                

In [9]:
# Eksik değer yüzdesini hesaplama ve sütunları seçme
threshold_percentage = 20  # %20'nin altındaki sütunları seçme eşik değeri
selected_columns = [c for c in df.columns if (df.filter(col(c).isNotNull()).count() / df.count()) * 100 >= threshold_percentage]

# Yeni DataFrame'i oluşturma
new_df = df.select(selected_columns)

# Sonuçları yazdırma
print("Total features before:", len(df.columns))
print("Total features now:", len(new_df.columns))

                                                                                

Total features before: 145
Total features now: 106


In [10]:
new_df.show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 loan_amnt                      | 2500                        
 funded_amnt                    | 2500                        
 funded_amnt_inv                | 2500.0                      
 term                           |  36 months                  
 int_rate                       | 13.56                       
 installment                    | 84.92                       
 grade                          | C                           
 sub_grade                      | C1                          
 emp_title                      | Chef                        
 emp_length                     | 10+ years                   
 home_ownership                 | RENT                        
 annual_inc                     | 55000                       
 verification_status            | Not Verified                
 issue_d                        | Dec-2018                    
 loan_status                    | Current              

In [11]:
# Spark oturumu başlatma
spark = SparkSession.builder.appName("SelectFeatures").getOrCreate()

# Seçilen sütunları belirleme
selected_features = ['loan_amnt','term', 'int_rate', 'installment', 'grade', 'sub_grade','emp_length','home_ownership',
                'annual_inc','verification_status','purpose','dti','delinq_2yrs','inq_last_6mths','open_acc',
                'pub_rec','revol_bal','revol_util','total_acc','last_pymnt_amnt','loan_status']

# Sadece seçilen sütunları içeren yeni bir Spark DataFrame oluşturma
df_selected = new_df.select(selected_features)

# Seçilen sütunların sayısını yazdırma
print("Number of selected features:", len(selected_features))

Number of selected features: 21


In [18]:
# Sonuçları gösterme
df_selected.show()

+---------+----------+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|      term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----------+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|     2500| 36 months|   13.56|      84.92|    C|       C1| 10+ years|          RENT|     55000|       Not Verified|debt_consolidation|18.24|          0|             1|       9|      1|     4341|      10.3|       34|          84.92|    Current|
|    30000| 60 month

In [34]:
from pyspark.sql import SparkSession

# Spark oturumu başlatma
spark = SparkSession.builder.appName("DropNullRows").getOrCreate()


# NaN içeren satırları silme
df = df.na.drop()

# Sonuçları gösterme
df.show()

+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|    30000|  36|   22.35|    1151.16|    3|       19|         5|      MORTGAGE|    100000|    Source Verified|debt_consolidation|30.46|          0|             0|      11|      1|    15603|      37.0|       19|       30082.32|          0|
|    40000|  60|   16.14|     975.71|    2| 

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, regexp_replace
from pyspark.sql.types import IntegerType

# Spark oturumu başlatma
spark = SparkSession.builder.appName("TermColumnTransformation").getOrCreate()


# Term sütununu dönüştürme
df = df_selected.withColumn("term", trim(regexp_replace("term", " months", "")).cast(IntegerType()))

# Sonuçları gösterme
df.show()


+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|     2500|  36|   13.56|      84.92|    C|       C1| 10+ years|          RENT|     55000|       Not Verified|debt_consolidation|18.24|          0|             1|       9|      1|     4341|      10.3|       34|          84.92|    Current|
|    30000|  60|   18.94|     777.23|    D| 

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, regexp_extract
from pyspark.sql.types import IntegerType


# emp_length sütununu dönüştürme
df = df.withColumn("emp_length", when(df["emp_length"].contains("10+"), 10)
                                  .when(df["emp_length"].contains("< 1"), 0)
                                  .otherwise(regexp_extract(df["emp_length"], r"(\d+)", 1).cast(IntegerType())))

# Sonuçları gösterme
df.show()

+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|     2500|  36|   13.56|      84.92|    C|       C1|        10|          RENT|     55000|       Not Verified|debt_consolidation|18.24|          0|             1|       9|      1|     4341|      10.3|       34|          84.92|    Current|
|    30000|  60|   18.94|     777.23|    D| 

In [37]:
from pyspark.sql.functions import when, expr

# Spark oturumu başlatma
spark = SparkSession.builder.appName("LabelEncoding").getOrCreate()

df = df.withColumn("grade", when(df["grade"] == "A", 0)
                      .when(df["grade"] == "B", 1)
                      .when(df["grade"] == "C", 2)
                      .when(df["grade"] == "D", 3)
                      .when(df["grade"] == "E", 4)
                      .when(df["grade"] == "F", 5)
                      .when(df["grade"] == "G", 6))

df = df.withColumn("sub_grade", expr(
    "CASE WHEN sub_grade = 'A1' THEN 0 "
    "WHEN sub_grade = 'A2' THEN 1 "
    "WHEN sub_grade = 'A3' THEN 2 "
    "WHEN sub_grade = 'A4' THEN 3 "
    "WHEN sub_grade = 'A5' THEN 4 "
    "WHEN sub_grade = 'B1' THEN 5 "
    "WHEN sub_grade = 'B2' THEN 6 "
    "WHEN sub_grade = 'B3' THEN 7 "
    "WHEN sub_grade = 'B4' THEN 8 "
    "WHEN sub_grade = 'B5' THEN 9 "
    "WHEN sub_grade = 'C1' THEN 10 "
    "WHEN sub_grade = 'C2' THEN 11 "
    "WHEN sub_grade = 'C3' THEN 12 "
    "WHEN sub_grade = 'C4' THEN 13 "
    "WHEN sub_grade = 'C5' THEN 14 "
    "WHEN sub_grade = 'D1' THEN 15 "
    "WHEN sub_grade = 'D2' THEN 16 "
    "WHEN sub_grade = 'D3' THEN 17 "
    "WHEN sub_grade = 'D4' THEN 18 "
    "WHEN sub_grade = 'D5' THEN 19 "
    "WHEN sub_grade = 'E1' THEN 20 "
    "WHEN sub_grade = 'E2' THEN 21 "
    "WHEN sub_grade = 'E3' THEN 22 "
    "WHEN sub_grade = 'E4' THEN 23 "
    "WHEN sub_grade = 'E5' THEN 24 "
    "WHEN sub_grade = 'F1' THEN 25 "
    "WHEN sub_grade = 'F2' THEN 26 "
    "WHEN sub_grade = 'F3' THEN 27 "
    "WHEN sub_grade = 'F4' THEN 28 "
    "WHEN sub_grade = 'F5' THEN 29 "
    "WHEN sub_grade = 'G1' THEN 30 "
    "WHEN sub_grade = 'G2' THEN 31 "
    "WHEN sub_grade = 'G3' THEN 32 "
    "WHEN sub_grade = 'G4' THEN 33 "
    "WHEN sub_grade = 'G5' THEN 34 "
    "END"))



df = df.withColumn("loan_status", when(df["loan_status"] == "Fully Paid", 0)
                                    .when(df["loan_status"] == "Charged Off", 1))

# Sonuçları gösterme
df.show()

+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|     2500|  36|   13.56|      84.92|    2|       10|        10|          RENT|     55000|       Not Verified|debt_consolidation|18.24|          0|             1|       9|      1|     4341|      10.3|       34|          84.92|       null|
|    30000|  60|   18.94|     777.23|    3| 

In [31]:

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Spark oturumu başlatma
spark = SparkSession.builder.appName("OneHotEncoding").getOrCreate()



# One-Hot Encoding için sütunlarınızı seçin
categorical_columns = ["home_ownership", "verification_status", "purpose"]

# StringIndexer ile kategorik sütunları sayısal indekslere dönüştürme
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]

# OneHotEncoder ile indekslenmiş sütunları One-Hot Encoding'e dönüştürme
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded") for col in categorical_columns]

# Pipeline oluşturma
pipeline = Pipeline(stages=indexers + encoders)

# Veri çerçevesini dönüştürme
model = pipeline.fit(df)
df_encoded = model.transform(df)

# Gereksiz sütunları silme
columns_to_drop = categorical_columns + [col + "_index" for col in categorical_columns]
df_encoded = df_encoded.drop(*columns_to_drop)

# Sonuçları gösterme
df_encoded.show()


                                                                                

+---------+----+--------+-----------+-----+---------+----------+----------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+----------------------+---------------------------+----------------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|annual_inc|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|home_ownership_encoded|verification_status_encoded| purpose_encoded|
+---------+----+--------+-----------+-----+---------+----------+----------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+----------------------+---------------------------+----------------+
|     2500|  36|   13.56|      84.92|    2|       10|        10|     55000|18.24|          0|             1|       9|      1|     4341|      10.3|       34|          84.92|       null|         (6,[1],[1.0])|              (3,[1],[1.0])| (267,

In [38]:
from pyspark.sql import SparkSession

# Spark oturumu başlatma
spark = SparkSession.builder.appName("DropNullRows").getOrCreate()


# NaN içeren satırları silme
df = df.na.drop()

# Sonuçları gösterme
df.show()

+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|loan_amnt|term|int_rate|installment|grade|sub_grade|emp_length|home_ownership|annual_inc|verification_status|           purpose|  dti|delinq_2yrs|inq_last_6mths|open_acc|pub_rec|revol_bal|revol_util|total_acc|last_pymnt_amnt|loan_status|
+---------+----+--------+-----------+-----+---------+----------+--------------+----------+-------------------+------------------+-----+-----------+--------------+--------+-------+---------+----------+---------+---------------+-----------+
|    30000|  36|   22.35|    1151.16|    3|       19|         5|      MORTGAGE|    100000|    Source Verified|debt_consolidation|30.46|          0|             0|      11|      1|    15603|      37.0|       19|       30082.32|          0|
|    40000|  60|   16.14|     975.71|    2| 

In [40]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Spark oturumu başlatma
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()


# Bağımsız ve bağımlı değişkenleri ayırma
X_columns = ["int_rate", "installment"]
y_column = "loan_status"
X_assembler = VectorAssembler(inputCols=X_columns, outputCol="features")
X_df = X_assembler.transform(df).select("features")
y_df = df.select(y_column)

# Veri setini eğitim ve test setlerine böleme
train_df, test_df = X_df.randomSplit([0.75, 0.25], seed=100)

# MinMaxScaler kullanma
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)
test_df = scaler_model.transform(test_df)

# Sonuçları gösterme
train_df.show()
test_df.show()


                                                                                

+-------------+--------------------+
|     features|     scaled_features|
+-------------+--------------------+
| [6.0,346.06]|[1.26077718877253...|
| [6.11,43.43]|[1.28389143723336...|
| [6.11,45.71]|[1.28389143723336...|
| [6.11,48.76]|[1.28389143723336...|
| [6.11,60.95]|[1.28389143723336...|
| [6.11,60.95]|[1.28389143723336...|
| [6.11,70.09]|[1.28389143723336...|
| [6.11,76.18]|[1.28389143723336...|
| [6.11,91.42]|[1.28389143723336...|
|[6.11,102.09]|[1.28389143723336...|
|[6.11,106.66]|[1.28389143723336...|
| [6.11,109.7]|[1.28389143723336...|
| [6.11,109.7]|[1.28389143723336...|
|[6.11,121.89]|[1.28389143723336...|
|[6.11,121.89]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
+-------------+--------------------+
only showing top 20 rows



[Stage 1366:>                                                       (0 + 1) / 1]

+-------------+--------------------+
|     features|     scaled_features|
+-------------+--------------------+
| [6.11,48.76]|[1.28389143723336...|
| [6.11,91.42]|[1.28389143723336...|
| [6.11,97.51]|[1.28389143723336...|
| [6.11,109.7]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
|[6.11,152.36]|[1.28389143723336...|
| [6.11,167.6]|[1.28389143723336...|
|[6.11,182.84]|[1.28389143723336...|
|[6.11,182.84]|[1.28389143723336...|
|[6.11,198.07]|[1.28389143723336...|
|[6.11,213.31]|[1.28389143723336...|
|[6.11,213.31]|[1.28389143723336...|
|[6.11,213.31]|[1.28389143723336...|
|[6.11,232.61]|[1.28389143723336...|
|[6.11,243.78]|[1.28389143723336...|
|[6.11,289.49]|[1.28389143723336...|
|[6.11,292.53]|[1.28389143723336...|
|[6.11,304.72]|[1.28389143723336...|
|[6.11,304.72]|[1.28389143723336...|
|[6.11,304.72]|[1.28389143723336...|
+-------------+--------------------+
only showing top 20 rows



                                                                                

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Spark oturumu başlatma
spark = SparkSession.builder.appName("LoanStatusCounts").getOrCreate()



# 'loan_status' sütununu gruplayarak sayma işlemi
result = df.groupBy("loan_status").agg(count("*").alias("count"))

# Sonuçları gösterme
result.show()





+-----------+------+
|loan_status| count|
+-----------+------+
|          1|241092|
|          0|986132|
+-----------+------+



                                                                                

In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Spark oturumu başlatma
spark = SparkSession.builder.appName("LoanStatusCounts").getOrCreate()

# 'loan_status' sütununu gruplayarak sayıları hesaplama
loan_status_counts = df.groupBy("loan_status").count()

# Sonuçları gösterme
loan_status_counts.show()

# Yüzde hesaplamaları yapma
total_count = df.count()
loan_status_counts = loan_status_counts.withColumn("percentage", (col("count") / total_count) * 100)
loan_status_counts.show()


                                                                                

+-----------+------+
|loan_status| count|
+-----------+------+
|          1|241092|
|          0|986132|
+-----------+------+





+-----------+------+------------------+
|loan_status| count|        percentage|
+-----------+------+------------------+
|          1|241092|19.645313325032756|
|          0|986132| 80.35468667496724|
+-----------+------+------------------+



                                                                                