In [82]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Bank Transactions Analysis") \
    .getOrCreate()

# Load the dataset
data_path = "bank_transactions.csv"
bank_transactions = spark.read.csv(data_path, header=True, inferSchema=True)

# Show the first few rows to understand what the data looks like
bank_transactions.show(5)

# Print the schema to see the data types and structure
bank_transactions.printSchema()

# Count the number of records in the dataset
print("Number of records:", bank_transactions.count())

# Show the number of columns and their names
print("Columns:", bank_transactions.columns)


+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|           T5|  C9031234|    24/3

In [83]:
# Describe the data and show summary statistics (count, mean, stddev, min, max)
bank_transactions.describe().show()

+-------+-------------+----------+-----------+----------+--------------------+------------------+---------------+------------------+-----------------------+
|summary|TransactionID|CustomerID|CustomerDOB|CustGender|        CustLocation|CustAccountBalance|TransactionDate|   TransactionTime|TransactionAmount (INR)|
+-------+-------------+----------+-----------+----------+--------------------+------------------+---------------+------------------+-----------------------+
|  count|      1048567|   1048567|    1048567|   1047467|             1048416|           1046198|        1048567|           1048567|                1048567|
|   mean|         NULL|      NULL|        NaN|      NULL|            400012.0|115403.54005622248|           NULL|157087.52939297154|     1574.3350034571122|
| stddev|         NULL|      NULL|        NaN|      NULL|                 0.0| 846485.3806006602|           NULL| 51261.85402232927|      6574.742978453993|
|    min|           T1|  C1010011|     1/1/00|         F|(

In [84]:
from pyspark.sql.functions import col, count, when, isnan

# Counting missing and null values in each column
def count_missing_values(df):
    df_missing = df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns])
    return df_missing

# Display the count of missing values for each column
missing_values = count_missing_values(bank_transactions)
missing_values.show()


+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|            0|         0|       3397|      1100|         151|              2369|              0|              0|                      0|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+



In [85]:
# Drop any rows with missing values across any columns
cleaned_transactions = bank_transactions.na.drop()

# Show the result to verify rows have been dropped
cleaned_transactions.show()

# Optionally, you can count the rows before and after to verify the number of rows dropped
print("Original Dataset Row Count:", bank_transactions.count())
print("Cleaned Dataset Row Count:", cleaned_transactions.count())


+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|           T5|  C9031234|    24/3

In [86]:
count_missing_values(cleaned_transactions).show()

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|            0|         0|       3333|         0|           0|                 0|              0|              0|                      0|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+



In [87]:
# Drop the 'TransactionID' and 'CustomerID' columns
reduced_transactions = cleaned_transactions.drop("TransactionID", "CustomerID")

# Show the result to verify the columns have been dropped
reduced_transactions.show()

# Optionally, print the schema to confirm the removal of the columns
reduced_transactions.printSchema()


+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|    24/3/88|         F| NAVI MUMBAI|           6714.43|         2/8/16|         181156|                 1762.5|
|    8/10/72|         F|    ITANAGAR|           53609.2|         2/8/16|         173940|        

In [88]:
count_missing_values(reduced_transactions).show()

+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|       3333|         0|           0|                 0|              0|              0|                      0|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+



In [89]:
from pyspark.sql.functions import col

# Group by 'CustGender' and count occurrences
reduced_transactions.groupBy("CustGender").count().show()



+----------+------+
|CustGender| count|
+----------+------+
|         F|281241|
|         M|763705|
|         T|     1|
+----------+------+



In [90]:
reduced_transactions.select("CustomerDOB").show()

+-----------+
|CustomerDOB|
+-----------+
|    10/1/94|
|     4/4/57|
|   26/11/96|
|    14/9/73|
|    24/3/88|
|    8/10/72|
|    26/1/92|
|    27/1/82|
|    19/4/88|
|    22/6/84|
|    22/7/82|
|     7/7/88|
|    13/6/78|
|     5/1/92|
|    24/3/78|
|    10/7/68|
|   1/1/1800|
|    16/7/89|
|    11/1/91|
|    24/6/85|
+-----------+
only showing top 20 rows



