## Installation for macOS

- **Step 1:** Install the Java SE Development Kit (JDK): https://www.oracle.com/java/technologies/downloads/#jdk21-mac (note: the Spark 3.5 documentation lists Java 8, 11 and 17 as supported versions).
- **Step 2:** Install Apache Spark locally: download a pre-built package from https://spark.apache.org/downloads.html, extract it, and move the extracted folder into a `spark` folder in your home directory (create that folder if it does not exist yet).

## Initialize Spark

In [183]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

In [184]:
!pip install --upgrade pip
!pip install findspark



In [185]:
import os
from pathlib import Path

import findspark

# Locate the local Spark installation instead of hard-coding a user-specific absolute path.
# SPARK_HOME wins when it is set; otherwise fall back to the layout from the installation
# steps above: ~/spark/spark-3.5.0-bin-hadoop3
SPARK_HOME = os.environ.get(
    "SPARK_HOME", str(Path.home() / "spark" / "spark-3.5.0-bin-hadoop3")
)

# Initialize spark file path (makes pyspark importable from that installation)
findspark.init(SPARK_HOME)

In [186]:
from pyspark import SparkContext

## Run Spark

In [187]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [188]:
# Define Spark
spark = SparkSession.builder.master("local").appName("time_series_forecasting").getOrCreate()

In [189]:
sc = spark.sparkContext

In [190]:
sc

In [191]:
sc.version

'3.5.0'

In [192]:
spark.sparkContext.appName  # name the application is registered under

'time_series_forecasting'

In [193]:
# Reminder: when you are finished with the notebook, stop the SparkContext
# to release the local Spark resources:
# sc.stop()

## Warm-up: First Look at the Data

In [194]:
# Load the churn dataset: the first row holds the column names, and column types are inferred from the data.
spark_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("./datasets/churn.csv")
)

In [195]:
# Show the inferred column names and types as a tree
spark_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [196]:
# Class of the object returned by spark.read.csv
spark_df.__class__

pyspark.sql.dataframe.DataFrame

In [197]:
# Mark the DataFrame for caching so the many actions below reuse it instead of re-reading the CSV
spark_df.cache()

23/12/25 18:40:19 WARN CacheManager: Asked to cache already cached data.


DataFrame[_c0: int, Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Churn: int]

## Comparison: pandas vs. Spark

In [198]:
import seaborn as sns
df = sns.load_dataset("titanic")

In [199]:
print("Spark Dataframe Type:", type(spark_df))
print("Pandas Dataframe Type:", type(df))

Spark Dataframe Type: <class 'pyspark.sql.dataframe.DataFrame'>
Pandas Dataframe Type: <class 'pandas.core.frame.DataFrame'>


In [200]:
# Spark's head() returns a single Row; pandas' head() returns the first five rows as a DataFrame
for kind, frame in (("Spark", spark_df), ("Pandas", df)):
    print(f"{kind} Dataframe Head:", frame.head())

Spark Dataframe Head: Row(_c0=0, Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Churn=1)
Pandas Dataframe Head:    survived  pclass     sex   age  ...  deck  embark_town  alive  alone
0         0       3    male  22.0  ...   NaN  Southampton     no  False
1         1       1  female  38.0  ...     C    Cherbourg    yes  False
2         1       3  female  26.0  ...   NaN  Southampton    yes   True
3         1       1  female  35.0  ...     C  Southampton    yes  False
4         0       3    male  35.0  ...   NaN  Southampton     no   True

[5 rows x 15 columns]


# Spark reports (name, type) tuples; pandas reports a Series of dtypes
for kind, frame in (("Spark", spark_df), ("Pandas", df)):
    print(f"{kind} Dataframe Dtypes:", frame.dtypes)

In [201]:
# Spark DataFrames have no `ndim` attribute (accessing spark_df.ndim raises an error),
# so only the pandas dimensionality is printed here.
print("Pandas Dataframe Ndim:", df.ndim)

Pandas Dataframe Ndim: 2


## PySpark Functions

In [202]:
spark_df.first()  # same as head() without an argument: the first Row

Row(_c0=0, Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Churn=1)

In [203]:
# Spark's counterpart to pandas' head() is show(), which prints rows as a text table
spark_df.show()  # default: 20 rows, long values truncated
spark_df.show(n=5, truncate=True)

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

