In [1]:
# Importing necessary PySpark modules
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [2]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists; otherwise start a new one
# registered under the application name "my_app".
spark = (
    SparkSession.builder
    .appName("my_app")
    .getOrCreate()
)


In [3]:
# Keep a handle on the SparkContext that backs the session.
sc = spark.sparkContext
# Bare expression: the notebook renders the context as the cell output.
sc

In [4]:
# Load churn.csv as a Spark DataFrame: the first line supplies the column
# names and the column types are inferred from the data.
spark_df = spark.read.options(header=True, inferSchema=True).csv("churn.csv")


In [5]:
# Print the first 10 rows as a text table.
spark_df.show(n=10)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [6]:
# Count the rows in the DataFrame; the resulting integer is the cell output.
spark_df.count()

10000

In [7]:
# Summary statistics per column, collected into pandas and transposed so that
# each original column becomes one row of the displayed table.
spark_df.describe().toPandas().transpose()


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
RowNumber,10000,5000.5,2886.8956799071675,1,10000
CustomerId,10000,1.56909405694E7,71936.18612274907,15565701,15815690
Surname,10000,,,Abazu,Zuyeva
CreditScore,10000,650.5288,96.65329873613035,350,850
Geography,10000,,,France,Spain
Gender,10000,,,Female,Male
Age,10000,38.9218,10.487806451704587,18,92
Tenure,10000,5.0128,2.8921743770496837,0,10
Balance,10000,76485.88928799961,62397.40520238599,0.0,250898.09


In [8]:
# Rename "RowNumber" to "index"; the returned DataFrame replaces spark_df.
spark_df = spark_df.withColumnRenamed(existing="RowNumber", new="index")


In [9]:
# Print the first 5 rows to confirm the renamed column.
spark_df.show(n=5)

+-----+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|index|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|    1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|    2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|    3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|    4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|     0|
|    5|  15737888|Mi

In [10]:
# List the (column name, Spark type) pairs of the DataFrame.
spark_df.dtypes

[('index', 'int'),
 ('CustomerId', 'int'),
 ('Surname', 'string'),
 ('CreditScore', 'int'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Tenure', 'int'),
 ('Balance', 'double'),
 ('NumOfProducts', 'int'),
 ('HasCrCard', 'int'),
 ('IsActiveMember', 'int'),
 ('EstimatedSalary', 'double'),
 ('Exited', 'int')]

In [12]:
from pyspark.ml.feature import StringIndexer

# Index the target column "Exited" into a numeric "label" column.
# StringIndexer orders categories by descending frequency by default, which
# would tie the label encoding to the class balance of the data. Ordering
# alphabetically pins the mapping to 0 -> 0.0 and 1 -> 1.0 regardless of how
# many customers churned.
stringIndexer = StringIndexer(
    inputCol="Exited",
    outputCol="label",
    stringOrderType="alphabetAsc",
)
indexed_df = stringIndexer.fit(spark_df).transform(spark_df)

# `indexed_df` has a `label` column in addition to all original columns.
# NOTE(review): in the cells visible here `indexed_df` is only displayed; the
# following cells continue from `spark_df`.
indexed_df.show()


+-----+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+-----+
|index|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|label|
+-----+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+-----+
|    1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|  1.0|
|    2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|  0.0|
|    3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|  1.0|
|    4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0| 

In [13]:
# Use the target column "Exited" directly as the model label by renaming it.
spark_df = spark_df.withColumnRenamed(existing="Exited", new="label")


In [14]:
# Preview the first 5 rows. Limiting on the Spark side avoids collecting the
# whole DataFrame to the driver just to display a handful of rows.
spark_df.limit(5).toPandas()

Unnamed: 0,index,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [15]:
# Feature transformers used by the encoding cells that follow.
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Drop the columns that are not used as model inputs.
spark_df = spark_df.drop("CustomerId", "Surname", "index")

# Preview 5 rows; limit in Spark so only those rows are sent to the driver.
spark_df.limit(5).toPandas()

Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [16]:
# Encode the categorical "Geography" column as a numeric category index
# ("Geo_cat") with StringIndexer.
geo_indexer = StringIndexer(inputCol="Geography", outputCol="Geo_cat")
geo_indexer_model = geo_indexer.fit(spark_df)
geo_indexer_df = geo_indexer_model.transform(spark_df)

# Preview 10 rows; limit in Spark instead of collecting the full DataFrame.
geo_indexer_df.limit(10).toPandas()

Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label,Geo_cat
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0
5,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,2.0
6,822,France,Male,50,7,0.0,2,1,1,10062.8,0,0.0
7,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,1.0
8,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,0.0
9,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,0.0


In [17]:
# Encode the categorical "Gender" column as a numeric category index
# ("Gender_cat"), building on the DataFrame that already carries "Geo_cat".
gender_indexer = StringIndexer(inputCol="Gender", outputCol="Gender_cat")
gender_indexer_model = gender_indexer.fit(geo_indexer_df)
gender_indexer_df = gender_indexer_model.transform(geo_indexer_df)

# Preview 10 rows; limit in Spark instead of collecting the full DataFrame.
gender_indexer_df.limit(10).toPandas()

Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label,Geo_cat,Gender_cat
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0
5,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,2.0,0.0
6,822,France,Male,50,7,0.0,2,1,1,10062.8,0,0.0,0.0
7,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,1.0,1.0
8,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,0.0,0.0
9,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,0.0,0.0


In [18]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the "Geo_cat" index into the vector column "Geo_ohe".
encoder_geo = OneHotEncoder(inputCol="Geo_cat", outputCol="Geo_ohe")
encoder_model_geo = encoder_geo.fit(gender_indexer_df)
encoder_df = encoder_model_geo.transform(gender_indexer_df)

# One-hot encode the "Gender_cat" index into "Gender_ohe", applied on top of
# the DataFrame that already contains "Geo_ohe".
encoder_gender = OneHotEncoder(inputCol="Gender_cat", outputCol="Gender_ohe")
encoder_model_gender = encoder_gender.fit(encoder_df)
encoder_df = encoder_model_gender.transform(encoder_df)

# Preview 5 rows; limit in Spark so the full DataFrame is not collected.
encoder_df.limit(5).toPandas()




Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label,Geo_cat,Gender_cat,Geo_ohe,Gender_ohe
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)",(0.0)
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)",(0.0)
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)",(0.0)
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)",(0.0)
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)",(0.0)