In [91]:
from pyspark.sql.functions import col

# Group by 'CustGender' and count occurrences
reduced_transactions.groupBy("CustomerDOB").count().show(40)

+-----------+-----+
|CustomerDOB|count|
+-----------+-----+
|   30/10/90|  189|
|     9/2/92|  125|
|    12/8/90|  248|
|    3/10/90|  187|
|    24/2/88|  144|
|    17/3/87|  203|
|    20/8/83|  113|
|    19/3/90|  144|
|   22/10/82|  140|
|    6/11/86|   90|
|     7/5/92|  175|
|     6/1/70|   94|
|   11/10/89|  212|
|    16/4/88|  244|
|    4/12/95|  101|
|    2/12/75|   27|
|     4/8/82|   97|
|   27/10/78|   22|
|    30/6/86|  204|
|    31/7/84|   77|
|     9/7/92|  126|
|    16/4/84|  129|
|    11/2/78|   12|
|     5/4/96|   55|
|    1/11/90|  152|
|    11/8/88|  160|
|   26/12/79|  130|
|    17/6/87|  104|
|   21/12/81|  115|
|    11/1/97|    6|
|    28/8/80|  104|
|    22/9/89|  173|
|   29/12/52|   32|
|    15/1/76|   52|
|    28/6/85|  144|
|    25/1/80|   29|
|    21/8/82|   61|
|     1/4/52|   17|
|   21/12/82|   73|
|   16/12/94|   75|
+-----------+-----+
only showing top 40 rows



In [92]:
from pyspark.sql.functions import regexp_extract, col

# Assuming you want to set a default year for null dates, e.g., 'Unknown'
reduced_transactions_dob = reduced_transactions.withColumn(
    "YearOfBirth",
    when(col("CustomerDOB").isNull(), "Unknown").otherwise(regexp_extract(col("CustomerDOB"), r'(\d{2}|\d{4})$', 0))
)


In [93]:
reduced_transactions_dob.show()

+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|YearOfBirth|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|         94|
|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|         57|
|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|         96|
|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|         73|
|    24/3/88|         F| NAVI MUMBAI|           6714.43|         2/8/16|         181156|                 1762.5|         88|


In [94]:
count_missing_values(reduced_transactions_dob).show()

+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|YearOfBirth|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|       3333|         0|           0|                 0|              0|              0|                      0|          0|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+



In [95]:
from pyspark.sql.functions import regexp_extract, col, when, concat, lit

# Standardize to four digits, assuming 'YY' means '19YY'
reduced_transactions_dob = reduced_transactions_dob.withColumn(
    "YearOfBirth",
    when(col("YearOfBirth").rlike(r'^\d{2}$'), concat(lit("19"), col("YearOfBirth")))
    .otherwise(col("YearOfBirth"))
)

# Cast 'YearOfBirth' to integer to prepare for comparison
reduced_transactions_dob = reduced_transactions_dob.withColumn("YearOfBirth", col("YearOfBirth").cast("integer"))

# Display to check if conversion and extraction are correct
reduced_transactions_dob.select("CustomerDOB", "YearOfBirth").show()



+-----------+-----------+
|CustomerDOB|YearOfBirth|
+-----------+-----------+
|    10/1/94|       1994|
|     4/4/57|       1957|
|   26/11/96|       1996|
|    14/9/73|       1973|
|    24/3/88|       1988|
|    8/10/72|       1972|
|    26/1/92|       1992|
|    27/1/82|       1982|
|    19/4/88|       1988|
|    22/6/84|       1984|
|    22/7/82|       1982|
|     7/7/88|       1988|
|    13/6/78|       1978|
|     5/1/92|       1992|
|    24/3/78|       1978|
|    10/7/68|       1968|
|   1/1/1800|       1800|
|    16/7/89|       1989|
|    11/1/91|       1991|
|    24/6/85|       1985|
+-----------+-----------+
only showing top 20 rows