In [204]:
# Number of rows (runs a Spark job)
n_rows = spark_df.count()
n_rows

900

In [205]:
# Column names, in schema order
[field.name for field in spark_df.schema.fields]

['_c0',
 'Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn']

In [206]:
# count / mean / stddev / min / max for every column
summary_stats = spark_df.describe()
summary_stats.show()

+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|               _c0|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              Churn|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [207]:
# Summary statistics for a single column
spark_df.describe(["Age"]).show()

+-------+-----------------+
|summary|              Age|
+-------+-----------------+
|  count|              900|
|   mean|41.81666666666667|
| stddev|6.127560416916251|
|    min|             22.0|
|    max|             65.0|
+-------+-----------------+



In [208]:
# Project two columns, referenced as Column objects
spark_df.select(spark_df["Age"], spark_df["Names"]).show()

+----+-------------------+
| Age|              Names|
+----+-------------------+
|42.0|   Cameron Williams|
|41.0|      Kevin Mueller|
|38.0|        Eric Lozano|
|42.0|      Phillip White|
|37.0|     Cynthia Norton|
|48.0|   Jessica Williams|
|44.0|        Eric Butler|
|32.0|      Zachary Walsh|
|43.0|        Ashlee Carr|
|40.0|     Jennifer Lynch|
|30.0|       Paula Harris|
|45.0|     Bruce Phillips|
|45.0|       Craig Garner|
|40.0|       Nicole Olson|
|41.0|     Harold Griffin|
|38.0|       James Wright|
|45.0|      Doris Wilkins|
|43.0|Katherine Carpenter|
|53.0|     Lindsay Martin|
|46.0|        Kathy Curry|
+----+-------------------+
only showing top 20 rows



In [209]:
# Count customers older than 40 (where() is an alias of filter())
spark_df.where(spark_df["Age"] > 40).count()

524

In [210]:
# Customers per Churn value
spark_df.groupBy("Churn").count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



In [211]:
# Average age per Churn value (result column is named avg(Age))
spark_df.groupBy("Churn").mean("Age").show()

+-----+-----------------+
|Churn|         avg(Age)|
+-----+-----------------+
|    1|42.99333333333333|
|    0|41.58133333333333|
+-----+-----------------+



## SQL Processes

In [212]:
# Expose the DataFrame to Spark SQL as a temporary view named "tbl_df".
# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView() replaces it
# and, like the original, can be re-run safely because it overwrites an existing view of the same name.
spark_df.createOrReplaceTempView("tbl_df")

In [213]:
spark.sql("SHOW DATABASES").show()  # namespaces known to the session catalog

+---------+
|namespace|
+---------+
|  default|
+---------+



In [214]:
spark.sql("SHOW TABLES").show()  # includes the temporary view registered above

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |   tbl_df|       true|
+---------+---------+-----------+



In [215]:
spark.table("tbl_df").show(5)  # equivalent to SELECT * FROM tbl_df

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+---+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



In [216]:
spark.sql("SELECT Churn, mean(Age) FROM tbl_df GROUP BY Churn").show()  # SQL version of the groupBy/mean above

+-----+-----------------+
|Churn|        mean(Age)|
+-----+-----------------+
|    1|42.99333333333333|
|    0|41.58133333333333|
+-----+-----------------+



## Visualization

In [217]:
import matplotlib.pyplot as plt
import seaborn as sns

In [218]:
# seaborn works on pandas data, not on Spark DataFrames. This call passes no data at all,
# so the column name "Churn" cannot be resolved. The error is caught and shown here so that
# "Run All" does not stop; the next cells convert to pandas first.
try:
    sns.barplot(x="Churn")
except ValueError as err:
    print(f"ValueError: {err}")

ValueError: Could not interpret input 'Churn'

In [None]:
# Transform to pandas
p_df = spark_df.toPandas()

In [None]:
p_df.head()

In [None]:
sns.barplot(x="Churn", y=p_df.Churn.index, data=p_df);

In [None]:
reducing_df = spark_df.groupby("Churn").count().toPandas()

In [None]:
reducing_df.head()

In [None]:
type(reducing_df)

## Machine Learning Application Example

In [None]:
spark_df = spark.read.csv("./datasets/churn.csv", header = True, inferSchema = True)<

In [None]:
spark_df.show(10)