In [19]:
# Assemble the numeric columns and the one-hot vectors into a single
# "features" vector column.
from pyspark.ml.feature import VectorAssembler
cols = ["CreditScore","Age", "Tenure", "Balance","NumOfProducts", "HasCrCard","IsActiveMember","EstimatedSalary", "Geo_ohe", "Gender_ohe"]
va = VectorAssembler(inputCols=cols, outputCol="features")
va_df = va.transform(encoder_df)

# Preview 5 rows; limit in Spark so the full DataFrame is not collected.
va_df.limit(5).toPandas()

Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label,Geo_cat,Gender_cat,Geo_ohe,Gender_ohe,features
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)",(0.0),"[619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348...."
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)",(0.0),"[608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 11..."
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)",(0.0),"[502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 11..."
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)",(0.0),"(699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.6..."
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)",(0.0),"[850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 7..."


In [20]:
# Print the column names, types and nullability of the encoded DataFrame.
encoder_df.printSchema()

root
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- Geo_cat: double (nullable = false)
 |-- Gender_cat: double (nullable = false)
 |-- Geo_ohe: vector (nullable = true)
 |-- Gender_ohe: vector (nullable = true)



In [21]:
from pyspark.ml.feature import  StandardScaler, Normalizer

# Scale the assembled "features" vector into "features_scaled". With the
# default settings StandardScaler divides by the standard deviation and does
# not centre the data (withStd=True, withMean=False).
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# NOTE(review): the scaler is fitted on the full dataset, before the
# train/test split performed further down, so test rows influence the scaling
# statistics. Consider fitting on the training split only.
scaled_df = standardScaler.fit(va_df).transform(va_df)

# Preview 5 rows; limit in Spark so the full DataFrame is not collected.
scaled_df.limit(5).toPandas()

Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,label,Geo_cat,Gender_cat,Geo_ohe,Gender_ohe,features,features_scaled
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)",(0.0),"[619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348....","[6.404333924389993, 4.0046505619078925, 0.6915..."
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)",(0.0),"[608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 11...","[6.2905250824379895, 3.9093017390053237, 0.345..."
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)",(0.0),"[502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 11...","[5.193821696355051, 4.0046505619078925, 2.7660..."
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)",(0.0),"(699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.6...","(7.232034593131834, 3.718604093200186, 0.34576..."
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)",(0.0),"[850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 7...","[8.794319605382057, 4.099999384810461, 0.69152..."


In [22]:
# Keep only the scaled feature vector and the label for modelling.
final_df = scaled_df.select("features_scaled", "label")


In [23]:
# Rename the scaled vector to "features", the column name the estimators
# below are configured to read, then print the first 20 rows.
final_df = final_df.withColumnRenamed(existing="features_scaled", new="features")
final_df.show(n=20)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[6.40433392438999...|    1|
|[6.29052508243798...|    0|
|[5.19382169635505...|    1|
|(11,[0,1,2,4,7,8]...|    0|
|[8.79431960538205...|    0|
|[6.67333664173109...|    1|
|[8.50462437132241...|    0|
|[3.89019314308665...|    1|
|[5.18347543799577...|    0|
|[7.07684071774273...|    0|
|[5.46282441369614...|    0|
|[5.14209040455868...|    0|
|[4.92481897901395...|    0|
|(11,[0,1,2,4,7,8]...|    0|
|[6.56987405813836...|    0|
|[6.37329514931217...|    0|
|[6.75610670860527...|    1|
|[5.68009583924088...|    0|
|(11,[0,1,2,4,7,10...|    0|
|[7.51138356883220...|    0|
+--------------------+-----+
only showing top 20 rows



In [24]:
# Split the data into training (~70%) and test (~30%) sets.
# A fixed seed makes the split, and every metric computed from it,
# reproducible across kernel restarts; 42 matches the random_state used by
# the AdaBoost cell.
splits = final_df.randomSplit([0.70, 0.30], seed=42)
train_df, test_df = splits

In [25]:
# Train a Gradient-Boosted Tree (GBT) classifier and score the test set.
from pyspark.ml.classification import GBTClassifier

# seed fixes the estimator's internal randomness so repeated runs on the same
# training data give the same model.
gbm = GBTClassifier(maxIter=10, featuresCol="features", labelCol="label", seed=42)
gbm_model = gbm.fit(train_df)

# transform() appends rawPrediction, probability and prediction columns.
y_pred = gbm_model.transform(test_df)

# Preview 5 rows; limit in Spark so the full prediction set is not collected.
y_pred.limit(5).toPandas()

Unnamed: 0,features,label,rawPrediction,probability,prediction
0,"(5.390400605181238, 4.672092322225875, 1.38304...",1,"[0.7090545466072112, -0.7090545466072112]","[0.8050418107179744, 0.19495818928202557]",0.0
1,"(5.442131896977603, 3.432557624492479, 1.72880...",0,"[0.8010372967437304, -0.8010372967437304]","[0.8323081384994887, 0.1676918615005113]",0.0
2,"(5.493863188773968, 4.0046505619078925, 2.0745...",0,"[0.9882499530360335, -0.9882499530360335]","[0.8783075552699371, 0.12169244473006291]",0.0
3,"(5.524901963851787, 3.1465111557847725, 1.0372...",0,"[0.8577200272238258, -0.8577200272238258]","[0.8475405537188242, 0.1524594462811758]",0.0
4,"(6.156023723767441, 2.1930229267590837, 3.4576...",0,"[1.1402177282901034, -1.1402177282901034]","[0.9072436981723696, 0.0927563018276304]",0.0


In [26]:
# Keep just the true label and the predicted class for the accuracy check.
ac = y_pred.select("label", "prediction")

# Preview 5 rows; limit in Spark so the full prediction set is not collected.
ac.limit(5).toPandas()

Unnamed: 0,label,prediction
0,1,0.0
1,0,0.0
2,0,0.0
3,0,0.0
4,0,0.0


In [27]:
# Accuracy on the test set: the share of rows whose predicted class equals
# the true label.
ac.where(ac["label"] == ac["prediction"]).count() / ac.count()


0.8603950451958486

In [28]:
# Set up hyperparameter tuning of the GBT classifier with cross-validation.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Default settings: area under the ROC curve, read from the "rawPrediction"
# and "label" columns.
evaluator = BinaryClassificationEvaluator()

# 3 x 2 x 2 = 12 parameter combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(gbm.maxDepth, [2, 4, 6])
             .addGrid(gbm.maxBins, [20, 30])
             .addGrid(gbm.maxIter, [10, 20])
             .build())

# Each combination is fitted on 10 folds, so fitting this validator is
# expensive. The seed fixes the fold assignment so the selected model is
# reproducible.
cv = CrossValidator(
    estimator=gbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=10,
    seed=42,
)

In [29]:
# Run the cross-validated grid search on the training split, then score the
# held-out test split with the resulting model.
cv_model = cv.fit(train_df)
y_pred = cv_model.transform(test_df)

# Test accuracy: matching (label, prediction) rows divided by all rows.
ac = y_pred.select("label", "prediction")
ac.where(ac["label"] == ac["prediction"]).count() / ac.count()

0.8560428523602277

In [30]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic regression baseline, capped at 10 optimisation iterations.
log_reg = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
)

# Train on the training split and score the test split.
log_reg_model = log_reg.fit(train_df)
y_pred = log_reg_model.transform(test_df)

# Accuracy = fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(y_pred)

print(f"Logistic Regression Model Accuracy: {accuracy:.2f}")


Logistic Regression Model Accuracy: 0.82


In [31]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Linear support vector classifier: 10 iterations, regularisation 0.1.
svc = LinearSVC(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.1,
)

# Train on the training split and score the test split.
svc_model = svc.fit(train_df)
y_pred_svc = svc_model.transform(test_df)

# Accuracy = fraction of test rows whose prediction equals the label.
evaluator_svc = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy_svc = evaluator_svc.evaluate(y_pred_svc)

print(f"Support Vector Classifier Accuracy: {accuracy_svc:.2f}")


Support Vector Classifier Accuracy: 0.80


In [32]:
from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import accuracy_score

# Convert the Spark DataFrames to pandas for scikit-learn compatibility.
# This collects both splits to the driver.
train_pandas = train_df.toPandas()
test_pandas = test_df.toPandas()

# Prepare features (X) and labels (y).
# The "features" column holds Spark ML vectors, a mix of dense and sparse.
# Convert each one explicitly to a dense numpy array so scikit-learn gets a
# regular 2-D numeric input instead of relying on implicit coercion of the
# vector objects.
X_train = [vec.toArray() for vec in train_pandas['features']]
y_train = train_pandas['label']
X_test = [vec.toArray() for vec in test_pandas['features']]
y_test = test_pandas['label']

# Initialize and fit the AdaBoost model (50 estimators, fixed random state).
adaboost = AdaBoostClassifier(n_estimators=50, random_state=42)
adaboost.fit(X_train, y_train)

# Predict on the test dataset
y_pred = adaboost.predict(X_test)

# Evaluate accuracy
accuracy = accuracy_score(y_test, y_pred)
print(f"AdaBoost Classifier Accuracy: {accuracy:.2f}")


AdaBoost Classifier Accuracy: 0.85


In [33]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random forest with 50 trees. The seed fixes the forest's random sampling so
# repeated runs on the same training data give the same model.
rf_classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    seed=42,
)

# Train the Random Forest model on the training dataset
rf_model = rf_classifier.fit(train_df)

# Predict on the test dataset
y_pred_rf = rf_model.transform(test_df)

# Accuracy = fraction of test rows whose prediction equals the label.
evaluator_rf = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(y_pred_rf)

print(f"Random Forest Classifier Accuracy: {accuracy_rf:.2f}")


Random Forest Classifier Accuracy: 0.86


In [34]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Random forest to be tuned. The seed fixes the forest's random sampling.
rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)

# Parameter grid: 3 tree counts x 3 depths = 9 combinations.
paramGrid = ParamGridBuilder() \
    .addGrid(rf_classifier.numTrees, [10, 20, 50]) \
    .addGrid(rf_classifier.maxDepth, [5, 10, 15]) \
    .build()

# Model selection and the final score both use plain accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# 3-fold cross-validation. The seed fixes the fold assignment so the chosen
# hyperparameters are reproducible.
crossval = CrossValidator(estimator=rf_classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)

# Perform hyperparameter tuning.
# NOTE: this rebinds `paramGrid`, `evaluator` and `cv_model`, which earlier
# cells used for the GBT search.
cv_model = crossval.fit(train_df)

# Use the best model for predictions
best_model = cv_model.bestModel
y_pred_rf = best_model.transform(test_df)

# Evaluate accuracy of the best model
accuracy = evaluator.evaluate(y_pred_rf)
print(f"Best Random Forest Model Accuracy: {accuracy:.2f}")

# Print the best hyperparameters. `getNumTrees` is a property on the fitted
# model (no call parentheses), whereas `getMaxDepth()` is a method.
print(f"Best Parameters: numTrees = {best_model.getNumTrees}, maxDepth = {best_model.getMaxDepth()}")


Best Random Forest Model Accuracy: 0.86
Best Parameters: numTrees = 20, maxDepth = 10