In [96]:
# Filter to keep only records with 'YearOfBirth' from 1900 onwards
reduced_transactions_dob = reduced_transactions_dob.filter(col("YearOfBirth") >= 1900)

# Show the updated DataFrame to ensure filtering is correct
reduced_transactions_dob.select("CustomerDOB", "YearOfBirth").show()


+-----------+-----------+
|CustomerDOB|YearOfBirth|
+-----------+-----------+
|    10/1/94|       1994|
|     4/4/57|       1957|
|   26/11/96|       1996|
|    14/9/73|       1973|
|    24/3/88|       1988|
|    8/10/72|       1972|
|    26/1/92|       1992|
|    27/1/82|       1982|
|    19/4/88|       1988|
|    22/6/84|       1984|
|    22/7/82|       1982|
|     7/7/88|       1988|
|    13/6/78|       1978|
|     5/1/92|       1992|
|    24/3/78|       1978|
|    10/7/68|       1968|
|    16/7/89|       1989|
|    11/1/91|       1991|
|    24/6/85|       1985|
|    20/4/93|       1993|
+-----------+-----------+
only showing top 20 rows



In [97]:
reduced_transactions_dob.show()

+-----------+----------+--------------------+------------------+---------------+---------------+-----------------------+-----------+
|CustomerDOB|CustGender|        CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|YearOfBirth|
+-----------+----------+--------------------+------------------+---------------+---------------+-----------------------+-----------+
|    10/1/94|         F|          JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|       1994|
|     4/4/57|         M|             JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|       1957|
|   26/11/96|         F|              MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|       1996|
|    14/9/73|         F|              MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|       1973|
|    24/3/88|         F|         NAVI MUMBAI|           6714.43|     

In [98]:
count_missing_values(reduced_transactions_dob).show()

+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|YearOfBirth|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+
|          0|         0|           0|                 0|              0|              0|                      0|          0|
+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-----------+



In [99]:
reduced_transactions_dob.count()

985322

In [100]:
reduced_transactions_dob.select("YearOfBirth").summary().show()

+-------+------------------+
|summary|       YearOfBirth|
+-------+------------------+
|  count|            985322|
|   mean|1984.8800361709168|
| stddev| 9.075799588673496|
|    min|              1900|
|    25%|              1982|
|    50%|              1987|
|    75%|              1991|
|    max|              1999|
+-------+------------------+



In [101]:
from pyspark.sql.functions import col, lit

# Add 'Age' column by calculating it from 'YearOfBirth', using 2016 as the reference year
reduced_transactions_dob = reduced_transactions_dob.withColumn(
    "Age",
    lit(2016) - col("YearOfBirth")
)

# Show the DataFrame to verify the 'Age' column has been added correctly
reduced_transactions_dob.select("CustomerDOB", "YearOfBirth", "Age").show()


+-----------+-----------+---+
|CustomerDOB|YearOfBirth|Age|
+-----------+-----------+---+
|    10/1/94|       1994| 22|
|     4/4/57|       1957| 59|
|   26/11/96|       1996| 20|
|    14/9/73|       1973| 43|
|    24/3/88|       1988| 28|
|    8/10/72|       1972| 44|
|    26/1/92|       1992| 24|
|    27/1/82|       1982| 34|
|    19/4/88|       1988| 28|
|    22/6/84|       1984| 32|
|    22/7/82|       1982| 34|
|     7/7/88|       1988| 28|
|    13/6/78|       1978| 38|
|     5/1/92|       1992| 24|
|    24/3/78|       1978| 38|
|    10/7/68|       1968| 48|
|    16/7/89|       1989| 27|
|    11/1/91|       1991| 25|
|    24/6/85|       1985| 31|
|    20/4/93|       1993| 23|
+-----------+-----------+---+
only showing top 20 rows



In [102]:
reduced_transactions_age = reduced_transactions_dob.drop("CustomerDOB", "YearOfBirth")