In [None]:
# Transform Col Names
spark_df = spark_df.toDF(*[col.lower() for col in spark_df.columns])

In [None]:
spark_df.show(5)

In [None]:
# Rename Col Names
spark_df = spark_df.withColumnRenamed("_c0", "index")

In [None]:
spark_df.show(5)

In [None]:
# Total observations
spark_df.count()

In [None]:
# Cols Length
len(spark_df.columns)

In [None]:
# Col names
spark_df.columns

In [None]:
# Describes statistics
spark_df.describe().show()

In [None]:
# Select a variable
spark_df.select("age","total_purchase","account_manager","years","num_sites","churn").describe().toPandas().transpose()

In [None]:
# Drop missing values
spark_df = spark_df.dropna()

In [None]:
# Add new feature
spark_df = spark_df.withColumn("age_squared", spark_df.age**2)

In [None]:
spark_df.show(2)

In [None]:
# Define dependent variable
from pyspark.ml.feature import StringIndexer

In [None]:
stringIndexer = StringIndexer(inputCol="churn", outputCol="label")

In [None]:
trans = stringIndexer.fit(spark_df)

In [None]:
indexed = trans.transform(spark_df)

In [None]:
spark_df = indexed.withColumn("label", indexed["label"].cast("integer"))

In [None]:
spark_df.show(5)

In [None]:
# Define independent variables
from pyspark.ml.feature import VectorAssembler

In [None]:
spark_df.columns

In [None]:
independent_variables = ["age","total_purchase","account_manager","years","num_sites"]

In [None]:
vectorAssembler = VectorAssembler(inputCols=independent_variables, outputCol="features")

In [None]:
va_df = vectorAssembler.transform(spark_df)

In [None]:
va_df.show(5)

In [None]:
final_df = va_df.select(["features","label"])

In [None]:
final_df.show()

In [None]:
# Split Train and Test Dfs
splits = final_df.randomSplit([0.80,0.20])

In [None]:
train_df = splits[0]
test_df = splits[1]

In [None]:
train_df.show(5)

In [None]:
test_df.show(5)

In [None]:
# Modelling
from pyspark.ml.classification import GBTClassifier

In [None]:
gbm = GBTClassifier(maxIter=10, featuresCol="features", labelCol="label")

In [None]:
gbm_model = gbm.fit(train_df)

In [None]:
y_pred = gbm_model.transform(test_df)

In [None]:
y_pred

In [None]:
accuracy = y_pred.select("label","prediction")

In [None]:
accuracy.filter(accuracy.label == accuracy.prediction).count()/accuracy.count()

In [None]:
# Model Tunning

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
evaluator = BinaryClassificationEvaluator()

In [None]:
paramGrid = (ParamGridBuilder().addGrid(gbm.maxDepth, [2,4,6]).addGrid(gbm.maxBins,[20,30]).addGrid(gbm.maxIter, [10,20]).build())

In [None]:
cv = CrossValidator(estimator=gbm, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

In [None]:
cv_model = cv.fit(train_df)

In [None]:
y_pred = cv_model.transform(test_df)

In [None]:
accuracy = y_pred.select("label","prediction")

In [None]:
accuracy.filter(accuracy.label == accuracy.prediction).count()/accuracy.count()

In [None]:
# Customer Leave Problem

In [None]:
import pandas as pd

names = pd.Series(['Bob Smith', 'Sally Jones', 'Mike Thompson', 'Kim Johnson', 'Bob Smith'])
ages = pd.Series([21, 34, 27, 24, 34])
total_purchase = pd.Series([251.00, 325.00, 421.00, 521.00, 825.00])
account_manager = pd.Series([1, 0, 1, 1, 0])
years = pd.Series([3, 14, 11, 9, 4])
num_sites = pd.Series([8, 14, 11, 9, 4])

new_customers = pd.DataFrame({'names': names, 'age': ages, 'total_purchase': total_purchase, 'account_manager': account_manager, 'years': years, 'num_sites': num_sites})

In [None]:
new_customers.columns

In [None]:
# Transform to spark
new_customers_spark_df = spark.createDataFrame(new_customers)

In [None]:
new_customers_spark_df.show()

In [None]:
new_customers = vectorAssembler.transform(new_customers_spark_df)

In [None]:
results = cv_model.transform(new_customers)

In [None]:
results.select("names","prediction").show() # Predictions