In [103]:
reduced_transactions_age.show()

+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|CustGender|        CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|Age|
+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|         F|          JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0| 22|
|         M|             JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0| 59|
|         F|              MUMBAI|          17874.44|         2/8/16|         142712|                  459.0| 20|
|         F|              MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0| 43|
|         F|         NAVI MUMBAI|           6714.43|         2/8/16|         181156|                 1762.5| 28|
|         F|            ITANAGAR|           53609.2|         2/8/16|         173940|            

In [104]:
df = reduced_transactions_age

In [105]:
df.show()

+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|CustGender|        CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|Age|
+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|         F|          JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0| 22|
|         M|             JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0| 59|
|         F|              MUMBAI|          17874.44|         2/8/16|         142712|                  459.0| 20|
|         F|              MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0| 43|
|         F|         NAVI MUMBAI|           6714.43|         2/8/16|         181156|                 1762.5| 28|
|         F|            ITANAGAR|           53609.2|         2/8/16|         173940|            

In [108]:
spark.version

'3.5.1'

In [107]:
df.write.csv('cleaned.csv', header=True)

Py4JJavaError: An error occurred while calling o1970.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [75]:
df.show()

+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|CustGender|        CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|Age|
+----------+--------------------+------------------+---------------+---------------+-----------------------+---+
|         F|          JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0| 22|
|         M|             JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0| 59|
|         F|              MUMBAI|          17874.44|         2/8/16|         142712|                  459.0| 20|
|         F|              MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0| 43|
|         F|         NAVI MUMBAI|           6714.43|         2/8/16|         181156|                 1762.5| 28|
|         F|            ITANAGAR|           53609.2|         2/8/16|         173940|            

In [80]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
import numpy as np

# Define a function to calculate IQR and identify outliers
def identify_outliers(df, column_name):
    quantiles = df.approxQuantile(column_name, [0.25, 0.75], 0.05)
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # UDF to flag rows with outliers
    outlier_udf = udf(lambda x: True if (x < lower_bound) or (x > upper_bound) else False, BooleanType())

    df = df.withColumn(column_name + "_outlier", outlier_udf(col(column_name)))
    return df

# Apply the function to the columns of interest
columns_to_check = ['CustAccountBalance', 'TransactionAmount (INR)', 'Age']
for col_name in columns_to_check:
    df = identify_outliers(df, col_name)


In [81]:
# Convert to Pandas DataFrame for visualization
sample_data = df.sample(fraction=0.1).toPandas()

Py4JJavaError: An error occurred while calling o1519.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 212.0 failed 1 times, most recent failure: Lost task 1.0 in stage 212.0 (TID 1303) (DESKTOP-14MAQMU executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 36 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 36 more


Here's how you can perform clustering using "Age" and other insightful columns such as "CustAccountBalance" and "TransactionAmount (INR)". These columns could potentially offer insights into different customer segments based on their financial activity and demographics.

In [49]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


In [51]:
# Assuming 'bank_transactions' is your DataFrame
# Select features for clustering
features = ['Age', 'CustAccountBalance', 'TransactionAmount (INR)']

# Assemble features into a single vector column 'features_vec'
assembler = VectorAssembler(inputCols=features, outputCol='features_vec')
reduced_transactions_age = assembler.transform(reduced_transactions_age)

# Scale the features
scaler = StandardScaler(inputCol='features_vec', outputCol='scaledFeatures', withStd=True, withMean=False)
scaler_model = scaler.fit(reduced_transactions_age)
reduced_transactions_age = scaler_model.transform(reduced_transactions_age)

# Trains a k-means model with K clusters
k = 3  # Choose an appropriate K for your data
kmeans = KMeans(featuresCol='scaledFeatures', k=k)
model = kmeans.fit(reduced_transactions_age)

# Make predictions
predictions = model.transform(reduced_transactions_age)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures')

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the cluster centers
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

IllegalArgumentException: Output column features_vec already exists.